You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2016/11/02 01:07:34 UTC

spark git commit: [SPARK-18025] Use commit protocol API in structured streaming

Repository: spark
Updated Branches:
  refs/heads/master 91c33a0ca -> 77a98162d


[SPARK-18025] Use commit protocol API in structured streaming

## What changes were proposed in this pull request?
This patch adds a new commit protocol implementation ManifestFileCommitProtocol that follows the existing streaming flow, and uses it in FileStreamSink to consolidate the write path in structured streaming with the batch mode write path.

This deletes a lot of code, and would make it trivial to support other functionalities that are currently available in batch but not in streaming, including all file formats and bucketing.

## How was this patch tested?
Should be covered by existing tests.

Author: Reynold Xin <rx...@databricks.com>

Closes #15710 from rxin/SPARK-18025.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77a98162
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77a98162
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77a98162

Branch: refs/heads/master
Commit: 77a98162d1ec28247053b8b3ad4af28baa950797
Parents: 91c33a0
Author: Reynold Xin <rx...@databricks.com>
Authored: Tue Nov 1 18:06:57 2016 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue Nov 1 18:06:57 2016 -0700

----------------------------------------------------------------------
 .../datasources/FileCommitProtocol.scala        |  11 +-
 .../sql/execution/datasources/FileFormat.scala  |  14 -
 .../datasources/FileFormatWriter.scala          | 400 ++++++++++++++++++
 .../InsertIntoHadoopFsRelationCommand.scala     |  25 +-
 .../sql/execution/datasources/WriteOutput.scala | 406 -------------------
 .../datasources/parquet/ParquetFileFormat.scala |  11 -
 .../parquet/ParquetOutputWriter.scala           | 116 +-----
 .../execution/streaming/FileStreamSink.scala    | 229 ++---------
 .../streaming/ManifestFileCommitProtocol.scala  | 114 ++++++
 .../org/apache/spark/sql/internal/SQLConf.scala |   3 +-
 .../sql/streaming/FileStreamSinkSuite.scala     | 106 +----
 11 files changed, 567 insertions(+), 868 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/77a98162/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala
