Posted to commits@spark.apache.org by do...@apache.org on 2023/02/28 17:42:44 UTC
[spark] branch branch-3.4 updated: Revert "[SPARK-40034][SQL] PathOutputCommitters to support dynamic partitions"
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 57aa3d1ca6e Revert "[SPARK-40034][SQL] PathOutputCommitters to support dynamic partitions"
57aa3d1ca6e is described below
commit 57aa3d1ca6e17e4c6b934d74176ea22ca56d60f7
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Tue Feb 28 09:27:00 2023 -0800
Revert "[SPARK-40034][SQL] PathOutputCommitters to support dynamic partitions"
This reverts commit 5a599dec507786139fb2ecb7ce1a44c83fd06b0d.
---
docs/cloud-integration.md | 112 +--------------
.../io/cloud/BindingParquetOutputCommitter.scala | 8 +-
.../io/cloud/PathOutputCommitProtocol.scala | 86 ++----------
.../internal/io/cloud/CommitterBindingSuite.scala | 155 +++------------------
.../io/cloud/StubPathOutputCommitter.scala | 60 ++------
5 files changed, 47 insertions(+), 374 deletions(-)
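For readers reviewing what this revert removes: the reverted change let PathOutputCommitProtocol accept dynamic partition overwrite only when the instantiated committer was a FileOutputCommitter, or implemented StreamCapabilities and declared the capability `mapreduce.job.committer.dynamic.partitioning`; BindingParquetOutputCommitter forwarded that probe to its inner committer. The following is a minimal, dependency-free Scala sketch of that decision logic only. All `Sketch*` types and `DynamicPartitionProbe` are hypothetical stand-ins, not the Hadoop or Spark classes.

```scala
// Hypothetical stand-ins for Hadoop's StreamCapabilities, PathOutputCommitter
// and FileOutputCommitter; these are NOT the real org.apache.hadoop classes.
trait SketchStreamCapabilities {
  def hasCapability(capability: String): Boolean
}

abstract class SketchCommitter

class SketchFileOutputCommitter extends SketchCommitter

// A committer that makes no capability declaration at all.
class SketchPlainCommitter extends SketchCommitter

// A committer that declares dynamic partition support through the probe.
class SketchDynamicCommitter extends SketchCommitter with SketchStreamCapabilities {
  override def hasCapability(capability: String): Boolean =
    capability == DynamicPartitionProbe.CAPABILITY_DYNAMIC_PARTITIONING
}

// Like the removed BindingParquetOutputCommitter.hasCapability: forward the
// probe to the inner committer, answering false if the inner one cannot be probed.
class SketchBindingCommitter(inner: SketchCommitter)
    extends SketchCommitter with SketchStreamCapabilities {
  override def hasCapability(capability: String): Boolean = inner match {
    case sc: SketchStreamCapabilities => sc.hasCapability(capability)
    case _ => false
  }
}

object DynamicPartitionProbe {
  // Capability name copied from the removed CAPABILITY_DYNAMIC_PARTITIONING.
  val CAPABILITY_DYNAMIC_PARTITIONING = "mapreduce.job.committer.dynamic.partitioning"

  // Same decision as the removed supportsDynamicPartitions: a FileOutputCommitter
  // is always accepted; any other committer must declare the capability.
  def supportsDynamicPartitions(committer: SketchCommitter): Boolean =
    committer match {
      case _: SketchFileOutputCommitter => true
      case sc: SketchStreamCapabilities =>
        sc.hasCapability(CAPABILITY_DYNAMIC_PARTITIONING)
      case _ => false
    }
}
```

With this logic, wrapping a declaring committer in the binding class keeps it eligible, while a binding over a non-declaring committer is refused.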
diff --git a/docs/cloud-integration.md b/docs/cloud-integration.md
index 06342645e6d..991ba69c8cb 100644
--- a/docs/cloud-integration.md
+++ b/docs/cloud-integration.md
@@ -248,13 +248,8 @@ exhibits eventual consistency (example: S3), and often slower than classic
filesystem renames.
Some object store connectors provide custom committers to commit tasks and
-jobs without using rename.
-
-### Hadoop S3A committers
-
-In versions of Spark built with Hadoop 3.1 or later,
-the hadoop-aws JAR contains committers safe to use for S3 storage
-accessed via the s3a connector.
+jobs without using rename. In versions of Spark built with Hadoop 3.1 or later,
+the S3A connector for AWS S3 is such a committer.
Instead of writing data to a temporary directory on the store for renaming,
these committers write the files to the final destination, but do not issue
@@ -277,111 +272,22 @@ It has been tested with the most common formats supported by Spark.
mydataframe.write.format("parquet").save("s3a://bucket/destination")
```
-More details on these committers can be found in
-[the latest Hadoop documentation](https://hadoop.apache.org/docs/current/)
-with S3A committer detail covered in
-[Committing work to S3 with the S3A Committers](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committers.html).
+More details on these committers can be found in the latest Hadoop documentation.
Note: depending upon the committer used, in-progress statistics may be
under-reported with Hadoop versions before 3.3.1.
-### Amazon EMR: the EMRFS S3-optimized committer
-
-Amazon EMR has its own S3-aware committers for parquet data.
-For instructions on use, see
-[the EMRFS S3-optimized committer](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html)
-
-For implementation and performance details, see
-["Improve Apache Spark write performance on Apache Parquet formats with the EMRFS S3-optimized committer"](https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/)
-
-
-### Azure and Google cloud storage: MapReduce Intermediate Manifest Committer.
-
-Versions of the hadoop-mapreduce-core JAR shipped after September 2022 (3.3.5 and later)
-contain a committer optimized for performance and resilience on
-Azure ADLS Generation 2 and Google Cloud Storage.
-
-This committer, the "manifest committer" uses a manifest file to propagate
-directory listing information from the task committers to the job committer.
-These manifests can be written atomically, without relying on atomic directory rename,
-something GCS lacks.
-
-The job committer reads these manifests and will rename files from the task output
-directories directly into the destination directory, in parallel, with optional
-rate limiting to avoid throttling IO.
-This delivers performance and scalability on the object stores.
-
-It is not critical for job correctness to use this with Azure storage; the
-classic FileOutputCommitter is safe there; however, this new committer scales
-better for large jobs with deep and wide directory trees.
-
-Because Google GCS does not support atomic directory renaming,
-the manifest committer should be used where available.
-
-This committer does support "dynamic partition overwrite" (see below).
-
-For details on availability and use of this committer, consult
-the hadoop documentation for the Hadoop release used.
-
-It is not available on Hadoop 3.3.4 or earlier.
-
-### IBM Cloud Object Storage: Stocator
-
-IBM provide the Stocator output committer for IBM Cloud Object Storage and OpenStack Swift.
-
-Source, documentation and releases can be found at
-[Stocator - Storage Connector for Apache Spark](https://github.com/CODAIT/stocator).
-
-
-## Cloud Committers and `INSERT OVERWRITE TABLE`
-
-Spark has a feature called "dynamic partition overwrite"; a table can be updated and only those
-partitions into which new data is added will have their contents replaced.
-
-This is used in SQL statements of the form `INSERT OVERWRITE TABLE`,
-and when Datasets are written in mode "overwrite":
-
-{% highlight scala %}
-eventDataset.write
- .mode("overwrite")
- .partitionBy("year", "month")
- .format("parquet")
- .save(tablePath)
-{% endhighlight %}
-
-This feature uses file renaming and has specific requirements of
-both the committer and the filesystem:
-
-1. The committer's working directory must be in the destination filesystem.
-2. The target filesystem must support file rename efficiently.
-
-These conditions are _not_ met by the S3A committers and AWS S3 storage.
-
-Committers for other cloud stores _may_ support this feature, and
-declare to spark that they are compatible. If dynamic partition overwrite
-is required when writing data through a hadoop committer, Spark
-will always permit this when the original `FileOutputCommitter`
-is used. For other committers, after their instantiation, Spark
-will probe for their declaration of compatibility, and
-permit the operation if they state that they are compatible.
-
-If the committer is not compatible, the operation will fail with
-the error message
-`PathOutputCommitter does not support dynamicPartitionOverwrite`
-
-Unless there is a compatible committer for the target filesystem,
-the sole solution is to use a cloud-friendly format for data
-storage.
-
## Further Reading
Here is the documentation on the standard connectors both from Apache and the cloud providers.
+* [OpenStack Swift](https://hadoop.apache.org/docs/current/hadoop-openstack/index.html).
* [Azure Blob Storage](https://hadoop.apache.org/docs/current/hadoop-azure/index.html).
* [Azure Blob Filesystem (ABFS) and Azure Datalake Gen 2](https://hadoop.apache.org/docs/current/hadoop-azure/abfs.html).
* [Azure Data Lake Gen 1](https://hadoop.apache.org/docs/current/hadoop-azure-datalake/index.html).
* [Amazon S3 Strong Consistency](https://aws.amazon.com/s3/consistency/)
* [Hadoop-AWS module (Hadoop 3.x)](https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/index.html).
+* [Amazon S3 via S3A and S3N (Hadoop 2.x)](https://hadoop.apache.org/docs/current2/hadoop-aws/tools/hadoop-aws/index.html).
* [Amazon EMR File System (EMRFS)](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-fs.html). From Amazon.
* [Using the EMRFS S3-optimized Committer](https://docs.amazonaws.cn/en_us/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html)
* [Google Cloud Storage Connector for Spark and Hadoop](https://cloud.google.com/dataproc/docs/concepts/connectors/cloud-storage). From Google.
@@ -389,11 +295,3 @@ Here is the documentation on the standard connectors both from Apache and the cl
* IBM Cloud Object Storage connector for Apache Spark: [Stocator](https://github.com/CODAIT/stocator),
[IBM Object Storage](https://www.ibm.com/cloud/object-storage). From IBM.
* [Using JindoFS SDK to access Alibaba Cloud OSS](https://github.com/aliyun/alibabacloud-jindofs).
-
-The Cloud Committer problem and hive-compatible solutions
-* [Committing work to S3 with the S3A Committers](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committers.html)
-* [Improve Apache Spark write performance on Apache Parquet formats with the EMRFS S3-optimized committer](https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/)
-* [The Manifest Committer for Azure and Google Cloud Storage](https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/markdown/manifest_committer.md)
-* [A Zero-rename committer](https://github.com/steveloughran/zero-rename-committer/releases/).
-* [Stocator: A High Performance Object Store Connector for Spark](http://arxiv.org/abs/1709.01812)
-
diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala
index 1e740a6e778..81a57385dd9 100644
--- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala
+++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/BindingParquetOutputCommitter.scala
@@ -19,7 +19,7 @@ package org.apache.spark.internal.io.cloud
import java.io.IOException
-import org.apache.hadoop.fs.{Path, StreamCapabilities}
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, PathOutputCommitter}
import org.apache.parquet.hadoop.ParquetOutputCommitter
@@ -37,7 +37,7 @@ import org.apache.spark.internal.Logging
class BindingParquetOutputCommitter(
path: Path,
context: TaskAttemptContext)
- extends ParquetOutputCommitter(path, context) with Logging with StreamCapabilities {
+ extends ParquetOutputCommitter(path, context) with Logging {
logTrace(s"${this.getClass.getName} binding to configured PathOutputCommitter and dest $path")
@@ -119,8 +119,4 @@ class BindingParquetOutputCommitter(
}
override def toString: String = s"BindingParquetOutputCommitter($committer)"
-
- override def hasCapability(capability: String): Boolean =
- committer.isInstanceOf[StreamCapabilities] &&
- committer.asInstanceOf[StreamCapabilities].hasCapability(capability)
}
diff --git a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
index 44a521bd636..fc5d0a0b3a7 100644
--- a/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
+++ b/hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala
@@ -19,7 +19,7 @@ package org.apache.spark.internal.io.cloud
import java.io.IOException
-import org.apache.hadoop.fs.{Path, StreamCapabilities}
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory}
@@ -38,28 +38,27 @@ import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
* In `setupCommitter` the factory is identified and instantiated;
* this factory then creates the actual committer implementation.
*
- * Dynamic Partition support will be determined once the committer is
- * instantiated in the setupJob/setupTask methods. If this
- * class was instantiated with `dynamicPartitionOverwrite` set to true,
- * then the instantiated committer must either be an instance of
- * `FileOutputCommitter` or it must implement the `StreamCapabilities`
- * interface and declare that it has the capability
- * `mapreduce.job.committer.dynamic.partitioning`.
- * That feature is available on Hadoop releases with the Intermediate
- * Manifest Committer for GCS and ABFS; it is not supported by the
- * S3A committers.
- * @constructor Instantiate.
+ * @constructor Instantiate. dynamic partition overwrite is not supported,
+ * so that committers for stores which do not support rename
+ * will not get confused.
* @param jobId job
* @param dest destination
* @param dynamicPartitionOverwrite does the caller want support for dynamic
- * partition overwrite?
+ * partition overwrite. If so, it will be
+ * refused.
*/
class PathOutputCommitProtocol(
jobId: String,
dest: String,
dynamicPartitionOverwrite: Boolean = false)
- extends HadoopMapReduceCommitProtocol(jobId, dest, dynamicPartitionOverwrite)
- with Serializable {
+ extends HadoopMapReduceCommitProtocol(jobId, dest, false) with Serializable {
+
+ if (dynamicPartitionOverwrite) {
+ // until there's explicit extensions to the PathOutputCommitProtocols
+ // to support the spark mechanism, it's left to the individual committer
+ // choice to handle partitioning.
+ throw new IOException(PathOutputCommitProtocol.UNSUPPORTED)
+ }
/** The committer created. */
@transient private var committer: PathOutputCommitter = _
@@ -115,33 +114,10 @@ class PathOutputCommitProtocol(
// failures. Warn
logTrace(s"Committer $committer may not be tolerant of task commit failures")
}
- } else {
- // if required other committers need to be checked for dynamic partition
- // compatibility through a StreamCapabilities probe.
- if (dynamicPartitionOverwrite) {
- if (supportsDynamicPartitions) {
- logDebug(
- s"Committer $committer has declared compatibility with dynamic partition overwrite")
- } else {
- throw new IOException(PathOutputCommitProtocol.UNSUPPORTED + ": " + committer)
- }
- }
}
committer
}
-
- /**
- * Does the instantiated committer support dynamic partitions?
- * @return true if the committer declares itself compatible.
- */
- private def supportsDynamicPartitions = {
- committer.isInstanceOf[FileOutputCommitter] ||
- (committer.isInstanceOf[StreamCapabilities] &&
- committer.asInstanceOf[StreamCapabilities]
- .hasCapability(CAPABILITY_DYNAMIC_PARTITIONING))
- }
-
/**
* Create a temporary file for a task.
*
@@ -164,28 +140,6 @@ class PathOutputCommitProtocol(
file.toString
}
- /**
- * Reject any requests for an absolute path file on a committer which
- * is not compatible with it.
- *
- * @param taskContext task context
- * @param absoluteDir final directory
- * @param spec output filename
- * @return a path string
- * @throws UnsupportedOperationException if incompatible
- */
- override def newTaskTempFileAbsPath(
- taskContext: TaskAttemptContext,
- absoluteDir: String,
- spec: FileNameSpec): String = {
-
- if (supportsDynamicPartitions) {
- super.newTaskTempFileAbsPath(taskContext, absoluteDir, spec)
- } else {
- throw new UnsupportedOperationException(s"Absolute output locations not supported" +
- s" by committer $committer")
- }
- }
}
object PathOutputCommitProtocol {
@@ -207,17 +161,7 @@ object PathOutputCommitProtocol {
val REJECT_FILE_OUTPUT_DEFVAL = false
/** Error string for tests. */
- private[cloud] val UNSUPPORTED: String = "PathOutputCommitter does not support" +
+ private[cloud] val UNSUPPORTED: String = "PathOutputCommitProtocol does not support" +
" dynamicPartitionOverwrite"
- /**
- * Stream Capabilities probe for spark dynamic partitioning compatibility.
- */
- private[cloud] val CAPABILITY_DYNAMIC_PARTITIONING =
- "mapreduce.job.committer.dynamic.partitioning"
-
- /**
- * Scheme prefix for per-filesystem scheme committers.
- */
- private[cloud] val OUTPUTCOMMITTER_FACTORY_SCHEME = "mapreduce.outputcommitter.factory.scheme"
}
diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala
index 984c7dbc2cb..546f54229ea 100644
--- a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala
+++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/CommitterBindingSuite.scala
@@ -18,17 +18,17 @@
package org.apache.spark.internal.io.cloud
import java.io.{File, FileInputStream, FileOutputStream, IOException, ObjectInputStream, ObjectOutputStream}
+import java.lang.reflect.InvocationTargetException
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{Path, StreamCapabilities}
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.IOUtils
-import org.apache.hadoop.mapreduce.{Job, JobStatus, MRJobConfig, TaskAttemptContext, TaskAttemptID}
-import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, FileOutputFormat}
+import org.apache.hadoop.mapreduce.{Job, JobStatus, MRJobConfig, TaskAttemptID}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.SparkFunSuite
-import org.apache.spark.internal.io.{FileCommitProtocol, FileNameSpec}
-import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.{CAPABILITY_DYNAMIC_PARTITIONING, OUTPUTCOMMITTER_FACTORY_SCHEME}
+import org.apache.spark.internal.io.FileCommitProtocol
class CommitterBindingSuite extends SparkFunSuite {
@@ -49,20 +49,18 @@ class CommitterBindingSuite extends SparkFunSuite {
* [[BindingParquetOutputCommitter]] committer bind to the schema-specific
* committer declared for the destination path? And that lifecycle events
* are correctly propagated?
- * This only works with a hadoop build where BindingPathOutputCommitter
- * does passthrough of stream capabilities, so check that first.
*/
test("BindingParquetOutputCommitter binds to the inner committer") {
-
val path = new Path("http://example/data")
val job = newJob(path)
val conf = job.getConfiguration
conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)
- StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http")
- val tContext: TaskAttemptContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
+
+ StubPathOutputCommitterFactory.bind(conf, "http")
+ val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
val parquet = new BindingParquetOutputCommitter(path, tContext)
- val inner = parquet.boundCommitter.asInstanceOf[StubPathOutputCommitterWithDynamicPartioning]
+ val inner = parquet.boundCommitter.asInstanceOf[StubPathOutputCommitter]
parquet.setupJob(tContext)
assert(inner.jobSetup, s"$inner job not setup")
parquet.setupTask(tContext)
@@ -78,18 +76,6 @@ class CommitterBindingSuite extends SparkFunSuite {
assert(inner.jobCommitted, s"$inner job not committed")
parquet.abortJob(tContext, JobStatus.State.RUNNING)
assert(inner.jobAborted, s"$inner job not aborted")
-
- val binding = new BindingPathOutputCommitter(path, tContext)
- // MAPREDUCE-7403 only arrived after hadoop 3.3.4; this test case
- // is designed to work with versions with and without the feature.
- if (binding.isInstanceOf[StreamCapabilities]) {
- // this version of hadoop does support hasCapability probes
- // through the BindingPathOutputCommitter used by the
- // parquet committer, so verify that it goes through
- // to the stub committer.
- assert(parquet.hasCapability(CAPABILITY_DYNAMIC_PARTITIONING),
- s"committer $parquet does not declare dynamic partition support")
- }
}
/**
@@ -144,124 +130,17 @@ class CommitterBindingSuite extends SparkFunSuite {
assert("file:///tmp" === protocol.destination)
}
- /*
- * Bind a job to a committer which doesn't support dynamic partitioning.
- * Job setup must fail, and calling `newTaskTempFileAbsPath()` must
- * raise `UnsupportedOperationException`.
- */
- test("reject dynamic partitioning if not supported") {
- val path = new Path("http://example/data")
- val job = newJob(path)
- val conf = job.getConfiguration
- conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
- conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)
- StubPathOutputCommitterBinding.bind(conf, "http")
- val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
- val committer = FileCommitProtocol.instantiate(
- pathCommitProtocolClassname,
- jobId,
- path.toUri.toString,
- true)
- val ioe = intercept[IOException] {
- committer.setupJob(tContext)
- }
- if (!ioe.getMessage.contains(PathOutputCommitProtocol.UNSUPPORTED)) {
- throw ioe
- }
-
- // calls to newTaskTempFileAbsPath() will be rejected
- intercept[UnsupportedOperationException] {
- verifyAbsTempFileWorks(tContext, committer)
- }
- }
-
- /*
- * Bind to a committer with dynamic partitioning support,
- * verify that job and task setup works, and that
- * `newTaskTempFileAbsPath()` creates a temp file which
- * can be moved to an absolute path later.
- */
- test("permit dynamic partitioning if the committer says it works") {
- val path = new Path("http://example/data")
- val job = newJob(path)
- val conf = job.getConfiguration
- conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
- conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)
- StubPathOutputCommitterBinding.bindWithDynamicPartitioning(conf, "http")
- val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
- val committer: PathOutputCommitProtocol = FileCommitProtocol.instantiate(
- pathCommitProtocolClassname,
- jobId,
- path.toUri.toString,
- true).asInstanceOf[PathOutputCommitProtocol]
- committer.setupJob(tContext)
- committer.setupTask(tContext)
- verifyAbsTempFileWorks(tContext, committer)
- }
-
- /*
- * Create a FileOutputCommitter through the PathOutputCommitProtocol
- * using the relevant factory in hadoop-mapreduce-core JAR.
- */
- test("FileOutputCommitter through PathOutputCommitProtocol") {
- // temp path; use a unique filename
- val jobCommitDir = File.createTempFile(
- "FileOutputCommitter-through-PathOutputCommitProtocol",
- "")
- try {
- // delete the temp file and create a temp dir.
- jobCommitDir.delete();
- val jobUri = jobCommitDir.toURI
- // hadoop path of the job
- val path = new Path(jobUri)
- val job = newJob(path)
- val conf = job.getConfiguration
- conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
- conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)
- bindToFileOutputCommitterFactory(conf, "file")
- val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
- val committer: PathOutputCommitProtocol = FileCommitProtocol.instantiate(
+ test("reject dynamic partitioning") {
+ val cause = intercept[InvocationTargetException] {
+ FileCommitProtocol.instantiate(
pathCommitProtocolClassname,
- jobId,
- jobUri.toString,
- true).asInstanceOf[PathOutputCommitProtocol]
- committer.setupJob(tContext)
- committer.setupTask(tContext)
- verifyAbsTempFileWorks(tContext, committer)
- } finally {
- jobCommitDir.delete();
+ jobId, "file:///tmp", true)
+ }.getCause
+ if (cause == null || !cause.isInstanceOf[IOException]
+ || !cause.getMessage.contains(PathOutputCommitProtocol.UNSUPPORTED)) {
+ throw cause
}
}
- /**
- * Verify that a committer supports `newTaskTempFileAbsPath()`.
- *
- * @param tContext task context
- * @param committer committer
- */
- private def verifyAbsTempFileWorks(
- tContext: TaskAttemptContextImpl,
- committer: FileCommitProtocol): Unit = {
- val spec = FileNameSpec(".lotus.", ".123")
- val absPath = committer.newTaskTempFileAbsPath(
- tContext,
- "/tmp",
- spec)
- assert(absPath.endsWith(".123"), s"wrong suffix in $absPath")
- assert(absPath.contains("lotus"), s"wrong prefix in $absPath")
- }
-
- /**
- * Given a hadoop configuration, explicitly set up the factory binding for the scheme
- * to a committer factory which always creates FileOutputCommitters.
- *
- * @param conf config to patch
- * @param scheme filesystem scheme.
- */
- def bindToFileOutputCommitterFactory(conf: Configuration, scheme: String): Unit = {
- conf.set(OUTPUTCOMMITTER_FACTORY_SCHEME + "." + scheme,
- "org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory")
- }
-
}
diff --git a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala
index 5a0dba45ba8..88a36d227b1 100644
--- a/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala
+++ b/hadoop-cloud/src/hadoop-3/test/scala/org/apache/spark/internal/io/cloud/StubPathOutputCommitter.scala
@@ -18,12 +18,10 @@
package org.apache.spark.internal.io.cloud
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{Path, StreamCapabilities}
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.{PathOutputCommitter, PathOutputCommitterFactory}
-import org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.{CAPABILITY_DYNAMIC_PARTITIONING, OUTPUTCOMMITTER_FACTORY_SCHEME}
-
/**
* A local path output committer which tracks its state, for use in tests.
* @param outputPath final destination.
@@ -93,45 +91,10 @@ class StubPathOutputCommitterFactory extends PathOutputCommitterFactory {
}
private def workPath(out: Path): Path = new Path(out,
- StubPathOutputCommitterBinding.TEMP_DIR_NAME)
-}
-
-/**
- * An extension which declares that it supports dynamic partitioning.
- * @param outputPath final destination.
- * @param workPath work path
- * @param context task/job attempt.
- */
-class StubPathOutputCommitterWithDynamicPartioning(
- outputPath: Path,
- workPath: Path,
- context: TaskAttemptContext) extends StubPathOutputCommitter(outputPath, workPath, context)
- with StreamCapabilities {
-
- override def hasCapability(capability: String): Boolean =
- CAPABILITY_DYNAMIC_PARTITIONING == capability
-
+ StubPathOutputCommitterFactory.TEMP_DIR_NAME)
}
-
-class StubPathOutputCommitterWithDynamicPartioningFactory extends PathOutputCommitterFactory {
-
- override def createOutputCommitter(
- outputPath: Path,
- context: TaskAttemptContext): PathOutputCommitter = {
- new StubPathOutputCommitterWithDynamicPartioning(outputPath, workPath(outputPath), context)
- }
-
- private def workPath(out: Path): Path = new Path(out,
- StubPathOutputCommitterBinding.TEMP_DIR_NAME)
-}
-
-
-/**
- * Class to help binding job configurations to the different
- * stub committers available.
- */
-object StubPathOutputCommitterBinding {
+object StubPathOutputCommitterFactory {
/**
* This is the "Pending" directory of the FileOutputCommitter;
@@ -139,6 +102,11 @@ object StubPathOutputCommitterBinding {
*/
val TEMP_DIR_NAME = "_temporary"
+ /**
+ * Scheme prefix for per-filesystem scheme committers.
+ */
+ val OUTPUTCOMMITTER_FACTORY_SCHEME = "mapreduce.outputcommitter.factory.scheme"
+
/**
* Given a hadoop configuration, set up the factory binding for the scheme.
* @param conf config to patch
@@ -149,16 +117,4 @@ object StubPathOutputCommitterBinding {
conf.set(key, classOf[StubPathOutputCommitterFactory].getName())
}
- /**
- * Bind the configuration/scheme to the stub committer which
- * declares support for dynamic partitioning.
- *
- * @param conf config to patch
- * @param scheme filesystem scheme.
- */
- def bindWithDynamicPartitioning(conf: Configuration, scheme: String): Unit = {
- val key = OUTPUTCOMMITTER_FACTORY_SCHEME + "." + scheme
- conf.set(key,
- classOf[StubPathOutputCommitterWithDynamicPartioningFactory].getName())
- }
}
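After the revert, the check moves back into the PathOutputCommitProtocol constructor, which throws an IOException whenever `dynamicPartitionOverwrite` is true. The restored "reject dynamic partitioning" test intercepts InvocationTargetException because the protocol is created reflectively through `FileCommitProtocol.instantiate`, and an exception thrown by a reflectively invoked constructor arrives wrapped in one. A minimal sketch of both sides, assuming the hypothetical names `SketchCommitProtocol`, `isRejection` and `rejected` (not Spark APIs); it constructs the class directly rather than reflectively.

```scala
import java.io.IOException
import java.lang.reflect.InvocationTargetException

// Hypothetical stand-in for the post-revert PathOutputCommitProtocol constructor.
class SketchCommitProtocol(jobId: String, dest: String, dynamicPartitionOverwrite: Boolean) {
  // Fail fast, before any committer is created, as the restored code does.
  if (dynamicPartitionOverwrite) {
    throw new IOException(SketchCommitProtocol.UNSUPPORTED)
  }
  override def toString: String = s"SketchCommitProtocol($jobId, $dest)"
}

object SketchCommitProtocol {
  // Same text as the post-revert PathOutputCommitProtocol.UNSUPPORTED.
  val UNSUPPORTED: String = "PathOutputCommitProtocol does not support" +
    " dynamicPartitionOverwrite"

  // Mirrors the checks in the restored test: unwrap a reflection wrapper if
  // present, then require a non-null IOException carrying the UNSUPPORTED text.
  def isRejection(t: Throwable): Boolean = {
    val cause = t match {
      case ite: InvocationTargetException => ite.getCause
      case other => other
    }
    cause != null && cause.isInstanceOf[IOException] &&
      cause.getMessage != null && cause.getMessage.contains(UNSUPPORTED)
  }

  // Attempt construction and report whether it was rejected.
  def rejected(dynamic: Boolean): Boolean =
    try {
      new SketchCommitProtocol("job-0001", "file:///tmp", dynamic)
      false
    } catch {
      case e: IOException => isRejection(e)
    }
}
```

Unwrapping in one helper keeps the same predicate usable for both direct construction and reflective instantiation.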