Posted to commits@spark.apache.org by do...@apache.org on 2023/02/28 17:42:44 UTC

[spark] branch branch-3.4 updated: Revert "[SPARK-40034][SQL] PathOutputCommitters to support dynamic partitions"

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 57aa3d1ca6e Revert "[SPARK-40034][SQL] PathOutputCommitters to support dynamic partitions"
57aa3d1ca6e is described below

commit 57aa3d1ca6e17e4c6b934d74176ea22ca56d60f7
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Tue Feb 28 09:27:00 2023 -0800

    Revert "[SPARK-40034][SQL] PathOutputCommitters to support dynamic partitions"
    
    This reverts commit 5a599dec507786139fb2ecb7ce1a44c83fd06b0d.
---
 docs/cloud-integration.md                          | 112 +--------------
 .../io/cloud/BindingParquetOutputCommitter.scala   |   8 +-
 .../io/cloud/PathOutputCommitProtocol.scala        |  86 ++----------
 .../internal/io/cloud/CommitterBindingSuite.scala  | 155 +++------------------
 .../io/cloud/StubPathOutputCommitter.scala         |  60 ++------
 5 files changed, 47 insertions(+), 374 deletions(-)

diff --git a/docs/cloud-integration.md b/docs/cloud-integration.md
index 06342645e6d..991ba69c8cb 100644
--- a/docs/cloud-integration.md
+++ b/docs/cloud-integration.md
@@ -248,13 +248,8 @@ exhibits eventual consistency (example: S3), and often slower than classic
 filesystem renames.
 
 Some object store connectors provide custom committers to commit tasks and
-jobs without using rename. 
-
-### Hadoop S3A committers
-
-In versions of Spark built with Hadoop 3.1 or later,
-the hadoop-aws JAR contains committers safe to use for S3 storage
-accessed via the s3a connector.
+jobs without using rename. In versions of Spark built with Hadoop 3.1 or later,
+the S3A connector for AWS S3 is such a committer.
 
 Instead of writing data to a temporary directory on the store for renaming,
 these committers write the files to the final destination, but do not issue
@@ -277,111 +272,22 @@ It has been tested with the most common formats supported by Spark.
 mydataframe.write.format("parquet").save("s3a://bucket/destination")
 ```
 
-More details on these committers can be found in 
-[the latest Hadoop documentation](https://hadoop.apache.org/docs/current/)
-with S3A committer detail covered in
-[Committing work to S3 with the S3A Committers](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committers.html).
+More details on these committers can be found in the latest Hadoop documentation.
 
 Note: depending upon the committer used, in-progress statistics may be
 under-reported with Hadoop versions before 3.3.1.
 
-### Amazon EMR: the EMRFS S3-optimized committer
-
-Amazon EMR has its own S3-aware committers for parquet data.
-For instructions on use, see
-[the EMRFS S3-optimized committer](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html)
-
-For implementation and performance details, see
-["Improve Apache Spark write performance on Apache Parquet formats with the EMRFS S3-optimized committer"](https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/)
-
-
-### Azure and Google Cloud Storage: MapReduce Intermediate Manifest Committer
-
-Versions of the hadoop-mapreduce-core JAR shipped after September 2022 (3.3.5 and later)
-contain a committer optimized for performance and resilience on
-Azure ADLS Generation 2 and Google Cloud Storage.
-
-This committer, the "manifest committer", uses a manifest file to propagate
-directory listing information from the task committers to the job committer.
-These manifests can be written atomically, without relying on atomic directory rename,
-something GCS lacks.
-
-The job committer reads these manifests and will rename files from the task output
-directories directly into the destination directory, in parallel, with optional
-rate limiting to avoid throttling IO.
-This delivers performance and scalability on the object stores.
-
-It is not critical for job correctness to use this with Azure storage; the
-classic FileOutputCommitter is safe there. However, this new committer scales
-better for large jobs with deep and wide directory trees.
-
-Because Google GCS does not support atomic directory renaming,
-the manifest committer should be used where available.
-
-This committer does support "dynamic partition overwrite" (see below).
-
-For details on availability and use of this committer, consult
-the Hadoop documentation for the Hadoop release used.
-
-It is not available on Hadoop 3.3.4 or earlier.
-
-### IBM Cloud Object Storage: Stocator
-
-IBM provides the Stocator output committer for IBM Cloud Object Storage and OpenStack Swift.
-
-Source, documentation and releases can be found at
-[Stocator - Storage Connector for Apache Spark](https://github.com/CODAIT/stocator).
-
-
-## Cloud Committers and `INSERT OVERWRITE TABLE`
-
-Spark has a feature called "dynamic partition overwrite"; a table can be updated and only those
-partitions into which new data is added will have their contents replaced.
-
-This is used in SQL statements of the form `INSERT OVERWRITE TABLE`,
-and when Datasets are written in mode "overwrite":
-
-{% highlight scala %}
-eventDataset.write
-  .mode("overwrite")
-  .partitionBy("year", "month")
-  .format("parquet")
-  .save(tablePath)
-{% endhighlight %}
-
-This feature uses file renaming and has specific requirements of
-both the committer and the filesystem:
-
-1. The committer's working directory must be in the destination filesystem.
-2. The target filesystem must support file rename efficiently.
-
-These conditions are _not_ met by the S3A committers and AWS S3 storage.
-
-Committers for other cloud stores _may_ support this feature, and
-declare to Spark that they are compatible. If dynamic partition overwrite
-is required when writing data through a Hadoop committer, Spark
-will always permit this when the original `FileOutputCommitter`
-is used. For other committers, after their instantiation, Spark
-will probe for their declaration of compatibility, and
-permit the operation if they state that they are compatible.
-
-If the committer is not compatible, the operation will fail with
-the error message
-`PathOutputCommitter does not support dynamicPartitionOverwrite`
-
-Unless there is a compatible committer for the target filesystem,
-the sole solution is to use a cloud-friendly format for data
-storage.
-
 ## Further Reading
 
 Here is the documentation on the standard connectors both from Apache and the cloud providers.
 
+* [OpenStack Swift](https://hadoop.apache.org/docs/current/hadoop-openstack/index.html).
 * [Azure Blob Storage](https://hadoop.apache.org/docs/current/hadoop-azure/index.html).
 * [Azure Blob Filesystem (ABFS) and Azure Datalake Gen 2](https://hadoop.apache.org/docs/current/hadoop-azure/abfs.html).
 * [Azure Data Lake Gen 1](https://hadoop.apache.org/docs/current/hadoop-azure-datalake/index.html).
 * [Amazon S3 Strong Consistency](https://aws.amazon.com/s3/consistency/)
 * [Hadoop-AWS module (Hadoop 3.x)](https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/index.html).
+* [Amazon S3 via S3A and S3N (Hadoop 2.x)](https://hadoop.apache.org/docs/current2/hadoop-aws/tools/hadoop-aws/index.html).
 * [Amazon EMR File System (EMRFS)](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-fs.html). From Amazon.
 * [Using the EMRFS S3-optimized Committer](https://docs.amazonaws.cn/en_us/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html)
 * [Google Cloud Storage Connector for Spark and Hadoop](https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage). From Google.
@@ -389,11 +295,3 @@ Here is the documentation on the standard connectors both from Apache and the cl
 * IBM Cloud Object Storage connector for Apache Spark: [Stocator](https://github.com/CODAIT/stocator),
   [IBM Object Storage](https://www.ibm.com/cloud/object-storage). From IBM.
 * [Using JindoFS SDK to access Alibaba Cloud OSS](https://github.com/aliyun/alibabacloud-jindofs).
-
-The Cloud Committer problem and Hive-compatible solutions
-* [Committing work to S3 with the S3A Committers](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committers.html)
-* [Improve Apache Spark write performance on Apache Parquet formats with the EMRFS S3-optimized committer](https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/)
-* [The Manifest Committer for Azure and Google Cloud Storage](https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md)
-* [A Zero-rename committer](https://github.com/steveloughran/zero-rename-committer/releases/).
-* [Stocator: A High Performance Object Store Connector for Spark](http://arxiv.org/abs/1709.01812)
-
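
The documentation changes above describe how Spark is wired to these cloud-optimized committers. As a rough illustration only (not part of this commit): the property and class names below are the ones this module and its documentation use elsewhere, and the bucket, app name and dataset are placeholders, so verify everything against your own Spark/Hadoop build.

```scala
// A minimal sketch, assuming the spark-hadoop-cloud (hadoop-3) module is on the classpath.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("s3a-committer-example")
  // Route Spark's file commit protocol through the cloud committer bindings.
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  // Select one of the S3A committers (directory, partitioned or magic).
  .config("spark.hadoop.fs.s3a.committer.name", "magic")
  .getOrCreate()

// A static overwrite of a partitioned table. With this revert applied, setting
// spark.sql.sources.partitionOverwriteMode=dynamic with this protocol is refused with
// "PathOutputCommitProtocol does not support dynamicPartitionOverwrite".
spark.range(1000)
  .selectExpr("id", "id % 12 AS month")
  .write
  .mode("overwrite")
  .partitionBy("month")
  .format("parquet")
  .save("s3a://bucket/destination")
```
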
diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala
index 1e740a6e778..81a57385dd9 100644
--- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala
+++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala
@@ -19,7 +19,7 @@ package org.apache.spark.internal.io.cloud
 
 import java.io.IOException
 
-import org.apache.hadoop.fs.{Path, StreamCapabilities}
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, PathOutputCommitter}
 import org.apache.parquet.hadoop.ParquetOutputCommitter
@@ -37,7 +37,7 @@ import org.apache.spark.internal.Logging
 class BindingParquetOutputCommitter(
     path: Path,
     context: TaskAttemptContext)
-  extends ParquetOutputCommitter(path, context) with Logging with StreamCapabilities {
+  extends ParquetOutputCommitter(path, context) with Logging {
 
   logTrace(s"${this.getClass.getName} binding to configured PathOutputCommitter and dest $path")
 
@@ -119,8 +119,4 @@ class BindingParquetOutputCommitter(
   }
 
   override def toString: String = s"BindingParquetOutputCommitter($committer)"
-
-  override def hasCapability(capability: String): Boolean =
-    committer.isInstanceOf[StreamCapabilities] &&
-      committer.asInstanceOf[StreamCapabilities].hasCapability(capability)
 }
diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
index 44a521bd636..fc5d0a0b3a7 100644
--- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
+++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
@@ -19,7 +19,7 @@ package org.apache.spark.internal.io.cloud
 
 import java.io.IOException
 
-import org.apache.hadoop.fs.{Path, StreamCapabilities}
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.TaskAttemptContext
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory}
 
@@ -38,28 +38,27 @@ import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
  * In `setupCommitter` the factory is identified and instantiated;
  * this factory then creates the actual committer implementation.
  *
- * Dynamic Partition support will be determined once the committer is
- * instantiated in the setupJob/setupTask methods. If this
- * class was instantiated with `dynamicPartitionOverwrite` set to true,
- * then the instantiated committer must either be an instance of
- * `FileOutputCommitter` or it must implement the `StreamCapabilities`
- * interface and declare that it has the capability
- * `mapreduce.job.committer.dynamic.partitioning`.
- * That feature is available on Hadoop releases with the Intermediate
- * Manifest Committer for GCS and ABFS; it is not supported by the
- * S3A committers.
- * @constructor Instantiate.
+ * @constructor Instantiate. dynamic partition overwrite is not supported,
+ *              so that committers for stores which do not support rename
+ *              will not get confused.
  * @param jobId                     job
  * @param dest                      destination
  * @param dynamicPartitionOverwrite does the caller want support for dynamic
- *                                  partition overwrite?
+ *                                  partition overwrite. If so, it will be
+ *                                  refused.
  */
 class PathOutputCommitProtocol(
     jobId: String,
     dest: String,
     dynamicPartitionOverwrite: Boolean = false)
-  extends HadoopMapReduceCommitProtocol(jobId, dest, dynamicPartitionOverwrite)
-    with Serializable {
+  extends HadoopMapReduceCommitProtocol(jobId, dest, false) with Serializable {
+
+  if (dynamicPartitionOverwrite) {
+    // until there's explicit extensions to the PathOutputCommitProtocols
+    // to support the spark mechanism, it's left to the individual committer
+    // choice to handle partitioning.
+    throw new IOException(PathOutputCommitProtocol.UNSUPPORTED)
+  }
 
   /** The committer created. */
   @transient private var committer: PathOutputCommitter = _
@@ -115,33 +114,10 @@ class PathOutputCommitProtocol(
         // failures. Warn
         logTrace(s"Committer $committer may not be tolerant of task commit failures")
       }
-    } else {
-      // if required other committers need to be checked for dynamic partition
-      // compatibility through a StreamCapabilities probe.
-      if (dynamicPartitionOverwrite) {
-        if (supportsDynamicPartitions) {
-          logDebug(
-            s"Committer $committer has declared compatibility with dynamic partition overwrite")
-        } else {
-          throw new IOException(PathOutputCommitProtocol.UNSUPPORTED + ": " + committer)
-        }
-      }
     }
     committer
   }
 
-
-  /**
-   * Does the instantiated committer support dynamic partitions?
-   * @return true if the committer declares itself compatible.
-   */
-  private def supportsDynamicPartitions = {
-    committer.isInstanceOf[FileOutputCommitter] ||
-      (committer.isInstanceOf[StreamCapabilities] &&
-        committer.asInstanceOf[StreamCapabilities]
-          .hasCapability(CAPABILITY_DYNAMIC_PARTITIONING))
-  }
-
   /**
    * Create a temporary file for a task.
    *
@@ -164,28 +140,6 @@ class PathOutputCommitProtocol(
     file.toString
   }
 
-  /**
-   * Reject any requests for an absolute path file on a committer which
-   * is not compatible with it.
-   *
-   * @param taskContext task context
-   * @param absoluteDir final directory
-   * @param spec output filename
-   * @return a path string
-   * @throws UnsupportedOperationException if incompatible
-   */
-  override def newTaskTempFileAbsPath(
-    taskContext: TaskAttemptContext,
-    absoluteDir: String,
-    spec: FileNameSpec): String = {
-
-    if (supportsDynamicPartitions) {
-      super.newTaskTempFileAbsPath(taskContext, absoluteDir, spec)
-    } else {
-      throw new UnsupportedOperationException(s"Absolute output locations not supported" +
-        s" by committer $committer")
-    }
-  }
 }
 
 object PathOutputCommitProtocol {
@@ -207,17 +161,7 @@ object PathOutputCommitProtocol {
   val REJECT_FILE_OUTPUT_DEFVAL = false
 
   /** Error string for tests. */
-  private[cloud] val UNSUPPORTED: String = "PathOutputCommitter does not support" +
+  private[cloud] val UNSUPPORTED: String = "PathOutputCommitProtocol does not support" +
     " dynamicPartitionOverwrite"
 
-  /**
-   * Stream Capabilities probe for spark dynamic partitioning compatibility.
-   */
-  private[cloud] val CAPABILITY_DYNAMIC_PARTITIONING =
-    "mapreduce.job.committer.dynamic.partitioning"
-
-  /**
-   * Scheme prefix for per-filesystem scheme committers.
-   */
-  private[cloud] val OUTPUTCOMMITTER_FACTORY_SCHEME = "mapreduce.outputcommitter.factory.scheme"
 }
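
For readers following the PathOutputCommitProtocol hunks above, here is a minimal sketch (not part of the commit) of the restored behaviour, based on the constructor and the UNSUPPORTED message shown in this diff; the job ids and destination path are placeholders.

```scala
// Assumes the spark-hadoop-cloud (hadoop-3) module on the classpath.
import java.io.IOException
import scala.util.{Failure, Try}

import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol

// With dynamicPartitionOverwrite = true, construction itself is refused again.
Try(new PathOutputCommitProtocol("job-0001", "s3a://bucket/table", true)) match {
  case Failure(e: IOException) =>
    // "PathOutputCommitProtocol does not support dynamicPartitionOverwrite"
    println(e.getMessage)
  case other =>
    println(s"unexpected result: $other")
}

// The default (false) still works; the concrete committer is created later, in
// setupCommitter, by the PathOutputCommitterFactory bound to the destination's scheme.
val protocol = new PathOutputCommitProtocol("job-0002", "s3a://bucket/table")
```
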
diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala
index 984c7dbc2cb..546f54229ea 100644
--- a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala
+++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala
@@ -18,17 +18,17 @@
 package org.apache.spark.internal.io.cloud
 
 import java.io.{File, FileInputStream, FileOutputStream, IOException, ObjectInputStream, ObjectOutputStream}
+import java.lang.reflect.InvocationTargetException
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{Path, StreamCapabilities}
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.IOUtils
-import org.apache.hadoop.mapreduce.{Job, JobStatus, MRJobConfig, TaskAttemptContext, TaskAttemptID}
-import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, FileOutputFormat}
+import org.apache.hadoop.mapreduce.{Job, JobStatus, MRJobConfig, TaskAttemptID}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec}
-import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.{CAPABILITY_DYNAMIC_PARTITIONING, OUTPUTCOMMITTER_FACTORY_SCHEME}
+import org.apache.spark.internal.io.FileCommitProtocol
 
 class CommitterBindingSuite extends SparkFunSuite {
 
@@ -49,20 +49,18 @@ class CommitterBindingSuite extends SparkFunSuite {
    * [[BindingParquetOutputCommitter]] committer bind to the schema-specific
    * committer declared for the destination path? And that lifecycle events
    * are correctly propagated?
-   * This only works with a hadoop build where BindingPathOutputCommitter
-   * does passthrough of stream capabilities, so check that first.
    */
   test("BindingParquetOutputCommitter binds to the inner committer") {
-
     val path = new Path("http://example/data")
     val job = newJob(path)
     val conf = job.getConfiguration
     conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
     conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)
-    StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http")
-    val tContext: TaskAttemptContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
+
+    StubPathOutputCommitterFactory.bind(conf, "http")
+    val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
     val parquet = new BindingParquetOutputCommitter(path, tContext)
-    val inner = parquet.boundCommitter.asInstanceOf[StubPathOutputCommitterWithDynamicPartioning]
+    val inner = parquet.boundCommitter.asInstanceOf[StubPathOutputCommitter]
     parquet.setupJob(tContext)
     assert(inner.jobSetup, s"$inner job not setup")
     parquet.setupTask(tContext)
@@ -78,18 +76,6 @@ class CommitterBindingSuite extends SparkFunSuite {
     assert(inner.jobCommitted, s"$inner job not committed")
     parquet.abortJob(tContext, JobStatus.State.RUNNING)
     assert(inner.jobAborted, s"$inner job not aborted")
-
-    val binding = new BindingPathOutputCommitter(path, tContext)
-    // MAPREDUCE-7403 only arrived after hadoop 3.3.4; this test case
-    // is designed to work with versions with and without the feature.
-    if (binding.isInstanceOf[StreamCapabilities]) {
-      // this version of hadoop does support hasCapability probes
-      // through the BindingPathOutputCommitter used by the
-      // parquet committer, so verify that it goes through
-      // to the stub committer.
-      assert(parquet.hasCapability(CAPABILITY_DYNAMIC_PARTITIONING),
-        s"committer $parquet does not declare dynamic partition support")
-    }
   }
 
   /**
@@ -144,124 +130,17 @@ class CommitterBindingSuite extends SparkFunSuite {
     assert("file:///tmp" === protocol.destination)
   }
 
-  /*
-   * Bind a job to a committer which doesn't support dynamic partitioning.
-   * Job setup must fail, and calling `newTaskTempFileAbsPath()` must
-   * raise `UnsupportedOperationException`.
-   */
-  test("reject dynamic partitioning if not supported") {
-    val path = new Path("http://example/data")
-    val job = newJob(path)
-    val conf = job.getConfiguration
-    conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
-    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)
-    StubPathOutputCommitterBinding.bind(conf, "http")
-    val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
-    val committer = FileCommitProtocol.instantiate(
-      pathCommitProtocolClassname,
-      jobId,
-      path.toUri.toString,
-      true)
-    val ioe = intercept[IOException] {
-      committer.setupJob(tContext)
-    }
-    if (!ioe.getMessage.contains(PathOutputCommitProtocol.UNSUPPORTED)) {
-      throw ioe
-    }
-
-    // calls to newTaskTempFileAbsPath() will be rejected
-    intercept[UnsupportedOperationException] {
-      verifyAbsTempFileWorks(tContext, committer)
-    }
-  }
-
-  /*
-   * Bind to a committer with dynamic partitioning support,
-   * verify that job and task setup works, and that
-   * `newTaskTempFileAbsPath()` creates a temp file which
-   * can be moved to an absolute path later.
-   */
-  test("permit dynamic partitioning if the committer says it works") {
-    val path = new Path("http://example/data")
-    val job = newJob(path)
-    val conf = job.getConfiguration
-    conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
-    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)
-    StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http")
-    val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
-    val committer: PathOutputCommitProtocol = FileCommitProtocol.instantiate(
-      pathCommitProtocolClassname,
-      jobId,
-      path.toUri.toString,
-      true).asInstanceOf[PathOutputCommitProtocol]
-    committer.setupJob(tContext)
-    committer.setupTask(tContext)
-    verifyAbsTempFileWorks(tContext, committer)
-  }
-
-  /*
-   * Create a FileOutputCommitter through the PathOutputCommitProtocol
-   * using the relevant factory in hadoop-mapreduce-core JAR.
-   */
-  test("FileOutputCommitter through PathOutputCommitProtocol") {
-    // temp path; use a unique filename
-    val jobCommitDir = File.createTempFile(
-      "FileOutputCommitter-through-PathOutputCommitProtocol",
-      "")
-    try {
-      // delete the temp file and create a temp dir.
-      jobCommitDir.delete();
-      val jobUri = jobCommitDir.toURI
-      // hadoop path of the job
-      val path = new Path(jobUri)
-      val job = newJob(path)
-      val conf = job.getConfiguration
-      conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
-      conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)
-      bindToFileOutputCommitterFactory(conf, "file")
-      val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
-      val committer: PathOutputCommitProtocol = FileCommitProtocol.instantiate(
+  test("reject dynamic partitioning") {
+    val cause = intercept[InvocationTargetException] {
+      FileCommitProtocol.instantiate(
         pathCommitProtocolClassname,
-        jobId,
-        jobUri.toString,
-        true).asInstanceOf[PathOutputCommitProtocol]
-      committer.setupJob(tContext)
-      committer.setupTask(tContext)
-      verifyAbsTempFileWorks(tContext, committer)
-    } finally {
-      jobCommitDir.delete();
+        jobId, "file:///tmp", true)
+    }.getCause
+    if (cause == null || !cause.isInstanceOf[IOException]
+        || !cause.getMessage.contains(PathOutputCommitProtocol.UNSUPPORTED)) {
+      throw cause
     }
   }
 
-  /**
-   * Verify that a committer supports `newTaskTempFileAbsPath()`.
-   *
-   * @param tContext task context
-   * @param committer committer
-   */
-  private def verifyAbsTempFileWorks(
-    tContext: TaskAttemptContextImpl,
-    committer: FileCommitProtocol): Unit = {
-    val spec = FileNameSpec(".lotus.", ".123")
-    val absPath = committer.newTaskTempFileAbsPath(
-      tContext,
-      "/tmp",
-      spec)
-    assert(absPath.endsWith(".123"), s"wrong suffix in $absPath")
-    assert(absPath.contains("lotus"), s"wrong prefix in $absPath")
-  }
-
-  /**
-   * Given a hadoop configuration, explicitly set up the factory binding for the scheme
-   * to a committer factory which always creates FileOutputCommitters.
-   *
-   * @param conf   config to patch
-   * @param scheme filesystem scheme.
-   */
-  def bindToFileOutputCommitterFactory(conf: Configuration, scheme: String): Unit = {
-    conf.set(OUTPUTCOMMITTER_FACTORY_SCHEME + "." + scheme,
-      "org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory")
-  }
-
 }
 
diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala
index 5a0dba45ba8..88a36d227b1 100644
--- a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala
+++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala
@@ -18,12 +18,10 @@
 package org.apache.spark.internal.io.cloud
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{Path, StreamCapabilities}
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.output.{PathOutputCommitter, PathOutputCommitterFactory}
 
-import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.{CAPABILITY_DYNAMIC_PARTITIONING, OUTPUTCOMMITTER_FACTORY_SCHEME}
-
 /**
  * A local path output committer which tracks its state, for use in tests.
  * @param outputPath final destination.
@@ -93,45 +91,10 @@ class StubPathOutputCommitterFactory extends PathOutputCommitterFactory {
   }
 
   private def workPath(out: Path): Path = new Path(out,
-    StubPathOutputCommitterBinding.TEMP_DIR_NAME)
-}
-
-/**
- * An extension which declares that it supports dynamic partitioning.
- * @param outputPath final destination.
- * @param workPath work path
- * @param context task/job attempt.
- */
-class StubPathOutputCommitterWithDynamicPartioning(
-  outputPath: Path,
-  workPath: Path,
-  context: TaskAttemptContext) extends StubPathOutputCommitter(outputPath, workPath, context)
-  with StreamCapabilities {
-
-  override def hasCapability(capability: String): Boolean =
-    CAPABILITY_DYNAMIC_PARTITIONING == capability
-
+    StubPathOutputCommitterFactory.TEMP_DIR_NAME)
 }
 
-
-class StubPathOutputCommitterWithDynamicPartioningFactory extends PathOutputCommitterFactory {
-
-  override def createOutputCommitter(
-      outputPath: Path,
-      context: TaskAttemptContext): PathOutputCommitter = {
-    new StubPathOutputCommitterWithDynamicPartioning(outputPath, workPath(outputPath), context)
-  }
-
-  private def workPath(out: Path): Path = new Path(out,
-    StubPathOutputCommitterBinding.TEMP_DIR_NAME)
-}
-
-
-/**
- * Class to help binding job configurations to the different
- * stub committers available.
- */
-object StubPathOutputCommitterBinding {
+object StubPathOutputCommitterFactory {
 
   /**
    * This is the "Pending" directory of the FileOutputCommitter;
@@ -139,6 +102,11 @@ object StubPathOutputCommitterBinding {
    */
   val TEMP_DIR_NAME = "_temporary"
 
+  /**
+   * Scheme prefix for per-filesystem scheme committers.
+   */
+  val OUTPUTCOMMITTER_FACTORY_SCHEME = "mapreduce.outputcommitter.factory.scheme"
+
   /**
    * Given a hadoop configuration, set up the factory binding for the scheme.
    * @param conf config to patch
@@ -149,16 +117,4 @@ object StubPathOutputCommitterBinding {
     conf.set(key, classOf[StubPathOutputCommitterFactory].getName())
   }
 
-  /**
-   * Bind the configuration/scheme to the stub committer which
-   * declares support for dynamic partitioning.
-   *
-   * @param conf   config to patch
-   * @param scheme filesystem scheme.
-   */
-  def bindWithDynamicPartitioning(conf: Configuration, scheme: String): Unit = {
-    val key = OUTPUTCOMMITTER_FACTORY_SCHEME + "." + scheme
-    conf.set(key,
-      classOf[StubPathOutputCommitterWithDynamicPartioningFactory].getName())
-  }
 }
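
The stub committer hunks above rely on Hadoop's per-scheme committer factory binding. The sketch below (mine, not part of the commit) performs the same configuration step as the suite's bind(conf, "http") helper, using the "http" scheme the tests use.

```scala
import org.apache.hadoop.conf.Configuration

val conf = new Configuration()
// "mapreduce.outputcommitter.factory.scheme.<scheme>" selects the
// PathOutputCommitterFactory used for destinations on that filesystem scheme;
// here it is pointed at the test stub, exactly as bind(conf, "http") does.
conf.set("mapreduce.outputcommitter.factory.scheme.http",
  classOf[org.apache.spark.internal.io.cloud.StubPathOutputCommitterFactory].getName)
```
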


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org