You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by cloud-fan <gi...@git.apache.org> on 2018/12/03 15:23:48 UTC

[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

GitHub user cloud-fan opened a pull request:

    https://github.com/apache/spark/pull/23208

    [SPARK-25530][SQL] data source v2 API refactor (batch write)

    ## What changes were proposed in this pull request?
    
    Adjust the batch write API to match the read API refactor after https://github.com/apache/spark/pull/23086
    
    Basically it renames `BatchWriteSupportProvider` to `SupportsBatchWrite`, and make it extend `Table`. It also cleans up some code as batch API is completed.
    
    This PR also removes the test from https://github.com/apache/spark/pull/22688 . Now data source must return a table for read/write. It's a little awkward to use it with the `SaveMode` based write APIs, as users can append data to a non-existing table. `TableProvider` needs to return a `Table` instance with empty schema if the table doesn't exist, so that we can write it later. Hopefully we can remove the `SaveMode` based write APIs after the new APIs are finished and widely used.
    
    A few notes about future changes:
    1. We will create `SupportsStreamingWrite` later for streaming APIs
    2. We will create `SupportsBatchReplaceWhere`, `SupportsBatchAppend`, etc. for the new end-user write APIs. I think streaming APIs would remain to use `OutputMode`, and new end-user write APIs will apply to batch only, at least in the near future.
    
    
    ## How was this patch tested?
    
    existing tests


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark refactor-batch

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/23208.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #23208
    
----
commit 00fc34fa793b922a48a4bf8e9f9cd0e3b688800b
Author: Wenchen Fan <we...@...>
Date:   2018-12-03T14:38:43Z

    data source v2 API refactor (batch write)

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23208#discussion_r239682239
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
    @@ -25,7 +25,10 @@
      * The base interface for v2 data sources which don't have a real catalog. Implementations must
      * have a public, 0-arg constructor.
      * <p>
    - * The major responsibility of this interface is to return a {@link Table} for read/write.
    + * The major responsibility of this interface is to return a {@link Table} for read/write. If you
    + * want to allow end-users to write data to non-existing tables via write APIs in `DataFrameWriter`
    + * with `SaveMode`, you must return a {@link Table} instance even if the table doesn't exist. The
    + * table schema can be empty in this case.
    --- End diff --
    
    I don't want to break existing use cases, file sources can overwrite/append to a non-existing location, and we still need to support that with `SaveMode`.
    
    Whatever the new write API will be, I think we still need to support `SaveMode` for a while.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23208#discussion_r239888975
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java ---
    @@ -25,14 +25,14 @@
     import org.apache.spark.sql.types.StructType;
     
     /**
    - * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
    + * A mix-in interface for {@link Table}. Data sources can implement this interface to
      * provide data writing ability for batch processing.
      *
      * This interface is used to create {@link BatchWriteSupport} instances when end users run
      * {@code Dataset.write.format(...).option(...).save()}.
      */
     @Evolving
    -public interface BatchWriteSupportProvider extends DataSourceV2 {
    +public interface SupportsBatchWrite extends Table {
    --- End diff --
    
    I'm fine either way, as long as we are consistent between the read and write sides.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23208#discussion_r239889152
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -17,52 +17,49 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import java.util.UUID
    -
    -import scala.collection.JavaConverters._
    +import java.util.{Optional, UUID}
     
     import org.apache.spark.sql.{AnalysisException, SaveMode}
     import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
     import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
     import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
     import org.apache.spark.sql.catalyst.util.truncatedString
    -import org.apache.spark.sql.sources.DataSourceRegister
     import org.apache.spark.sql.sources.v2._
     import org.apache.spark.sql.sources.v2.reader._
     import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport
     import org.apache.spark.sql.types.StructType
     
     /**
    - * A logical plan representing a data source v2 scan.
    + * A logical plan representing a data source v2 table.
      *
    - * @param source An instance of a [[DataSourceV2]] implementation.
    - * @param options The options for this scan. Used to create fresh [[BatchWriteSupport]].
    - * @param userSpecifiedSchema The user-specified schema for this scan.
    + * @param table The table that this relation represents.
    + * @param options The options for this table operation. It's used to create fresh [[ScanBuilder]]
    + *                and [[BatchWriteSupport]].
      */
     case class DataSourceV2Relation(
    -    // TODO: remove `source` when we finish API refactor for write.
    -    source: TableProvider,
    -    table: SupportsBatchRead,
    +    table: Table,
         output: Seq[AttributeReference],
    -    options: Map[String, String],
    -    userSpecifiedSchema: Option[StructType] = None)
    +    // TODO: use a simple case insensitive map instead.
    +    options: DataSourceOptions)
    --- End diff --
    
    A private method to do that existed in the past. Why not just revive it?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23208: [SPARK-25530][SQL] data source v2 API refactor (batch wr...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/23208
  
    Thanks for posting this PR @cloud-fan! I'll have a look in the next day or so.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23208#discussion_r238524973
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java ---
    @@ -25,14 +25,14 @@
     import org.apache.spark.sql.types.StructType;
     
     /**
    - * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
    + * A mix-in interface for {@link Table}. Data sources can implement this interface to
      * provide data writing ability for batch processing.
      *
      * This interface is used to create {@link BatchWriteSupport} instances when end users run
    --- End diff --
    
    I don't have a better naming in mind, so I leave it as `WriteSupport` for now. Better naming is welcome to match `Scan`!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

Posted by gengliangwang <gi...@git.apache.org>.
Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23208#discussion_r240101369
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -17,52 +17,49 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import java.util.UUID
    -
    -import scala.collection.JavaConverters._
    +import java.util.{Optional, UUID}
     
     import org.apache.spark.sql.{AnalysisException, SaveMode}
     import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
     import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
     import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
     import org.apache.spark.sql.catalyst.util.truncatedString
    -import org.apache.spark.sql.sources.DataSourceRegister
     import org.apache.spark.sql.sources.v2._
     import org.apache.spark.sql.sources.v2.reader._
     import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport
     import org.apache.spark.sql.types.StructType
     
     /**
    - * A logical plan representing a data source v2 scan.
    + * A logical plan representing a data source v2 table.
      *
    - * @param source An instance of a [[DataSourceV2]] implementation.
    - * @param options The options for this scan. Used to create fresh [[BatchWriteSupport]].
    - * @param userSpecifiedSchema The user-specified schema for this scan.
    + * @param table The table that this relation represents.
    + * @param options The options for this table operation. It's used to create fresh [[ScanBuilder]]
    + *                and [[BatchWriteSupport]].
      */
     case class DataSourceV2Relation(
    -    // TODO: remove `source` when we finish API refactor for write.
    -    source: TableProvider,
    -    table: SupportsBatchRead,
    +    table: Table,
         output: Seq[AttributeReference],
    -    options: Map[String, String],
    -    userSpecifiedSchema: Option[StructType] = None)
    +    // TODO: use a simple case insensitive map instead.
    +    options: DataSourceOptions)
       extends LeafNode with MultiInstanceRelation with NamedRelation {
     
    -  import DataSourceV2Relation._
    -
       override def name: String = table.name()
     
       override def simpleString: String = {
         s"RelationV2${truncatedString(output, "[", ", ", "]")} $name"
       }
     
    -  def newWriteSupport(): BatchWriteSupport = source.createWriteSupport(options, schema)
    -
    -  def newScanBuilder(): ScanBuilder = {
    -    val dsOptions = new DataSourceOptions(options.asJava)
    -    table.newScanBuilder(dsOptions)
    +  def newWriteSupport(inputSchema: StructType, mode: SaveMode): Optional[BatchWriteSupport] = {
    --- End diff --
    
    Nit: add comment for the method. Especially when it will return None. Although it is explained in `SupportsBatchWrite.createBatchWriteSupport`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23208#discussion_r239469368
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java ---
    @@ -25,14 +25,14 @@
     import org.apache.spark.sql.types.StructType;
     
     /**
    - * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
    + * A mix-in interface for {@link Table}. Data sources can implement this interface to
      * provide data writing ability for batch processing.
      *
      * This interface is used to create {@link BatchWriteSupport} instances when end users run
      * {@code Dataset.write.format(...).option(...).save()}.
      */
     @Evolving
    -public interface BatchWriteSupportProvider extends DataSourceV2 {
    +public interface SupportsBatchWrite extends Table {
    --- End diff --
    
    That's why I left https://github.com/apache/spark/pull/23208#discussion_r238524973 .
    
    namings are welcome!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23208: [SPARK-25530][SQL] data source v2 API refactor (batch wr...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/23208
  
    cc @rdblue @rxin @jose-torres  @gatorsmile @HyukjinKwon @gengliangwang 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23208: [SPARK-25530][SQL] data source v2 API refactor (batch wr...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/23208
  
    Let's move the high level discussion to the doc: https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23208#discussion_r239559037
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
    @@ -25,7 +25,10 @@
      * The base interface for v2 data sources which don't have a real catalog. Implementations must
      * have a public, 0-arg constructor.
      * <p>
    - * The major responsibility of this interface is to return a {@link Table} for read/write.
    + * The major responsibility of this interface is to return a {@link Table} for read/write. If you
    + * want to allow end-users to write data to non-existing tables via write APIs in `DataFrameWriter`
    + * with `SaveMode`, you must return a {@link Table} instance even if the table doesn't exist. The
    + * table schema can be empty in this case.
    --- End diff --
    
    What does it mean to write to a non-existing table? If you're writing somewhere, the table must exist.
    
    This is for creating a table directly from configuration and an implementation class in the DataFrameWriter API. The target of the write still needs to exist.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23208#discussion_r239578059
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
    @@ -25,7 +25,10 @@
      * The base interface for v2 data sources which don't have a real catalog. Implementations must
      * have a public, 0-arg constructor.
      * <p>
    - * The major responsibility of this interface is to return a {@link Table} for read/write.
    + * The major responsibility of this interface is to return a {@link Table} for read/write. If you
    + * want to allow end-users to write data to non-existing tables via write APIs in `DataFrameWriter`
    + * with `SaveMode`, you must return a {@link Table} instance even if the table doesn't exist. The
    + * table schema can be empty in this case.
    --- End diff --
    
    @jose-torres, create on write is done by CTAS. It should not be left up to the source whether to fail or create.
    
    I think the confusion here is that this is a degenerate case where Spark has no ability to interact with the table's metadata. Spark must assume that it exists because the caller is writing to it.
    
    The caller is indicating that a table exists, is identified by some configuration, and that a specific implementation can be used to write to it. That's what happens today when source implementations are directly specified.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23208#discussion_r239682984
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -241,32 +241,28 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     
         assertNotBucketed("save")
     
    -    val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
    -    if (classOf[DataSourceV2].isAssignableFrom(cls)) {
    -      val source = cls.getConstructor().newInstance().asInstanceOf[DataSourceV2]
    -      source match {
    -        case provider: BatchWriteSupportProvider =>
    -          val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
    -            source,
    -            df.sparkSession.sessionState.conf)
    -          val options = sessionOptions ++ extraOptions
    -
    +    val session = df.sparkSession
    +    val cls = DataSource.lookupDataSource(source, session.sessionState.conf)
    +    if (classOf[TableProvider].isAssignableFrom(cls)) {
    +      val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
    +      val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
    +        provider, session.sessionState.conf)
    +      val options = sessionOptions ++ extraOptions
    +      val dsOptions = new DataSourceOptions(options.asJava)
    +      provider.getTable(dsOptions) match {
    +        case table: SupportsBatchWrite =>
    +          val relation = DataSourceV2Relation.create(table, dsOptions)
    +          // TODO: revisit it. We should not create the `AppendData` operator for `SaveMode.Append`.
    +          // We should create new end-users APIs for the `AppendData` operator.
    --- End diff --
    
    yea, that's why I only left a comment and just ask for revisiting later. I think we can see a clearer picture after we migrating the file source.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23208#discussion_r239888795
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
    @@ -25,7 +25,10 @@
      * The base interface for v2 data sources which don't have a real catalog. Implementations must
      * have a public, 0-arg constructor.
      * <p>
    - * The major responsibility of this interface is to return a {@link Table} for read/write.
    + * The major responsibility of this interface is to return a {@link Table} for read/write. If you
    + * want to allow end-users to write data to non-existing tables via write APIs in `DataFrameWriter`
    + * with `SaveMode`, you must return a {@link Table} instance even if the table doesn't exist. The
    + * table schema can be empty in this case.
    --- End diff --
    
    I think we can remove SaveMode right away. We don't need to break existing use cases if we add the OverwriteData plan and use it when the user's mode is Overwrite. That helps us get to the point where we can integrate SQL on top of this faster.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23208: [SPARK-25530][SQL] data source v2 API refactor (batch wr...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23208
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23208: [SPARK-25530][SQL] data source v2 API refactor (batch wr...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23208
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99620/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23208: [SPARK-25530][SQL] data source v2 API refactor (batch wr...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23208
  
    **[Test build #99620 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99620/testReport)** for PR 23208 at commit [`00fc34f`](https://github.com/apache/spark/commit/00fc34fa793b922a48a4bf8e9f9cd0e3b688800b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23208: [SPARK-25530][SQL] data source v2 API refactor (batch wr...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/23208
  
    @rdblue I tried to add `WriteBuilder`, but there is a difference between read and write:
    1. for read, the `ScanBuilder` can collect many information, like column pruning, filter pushdown, etc. together, and create a `Scan`
    2. for write, it's just different branches, not a combination. e.g. you can't do append and replaceWhere at the same time.
    
    Because of this, I feel we don't need `WriterBuilder`, but just different mixin traits to create `Write` for different purposes.
    
    Let me know if you have other ideas. Thanks for your review!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23208#discussion_r239684490
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java ---
    @@ -25,14 +25,14 @@
     import org.apache.spark.sql.types.StructType;
     
     /**
    - * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
    + * A mix-in interface for {@link Table}. Data sources can implement this interface to
      * provide data writing ability for batch processing.
      *
      * This interface is used to create {@link BatchWriteSupport} instances when end users run
      * {@code Dataset.write.format(...).option(...).save()}.
      */
     @Evolving
    -public interface BatchWriteSupportProvider extends DataSourceV2 {
    +public interface SupportsBatchWrite extends Table {
    --- End diff --
    
    I do think read-only or write-only is a necessary feature, according to what I've seen in the dev list. Maybe we should move `newScanBuilder` from `Table` to the mixin traits.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23208#discussion_r239596456
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -241,32 +241,28 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     
         assertNotBucketed("save")
     
    -    val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
    -    if (classOf[DataSourceV2].isAssignableFrom(cls)) {
    -      val source = cls.getConstructor().newInstance().asInstanceOf[DataSourceV2]
    -      source match {
    -        case provider: BatchWriteSupportProvider =>
    -          val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
    -            source,
    -            df.sparkSession.sessionState.conf)
    -          val options = sessionOptions ++ extraOptions
    -
    +    val session = df.sparkSession
    +    val cls = DataSource.lookupDataSource(source, session.sessionState.conf)
    +    if (classOf[TableProvider].isAssignableFrom(cls)) {
    +      val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
    +      val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
    +        provider, session.sessionState.conf)
    +      val options = sessionOptions ++ extraOptions
    +      val dsOptions = new DataSourceOptions(options.asJava)
    +      provider.getTable(dsOptions) match {
    +        case table: SupportsBatchWrite =>
    +          val relation = DataSourceV2Relation.create(table, dsOptions)
    +          // TODO: revisit it. We should not create the `AppendData` operator for `SaveMode.Append`.
    +          // We should create new end-users APIs for the `AppendData` operator.
    --- End diff --
    
    The example in the referenced comment is this:
    
    ```
    spark.range(1).format("source").write.save("non-existent-path")
    ```
    
    If a path for a path-based table doesn't exist, then I think that the table doesn't exist. If a table doesn't exist, then the operation for `save` should be CTAS instead of AppendData.
    
    Here, I think the right behavior is to check whether the provider returns a table. If it doesn't, then the table doesn't exist and the plan should be CTAS. If it does, then it must provide the schema used to validate the AppendData operation. Since we don't currently have CTAS, this should throw an exception stating that the table doesn't exist and can't be created.
    
    More generally, the meaning of SaveMode with v1 is not always reliable. I think the right approach is what @cloud-fan suggests: create a new write API for v2 tables that is clear for the new logical plans (I've proposed one and would be happy to open a PR). Once the logical plans are in place, we can go back through this API and move it over to v2 where the behaviors match.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23208#discussion_r239613722
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java ---
    @@ -25,14 +25,14 @@
     import org.apache.spark.sql.types.StructType;
     
     /**
    - * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
    + * A mix-in interface for {@link Table}. Data sources can implement this interface to
      * provide data writing ability for batch processing.
      *
      * This interface is used to create {@link BatchWriteSupport} instances when end users run
      * {@code Dataset.write.format(...).option(...).save()}.
      */
     @Evolving
    -public interface BatchWriteSupportProvider extends DataSourceV2 {
    +public interface SupportsBatchWrite extends Table {
    --- End diff --
    
    `Table` exposes `newScanBuilder` without an interface. Why should the write side be different? Doesn't Spark support sources that are read-only and write-only?
    
    I think that both reads and writes should use interfaces to mix support into `Table` or both should be exposed by `Table` and throw `UnsupportedOperationException` by default, not a mix of the two options.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23208#discussion_r238313221
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -241,32 +241,28 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     
         assertNotBucketed("save")
     
    -    val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
    -    if (classOf[DataSourceV2].isAssignableFrom(cls)) {
    -      val source = cls.getConstructor().newInstance().asInstanceOf[DataSourceV2]
    -      source match {
    -        case provider: BatchWriteSupportProvider =>
    -          val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
    -            source,
    -            df.sparkSession.sessionState.conf)
    -          val options = sessionOptions ++ extraOptions
    -
    +    val session = df.sparkSession
    +    val cls = DataSource.lookupDataSource(source, session.sessionState.conf)
    +    if (classOf[TableProvider].isAssignableFrom(cls)) {
    +      val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
    +      val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
    +        provider, session.sessionState.conf)
    +      val options = sessionOptions ++ extraOptions
    +      val dsOptions = new DataSourceOptions(options.asJava)
    +      provider.getTable(dsOptions) match {
    +        case table: SupportsBatchWrite =>
    +          val relation = DataSourceV2Relation.create(table, dsOptions)
    +          // TODO: revisit it. We should not create the `AppendData` operator for `SaveMode.Append`.
    +          // We should create new end-users APIs for the `AppendData` operator.
    --- End diff --
    
    according to the discussion in https://github.com/apache/spark/pull/22688#issuecomment-428626027 , the behavior of append operator and `SaveMode.Append` can be different. We should revisit it when we have the new end-user write APIs.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23208: [SPARK-25530][SQL] data source v2 API refactor (batch wr...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23208
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5676/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23208#discussion_r240028515
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
    @@ -25,7 +25,10 @@
      * The base interface for v2 data sources which don't have a real catalog. Implementations must
      * have a public, 0-arg constructor.
      * <p>
    - * The major responsibility of this interface is to return a {@link Table} for read/write.
    + * The major responsibility of this interface is to return a {@link Table} for read/write. If you
    + * want to allow end-users to write data to non-existing tables via write APIs in `DataFrameWriter`
    + * with `SaveMode`, you must return a {@link Table} instance even if the table doesn't exist. The
    + * table schema can be empty in this case.
    --- End diff --
    
    I'm not convinced it's safe to remove `SaveMode` right away, when there is only an `Append` operator implemented currently.
    
    If we do it, it means `DataFrameWriter.save` need to throw an exception for a lot of cases, except when the `mode` is append. I don't think this is acceptable right now.
    
    Can we discuss the removal of `SaveMode` at least after all the necessary new write operators are implemented?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23208: [SPARK-25530][SQL] data source v2 API refactor (batch wr...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/23208
  
    @cloud-fan, what are you suggesting to use as a design? If you think this shouldn't mirror the read side, then let's be clear on what it should look like. Maybe that's a design doc, or maybe that's a discussion thread on the mailing list.
    
    Whatever option we go for, we still need to have a plan for exposing the replace-by-filter and replace-dynamic-partitions methods, whatever they end up being. We also need the life-cycle to match.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23208#discussion_r239613088
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -17,52 +17,49 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import java.util.UUID
    -
    -import scala.collection.JavaConverters._
    +import java.util.{Optional, UUID}
     
     import org.apache.spark.sql.{AnalysisException, SaveMode}
     import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
     import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
     import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
     import org.apache.spark.sql.catalyst.util.truncatedString
    -import org.apache.spark.sql.sources.DataSourceRegister
     import org.apache.spark.sql.sources.v2._
     import org.apache.spark.sql.sources.v2.reader._
     import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport
     import org.apache.spark.sql.types.StructType
     
     /**
    - * A logical plan representing a data source v2 scan.
    + * A logical plan representing a data source v2 table.
      *
    - * @param source An instance of a [[DataSourceV2]] implementation.
    - * @param options The options for this scan. Used to create fresh [[BatchWriteSupport]].
    - * @param userSpecifiedSchema The user-specified schema for this scan.
    + * @param table The table that this relation represents.
    + * @param options The options for this table operation. It's used to create fresh [[ScanBuilder]]
    + *                and [[BatchWriteSupport]].
      */
     case class DataSourceV2Relation(
    -    // TODO: remove `source` when we finish API refactor for write.
    -    source: TableProvider,
    -    table: SupportsBatchRead,
    +    table: Table,
         output: Seq[AttributeReference],
    -    options: Map[String, String],
    -    userSpecifiedSchema: Option[StructType] = None)
    +    // TODO: use a simple case insensitive map instead.
    +    options: DataSourceOptions)
    --- End diff --
    
    Why change this now, when DataSourceOptions will be replaced? I would say just leave it as a map and update it once later.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23208: [SPARK-25530][SQL] data source v2 API refactor (batch wr...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/23208
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23208#discussion_r239581374
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
    @@ -25,7 +25,10 @@
      * The base interface for v2 data sources which don't have a real catalog. Implementations must
      * have a public, 0-arg constructor.
      * <p>
    - * The major responsibility of this interface is to return a {@link Table} for read/write.
    + * The major responsibility of this interface is to return a {@link Table} for read/write. If you
    + * want to allow end-users to write data to non-existing tables via write APIs in `DataFrameWriter`
    + * with `SaveMode`, you must return a {@link Table} instance even if the table doesn't exist. The
    + * table schema can be empty in this case.
    --- End diff --
    
    Maybe it should also be part of the `TableProvider` contract that if the table can't be located, it throws an exception?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23208#discussion_r238313454
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -17,52 +17,49 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import java.util.UUID
    -
    -import scala.collection.JavaConverters._
    +import java.util.{Optional, UUID}
     
     import org.apache.spark.sql.{AnalysisException, SaveMode}
     import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
     import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
     import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
     import org.apache.spark.sql.catalyst.util.truncatedString
    -import org.apache.spark.sql.sources.DataSourceRegister
     import org.apache.spark.sql.sources.v2._
     import org.apache.spark.sql.sources.v2.reader._
     import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport
     import org.apache.spark.sql.types.StructType
     
     /**
    - * A logical plan representing a data source v2 scan.
    + * A logical plan representing a data source v2 table.
      *
    - * @param source An instance of a [[DataSourceV2]] implementation.
    - * @param options The options for this scan. Used to create fresh [[BatchWriteSupport]].
    - * @param userSpecifiedSchema The user-specified schema for this scan.
    + * @param table The table that this relation represents.
    + * @param options The options for this table operation. It's used to create fresh [[ScanBuilder]]
    + *                and [[BatchWriteSupport]].
      */
     case class DataSourceV2Relation(
    -    // TODO: remove `source` when we finish API refactor for write.
    -    source: TableProvider,
    -    table: SupportsBatchRead,
    +    table: Table,
         output: Seq[AttributeReference],
    -    options: Map[String, String],
    -    userSpecifiedSchema: Option[StructType] = None)
    +    // TODO: use a simple case insensitive map instead.
    --- End diff --
    
    I'll do it in my next PR.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23208: [SPARK-25530][SQL] data source v2 API refactor (batch wr...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/23208
  
    **[Test build #99620 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99620/testReport)** for PR 23208 at commit [`00fc34f`](https://github.com/apache/spark/commit/00fc34fa793b922a48a4bf8e9f9cd0e3b688800b).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23208#discussion_r239598346
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -241,32 +241,28 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     
         assertNotBucketed("save")
     
    -    val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
    -    if (classOf[DataSourceV2].isAssignableFrom(cls)) {
    -      val source = cls.getConstructor().newInstance().asInstanceOf[DataSourceV2]
    -      source match {
    -        case provider: BatchWriteSupportProvider =>
    -          val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
    -            source,
    -            df.sparkSession.sessionState.conf)
    -          val options = sessionOptions ++ extraOptions
    -
    +    val session = df.sparkSession
    +    val cls = DataSource.lookupDataSource(source, session.sessionState.conf)
    +    if (classOf[TableProvider].isAssignableFrom(cls)) {
    +      val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
    +      val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
    +        provider, session.sessionState.conf)
    +      val options = sessionOptions ++ extraOptions
    +      val dsOptions = new DataSourceOptions(options.asJava)
    +      provider.getTable(dsOptions) match {
    +        case table: SupportsBatchWrite =>
    +          val relation = DataSourceV2Relation.create(table, dsOptions)
    +          // TODO: revisit it. We should not create the `AppendData` operator for `SaveMode.Append`.
    +          // We should create new end-users APIs for the `AppendData` operator.
    --- End diff --
    
    Here is what my branch uses for this logic:
    
    ```scala
            val maybeTable = provider.getTable(identifier)
            val exists = maybeTable.isDefined
            (exists, mode) match {
              case (true, SaveMode.ErrorIfExists) =>
                throw new AnalysisException(s"Table already exists: ${identifier.quotedString}")
    
              case (true, SaveMode.Overwrite) =>
                val relation = DataSourceV2Relation.create(
                  catalog.name, identifier, maybeTable.get, options)
    
                runCommand(df.sparkSession, "insertInto") {
                  OverwritePartitionsDynamic.byName(relation, df.logicalPlan)
                }
    
              case (true, SaveMode.Append) =>
                val relation = DataSourceV2Relation.create(
                  catalog.name, identifier, maybeTable.get, options)
    
                runCommand(df.sparkSession, "save") {
                  AppendData.byName(relation, df.logicalPlan)
                }
    
              case (false, SaveMode.Append) |
                   (false, SaveMode.ErrorIfExists) |
                   (false, SaveMode.Ignore) |
                   (false, SaveMode.Overwrite) =>
    
                runCommand(df.sparkSession, "save") {
                  CreateTableAsSelect(catalog, identifier, Seq.empty, df.logicalPlan, options,
                    ignoreIfExists = mode == SaveMode.Ignore)
                }
    
              case _ =>
              // table exists and mode is ignore
            }
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

Posted by gengliangwang <gi...@git.apache.org>.
Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23208#discussion_r239454594
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java ---
    @@ -25,14 +25,14 @@
     import org.apache.spark.sql.types.StructType;
     
     /**
    - * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
    + * A mix-in interface for {@link Table}. Data sources can implement this interface to
      * provide data writing ability for batch processing.
      *
      * This interface is used to create {@link BatchWriteSupport} instances when end users run
      * {@code Dataset.write.format(...).option(...).save()}.
      */
     @Evolving
    -public interface BatchWriteSupportProvider extends DataSourceV2 {
    +public interface SupportsBatchWrite extends Table {
    --- End diff --
    
    It is quite confusing to have `BatchWriteSupport` and  `SupportsBatchWrite` to me.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23208#discussion_r239563967
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java ---
    @@ -25,7 +25,10 @@
      * The base interface for v2 data sources which don't have a real catalog. Implementations must
      * have a public, 0-arg constructor.
      * <p>
    - * The major responsibility of this interface is to return a {@link Table} for read/write.
    + * The major responsibility of this interface is to return a {@link Table} for read/write. If you
    + * want to allow end-users to write data to non-existing tables via write APIs in `DataFrameWriter`
    + * with `SaveMode`, you must return a {@link Table} instance even if the table doesn't exist. The
    + * table schema can be empty in this case.
    --- End diff --
    
    "Exist" is a relative concept, I suppose. I think we need to somehow allow for create-on-write functionality, even if many table providers won't want to support it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23208#discussion_r240028574
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -17,52 +17,49 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import java.util.UUID
    -
    -import scala.collection.JavaConverters._
    +import java.util.{Optional, UUID}
     
     import org.apache.spark.sql.{AnalysisException, SaveMode}
     import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
     import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
     import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
     import org.apache.spark.sql.catalyst.util.truncatedString
    -import org.apache.spark.sql.sources.DataSourceRegister
     import org.apache.spark.sql.sources.v2._
     import org.apache.spark.sql.sources.v2.reader._
     import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport
     import org.apache.spark.sql.types.StructType
     
     /**
    - * A logical plan representing a data source v2 scan.
    + * A logical plan representing a data source v2 table.
      *
    - * @param source An instance of a [[DataSourceV2]] implementation.
    - * @param options The options for this scan. Used to create fresh [[BatchWriteSupport]].
    - * @param userSpecifiedSchema The user-specified schema for this scan.
    + * @param table The table that this relation represents.
    + * @param options The options for this table operation. It's used to create fresh [[ScanBuilder]]
    + *                and [[BatchWriteSupport]].
      */
     case class DataSourceV2Relation(
    -    // TODO: remove `source` when we finish API refactor for write.
    -    source: TableProvider,
    -    table: SupportsBatchRead,
    +    table: Table,
         output: Seq[AttributeReference],
    -    options: Map[String, String],
    -    userSpecifiedSchema: Option[StructType] = None)
    +    // TODO: use a simple case insensitive map instead.
    +    options: DataSourceOptions)
    --- End diff --
    
    It was done it multiple places before:
    https://github.com/apache/spark/pull/23208/files#diff-35ba4ffb5ccb9b18b43226f1d5effa23L62
    https://github.com/apache/spark/pull/23208/files#diff-35ba4ffb5ccb9b18b43226f1d5effa23L153
    https://github.com/apache/spark/pull/23208/files#diff-35ba4ffb5ccb9b18b43226f1d5effa23L171
    
    If you prefer it strongly, I can follow it and update the code.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #23208: [SPARK-25530][SQL] data source v2 API refactor (batch wr...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/23208
  
    @cloud-fan, I see that this adds `Table` and uses `TableProvider`, but I was expecting this to also update the write side to mirror the read side, like PR #22190 for [SPARK-25188](https://issues.apache.org/jira/browse/SPARK-25188) (originally proposed in [discussion on SPARK-24882](https://issues.apache.org/jira/browse/SPARK-24882?focusedCommentId=16581725&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16581725)).
    
    The main parts that we discussed there were:
    * Mirror the read side structure by adding WriteConfig. Now, that would be adding a WriteBuilder.
    * Mirroring the read life-cycle of ScanBuilder and Scan, to enable use cases like acquiring and holding a write lock, for example.
    * Using the WriteBuilder to expose more write configuration to support overwrite and dynamic partition overwrite.
    
    We don't need to add the overwrite mix-ins here, but I would expect to see a WriteBuilder that returns a Writer. (`Table -> WriteBuilder -> Write` matches `Table -> ScanBulder -> Scan`.)
    
    The Write would expose BatchWrite and StreamWrite (if they are different) or could directly expose the WriteFactory, commit, abort, etc.
    
    WriteBuilder would be extensible so that SupportsOverwrite and SupportsDynamicOverwrite can be added as mix-ins at some point.



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (b...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23208#discussion_r239683592
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -17,52 +17,49 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import java.util.UUID
    -
    -import scala.collection.JavaConverters._
    +import java.util.{Optional, UUID}
     
     import org.apache.spark.sql.{AnalysisException, SaveMode}
     import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
     import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
     import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
     import org.apache.spark.sql.catalyst.util.truncatedString
    -import org.apache.spark.sql.sources.DataSourceRegister
     import org.apache.spark.sql.sources.v2._
     import org.apache.spark.sql.sources.v2.reader._
     import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport
     import org.apache.spark.sql.types.StructType
     
     /**
    - * A logical plan representing a data source v2 scan.
    + * A logical plan representing a data source v2 table.
      *
    - * @param source An instance of a [[DataSourceV2]] implementation.
    - * @param options The options for this scan. Used to create fresh [[BatchWriteSupport]].
    - * @param userSpecifiedSchema The user-specified schema for this scan.
    + * @param table The table that this relation represents.
    + * @param options The options for this table operation. It's used to create fresh [[ScanBuilder]]
    + *                and [[BatchWriteSupport]].
      */
     case class DataSourceV2Relation(
    -    // TODO: remove `source` when we finish API refactor for write.
    -    source: TableProvider,
    -    table: SupportsBatchRead,
    +    table: Table,
         output: Seq[AttributeReference],
    -    options: Map[String, String],
    -    userSpecifiedSchema: Option[StructType] = None)
    +    // TODO: use a simple case insensitive map instead.
    +    options: DataSourceOptions)
    --- End diff --
    
    Because this makes the code cleaner, otherwise I need to write more code to convert a map to `DataSourceOptions` multiple times inside `DataSourceV2Relation`.
    
    I don't have a strong preference here, and just pick the easiest approach for me. If you do think using a map here is clearer, I can add these extra code.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org