Posted to issues@spark.apache.org by "Cheng Su (Jira)" <ji...@apache.org> on 2020/10/30 06:47:00 UTC

[jira] [Updated] (SPARK-33298) FileCommitProtocol V2

     [ https://issues.apache.org/jira/browse/SPARK-33298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cheng Su updated SPARK-33298:
-----------------------------
    Description: 
This Jira proposes a new version of `FileCommitProtocol` ([https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala] ), e.g. `FileCommitProtocolV2`.

The motivation is that we currently have two requirements that need API changes to `FileCommitProtocol`:

(1) Support writing Hive ORC/Parquet bucketed tables ([https://github.com/apache/spark/pull/30003] ): this needs a new parameter `prefix` in the methods `newTaskTempFile` and `newTaskTempFileAbsPath`, so that Spark can write Hive/Presto-compatible bucketed files.

(2) Fix commit collision in dynamic partition overwrite mode ([https://github.com/apache/spark/pull/29000] ): this needs a new method `getStagingDir`, so that the dynamic partition staging directory can be customized to avoid commit collisions.

 

The reason to propose `FileCommitProtocolV2` instead of changing `FileCommitProtocol` directly is that the `FileCommitProtocol` API is effectively public: users can plug in a customized commit protocol subclass at run time ([https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala#L146] ). If we change the API (e.g. add a method or change an existing method signature), external subclasses of the commit protocol will break. According to [~cloud_fan], some external subclasses already exist to better support object stores.

 

One possible API for `FileCommitProtocolV2`:
{code:scala}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext

abstract class FileCommitProtocolV2 {
  // `options` replaces `ext`, so more string-to-string parameters (e.g. `prefix`) can be passed
  def newTaskTempFile(
      taskContext: TaskAttemptContext, dir: Option[String], options: Map[String, String]): String

  // `options` replaces `ext`, as above
  def newTaskTempFileAbsPath(
      taskContext: TaskAttemptContext, absoluteDir: String, options: Map[String, String]): String

  // other new methods, e.g. getStagingDir
  def getStagingDir(path: String, jobId: String): Path

  // rest of FileCommitProtocol methods
  ...
}
{code}
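
To illustrate why a string-to-string `options` map is easier to extend than the single `ext` argument, here is a minimal, self-contained sketch of how an implementation might build a task file name from it. The helper `tempFileName`, the option keys `prefix` and `ext`, the sample prefix value and the file name layout are all assumptions made for this example; they are not part of the proposal or of any existing Spark API.

```scala
object TempFileNameSketch {
  // Hypothetical option keys; the proposal does not fix any key names.
  val PrefixKey = "prefix"
  val ExtKey = "ext"

  // Builds "<prefix>part-<split, zero-padded to 5 digits>-<jobId><ext>".
  // Missing options default to the empty string, so callers that only
  // pass `ext` keep working when new keys such as `prefix` are introduced.
  def tempFileName(split: Int, jobId: String, options: Map[String, String]): String = {
    val prefix = options.getOrElse(PrefixKey, "")
    val ext = options.getOrElse(ExtKey, "")
    f"${prefix}part-$split%05d-$jobId$ext"
  }

  def main(args: Array[String]): Unit = {
    // Only the old `ext` information is supplied.
    println(tempFileName(3, "job1", Map(ExtKey -> ".orc")))
    // A new parameter is added without changing the method signature.
    println(tempFileName(3, "job1", Map(PrefixKey -> "bucket3_", ExtKey -> ".orc")))
  }
}
```

With a map, adding another parameter later means adding a key rather than changing the method signature again, which is the source of the compatibility problem described above.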
 

`FileCommitProtocolV2.instantiate()` will first try to find a subclass of `FileCommitProtocolV2`; if none is found, it will fall back to a subclass of `FileCommitProtocol`, so the current version of `FileCommitProtocol` stays supported.
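
The fallback could look roughly like the sketch below. It is only an illustration under stated assumptions: `ProtocolV1`, `ProtocolV2`, `V1Adapter`, `LegacyProtocol`, `ModernProtocol` and `ProtocolLoader` are hypothetical stand-ins rather than Spark classes, the adapter assumes the old `ext` value is passed under an `ext` option key, and the loader takes a `Class` object with a no-argument constructor to stay self-contained (the existing `instantiate` resolves a class name and passes constructor arguments).

```scala
// Hypothetical stand-ins for the two protocol versions (not the real Spark classes).
abstract class ProtocolV1 {
  def newTaskTempFile(dir: Option[String], ext: String): String
}

abstract class ProtocolV2 {
  def newTaskTempFile(dir: Option[String], options: Map[String, String]): String
}

// Wraps a V1 implementation so callers only ever see the V2 API.
// Assumption: the old `ext` argument travels under the option key "ext".
final class V1Adapter(underlying: ProtocolV1) extends ProtocolV2 {
  override def newTaskTempFile(dir: Option[String], options: Map[String, String]): String =
    underlying.newTaskTempFile(dir, options.getOrElse("ext", ""))
}

// Example user-supplied implementations, one per version.
class LegacyProtocol extends ProtocolV1 {
  override def newTaskTempFile(dir: Option[String], ext: String): String =
    dir.map(_ + "/").getOrElse("") + "part" + ext
}

class ModernProtocol extends ProtocolV2 {
  override def newTaskTempFile(dir: Option[String], options: Map[String, String]): String =
    dir.map(_ + "/").getOrElse("") +
      options.getOrElse("prefix", "") + "part" + options.getOrElse("ext", "")
}

object ProtocolLoader {
  // Try V2 first; fall back to V1 behind an adapter; reject anything else.
  def instantiate(clazz: Class[_]): ProtocolV2 = {
    def newInstance(): Any = clazz.getDeclaredConstructor().newInstance()
    if (classOf[ProtocolV2].isAssignableFrom(clazz)) {
      newInstance().asInstanceOf[ProtocolV2]
    } else if (classOf[ProtocolV1].isAssignableFrom(clazz)) {
      new V1Adapter(newInstance().asInstanceOf[ProtocolV1])
    } else {
      throw new IllegalArgumentException(s"${clazz.getName} is not a commit protocol")
    }
  }
}
```

Wrapping the old implementation in an adapter is one possible design: the rest of the write path would then depend on a single API, while existing external subclasses keep loading unchanged.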


> FileCommitProtocol V2
> ---------------------
>
>                 Key: SPARK-33298
>                 URL: https://issues.apache.org/jira/browse/SPARK-33298
>             Project: Spark
>          Issue Type: New Feature
>          Components: Spark Core
>    Affects Versions: 3.1.0
>            Reporter: Cheng Su
>            Priority: Minor


