Posted to reviews@spark.apache.org by rdblue <gi...@git.apache.org> on 2018/05/11 21:05:04 UTC

[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

GitHub user rdblue opened a pull request:

    https://github.com/apache/spark/pull/21305

    [SPARK-24251][SQL] Add AppendData logical plan.

    ## What changes were proposed in this pull request?
    
    This adds a new logical plan, AppendData, that was proposed in SPARK-23521: Standardize SQL logical plans.
    
    * DataFrameWriter uses the new AppendData plan for DataSourceV2 appends
    * AppendData is resolved if its output columns match the incoming data frame
    * A new analyzer rule, ResolveOutputColumns, validates data before it is appended. This rule will add safe casts, rename columns, and check nullability (a usage sketch follows this list)
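    
    As a minimal sketch, this is the user-facing call that now produces an AppendData plan (a `spark` session is assumed, and `com.example.v2` stands in for any DataSourceV2 that implements WriteSupport):
    
    ```scala
    // hypothetical v2 source; with SaveMode.Append, save() now plans
    // AppendData.byName(relation, df.logicalPlan) instead of WriteToDataSourceV2
    spark.range(3).toDF("id")
      .write
      .format("com.example.v2")
      .mode("append")
      .save()
    ```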
    
    ## How was this patch tested?
    
    Existing tests for v2 appends. Will add AppendData tests to validate logical plan analysis.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rdblue/spark SPARK-24251-add-append-data

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21305.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21305
    
----
commit a3bf5300e4a1c9c05a50273f530eaade00d57659
Author: Ryan Blue <bl...@...>
Date:   2018-05-07T15:54:37Z

    SPARK-24251: Add AppendData logical plan.
    
    This adds a new logical plan, AppendData, that was proposed in
    SPARK-23521. This also adds an analyzer rule to validate data written
    with AppendData against the target table. DataFrameWriter is also
    updated so that v2 writes use the new AppendData logical plan.

----


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200824906
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java ---
    @@ -38,15 +38,16 @@
        * If this method fails (by throwing an exception), the action will fail and no Spark job will be
        * submitted.
        *
    -   * @param jobId A unique string for the writing job. It's possible that there are many writing
    -   *              jobs running at the same time, and the returned {@link DataSourceWriter} can
    -   *              use this job id to distinguish itself from other jobs.
    +   * @param writeUUID A unique string for the writing job. It's possible that there are many writing
    --- End diff --
    
    This is not the ID of the Spark job that is writing. I think the UUID name is clearer about what is actually passed: a unique string that identifies the write. There's also no need to make the string more complicated than a UUID, since there are no guarantees about it.
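    
    For example, a minimal sketch (nothing beyond `java.util.UUID` is assumed):
    
    ```scala
    import java.util.UUID
    
    // the only guarantee callers get is uniqueness, so a bare UUID is enough
    val writeUUID: String = UUID.randomUUID().toString
    ```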


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1289/
    Test PASSed.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r207043203
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2217,6 +2218,100 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +      case append @ AppendData(table, query, isByName)
    +          if table.resolved && query.resolved && !append.resolved =>
    +        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
    +
    +        if (projection != query) {
    +          append.copy(query = projection)
    +        } else {
    +          append
    +        }
    +    }
    +
    +    def resolveOutputColumns(
    +        tableName: String,
    +        expected: Seq[Attribute],
    +        query: LogicalPlan,
    +        byName: Boolean): LogicalPlan = {
    +
    +      if (expected.size < query.output.size) {
    +        throw new AnalysisException(
    +          s"""Cannot write to '$tableName', too many data columns:
    +             |Table columns: ${expected.map(_.name).mkString(", ")}
    +             |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +      }
    +
    +      val errors = new mutable.ArrayBuffer[String]()
    +      val resolved: Seq[NamedExpression] = if (byName) {
    +        expected.flatMap { outAttr =>
    +          query.resolveQuoted(outAttr.name, resolver) match {
    +            case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
    --- End diff --
    
    I fixed this by always failing if `canWrite` returns false and always adding the `UpCast`.
    
    Now, for atomic types, `canWrite` returns true if the write type can be safely cast to the read type, as determined by `Cast.canSafeCast`. Since it only returns a boolean, we always insert the cast, and the optimizer removes it if it isn't needed.
    
    I also added better error messages. When an error is found, the check will add a clear error message by calling `addError: String => Unit`.
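    
    As a rough sketch of what the insertion looks like inside `resolveOutputColumns` (assuming Catalyst's `Alias`, `UpCast`, and `Project` nodes; the exact code in the PR may differ):
    
    ```scala
    // by-position variant: wrap each input column in an UpCast to the table's
    // type and rename it; the optimizer later removes casts that are no-ops
    val projection = expected.zip(query.output).map { case (outAttr, inAttr) =>
      Alias(UpCast(inAttr, outAttr.dataType, Seq.empty), outAttr.name)()
    }
    Project(projection, query)
    ```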


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #94060 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94060/testReport)** for PR 21305 at commit [`897dc39`](https://github.com/apache/spark/commit/897dc3925f3155dec0ca997dc84f1a9d83e9045d).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r207623351
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
    @@ -352,6 +351,36 @@ case class Join(
       }
     }
     
    +/**
    + * Append data to an existing table.
    + */
    +case class AppendData(
    +    table: NamedRelation,
    +    query: LogicalPlan,
    +    isByName: Boolean) extends LogicalPlan {
    +  override def children: Seq[LogicalPlan] = Seq(query)
    +  override def output: Seq[Attribute] = Seq.empty
    +
    +  override lazy val resolved: Boolean = {
    +    query.output.size == table.output.size && query.output.zip(table.output).forall {
    +      case (inAttr, outAttr) =>
    +          inAttr.name == outAttr.name &&                // names must match
    +          outAttr.dataType.sameType(inAttr.dataType) && // types must match
    --- End diff --
    
    Good catch.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1700/
    Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/755/
    Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94364/
    Test FAILed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #92723 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92723/testReport)** for PR 21305 at commit [`222d097`](https://github.com/apache/spark/commit/222d097c38e5323505fa0382a874a80201d85185).


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200428354
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -172,6 +173,7 @@ class Analyzer(
           ResolveWindowOrder ::
           ResolveWindowFrame ::
           ResolveNaturalAndUsingJoin ::
    +      ResolveOutputRelation ::
    --- End diff --
    
    This rule may add `Projection`, `UpCast`, and `Alias` nodes to the plan, so some rules in this batch need to run after the output is resolved. In particular, `ResolveUpCast` rewrites the casts that were inserted, throwing an exception if a cast would truncate, so it must run after this rule.
    
    I could also create a batch just after resolution for output resolution, running only this rule and `ResolveUpCast`. I think the optimizer will handle collapsing `Projection` nodes, and since aliases are only resolved in this batch, adding already-resolved aliases shouldn't be a problem. Would you like a separate batch for output resolution?
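    
    For reference, such a batch would look roughly like this (a sketch only; per the follow-up below, the rule ends up staying in the main Resolution batch):
    
    ```scala
    // hypothetical dedicated batch, run after the main Resolution batch
    Batch("Output Resolution", fixedPoint,
      ResolveOutputRelation,
      ResolveUpCast)
    ```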


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    retest this please


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    (The last test failure is a known flaky test that I've been trying, so far unsuccessfully, to fix.)


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #94384 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94384/testReport)** for PR 21305 at commit [`42d86e1`](https://github.com/apache/spark/commit/42d86e1553f345c9879b40b1c20a2addbaf69781).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `              s\"its class is $`
      * `case class Uuid(randomSeed: Option[Long] = None) extends LeafExpression with Stateful`
      * `case class InSubquery(values: Seq[Expression], query: ListQuery)`
      * `trait ExpressionWithRandomSeed `
      * `case class Rand(child: Expression) extends RDG with ExpressionWithRandomSeed `
      * `case class Randn(child: Expression) extends RDG with ExpressionWithRandomSeed `
      * `case class AliasIdentifier(identifier: String, database: Option[String])`


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r206982199
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2217,6 +2218,100 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +      case append @ AppendData(table, query, isByName)
    +          if table.resolved && query.resolved && !append.resolved =>
    +        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
    +
    +        if (projection != query) {
    +          append.copy(query = projection)
    +        } else {
    +          append
    +        }
    +    }
    +
    +    def resolveOutputColumns(
    +        tableName: String,
    +        expected: Seq[Attribute],
    +        query: LogicalPlan,
    +        byName: Boolean): LogicalPlan = {
    +
    +      if (expected.size < query.output.size) {
    +        throw new AnalysisException(
    +          s"""Cannot write to '$tableName', too many data columns:
    +             |Table columns: ${expected.map(_.name).mkString(", ")}
    +             |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +      }
    +
    +      val errors = new mutable.ArrayBuffer[String]()
    +      val resolved: Seq[NamedExpression] = if (byName) {
    +        expected.flatMap { outAttr =>
    +          query.resolveQuoted(outAttr.name, resolver) match {
    +            case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
    --- End diff --
    
    `DataType.canWrite` only returns a boolean, so how can we detect a nullability problem there? I mean, if it returns false, then we just add `UpCast`, which still doesn't handle the nullability issue.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/796/
    Test PASSed.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r201391861
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -240,21 +238,27 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     
         val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
         if (classOf[DataSourceV2].isAssignableFrom(cls)) {
    -      val ds = cls.newInstance()
    -      ds match {
    +      val source = cls.newInstance().asInstanceOf[DataSourceV2]
    +      source match {
             case ws: WriteSupport =>
    -          val options = new DataSourceOptions((extraOptions ++
    -            DataSourceV2Utils.extractSessionConfigs(
    -              ds = ds.asInstanceOf[DataSourceV2],
    -              conf = df.sparkSession.sessionState.conf)).asJava)
    -          // Using a timestamp and a random UUID to distinguish different writing jobs. This is good
    -          // enough as there won't be tons of writing jobs created at the same second.
    -          val jobId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
    -            .format(new Date()) + "-" + UUID.randomUUID()
    -          val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, options)
    -          if (writer.isPresent) {
    +          val options = extraOptions ++
    +              DataSourceV2Utils.extractSessionConfigs(source, df.sparkSession.sessionState.conf)
    +
    +          val relation = DataSourceV2Relation.create(source, options.toMap)
    +          if (mode == SaveMode.Append) {
                 runCommand(df.sparkSession, "save") {
    -              WriteToDataSourceV2(writer.get(), df.logicalPlan)
    +              AppendData.byName(relation, df.logicalPlan)
    +            }
    +
    +          } else {
    +            val writer = ws.createWriter(
    +              UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode,
    --- End diff --
    
    It's very unlikely to conflict, but a UUID plus a timestamp is even less likely to, isn't it? I feel it's safer to keep the original logic; for example, users may look at the temporary directory name to see when the write job was started.
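    
    Concretely, the original format being argued for is the code removed in the diff above:
    
    ```scala
    import java.text.SimpleDateFormat
    import java.util.{Date, Locale, UUID}
    
    // sortable timestamp prefix plus a random UUID, e.g. 20180507155437-<uuid>
    val jobId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
      .format(new Date()) + "-" + UUID.randomUUID()
    ```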


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #93612 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93612/testReport)** for PR 21305 at commit [`91f2f14`](https://github.com/apache/spark/commit/91f2f14212822b816b31647fbc3724a39717deea).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r206745251
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2217,6 +2218,100 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    --- End diff --
    
    now we need to call `resolveOperators` instead of `transform`
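    
    In other words, the rule's entry point becomes (a sketch based on the diff above):
    
    ```scala
    // resolveOperators skips subtrees that are already analyzed, unlike transform
    override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
      case append @ AppendData(table, query, isByName)
          if table.resolved && query.resolved && !append.resolved =>
        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
        if (projection != query) append.copy(query = projection) else append
    }
    ```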


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r190696654
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -240,21 +238,27 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     
         val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
         if (classOf[DataSourceV2].isAssignableFrom(cls)) {
    -      val ds = cls.newInstance()
    -      ds match {
    +      val source = cls.newInstance().asInstanceOf[DataSourceV2]
    +      source match {
             case ws: WriteSupport =>
    -          val options = new DataSourceOptions((extraOptions ++
    -            DataSourceV2Utils.extractSessionConfigs(
    -              ds = ds.asInstanceOf[DataSourceV2],
    -              conf = df.sparkSession.sessionState.conf)).asJava)
    -          // Using a timestamp and a random UUID to distinguish different writing jobs. This is good
    -          // enough as there won't be tons of writing jobs created at the same second.
    -          val jobId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
    -            .format(new Date()) + "-" + UUID.randomUUID()
    -          val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, options)
    -          if (writer.isPresent) {
    +          val options = extraOptions ++
    +              DataSourceV2Utils.extractSessionConfigs(source, df.sparkSession.sessionState.conf)
    +
    +          val relation = DataSourceV2Relation.create(source, options.toMap)
    +          if (mode == SaveMode.Append) {
                 runCommand(df.sparkSession, "save") {
    -              WriteToDataSourceV2(writer.get(), df.logicalPlan)
    +              AppendData.byName(relation, df.logicalPlan)
    +            }
    +
    +          } else {
    +            val writer = ws.createWriter(
    +              UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode,
    --- End diff --
    
    nit: probably put the timestamp back to minimize the change


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1585/
    Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #93208 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93208/testReport)** for PR 21305 at commit [`f041019`](https://github.com/apache/spark/commit/f041019b1ffe8187e47ba89de96daf631dfd56da).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r207623469
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
    @@ -352,6 +351,36 @@ case class Join(
       }
     }
     
    +/**
    + * Append data to an existing table.
    + */
    +case class AppendData(
    +    table: NamedRelation,
    +    query: LogicalPlan,
    +    isByName: Boolean) extends LogicalPlan {
    +  override def children: Seq[LogicalPlan] = Seq(query)
    +  override def output: Seq[Attribute] = Seq.empty
    +
    +  override lazy val resolved: Boolean = {
    +    query.output.size == table.output.size && query.output.zip(table.output).forall {
    +      case (inAttr, outAttr) =>
    +          inAttr.name == outAttr.name &&                // names must match
    +          outAttr.dataType.sameType(inAttr.dataType) && // types must match
    --- End diff --
    
    Good catch, I agree.
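    
    The fix implied by this exchange is roughly the following (a sketch; the merged check may be written differently):
    
    ```scala
    // also require nullability compatibility: writing nullable data into a
    // non-nullable column must leave the plan unresolved
    inAttr.name == outAttr.name &&
      outAttr.dataType.sameType(inAttr.dataType) &&
      (outAttr.nullable || !inAttr.nullable)
    ```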


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #92627 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92627/testReport)** for PR 21305 at commit [`b906ab1`](https://github.com/apache/spark/commit/b906ab1698d653b35ca7ac1316c08e39d9f59008).


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by bersprockets <gi...@git.apache.org>.
Github user bersprockets commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r188960491
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
    @@ -344,6 +344,36 @@ case class Join(
       }
     }
     
    +/**
    + * Append data to an existing DataSourceV2 table.
    + */
    +case class AppendData(
    --- End diff --
    
    How does this logical plan node map to the 8 operations outlined in your SPIP?


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Do we have a plan to update the data source v2 write API to support `ReplaceData`, `DeleteData`? It seems a simple `WriteMode` flag is not enough for the write API.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r207752632
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala ---
    @@ -336,4 +337,97 @@ object DataType {
           case (fromDataType, toDataType) => fromDataType == toDataType
         }
       }
    +
    +  /**
    +   * Returns true if the write data type can be read using the read data type.
    +   *
    +   * The write type is compatible with the read type if:
    +   * - Both types are arrays, the array element types are compatible, and element nullability is
    +   *   compatible (read allows nulls or write does not contain nulls).
    +   * - Both types are maps and the map key and value types are compatible, and value nullability
    +   *   is compatible  (read allows nulls or write does not contain nulls).
    +   * - Both types are structs and each field in the read struct is present in the write struct and
    +   *   compatible (including nullability), or is nullable if the write struct does not contain the
    +   *   field. Write-side structs are not compatible if they contain fields that are not present in
    +   *   the read-side struct.
    +   * - Both types are atomic and the write type can be safely cast to the read type.
    +   *
    +   * Extra fields in write-side structs are not allowed to avoid accidentally writing data that
    +   * the read schema will not read, and to ensure map key equality is not changed when data is read.
    +   *
    +   * @param write a write-side data type to validate against the read type
    +   * @param read a read-side data type
    +   * @return true if data written with the write type can be read using the read type
    +   */
    +  def canWrite(
    +      write: DataType,
    +      read: DataType,
    +      resolver: Resolver,
    +      context: String,
    +      addError: String => Unit = (_: String) => {}): Boolean = {
    +    (write, read) match {
    +      case (wArr: ArrayType, rArr: ArrayType) =>
    +        if (wArr.containsNull && !rArr.containsNull) {
    +          addError(s"Cannot write nullable elements to array of non-nulls: '$context'")
    +          false
    +        } else {
    +          canWrite(wArr.elementType, rArr.elementType, resolver, context + ".element", addError)
    +        }
    +
    +      case (wMap: MapType, rMap: MapType) =>
    +        // map keys cannot include data fields not in the read schema without changing equality when
    +        // read. map keys can be missing fields as long as they are nullable in the read schema.
    +        if (wMap.valueContainsNull && !rMap.valueContainsNull) {
    +          addError(s"Cannot write nullable values to map of non-nulls: '$context'")
    +          false
    +        } else {
    +          canWrite(wMap.keyType, rMap.keyType, resolver, context + ".key", addError) &&
    +              canWrite(wMap.valueType, rMap.valueType, resolver, context + ".value", addError)
    +        }
    +
    +      case (StructType(writeFields), StructType(readFields)) =>
    +        lazy val extraFields = writeFields.map(_.name).toSet -- readFields.map(_.name)
    +
    +        var result = readFields.forall { readField =>
    +          val fieldContext = context + "." + readField.name
    +          writeFields.find(writeField => resolver(writeField.name, readField.name)) match {
    --- End diff --
    
    I've implemented this as described and added `DataTypeWriteCompatibilitySuite` to validate the behavior.
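    
    A case in the spirit of that suite might read as follows (a hypothetical example built on the `canWrite` signature in the diff above; the suite's actual cases may differ):
    
    ```scala
    import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
    import org.apache.spark.sql.types._
    
    // ints can be safely widened to longs on write, but not the reverse
    assert(DataType.canWrite(IntegerType, LongType, caseSensitiveResolution, "col"))
    assert(!DataType.canWrite(LongType, IntegerType, caseSensitiveResolution, "col"))
    ```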


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200824532
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2120,6 +2122,99 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +      case append @ AppendData(table: NamedRelation, query, isByName)
    --- End diff --
    
    Yes, I agree.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200639722
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -172,6 +173,7 @@ class Analyzer(
           ResolveWindowOrder ::
           ResolveWindowFrame ::
           ResolveNaturalAndUsingJoin ::
    +      ResolveOutputRelation ::
    --- End diff --
    
    ah ok then let's just keep it here


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    @cloud-fan, can you also review this PR for DataSourceV2?
    
    This adds the first of the logical plans proposed in [SPIP: Standardize Logical Plans](https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d): `AppendData`. It replaces the current logical node for batch writes and adds schema validation.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r206745598
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2217,6 +2218,100 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +      case append @ AppendData(table, query, isByName)
    +          if table.resolved && query.resolved && !append.resolved =>
    +        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
    +
    +        if (projection != query) {
    +          append.copy(query = projection)
    +        } else {
    +          append
    +        }
    +    }
    +
    +    def resolveOutputColumns(
    +        tableName: String,
    +        expected: Seq[Attribute],
    +        query: LogicalPlan,
    +        byName: Boolean): LogicalPlan = {
    +
    +      if (expected.size < query.output.size) {
    --- End diff --
    
    shall we use `!=`?


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1679/
    Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93519/
    Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #94074 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94074/testReport)** for PR 21305 at commit [`6aa7f69`](https://github.com/apache/spark/commit/6aa7f6931f102f2db16126665a6ccf1df124489f).


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    cool! so `WriteSupport` will be used for `AppendData`, CTAS and RTAS? Do we still need the `SaveMode` after we standardize the DDL/DML logical plans?


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #90530 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90530/testReport)** for PR 21305 at commit [`a3bf530`](https://github.com/apache/spark/commit/a3bf5300e4a1c9c05a50273f530eaade00d57659).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #92693 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92693/testReport)** for PR 21305 at commit [`487d864`](https://github.com/apache/spark/commit/487d864cb4241ea1521bce00832895e4dbbfe777).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r190711154
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
    @@ -344,6 +344,36 @@ case class Join(
       }
     }
     
    +/**
    + * Append data to an existing DataSourceV2 table.
    + */
    +case class AppendData(
    +    table: LogicalPlan,
    --- End diff --
    
    Yes. These plans should not be specific to `DataSourceV2Relation`. In the long term, we want any other relations using `AppendData` to automatically have the same validation rules applied. That means that `AppendData` should be generic and not care what the table implementation is.
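    
    The contract this relies on is deliberately small; a minimal sketch of the abstraction (the trait in the PR may carry more than this):
    
    ```scala
    // any relation that can report its own name can be the target of
    // AppendData, regardless of the underlying table implementation
    trait NamedRelation extends LogicalPlan {
      def name: String
    }
    ```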


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #94030 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94030/testReport)** for PR 21305 at commit [`897dc39`](https://github.com/apache/spark/commit/897dc3925f3155dec0ca997dc84f1a9d83e9045d).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200711421
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -40,17 +44,24 @@ case class DataSourceV2Relation(
         source: DataSourceV2,
         output: Seq[AttributeReference],
         options: Map[String, String],
    -    userSpecifiedSchema: Option[StructType])
    -  extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {
    +    tableIdent: Option[TableIdentifier] = None,
    +    userSpecifiedSchema: Option[StructType] = None)
    +  extends LeafNode with MultiInstanceRelation with NamedRelation with DataSourceV2StringFormat {
     
       import DataSourceV2Relation._
     
    +  override def name: String = {
    +    tableIdent.map(_.unquotedString).getOrElse("unknown")
    --- End diff --
    
    Fixed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #94060 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94060/testReport)** for PR 21305 at commit [`897dc39`](https://github.com/apache/spark/commit/897dc3925f3155dec0ca997dc84f1a9d83e9045d).


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200828393
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
    @@ -344,6 +344,36 @@ case class Join(
       }
     }
     
    +/**
    + * Append data to an existing DataSourceV2 table.
    + */
    +case class AppendData(
    +    table: LogicalPlan,
    --- End diff --
    
    Then it seems the code comment above can be updated?


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93909/
    Test FAILed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94060/
    Test FAILed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1827/
    Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #94074 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94074/testReport)** for PR 21305 at commit [`6aa7f69`](https://github.com/apache/spark/commit/6aa7f6931f102f2db16126665a6ccf1df124489f).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #93908 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93908/testReport)** for PR 21305 at commit [`ac7cb13`](https://github.com/apache/spark/commit/ac7cb13644a3f80c1627826513fa525154cf2d00).


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    LGTM, pending jenkins


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1905/
    Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #94352 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94352/testReport)** for PR 21305 at commit [`e81790d`](https://github.com/apache/spark/commit/e81790d072ed66f1126d5918bd1a39222a9f5cfa).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r208090280
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala ---
    @@ -0,0 +1,395 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.types
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.SparkFunSuite
    +import org.apache.spark.sql.catalyst.analysis
    +import org.apache.spark.sql.catalyst.expressions.Cast
    +
    +class DataTypeWriteCompatibilitySuite extends SparkFunSuite {
    --- End diff --
    
    I'm planning on adding this, and I'll add the tests next, but it would be great to get this in first so I no longer have to keep rebasing it! Thanks!


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #92780 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92780/testReport)** for PR 21305 at commit [`65c2670`](https://github.com/apache/spark/commit/65c26703cd381034bc2cf4b465b4dccc7012fdce).


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r206979097
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java ---
    @@ -38,15 +38,16 @@
        * If this method fails (by throwing an exception), the action will fail and no Spark job will be
        * submitted.
        *
    -   * @param jobId A unique string for the writing job. It's possible that there are many writing
    -   *              jobs running at the same time, and the returned {@link DataSourceWriter} can
    -   *              use this job id to distinguish itself from other jobs.
    +   * @param writeUUID A unique string for the writing job. It's possible that there are many writing
    +   *                  jobs running at the same time, and the returned {@link DataSourceWriter} can
    +   *                  use this job id to distinguish itself from other jobs.
        * @param schema the schema of the data to be written.
        * @param mode the save mode which determines what to do when the data are already in this data
        *             source, please refer to {@link SaveMode} for more details.
        * @param options the options for the returned data source writer, which is an immutable
        *                case-insensitive string-to-string map.
    +   * @return a writer to append data to this data source
    --- End diff --
    
    The data source API only handles writes/appends and reads. The high-level logical plans combine append writes with other operations.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/736/
    Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #93208 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93208/testReport)** for PR 21305 at commit [`f041019`](https://github.com/apache/spark/commit/f041019b1ffe8187e47ba89de96daf631dfd56da).


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93612/
    Test PASSed.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200802415
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2120,6 +2122,99 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +      case append @ AppendData(table: NamedRelation, query, isByName)
    +          if table.resolved && query.resolved && !append.resolved =>
    +        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
    +
    +        if (projection != query) {
    +          append.copy(query = projection)
    +        } else {
    +          append
    +        }
    +    }
    +
    +    def resolveOutputColumns(
    +        tableName: String,
    +        expected: Seq[Attribute],
    +        query: LogicalPlan,
    +        byName: Boolean): LogicalPlan = {
    +
    +      if (expected.size < query.output.size) {
    +        throw new AnalysisException(
    +          s"""Cannot write to '$tableName', too many data columns:
    +             |Table columns: ${expected.map(_.name).mkString(", ")}
    +             |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +      }
    +
    +      val errors = new mutable.ArrayBuffer[String]()
    +      val resolved: Seq[NamedExpression] = if (byName) {
    +        expected.flatMap { outAttr =>
    +          query.resolveQuoted(outAttr.name, resolver) match {
    +            case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
    +              errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
    --- End diff --
    
    Shall we add a runtime null check instead of failing up front at analysis time?
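
    For illustration, a sketch of that alternative (`inAttr` stands in for the nullable input column from the quoted rule; `AssertNotNull` is catalyst's existing runtime null guard):

    ```scala
    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.expressions.Alias
    import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull

    // Hypothetical nullable input column headed for a non-null table column.
    val inAttr = 'a.string
    // Instead of rejecting the plan at analysis, guard the value so a null
    // row fails the write job at runtime.
    val checked = Alias(AssertNotNull(inAttr, Seq.empty), "a")()
    ```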


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #93909 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93909/testReport)** for PR 21305 at commit [`922dc16`](https://github.com/apache/spark/commit/922dc164f6950b78223e2e421643bcce5b72a787).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r190695611
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
    @@ -344,6 +344,36 @@ case class Join(
       }
     }
     
    +/**
    + * Append data to an existing DataSourceV2 table.
    + */
    +case class AppendData(
    +    table: LogicalPlan,
    --- End diff --
    
    Do we still want to represent the table as a generic LogicalPlan rather than a separate table type? (Maybe the answer's yes; I'm not really clear on why this happens in the current code.)
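
    For reference, judging by the `trait NamedRelation extends LogicalPlan` reported later in the thread, the PR keeps the table as a plan but adds a thin naming trait. A sketch of that shape (the body is my guess, based on the rule's use of `table.name`):

    ```scala
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // The table stays a LogicalPlan, but gains a name so analysis errors
    // can be reported against a concrete table.
    trait NamedRelation extends LogicalPlan {
      def name: String
    }
    ```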


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200802431
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2120,6 +2122,99 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +      case append @ AppendData(table: NamedRelation, query, isByName)
    +          if table.resolved && query.resolved && !append.resolved =>
    +        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
    +
    +        if (projection != query) {
    +          append.copy(query = projection)
    +        } else {
    +          append
    +        }
    +    }
    +
    +    def resolveOutputColumns(
    +        tableName: String,
    +        expected: Seq[Attribute],
    +        query: LogicalPlan,
    +        byName: Boolean): LogicalPlan = {
    +
    +      if (expected.size < query.output.size) {
    +        throw new AnalysisException(
    +          s"""Cannot write to '$tableName', too many data columns:
    +             |Table columns: ${expected.map(_.name).mkString(", ")}
    +             |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +      }
    +
    +      val errors = new mutable.ArrayBuffer[String]()
    +      val resolved: Seq[NamedExpression] = if (byName) {
    +        expected.flatMap { outAttr =>
    +          query.resolveQuoted(outAttr.name, resolver) match {
    +            case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
    +              errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
    +              None
    +
    +            case Some(inAttr) if !outAttr.dataType.sameType(inAttr.dataType) =>
    +              Some(upcast(inAttr, outAttr))
    +
    +            case Some(inAttr) =>
    +              Some(inAttr) // matches nullability, datatype, and name
    +
    +            case _ =>
    +              errors += s"Cannot find data for output column '${outAttr.name}'"
    +              None
    +          }
    +        }
    +
    +      } else {
    +        if (expected.size > query.output.size) {
    --- End diff --
    
    This check is already done in https://github.com/apache/spark/pull/21305/files#diff-57b3d87be744b7d79a9beacf8e5e5eb2R2152


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #90535 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90535/testReport)** for PR 21305 at commit [`d2e4c41`](https://github.com/apache/spark/commit/d2e4c41c3dc139cc602270d2ed9bdbbb02fd50be).


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200423206
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2120,6 +2122,99 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +      case append @ AppendData(table: NamedRelation, query, isByName)
    +          if table.resolved && query.resolved && !append.resolved =>
    +        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
    +
    +        if (projection != query) {
    +          append.copy(query = projection)
    +        } else {
    +          append
    +        }
    +    }
    +
    +    def resolveOutputColumns(
    +        tableName: String,
    +        expected: Seq[Attribute],
    +        query: LogicalPlan,
    +        byName: Boolean): LogicalPlan = {
    +
    +      if (expected.size < query.output.size) {
    +        throw new AnalysisException(
    +          s"""Cannot write to '$tableName', too many data columns:
    +             |Table columns: ${expected.map(_.name).mkString(", ")}
    +             |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +      }
    +
    +      val errors = new mutable.ArrayBuffer[String]()
    +      val resolved: Seq[NamedExpression] = if (byName) {
    +        expected.flatMap { outAttr =>
    +          query.resolveQuoted(outAttr.name, resolver) match {
    +            case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
    +              errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
    +              None
    +
    +            case Some(inAttr) if !outAttr.dataType.sameType(inAttr.dataType) =>
    +              Some(upcast(inAttr, outAttr))
    +
    +            case Some(inAttr) =>
    +              Some(inAttr) // matches nullability, datatype, and name
    +
    +            case _ =>
    +              errors += s"Cannot find data for output column '${outAttr.name}'"
    +              None
    +          }
    +        }
    +
    +      } else {
    +        if (expected.size > query.output.size) {
    +          throw new AnalysisException(
    +            s"""Cannot write to '$tableName', not enough data columns:
    +               |Table columns: ${expected.map(_.name).mkString(", ")}
    +               |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +        }
    +
    +        query.output.zip(expected).flatMap {
    +          case (inAttr, outAttr) if inAttr.nullable && !outAttr.nullable =>
    +            errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
    +            None
    +
    +          case (inAttr, outAttr)
    +            if !inAttr.dataType.sameType(outAttr.dataType) || inAttr.name != outAttr.name =>
    +            Some(upcast(inAttr, outAttr))
    +
    +          case (inAttr, _) =>
    +            Some(inAttr) // matches nullability, datatype, and name
    +        }
    +      }
    +
    +      if (errors.nonEmpty) {
    +        throw new AnalysisException(
    +          s"Cannot write incompatible data to table '$tableName':\n- ${errors.mkString("\n- ")}")
    +      }
    +
    +      Project(resolved, query)
    +    }
    +
    +    private def upcast(inAttr: NamedExpression, outAttr: Attribute): NamedExpression = {
    +      Alias(
    +        UpCast(inAttr, outAttr.dataType, Seq()), outAttr.name
    --- End diff --
    
    The purpose of `UpCast` here is to prevent Spark from automatically inserting casts that could lose information, like `long` to `int` or `string` to `int`.
    
    I would support the same for `string` to `boolean` to catch destructive problems from accidental column alignment (in SQL) or similar errors. The main problem here is that Spark inserts casts instead of alerting the user that there's a problem. When the write succeeds, it may be a while before the user realizes the mistake, and by then the original data can't be recovered.
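
    A quick local demo of the failure mode (a sketch with hypothetical data, not code from the PR):

    ```scala
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._

    // 3000000000 does not fit in an int; a plain cast silently overflows.
    val longs = Seq(3000000000L).toDF("id")
    longs.selectExpr("CAST(id AS INT) AS id").show()  // prints -1294967296

    // UpCast rejects exactly this: with it, appending a long column to an
    // int table column fails analysis instead of corrupting the data.
    ```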


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #94257 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94257/testReport)** for PR 21305 at commit [`618a79d`](https://github.com/apache/spark/commit/618a79dfebf52710e3d86abbde35c65910b91a81).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200802424
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2120,6 +2122,99 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +      case append @ AppendData(table: NamedRelation, query, isByName)
    +          if table.resolved && query.resolved && !append.resolved =>
    +        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
    +
    +        if (projection != query) {
    +          append.copy(query = projection)
    +        } else {
    +          append
    +        }
    +    }
    +
    +    def resolveOutputColumns(
    +        tableName: String,
    +        expected: Seq[Attribute],
    +        query: LogicalPlan,
    +        byName: Boolean): LogicalPlan = {
    +
    +      if (expected.size < query.output.size) {
    +        throw new AnalysisException(
    +          s"""Cannot write to '$tableName', too many data columns:
    +             |Table columns: ${expected.map(_.name).mkString(", ")}
    +             |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +      }
    +
    +      val errors = new mutable.ArrayBuffer[String]()
    +      val resolved: Seq[NamedExpression] = if (byName) {
    +        expected.flatMap { outAttr =>
    +          query.resolveQuoted(outAttr.name, resolver) match {
    +            case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
    +              errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
    +              None
    +
    +            case Some(inAttr) if !outAttr.dataType.sameType(inAttr.dataType) =>
    --- End diff --
    
    `sameType` ignores nullability; shall we add a null check for nested fields too?


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92761/
    Test FAILed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    @cloud-fan, I'll look into the test failures tomorrow, but this has been passing tests for weeks, so I think it is still safe to review when you have time. We can work on the review and the test fixes in parallel so that we can get validated writes into 2.4.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r189728174
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
    @@ -344,6 +344,36 @@ case class Join(
       }
     }
     
    +/**
    + * Append data to an existing DataSourceV2 table.
    + */
    +case class AppendData(
    --- End diff --
    
    This is `InsertInto`. Wenchen wanted to call it `Append` because that is more specific.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r207576664
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2217,6 +2218,98 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    --- End diff --
    
    We need some tests to verify these behaviors


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #92693 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92693/testReport)** for PR 21305 at commit [`487d864`](https://github.com/apache/spark/commit/487d864cb4241ea1521bce00832895e4dbbfe777).


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200230131
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -172,6 +173,7 @@ class Analyzer(
           ResolveWindowOrder ::
           ResolveWindowFrame ::
           ResolveNaturalAndUsingJoin ::
    +      ResolveOutputRelation ::
    --- End diff --
    
    I feel this rule doesn't need to be in the giant resolution batch. Shall we put it in the `Post-Hoc Resolution` batch, which runs only once?


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #92723 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92723/testReport)** for PR 21305 at commit [`222d097`](https://github.com/apache/spark/commit/222d097c38e5323505fa0382a874a80201d85185).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `trait NamedRelation extends LogicalPlan `


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r206978261
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2217,6 +2218,100 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +      case append @ AppendData(table, query, isByName)
    +          if table.resolved && query.resolved && !append.resolved =>
    +        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
    +
    +        if (projection != query) {
    +          append.copy(query = projection)
    +        } else {
    +          append
    +        }
    +    }
    +
    +    def resolveOutputColumns(
    +        tableName: String,
    +        expected: Seq[Attribute],
    +        query: LogicalPlan,
    +        byName: Boolean): LogicalPlan = {
    +
    +      if (expected.size < query.output.size) {
    +        throw new AnalysisException(
    +          s"""Cannot write to '$tableName', too many data columns:
    +             |Table columns: ${expected.map(_.name).mkString(", ")}
    +             |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +      }
    +
    +      val errors = new mutable.ArrayBuffer[String]()
    +      val resolved: Seq[NamedExpression] = if (byName) {
    +        expected.flatMap { outAttr =>
    +          query.resolveQuoted(outAttr.name, resolver) match {
    +            case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
    +              errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
    +              None
    +
    +            case Some(inAttr) if !DataType.canWrite(outAttr.dataType, inAttr.dataType, resolver) =>
    +              Some(upcast(inAttr, outAttr))
    +
    +            case Some(inAttr) =>
    +              Some(inAttr) // matches nullability, datatype, and name
    +
    +            case _ =>
    +              errors += s"Cannot find data for output column '${outAttr.name}'"
    +              None
    +          }
    +        }
    +
    +      } else {
    +        if (expected.size > query.output.size) {
    +          throw new AnalysisException(
    +            s"""Cannot write to '$tableName', not enough data columns:
    +               |Table columns: ${expected.map(_.name).mkString(", ")}
    +               |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +        }
    +
    +        query.output.zip(expected).flatMap {
    +          case (inAttr, outAttr) if inAttr.nullable && !outAttr.nullable =>
    +            errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
    +            None
    +
    +          case (inAttr, outAttr)
    +            if !DataType.canWrite(inAttr.dataType, outAttr.dataType, resolver) ||
    --- End diff --
    
    The contract of `DataType.canWrite` is that the data written is compatible with the table's read schema. It should allow promotion from `int` to `long` and from `float` to `double`, and then the upcast here writes the correct type. I think this is a problem with `canWrite`, not with the upcast.
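
    In other words, the atomic cases should accept widening only. A sketch of the intended contract (`canPromote` is a made-up helper name, not the PR's code):

    ```scala
    import org.apache.spark.sql.types._

    // Widening-only promotions: safe to write, no information loss on read.
    def canPromote(write: DataType, read: DataType): Boolean = (write, read) match {
      case (w, r) if w == r        => true
      case (IntegerType, LongType) => true  // int -> long
      case (FloatType, DoubleType) => true  // float -> double
      case _                       => false
    }

    assert(canPromote(IntegerType, LongType))
    assert(!canPromote(LongType, IntegerType)) // narrowing would lose data
    ```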


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #94256 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94256/testReport)** for PR 21305 at commit [`ae0d242`](https://github.com/apache/spark/commit/ae0d2424315634760d46be0f21e0e98160bada5a).


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r206746478
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2217,6 +2218,100 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +      case append @ AppendData(table, query, isByName)
    +          if table.resolved && query.resolved && !append.resolved =>
    +        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
    +
    +        if (projection != query) {
    +          append.copy(query = projection)
    +        } else {
    +          append
    +        }
    +    }
    +
    +    def resolveOutputColumns(
    +        tableName: String,
    +        expected: Seq[Attribute],
    +        query: LogicalPlan,
    +        byName: Boolean): LogicalPlan = {
    +
    +      if (expected.size < query.output.size) {
    +        throw new AnalysisException(
    +          s"""Cannot write to '$tableName', too many data columns:
    +             |Table columns: ${expected.map(_.name).mkString(", ")}
    +             |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +      }
    +
    +      val errors = new mutable.ArrayBuffer[String]()
    +      val resolved: Seq[NamedExpression] = if (byName) {
    +        expected.flatMap { outAttr =>
    +          query.resolveQuoted(outAttr.name, resolver) match {
    +            case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
    --- End diff --
    
    Shall we check the nullability of nested fields too?
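
    The case in question, sketched with plain schemas:

    ```scala
    import org.apache.spark.sql.types._

    // Top-level nullability matches, but the nested field's does not:
    val writeSide = new StructType()
      .add("s", new StructType().add("x", IntegerType, nullable = true), nullable = false)
    val readSide = new StructType()
      .add("s", new StructType().add("x", IntegerType, nullable = false), nullable = false)

    // A check on the top-level attributes alone would accept this write
    // and let null values reach the non-null nested column s.x.
    ```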


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #93844 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93844/testReport)** for PR 21305 at commit [`cd5f309`](https://github.com/apache/spark/commit/cd5f309947bb0e711d71c742dff6984852100ddb).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `trait BarrierTaskContext extends TaskContext `
      * `class BarrierTaskInfo(val address: String)`
      * `class RDDBarrier[T: ClassTag](rdd: RDD[T]) `
      * `case class WorkerOffer(`
      * `case class Shuffle(child: Expression, randomSeed: Option[Long] = None)`
      * `case class ReplicateRows(children: Seq[Expression]) extends Generator with CodegenFallback `
      * `trait AnalysisHelper extends QueryPlan[LogicalPlan] `
      * `case class Intersect(`
      * `case class Except(`
      * `case class RandomIndicesGenerator(randomSeed: Long) `


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r206977289
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2217,6 +2218,100 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +      case append @ AppendData(table, query, isByName)
    +          if table.resolved && query.resolved && !append.resolved =>
    +        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
    +
    +        if (projection != query) {
    +          append.copy(query = projection)
    +        } else {
    +          append
    +        }
    +    }
    +
    +    def resolveOutputColumns(
    +        tableName: String,
    +        expected: Seq[Attribute],
    +        query: LogicalPlan,
    +        byName: Boolean): LogicalPlan = {
    +
    +      if (expected.size < query.output.size) {
    +        throw new AnalysisException(
    +          s"""Cannot write to '$tableName', too many data columns:
    +             |Table columns: ${expected.map(_.name).mkString(", ")}
    +             |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +      }
    +
    +      val errors = new mutable.ArrayBuffer[String]()
    +      val resolved: Seq[NamedExpression] = if (byName) {
    +        expected.flatMap { outAttr =>
    +          query.resolveQuoted(outAttr.name, resolver) match {
    +            case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
    +              errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
    +              None
    +
    +            case Some(inAttr) if !DataType.canWrite(outAttr.dataType, inAttr.dataType, resolver) =>
    +              Some(upcast(inAttr, outAttr))
    +
    +            case Some(inAttr) =>
    +              Some(inAttr) // matches nullability, datatype, and name
    +
    +            case _ =>
    +              errors += s"Cannot find data for output column '${outAttr.name}'"
    +              None
    +          }
    +        }
    +
    +      } else {
    +        if (expected.size > query.output.size) {
    +          throw new AnalysisException(
    +            s"""Cannot write to '$tableName', not enough data columns:
    +               |Table columns: ${expected.map(_.name).mkString(", ")}
    +               |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +        }
    +
    +        query.output.zip(expected).flatMap {
    --- End diff --
    
    This handles both append cases: write by name and write by position. This block checks the by-position case. I'll see if I can refactor the checks into a private method.
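
    For context, a sketch of the two cases (hypothetical table and view names):

    ```scala
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "x")).toDF("a", "b")
    df.createOrReplaceTempView("source")
    spark.sql("CREATE TABLE target (a INT, b STRING) USING parquet")

    // By position: INSERT INTO lines columns up in order; names are ignored.
    spark.sql("INSERT INTO target SELECT a, b FROM source")

    // By name: the v2 append path in this PR resolves each table column by
    // name, so df.select($"b", $"a") written through a v2 source with
    // mode("append") would still line up with target's (a, b) schema.
    ```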


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #94030 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94030/testReport)** for PR 21305 at commit [`897dc39`](https://github.com/apache/spark/commit/897dc3925f3155dec0ca997dc84f1a9d83e9045d).


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92693/
    Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #92627 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92627/testReport)** for PR 21305 at commit [`b906ab1`](https://github.com/apache/spark/commit/b906ab1698d653b35ca7ac1316c08e39d9f59008).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r207942336
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala ---
    @@ -336,4 +337,124 @@ object DataType {
           case (fromDataType, toDataType) => fromDataType == toDataType
         }
       }
    +
    +  private val SparkGeneratedName = """col\d+""".r
    +  private def isSparkGeneratedName(name: String): Boolean = name match {
    +    case SparkGeneratedName(_*) => true
    +    case _ => false
    +  }
    +
    +  /**
    +   * Returns true if the write data type can be read using the read data type.
    +   *
    +   * The write type is compatible with the read type if:
    +   * - Both types are arrays, the array element types are compatible, and element nullability is
    +   *   compatible (read allows nulls or write does not contain nulls).
    +   * - Both types are maps and the map key and value types are compatible, and value nullability
    +   *   is compatible  (read allows nulls or write does not contain nulls).
    +   * - Both types are structs and each field in the read struct is present in the write struct and
    +   *   compatible (including nullability), or is nullable if the write struct does not contain the
    +   *   field. Write-side structs are not compatible if they contain fields that are not present in
    +   *   the read-side struct.
    +   * - Both types are atomic and the write type can be safely cast to the read type.
    +   *
    +   * Extra fields in write-side structs are not allowed to avoid accidentally writing data that
    +   * the read schema will not read, and to ensure map key equality is not changed when data is read.
    +   *
    +   * @param write a write-side data type to validate against the read type
    +   * @param read a read-side data type
    +   * @return true if data written with the write type can be read using the read type
    +   */
    +  def canWrite(
    +      write: DataType,
    +      read: DataType,
    +      resolver: Resolver,
    +      context: String,
    +      addError: String => Unit = (_: String) => {}): Boolean = {
    +    (write, read) match {
    +      case (wArr: ArrayType, rArr: ArrayType) =>
    +        // run compatibility check first to produce all error messages
    +        val typesCompatible =
    +          canWrite(wArr.elementType, rArr.elementType, resolver, context + ".element", addError)
    +
    +        if (wArr.containsNull && !rArr.containsNull) {
    +          addError(s"Cannot write nullable elements to array of non-nulls: '$context'")
    +          false
    +        } else {
    +          typesCompatible
    +        }
    +
    +      case (wMap: MapType, rMap: MapType) =>
    +        // map keys cannot include data fields not in the read schema without changing equality when
    +        // read. map keys can be missing fields as long as they are nullable in the read schema.
    +
    +        // run compatibility check first to produce all error messages
    +        val keyCompatible =
    +          canWrite(wMap.keyType, rMap.keyType, resolver, context + ".key", addError)
    +        val valueCompatible =
    +          canWrite(wMap.valueType, rMap.valueType, resolver, context + ".value", addError)
    +        val typesCompatible = keyCompatible && valueCompatible
    +
    +        if (wMap.valueContainsNull && !rMap.valueContainsNull) {
    +          addError(s"Cannot write nullable values to map of non-nulls: '$context'")
    +          false
    +        } else {
    +          typesCompatible
    +        }
    +
    +      case (StructType(writeFields), StructType(readFields)) =>
    +        var fieldCompatible = true
    +        readFields.zip(writeFields).foreach {
    +          case (rField, wField) =>
    +            val namesMatch = resolver(wField.name, rField.name) || isSparkGeneratedName(wField.name)
    --- End diff --
    
    Ah, I saw the test cases; makes sense to me.
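
    For anyone following along, the behavior those tests cover (shape copied from the diff above):

    ```scala
    // Spark's default field names (col1, col2, ...) are treated as
    // generated, so they are exempt from name matching against the
    // read schema.
    val SparkGeneratedName = """col\d+""".r
    def isSparkGeneratedName(name: String): Boolean = name match {
      case SparkGeneratedName(_*) => true
      case _ => false
    }

    assert(isSparkGeneratedName("col2"))
    assert(!isSparkGeneratedName("user_id"))
    ```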


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r206747528
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2217,6 +2218,100 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +      case append @ AppendData(table, query, isByName)
    +          if table.resolved && query.resolved && !append.resolved =>
    +        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
    +
    +        if (projection != query) {
    +          append.copy(query = projection)
    +        } else {
    +          append
    +        }
    +    }
    +
    +    def resolveOutputColumns(
    +        tableName: String,
    +        expected: Seq[Attribute],
    +        query: LogicalPlan,
    +        byName: Boolean): LogicalPlan = {
    +
    +      if (expected.size < query.output.size) {
    +        throw new AnalysisException(
    +          s"""Cannot write to '$tableName', too many data columns:
    +             |Table columns: ${expected.map(_.name).mkString(", ")}
    +             |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +      }
    +
    +      val errors = new mutable.ArrayBuffer[String]()
    +      val resolved: Seq[NamedExpression] = if (byName) {
    +        expected.flatMap { outAttr =>
    +          query.resolveQuoted(outAttr.name, resolver) match {
    +            case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
    +              errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
    +              None
    +
    +            case Some(inAttr) if !DataType.canWrite(outAttr.dataType, inAttr.dataType, resolver) =>
    +              Some(upcast(inAttr, outAttr))
    +
    +            case Some(inAttr) =>
    +              Some(inAttr) // matches nullability, datatype, and name
    +
    +            case _ =>
    +              errors += s"Cannot find data for output column '${outAttr.name}'"
    +              None
    +          }
    +        }
    +
    +      } else {
    +        if (expected.size > query.output.size) {
    +          throw new AnalysisException(
    +            s"""Cannot write to '$tableName', not enough data columns:
    +               |Table columns: ${expected.map(_.name).mkString(", ")}
    +               |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +        }
    +
    +        query.output.zip(expected).flatMap {
    +          case (inAttr, outAttr) if inAttr.nullable && !outAttr.nullable =>
    +            errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
    +            None
    +
    +          case (inAttr, outAttr)
    +            if !DataType.canWrite(inAttr.dataType, outAttr.dataType, resolver) ||
    --- End diff --
    
    Can't we always do upCast? If the write is allowed, the upCast will be a no-op that is removed by the optimizer.
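
    A minimal sketch of that approach, reusing the `upcast` helper from the diff above (that `SimplifyCasts` removes the resulting no-op casts is the assumption that makes this safe):

        // Always upcast each input column to the table's column type; when
        // the types already match, the optimizer's SimplifyCasts rule drops
        // the no-op cast, so there is no runtime cost.
        val resolved: Seq[NamedExpression] = query.output.zip(expected).map {
          case (inAttr, outAttr) => upcast(inAttr, outAttr)
        }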


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r207043344
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2217,6 +2218,100 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +      case append @ AppendData(table, query, isByName)
    +          if table.resolved && query.resolved && !append.resolved =>
    +        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
    +
    +        if (projection != query) {
    +          append.copy(query = projection)
    +        } else {
    +          append
    +        }
    +    }
    +
    +    def resolveOutputColumns(
    +        tableName: String,
    +        expected: Seq[Attribute],
    +        query: LogicalPlan,
    +        byName: Boolean): LogicalPlan = {
    +
    +      if (expected.size < query.output.size) {
    +        throw new AnalysisException(
    +          s"""Cannot write to '$tableName', too many data columns:
    +             |Table columns: ${expected.map(_.name).mkString(", ")}
    +             |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +      }
    +
    +      val errors = new mutable.ArrayBuffer[String]()
    +      val resolved: Seq[NamedExpression] = if (byName) {
    +        expected.flatMap { outAttr =>
    +          query.resolveQuoted(outAttr.name, resolver) match {
    +            case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
    +              errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
    +              None
    +
    +            case Some(inAttr) if !DataType.canWrite(outAttr.dataType, inAttr.dataType, resolver) =>
    +              Some(upcast(inAttr, outAttr))
    +
    +            case Some(inAttr) =>
    +              Some(inAttr) // matches nullability, datatype, and name
    +
    +            case _ =>
    +              errors += s"Cannot find data for output column '${outAttr.name}'"
    +              None
    +          }
    +        }
    +
    +      } else {
    +        if (expected.size > query.output.size) {
    +          throw new AnalysisException(
    +            s"""Cannot write to '$tableName', not enough data columns:
    +               |Table columns: ${expected.map(_.name).mkString(", ")}
    +               |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +        }
    +
    +        query.output.zip(expected).flatMap {
    --- End diff --
    
    I refactored these into a helper method.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94339/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Thanks for reviewing, @cloud-fan!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r207938814
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala ---
    @@ -336,4 +337,124 @@ object DataType {
           case (fromDataType, toDataType) => fromDataType == toDataType
         }
       }
    +
    +  private val SparkGeneratedName = """col\d+""".r
    +  private def isSparkGeneratedName(name: String): Boolean = name match {
    +    case SparkGeneratedName(_*) => true
    +    case _ => false
    +  }
    +
    +  /**
    +   * Returns true if the write data type can be read using the read data type.
    +   *
    +   * The write type is compatible with the read type if:
    +   * - Both types are arrays, the array element types are compatible, and element nullability is
    +   *   compatible (read allows nulls or write does not contain nulls).
    +   * - Both types are maps and the map key and value types are compatible, and value nullability
    +   *   is compatible  (read allows nulls or write does not contain nulls).
    +   * - Both types are structs and each field in the read struct is present in the write struct and
    +   *   compatible (including nullability), or is nullable if the write struct does not contain the
    +   *   field. Write-side structs are not compatible if they contain fields that are not present in
    +   *   the read-side struct.
    +   * - Both types are atomic and the write type can be safely cast to the read type.
    +   *
    +   * Extra fields in write-side structs are not allowed to avoid accidentally writing data that
    +   * the read schema will not read, and to ensure map key equality is not changed when data is read.
    +   *
    +   * @param write a write-side data type to validate against the read type
    +   * @param read a read-side data type
    +   * @return true if data written with the write type can be read using the read type
    +   */
    +  def canWrite(
    +      write: DataType,
    +      read: DataType,
    +      resolver: Resolver,
    +      context: String,
    +      addError: String => Unit = (_: String) => {}): Boolean = {
    +    (write, read) match {
    +      case (wArr: ArrayType, rArr: ArrayType) =>
    +        // run compatibility check first to produce all error messages
    +        val typesCompatible =
    +          canWrite(wArr.elementType, rArr.elementType, resolver, context + ".element", addError)
    +
    +        if (wArr.containsNull && !rArr.containsNull) {
    +          addError(s"Cannot write nullable elements to array of non-nulls: '$context'")
    +          false
    +        } else {
    +          typesCompatible
    +        }
    +
    +      case (wMap: MapType, rMap: MapType) =>
    +        // map keys cannot include data fields not in the read schema without changing equality when
    +        // read. map keys can be missing fields as long as they are nullable in the read schema.
    +
    +        // run compatibility check first to produce all error messages
    +        val keyCompatible =
    +          canWrite(wMap.keyType, rMap.keyType, resolver, context + ".key", addError)
    +        val valueCompatible =
    +          canWrite(wMap.valueType, rMap.valueType, resolver, context + ".value", addError)
    +        val typesCompatible = keyCompatible && valueCompatible
    +
    +        if (wMap.valueContainsNull && !rMap.valueContainsNull) {
    +          addError(s"Cannot write nullable values to map of non-nulls: '$context'")
    +          false
    +        } else {
    +          typesCompatible
    +        }
    +
    +      case (StructType(writeFields), StructType(readFields)) =>
    +        var fieldCompatible = true
    +        readFields.zip(writeFields).foreach {
    +          case (rField, wField) =>
    +            val namesMatch = resolver(wField.name, rField.name) || isSparkGeneratedName(wField.name)
    +            val fieldContext = s"$context.${rField.name}"
    +            val typesCompatible =
    +              canWrite(wField.dataType, rField.dataType, resolver, fieldContext, addError)
    +
    +            if (!namesMatch) {
    +              addError(s"Struct '$context' field name does not match (may be out of order): " +
    +                  s"expected '${rField.name}', found '${wField.name}'")
    +              fieldCompatible = false
    +            } else if (!rField.nullable && wField.nullable) {
    +              addError(s"Cannot write nullable values to non-null field: '$fieldContext'")
    +              fieldCompatible = false
    +            } else if (!typesCompatible) {
    +              // errors are added in the recursive call to canWrite above
    +              fieldCompatible = false
    +            }
    +        }
    +
    +        if (readFields.size > writeFields.size) {
    +          val missingFieldsStr = readFields.takeRight(readFields.size - writeFields.size)
    +                  .filterNot(_.nullable).map(f => s"'${f.name}'").mkString(", ")
    +          if (missingFieldsStr.nonEmpty) {
    +            addError(s"Struct '$context' missing required (non-null) fields: $missingFieldsStr")
    --- End diff --
    
    We don't have this feature yet (filling nulls for nullable missing fields), so we should not allow it here.
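
    Until that lands, the check can treat any missing read-side field as an error by dropping the nullability filter (a sketch based on the diff above):

        // Reject all missing fields, nullable or not, since filling nulls
        // for missing nullable fields is not supported yet.
        if (readFields.size > writeFields.size) {
          val missingFieldsStr = readFields.takeRight(readFields.size - writeFields.size)
              .map(f => s"'${f.name}'").mkString(", ")
          addError(s"Struct '$context' missing fields: $missingFieldsStr")
          fieldCompatible = false
        }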


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #94352 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94352/testReport)** for PR 21305 at commit [`e81790d`](https://github.com/apache/spark/commit/e81790d072ed66f1126d5918bd1a39222a9f5cfa).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3157/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #94364 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94364/testReport)** for PR 21305 at commit [`e81790d`](https://github.com/apache/spark/commit/e81790d072ed66f1126d5918bd1a39222a9f5cfa).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r207575203
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
    @@ -352,6 +351,36 @@ case class Join(
       }
     }
     
    +/**
    + * Append data to an existing table.
    + */
    +case class AppendData(
    +    table: NamedRelation,
    +    query: LogicalPlan,
    +    isByName: Boolean) extends LogicalPlan {
    +  override def children: Seq[LogicalPlan] = Seq(query)
    +  override def output: Seq[Attribute] = Seq.empty
    +
    +  override lazy val resolved: Boolean = {
    +    query.output.size == table.output.size && query.output.zip(table.output).forall {
    +      case (inAttr, outAttr) =>
    +          inAttr.name == outAttr.name &&                // names must match
    +          outAttr.dataType.sameType(inAttr.dataType) && // types must match
    --- End diff --
    
    We should use `DataType.equalsIgnoreCompatibleNullability`, otherwise the nullability check below does not consider the nullability of inner fields, like struct fields, array elements, etc.
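
    A minimal sketch of the `resolved` check using that helper (keeping a separate check for top-level nullability is an assumption):

        override lazy val resolved: Boolean = {
          query.output.size == table.output.size &&
            query.output.zip(table.output).forall {
              case (inAttr, outAttr) =>
                inAttr.name == outAttr.name &&                // names must match
                  DataType.equalsIgnoreCompatibleNullability( // covers nested fields
                    inAttr.dataType, outAttr.dataType) &&
                  (outAttr.nullable || !inAttr.nullable)      // top-level nullability
            }
        }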


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200230564
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala ---
    @@ -203,33 +203,33 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
             val path = file.getCanonicalPath
             assert(spark.read.format(cls.getName).option("path", path).load().collect().isEmpty)
     
    -        spark.range(10).select('id, -'id).write.format(cls.getName)
    +        spark.range(10).select('id as 'i, -'id as 'j).write.format(cls.getName)
    --- End diff --
    
    Are these changes required?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r206746383
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
    @@ -352,6 +351,36 @@ case class Join(
       }
     }
     
    +/**
    + * Append data to an existing table.
    + */
    +case class AppendData(
    +    table: NamedRelation,
    +    query: LogicalPlan,
    +    isByName: Boolean) extends LogicalPlan {
    +  override def children: Seq[LogicalPlan] = Seq(query)
    --- End diff --
    
    Why is `table` not a child? Without that, we can't transform the table relation.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94074/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r207590794
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2217,6 +2218,98 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    --- End diff --
    
    Agreed. I'm already working on a suite for `canWrite` and will be adding better tests shortly.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200802447
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java ---
    @@ -38,15 +38,16 @@
        * If this method fails (by throwing an exception), the action will fail and no Spark job will be
        * submitted.
        *
    -   * @param jobId A unique string for the writing job. It's possible that there are many writing
    -   *              jobs running at the same time, and the returned {@link DataSourceWriter} can
    -   *              use this job id to distinguish itself from other jobs.
    +   * @param writeUUID A unique string for the writing job. It's possible that there are many writing
    --- End diff --
    
    what's the rationale behind this renaming?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1912/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r206978506
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
    @@ -352,6 +351,36 @@ case class Join(
       }
     }
     
    +/**
    + * Append data to an existing table.
    + */
    +case class AppendData(
    +    table: NamedRelation,
    +    query: LogicalPlan,
    +    isByName: Boolean) extends LogicalPlan {
    +  override def children: Seq[LogicalPlan] = Seq(query)
    --- End diff --
    
    What transforms would be enabled by making `table` a child? Do we want to transform the relation? The table is fixed, so I didn't think that was a good idea.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r208090428
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala ---
    @@ -336,4 +337,124 @@ object DataType {
           case (fromDataType, toDataType) => fromDataType == toDataType
         }
       }
    +
    +  private val SparkGeneratedName = """col\d+""".r
    +  private def isSparkGeneratedName(name: String): Boolean = name match {
    +    case SparkGeneratedName(_*) => true
    +    case _ => false
    +  }
    +
    +  /**
    +   * Returns true if the write data type can be read using the read data type.
    +   *
    +   * The write type is compatible with the read type if:
    +   * - Both types are arrays, the array element types are compatible, and element nullability is
    +   *   compatible (read allows nulls or write does not contain nulls).
    +   * - Both types are maps and the map key and value types are compatible, and value nullability
    +   *   is compatible  (read allows nulls or write does not contain nulls).
    +   * - Both types are structs and each field in the read struct is present in the write struct and
    +   *   compatible (including nullability), or is nullable if the write struct does not contain the
    +   *   field. Write-side structs are not compatible if they contain fields that are not present in
    +   *   the read-side struct.
    +   * - Both types are atomic and the write type can be safely cast to the read type.
    +   *
    +   * Extra fields in write-side structs are not allowed to avoid accidentally writing data that
    +   * the read schema will not read, and to ensure map key equality is not changed when data is read.
    +   *
    +   * @param write a write-side data type to validate against the read type
    +   * @param read a read-side data type
    +   * @return true if data written with the write type can be read using the read type
    +   */
    +  def canWrite(
    +      write: DataType,
    +      read: DataType,
    +      resolver: Resolver,
    +      context: String,
    +      addError: String => Unit = (_: String) => {}): Boolean = {
    +    (write, read) match {
    +      case (wArr: ArrayType, rArr: ArrayType) =>
    +        // run compatibility check first to produce all error messages
    +        val typesCompatible =
    +          canWrite(wArr.elementType, rArr.elementType, resolver, context + ".element", addError)
    +
    +        if (wArr.containsNull && !rArr.containsNull) {
    +          addError(s"Cannot write nullable elements to array of non-nulls: '$context'")
    +          false
    +        } else {
    +          typesCompatible
    +        }
    +
    +      case (wMap: MapType, rMap: MapType) =>
    +        // map keys cannot include data fields not in the read schema without changing equality when
    +        // read. map keys can be missing fields as long as they are nullable in the read schema.
    +
    +        // run compatibility check first to produce all error messages
    +        val keyCompatible =
    +          canWrite(wMap.keyType, rMap.keyType, resolver, context + ".key", addError)
    +        val valueCompatible =
    +          canWrite(wMap.valueType, rMap.valueType, resolver, context + ".value", addError)
    +        val typesCompatible = keyCompatible && valueCompatible
    +
    +        if (wMap.valueContainsNull && !rMap.valueContainsNull) {
    +          addError(s"Cannot write nullable values to map of non-nulls: '$context'")
    +          false
    +        } else {
    +          typesCompatible
    +        }
    +
    +      case (StructType(writeFields), StructType(readFields)) =>
    +        var fieldCompatible = true
    +        readFields.zip(writeFields).foreach {
    +          case (rField, wField) =>
    +            val namesMatch = resolver(wField.name, rField.name) || isSparkGeneratedName(wField.name)
    +            val fieldContext = s"$context.${rField.name}"
    +            val typesCompatible =
    +              canWrite(wField.dataType, rField.dataType, resolver, fieldContext, addError)
    +
    +            if (!namesMatch) {
    +              addError(s"Struct '$context' field name does not match (may be out of order): " +
    +                  s"expected '${rField.name}', found '${wField.name}'")
    +              fieldCompatible = false
    +            } else if (!rField.nullable && wField.nullable) {
    +              addError(s"Cannot write nullable values to non-null field: '$fieldContext'")
    +              fieldCompatible = false
    +            } else if (!typesCompatible) {
    +              // errors are added in the recursive call to canWrite above
    +              fieldCompatible = false
    +            }
    +        }
    +
    +        if (readFields.size > writeFields.size) {
    +          val missingFieldsStr = readFields.takeRight(readFields.size - writeFields.size)
    +                  .filterNot(_.nullable).map(f => s"'${f.name}'").mkString(", ")
    +          if (missingFieldsStr.nonEmpty) {
    +            addError(s"Struct '$context' missing required (non-null) fields: $missingFieldsStr")
    --- End diff --
    
    I'll update it and open an issue for schema evolution. If we support ADD COLUMN then we'll need to do this.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    @cloud-fan, @gatorsmile, is it possible to get this in for 2.4? This validates writes to data source tables, so I think it's a good one to have.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    @cloud-fan, I've added a suite for `DataType.canWrite`. I still need to add tests for the analyzer rule to make sure it catches any problems and to validate AppendData's `resolved` check. If you want to look at the `canWrite` suite in the meantime, I think it's ready for review.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r207043490
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2217,6 +2218,100 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +      case append @ AppendData(table, query, isByName)
    +          if table.resolved && query.resolved && !append.resolved =>
    +        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
    +
    +        if (projection != query) {
    +          append.copy(query = projection)
    +        } else {
    +          append
    +        }
    +    }
    +
    +    def resolveOutputColumns(
    +        tableName: String,
    +        expected: Seq[Attribute],
    +        query: LogicalPlan,
    +        byName: Boolean): LogicalPlan = {
    +
    +      if (expected.size < query.output.size) {
    +        throw new AnalysisException(
    +          s"""Cannot write to '$tableName', too many data columns:
    +             |Table columns: ${expected.map(_.name).mkString(", ")}
    +             |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +      }
    +
    +      val errors = new mutable.ArrayBuffer[String]()
    +      val resolved: Seq[NamedExpression] = if (byName) {
    +        expected.flatMap { outAttr =>
    +          query.resolveQuoted(outAttr.name, resolver) match {
    +            case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
    +              errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
    +              None
    +
    +            case Some(inAttr) if !DataType.canWrite(outAttr.dataType, inAttr.dataType, resolver) =>
    +              Some(upcast(inAttr, outAttr))
    +
    +            case Some(inAttr) =>
    +              Some(inAttr) // matches nullability, datatype, and name
    +
    +            case _ =>
    +              errors += s"Cannot find data for output column '${outAttr.name}'"
    +              None
    +          }
    +        }
    +
    +      } else {
    +        if (expected.size > query.output.size) {
    +          throw new AnalysisException(
    +            s"""Cannot write to '$tableName', not enough data columns:
    +               |Table columns: ${expected.map(_.name).mkString(", ")}
    +               |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +        }
    +
    +        query.output.zip(expected).flatMap {
    +          case (inAttr, outAttr) if inAttr.nullable && !outAttr.nullable =>
    +            errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
    +            None
    +
    +          case (inAttr, outAttr)
    +            if !DataType.canWrite(inAttr.dataType, outAttr.dataType, resolver) ||
    --- End diff --
    
    I updated this to use your suggestion: now it always adds the cast.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    @cloud-fan, @gatorsmile, this has been ready for final review for a while. Do you think you'll have some time to look at it?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1711/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r207724886
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala ---
    @@ -336,4 +337,97 @@ object DataType {
           case (fromDataType, toDataType) => fromDataType == toDataType
         }
       }
    +
    +  /**
    +   * Returns true if the write data type can be read using the read data type.
    +   *
    +   * The write type is compatible with the read type if:
    +   * - Both types are arrays, the array element types are compatible, and element nullability is
    +   *   compatible (read allows nulls or write does not contain nulls).
    +   * - Both types are maps and the map key and value types are compatible, and value nullability
    +   *   is compatible  (read allows nulls or write does not contain nulls).
    +   * - Both types are structs and each field in the read struct is present in the write struct and
    +   *   compatible (including nullability), or is nullable if the write struct does not contain the
    +   *   field. Write-side structs are not compatible if they contain fields that are not present in
    +   *   the read-side struct.
    +   * - Both types are atomic and the write type can be safely cast to the read type.
    +   *
    +   * Extra fields in write-side structs are not allowed to avoid accidentally writing data that
    +   * the read schema will not read, and to ensure map key equality is not changed when data is read.
    +   *
    +   * @param write a write-side data type to validate against the read type
    +   * @param read a read-side data type
    +   * @return true if data written with the write type can be read using the read type
    +   */
    +  def canWrite(
    +      write: DataType,
    +      read: DataType,
    +      resolver: Resolver,
    +      context: String,
    +      addError: String => Unit = (_: String) => {}): Boolean = {
    +    (write, read) match {
    +      case (wArr: ArrayType, rArr: ArrayType) =>
    +        if (wArr.containsNull && !rArr.containsNull) {
    +          addError(s"Cannot write nullable elements to array of non-nulls: '$context'")
    +          false
    +        } else {
    +          canWrite(wArr.elementType, rArr.elementType, resolver, context + ".element", addError)
    +        }
    +
    +      case (wMap: MapType, rMap: MapType) =>
    +        // map keys cannot include data fields not in the read schema without changing equality when
    +        // read. map keys can be missing fields as long as they are nullable in the read schema.
    +        if (wMap.valueContainsNull && !rMap.valueContainsNull) {
    +          addError(s"Cannot write nullable values to map of non-nulls: '$context'")
    +          false
    +        } else {
    +          canWrite(wMap.keyType, rMap.keyType, resolver, context + ".key", addError) &&
    +              canWrite(wMap.valueType, rMap.valueType, resolver, context + ".value", addError)
    +        }
    +
    +      case (StructType(writeFields), StructType(readFields)) =>
    +        lazy val extraFields = writeFields.map(_.name).toSet -- readFields.map(_.name)
    +
    +        var result = readFields.forall { readField =>
    +          val fieldContext = context + "." + readField.name
    +          writeFields.find(writeField => resolver(writeField.name, readField.name)) match {
    --- End diff --
    
    ah, makes sense


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r206746209
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2217,6 +2218,100 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +      case append @ AppendData(table, query, isByName)
    +          if table.resolved && query.resolved && !append.resolved =>
    +        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
    +
    +        if (projection != query) {
    +          append.copy(query = projection)
    +        } else {
    +          append
    +        }
    +    }
    +
    +    def resolveOutputColumns(
    +        tableName: String,
    +        expected: Seq[Attribute],
    +        query: LogicalPlan,
    +        byName: Boolean): LogicalPlan = {
    +
    +      if (expected.size < query.output.size) {
    +        throw new AnalysisException(
    +          s"""Cannot write to '$tableName', too many data columns:
    +             |Table columns: ${expected.map(_.name).mkString(", ")}
    +             |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +      }
    +
    +      val errors = new mutable.ArrayBuffer[String]()
    +      val resolved: Seq[NamedExpression] = if (byName) {
    +        expected.flatMap { outAttr =>
    +          query.resolveQuoted(outAttr.name, resolver) match {
    +            case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
    +              errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
    +              None
    +
    +            case Some(inAttr) if !DataType.canWrite(outAttr.dataType, inAttr.dataType, resolver) =>
    +              Some(upcast(inAttr, outAttr))
    +
    +            case Some(inAttr) =>
    +              Some(inAttr) // matches nullability, datatype, and name
    +
    +            case _ =>
    +              errors += s"Cannot find data for output column '${outAttr.name}'"
    +              None
    +          }
    +        }
    +
    +      } else {
    +        if (expected.size > query.output.size) {
    +          throw new AnalysisException(
    +            s"""Cannot write to '$tableName', not enough data columns:
    +               |Table columns: ${expected.map(_.name).mkString(", ")}
    +               |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +        }
    +
    +        query.output.zip(expected).flatMap {
    --- End diff --
    
    are these checks duplicated?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #93519 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93519/testReport)** for PR 21305 at commit [`f041019`](https://github.com/apache/spark/commit/f041019b1ffe8187e47ba89de96daf631dfd56da).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r207576303
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala ---
    @@ -336,4 +337,97 @@ object DataType {
           case (fromDataType, toDataType) => fromDataType == toDataType
         }
       }
    +
    +  /**
    +   * Returns true if the write data type can be read using the read data type.
    +   *
    +   * The write type is compatible with the read type if:
    +   * - Both types are arrays, the array element types are compatible, and element nullability is
    +   *   compatible (read allows nulls or write does not contain nulls).
    +   * - Both types are maps and the map key and value types are compatible, and value nullability
    +   *   is compatible  (read allows nulls or write does not contain nulls).
    +   * - Both types are structs and each field in the read struct is present in the write struct and
    +   *   compatible (including nullability), or is nullable if the write struct does not contain the
    +   *   field. Write-side structs are not compatible if they contain fields that are not present in
    +   *   the read-side struct.
    +   * - Both types are atomic and the write type can be safely cast to the read type.
    +   *
    +   * Extra fields in write-side structs are not allowed to avoid accidentally writing data that
    +   * the read schema will not read, and to ensure map key equality is not changed when data is read.
    +   *
    +   * @param write a write-side data type to validate against the read type
    +   * @param read a read-side data type
    +   * @return true if data written with the write type can be read using the read type
    +   */
    +  def canWrite(
    +      write: DataType,
    +      read: DataType,
    +      resolver: Resolver,
    +      context: String,
    +      addError: String => Unit = (_: String) => {}): Boolean = {
    +    (write, read) match {
    +      case (wArr: ArrayType, rArr: ArrayType) =>
    +        if (wArr.containsNull && !rArr.containsNull) {
    +          addError(s"Cannot write nullable elements to array of non-nulls: '$context'")
    +          false
    +        } else {
    +          canWrite(wArr.elementType, rArr.elementType, resolver, context + ".element", addError)
    +        }
    +
    +      case (wMap: MapType, rMap: MapType) =>
    +        // map keys cannot include data fields not in the read schema without changing equality when
    +        // read. map keys can be missing fields as long as they are nullable in the read schema.
    +        if (wMap.valueContainsNull && !rMap.valueContainsNull) {
    +          addError(s"Cannot write nullable values to map of non-nulls: '$context'")
    +          false
    +        } else {
    +          canWrite(wMap.keyType, rMap.keyType, resolver, context + ".key", addError) &&
    +              canWrite(wMap.valueType, rMap.valueType, resolver, context + ".value", addError)
    +        }
    +
    +      case (StructType(writeFields), StructType(readFields)) =>
    +        lazy val extraFields = writeFields.map(_.name).toSet -- readFields.map(_.name)
    +
    +        var result = readFields.forall { readField =>
    +          val fieldContext = context + "." + readField.name
    +          writeFields.find(writeField => resolver(writeField.name, readField.name)) match {
    --- End diff --
    
    Is it safe to match struct fields by name? How do we reorder the nested fields in `ResolveOutputRelation`?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94030/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90530/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    I don't think we need (or want) `SaveMode` passed to writers after standardizing. Uses of `WriteSupport` will always append data to an existing table, which makes it simpler for writers, and it will be used for all writes. A sketch of the resulting append plan follows the notes below.
    
    A couple of other notes:
    * We will also need a `StagedTable` variant of `DeleteSupport` to support `ReplaceData` as an atomic operation, but I want to get the non-atomic variants in first; hopefully for 2.4.0.
    * RTAS would use `dropTable` and not `DeleteSupport`, since it is at the table level.
    * We may still use `SaveMode` in the DF writer API, which is still under discussion.
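
    A minimal sketch of the append flow without `SaveMode` (the `relation` and `df` names are illustrative; only the `AppendData` shape comes from this PR):

        // DataFrameWriter builds an AppendData node for v2 appends instead of
        // passing a SaveMode down to the writer.
        val append = AppendData(table = relation, query = df.logicalPlan, isByName = true)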


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93908/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #94373 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94373/testReport)** for PR 21305 at commit [`e81790d`](https://github.com/apache/spark/commit/e81790d072ed66f1126d5918bd1a39222a9f5cfa).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    retest this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200423733
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -40,17 +44,24 @@ case class DataSourceV2Relation(
         source: DataSourceV2,
         output: Seq[AttributeReference],
         options: Map[String, String],
    -    userSpecifiedSchema: Option[StructType])
    -  extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {
    +    tableIdent: Option[TableIdentifier] = None,
    +    userSpecifiedSchema: Option[StructType] = None)
    +  extends LeafNode with MultiInstanceRelation with NamedRelation with DataSourceV2StringFormat {
     
       import DataSourceV2Relation._
     
    +  override def name: String = {
    +    tableIdent.map(_.unquotedString).getOrElse("unknown")
    --- End diff --
    
    That's the name of the data source, not the name of the table. I'd be fine with updating this if you want to include the source name. What about `s"${source.name}:unknown"`?
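
    A sketch of that fallback (assuming `source.name` resolves to the data source's registered name):

        override def name: String = {
          // Prefer the table identifier; otherwise tag the unknown table with
          // the source's name so error messages stay informative.
          tableIdent.map(_.unquotedString).getOrElse(s"${source.name}:unknown")
        }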


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Retest this please.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94257/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1532/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1364/
    Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #93908 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93908/testReport)** for PR 21305 at commit [`ac7cb13`](https://github.com/apache/spark/commit/ac7cb13644a3f80c1627826513fa525154cf2d00).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    @cloud-fan, I've updated this and the tests are passing, so I think it is ready for another look.
    
    I just pushed a comments-only commit to fix the Javadoc for AppendData that @viirya pointed out (thanks!). Since that's only comments, it shouldn't affect test results.
    
    I think there's just one more point under discussion: the change from `jobId` (which isn't actually a job ID) to `jobUUID`, and dropping the timestamp. I don't think a timestamp helps avoid conflicts because it is astronomically unlikely that UUIDs will collide.
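
    As a back-of-the-envelope check (a standard birthday-bound estimate, not something from this thread), a version-4 UUID carries 122 random bits, so:

        // birthday bound: p(collision among n random v4 UUIDs) ≈ n*(n-1) / 2^123
        val n = 1e9                               // a billion concurrent writes
        val p = n * (n - 1) / math.pow(2, 123)    // ≈ 9.4e-20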


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #94256 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94256/testReport)** for PR 21305 at commit [`ae0d242`](https://github.com/apache/spark/commit/ae0d2424315634760d46be0f21e0e98160bada5a).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r207937834
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala ---
    @@ -336,4 +337,124 @@ object DataType {
           case (fromDataType, toDataType) => fromDataType == toDataType
         }
       }
    +
    +  private val SparkGeneratedName = """col\d+""".r
    +  private def isSparkGeneratedName(name: String): Boolean = name match {
    +    case SparkGeneratedName(_*) => true
    +    case _ => false
    +  }
    +
    +  /**
    +   * Returns true if the write data type can be read using the read data type.
    +   *
    +   * The write type is compatible with the read type if:
    +   * - Both types are arrays, the array element types are compatible, and element nullability is
    +   *   compatible (read allows nulls or write does not contain nulls).
    +   * - Both types are maps and the map key and value types are compatible, and value nullability
    +   *   is compatible  (read allows nulls or write does not contain nulls).
    +   * - Both types are structs and each field in the read struct is present in the write struct and
    +   *   compatible (including nullability), or is nullable if the write struct does not contain the
    +   *   field. Write-side structs are not compatible if they contain fields that are not present in
    +   *   the read-side struct.
    +   * - Both types are atomic and the write type can be safely cast to the read type.
    +   *
    +   * Extra fields in write-side structs are not allowed to avoid accidentally writing data that
    +   * the read schema will not read, and to ensure map key equality is not changed when data is read.
    +   *
    +   * @param write a write-side data type to validate against the read type
    +   * @param read a read-side data type
    +   * @return true if data written with the write type can be read using the read type
    +   */
    +  def canWrite(
    +      write: DataType,
    +      read: DataType,
    +      resolver: Resolver,
    +      context: String,
    +      addError: String => Unit = (_: String) => {}): Boolean = {
    +    (write, read) match {
    +      case (wArr: ArrayType, rArr: ArrayType) =>
    +        // run compatibility check first to produce all error messages
    +        val typesCompatible =
    +          canWrite(wArr.elementType, rArr.elementType, resolver, context + ".element", addError)
    +
    +        if (wArr.containsNull && !rArr.containsNull) {
    +          addError(s"Cannot write nullable elements to array of non-nulls: '$context'")
    +          false
    +        } else {
    +          typesCompatible
    +        }
    +
    +      case (wMap: MapType, rMap: MapType) =>
    +        // map keys cannot include data fields not in the read schema without changing equality when
    +        // read. map keys can be missing fields as long as they are nullable in the read schema.
    +
    +        // run compatibility check first to produce all error messages
    +        val keyCompatible =
    +          canWrite(wMap.keyType, rMap.keyType, resolver, context + ".key", addError)
    +        val valueCompatible =
    +          canWrite(wMap.valueType, rMap.valueType, resolver, context + ".value", addError)
    +        val typesCompatible = keyCompatible && valueCompatible
    +
    +        if (wMap.valueContainsNull && !rMap.valueContainsNull) {
    +          addError(s"Cannot write nullable values to map of non-nulls: '$context'")
    +          false
    +        } else {
    +          typesCompatible
    +        }
    +
    +      case (StructType(writeFields), StructType(readFields)) =>
    +        var fieldCompatible = true
    +        readFields.zip(writeFields).foreach {
    --- End diff --
    
    Scala's `zip` doesn't check lengths; we should return false when the lengths don't match, or call [Seq#corresponds](https://www.scala-lang.org/api/current/scala/collection/Seq.html#corresponds[B](that:scala.collection.GenSeq[B])(p:(A,B)=%3EBoolean):Boolean).
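
    A minimal illustration of the difference (generic Scala, not from the patch):

        val read  = Seq("a", "b", "c")
        val write = Seq("a", "b")
        // zip silently truncates to the shorter sequence, so the third field goes unchecked
        read.zip(write).forall { case (r, w) => r == w }  // true
        // corresponds returns false when the lengths differ
        read.corresponds(write)(_ == _)                   // false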


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #94364 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94364/testReport)** for PR 21305 at commit [`e81790d`](https://github.com/apache/spark/commit/e81790d072ed66f1126d5918bd1a39222a9f5cfa).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r207591742
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala ---
    @@ -336,4 +337,97 @@ object DataType {
           case (fromDataType, toDataType) => fromDataType == toDataType
         }
       }
    +
    +  /**
    +   * Returns true if the write data type can be read using the read data type.
    +   *
    +   * The write type is compatible with the read type if:
    +   * - Both types are arrays, the array element types are compatible, and element nullability is
    +   *   compatible (read allows nulls or write does not contain nulls).
    +   * - Both types are maps and the map key and value types are compatible, and value nullability
    +   *   is compatible  (read allows nulls or write does not contain nulls).
    +   * - Both types are structs and each field in the read struct is present in the write struct and
    +   *   compatible (including nullability), or is nullable if the write struct does not contain the
    +   *   field. Write-side structs are not compatible if they contain fields that are not present in
    +   *   the read-side struct.
    +   * - Both types are atomic and the write type can be safely cast to the read type.
    +   *
    +   * Extra fields in write-side structs are not allowed to avoid accidentally writing data that
    +   * the read schema will not read, and to ensure map key equality is not changed when data is read.
    +   *
    +   * @param write a write-side data type to validate against the read type
    +   * @param read a read-side data type
    +   * @return true if data written with the write type can be read using the read type
    +   */
    +  def canWrite(
    +      write: DataType,
    +      read: DataType,
    +      resolver: Resolver,
    +      context: String,
    +      addError: String => Unit = (_: String) => {}): Boolean = {
    +    (write, read) match {
    +      case (wArr: ArrayType, rArr: ArrayType) =>
    +        if (wArr.containsNull && !rArr.containsNull) {
    +          addError(s"Cannot write nullable elements to array of non-nulls: '$context'")
    +          false
    +        } else {
    +          canWrite(wArr.elementType, rArr.elementType, resolver, context + ".element", addError)
    +        }
    +
    +      case (wMap: MapType, rMap: MapType) =>
    +        // map keys cannot include data fields not in the read schema without changing equality when
    +        // read. map keys can be missing fields as long as they are nullable in the read schema.
    +        if (wMap.valueContainsNull && !rMap.valueContainsNull) {
    +          addError(s"Cannot write nullable values to map of non-nulls: '$context'")
    +          false
    +        } else {
    +          canWrite(wMap.keyType, rMap.keyType, resolver, context + ".key", addError) &&
    +              canWrite(wMap.valueType, rMap.valueType, resolver, context + ".value", addError)
    +        }
    +
    +      case (StructType(writeFields), StructType(readFields)) =>
    +        lazy val extraFields = writeFields.map(_.name).toSet -- readFields.map(_.name)
    +
    +        var result = readFields.forall { readField =>
    +          val fieldContext = context + "." + readField.name
    +          writeFields.find(writeField => resolver(writeField.name, readField.name)) match {
    --- End diff --
    
    Yeah, I've been thinking about whether the rest of the code actually implements the rules that `canWrite` expects. This may not work, but I don't think it makes sense to add nested struct reordering in this commit if it isn't already supported.
    
    Should we disable nested structures, or go ahead with this validation and fix reordering later?


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1897/
    Test PASSed.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200421789
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala ---
    @@ -203,33 +203,33 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
             val path = file.getCanonicalPath
             assert(spark.read.format(cls.getName).option("path", path).load().collect().isEmpty)
     
    -        spark.range(10).select('id, -'id).write.format(cls.getName)
    +        spark.range(10).select('id as 'i, -'id as 'j).write.format(cls.getName)
    --- End diff --
    
    Yes. The new resolution rule validates the DataFrame that will be written to the table.
    
    Because this uses the `DataFrameWriter` API, columns are matched by name: there isn't a strong expectation of ordering in the DataFrame API (e.g., `withColumn` doesn't specify where the new column is added).
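
    As an illustration (assuming a `SparkSession` named `spark` with implicits imported), these two frames resolve identically under by-name matching even though their column order differs:

        import spark.implicits._
        // same columns, different positions; by-name resolution maps i -> i and j -> j
        val byIJ = spark.range(10).select('id as 'i, -'id as 'j)
        val byJI = spark.range(10).select(-'id as 'j, 'id as 'i)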


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200230408
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -40,17 +44,24 @@ case class DataSourceV2Relation(
         source: DataSourceV2,
         output: Seq[AttributeReference],
         options: Map[String, String],
    -    userSpecifiedSchema: Option[StructType])
    -  extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {
    +    tableIdent: Option[TableIdentifier] = None,
    +    userSpecifiedSchema: Option[StructType] = None)
    +  extends LeafNode with MultiInstanceRelation with NamedRelation with DataSourceV2StringFormat {
     
       import DataSourceV2Relation._
     
    +  override def name: String = {
    +    tableIdent.map(_.unquotedString).getOrElse("unknown")
    --- End diff --
    
    shall we use the data source's short name or its full class name instead of `unknown`?


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94352/
    Test FAILed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92627/
    Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #92780 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92780/testReport)** for PR 21305 at commit [`65c2670`](https://github.com/apache/spark/commit/65c26703cd381034bc2cf4b465b4dccc7012fdce).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r206976856
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2217,6 +2218,100 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +      case append @ AppendData(table, query, isByName)
    +          if table.resolved && query.resolved && !append.resolved =>
    +        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
    +
    +        if (projection != query) {
    +          append.copy(query = projection)
    +        } else {
    +          append
    +        }
    +    }
    +
    +    def resolveOutputColumns(
    +        tableName: String,
    +        expected: Seq[Attribute],
    +        query: LogicalPlan,
    +        byName: Boolean): LogicalPlan = {
    +
    +      if (expected.size < query.output.size) {
    +        throw new AnalysisException(
    +          s"""Cannot write to '$tableName', too many data columns:
    +             |Table columns: ${expected.map(_.name).mkString(", ")}
    +             |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +      }
    +
    +      val errors = new mutable.ArrayBuffer[String]()
    +      val resolved: Seq[NamedExpression] = if (byName) {
    +        expected.flatMap { outAttr =>
    +          query.resolveQuoted(outAttr.name, resolver) match {
    +            case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
    --- End diff --
    
    This is done by `DataType.canWrite`.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    thanks, merging to master!


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1921/
    Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    @cloud-fan, I think we can ignore that last test failure because tests are passing on the last commit that made real changes. The latest commit only changed a comment.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r207015951
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2217,6 +2218,100 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +      case append @ AppendData(table, query, isByName)
    +          if table.resolved && query.resolved && !append.resolved =>
    +        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
    +
    +        if (projection != query) {
    +          append.copy(query = projection)
    +        } else {
    +          append
    +        }
    +    }
    +
    +    def resolveOutputColumns(
    +        tableName: String,
    +        expected: Seq[Attribute],
    +        query: LogicalPlan,
    +        byName: Boolean): LogicalPlan = {
    +
    +      if (expected.size < query.output.size) {
    +        throw new AnalysisException(
    +          s"""Cannot write to '$tableName', too many data columns:
    +             |Table columns: ${expected.map(_.name).mkString(", ")}
    +             |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +      }
    +
    +      val errors = new mutable.ArrayBuffer[String]()
    +      val resolved: Seq[NamedExpression] = if (byName) {
    +        expected.flatMap { outAttr =>
    +          query.resolveQuoted(outAttr.name, resolver) match {
    +            case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
    +              errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
    +              None
    +
    +            case Some(inAttr) if !DataType.canWrite(outAttr.dataType, inAttr.dataType, resolver) =>
    +              Some(upcast(inAttr, outAttr))
    +
    +            case Some(inAttr) =>
    +              Some(inAttr) // matches nullability, datatype, and name
    +
    +            case _ =>
    +              errors += s"Cannot find data for output column '${outAttr.name}'"
    +              None
    +          }
    +        }
    +
    +      } else {
    +        if (expected.size > query.output.size) {
    +          throw new AnalysisException(
    +            s"""Cannot write to '$tableName', not enough data columns:
    +               |Table columns: ${expected.map(_.name).mkString(", ")}
    +               |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +        }
    +
    +        query.output.zip(expected).flatMap {
    +          case (inAttr, outAttr) if inAttr.nullable && !outAttr.nullable =>
    +            errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
    +            None
    +
    +          case (inAttr, outAttr)
    +            if !DataType.canWrite(inAttr.dataType, outAttr.dataType, resolver) ||
    --- End diff --
    
    Sorry, I thought you were asking whether upcast is necessary. We could probably always upcast.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1584/
    Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    @cloud-fan, I'll fix the conflicts and re-run tests. Yesterday's tests passed after I updated for your feedback. I'd like to get this in soon because repeatedly resolving conflicts without any real changes is eating up a lot of time.
    
    FYI @gatorsmile, @bersprockets, @jzhuge


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #90530 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90530/testReport)** for PR 21305 at commit [`a3bf530`](https://github.com/apache/spark/commit/a3bf5300e4a1c9c05a50273f530eaade00d57659).


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r190685985
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
    @@ -344,6 +344,36 @@ case class Join(
       }
     }
     
    +/**
    + * Append data to an existing DataSourceV2 table.
    + */
    +case class AppendData(
    --- End diff --
    
    I updated the SPIP doc to have both names.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200230683
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -240,21 +238,27 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     
         val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
         if (classOf[DataSourceV2].isAssignableFrom(cls)) {
    -      val ds = cls.newInstance()
    -      ds match {
    +      val source = cls.newInstance().asInstanceOf[DataSourceV2]
    +      source match {
             case ws: WriteSupport =>
    -          val options = new DataSourceOptions((extraOptions ++
    -            DataSourceV2Utils.extractSessionConfigs(
    -              ds = ds.asInstanceOf[DataSourceV2],
    -              conf = df.sparkSession.sessionState.conf)).asJava)
    -          // Using a timestamp and a random UUID to distinguish different writing jobs. This is good
    -          // enough as there won't be tons of writing jobs created at the same second.
    -          val jobId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
    -            .format(new Date()) + "-" + UUID.randomUUID()
    -          val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, options)
    -          if (writer.isPresent) {
    +          val options = extraOptions ++
    +              DataSourceV2Utils.extractSessionConfigs(source, df.sparkSession.sessionState.conf)
    +
    +          val relation = DataSourceV2Relation.create(source, options.toMap)
    +          if (mode == SaveMode.Append) {
                 runCommand(df.sparkSession, "save") {
    -              WriteToDataSourceV2(writer.get(), df.logicalPlan)
    +              AppendData.byName(relation, df.logicalPlan)
    +            }
    +
    +          } else {
    +            val writer = ws.createWriter(
    +              UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode,
    --- End diff --
    
    +1. The timestamp was added to reduce the possibility of conflicting job IDs.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1887/
    Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #94339 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94339/testReport)** for PR 21305 at commit [`e81790d`](https://github.com/apache/spark/commit/e81790d072ed66f1126d5918bd1a39222a9f5cfa).


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    @cloud-fan, I've rebased and updated with the requested change to disallow missing columns, even if they're optional. Thanks for reviewing!


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    @cloud-fan, I've updated this with the requested changes. Thanks for looking at it!


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200824602
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2120,6 +2122,99 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +      case append @ AppendData(table: NamedRelation, query, isByName)
    +          if table.resolved && query.resolved && !append.resolved =>
    +        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
    +
    +        if (projection != query) {
    +          append.copy(query = projection)
    +        } else {
    +          append
    +        }
    +    }
    +
    +    def resolveOutputColumns(
    +        tableName: String,
    +        expected: Seq[Attribute],
    +        query: LogicalPlan,
    +        byName: Boolean): LogicalPlan = {
    +
    +      if (expected.size < query.output.size) {
    +        throw new AnalysisException(
    +          s"""Cannot write to '$tableName', too many data columns:
    +             |Table columns: ${expected.map(_.name).mkString(", ")}
    +             |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +      }
    +
    +      val errors = new mutable.ArrayBuffer[String]()
    +      val resolved: Seq[NamedExpression] = if (byName) {
    +        expected.flatMap { outAttr =>
    +          query.resolveQuoted(outAttr.name, resolver) match {
    +            case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
    +              errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
    +              None
    +
    +            case Some(inAttr) if !outAttr.dataType.sameType(inAttr.dataType) =>
    --- End diff --
    
    Yes, I'll update to check nested fields.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200825129
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -240,21 +238,27 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     
         val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
         if (classOf[DataSourceV2].isAssignableFrom(cls)) {
    -      val ds = cls.newInstance()
    -      ds match {
    +      val source = cls.newInstance().asInstanceOf[DataSourceV2]
    +      source match {
             case ws: WriteSupport =>
    -          val options = new DataSourceOptions((extraOptions ++
    -            DataSourceV2Utils.extractSessionConfigs(
    -              ds = ds.asInstanceOf[DataSourceV2],
    -              conf = df.sparkSession.sessionState.conf)).asJava)
    -          // Using a timestamp and a random UUID to distinguish different writing jobs. This is good
    -          // enough as there won't be tons of writing jobs created at the same second.
    -          val jobId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
    -            .format(new Date()) + "-" + UUID.randomUUID()
    -          val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, options)
    -          if (writer.isPresent) {
    +          val options = extraOptions ++
    +              DataSourceV2Utils.extractSessionConfigs(source, df.sparkSession.sessionState.conf)
    +
    +          val relation = DataSourceV2Relation.create(source, options.toMap)
    +          if (mode == SaveMode.Append) {
                 runCommand(df.sparkSession, "save") {
    -              WriteToDataSourceV2(writer.get(), df.logicalPlan)
    +              AppendData.byName(relation, df.logicalPlan)
    +            }
    +
    +          } else {
    +            val writer = ws.createWriter(
    +              UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode,
    --- End diff --
    
    How would random UUIDs conflict?


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    @cloud-fan, I've rebased this so it is ready for final review when you get a chance. Thanks!


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r207701302
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala ---
    @@ -336,4 +337,97 @@ object DataType {
           case (fromDataType, toDataType) => fromDataType == toDataType
         }
       }
    +
    +  /**
    +   * Returns true if the write data type can be read using the read data type.
    +   *
    +   * The write type is compatible with the read type if:
    +   * - Both types are arrays, the array element types are compatible, and element nullability is
    +   *   compatible (read allows nulls or write does not contain nulls).
    +   * - Both types are maps and the map key and value types are compatible, and value nullability
    +   *   is compatible  (read allows nulls or write does not contain nulls).
    +   * - Both types are structs and each field in the read struct is present in the write struct and
    +   *   compatible (including nullability), or is nullable if the write struct does not contain the
    +   *   field. Write-side structs are not compatible if they contain fields that are not present in
    +   *   the read-side struct.
    +   * - Both types are atomic and the write type can be safely cast to the read type.
    +   *
    +   * Extra fields in write-side structs are not allowed to avoid accidentally writing data that
    +   * the read schema will not read, and to ensure map key equality is not changed when data is read.
    +   *
    +   * @param write a write-side data type to validate against the read type
    +   * @param read a read-side data type
    +   * @return true if data written with the write type can be read using the read type
    +   */
    +  def canWrite(
    +      write: DataType,
    +      read: DataType,
    +      resolver: Resolver,
    +      context: String,
    +      addError: String => Unit = (_: String) => {}): Boolean = {
    +    (write, read) match {
    +      case (wArr: ArrayType, rArr: ArrayType) =>
    +        if (wArr.containsNull && !rArr.containsNull) {
    +          addError(s"Cannot write nullable elements to array of non-nulls: '$context'")
    +          false
    +        } else {
    +          canWrite(wArr.elementType, rArr.elementType, resolver, context + ".element", addError)
    +        }
    +
    +      case (wMap: MapType, rMap: MapType) =>
    +        // map keys cannot include data fields not in the read schema without changing equality when
    +        // read. map keys can be missing fields as long as they are nullable in the read schema.
    +        if (wMap.valueContainsNull && !rMap.valueContainsNull) {
    +          addError(s"Cannot write nullable values to map of non-nulls: '$context'")
    +          false
    +        } else {
    +          canWrite(wMap.keyType, rMap.keyType, resolver, context + ".key", addError) &&
    +              canWrite(wMap.valueType, rMap.valueType, resolver, context + ".value", addError)
    +        }
    +
    +      case (StructType(writeFields), StructType(readFields)) =>
    +        lazy val extraFields = writeFields.map(_.name).toSet -- readFields.map(_.name)
    +
    +        var result = readFields.forall { readField =>
    +          val fieldContext = context + "." + readField.name
    +          writeFields.find(writeField => resolver(writeField.name, readField.name)) match {
    --- End diff --
    
    Since we aren't able to reorder nested fields for now, I think the check here should match that: struct fields must be matched by position, with the same name and a compatible data type.
    
    If the append mode is by-position, maybe we can ignore name differences for struct fields.
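
    A sketch of the position-based check being suggested, reusing the names from the diff above:

        // corresponds also fails when the field counts differ, matching the
        // "by position, same name, compatible type" rule
        writeFields.corresponds(readFields) { (w, r) =>
          resolver(w.name, r.name) &&
            canWrite(w.dataType, r.dataType, resolver, context + "." + r.name, addError)
        }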


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #90535 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90535/testReport)** for PR 21305 at commit [`d2e4c41`](https://github.com/apache/spark/commit/d2e4c41c3dc139cc602270d2ed9bdbbb02fd50be).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90535/
    Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1826/
    Test PASSed.


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r206748200
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java ---
    @@ -38,15 +38,16 @@
        * If this method fails (by throwing an exception), the action will fail and no Spark job will be
        * submitted.
        *
    -   * @param jobId A unique string for the writing job. It's possible that there are many writing
    -   *              jobs running at the same time, and the returned {@link DataSourceWriter} can
    -   *              use this job id to distinguish itself from other jobs.
    +   * @param writeUUID A unique string for the writing job. It's possible that there are many writing
    +   *                  jobs running at the same time, and the returned {@link DataSourceWriter} can
    +   *                  use this job id to distinguish itself from other jobs.
        * @param schema the schema of the data to be written.
        * @param mode the save mode which determines what to do when the data are already in this data
        *             source, please refer to {@link SaveMode} for more details.
        * @param options the options for the returned data source writer, which is an immutable
        *                case-insensitive string-to-string map.
    +   * @return a writer to append data to this data source
    --- End diff --
    
    Non-append cases also call `createWriter`; shall we remove this line?


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200639791
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala ---
    @@ -40,17 +44,24 @@ case class DataSourceV2Relation(
         source: DataSourceV2,
         output: Seq[AttributeReference],
         options: Map[String, String],
    -    userSpecifiedSchema: Option[StructType])
    -  extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {
    +    tableIdent: Option[TableIdentifier] = None,
    +    userSpecifiedSchema: Option[StructType] = None)
    +  extends LeafNode with MultiInstanceRelation with NamedRelation with DataSourceV2StringFormat {
     
       import DataSourceV2Relation._
     
    +  override def name: String = {
    +    tableIdent.map(_.unquotedString).getOrElse("unknown")
    --- End diff --
    
    SGTM


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #94384 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94384/testReport)** for PR 21305 at commit [`42d86e1`](https://github.com/apache/spark/commit/42d86e1553f345c9879b40b1c20a2addbaf69781).


---



[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r207624290
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2217,6 +2218,98 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
    +      case append @ AppendData(table, query, isByName)
    +          if table.resolved && query.resolved && !append.resolved =>
    +        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
    +
    +        if (projection != query) {
    +          append.copy(query = projection)
    +        } else {
    +          append
    +        }
    +    }
    +
    +    def resolveOutputColumns(
    +        tableName: String,
    +        expected: Seq[Attribute],
    +        query: LogicalPlan,
    +        byName: Boolean): LogicalPlan = {
    +
    +      if (expected.size < query.output.size) {
    +        throw new AnalysisException(
    +          s"""Cannot write to '$tableName', too many data columns:
    +             |Table columns: ${expected.map(_.name).mkString(", ")}
    +             |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +      }
    +
    +      val errors = new mutable.ArrayBuffer[String]()
    +      val resolved: Seq[NamedExpression] = if (byName) {
    +        expected.flatMap { tableAttr =>
    +          query.resolveQuoted(tableAttr.name, resolver) match {
    +            case Some(queryExpr) =>
    +              checkField(tableAttr, queryExpr, err => errors += err)
    --- End diff --
    
    I'd much rather pass functions into other methods than mutable state (side effects). Plus, a function is cleaner because it doesn't require a particular storage choice from the caller. If this were in a tight loop there would be an argument for changing it, but this only happens once per plan.
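
    A small generic illustration of the pattern (not from the patch): the callee reports errors through a function, and the caller decides where they accumulate:

        import scala.collection.mutable

        def checkPositive(n: Int, addError: String => Unit): Boolean =
          if (n < 0) { addError(s"negative value: $n"); false } else true

        val errors = mutable.ArrayBuffer.empty[String]
        Seq(1, -2, 3, -4).foreach(n => checkPositive(n, errors += _))
        // errors == ArrayBuffer("negative value: -2", "negative value: -4")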


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #93612 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93612/testReport)** for PR 21305 at commit [`91f2f14`](https://github.com/apache/spark/commit/91f2f14212822b816b31647fbc3724a39717deea).


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    retest this please


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92780/
    Test PASSed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3162/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200801936
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2120,6 +2122,99 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +      case append @ AppendData(table: NamedRelation, query, isByName)
    --- End diff --
    
    Since we don't have an analyzer rule for `ApendData(LogicalPlan ...)`, shall we just require `AppendData.table` to be a `NamedRelation` in class definition?
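    
    For reference, the suggestion amounts to tightening the field type in the class declaration, roughly as below (a sketch of the shape, matching the definition quoted later in this thread):
    
    ```scala
    // Requiring NamedRelation in the declaration makes an unresolved table
    // reference a type error rather than an analyzer-time check.
    case class AppendData(
        table: NamedRelation,
        query: LogicalPlan,
        isByName: Boolean) extends LogicalPlan {
      override def children: Seq[LogicalPlan] = Seq(query)
    }
    ```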


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93208/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94373/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #92761 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92761/testReport)** for PR 21305 at commit [`65c2670`](https://github.com/apache/spark/commit/65c26703cd381034bc2cf4b465b4dccc7012fdce).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #92761 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92761/testReport)** for PR 21305 at commit [`65c2670`](https://github.com/apache/spark/commit/65c26703cd381034bc2cf4b465b4dccc7012fdce).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92723/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r201399506
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
    @@ -240,21 +238,27 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     
         val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
         if (classOf[DataSourceV2].isAssignableFrom(cls)) {
    -      val ds = cls.newInstance()
    -      ds match {
    +      val source = cls.newInstance().asInstanceOf[DataSourceV2]
    +      source match {
             case ws: WriteSupport =>
    -          val options = new DataSourceOptions((extraOptions ++
    -            DataSourceV2Utils.extractSessionConfigs(
    -              ds = ds.asInstanceOf[DataSourceV2],
    -              conf = df.sparkSession.sessionState.conf)).asJava)
    -          // Using a timestamp and a random UUID to distinguish different writing jobs. This is good
    -          // enough as there won't be tons of writing jobs created at the same second.
    -          val jobId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
    -            .format(new Date()) + "-" + UUID.randomUUID()
    -          val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, options)
    -          if (writer.isPresent) {
    +          val options = extraOptions ++
    +              DataSourceV2Utils.extractSessionConfigs(source, df.sparkSession.sessionState.conf)
    +
    +          val relation = DataSourceV2Relation.create(source, options.toMap)
    +          if (mode == SaveMode.Append) {
                 runCommand(df.sparkSession, "save") {
    -              WriteToDataSourceV2(writer.get(), df.logicalPlan)
    +              AppendData.byName(relation, df.logicalPlan)
    +            }
    +
    +          } else {
    +            val writer = ws.createWriter(
    +              UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode,
    --- End diff --
    
    I see no good reason to over-complicate the unique string passed in. Here's a quote from Wikipedia on the chance of a collision (from [this SO answer](https://stackoverflow.com/questions/24876188/how-big-is-the-chance-to-get-a-java-uuid-randomuuid-collision)):
    
    ```
    Only after generating 1 billion UUIDs every second for the next 100 years, the probability of creating just one duplicate would be about 50%. Or, to put it another way, the probability of one duplicate would be about 50% if every person on earth owned 600 million UUIDs.
    ```
    
    Adding a timestamp to a UUID to avoid collisions is unnecessary.
    
    For the other use, why would a user go to the temp directory of some node's file system -- which may not even be used by a given source -- instead of going to the logs? And what if the user wants some piece of information other than the starting timestamp (which is in a format that would have to be converted)?
    
    In short, I don't agree with the argument that it is helpful to pass the old format. This is just a carry-over from making fake Hadoop job IDs (which is why it was called `jobId` and started with `job_`). It's debatable whether the write UUID itself is even useful, given that there is no requirement to use it anywhere.
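    
    For comparison, the two identifier styles from the diff above look like this; the second is all that `createWriter` receives after this change:
    
    ```scala
    import java.text.SimpleDateFormat
    import java.util.{Date, Locale, UUID}
    
    // Old style: a timestamp prefix plus a random UUID, mimicking Hadoop job ids.
    val oldJobId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
      .format(new Date()) + "-" + UUID.randomUUID()
    
    // New style: a random UUID alone; the collision probability is already negligible.
    val writeUUID = UUID.randomUUID.toString
    ```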


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Retest this please.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r207938162
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala ---
    @@ -336,4 +337,124 @@ object DataType {
           case (fromDataType, toDataType) => fromDataType == toDataType
         }
       }
    +
    +  private val SparkGeneratedName = """col\d+""".r
    +  private def isSparkGeneratedName(name: String): Boolean = name match {
    +    case SparkGeneratedName(_*) => true
    +    case _ => false
    +  }
    +
    +  /**
    +   * Returns true if the write data type can be read using the read data type.
    +   *
    +   * The write type is compatible with the read type if:
    +   * - Both types are arrays, the array element types are compatible, and element nullability is
    +   *   compatible (read allows nulls or write does not contain nulls).
    +   * - Both types are maps and the map key and value types are compatible, and value nullability
    +   *   is compatible (read allows nulls or write does not contain nulls).
    +   * - Both types are structs and each field in the read struct is present in the write struct and
    +   *   compatible (including nullability), or is nullable if the write struct does not contain the
    +   *   field. Write-side structs are not compatible if they contain fields that are not present in
    +   *   the read-side struct.
    +   * - Both types are atomic and the write type can be safely cast to the read type.
    +   *
    +   * Extra fields in write-side structs are not allowed to avoid accidentally writing data that
    +   * the read schema will not read, and to ensure map key equality is not changed when data is read.
    +   *
    +   * @param write a write-side data type to validate against the read type
    +   * @param read a read-side data type
    +   * @return true if data written with the write type can be read using the read type
    +   */
    +  def canWrite(
    +      write: DataType,
    +      read: DataType,
    +      resolver: Resolver,
    +      context: String,
    +      addError: String => Unit = (_: String) => {}): Boolean = {
    +    (write, read) match {
    +      case (wArr: ArrayType, rArr: ArrayType) =>
    +        // run compatibility check first to produce all error messages
    +        val typesCompatible =
    +          canWrite(wArr.elementType, rArr.elementType, resolver, context + ".element", addError)
    +
    +        if (wArr.containsNull && !rArr.containsNull) {
    +          addError(s"Cannot write nullable elements to array of non-nulls: '$context'")
    +          false
    +        } else {
    +          typesCompatible
    +        }
    +
    +      case (wMap: MapType, rMap: MapType) =>
    +        // map keys cannot include data fields not in the read schema without changing equality when
    +        // read. map keys can be missing fields as long as they are nullable in the read schema.
    +
    +        // run compatibility check first to produce all error messages
    +        val keyCompatible =
    +          canWrite(wMap.keyType, rMap.keyType, resolver, context + ".key", addError)
    +        val valueCompatible =
    +          canWrite(wMap.valueType, rMap.valueType, resolver, context + ".value", addError)
    +        val typesCompatible = keyCompatible && valueCompatible
    +
    +        if (wMap.valueContainsNull && !rMap.valueContainsNull) {
    +          addError(s"Cannot write nullable values to map of non-nulls: '$context'")
    +          false
    +        } else {
    +          typesCompatible
    +        }
    +
    +      case (StructType(writeFields), StructType(readFields)) =>
    +        var fieldCompatible = true
    +        readFields.zip(writeFields).foreach {
    +          case (rField, wField) =>
    +            val namesMatch = resolver(wField.name, rField.name) || isSparkGeneratedName(wField.name)
    --- End diff --
    
    Do we need to check for Spark-generated names? If the number of fields is the same, the generated names also match.
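    
    For context, the generated names come from building a struct without explicit field names; a small illustration (assuming `spark` is an active SparkSession):
    
    ```scala
    import org.apache.spark.sql.functions.{lit, struct}
    
    // struct() without field names yields Spark-generated names col1, col2, ...
    // which is what the col\d+ pattern in the diff above matches.
    val df = spark.range(1).select(struct(lit(1), lit("a")).as("s"))
    df.printSchema()
    // root
    //  |-- s: struct (nullable = false)
    //  |    |-- col1: integer (nullable = false)
    //  |    |-- col2: string (nullable = false)
    ```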


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r207395603
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
    @@ -352,6 +351,36 @@ case class Join(
       }
     }
     
    +/**
    + * Append data to an existing table.
    + */
    +case class AppendData(
    +    table: NamedRelation,
    +    query: LogicalPlan,
    +    isByName: Boolean) extends LogicalPlan {
    +  override def children: Seq[LogicalPlan] = Seq(query)
    --- End diff --
    
    I also see that `InsertIntoTable` doesn't list the relation as a child.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #94257 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94257/testReport)** for PR 21305 at commit [`618a79d`](https://github.com/apache/spark/commit/618a79dfebf52710e3d86abbde35c65910b91a81).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r207723244
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala ---
    @@ -336,4 +337,97 @@ object DataType {
           case (fromDataType, toDataType) => fromDataType == toDataType
         }
       }
    +
    +  /**
    +   * Returns true if the write data type can be read using the read data type.
    +   *
    +   * The write type is compatible with the read type if:
    +   * - Both types are arrays, the array element types are compatible, and element nullability is
    +   *   compatible (read allows nulls or write does not contain nulls).
    +   * - Both types are maps and the map key and value types are compatible, and value nullability
    +   *   is compatible (read allows nulls or write does not contain nulls).
    +   * - Both types are structs and each field in the read struct is present in the write struct and
    +   *   compatible (including nullability), or is nullable if the write struct does not contain the
    +   *   field. Write-side structs are not compatible if they contain fields that are not present in
    +   *   the read-side struct.
    +   * - Both types are atomic and the write type can be safely cast to the read type.
    +   *
    +   * Extra fields in write-side structs are not allowed to avoid accidentally writing data that
    +   * the read schema will not read, and to ensure map key equality is not changed when data is read.
    +   *
    +   * @param write a write-side data type to validate against the read type
    +   * @param read a read-side data type
    +   * @return true if data written with the write type can be read using the read type
    +   */
    +  def canWrite(
    +      write: DataType,
    +      read: DataType,
    +      resolver: Resolver,
    +      context: String,
    +      addError: String => Unit = (_: String) => {}): Boolean = {
    +    (write, read) match {
    +      case (wArr: ArrayType, rArr: ArrayType) =>
    +        if (wArr.containsNull && !rArr.containsNull) {
    +          addError(s"Cannot write nullable elements to array of non-nulls: '$context'")
    +          false
    +        } else {
    +          canWrite(wArr.elementType, rArr.elementType, resolver, context + ".element", addError)
    +        }
    +
    +      case (wMap: MapType, rMap: MapType) =>
    +        // map keys cannot include data fields not in the read schema without changing equality when
    +        // read. map keys can be missing fields as long as they are nullable in the read schema.
    +        if (wMap.valueContainsNull && !rMap.valueContainsNull) {
    +          addError(s"Cannot write nullable values to map of non-nulls: '$context'")
    +          false
    +        } else {
    +          canWrite(wMap.keyType, rMap.keyType, resolver, context + ".key", addError) &&
    +              canWrite(wMap.valueType, rMap.valueType, resolver, context + ".value", addError)
    +        }
    +
    +      case (StructType(writeFields), StructType(readFields)) =>
    +        lazy val extraFields = writeFields.map(_.name).toSet -- readFields.map(_.name)
    +
    +        var result = readFields.forall { readField =>
    +          val fieldContext = context + "." + readField.name
    +          writeFields.find(writeField => resolver(writeField.name, readField.name)) match {
    --- End diff --
    
    @cloud-fan, after thinking about this more, I think the right thing to do is to match by position and validate names when names are available. We can always make it more permissive later if that seems reasonable and we have the ability to reorder fields.
    
    My reasoning here parallels the by-name vs. by-position mapping for top-level columns: users expect SQL to match by position and DataFrames to match by name. Structs are similar in that the way a struct is built determines the user's expectation, but structs are always constructed with an explicit field order.
    
    SQL has two methods to create structs:
    * [`named_struct(name1, expr1, ...)`](https://spark.apache.org/docs/2.3.0/api/sql/index.html#named_struct)
    * [`struct(expr1, expr2, ...)`](https://spark.apache.org/docs/2.3.0/api/sql/index.html#struct) uses names `col1`, `col2`, ...
    
    The Dataset API has two methods, [`struct(Column*)` and `struct(String*)`](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$@struct(colName:String,colNames:String*):org.apache.spark.sql.Column) that use the incoming column names or `col1`, `col2`, etc.
    
    Other methods like `from_json` require a `StructType` so they use named structs. You can also create structs when using a Dataset of case classes or Java beans, in which case the struct has the names from the type.
    
    It looks like users always build structs with an explicit field order and only sometimes with names. When field names are missing, we should clearly use field order. But even when field names are present, order is still obvious to the user -- unlike the DataFrame API, where `withColumn` doesn't give a clear indication of column order. Because order is obvious, we should require the correct order for structs. We should *also* make sure that the field names match, since that extra information lets us validate cases like `struct<y int, x int>` written to `struct<x int, y int>`.
    
    If we want to add field reordering in named structs, we can do that later.
    
    Does that sound like a reasonable solution for now? I'll update the code and finish the tests I've been working on.
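    
    A rough sketch of the by-position-with-name-validation idea described above, using plain `StructType`s (illustrative only, not the PR's implementation; a case-insensitive comparison stands in for the analyzer's resolver):
    
    ```scala
    import org.apache.spark.sql.types._
    
    val write = StructType(Seq(
      StructField("y", IntegerType), StructField("x", IntegerType)))
    val read = StructType(Seq(
      StructField("x", IntegerType), StructField("y", IntegerType)))
    
    // Match fields by position, but reject the write when names disagree.
    val compatible = write.fields.length == read.fields.length &&
      read.fields.zip(write.fields).forall { case (r, w) =>
        r.name.equalsIgnoreCase(w.name) && w.dataType == r.dataType
      }
    // compatible == false: struct<y int, x int> is rejected for a column of
    // type struct<x int, y int>, even though the types line up positionally.
    ```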


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94256/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200230884
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
    @@ -344,6 +344,36 @@ case class Join(
       }
     }
     
    +/**
    + * Append data to an existing DataSourceV2 table.
    + */
    +case class AppendData(
    +    table: LogicalPlan,
    --- End diff --
    
    +1 on this. We will still keep data source v1 and the file formats (for fallback), which don't use the v2 relation.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    LGTM. Can you resolve the conflict and address my concern about filling in null for nullable missing fields? I think it's ready to go. Thanks!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94384/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200824599
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2120,6 +2122,99 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +      case append @ AppendData(table: NamedRelation, query, isByName)
    +          if table.resolved && query.resolved && !append.resolved =>
    +        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
    +
    +        if (projection != query) {
    +          append.copy(query = projection)
    +        } else {
    +          append
    +        }
    +    }
    +
    +    def resolveOutputColumns(
    +        tableName: String,
    +        expected: Seq[Attribute],
    +        query: LogicalPlan,
    +        byName: Boolean): LogicalPlan = {
    +
    +      if (expected.size < query.output.size) {
    +        throw new AnalysisException(
    +          s"""Cannot write to '$tableName', too many data columns:
    +             |Table columns: ${expected.map(_.name).mkString(", ")}
    +             |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +      }
    +
    +      val errors = new mutable.ArrayBuffer[String]()
    +      val resolved: Seq[NamedExpression] = if (byName) {
    +        expected.flatMap { outAttr =>
    +          query.resolveQuoted(outAttr.name, resolver) match {
    +            case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
    +              errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
    --- End diff --
    
    I would much rather have a job fail fast and give a clear error message than to fail during a write. I can see how adding such an assertion to the plan could be useful, so I'd consider it if someone wanted to add that feature later. Right now, though, I think this is good.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #93519 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93519/testReport)** for PR 21305 at commit [`f041019`](https://github.com/apache/spark/commit/f041019b1ffe8187e47ba89de96daf631dfd56da).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200231214
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2120,6 +2122,99 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +      case append @ AppendData(table: NamedRelation, query, isByName)
    +          if table.resolved && query.resolved && !append.resolved =>
    +        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
    +
    +        if (projection != query) {
    +          append.copy(query = projection)
    +        } else {
    +          append
    +        }
    +    }
    +
    +    def resolveOutputColumns(
    +        tableName: String,
    +        expected: Seq[Attribute],
    +        query: LogicalPlan,
    +        byName: Boolean): LogicalPlan = {
    +
    +      if (expected.size < query.output.size) {
    +        throw new AnalysisException(
    +          s"""Cannot write to '$tableName', too many data columns:
    +             |Table columns: ${expected.map(_.name).mkString(", ")}
    +             |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +      }
    +
    +      val errors = new mutable.ArrayBuffer[String]()
    +      val resolved: Seq[NamedExpression] = if (byName) {
    +        expected.flatMap { outAttr =>
    +          query.resolveQuoted(outAttr.name, resolver) match {
    +            case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
    +              errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
    +              None
    +
    +            case Some(inAttr) if !outAttr.dataType.sameType(inAttr.dataType) =>
    +              Some(upcast(inAttr, outAttr))
    +
    +            case Some(inAttr) =>
    +              Some(inAttr) // matches nullability, datatype, and name
    +
    +            case _ =>
    +              errors += s"Cannot find data for output column '${outAttr.name}'"
    +              None
    +          }
    +        }
    +
    +      } else {
    +        if (expected.size > query.output.size) {
    +          throw new AnalysisException(
    +            s"""Cannot write to '$tableName', not enough data columns:
    +               |Table columns: ${expected.map(_.name).mkString(", ")}
    +               |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +        }
    +
    +        query.output.zip(expected).flatMap {
    +          case (inAttr, outAttr) if inAttr.nullable && !outAttr.nullable =>
    +            errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
    +            None
    +
    +          case (inAttr, outAttr)
    +            if !inAttr.dataType.sameType(outAttr.dataType) || inAttr.name != outAttr.name =>
    +            Some(upcast(inAttr, outAttr))
    +
    +          case (inAttr, _) =>
    +            Some(inAttr) // matches nullability, datatype, and name
    +        }
    +      }
    +
    +      if (errors.nonEmpty) {
    +        throw new AnalysisException(
    +          s"Cannot write incompatible data to table '$tableName':\n- ${errors.mkString("\n- ")}")
    +      }
    +
    +      Project(resolved, query)
    +    }
    +
    +    private def upcast(inAttr: NamedExpression, outAttr: Attribute): NamedExpression = {
    +      Alias(
    +        UpCast(inAttr, outAttr.dataType, Seq()), outAttr.name
    --- End diff --
    
    Since Upcast is being used in more places, I wanna raise https://github.com/apache/spark/pull/21586 again.
    
    The current Upcast has a bug: it forbids string-to-int but allows string-to-boolean. Changing it is a behavior change, so we should make sure the new behavior is reasonable. What's our expectation of Upcast here? Are we expecting a runtime error when casting string to other types?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1079/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/689/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #93909 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93909/testReport)** for PR 21305 at commit [`922dc16`](https://github.com/apache/spark/commit/922dc164f6950b78223e2e421643bcce5b72a787).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #94373 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94373/testReport)** for PR 21305 at commit [`e81790d`](https://github.com/apache/spark/commit/e81790d072ed66f1126d5918bd1a39222a9f5cfa).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/782/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r206978690
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/WriteSupport.java ---
    @@ -38,15 +38,16 @@
        * If this method fails (by throwing an exception), the action will fail and no Spark job will be
        * submitted.
        *
    -   * @param jobId A unique string for the writing job. It's possible that there are many writing
    -   *              jobs running at the same time, and the returned {@link DataSourceWriter} can
    -   *              use this job id to distinguish itself from other jobs.
    +   * @param writeUUID A unique string for the writing job. It's possible that there are many writing
    --- End diff --
    
    This is removed in the v2 API redesign.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21305


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93844/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #93844 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93844/testReport)** for PR 21305 at commit [`cd5f309`](https://github.com/apache/spark/commit/cd5f309947bb0e711d71c742dff6984852100ddb).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Retest this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    **[Test build #94339 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94339/testReport)** for PR 21305 at commit [`e81790d`](https://github.com/apache/spark/commit/e81790d072ed66f1126d5918bd1a39222a9f5cfa).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/21305
  
    @cloud-fan, yes. There is an open PR, #21308, that adds `DeleteSupport`. I'm not pushing for that just yet because I think `DeleteSupport` should be applied to `Table` after #21306 makes it in.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r207573875
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2217,6 +2218,98 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
    +      case append @ AppendData(table, query, isByName)
    +          if table.resolved && query.resolved && !append.resolved =>
    +        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
    +
    +        if (projection != query) {
    +          append.copy(query = projection)
    +        } else {
    +          append
    +        }
    +    }
    +
    +    def resolveOutputColumns(
    +        tableName: String,
    +        expected: Seq[Attribute],
    +        query: LogicalPlan,
    +        byName: Boolean): LogicalPlan = {
    +
    +      if (expected.size < query.output.size) {
    +        throw new AnalysisException(
    +          s"""Cannot write to '$tableName', too many data columns:
    +             |Table columns: ${expected.map(_.name).mkString(", ")}
    +             |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +      }
    +
    +      val errors = new mutable.ArrayBuffer[String]()
    +      val resolved: Seq[NamedExpression] = if (byName) {
    +        expected.flatMap { tableAttr =>
    +          query.resolveQuoted(tableAttr.name, resolver) match {
    +            case Some(queryExpr) =>
    +              checkField(tableAttr, queryExpr, err => errors += err)
    --- End diff --
    
    nit: seems we can pass the `errors` directly instead of a function.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21305: [SPARK-24251][SQL] Add AppendData logical plan.

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21305#discussion_r200824639
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -2120,6 +2122,99 @@ class Analyzer(
         }
       }
     
    +  /**
    +   * Resolves columns of an output table from the data in a logical plan. This rule will:
    +   *
    +   * - Reorder columns when the write is by name
    +   * - Insert safe casts when data types do not match
    +   * - Insert aliases when column names do not match
    +   * - Detect plans that are not compatible with the output table and throw AnalysisException
    +   */
    +  object ResolveOutputRelation extends Rule[LogicalPlan] {
    +    override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +      case append @ AppendData(table: NamedRelation, query, isByName)
    +          if table.resolved && query.resolved && !append.resolved =>
    +        val projection = resolveOutputColumns(table.name, table.output, query, isByName)
    +
    +        if (projection != query) {
    +          append.copy(query = projection)
    +        } else {
    +          append
    +        }
    +    }
    +
    +    def resolveOutputColumns(
    +        tableName: String,
    +        expected: Seq[Attribute],
    +        query: LogicalPlan,
    +        byName: Boolean): LogicalPlan = {
    +
    +      if (expected.size < query.output.size) {
    +        throw new AnalysisException(
    +          s"""Cannot write to '$tableName', too many data columns:
    +             |Table columns: ${expected.map(_.name).mkString(", ")}
    +             |Data columns: ${query.output.map(_.name).mkString(", ")}""".stripMargin)
    +      }
    +
    +      val errors = new mutable.ArrayBuffer[String]()
    +      val resolved: Seq[NamedExpression] = if (byName) {
    +        expected.flatMap { outAttr =>
    +          query.resolveQuoted(outAttr.name, resolver) match {
    +            case Some(inAttr) if inAttr.nullable && !outAttr.nullable =>
    +              errors += s"Cannot write nullable values to non-null column '${outAttr.name}'"
    +              None
    +
    +            case Some(inAttr) if !outAttr.dataType.sameType(inAttr.dataType) =>
    +              Some(upcast(inAttr, outAttr))
    +
    +            case Some(inAttr) =>
    +              Some(inAttr) // matches nullability, datatype, and name
    +
    +            case _ =>
    +              errors += s"Cannot find data for output column '${outAttr.name}'"
    +              None
    +          }
    +        }
    +
    +      } else {
    +        if (expected.size > query.output.size) {
    --- End diff --
    
    That check is the other direction: not enough columns.
    
    When matching by position, we need the same number of columns, so we add this check (we already know from the earlier check that there aren't too many data columns, so this one catches too few). When matching by name, we can call out the specific columns that are missing, which is why the validation differs between the two cases.
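    
    A condensed sketch of the two arity checks (plain strings stand in for attributes; the names are illustrative):
    
    ```scala
    def checkArity(table: Seq[String], data: Seq[String], byName: Boolean): Unit = {
      // Always rejected: more data columns than table columns.
      if (table.size < data.size) {
        sys.error(s"too many data columns: ${data.mkString(", ")}")
      }
      // By-position writes also require the counts to match exactly; by-name
      // writes instead report each missing column during resolution.
      if (!byName && table.size > data.size) {
        sys.error(s"not enough data columns: ${data.mkString(", ")}")
      }
    }
    
    checkArity(Seq("a", "b", "c"), Seq("a", "b"), byName = false) // too few
    ```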


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org