index 1ce9ae4..f5dd5ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala
@@ -32,9 +32,9 @@ import org.apache.spark.util.Utils
 
 
 object FileCommitProtocol {
-  class TaskCommitMessage(obj: Any) extends Serializable
+  class TaskCommitMessage(val obj: Any) extends Serializable
 
-  object EmptyTaskCommitMessage extends TaskCommitMessage(Unit)
+  object EmptyTaskCommitMessage extends TaskCommitMessage(null)
 
   /**
    * Instantiates a FileCommitProtocol using the given className.
@@ -62,8 +62,11 @@ object FileCommitProtocol {
 
 
 /**
- * An interface to define how a Spark job commits its outputs. Implementations must be serializable,
- * as the committer instance instantiated on the driver will be used for tasks on executors.
+ * An interface to define how a single Spark job commits its outputs. Two notes:
+ *
+ * 1. Implementations must be serializable, as the committer instance instantiated on the driver
+ *    will be used for tasks on executors.
+ * 2. A committer should not be reused across multiple Spark jobs.
  *
  * The proper call sequence is:
  *

http://git-wip-us.apache.org/repos/asf/spark/blob/77a98162/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index 9d153ce..4f4aaaa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -56,20 +56,6 @@ trait FileFormat {
       dataSchema: StructType): OutputWriterFactory
 
   /**
-   * Returns a [[OutputWriterFactory]] for generating output writers that can write data.
-   * This method is current used only by FileStreamSinkWriter to generate output writers that
-   * does not use output committers to write data. The OutputWriter generated by the returned
-   * [[OutputWriterFactory]] must implement the method `newWriter(path)`..
-   */
-  def buildWriter(
-      sqlContext: SQLContext,
-      dataSchema: StructType,
-      options: Map[String, String]): OutputWriterFactory = {
-    // TODO: Remove this default implementation when the other formats have been ported
-    throw new UnsupportedOperationException(s"buildWriter is not supported for $this")
-  }
-
-  /**
    * Returns whether this format support returning columnar batch or not.
    *
    * TODO: we should just have different traits for the different formats.

http://git-wip-us.apache.org/repos/asf/spark/blob/77a98162/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
new file mode 100644
index 0000000..bc00a0a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.{Date, UUID}
+
+import scala.collection.mutable
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{SQLExecution, UnsafeKVExternalSorter}
+import org.apache.spark.sql.execution.datasources.FileCommitProtocol.TaskCommitMessage
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
+
+
+/** A helper object for writing FileFormat data out to a location. */
+object FileFormatWriter extends Logging {
+
+  /** A shared job description for all the write tasks. */
+  private class WriteJobDescription(
+      val uuid: String,  // prevent collision between different (appending) write jobs
+      val serializableHadoopConf: SerializableConfiguration,
+      val outputWriterFactory: OutputWriterFactory,
+      val allColumns: Seq[Attribute],
+      val partitionColumns: Seq[Attribute],
+      val nonPartitionColumns: Seq[Attribute],
+      val bucketSpec: Option[BucketSpec],
+      val path: String)
+    extends Serializable {
+
+    assert(AttributeSet(allColumns) == AttributeSet(partitionColumns ++ nonPartitionColumns),
+      s"""
+         |All columns: ${allColumns.mkString(", ")}
+         |Partition columns: ${partitionColumns.mkString(", ")}
+         |Non-partition columns: ${nonPartitionColumns.mkString(", ")}
+       """.stripMargin)
+  }
+
+  /**
+   * Basic work flow of this command is:
+   * 1. Driver side setup, including output committer initialization and data source specific
+   *    preparation work for the write job to be issued.
+   * 2. Issues a write job consisting of one or more executor-side tasks, each of which writes
+   *    all rows within an RDD partition.
+   * 3. If no exception is thrown in a task, commits that task, otherwise aborts that task. If any
+   *    exception is thrown during task commitment, also aborts that task.
+   * 4. If all tasks are committed, commits the job, otherwise aborts the job. If any exception is
+   *    thrown during job commitment, also aborts the job.
+   */
+  def write(
+      sparkSession: SparkSession,
+      plan: LogicalPlan,
+      fileFormat: FileFormat,
+      committer: FileCommitProtocol,
+      outputPath: String,
+      hadoopConf: Configuration,
+      partitionColumns: Seq[Attribute],
+      bucketSpec: Option[BucketSpec],
+      refreshFunction: (Seq[TablePartitionSpec]) => Unit,
+      options: Map[String, String]): Unit = {
+
+    val job = Job.getInstance(hadoopConf)
+    job.setOutputKeyClass(classOf[Void])
+    job.setOutputValueClass(classOf[InternalRow])
+    FileOutputFormat.setOutputPath(job, new Path(outputPath))
+
+    val partitionSet = AttributeSet(partitionColumns)
+    val dataColumns = plan.output.filterNot(partitionSet.contains)
+    val queryExecution = Dataset.ofRows(sparkSession, plan).queryExecution
+
+    // Note: prepareWrite has a side effect: it modifies "job".
+    val outputWriterFactory =
+      fileFormat.prepareWrite(sparkSession, job, options, dataColumns.toStructType)
+
+    val description = new WriteJobDescription(
+      uuid = UUID.randomUUID().toString,
+      serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
+      outputWriterFactory = outputWriterFactory,
+      allColumns = plan.output,
+      partitionColumns = partitionColumns,
+      nonPartitionColumns = dataColumns,
+      bucketSpec = bucketSpec,
+      path = outputPath)
+
+    SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
+      // This call shouldn't be put into the `try` block below because it only initializes and
+      // prepares the job; any exception thrown from here shouldn't cause abortJob() to be called.
+      committer.setupJob(job)
+
+      try {
+        val ret = sparkSession.sparkContext.runJob(queryExecution.toRdd,
+          (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
+            executeTask(
+              description = description,
+              sparkStageId = taskContext.stageId(),
+              sparkPartitionId = taskContext.partitionId(),
+              sparkAttemptNumber = taskContext.attemptNumber(),
+              committer,
+              iterator = iter)
+          })
+
+        val commitMsgs = ret.map(_._1)
+        val updatedPartitions = ret.flatMap(_._2).distinct.map(PartitioningUtils.parsePathFragment)
+
+        committer.commitJob(job, commitMsgs)
+        logInfo(s"Job ${job.getJobID} committed.")
+        refreshFunction(updatedPartitions)
+      } catch { case cause: Throwable =>
+        logError(s"Aborting job ${job.getJobID}.", cause)
+        committer.abortJob(job)
+        throw new SparkException("Job aborted.", cause)
+      }
+    }
+  }
+
+  /** Writes data out in a single Spark task. */
+  private def executeTask(
+      description: WriteJobDescription,
+      sparkStageId: Int,
+      sparkPartitionId: Int,
+      sparkAttemptNumber: Int,
+      committer: FileCommitProtocol,
+      iterator: Iterator[InternalRow]): (TaskCommitMessage, Set[String]) = {
+
+    val jobId = SparkHadoopWriter.createJobID(new Date, sparkStageId)
+    val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
+    val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
+
+    // Set up the attempt context required to use in the output committer.
+    val taskAttemptContext: TaskAttemptContext = {
+      // Set up the configuration object
+      val hadoopConf = description.serializableHadoopConf.value
+      hadoopConf.set("mapred.job.id", jobId.toString)
+      hadoopConf.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
+      hadoopConf.set("mapred.task.id", taskAttemptId.toString)
+      hadoopConf.setBoolean("mapred.task.is.map", true)
+      hadoopConf.setInt("mapred.task.partition", 0)
+
+      new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
+    }
+
+    committer.setupTask(taskAttemptContext)
+
+    val writeTask =
+      if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {
+        new SingleDirectoryWriteTask(description, taskAttemptContext, committer)
+      } else {
+        new DynamicPartitionWriteTask(description, taskAttemptContext, committer)
+      }
+
+    try {
+      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
+        // Execute the task to write rows out and commit the task.
+        val outputPartitions = writeTask.execute(iterator)
+        writeTask.releaseResources()
+        (committer.commitTask(taskAttemptContext), outputPartitions)
+      })(catchBlock = {
+        // If there is an error, release resource and then abort the task
+        try {
+          writeTask.releaseResources()
+        } finally {
+          committer.abortTask(taskAttemptContext)
+          logError(s"Job $jobId aborted.")
+        }
+      })
+    } catch {
+      case t: Throwable =>
+        throw new SparkException("Task failed while writing rows", t)
+    }
+  }
+
+  /**
+   * A simple trait for writing out data in a single Spark task, without any concerns about how
+   * to commit or abort tasks. Exceptions thrown by the implementation of this trait will
+   * automatically trigger task aborts.
+   */
+  private trait ExecuteWriteTask {
+    /**
+     * Writes data out to files, and then returns the list of partition strings written out.
+     * The list of partitions is sent back to the driver and used to update the catalog.
+     */
+    def execute(iterator: Iterator[InternalRow]): Set[String]
+    def releaseResources(): Unit
+  }
+
+  /** Writes data to a single directory (used for non-dynamic-partition writes). */
+  private class SingleDirectoryWriteTask(
+      description: WriteJobDescription,
+      taskAttemptContext: TaskAttemptContext,
+      committer: FileCommitProtocol) extends ExecuteWriteTask {
+
+    private[this] var outputWriter: OutputWriter = {
+      val tmpFilePath = committer.newTaskTempFile(
+        taskAttemptContext,
+        None,
+        description.outputWriterFactory.getFileExtension(taskAttemptContext))
+
+      val outputWriter = description.outputWriterFactory.newInstance(
+        path = tmpFilePath,
+        dataSchema = description.nonPartitionColumns.toStructType,
+        context = taskAttemptContext)
+      outputWriter.initConverter(dataSchema = description.nonPartitionColumns.toStructType)
+      outputWriter
+    }
+
+    override def execute(iter: Iterator[InternalRow]): Set[String] = {
+      while (iter.hasNext) {
+        val internalRow = iter.next()
+        outputWriter.writeInternal(internalRow)
+      }
+      Set.empty
+    }
+
+    override def releaseResources(): Unit = {
+      if (outputWriter != null) {
+        outputWriter.close()
+        outputWriter = null
+      }
+    }
+  }
+
+  /**
+   * Writes data using dynamic partition writes, meaning a single task can write to
+   * multiple directories (partitions) or files (bucketing).
+   */
+  private class DynamicPartitionWriteTask(
+      description: WriteJobDescription,
+      taskAttemptContext: TaskAttemptContext,
+      committer: FileCommitProtocol) extends ExecuteWriteTask {
+
+    // currentWriter is initialized whenever we see a new key
+    private var currentWriter: OutputWriter = _
+
+    private val bucketColumns: Seq[Attribute] = description.bucketSpec.toSeq.flatMap {
+      spec => spec.bucketColumnNames.map(c => description.allColumns.find(_.name == c).get)
+    }
+
+    private val sortColumns: Seq[Attribute] = description.bucketSpec.toSeq.flatMap {
+      spec => spec.sortColumnNames.map(c => description.allColumns.find(_.name == c).get)
+    }
+
+    private def bucketIdExpression: Option[Expression] = description.bucketSpec.map { spec =>
+      // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
+      // guarantee the data distribution is same between shuffle and bucketed data source, which
+      // enables us to only shuffle one side when joining a bucketed table and a normal one.
+      HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
+    }
+
+    /** Expressions that given a partition key build a string like: col1=val/col2=val/... */
+    private def partitionStringExpression: Seq[Expression] = {
+      description.partitionColumns.zipWithIndex.flatMap { case (c, i) =>
+        val escaped = ScalaUDF(
+          PartitioningUtils.escapePathName _,
+          StringType,
+          Seq(Cast(c, StringType)),
+          Seq(StringType))
+        val str = If(IsNull(c), Literal(PartitioningUtils.DEFAULT_PARTITION_NAME), escaped)
+        val partitionName = Literal(c.name + "=") :: str :: Nil
+        if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
+      }
+    }
+
+    /**
+     * Opens and returns a new OutputWriter given a partition key and an optional bucket id.
+     * If bucket id is specified, we will append it to the end of the file name, but before the
+     * file extension, e.g. part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet
+     */
+    private def newOutputWriter(key: InternalRow, partString: UnsafeProjection): OutputWriter = {
+      val partDir =
+        if (description.partitionColumns.isEmpty) None else Option(partString(key).getString(0))
+
+      // If the bucket spec is defined, the bucket column is right after the partition columns
+      val bucketId = if (description.bucketSpec.isDefined) {
+        BucketingUtils.bucketIdToString(key.getInt(description.partitionColumns.length))
+      } else {
+        ""
+      }
+      val ext = bucketId + description.outputWriterFactory.getFileExtension(taskAttemptContext)
+
+      val path = committer.newTaskTempFile(taskAttemptContext, partDir, ext)
+      val newWriter = description.outputWriterFactory.newInstance(
+        path = path,
+        dataSchema = description.nonPartitionColumns.toStructType,
+        context = taskAttemptContext)
+      newWriter.initConverter(description.nonPartitionColumns.toStructType)
+      newWriter
+    }
+
+    override def execute(iter: Iterator[InternalRow]): Set[String] = {
+      // We should first sort by partition columns, then bucket id, and finally sorting columns.
+      val sortingExpressions: Seq[Expression] =
+        description.partitionColumns ++ bucketIdExpression ++ sortColumns
+      val getSortingKey = UnsafeProjection.create(sortingExpressions, description.allColumns)
+
+      val sortingKeySchema = StructType(sortingExpressions.map {
+        case a: Attribute => StructField(a.name, a.dataType, a.nullable)
+        // The sorting expressions are all `Attribute` except bucket id.
+        case _ => StructField("bucketId", IntegerType, nullable = false)
+      })
+
+      // Returns the data columns to be written given an input row
+      val getOutputRow = UnsafeProjection.create(
+        description.nonPartitionColumns, description.allColumns)
+
+      // Returns the partition path given a partition key.
+      val getPartitionString = UnsafeProjection.create(
+        Seq(Concat(partitionStringExpression)), description.partitionColumns)
+
+      // Sorts the data before write, so that we only need one writer at the same time.
+      val sorter = new UnsafeKVExternalSorter(
+        sortingKeySchema,
+        StructType.fromAttributes(description.nonPartitionColumns),
+        SparkEnv.get.blockManager,
+        SparkEnv.get.serializerManager,
+        TaskContext.get().taskMemoryManager().pageSizeBytes,
+        SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+          UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
+
+      while (iter.hasNext) {
+        val currentRow = iter.next()
+        sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
+      }
+      logInfo(s"Sorting complete. Writing out partition files one at a time.")
+
+      val getBucketingKey: InternalRow => InternalRow = if (sortColumns.isEmpty) {
+        identity
+      } else {
+        UnsafeProjection.create(sortingExpressions.dropRight(sortColumns.length).zipWithIndex.map {
+          case (expr, ordinal) => BoundReference(ordinal, expr.dataType, expr.nullable)
+        })
+      }
+
+      val sortedIterator = sorter.sortedIterator()
+
+      // If anything below fails, we should abort the task.
+      var currentKey: UnsafeRow = null
+      val updatedPartitions = mutable.Set[String]()
+      while (sortedIterator.next()) {
+        val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
+        if (currentKey != nextKey) {
+          if (currentWriter != null) {
+            currentWriter.close()
+            currentWriter = null
+          }
+          currentKey = nextKey.copy()
+          logDebug(s"Writing partition: $currentKey")
+
+          currentWriter = newOutputWriter(currentKey, getPartitionString)
+          val partitionPath = getPartitionString(currentKey).getString(0)
+          if (partitionPath.nonEmpty) {
+            updatedPartitions.add(partitionPath)
+          }
+        }
+        currentWriter.writeInternal(sortedIterator.getValue)
+      }
+      if (currentWriter != null) {
+        currentWriter.close()
+        currentWriter = null
+      }
+      updatedPartitions.toSet
+    }
+
+    override def releaseResources(): Unit = {
+      if (currentWriter != null) {
+        currentWriter.close()
+        currentWriter = null
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/77a98162/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index a1221d0..230c74a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -84,17 +84,22 @@ case class InsertIntoHadoopFsRelationCommand(
     val isAppend = pathExists && (mode == SaveMode.Append)
 
     if (doInsertion) {
-      WriteOutput.write(
-        sparkSession,
-        query,
-        fileFormat,
-        qualifiedOutputPath,
-        hadoopConf,
-        partitionColumns,
-        bucketSpec,
-        refreshFunction,
-        options,
+      val committer = FileCommitProtocol.instantiate(
+        sparkSession.sessionState.conf.fileCommitProtocolClass,
+        outputPath.toString,
         isAppend)
+
+      FileFormatWriter.write(
+        sparkSession = sparkSession,
+        plan = query,
+        fileFormat = fileFormat,
+        committer = committer,
+        outputPath = qualifiedOutputPath.toString,
+        hadoopConf = hadoopConf,
+        partitionColumns = partitionColumns,
+        bucketSpec = bucketSpec,
+        refreshFunction = refreshFunction,
+        options = options)
     } else {
       logInfo("Skipping insertion into a relation that already exists.")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/77a98162/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala
deleted file mode 100644
index a078551..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala
+++ /dev/null
@@ -1,406 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import java.util.{Date, UUID}
-
-import scala.collection.mutable
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-
-import org.apache.spark._
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{SQLExecution, UnsafeKVExternalSorter}
-import org.apache.spark.sql.execution.datasources.FileCommitProtocol.TaskCommitMessage
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
-import org.apache.spark.util.{SerializableConfiguration, Utils}
-import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
-
-
-/** A helper object for writing data out to a location. */
-object WriteOutput extends Logging {
-
-  /** A shared job description for all the write tasks. */
-  private class WriteJobDescription(
-      val uuid: String,  // prevent collision between different (appending) write jobs
-      val serializableHadoopConf: SerializableConfiguration,
-      val outputWriterFactory: OutputWriterFactory,
-      val allColumns: Seq[Attribute],
-      val partitionColumns: Seq[Attribute],
-      val nonPartitionColumns: Seq[Attribute],
-      val bucketSpec: Option[BucketSpec],
-      val isAppend: Boolean,
-      val path: String)
-    extends Serializable {
-
-    assert(AttributeSet(allColumns) == AttributeSet(partitionColumns ++ nonPartitionColumns),
-      s"""
-         |All columns: ${allColumns.mkString(", ")}
-         |Partition columns: ${partitionColumns.mkString(", ")}
-         |Non-partition columns: ${nonPartitionColumns.mkString(", ")}
-       """.stripMargin)
-  }
-
-  /**
-   * Basic work flow of this command is:
-   * 1. Driver side setup, including output committer initialization and data source specific
-   *    preparation work for the write job to be issued.
-   * 2. Issues a write job consisting of one or more executor side tasks, each of which writes all
-   *    rows within an RDD partition.
-   * 3. If no exception is thrown in a task, commits that task, otherwise aborts that task;  If any
-   *    exception is thrown during task commitment, also aborts that task.
-   * 4. If all tasks are committed, commit the job, otherwise aborts the job;  If any exception is
-   *    thrown during job commitment, also aborts the job.
-   */
-  def write(
-      sparkSession: SparkSession,
-      plan: LogicalPlan,
-      fileFormat: FileFormat,
-      outputPath: Path,
-      hadoopConf: Configuration,
-      partitionColumns: Seq[Attribute],
-      bucketSpec: Option[BucketSpec],
-      refreshFunction: (Seq[TablePartitionSpec]) => Unit,
-      options: Map[String, String],
-      isAppend: Boolean): Unit = {
-
-    val job = Job.getInstance(hadoopConf)
-    job.setOutputKeyClass(classOf[Void])
-    job.setOutputValueClass(classOf[InternalRow])
-    FileOutputFormat.setOutputPath(job, outputPath)
-
-    val partitionSet = AttributeSet(partitionColumns)
-    val dataColumns = plan.output.filterNot(partitionSet.contains)
-    val queryExecution = Dataset.ofRows(sparkSession, plan).queryExecution
-
-    // Note: prepareWrite has side effect. It sets "job".
-    val outputWriterFactory =
-      fileFormat.prepareWrite(sparkSession, job, options, dataColumns.toStructType)
-
-    val description = new WriteJobDescription(
-      uuid = UUID.randomUUID().toString,
-      serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
-      outputWriterFactory = outputWriterFactory,
-      allColumns = plan.output,
-      partitionColumns = partitionColumns,
-      nonPartitionColumns = dataColumns,
-      bucketSpec = bucketSpec,
-      isAppend = isAppend,
-      path = outputPath.toString)
-
-    SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
-      // This call shouldn't be put into the `try` block below because it only initializes and
-      // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called.
-      val committer = FileCommitProtocol.instantiate(
-        sparkSession.sessionState.conf.fileCommitProtocolClass,
-        outputPath.toString,
-        isAppend)
-      committer.setupJob(job)
-
-      try {
-        val ret = sparkSession.sparkContext.runJob(queryExecution.toRdd,
-          (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
-            executeTask(
-              description = description,
-              sparkStageId = taskContext.stageId(),
-              sparkPartitionId = taskContext.partitionId(),
-              sparkAttemptNumber = taskContext.attemptNumber(),
-              committer,
-              iterator = iter)
-          })
-
-        val commitMsgs = ret.map(_._1)
-        val updatedPartitions = ret.flatMap(_._2).distinct.map(PartitioningUtils.parsePathFragment)
-
-        committer.commitJob(job, commitMsgs)
-        logInfo(s"Job ${job.getJobID} committed.")
-        refreshFunction(updatedPartitions)
-      } catch { case cause: Throwable =>
-        logError(s"Aborting job ${job.getJobID}.", cause)
-        committer.abortJob(job)
-        throw new SparkException("Job aborted.", cause)
-      }
-    }
-  }
-
-  /** Writes data out in a single Spark task. */
-  private def executeTask(
-      description: WriteJobDescription,
-      sparkStageId: Int,
-      sparkPartitionId: Int,
-      sparkAttemptNumber: Int,
-      committer: FileCommitProtocol,
-      iterator: Iterator[InternalRow]): (TaskCommitMessage, Set[String]) = {
-
-    val jobId = SparkHadoopWriter.createJobID(new Date, sparkStageId)
-    val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
-    val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
-
-    // Set up the attempt context required to use in the output committer.
-    val taskAttemptContext: TaskAttemptContext = {
-      // Set up the configuration object
-      val hadoopConf = description.serializableHadoopConf.value
-      hadoopConf.set("mapred.job.id", jobId.toString)
-      hadoopConf.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
-      hadoopConf.set("mapred.task.id", taskAttemptId.toString)
-      hadoopConf.setBoolean("mapred.task.is.map", true)
-      hadoopConf.setInt("mapred.task.partition", 0)
-
-      new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
-    }
-
-    committer.setupTask(taskAttemptContext)
-
-    val writeTask =
-      if (description.partitionColumns.isEmpty && description.bucketSpec.isEmpty) {
-        new SingleDirectoryWriteTask(description, taskAttemptContext, committer)
-      } else {
-        new DynamicPartitionWriteTask(description, taskAttemptContext, committer)
-      }
-
-    try {
-      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
-        // Execute the task to write rows out and commit the task.
-        val outputPartitions = writeTask.execute(iterator)
-        writeTask.releaseResources()
-        (committer.commitTask(taskAttemptContext), outputPartitions)
-      })(catchBlock = {
-        // If there is an error, release resource and then abort the task
-        try {
-          writeTask.releaseResources()
-        } finally {
-          committer.abortTask(taskAttemptContext)
-          logError(s"Job $jobId aborted.")
-        }
-      })
-    } catch {
-      case t: Throwable =>
-        throw new SparkException("Task failed while writing rows", t)
-    }
-  }
-
-  /**
-   * A simple trait for writing out data in a single Spark task, without any concerns about how
-   * to commit or abort tasks. Exceptions thrown by the implementation of this trait will
-   * automatically trigger task aborts.
-   */
-  private trait ExecuteWriteTask {
-    /**
-     * Writes data out to files, and then returns the list of partition strings written out.
-     * The list of partitions is sent back to the driver and used to update the catalog.
-     */
-    def execute(iterator: Iterator[InternalRow]): Set[String]
-    def releaseResources(): Unit
-  }
-
-  /** Writes data to a single directory (used for non-dynamic-partition writes). */
-  private class SingleDirectoryWriteTask(
-      description: WriteJobDescription,
-      taskAttemptContext: TaskAttemptContext,
-      committer: FileCommitProtocol) extends ExecuteWriteTask {
-
-    private[this] var outputWriter: OutputWriter = {
-      val tmpFilePath = committer.newTaskTempFile(
-        taskAttemptContext,
-        None,
-        description.outputWriterFactory.getFileExtension(taskAttemptContext))
-
-      val outputWriter = description.outputWriterFactory.newInstance(
-        path = tmpFilePath,
-        dataSchema = description.nonPartitionColumns.toStructType,
-        context = taskAttemptContext)
-      outputWriter.initConverter(dataSchema = description.nonPartitionColumns.toStructType)
-      outputWriter
-    }
-
-    override def execute(iter: Iterator[InternalRow]): Set[String] = {
-      while (iter.hasNext) {
-        val internalRow = iter.next()
-        outputWriter.writeInternal(internalRow)
-      }
-      Set.empty
-    }
-
-    override def releaseResources(): Unit = {
-      if (outputWriter != null) {
-        outputWriter.close()
-        outputWriter = null
-      }
-    }
-  }
-
-  /**
-   * Writes data using dynamic partition writes, meaning this single function can write to
-   * multiple directories (partitions) or files (bucketing).
-   */
-  private class DynamicPartitionWriteTask(
-      description: WriteJobDescription,
-      taskAttemptContext: TaskAttemptContext,
-      committer: FileCommitProtocol) extends ExecuteWriteTask {
-
-    // currentWriter is initialized whenever we see a new key
-    private var currentWriter: OutputWriter = _
-
-    private val bucketColumns: Seq[Attribute] = description.bucketSpec.toSeq.flatMap {
-      spec => spec.bucketColumnNames.map(c => description.allColumns.find(_.name == c).get)
-    }
-
-    private val sortColumns: Seq[Attribute] = description.bucketSpec.toSeq.flatMap {
-      spec => spec.sortColumnNames.map(c => description.allColumns.find(_.name == c).get)
-    }
-
-    private def bucketIdExpression: Option[Expression] = description.bucketSpec.map { spec =>
-      // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
-      // guarantee the data distribution is same between shuffle and bucketed data source, which
-      // enables us to only shuffle one side when join a bucketed table and a normal one.
-      HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
-    }
-
-    /** Expressions that given a partition key build a string like: col1=val/col2=val/... */
-    private def partitionStringExpression: Seq[Expression] = {
-      description.partitionColumns.zipWithIndex.flatMap { case (c, i) =>
-        val escaped = ScalaUDF(
-          PartitioningUtils.escapePathName _,
-          StringType,
-          Seq(Cast(c, StringType)),
-          Seq(StringType))
-        val str = If(IsNull(c), Literal(PartitioningUtils.DEFAULT_PARTITION_NAME), escaped)
-        val partitionName = Literal(c.name + "=") :: str :: Nil
-        if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
-      }
-    }
-
-    /**
-     * Opens and returns a new OutputWriter given a partition key and optional bucket id.
-     * If bucket id is specified, we will append it to the end of the file name, but before the
-     * file extension, e.g. part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet
-     */
-    private def newOutputWriter(key: InternalRow, partString: UnsafeProjection): OutputWriter = {
-      val partDir =
-        if (description.partitionColumns.isEmpty) None else Option(partString(key).getString(0))
-
-      // If the bucket spec is defined, the bucket column is right after the partition columns
-      val bucketId = if (description.bucketSpec.isDefined) {
-        BucketingUtils.bucketIdToString(key.getInt(description.partitionColumns.length))
-      } else {
-        ""
-      }
-      val ext = bucketId + description.outputWriterFactory.getFileExtension(taskAttemptContext)
-
-      val path = committer.newTaskTempFile(taskAttemptContext, partDir, ext)
-      val newWriter = description.outputWriterFactory.newInstance(
-        path = path,
-        dataSchema = description.nonPartitionColumns.toStructType,
-        context = taskAttemptContext)
-      newWriter.initConverter(description.nonPartitionColumns.toStructType)
-      newWriter
-    }
-
-    override def execute(iter: Iterator[InternalRow]): Set[String] = {
-      // We should first sort by partition columns, then bucket id, and finally sorting columns.
-      val sortingExpressions: Seq[Expression] =
-        description.partitionColumns ++ bucketIdExpression ++ sortColumns
-      val getSortingKey = UnsafeProjection.create(sortingExpressions, description.allColumns)
-
-      val sortingKeySchema = StructType(sortingExpressions.map {
-        case a: Attribute => StructField(a.name, a.dataType, a.nullable)
-        // The sorting expressions are all `Attribute` except bucket id.
-        case _ => StructField("bucketId", IntegerType, nullable = false)
-      })
-
-      // Returns the data columns to be written given an input row
-      val getOutputRow = UnsafeProjection.create(
-        description.nonPartitionColumns, description.allColumns)
-
-      // Returns the partition path given a partition key.
-      val getPartitionString = UnsafeProjection.create(
-        Seq(Concat(partitionStringExpression)), description.partitionColumns)
-
-      // Sorts the data before write, so that we only need one writer at the same time.
-      val sorter = new UnsafeKVExternalSorter(
-        sortingKeySchema,
-        StructType.fromAttributes(description.nonPartitionColumns),
-        SparkEnv.get.blockManager,
-        SparkEnv.get.serializerManager,
-        TaskContext.get().taskMemoryManager().pageSizeBytes,
-        SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
-          UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
-
-      while (iter.hasNext) {
-        val currentRow = iter.next()
-        sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
-      }
-      logInfo(s"Sorting complete. Writing out partition files one at a time.")
-
-      val getBucketingKey: InternalRow => InternalRow = if (sortColumns.isEmpty) {
-        identity
-      } else {
-        UnsafeProjection.create(sortingExpressions.dropRight(sortColumns.length).zipWithIndex.map {
-          case (expr, ordinal) => BoundReference(ordinal, expr.dataType, expr.nullable)
-        })
-      }
-
-      val sortedIterator = sorter.sortedIterator()
-
-      // If anything below fails, we should abort the task.
-      var currentKey: UnsafeRow = null
-      val updatedPartitions = mutable.Set[String]()
-      while (sortedIterator.next()) {
-        val nextKey = getBucketingKey(sortedIterator.getKey).asInstanceOf[UnsafeRow]
-        if (currentKey != nextKey) {
-          if (currentWriter != null) {
-            currentWriter.close()
-            currentWriter = null
-          }
-          currentKey = nextKey.copy()
-          logDebug(s"Writing partition: $currentKey")
-
-          currentWriter = newOutputWriter(currentKey, getPartitionString)
-          val partitionPath = getPartitionString(currentKey).getString(0)
-          if (partitionPath.nonEmpty) {
-            updatedPartitions.add(partitionPath)
-          }
-        }
-        currentWriter.writeInternal(sortedIterator.getValue)
-      }
-      if (currentWriter != null) {
-        currentWriter.close()
-        currentWriter = null
-      }
-      updatedPartitions.toSet
-    }
-
-    override def releaseResources(): Unit = {
-      if (currentWriter != null) {
-        currentWriter.close()
-        currentWriter = null
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/77a98162/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 77c83ba..b8ea7f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -415,17 +415,6 @@ class ParquetFileFormat
       }
     }
   }
-
-  override def buildWriter(
-      sqlContext: SQLContext,
-      dataSchema: StructType,
-      options: Map[String, String]): OutputWriterFactory = {
-    new ParquetOutputWriterFactory(
-      sqlContext.conf,
-      dataSchema,
-      sqlContext.sessionState.newHadoopConf(),
-      options)
-  }
 }
 
 object ParquetFileFormat extends Logging {

http://git-wip-us.apache.org/repos/asf/spark/blob/77a98162/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala
index 92d4f27..5c0f8af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala
@@ -17,125 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.parquet.hadoop.{ParquetOutputFormat, ParquetRecordWriter}
-import org.apache.parquet.hadoop.codec.CodecConfig
-import org.apache.parquet.hadoop.util.ContextUtil
+import org.apache.parquet.hadoop.ParquetOutputFormat
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
-
-
-/**
- * A factory for generating OutputWriters for writing parquet files. This implementation differs
- * from the [[ParquetOutputWriter]] as this does not use any [[OutputCommitter]]. It simply
- * writes the data to the path used to generate the output writer. Callers of this factory
- * have to ensure which files are to be considered as committed.
- */
-private[parquet] class ParquetOutputWriterFactory(
-    sqlConf: SQLConf,
-    dataSchema: StructType,
-    hadoopConf: Configuration,
-    options: Map[String, String])
-  extends OutputWriterFactory {
-
-  private val serializableConf: SerializableConfiguration = {
-    val job = Job.getInstance(hadoopConf)
-    val conf = ContextUtil.getConfiguration(job)
-    val parquetOptions = new ParquetOptions(options, sqlConf)
-
-    // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override
-    // it in `ParquetOutputWriter` to support appending and dynamic partitioning.  The reason why
-    // we set it here is to setup the output committer class to `ParquetOutputCommitter`, which is
-    // bundled with `ParquetOutputFormat[Row]`.
-    job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
-
-    ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
-
-    // We want to clear this temporary metadata from saving into Parquet file.
-    // This metadata is only useful for detecting optional columns when pushing down filters.
-    val dataSchemaToWrite = StructType.removeMetadata(
-      StructType.metadataKeyForOptionalField,
-      dataSchema).asInstanceOf[StructType]
-    ParquetWriteSupport.setSchema(dataSchemaToWrite, conf)
-
-    // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
-    // and `CatalystWriteSupport` (writing actual rows to Parquet files).
-    conf.set(
-      SQLConf.PARQUET_BINARY_AS_STRING.key,
-      sqlConf.isParquetBinaryAsString.toString)
-
-    conf.set(
-      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-      sqlConf.isParquetINT96AsTimestamp.toString)
-
-    conf.set(
-      SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
-      sqlConf.writeLegacyParquetFormat.toString)
-
-    // Sets compression scheme
-    conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
-    new SerializableConfiguration(conf)
-  }
-
-  /**
-   * Returns an [[OutputWriter]] that writes data to the given path without using
-   * [[OutputCommitter]].
-   */
-  override def newWriter(path: String): OutputWriter = new OutputWriter {
-
-    // Create TaskAttemptContext that is used to pass on Configuration to the ParquetRecordWriter
-    private val hadoopTaskAttemptId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0)
-    private val hadoopAttemptContext = new TaskAttemptContextImpl(
-      serializableConf.value, hadoopTaskAttemptId)
-
-    // Instance of ParquetRecordWriter that does not use OutputCommitter
-    private val recordWriter = createNoCommitterRecordWriter(path, hadoopAttemptContext)
-
-    override def write(row: Row): Unit = {
-      throw new UnsupportedOperationException("call writeInternal")
-    }
-
-    protected[sql] override def writeInternal(row: InternalRow): Unit = {
-      recordWriter.write(null, row)
-    }
-
-    override def close(): Unit = recordWriter.close(hadoopAttemptContext)
-  }
-
-  /** Create a [[ParquetRecordWriter]] that writes the given path without using OutputCommitter */
-  private def createNoCommitterRecordWriter(
-      path: String,
-      hadoopAttemptContext: TaskAttemptContext): RecordWriter[Void, InternalRow] = {
-    // Custom ParquetOutputFormat that disable use of committer and writes to the given path
-    val outputFormat = new ParquetOutputFormat[InternalRow]() {
-      override def getOutputCommitter(c: TaskAttemptContext): OutputCommitter = { null }
-      override def getDefaultWorkFile(c: TaskAttemptContext, ext: String): Path = { new Path(path) }
-    }
-    outputFormat.getRecordWriter(hadoopAttemptContext)
-  }
-
-  /** Disable the use of the older API. */
-  override def newInstance(
-      path: String,
-      dataSchema: StructType,
-      context: TaskAttemptContext): OutputWriter = {
-    throw new UnsupportedOperationException("this version of newInstance not supported for " +
-        "ParquetOutputWriterFactory")
-  }
-
-  override def getFileExtension(context: TaskAttemptContext): String = {
-    CodecConfig.from(context).getCodec.getExtension + ".parquet"
-  }
-}
-
+import org.apache.spark.sql.execution.datasources.OutputWriter
 
 // NOTE: This class is instantiated and used on executor side only, no need to be serializable.
 private[parquet] class ParquetOutputWriter(path: String, context: TaskAttemptContext)

http://git-wip-us.apache.org/repos/asf/spark/blob/77a98162/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 02c5b85..daec2b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -17,23 +17,12 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import java.util.UUID
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{SparkEnv, SparkException, TaskContext, TaskContextImpl}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.UnsafeKVExternalSorter
-import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, PartitioningUtils}
-import org.apache.spark.sql.types.{StringType, StructType}
-import org.apache.spark.util.SerializableConfiguration
-import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
+import org.apache.spark.sql.execution.datasources.{FileCommitProtocol, FileFormat, FileFormatWriter}
 
 object FileStreamSink {
   // The name of the subdirectory that is used to store metadata about which files are valid.
@@ -59,207 +48,41 @@ class FileStreamSink(
   private val fileLog =
     new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toUri.toString)
   private val hadoopConf = sparkSession.sessionState.newHadoopConf()
-  private val fs = basePath.getFileSystem(hadoopConf)
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
     if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
       logInfo(s"Skipping already committed batch $batchId")
     } else {
-      val writer = new FileStreamSinkWriter(
-        data, fileFormat, path, partitionColumnNames, hadoopConf, options)
-      val fileStatuses = writer.write()
-      if (fileLog.add(batchId, fileStatuses)) {
-        logInfo(s"Committed batch $batchId")
-      } else {
-        throw new IllegalStateException(s"Race while writing batch $batchId")
+      val committer = FileCommitProtocol.instantiate(
+        sparkSession.sessionState.conf.streamingFileCommitProtocolClass, path, isAppend = false)
+      committer match {
+        case manifestCommitter: ManifestFileCommitProtocol =>
+          manifestCommitter.setupManifestOptions(fileLog, batchId)
+        case _ =>  // Do nothing
       }
-    }
-  }
-
-  override def toString: String = s"FileSink[$path]"
-}
-
-
-/**
- * Writes data given to a [[FileStreamSink]] to the given `basePath` in the given `fileFormat`,
- * partitioned by the given `partitionColumnNames`. This writer always appends data to the
- * directory if it already has data.
- */
-class FileStreamSinkWriter(
-    data: DataFrame,
-    fileFormat: FileFormat,
-    basePath: String,
-    partitionColumnNames: Seq[String],
-    hadoopConf: Configuration,
-    options: Map[String, String]) extends Serializable with Logging {
-
-  PartitioningUtils.validatePartitionColumn(
-    data.schema, partitionColumnNames, data.sqlContext.conf.caseSensitiveAnalysis)
-
-  private val serializableConf = new SerializableConfiguration(hadoopConf)
-  private val dataSchema = data.schema
-  private val dataColumns = data.logicalPlan.output
-
-  // Get the actual partition columns as attributes after matching them by name with
-  // the given columns names.
-  private val partitionColumns = partitionColumnNames.map { col =>
-    val nameEquality = data.sparkSession.sessionState.conf.resolver
-    data.logicalPlan.output.find(f => nameEquality(f.name, col)).getOrElse {
-      throw new RuntimeException(s"Partition column $col not found in schema $dataSchema")
-    }
-  }
-
-  // Columns that are to be written to the files. If there are partitioning columns, then
-  // those will not be written to the files.
-  private val writeColumns = {
-    val partitionSet = AttributeSet(partitionColumns)
-    dataColumns.filterNot(partitionSet.contains)
-  }
-
-  // An OutputWriterFactory for generating writers in the executors for writing the files.
-  private val outputWriterFactory =
-    fileFormat.buildWriter(data.sqlContext, writeColumns.toStructType, options)
-
-  /** Expressions that given a partition key build a string like: col1=val/col2=val/... */
-  private def partitionStringExpression: Seq[Expression] = {
-    partitionColumns.zipWithIndex.flatMap { case (c, i) =>
-      val escaped =
-        ScalaUDF(
-          PartitioningUtils.escapePathName _,
-          StringType,
-          Seq(Cast(c, StringType)),
-          Seq(StringType))
-      val str = If(IsNull(c), Literal(PartitioningUtils.DEFAULT_PARTITION_NAME), escaped)
-      val partitionName = Literal(c.name + "=") :: str :: Nil
-      if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
-    }
-  }
-
-  /** Generate a new output writer from the writer factory */
-  private def newOutputWriter(path: Path): OutputWriter = {
-    val newWriter = outputWriterFactory.newWriter(path.toString)
-    newWriter.initConverter(dataSchema)
-    newWriter
-  }
 
-  /** Write the dataframe to files. This gets called in the driver by the [[FileStreamSink]]. */
-  def write(): Array[SinkFileStatus] = {
-    data.sqlContext.sparkContext.runJob(
-      data.queryExecution.toRdd,
-      (taskContext: TaskContext, iterator: Iterator[InternalRow]) => {
-        if (partitionColumns.isEmpty) {
-          Seq(writePartitionToSingleFile(iterator))
-        } else {
-          writePartitionToPartitionedFiles(iterator)
+      // Get the actual partition columns as attributes after matching them by name with
+      // the given column names.
+      val partitionColumns: Seq[Attribute] = partitionColumnNames.map { col =>
+        val nameEquality = data.sparkSession.sessionState.conf.resolver
+        data.logicalPlan.output.find(f => nameEquality(f.name, col)).getOrElse {
+          throw new RuntimeException(s"Partition column $col not found in schema ${data.schema}")
         }
-      }).flatten
-  }
-
-  /**
-   * Writes a RDD partition to a single file without dynamic partitioning.
-   * This gets called in the executor, and it uses a [[OutputWriter]] to write the data.
-   */
-  def writePartitionToSingleFile(iterator: Iterator[InternalRow]): SinkFileStatus = {
-    var writer: OutputWriter = null
-    try {
-      val path = new Path(basePath, UUID.randomUUID.toString)
-      val fs = path.getFileSystem(serializableConf.value)
-      writer = newOutputWriter(path)
-      while (iterator.hasNext) {
-        writer.writeInternal(iterator.next)
-      }
-      writer.close()
-      writer = null
-      SinkFileStatus(fs.getFileStatus(path))
-    } catch {
-      case cause: Throwable =>
-        logError("Aborting task.", cause)
-        // call failure callbacks first, so we could have a chance to cleanup the writer.
-        TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
-        throw new SparkException("Task failed while writing rows.", cause)
-    } finally {
-      if (writer != null) {
-        writer.close()
       }
-    }
-  }
-
-  /**
-   * Writes a RDD partition to multiple dynamically partitioned files.
-   * This gets called in the executor. It first sorts the data based on the partitioning columns
-   * and then writes the data of each key to separate files using [[OutputWriter]]s.
-   */
-  def writePartitionToPartitionedFiles(iterator: Iterator[InternalRow]): Seq[SinkFileStatus] = {
-
-    // Returns the partitioning columns for sorting
-    val getSortingKey = UnsafeProjection.create(partitionColumns, dataColumns)
-
-    // Returns the data columns to be written given an input row
-    val getOutputRow = UnsafeProjection.create(writeColumns, dataColumns)
-
-    // Returns the partition path given a partition key
-    val getPartitionString =
-      UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, partitionColumns)
 
-    // Sort the data before write, so that we only need one writer at the same time.
-    val sorter = new UnsafeKVExternalSorter(
-      partitionColumns.toStructType,
-      StructType.fromAttributes(writeColumns),
-      SparkEnv.get.blockManager,
-      SparkEnv.get.serializerManager,
-      TaskContext.get().taskMemoryManager().pageSizeBytes,
-      SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
-        UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
-
-    while (iterator.hasNext) {
-      val currentRow = iterator.next()
-      sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
-    }
-    logDebug(s"Sorting complete. Writing out partition files one at a time.")
-
-    val sortedIterator = sorter.sortedIterator()
-    val paths = new ArrayBuffer[Path]
-
-    // Write the sorted data to partitioned files, one for each unique key
-    var currentWriter: OutputWriter = null
-    try {
-      var currentKey: UnsafeRow = null
-      while (sortedIterator.next()) {
-        val nextKey = sortedIterator.getKey
-
-        // If key changes, close current writer, and open a new writer to a new partitioned file
-        if (currentKey != nextKey) {
-          if (currentWriter != null) {
-            currentWriter.close()
-            currentWriter = null
-          }
-          currentKey = nextKey.copy()
-          val partitionPath = getPartitionString(currentKey).getString(0)
-          val path = new Path(new Path(basePath, partitionPath), UUID.randomUUID.toString)
-          paths += path
-          currentWriter = newOutputWriter(path)
-          logInfo(s"Writing partition $currentKey to $path")
-        }
-        currentWriter.writeInternal(sortedIterator.getValue)
-      }
-      if (currentWriter != null) {
-        currentWriter.close()
-        currentWriter = null
-      }
-      if (paths.nonEmpty) {
-        val fs = paths.head.getFileSystem(serializableConf.value)
-        paths.map(p => SinkFileStatus(fs.getFileStatus(p)))
-      } else Seq.empty
-    } catch {
-      case cause: Throwable =>
-        logError("Aborting task.", cause)
-        // call failure callbacks first, so we could have a chance to cleanup the writer.
-        TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
-        throw new SparkException("Task failed while writing rows.", cause)
-    } finally {
-      if (currentWriter != null) {
-        currentWriter.close()
-      }
+      FileFormatWriter.write(
+        sparkSession = sparkSession,
+        plan = data.logicalPlan,
+        fileFormat = fileFormat,
+        committer = committer,
+        outputPath = path,
+        hadoopConf = hadoopConf,
+        partitionColumns = partitionColumns,
+        bucketSpec = None,
+        refreshFunction = _ => (),
+        options = options)
     }
   }
+
+  override def toString: String = s"FileSink[$path]"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/77a98162/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
new file mode 100644
index 0000000..5103122
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.UUID
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.datasources.FileCommitProtocol
+import org.apache.spark.sql.execution.datasources.FileCommitProtocol.TaskCommitMessage
+
+/**
+ * A [[FileCommitProtocol]] that tracks the list of valid files in a manifest file, used in
+ * structured streaming.
+ *
+ * @param path path to write the final output to.
+ */
+class ManifestFileCommitProtocol(path: String)
+  extends FileCommitProtocol with Serializable with Logging {
+
+  // Track the list of files added by a task, only used on the executors.
+  @transient private var addedFiles: ArrayBuffer[String] = _
+
+  @transient private var fileLog: FileStreamSinkLog = _
+  private var batchId: Long = _
+
+  /**
+   * Sets up the manifest log output and the batch id for this job.
+   * Must be called before any other function.
+   */
+  def setupManifestOptions(fileLog: FileStreamSinkLog, batchId: Long): Unit = {
+    this.fileLog = fileLog
+    this.batchId = batchId
+  }
+
+  override def setupJob(jobContext: JobContext): Unit = {
+    require(fileLog != null, "setupManifestOptions must be called before this function")
+    // Do nothing
+  }
+
+  override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
+    require(fileLog != null, "setupManifestOptions must be called before this function")
+    val fileStatuses = taskCommits.flatMap(_.obj.asInstanceOf[Seq[SinkFileStatus]]).toArray
+
+    if (fileLog.add(batchId, fileStatuses)) {
+      logInfo(s"Committed batch $batchId")
+    } else {
+      throw new IllegalStateException(s"Race while writing batch $batchId")
+    }
+  }
+
+  override def abortJob(jobContext: JobContext): Unit = {
+    require(fileLog != null, "setupManifestOptions must be called before this function")
+    // Do nothing
+  }
+
+  override def setupTask(taskContext: TaskAttemptContext): Unit = {
+    addedFiles = new ArrayBuffer[String]
+  }
+
+  override def newTaskTempFile(
+      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
+    // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb.gz.parquet
+    // Note that %05d only pads and never truncates the split number, so if we have more than
+    // 100000 tasks, the file name just gets longer and won't overflow.
+    val split = taskContext.getTaskAttemptID.getTaskID.getId
+    val uuid = UUID.randomUUID.toString
+    val filename = f"part-$split%05d-$uuid$ext"
+
+    val file = dir.map { d =>
+      new Path(new Path(path, d), filename).toString
+    }.getOrElse {
+      new Path(path, filename).toString
+    }
+
+    addedFiles += file
+    file
+  }
+
+  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
+    if (addedFiles.nonEmpty) {
+      val fs = new Path(addedFiles.head).getFileSystem(taskContext.getConfiguration)
+      val statuses: Seq[SinkFileStatus] =
+        addedFiles.map(f => SinkFileStatus(fs.getFileStatus(new Path(f))))
+      new TaskCommitMessage(statuses)
+    } else {
+      new TaskCommitMessage(Seq.empty[SinkFileStatus])
+    }
+  }
+
+  override def abortTask(taskContext: TaskAttemptContext): Unit = {
+    // Do nothing
+    // TODO: we can also try to delete the addedFiles as a best-effort cleanup.
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/77a98162/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 29e7984..7bb3ac0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -31,6 +31,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.execution.datasources.HadoopCommitProtocolWrapper
+import org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol
 import org.apache.spark.util.Utils
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -523,7 +524,7 @@ object SQLConf {
     SQLConfigBuilder("spark.sql.streaming.commitProtocolClass")
       .internal()
       .stringConf
-      .createWithDefault(classOf[HadoopCommitProtocolWrapper].getName)
+      .createWithDefault(classOf[ManifestFileCommitProtocol].getName)
 
   val FILE_SINK_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion")
     .internal()

http://git-wip-us.apache.org/repos/asf/spark/blob/77a98162/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 18b42a8..902cf05 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -17,106 +17,16 @@
 
 package org.apache.spark.sql.streaming
 
-import java.io.File
-
-import org.apache.commons.io.FileUtils
-import org.apache.commons.io.filefilter.{DirectoryFileFilter, RegexFileFilter}
-
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, MemoryStream, MetadataLogFileIndex}
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.execution.streaming.{MemoryStream, MetadataLogFileIndex}
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.Utils
 
 class FileStreamSinkSuite extends StreamTest {
   import testImplicits._
 
-
-  test("FileStreamSinkWriter - unpartitioned data") {
-    val path = Utils.createTempDir()
-    path.delete()
-
-    val hadoopConf = spark.sparkContext.hadoopConfiguration
-    val fileFormat = new parquet.ParquetFileFormat()
-
-    def writeRange(start: Int, end: Int, numPartitions: Int): Seq[String] = {
-      val df = spark
-        .range(start, end, 1, numPartitions)
-        .select($"id", lit(100).as("data"))
-      val writer = new FileStreamSinkWriter(
-        df, fileFormat, path.toString, partitionColumnNames = Nil, hadoopConf, Map.empty)
-      writer.write().map(_.path.stripPrefix("file://"))
-    }
-
-    // Write and check whether new files are written correctly
-    val files1 = writeRange(0, 10, 2)
-    assert(files1.size === 2, s"unexpected number of files: $files1")
-    checkFilesExist(path, files1, "file not written")
-    checkAnswer(spark.read.load(path.getCanonicalPath), (0 until 10).map(Row(_, 100)))
-
-    // Append and check whether new files are written correctly and old files still exist
-    val files2 = writeRange(10, 20, 3)
-    assert(files2.size === 3, s"unexpected number of files: $files2")
-    assert(files2.intersect(files1).isEmpty, "old files returned")
-    checkFilesExist(path, files2, s"New file not written")
-    checkFilesExist(path, files1, s"Old file not found")
-    checkAnswer(spark.read.load(path.getCanonicalPath), (0 until 20).map(Row(_, 100)))
-  }
-
-  test("FileStreamSinkWriter - partitioned data") {
-    implicit val e = ExpressionEncoder[java.lang.Long]
-    val path = Utils.createTempDir()
-    path.delete()
-
-    val hadoopConf = spark.sparkContext.hadoopConfiguration
-    val fileFormat = new parquet.ParquetFileFormat()
-
-    def writeRange(start: Int, end: Int, numPartitions: Int): Seq[String] = {
-      val df = spark
-        .range(start, end, 1, numPartitions)
-        .flatMap(x => Iterator(x, x, x)).toDF("id")
-        .select($"id", lit(100).as("data1"), lit(1000).as("data2"))
-
-      require(df.rdd.partitions.size === numPartitions)
-      val writer = new FileStreamSinkWriter(
-        df, fileFormat, path.toString, partitionColumnNames = Seq("id"), hadoopConf, Map.empty)
-      writer.write().map(_.path.stripPrefix("file://"))
-    }
-
-    def checkOneFileWrittenPerKey(keys: Seq[Int], filesWritten: Seq[String]): Unit = {
-      keys.foreach { id =>
-        assert(
-          filesWritten.count(_.contains(s"/id=$id/")) == 1,
-          s"no file for id=$id. all files: \n\t${filesWritten.mkString("\n\t")}"
-        )
-      }
-    }
-
-    // Write and check whether new files are written correctly
-    val files1 = writeRange(0, 10, 2)
-    assert(files1.size === 10, s"unexpected number of files:\n${files1.mkString("\n")}")
-    checkFilesExist(path, files1, "file not written")
-    checkOneFileWrittenPerKey(0 until 10, files1)
-
-    val answer1 = (0 until 10).flatMap(x => Iterator(x, x, x)).map(Row(100, 1000, _))
-    checkAnswer(spark.read.load(path.getCanonicalPath), answer1)
-
-    // Append and check whether new files are written correctly and old files still exist
-    val files2 = writeRange(0, 20, 3)
-    assert(files2.size === 20, s"unexpected number of files:\n${files2.mkString("\n")}")
-    assert(files2.intersect(files1).isEmpty, "old files returned")
-    checkFilesExist(path, files2, s"New file not written")
-    checkFilesExist(path, files1, s"Old file not found")
-    checkOneFileWrittenPerKey(0 until 20, files2)
-
-    val answer2 = (0 until 20).flatMap(x => Iterator(x, x, x)).map(Row(100, 1000, _))
-    checkAnswer(spark.read.load(path.getCanonicalPath), answer1 ++ answer2)
-  }
-
   test("FileStreamSink - unpartitioned writing and batch reading") {
     val inputData = MemoryStream[Int]
     val df = inputData.toDF()
@@ -270,18 +180,4 @@ class FileStreamSinkSuite extends StreamTest {
     }
   }
 
-  private def checkFilesExist(dir: File, expectedFiles: Seq[String], msg: String): Unit = {
-    import scala.collection.JavaConverters._
-    val files =
-      FileUtils.listFiles(dir, new RegexFileFilter("[^.]+"), DirectoryFileFilter.DIRECTORY)
-        .asScala
-        .map(_.getCanonicalPath)
-        .toSet
-
-    expectedFiles.foreach { f =>
-      assert(files.contains(f),
-        s"\n$msg\nexpected file:\n\t$f\nfound files:\n${files.mkString("\n\t")}")
-    }
-  }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org