Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/01/04 08:38:01 UTC

[GitHub] [iceberg] dilipbiswal opened a new pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

dilipbiswal opened a new pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022


   - Group and sort the rows based on the partition spec and sort order of the target table.
   - If neither is available, group and sort the rows based on the columns from the join condition.
   
   **Note:**
   There is a proposal from Anton to group and sort rows based on `_file` and `_pos` for unchanged rows, and on the partition spec and sort order for changed rows. This can be added in a follow-up.
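
A minimal sketch of the two rules in the description, assuming columns are modeled as plain strings rather than Catalyst expressions. `TableLayout`, `WriteLayout`, and `choose` are hypothetical names used only for illustration and are not part of the PR:

```scala
// Hypothetical, simplified model: column names stand in for Catalyst expressions.
final case class TableLayout(partitionCols: Seq[String], sortCols: Seq[String])

// Which columns to group (repartition) by and which to sort by before writing.
final case class WriteLayout(groupBy: Seq[String], sortBy: Seq[String])

object WriteLayoutSketch {
  // Prefer the table's partition spec and sort order; fall back to the join keys.
  // Returns None when there is nothing to group or sort by.
  def choose(layout: TableLayout, joinCols: Seq[String]): Option[WriteLayout] =
    (layout.partitionCols.isEmpty, layout.sortCols.isEmpty) match {
      case (true, true) =>
        if (joinCols.nonEmpty) Some(WriteLayout(joinCols, joinCols)) else None
      case (true, false) =>
        Some(WriteLayout(layout.sortCols, layout.sortCols))
      case (false, true) =>
        Some(WriteLayout(layout.partitionCols, layout.partitionCols))
      case (false, false) =>
        Some(WriteLayout(layout.partitionCols, layout.sortCols))
    }
}
```

For example, `WriteLayoutSketch.choose(TableLayout(Nil, Nil), Seq("id"))` falls back to grouping and sorting by the join key `id`.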


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] dilipbiswal commented on pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
dilipbiswal commented on pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#issuecomment-753842869


   @aokolnychyi @rdblue
   Kindly review the top commit which has the group + sort implementation.




[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r551543233



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.iceberg.TableProperties.MERGE_WRITE_CARDINALITY_CHECK
+import org.apache.iceberg.TableProperties.MERGE_WRITE_CARDINALITY_CHECK_DEFAULT
+import org.apache.iceberg.TableProperties.MERGE_WRITE_GLOBAL_SORT_ENABLED
+import org.apache.iceberg.TableProperties.MERGE_WRITE_GLOBAL_SORT_ENABLED_DEFAULT
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.iceberg.util.PropertyUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.utils.PlanHelper
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+
+case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with PlanHelper with Logging  {
+  val ROW_FROM_SOURCE = "_row_from_source_"
+  val ROW_FROM_TARGET = "_row_from_target_"
+
+  import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan resolveOperators {
+      // rewrite all operations that require reading the table to delete records
+      case MergeIntoTable(target: DataSourceV2Relation,
+                          source: LogicalPlan, cond, actions, notActions) =>
+        val targetOutputCols = target.output
+        val newProjectCols = target.output ++
+          Seq(
+            Alias(InputFileName(), FILE_NAME_COL)(),
+            Alias(monotonically_increasing_id().expr, ROW_ID_COL)()
+          )
+        val newTargetTable = Project(newProjectCols, target)
+
+        // Construct the plan to prune target based on join condition between source and
+        // target.
+        val prunedTargetPlan = Join(source, newTargetTable, Inner, Some(cond), JoinHint.NONE)
+        val writeInfo = newWriteInfo(target.schema)
+        val mergeBuilder = target.table.asMergeable.newMergeBuilder("delete", writeInfo)
+        val targetTableScan =  buildScanPlan(spark, target.table,
+          target.output, mergeBuilder, None, Some(prunedTargetPlan),
+          isCountCheckEnabled(target.table))
+
+        // Construct an outer join to help track changes in source and target.
+        // TODO : Optimize this to use LEFT ANTI or RIGHT OUTER when applicable.
+        val sourceTableProj = source.output ++ Seq(Alias(lit(true).expr, ROW_FROM_SOURCE)())
+        val targetTableProj = target.output ++ Seq(Alias(lit(true).expr, ROW_FROM_TARGET)())
+        val newTargetTableScan = Project(targetTableProj, targetTableScan)
+        val newSourceTableScan = Project(sourceTableProj, source)
+        val joinPlan = Join(newSourceTableScan, newTargetTableScan, FullOuter, Some(cond), JoinHint.NONE)
+
+        // Construct the plan to replace the data based on the output of `MergeInto`
+        val mergeParams = MergeIntoParams(
+          isSourceRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_SOURCE)),
+          isTargetRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_TARGET)),
+          matchedConditions = actions.map(getClauseCondition),
+          matchedOutputs = actions.map(actionOutput(_, targetOutputCols)),
+          notMatchedConditions = notActions.map(getClauseCondition),
+          notMatchedOutputs = notActions.map(actionOutput(_, targetOutputCols)),
+          targetOutput = targetOutputCols :+ Literal(false),
+          deleteOutput = targetOutputCols :+ Literal(true),
+          joinedAttributes = joinPlan.output
+        )
+        val joinKeysFromTarget = targetOutputCols.filter (
+          attr => cond.references.exists(attr.semanticEquals(_))
+        )
+        val writePlan = buildWritePlan(joinPlan, target.table, joinKeysFromTarget)
+        val mergePlan = MergeInto(mergeParams, target, writePlan)
+        val batchWrite = mergeBuilder.asWriteBuilder.buildForBatch()
+        ReplaceData(target, batchWrite, mergePlan)
+    }
+  }
+
+  def buildWritePlan(plan: LogicalPlan, table: Table, targetJoinAttrs: Seq[Attribute]): LogicalPlan = {
+     val iceTable = table.asInstanceOf[SparkTable].table()
+     val globalSortEnabled = isGlobalSortEnabled(table)
+     val partitionExpressions = toCatalyst(iceTable.spec(), plan)
+     val sortExpressions =  toCatalyst(iceTable.sortOrder(), plan, iceTable.schema())
+    val numShufflePartitions = SQLConf.get.numShufflePartitions
+    (partitionExpressions.isEmpty, sortExpressions.isEmpty) match {

Review comment:
       I don't think the logic here that always adds `RepartitionByExpression` is correct. If the sort is global, Spark will automatically add the shuffle required by that sort, right? And that shuffle might not match the partition expressions. When performing a global sort, the partition expressions and sort order must be merged to produce a sort that meets the sort requirements within each partition.
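
One possible reading of "merged", sketched under the assumption that columns are plain strings and that merging means putting the partition columns first, followed by any sort columns not already covered. `MergedSortSketch` and `mergedOrdering` are hypothetical names, not part of the PR or of Iceberg:

```scala
object MergedSortSketch {
  // Partition columns lead, so rows of the same partition end up contiguous
  // after a global sort; the remaining sort columns order rows within each partition.
  def mergedOrdering(partitionCols: Seq[String], sortCols: Seq[String]): Seq[String] =
    partitionCols ++ sortCols.filterNot(c => partitionCols.contains(c))
}
```

With a table partitioned by `day` and sorted by `ts`, this gives the ordering `day, ts`, so a single global sort on that ordering would also satisfy the per-partition sort requirement.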






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r551540318



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.iceberg.TableProperties.MERGE_WRITE_CARDINALITY_CHECK
+import org.apache.iceberg.TableProperties.MERGE_WRITE_CARDINALITY_CHECK_DEFAULT
+import org.apache.iceberg.TableProperties.MERGE_WRITE_GLOBAL_SORT_ENABLED
+import org.apache.iceberg.TableProperties.MERGE_WRITE_GLOBAL_SORT_ENABLED_DEFAULT
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.iceberg.util.PropertyUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.utils.PlanHelper
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+
+case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with PlanHelper with Logging  {
+  val ROW_FROM_SOURCE = "_row_from_source_"
+  val ROW_FROM_TARGET = "_row_from_target_"
+
+  import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan resolveOperators {
+      // rewrite all operations that require reading the table to delete records
+      case MergeIntoTable(target: DataSourceV2Relation,
+                          source: LogicalPlan, cond, actions, notActions) =>
+        val targetOutputCols = target.output
+        val newProjectCols = target.output ++
+          Seq(
+            Alias(InputFileName(), FILE_NAME_COL)(),
+            Alias(monotonically_increasing_id().expr, ROW_ID_COL)()
+          )
+        val newTargetTable = Project(newProjectCols, target)
+
+        // Construct the plan to prune target based on join condition between source and
+        // target.
+        val prunedTargetPlan = Join(source, newTargetTable, Inner, Some(cond), JoinHint.NONE)
+        val writeInfo = newWriteInfo(target.schema)
+        val mergeBuilder = target.table.asMergeable.newMergeBuilder("delete", writeInfo)
+        val targetTableScan =  buildScanPlan(spark, target.table,
+          target.output, mergeBuilder, None, Some(prunedTargetPlan),
+          isCountCheckEnabled(target.table))
+
+        // Construct an outer join to help track changes in source and target.
+        // TODO : Optimize this to use LEFT ANTI or RIGHT OUTER when applicable.
+        val sourceTableProj = source.output ++ Seq(Alias(lit(true).expr, ROW_FROM_SOURCE)())
+        val targetTableProj = target.output ++ Seq(Alias(lit(true).expr, ROW_FROM_TARGET)())
+        val newTargetTableScan = Project(targetTableProj, targetTableScan)
+        val newSourceTableScan = Project(sourceTableProj, source)
+        val joinPlan = Join(newSourceTableScan, newTargetTableScan, FullOuter, Some(cond), JoinHint.NONE)
+
+        // Construct the plan to replace the data based on the output of `MergeInto`
+        val mergeParams = MergeIntoParams(
+          isSourceRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_SOURCE)),
+          isTargetRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_TARGET)),
+          matchedConditions = actions.map(getClauseCondition),
+          matchedOutputs = actions.map(actionOutput(_, targetOutputCols)),
+          notMatchedConditions = notActions.map(getClauseCondition),
+          notMatchedOutputs = notActions.map(actionOutput(_, targetOutputCols)),
+          targetOutput = targetOutputCols :+ Literal(false),
+          deleteOutput = targetOutputCols :+ Literal(true),
+          joinedAttributes = joinPlan.output
+        )
+        val joinKeysFromTarget = targetOutputCols.filter (
+          attr => cond.references.exists(attr.semanticEquals(_))
+        )
+        val writePlan = buildWritePlan(joinPlan, target.table, joinKeysFromTarget)
+        val mergePlan = MergeInto(mergeParams, target, writePlan)
+        val batchWrite = mergeBuilder.asWriteBuilder.buildForBatch()
+        ReplaceData(target, batchWrite, mergePlan)
+    }
+  }
+
+  def buildWritePlan(plan: LogicalPlan, table: Table, targetJoinAttrs: Seq[Attribute]): LogicalPlan = {
+     val iceTable = table.asInstanceOf[SparkTable].table()
+     val globalSortEnabled = isGlobalSortEnabled(table)
+     val partitionExpressions = toCatalyst(iceTable.spec(), plan)
+     val sortExpressions =  toCatalyst(iceTable.sortOrder(), plan, iceTable.schema())
+    val numShufflePartitions = SQLConf.get.numShufflePartitions
+    (partitionExpressions.isEmpty, sortExpressions.isEmpty) match {
+      case (true, true) =>
+        if (targetJoinAttrs.nonEmpty) {
+          val repartition = RepartitionByExpression(targetJoinAttrs, plan, numShufflePartitions)
+          Sort(buildSortOrder(targetJoinAttrs), global = globalSortEnabled, repartition)
+        } else {
+          plan
+        }
+      case (true, false) =>
+        val repartition = RepartitionByExpression(sortExpressions, plan, numShufflePartitions)
+        Sort(sortExpressions, global = globalSortEnabled, repartition)
+      case (false, true) =>
+        val repartition = RepartitionByExpression(partitionExpressions, plan, numShufflePartitions)
+        Sort(buildSortOrder(partitionExpressions), global = globalSortEnabled, repartition)
+      case (false, false) =>
+        val repartition = RepartitionByExpression(partitionExpressions, plan, numShufflePartitions)
+        Sort(sortExpressions, global = globalSortEnabled, repartition)
+    }
+  }
+
+  def getTargetOutputCols(target: DataSourceV2Relation): Seq[NamedExpression] = {
+    target.schema.map { col =>
+      target.output.find(attr => SQLConf.get.resolver(attr.name, col.name)).getOrElse {
+        Alias(Literal(null, col.dataType), col.name)()
+      }
+    }
+  }
+
+  def buildSortOrder(exprs: Seq[Expression]): Seq[SortOrder] = {
+    exprs.map { expr =>
+      SortOrder(expr, Ascending, NullsFirst, Set.empty)
+    }
+  }
+
+  def actionOutput(clause: MergeAction, targetOutputCols: Seq[Expression]): Seq[Expression] = {
+    clause match {
+      case u: UpdateAction =>
+        u.assignments.map(_.value) :+ Literal(false)
+      case _: DeleteAction =>
+        targetOutputCols :+ Literal(true)
+      case i: InsertAction =>
+        i.assignments.map(_.value) :+ Literal(false)
+    }
+  }
+
+  def getClauseCondition(clause: MergeAction): Expression = {
+    clause.condition.getOrElse(Literal(true))
+  }
+
+  def isCountCheckEnabled(table: Table): Boolean = {
+    // TODO - can we avoid the cast below ?

Review comment:
       Yes. The Spark table should have a properties map.
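
A rough sketch of reading the flag from a properties map without the cast. Spark's `Table.properties()` returns a `java.util.Map[String, String]`, so the helper below only depends on that type. The key string and the names `PropertySketch` and `booleanProperty` are assumptions made for illustration:

```scala
import java.util.{Map => JMap}

object PropertySketch {
  // Hypothetical key, used only for illustration; the real key is defined in TableProperties.
  val CardinalityCheckKey = "example.merge.cardinality-check"

  // Returns the default when the key is absent, otherwise parses the stored string.
  def booleanProperty(props: JMap[String, String], key: String, default: Boolean): Boolean = {
    val raw = props.get(key)
    if (raw == null) default else java.lang.Boolean.parseBoolean(raw.trim)
  }
}
```

Called as `PropertySketch.booleanProperty(table.properties(), key, default)`, this would avoid casting to `SparkTable` just to reach the Iceberg table properties.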






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r559057635



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.iceberg.TableProperties.{MERGE_WRITE_CARDINALITY_CHECK, MERGE_WRITE_CARDINALITY_CHECK_DEFAULT, MERGE_WRITE_SORT_MODE, MERGE_WRITE_SORT_MODE_GLOBAL}
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.iceberg.spark.Spark3Util.toRequiredDistribution
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.iceberg.util.PropertyUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner}
+import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, _}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.utils.PlanHelper
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+
+case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with PlanHelper with Logging  {
+  val ROW_FROM_SOURCE = "_row_from_source_"
+  val ROW_FROM_TARGET = "_row_from_target_"
+
+  import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan resolveOperators {
+      // rewrite all operations that require reading the table to delete records
+      case MergeIntoTable(target: DataSourceV2Relation,
+                          source: LogicalPlan, cond, actions, notActions) =>
+        val targetOutputCols = target.output
+        val newProjectCols = target.output ++
+          Seq(
+            Alias(InputFileName(), FILE_NAME_COL)(),
+            Alias(monotonically_increasing_id().expr, ROW_ID_COL)()
+          )
+        val newTargetTable = Project(newProjectCols, target)
+
+        // Construct the plan to prune target based on join condition between source and
+        // target.
+        val prunedTargetPlan = Join(source, newTargetTable, Inner, Some(cond), JoinHint.NONE)
+        val writeInfo = newWriteInfo(target.schema)
+        val mergeBuilder = target.table.asMergeable.newMergeBuilder("delete", writeInfo)
+        val targetTableScan =  buildScanPlan(spark, target.table,
+          target.output, mergeBuilder, prunedTargetPlan, isCountCheckEnabled(target.table, actions))
+
+        // Construct an outer join to help track changes in source and target.
+        // TODO : Optimize this to use LEFT ANTI or RIGHT OUTER when applicable.
+        val sourceTableProj = source.output ++ Seq(Alias(lit(true).expr, ROW_FROM_SOURCE)())
+        val targetTableProj = target.output ++ Seq(Alias(lit(true).expr, ROW_FROM_TARGET)())
+        val newTargetTableScan = Project(targetTableProj, targetTableScan)
+        val newSourceTableScan = Project(sourceTableProj, source)
+        val joinPlan = Join(newSourceTableScan, newTargetTableScan, FullOuter, Some(cond), JoinHint.NONE)
+
+        // Construct the plan to replace the data based on the output of `MergeInto`
+        val mergeParams = MergeIntoParams(
+          isSourceRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_SOURCE)),
+          isTargetRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_TARGET)),
+          matchedConditions = actions.map(getClauseCondition),
+          matchedOutputs = actions.map(actionOutput(_, targetOutputCols)),
+          notMatchedConditions = notActions.map(getClauseCondition),
+          notMatchedOutputs = notActions.map(actionOutput(_, targetOutputCols)),
+          targetOutput = targetOutputCols :+ Literal(false),
+          deleteOutput = targetOutputCols :+ Literal(true),
+          joinedAttributes = joinPlan.output
+        )
+        val joinKeysFromTarget = targetOutputCols.filter (
+          attr => cond.references.exists(attr.semanticEquals(_))
+        )
+        val writePlan = buildWritePlan(joinPlan, Project(target.output, joinPlan), target.table, joinKeysFromTarget)
+        val mergePlan = MergeInto(mergeParams, target, writePlan)
+        val batchWrite = mergeBuilder.asWriteBuilder.buildForBatch()
+        ReplaceData(target, batchWrite, mergePlan)
+    }
+  }
+
+  def buildWritePlan(childPlan: LogicalPlan,
+                     planToResolveFrom: LogicalPlan,
+                     table: Table, targetJoinAttrs: Seq[Attribute]): LogicalPlan = {
+    val iceTable = table.asInstanceOf[SparkTable].table()
+    val globalSortEnabled = isGlobalSortEnabled(table)
+    val partitionExpressions = toCatalyst(toRequiredDistribution(iceTable.spec), planToResolveFrom)
+    val sortExpressions: Seq[SortOrder] =  toCatalyst(toRequiredDistribution(iceTable.spec(),
+      iceTable.sortOrder(), false), planToResolveFrom).asInstanceOf[Seq[SortOrder]]
+    val dist = toRequiredDistribution(iceTable.spec(),
+      iceTable.sortOrder(), true)
+    val globalSortExprs: Seq[SortOrder] = toCatalyst(dist, planToResolveFrom).asInstanceOf[Seq[SortOrder]]
+
+    /*
+    val globalSortExprs: Seq[SortOrder] = toCatalyst(toRequiredDistribution(iceTable.spec(),
+      iceTable.sortOrder(), true), planToResolveFrom).asInstanceOf[Seq[SortOrder]]
+
+     */
+    val numShufflePartitions = SQLConf.get.numShufflePartitions
+
+    (globalSortEnabled, partitionExpressions.isEmpty, sortExpressions.isEmpty) match {
+      case (true, _, _) =>
+        // If global sorting is preferred then build a sort order based on partition and
+        // sort specification of the table. If none is present, then sort based on the
+        // target join key attributes.
+        if (globalSortExprs.nonEmpty) {
+          Sort(globalSortExprs, true, childPlan)
+        } else if (targetJoinAttrs.nonEmpty) {
+          Sort(buildSortOrder(targetJoinAttrs), true, childPlan)
+        } else {
+          childPlan
+        }
+      case (_, true, true) =>
+        // If no partition spec or sort order is defined for the table then repartition and
+        // locally sort the data based on the join key attributes.
+        if (targetJoinAttrs.nonEmpty) {
+          val repartition = RepartitionByExpression(targetJoinAttrs, childPlan, numShufflePartitions)
+          Sort(buildSortOrder(targetJoinAttrs), global = globalSortEnabled, repartition)

Review comment:
       Since `globalSortEnabled` is always false in these cases, it would be clearer to use `global = false`.






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r560619673



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.iceberg.TableProperties.{MERGE_WRITE_CARDINALITY_CHECK, MERGE_WRITE_CARDINALITY_CHECK_DEFAULT, MERGE_WRITE_SORT_MODE, MERGE_WRITE_SORT_MODE_GLOBAL}
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.iceberg.spark.Spark3Util.toRequiredDistribution
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.iceberg.util.PropertyUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner}
+import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, _}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.utils.PlanHelper
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+
+case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with PlanHelper with Logging  {
+  val ROW_FROM_SOURCE = "_row_from_source_"
+  val ROW_FROM_TARGET = "_row_from_target_"
+
+  import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan resolveOperators {
+      // rewrite all operations that require reading the table to delete records
+      case MergeIntoTable(target: DataSourceV2Relation,
+                          source: LogicalPlan, cond, actions, notActions) =>
+        val targetOutputCols = target.output
+        val newProjectCols = target.output ++
+          Seq(
+            Alias(InputFileName(), FILE_NAME_COL)(),
+            Alias(monotonically_increasing_id().expr, ROW_ID_COL)()
+          )
+        val newTargetTable = Project(newProjectCols, target)
+
+        // Construct the plan to prune target based on join condition between source and
+        // target.
+        val prunedTargetPlan = Join(source, newTargetTable, Inner, Some(cond), JoinHint.NONE)
+        val writeInfo = newWriteInfo(target.schema)
+        val mergeBuilder = target.table.asMergeable.newMergeBuilder("delete", writeInfo)
+        val targetTableScan =  buildScanPlan(spark, target.table,
+          target.output, mergeBuilder, prunedTargetPlan, isCountCheckEnabled(target.table, actions))
+
+        // Construct an outer join to help track changes in source and target.
+        // TODO : Optimize this to use LEFT ANTI or RIGHT OUTER when applicable.
+        val sourceTableProj = source.output ++ Seq(Alias(lit(true).expr, ROW_FROM_SOURCE)())
+        val targetTableProj = target.output ++ Seq(Alias(lit(true).expr, ROW_FROM_TARGET)())
+        val newTargetTableScan = Project(targetTableProj, targetTableScan)
+        val newSourceTableScan = Project(sourceTableProj, source)
+        val joinPlan = Join(newSourceTableScan, newTargetTableScan, FullOuter, Some(cond), JoinHint.NONE)
+
+        // Construct the plan to replace the data based on the output of `MergeInto`
+        val mergeParams = MergeIntoParams(
+          isSourceRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_SOURCE)),
+          isTargetRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_TARGET)),
+          matchedConditions = actions.map(getClauseCondition),
+          matchedOutputs = actions.map(actionOutput(_, targetOutputCols)),
+          notMatchedConditions = notActions.map(getClauseCondition),
+          notMatchedOutputs = notActions.map(actionOutput(_, targetOutputCols)),
+          targetOutput = targetOutputCols :+ Literal(false),
+          deleteOutput = targetOutputCols :+ Literal(true),
+          joinedAttributes = joinPlan.output
+        )
+        val joinKeysFromTarget = targetOutputCols.filter (
+          attr => cond.references.exists(attr.semanticEquals(_))
+        )
+        val writePlan = buildWritePlan(joinPlan, Project(target.output, joinPlan), target.table, joinKeysFromTarget)
+        val mergePlan = MergeInto(mergeParams, target, writePlan)
+        val batchWrite = mergeBuilder.asWriteBuilder.buildForBatch()
+        ReplaceData(target, batchWrite, mergePlan)
+    }
+  }
+
+  def buildWritePlan(childPlan: LogicalPlan,
+                     planToResolveFrom: LogicalPlan,
+                     table: Table, targetJoinAttrs: Seq[Attribute]): LogicalPlan = {
+    val iceTable = table.asInstanceOf[SparkTable].table()
+    val globalSortEnabled = isGlobalSortEnabled(table)
+    val partitionExpressions = toCatalyst(toRequiredDistribution(iceTable.spec), planToResolveFrom)
+    val sortExpressions: Seq[SortOrder] =  toCatalyst(toRequiredDistribution(iceTable.spec(),
+      iceTable.sortOrder(), false), planToResolveFrom).asInstanceOf[Seq[SortOrder]]
+    val dist = toRequiredDistribution(iceTable.spec(),
+      iceTable.sortOrder(), true)
+    val globalSortExprs: Seq[SortOrder] = toCatalyst(dist, planToResolveFrom).asInstanceOf[Seq[SortOrder]]

Review comment:
       Following up on this, I think we should implement the proposal from our [discussion about Flink hash distribution](https://github.com/apache/iceberg/pull/2064#discussion_r559018257). That PR is going to add `write.distribution-mode` with 3 values: `none`, `hash`, and `range`. Here's how Spark would interpret each value:
   
   | Spark | `none` | `hash` | `range` |
   |-|--------|-------------|--------|
   | unordered | no distribution, locally sort by partition key | hash distribute by partition key, locally sort by partition key | range distribute by partition key, locally sort by partition key |
   | ordered | no distribution, locally sorted | hash distribute by partition key, locally sorted | globally sorted |
   
   Or in terms of the methods you've added:
   
   | Spark | `none` | `hash` | `range` |
   |-|--------|-------------|--------|
   | unordered | no distribution, local sort by `toOrderedDistribution(spec, order, true)` | distribute by `toClusteredDistribution(spec)`, local sort by `toOrderedDistribution(spec, order, true)` | global sort by `toOrderedDistribution(spec, order, true)` |
   | ordered | no distribution, local sort by `toOrderedDistribution(spec, order, true)` | distribute by `toClusteredDistribution(spec)`, local sort by `toOrderedDistribution(spec, order, true)` | global sort by `toOrderedDistribution(spec, order, true)` |
   
   The result of `toOrderedDistribution(spec, order, true)` is always used to sort, whether locally or globally. If the sort order is unordered, then it will infer the order from the partition spec just as we wanted from the top table. For hash partitioning, we can use `toClusteredDistribution` that you added. Then the only other concern is that we need to add a round-robin shuffle before the global sort.
   
   With that addition, let me revise my code from above:
   
   ```scala
   val distributionMode = table.properties.getOrDefault("write.distribution-mode", "range")
   val order = toCatalyst(toOrderedDistribution(spec, sortOrder, true))
   distributionMode.toLowerCase(Locale.ROOT) match {
     case "none" =>
       Sort(order, global = false, childPlan)
     case "hash" =>
       val clustering = toCatalyst(toClusteredDistribution(spec))
       val hashPartitioned = RepartitionByExpression(clustering, childPlan, numShufflePartitions)
       Sort(order, global = false, hashPartitioned)
     case "range" =>
       val roundRobin = Repartition(numShufflePartitions, shuffle = true, childPlan)
       Sort(order, global = true, roundRobin)
   }
   ```
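
   To make the first table concrete, here is a minimal, self-contained Java sketch of the same decision logic. `DistMode`, `WriteLayoutSketch`, and `describe` are hypothetical names used only for illustration. They are not Iceberg or Spark APIs, and the returned strings only describe a plan rather than build one.

   ```java
   // Hypothetical stand-ins for illustration only; these are not Iceberg or Spark classes.
   enum DistMode { NONE, HASH, RANGE }

   final class WriteLayoutSketch {
     private WriteLayoutSketch() {
     }

     // Returns a description of the shuffle and sort steps for one cell of the first table.
     static String describe(DistMode mode, boolean hasSortOrder) {
       // With no explicit sort order, the order is inferred from the partition spec.
       String sortKey = hasSortOrder ? "table sort order" : "partition key";
       switch (mode) {
         case NONE:
           return "no distribution, local sort by " + sortKey;
         case HASH:
           return "hash distribute by partition key, local sort by " + sortKey;
         case RANGE:
           return hasSortOrder
               ? "global sort by table sort order"
               : "range distribute by partition key, local sort by partition key";
         default:
           throw new IllegalArgumentException("Unknown mode: " + mode);
       }
     }
   }
   ```

   For instance, `WriteLayoutSketch.describe(DistMode.HASH, true)` describes the `hash` column of the `ordered` row.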





[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r559052435



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -164,4 +164,11 @@ private TableProperties() {
 
   public static final String MERGE_MODE = "write.merge.mode";
   public static final String MERGE_MODE_DEFAULT = "copy-on-write";
+
+  public static final String MERGE_WRITE_CARDINALITY_CHECK = "write.merge.cardinality-check.enabled";
+  public static final boolean MERGE_WRITE_CARDINALITY_CHECK_DEFAULT = true;
+
+  public static final String MERGE_WRITE_SORT_MODE = "write.merge.sort.mode";

Review comment:
       We should consider this with the [context from the thread about Flink write distribution](https://github.com/apache/iceberg/pull/2064#discussion_r559018257).





[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r551541866



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/PlanHelper.scala
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.catalyst.utils
+
+import java.util.UUID
+import org.apache.iceberg.{PartitionSpec, Schema}
+import org.apache.iceberg.{SortDirection => IcebergSortDirection, SortOrder => IcebergSortOrder}
+import org.apache.iceberg.{NullOrder => IcebergNullOrder}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{AccumulateFiles, Alias, Ascending, Attribute, AttributeReference, Descending, Expression, GreaterThan, Literal, NamedExpression, NullOrdering, NullsFirst, NullsLast, PredicateHelper, SortDirection, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Sum}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DynamicFileFilter, DynamicFileFilterWithCountCheck, Filter, LogicalPlan, Project}
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.iceberg.read.SupportsFileFilter
+import org.apache.spark.sql.connector.iceberg.write.MergeBuilder
+import org.apache.spark.sql.connector.read.ScanBuilder
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, LogicalWriteInfoImpl}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, PushDownUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import scala.collection.JavaConverters.iterableAsScalaIterableConverter
+
+trait PlanHelper extends PredicateHelper {
+  val FILE_NAME_COL = "_file"
+  val ROW_POS_COL = "_pos"
+  val ROW_ID_COL = "_row_id"
+  val AFFECTED_FILES_ACC_NAME = "affectedFiles"
+  val AFFECTED_FILES_ACC_ALIAS_NAME = "_affectedFiles_"
+  val SUM_ROW_ID_ALIAS_NAME = "_sum_"
+
+  def buildScanPlan(spark: SparkSession,
+                    table: Table,
+                    output: Seq[AttributeReference],
+                    mergeBuilder: MergeBuilder,
+                    cond: Option[Expression],
+                    prunedTargetPlan: Option[LogicalPlan] = None,
+                    performCountCheckForMerge: Boolean = false): LogicalPlan = {
+
+    val scanBuilder = mergeBuilder.asScanBuilder
+    cond.map(pushDownCondition(_, scanBuilder, output))
+
+    val scan = scanBuilder.build()
+    val scanRelation = DataSourceV2ScanRelation(table, scan, toOutputAttrs(scan.readSchema(), output))
+
+    scan match {
+      case filterable: SupportsFileFilter =>
+        val pruneTablePlan = cond.map(getFileFilterPlan(_, scanRelation)).getOrElse(prunedTargetPlan.get)
+        if (performCountCheckForMerge) {
+          val affectedFilesAcc = new SetAccumulator[String]()
+          spark.sparkContext.register(affectedFilesAcc, AFFECTED_FILES_ACC_NAME)
+          val planWithAccumulator = buildPlanWithFileAccumulator(affectedFilesAcc, pruneTablePlan)
+          DynamicFileFilterWithCountCheck(scanRelation, affectedFilesAcc,
+            planWithAccumulator, filterable, table.name())
+        } else {
+          val matchingFilePlan = buildAggregatePlan(pruneTablePlan)
+          DynamicFileFilter(scanRelation, matchingFilePlan, filterable)
+        }
+      case _ =>
+        scanRelation
+    }
+  }
+
+  private def getFileFilterPlan(cond: Expression, scanRelation: DataSourceV2ScanRelation): LogicalPlan = {
+    Filter(cond, scanRelation)
+  }
+
+  private def pushDownCondition(cond: Expression,
+                                scanBuilder: ScanBuilder,
+                                output: Seq[AttributeReference]): Unit = {
+    val predicates = splitConjunctivePredicates(cond)
+    val normalizedPredicates = DataSourceStrategy.normalizeExprs(predicates, output)
+    PushDownUtils.pushFilters(scanBuilder, normalizedPredicates)
+  }
+
+  private def buildAggregatePlan(prunedTargetPlan: LogicalPlan): LogicalPlan = {
+    val fileAttr = findOutputAttr(prunedTargetPlan, FILE_NAME_COL)
+    Aggregate(Seq(fileAttr), Seq(fileAttr), prunedTargetPlan)
+  }
+
+  private def buildPlanWithFileAccumulator( fileAccum: SetAccumulator[String],
+                                            prunedTargetPlan: LogicalPlan): LogicalPlan = {
+    val fileAttr = findOutputAttr(prunedTargetPlan, FILE_NAME_COL)
+    val rowIdAttr = findOutputAttr(prunedTargetPlan, ROW_ID_COL)
+    val projectList = Seq(fileAttr, rowIdAttr,
+      Alias(AccumulateFiles(fileAccum, fileAttr), AFFECTED_FILES_ACC_ALIAS_NAME )())
+    val projectPlan = Project(projectList, prunedTargetPlan)
+    val affectedFilesAttr = findOutputAttr(projectPlan, AFFECTED_FILES_ACC_ALIAS_NAME)
+    val aggSumCol =
+      Alias(AggregateExpression(Sum(affectedFilesAttr), Complete, false), SUM_ROW_ID_ALIAS_NAME)()
+    val aggPlan = Aggregate(Seq(rowIdAttr), Seq(aggSumCol), projectPlan)
+    val sumAttr = findOutputAttr(aggPlan, SUM_ROW_ID_ALIAS_NAME)
+    val havingExpr = GreaterThan(sumAttr, Literal(1L))
+    Filter(havingExpr, aggPlan)
+  }
+
+  def findOutputAttr(plan: LogicalPlan, attrName: String): Attribute = {
+    val resolver = SQLConf.get.resolver
+    plan.output.find(attr => resolver(attr.name, attrName)).getOrElse {
+      throw new AnalysisException(s"Cannot find $attrName in ${plan.output}")
+    }
+  }
+
+  def newWriteInfo(schema: StructType): LogicalWriteInfo = {
+    val uuid = UUID.randomUUID()
+    LogicalWriteInfoImpl(queryId = uuid.toString, schema, CaseInsensitiveStringMap.empty)
+  }
+
+  private def toOutputAttrs(schema: StructType, output: Seq[AttributeReference]): Seq[AttributeReference] = {
+    val nameToAttr = output.map(_.name).zip(output).toMap
+    schema.toAttributes.map {
+      a => nameToAttr.get(a.name) match {
+        case Some(ref) =>
+          // keep the attribute id if it was present in the relation
+          a.withExprId(ref.exprId)
+        case _ =>
+          // if the field is new, create a new attribute
+          AttributeReference(a.name, a.dataType, a.nullable, a.metadata)()
+      }
+    }
+  }
+
+  def toCatalyst(partitionSpec: PartitionSpec, plan: LogicalPlan): Seq[Expression] = {
+    val partitionFields = partitionSpec.fields().asScala.toSeq
+    partitionFields.map { field =>
+      findOutputAttr(plan, field.name())
+    }
+  }

Review comment:
       The conversion for partition spec and sort order needs to handle transforms. And the partition field name is not a column in the Spark schema.
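
   As an illustration of the second point: a table partitioned by `days(ts)` gets a partition field that Iceberg names `ts_day` by default, while the Spark plan only exposes the source column `ts`. The sketch below resolves against the source column and wraps it in the transform. `FieldSketch` and `TransformResolutionSketch` are hypothetical stand-ins, not the Iceberg `PartitionField` or any Catalyst class, and the result is only a textual description of the expression.

   ```java
   import java.util.List;

   // Hypothetical model of a partition field; not the Iceberg PartitionField class.
   final class FieldSketch {
     final String name;          // partition field name, e.g. "ts_day"
     final String sourceColumn;  // column in the table schema, e.g. "ts"
     final String transform;     // e.g. "identity", "days", "bucket[2]"

     FieldSketch(String name, String sourceColumn, String transform) {
       this.name = name;
       this.sourceColumn = sourceColumn;
       this.transform = transform;
     }
   }

   final class TransformResolutionSketch {
     private TransformResolutionSketch() {
     }

     // Resolves the source column (not the partition field name) against the plan output,
     // then applies the transform unless it is the identity.
     static String toExpression(FieldSketch field, List<String> planColumns) {
       if (!planColumns.contains(field.sourceColumn)) {
         throw new IllegalArgumentException(
             "Cannot resolve " + field.sourceColumn + " using " + planColumns);
       }
       if ("identity".equals(field.transform)) {
         return field.sourceColumn;
       }
       return field.transform + "(" + field.sourceColumn + ")";
     }
   }
   ```

   Looking up `ts_day` in the plan output would fail here, which is the failure mode the name-based lookup in the diff runs into.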





[GitHub] [iceberg] rdblue commented on pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#issuecomment-764862696


   Thanks @dilipbiswal and @mehtaashish23! Looks good.



[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r561408143



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -215,5 +227,41 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
     }
     !(actions.size == 1 && hasUnconditionalDelete(actions.headOption))
   }
+
+  def buildWritePlan(
+     childPlan: LogicalPlan,
+     table: Table): LogicalPlan = {
+    table match {
+      case iceTable: SparkTable =>
+        val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
+        val table = iceTable.table()
+        val distributionMode: String = table.properties
+          .getOrDefault("write.distribution-mode", TableProperties.WRITE_DISTRIBUTION_MODE_RANGE)
+        val order = toCatalyst(toOrderedDistribution(table.spec(), table.sortOrder(), true), childPlan)
+        distributionMode.toLowerCase(Locale.ROOT) match {
+          case TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT =>

Review comment:
       We should rename this to `WRITE_DISTRIBUTION_MODE_NONE` in a follow-up, since the default depends on the engine. We can also add `WRITE_DISTRIBUTION_MODE_FLINK_DEFAULT` and `WRITE_DISTRIBUTION_MODE_SPARK_DEFAULT`.

##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -215,5 +227,41 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
     }
     !(actions.size == 1 && hasUnconditionalDelete(actions.headOption))
   }
+
+  def buildWritePlan(
+     childPlan: LogicalPlan,
+     table: Table): LogicalPlan = {
+    table match {
+      case iceTable: SparkTable =>
+        val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
+        val table = iceTable.table()
+        val distributionMode: String = table.properties
+          .getOrDefault("write.distribution-mode", TableProperties.WRITE_DISTRIBUTION_MODE_RANGE)
+        val order = toCatalyst(toOrderedDistribution(table.spec(), table.sortOrder(), true), childPlan)
+        distributionMode.toLowerCase(Locale.ROOT) match {

Review comment:
       The Flink commit also added a Java enum for this. We could use that instead of string matching here. It handles case-insensitive mapping, too: `DistributionMode.fromName(modeName)`
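
   A minimal sketch of that kind of enum, assuming nothing about the real class beyond the `fromName` call quoted above. `SketchDistributionMode` and its `modeName` accessor are hypothetical stand-ins, and the real enum may differ:

   ```java
   import java.util.Locale;

   // Hypothetical stand-in for illustration; the real enum lives in Iceberg and may differ.
   enum SketchDistributionMode {
     NONE("none"),
     HASH("hash"),
     RANGE("range");

     private final String modeName;

     SketchDistributionMode(String modeName) {
       this.modeName = modeName;
     }

     String modeName() {
       return modeName;
     }

     // Case-insensitive lookup; Locale.ROOT avoids locale-specific case rules.
     static SketchDistributionMode fromName(String name) {
       if (name == null) {
         throw new IllegalArgumentException("Mode name must not be null");
       }
       String normalized = name.toLowerCase(Locale.ROOT);
       for (SketchDistributionMode mode : values()) {
         if (mode.modeName.equals(normalized)) {
           return mode;
         }
       }
       throw new IllegalArgumentException("Unknown distribution mode: " + name);
     }
   }
   ```

   Matching on enum values instead of lowercased strings also means an unsupported property value fails with a descriptive error rather than a `MatchError`.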

##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeInto.scala
##########
@@ -21,13 +21,12 @@ package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 
 case class MergeInto(
     mergeIntoProcessor: MergeIntoParams,
-    targetRelation: DataSourceV2Relation,
+    targetOutput: Seq[Attribute],

Review comment:
       I think this could just be `output` and you wouldn't need to override `def output` below.

##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeIntoExec.scala
##########
@@ -32,10 +32,10 @@ import org.apache.spark.sql.execution.UnaryExecNode
 
 case class MergeIntoExec(
     mergeIntoParams: MergeIntoParams,
-    @transient targetRelation: DataSourceV2Relation,
+    targetOutput: Seq[Attribute],

Review comment:
       Same here, using `output` would make the method definition unnecessary.

##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
##########
@@ -81,8 +81,8 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy {
     case ReplaceData(_, batchWrite, query) =>
       ReplaceDataExec(batchWrite, planLater(query)) :: Nil
 
-    case MergeInto(mergeIntoProcessor, targetRelation, child) =>
-      MergeIntoExec(mergeIntoProcessor, targetRelation, planLater(child)) :: Nil
+    case MergeInto(mergeIntoParms, targetAttributes, child) =>
+      MergeIntoExec(mergeIntoParms, targetAttributes, planLater(child)) :: Nil

Review comment:
       I think it would be more clear to use `output` instead of `targetAttributes` here since that's what this is setting, but this is minor.

##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/RewriteRowLevelOperationHelper.scala
##########
@@ -194,4 +220,84 @@ trait RewriteRowLevelOperationHelper extends PredicateHelper with Logging {
       }
     }
   }
+
+  private object BucketTransform {
+    def unapply(transform: Transform): Option[(Int, FieldReference)] = transform match {
+      case bt: BucketTransform => bt.columns match {
+        case Seq(nf: NamedReference) =>
+          Some(bt.numBuckets.value(), FieldReference(nf.fieldNames()))
+        case _ =>
+          None
+      }
+      case _ => None
+    }
+  }
+
+  protected def toCatalyst(
+      distribution: Distribution,
+      plan: LogicalPlan): Seq[catalyst.expressions.Expression] = {
+
+    distribution match {
+      case d: OrderedDistribution =>
+        d.ordering.map(e => toCatalyst(e, plan, resolver))
+      case d: ClusteredDistribution =>
+        d.clustering.map(e => toCatalyst(e, plan, resolver))
+      case _: UnspecifiedDistribution =>
+        Array.empty[catalyst.expressions.Expression]
+    }
+  }
+
+  private def toCatalyst(
+      expr: Expression,
+      query: LogicalPlan,
+      resolver: Resolver): catalyst.expressions.Expression = {
+
+    def resolve(parts: Seq[String]): NamedExpression = {
+      // this part is controversial as we perform resolution in the optimizer
+      // we cannot perform this step in the analyzer since we need to optimize expressions
+      // in nodes like OverwriteByExpression before constructing a logical write
+      query.resolve(parts, resolver) match {
+        case Some(attr) => attr
+        case None => throw new AnalysisException(s"Cannot resolve '${parts.map(quoteIfNeeded).mkString(".")}'" +
+          s" using ${query.output}")
+      }
+    }
+
+    expr match {
+      case s: SortOrder =>
+        val catalystChild = toCatalyst(s.expression(), query, resolver)
+        catalyst.expressions.SortOrder(catalystChild, toCatalyst(s.direction), toCatalyst(s.nullOrdering), Set.empty)
+      case it: IdentityTransform =>
+        resolve(it.ref.fieldNames())
+      case BucketTransform(numBuckets, ref) =>
+        IcebergBucketTransform(numBuckets, resolve(ref.fieldNames))
+      case yt: YearsTransform =>
+        IcebergYearTransform(resolve(yt.ref.fieldNames))
+      case mt: MonthsTransform =>
+        IcebergMonthTransform(resolve(mt.ref.fieldNames))
+      case dt: DaysTransform =>
+        IcebergDayTransform(resolve(dt.ref.fieldNames))
+      case ht: HoursTransform =>
+        IcebergHourTransform(resolve(ht.ref.fieldNames))
+      case ref: FieldReference =>
+        resolve(ref.fieldNames)
+      case _ =>
+        throw new RuntimeException(s"$expr is not currently supported")
+    }
+  }
+

Review comment:
       Nit: extra newline.

##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -215,5 +227,41 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
     }
     !(actions.size == 1 && hasUnconditionalDelete(actions.headOption))
   }
+
+  def buildWritePlan(
+     childPlan: LogicalPlan,
+     table: Table): LogicalPlan = {
+    table match {
+      case iceTable: SparkTable =>
+        val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
+        val table = iceTable.table()
+        val distributionMode: String = table.properties
+          .getOrDefault("write.distribution-mode", TableProperties.WRITE_DISTRIBUTION_MODE_RANGE)

Review comment:
       Isn't `write.distribution-mode` listed in `TableProperties`? We should use the constant.

##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeIntoTable.java
##########
@@ -303,25 +305,107 @@ public void testSingleUnconditionalDeleteDisbleCountCheck() throws NoSuchTableEx
   }
 
   @Test
-  public void testSingleConditionalDeleteCountCheck() throws NoSuchTableException {

Review comment:
       Looks like this test case was accidentally deleted?

##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeIntoTable.java
##########
@@ -303,25 +305,107 @@ public void testSingleUnconditionalDeleteDisbleCountCheck() throws NoSuchTableEx
   }
 
   @Test
-  public void testSingleConditionalDeleteCountCheck() throws NoSuchTableException {
-    append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-    append(sourceName, new Employee(1, "emp-id-1"), new Employee(1, "emp-id-1"),
-           new Employee(2, "emp-id-2"), new Employee(6, "emp-id-6"));
+  public void testIdentityPartition()  {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg PARTITIONED BY (identity(dep))", targetName);
+      initTable(targetName);
+      setWriteMode(targetName, mode);
+      createAndInitSourceTable(sourceName);
+      append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
+      append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
+
+      String sqlText = "MERGE INTO " + targetName + " AS target \n" +
+              "USING " + sourceName + " AS source \n" +
+              "ON target.id = source.id \n" +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, "");

Review comment:
       Minor: this passes an extra empty string to `sql`, and the table names are embedded in the SQL text instead of being passed as format arguments.
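
   A minimal sketch of the suggested form, assuming the test helper formats its query with `String.format`. The `format` method below is a hypothetical stand-in for that helper, not the project's `sql` method:

   ```java
   final class MergeSqlSketch {
     private MergeSqlSketch() {
     }

     // Hypothetical stand-in for a test helper that formats a query before running it.
     static String format(String query, Object... args) {
       return String.format(query, args);
     }

     // Table names are passed as format arguments instead of being concatenated into the text.
     static String mergeSql(String target, String source) {
       return format(
           "MERGE INTO %s AS target " +
           "USING %s AS source " +
           "ON target.id = source.id " +
           "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * " +
           "WHEN MATCHED AND target.id = 6 THEN DELETE " +
           "WHEN NOT MATCHED AND source.id = 2 THEN INSERT *",
           target, source);
     }
   }
   ```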

##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeIntoTable.java
##########
@@ -355,9 +439,17 @@ private void initTable(String tabName) {
     });
   }
 
-  protected void append(String tabName, Employee... employees) throws NoSuchTableException {
-    List<Employee> input = Arrays.asList(employees);
-    Dataset<Row> inputDF = spark.createDataFrame(input, Employee.class);
-    inputDF.coalesce(1).writeTo(tabName).append();
+  private void setWriteMode(String tabName, String mode) {

Review comment:
       I think this should be `setDistributionMode` instead because `setWriteMode` sounds more general, like "copy-on-write" or "merge-on-read".

##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeIntoTable.java
##########
@@ -303,25 +305,107 @@ public void testSingleUnconditionalDeleteDisbleCountCheck() throws NoSuchTableEx
   }
 
   @Test
-  public void testSingleConditionalDeleteCountCheck() throws NoSuchTableException {
-    append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-    append(sourceName, new Employee(1, "emp-id-1"), new Employee(1, "emp-id-1"),
-           new Employee(2, "emp-id-2"), new Employee(6, "emp-id-6"));
+  public void testIdentityPartition()  {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg PARTITIONED BY (identity(dep))", targetName);
+      initTable(targetName);
+      setWriteMode(targetName, mode);
+      createAndInitSourceTable(sourceName);
+      append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
+      append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
+
+      String sqlText = "MERGE INTO " + targetName + " AS target \n" +
+              "USING " + sourceName + " AS source \n" +
+              "ON target.id = source.id \n" +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, "");
+      assertEquals("Should have expected rows",
+              ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
+              sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
+    });
+  }
 
-    String sqlText = "MERGE INTO %s AS target " +
-           "USING %s AS source " +
-           "ON target.id = source.id " +
-           "WHEN MATCHED AND target.id = 1 THEN DELETE " +
-           "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+  @Test
+  public void testDaysTransform() {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, ts timestamp) USING iceberg PARTITIONED BY (days(ts))", targetName);
+      initTable(targetName);
+      setWriteMode(targetName, mode);
+      sql("CREATE TABLE %s (id INT, ts timestamp) USING iceberg", sourceName);
+      initTable(sourceName);
+      sql("INSERT INTO " + targetName + " VALUES (1, timestamp('2001-01-01 00:00:00'))," +
+              "(6, timestamp('2001-01-06 00:00:00'))");
+      sql("INSERT INto " + sourceName + " VALUES (2, timestamp('2001-01-02 00:00:00'))," +
+              "(1, timestamp('2001-01-01 00:00:00'))," +
+              "(6, timestamp('2001-01-06 00:00:00'))");
+
+      String sqlText = "MERGE INTO " + targetName + " AS target \n" +
+              "USING " + sourceName + " AS source \n" +
+              "ON target.id = source.id \n" +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, "");
+      assertEquals("Should have expected rows",
+              ImmutableList.of(row(1, "2001-01-01 00:00:00"), row(2, "2001-01-02 00:00:00")),
+              sql("SELECT id, CAST(ts AS STRING) FROM %s ORDER BY id ASC NULLS LAST", targetName));
+    });
+  }
 
-    String tabName = catalogName + "." + "default.target";
-    String errorMsg = "The same row of target table `" + tabName + "` was identified more than\n" +
-            " once for an update, delete or insert operation of the MERGE statement.";
-    AssertHelpers.assertThrows("Should complain ambiguous row in target",
-           SparkException.class, errorMsg, () -> sql(sqlText, targetName, sourceName));
-    assertEquals("Target should be unchanged",
-           ImmutableList.of(row(1, "emp-id-one"), row(6, "emp-id-6")),
-           sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
+  @Test
+  public void testBucketExpression() {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg" +
+              " CLUSTERED BY (dep) INTO 2 BUCKETS", targetName);
+      initTable(targetName);
+      setWriteMode(targetName, mode);
+      createAndInitSourceTable(sourceName);
+      append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
+      append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
+      String sqlText = "MERGE INTO " + targetName + " AS target \n" +
+              "USING " + sourceName + " AS source \n" +
+              "ON target.id = source.id \n" +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, "");
+      assertEquals("Should have expected rows",
+              ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
+              sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
+    });
+  }
+
+  @Test
+  public void testPartitionedAndOrderedTable() {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg" +
+              " PARTITIONED BY (id) CLUSTERED BY (dep) INTO 2 BUCKETS", targetName);
+      initTable(targetName);
+      setWriteMode(targetName, mode);
+      createAndInitSourceTable(sourceName);
+      append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
+      append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
+      String sqlText = "MERGE INTO " + targetName + " AS target \n" +
+              "USING " + sourceName + " AS source \n" +
+              "ON target.id = source.id \n" +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, "");
+      assertEquals("Should have expected rows",
+              ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
+              sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));

Review comment:
       Nit: indentation is off in the new methods. Continuation lines should use 2 indents (4 spaces).

##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeIntoTable.java
##########
@@ -303,25 +305,107 @@ public void testSingleUnconditionalDeleteDisbleCountCheck() throws NoSuchTableEx
   }
 
   @Test
-  public void testSingleConditionalDeleteCountCheck() throws NoSuchTableException {
-    append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-    append(sourceName, new Employee(1, "emp-id-1"), new Employee(1, "emp-id-1"),
-           new Employee(2, "emp-id-2"), new Employee(6, "emp-id-6"));
+  public void testIdentityPartition()  {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg PARTITIONED BY (identity(dep))", targetName);
+      initTable(targetName);
+      setWriteMode(targetName, mode);
+      createAndInitSourceTable(sourceName);
+      append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
+      append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
+
+      String sqlText = "MERGE INTO " + targetName + " AS target \n" +
+              "USING " + sourceName + " AS source \n" +
+              "ON target.id = source.id \n" +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, "");
+      assertEquals("Should have expected rows",
+              ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
+              sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
+    });
+  }
 
-    String sqlText = "MERGE INTO %s AS target " +
-           "USING %s AS source " +
-           "ON target.id = source.id " +
-           "WHEN MATCHED AND target.id = 1 THEN DELETE " +
-           "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+  @Test
+  public void testDaysTransform() {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, ts timestamp) USING iceberg PARTITIONED BY (days(ts))", targetName);
+      initTable(targetName);
+      setWriteMode(targetName, mode);
+      sql("CREATE TABLE %s (id INT, ts timestamp) USING iceberg", sourceName);
+      initTable(sourceName);
+      sql("INSERT INTO " + targetName + " VALUES (1, timestamp('2001-01-01 00:00:00'))," +
+              "(6, timestamp('2001-01-06 00:00:00'))");
+      sql("INSERT INto " + sourceName + " VALUES (2, timestamp('2001-01-02 00:00:00'))," +
+              "(1, timestamp('2001-01-01 00:00:00'))," +
+              "(6, timestamp('2001-01-06 00:00:00'))");
+
+      String sqlText = "MERGE INTO " + targetName + " AS target \n" +
+              "USING " + sourceName + " AS source \n" +
+              "ON target.id = source.id \n" +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, "");
+      assertEquals("Should have expected rows",
+              ImmutableList.of(row(1, "2001-01-01 00:00:00"), row(2, "2001-01-02 00:00:00")),
+              sql("SELECT id, CAST(ts AS STRING) FROM %s ORDER BY id ASC NULLS LAST", targetName));
+    });
+  }
 
-    String tabName = catalogName + "." + "default.target";
-    String errorMsg = "The same row of target table `" + tabName + "` was identified more than\n" +
-            " once for an update, delete or insert operation of the MERGE statement.";
-    AssertHelpers.assertThrows("Should complain ambiguous row in target",
-           SparkException.class, errorMsg, () -> sql(sqlText, targetName, sourceName));
-    assertEquals("Target should be unchanged",
-           ImmutableList.of(row(1, "emp-id-one"), row(6, "emp-id-6")),
-           sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
+  @Test
+  public void testBucketExpression() {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg" +
+              " CLUSTERED BY (dep) INTO 2 BUCKETS", targetName);
+      initTable(targetName);
+      setWriteMode(targetName, mode);
+      createAndInitSourceTable(sourceName);
+      append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
+      append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
+      String sqlText = "MERGE INTO " + targetName + " AS target \n" +
+              "USING " + sourceName + " AS source \n" +
+              "ON target.id = source.id \n" +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, "");
+      assertEquals("Should have expected rows",
+              ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
+              sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
+    });
+  }
+
+  @Test
+  public void testPartitionedAndOrderedTable() {

Review comment:
       Where does this set the table ordering? I would expect it to run `ALTER TABLE %s WRITE ORDERED BY ...`

##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -85,7 +96,7 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
           joinedAttributes = joinPlan.output
         )
 
-        val mergePlan = MergeInto(mergeParams, target, joinPlan)
+        val mergePlan = MergeInto(mergeParams, target.output, joinPlan)

Review comment:
       Why not sort this case as well? If the user has requested a sort order on the table, it makes sense to enforce it even if we aren't also rewriting files.

##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -142,10 +154,10 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
           targetOutput = target.output,
           joinedAttributes = joinPlan.output
         )
-        val mergePlan = MergeInto(mergeParams, target, joinPlan)
+        val mergePlan = MergeInto(mergeParams, target.output, joinPlan)
+        val writePlan = buildWritePlan(mergePlan, target.table)
         val batchWrite = mergeBuilder.asWriteBuilder.buildForBatch()
-
-        ReplaceData(target, batchWrite, mergePlan)
+        ReplaceData(target, batchWrite, writePlan)

Review comment:
       Nit: the blank line was removed here, which is an unnecessary whitespace change.

##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -215,5 +227,42 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
     }
     !(actions.size == 1 && hasUnconditionalDelete(actions.headOption))
   }
+
+  def buildWritePlan(
+     childPlan: LogicalPlan,
+     table: Table): LogicalPlan = {
+    table match {
+      case iceTable: SparkTable =>
+        val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
+        val table = iceTable.table()
+        val distributionMode: String = table.properties
+          .getOrDefault(TableProperties.WRITE_DISTRIBUTION_MODE, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE)
+        val mode = DistributionMode.fromName(distributionMode)
+        val order = toCatalyst(toOrderedDistribution(table.spec(), table.sortOrder(), true), childPlan)
+        mode match {

Review comment:
       Minor: The `mode` variable isn't really needed. It could be `DistributionMode.fromName(distributionMode) match { ... }`

##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeIntoTable.java
##########
@@ -324,6 +326,111 @@ public void testSingleConditionalDeleteCountCheck() throws NoSuchTableException
            sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
   }
 
+  @Test
+  public void testIdentityPartition()  {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg PARTITIONED BY (identity(dep))", targetName);
+      initTable(targetName);
+      setDistributionMode(targetName, mode);
+      createAndInitSourceTable(sourceName);
+      append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
+      append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
+
+      String sqlText = "MERGE INTO %s AS target " +
+              "USING %s AS source " +
+              "ON target.id = source.id " +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * " +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE " +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, targetName, sourceName);
+      assertEquals("Should have expected rows",
+             ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
+             sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
+    });
+  }
+
+  @Test
+  public void testDaysTransform() {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, ts timestamp) USING iceberg PARTITIONED BY (days(ts))", targetName);
+      initTable(targetName);
+      setDistributionMode(targetName, mode);
+      sql("CREATE TABLE %s (id INT, ts timestamp) USING iceberg", sourceName);
+      initTable(sourceName);
+      sql("INSERT INTO " + targetName + " VALUES (1, timestamp('2001-01-01 00:00:00'))," +
+              "(6, timestamp('2001-01-06 00:00:00'))");
+      sql("INSERT INto " + sourceName + " VALUES (2, timestamp('2001-01-02 00:00:00'))," +
+              "(1, timestamp('2001-01-01 00:00:00'))," +
+              "(6, timestamp('2001-01-06 00:00:00'))");
+
+      String sqlText = "MERGE INTO %s AS target \n" +
+              "USING %s AS source \n" +
+              "ON target.id = source.id \n" +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, targetName, sourceName);
+      assertEquals("Should have expected rows",
+             ImmutableList.of(row(1, "2001-01-01 00:00:00"), row(2, "2001-01-02 00:00:00")),
+             sql("SELECT id, CAST(ts AS STRING) FROM %s ORDER BY id ASC NULLS LAST", targetName));
+    });
+  }
+
+  @Test
+  public void testBucketExpression() {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg" +
+              " CLUSTERED BY (dep) INTO 2 BUCKETS", targetName);
+      initTable(targetName);
+      setDistributionMode(targetName, mode);
+      createAndInitSourceTable(sourceName);
+      append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
+      append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
+      String sqlText = "MERGE INTO %s AS target \n" +
+              "USING %s AS source \n" +
+              "ON target.id = source.id \n" +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, targetName, sourceName);
+      assertEquals("Should have expected rows",
+             ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
+             sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
+    });
+  }
+
+  @Test
+  public void testPartitionedAndOrderedTable() {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg" +
+              " PARTITIONED BY (id)", targetName);
+      sql("ALTER TABLE %s  WRITE ORDERED BY (dep)", targetName);
+      initTable(targetName);
+      setDistributionMode(targetName, mode);
+      createAndInitSourceTable(sourceName);
+      append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
+      append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
+      String sqlText = "MERGE INTO " + targetName + " AS target \n" +
+              "USING " + sourceName + " AS source \n" +
+              "ON target.id = source.id \n" +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, "");

Review comment:
       Nit: this test also passes `""` instead of filling in the table names.
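
   A minimal sketch of the difference, assuming the test's `sql(query, args...)` helper applies `String.format` to its arguments (an assumption; the helper's source is not shown in this thread). `MergeSqlTemplate`, `render`, and the table names are hypothetical and used only for illustration:

```java
final class MergeSqlTemplate {
  // Hypothetical stand-in for the formatting step of the test helper.
  static String render(String template, Object... args) {
    return String.format(template, args);
  }

  public static void main(String[] args) {
    // Table names are left as %s placeholders instead of being concatenated in.
    String sqlText = "MERGE INTO %s AS target " +
        "USING %s AS source " +
        "ON target.id = source.id " +
        "WHEN MATCHED AND target.id = 6 THEN DELETE";

    // The names below are made-up examples, not tables from the PR.
    System.out.println(render(sqlText, "demo.target", "demo.source"));
  }
}
```

   Under that assumption, passing `""` only works because `String.format` ignores surplus arguments when the template has no placeholders.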






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r559054501



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.iceberg.TableProperties.{MERGE_WRITE_CARDINALITY_CHECK, MERGE_WRITE_CARDINALITY_CHECK_DEFAULT, MERGE_WRITE_SORT_MODE, MERGE_WRITE_SORT_MODE_GLOBAL}
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.iceberg.spark.Spark3Util.toRequiredDistribution
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.iceberg.util.PropertyUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner}
+import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, _}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.utils.PlanHelper
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+
+case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with PlanHelper with Logging  {
+  val ROW_FROM_SOURCE = "_row_from_source_"
+  val ROW_FROM_TARGET = "_row_from_target_"
+
+  import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan resolveOperators {
+      // rewrite all operations that require reading the table to delete records
+      case MergeIntoTable(target: DataSourceV2Relation,
+                          source: LogicalPlan, cond, actions, notActions) =>
+        val targetOutputCols = target.output
+        val newProjectCols = target.output ++
+          Seq(
+            Alias(InputFileName(), FILE_NAME_COL)(),
+            Alias(monotonically_increasing_id().expr, ROW_ID_COL)()
+          )
+        val newTargetTable = Project(newProjectCols, target)
+
+        // Construct the plan to prune target based on join condition between source and
+        // target.
+        val prunedTargetPlan = Join(source, newTargetTable, Inner, Some(cond), JoinHint.NONE)
+        val writeInfo = newWriteInfo(target.schema)
+        val mergeBuilder = target.table.asMergeable.newMergeBuilder("delete", writeInfo)
+        val targetTableScan =  buildScanPlan(spark, target.table,
+          target.output, mergeBuilder, prunedTargetPlan, isCountCheckEnabled(target.table, actions))
+
+        // Construct an outer join to help track changes in source and target.
+        // TODO : Optimize this to use LEFT ANTI or RIGHT OUTER when applicable.
+        val sourceTableProj = source.output ++ Seq(Alias(lit(true).expr, ROW_FROM_SOURCE)())
+        val targetTableProj = target.output ++ Seq(Alias(lit(true).expr, ROW_FROM_TARGET)())
+        val newTargetTableScan = Project(targetTableProj, targetTableScan)
+        val newSourceTableScan = Project(sourceTableProj, source)
+        val joinPlan = Join(newSourceTableScan, newTargetTableScan, FullOuter, Some(cond), JoinHint.NONE)
+
+        // Construct the plan to replace the data based on the output of `MergeInto`
+        val mergeParams = MergeIntoParams(
+          isSourceRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_SOURCE)),
+          isTargetRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_TARGET)),
+          matchedConditions = actions.map(getClauseCondition),
+          matchedOutputs = actions.map(actionOutput(_, targetOutputCols)),
+          notMatchedConditions = notActions.map(getClauseCondition),
+          notMatchedOutputs = notActions.map(actionOutput(_, targetOutputCols)),
+          targetOutput = targetOutputCols :+ Literal(false),
+          deleteOutput = targetOutputCols :+ Literal(true),
+          joinedAttributes = joinPlan.output
+        )
+        val joinKeysFromTarget = targetOutputCols.filter (
+          attr => cond.references.exists(attr.semanticEquals(_))
+        )
+        val writePlan = buildWritePlan(joinPlan, Project(target.output, joinPlan), target.table, joinKeysFromTarget)
+        val mergePlan = MergeInto(mergeParams, target, writePlan)

Review comment:
       Looks like this adds the sort in `buildWritePlan` to the joined data, not the merged data. I think that will be less efficient than adding the sort after the merge for two reasons:
   1. Deleted rows will be included, only to be removed on the write nodes
   2. The joined row will contain both the target columns and source columns, so this would be shuffling both the original values and update values in a lot of cases when only the updated values are needed.
   
   I think that it should be this instead:
   
   ```scala
   val mergePlan = MergeInto(mergeParams, target, joinPlan)
   val writePlan = buildWritePlan(mergePlan, table)
   val batchWrite = ...
   ReplaceData(target, batchWrite, writePlan)
   ```
   
   I think that will also allow you to remove some of the additional arguments from `buildWritePlan`.
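
   The ordering argument can be illustrated outside Spark with a small in-memory analogy. This is only a sketch under stated assumptions: `JoinedRow`, `OutputRow`, `merge`, and `mergeThenSort` are hypothetical names, plain Java lists stand in for the logical plan, and the merge rule mirrors the MERGE statement used in this PR's tests. Merging first means the sort sees fewer and narrower rows:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class SortAfterMergeSketch {
  // One row of the full outer join: a null id means that side had no match.
  static final class JoinedRow {
    final Integer targetId;
    final String targetDep;
    final Integer sourceId;
    final String sourceDep;

    JoinedRow(Integer targetId, String targetDep, Integer sourceId, String sourceDep) {
      this.targetId = targetId;
      this.targetDep = targetDep;
      this.sourceId = sourceId;
      this.sourceDep = sourceDep;
    }
  }

  // One row to be written: only the target table's columns remain.
  static final class OutputRow {
    final int id;
    final String dep;

    OutputRow(int id, String dep) {
      this.id = id;
      this.dep = dep;
    }
  }

  // Applies the merge actions; deleted rows are dropped here, before any sort.
  static List<OutputRow> merge(List<JoinedRow> joined) {
    List<OutputRow> out = new ArrayList<>();
    for (JoinedRow row : joined) {
      if (row.targetId != null && row.sourceId != null) {
        if (row.targetId == 1) {
          // WHEN MATCHED AND target.id = 1 THEN UPDATE SET *
          out.add(new OutputRow(row.sourceId, row.sourceDep));
        } else if (row.targetId != 6) {
          // Matched, but no action applies: keep the target row as it is.
          out.add(new OutputRow(row.targetId, row.targetDep));
        }
        // target.id = 6 is the DELETE action, so nothing is emitted for it.
      } else if (row.sourceId != null) {
        if (row.sourceId == 2) {
          // WHEN NOT MATCHED AND source.id = 2 THEN INSERT *
          out.add(new OutputRow(row.sourceId, row.sourceDep));
        }
      } else {
        // Target row with no source match: carried over unchanged.
        out.add(new OutputRow(row.targetId, row.targetDep));
      }
    }
    return out;
  }

  // Sorts the merged rows, so the sort never handles deleted rows or source-side columns.
  static List<OutputRow> mergeThenSort(List<JoinedRow> joined) {
    List<OutputRow> merged = merge(joined);
    merged.sort(Comparator.comparing((OutputRow r) -> r.dep).thenComparingInt(r -> r.id));
    return merged;
  }
}
```

   With the three joined rows produced by the test data in this PR, `mergeThenSort` sorts two two-column rows, whereas sorting the join output first would handle three four-column rows.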






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r559054566



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.iceberg.TableProperties.{MERGE_WRITE_CARDINALITY_CHECK, MERGE_WRITE_CARDINALITY_CHECK_DEFAULT, MERGE_WRITE_SORT_MODE, MERGE_WRITE_SORT_MODE_GLOBAL}
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.iceberg.spark.Spark3Util.toRequiredDistribution
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.iceberg.util.PropertyUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner}
+import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, _}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.utils.PlanHelper
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+
+case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with PlanHelper with Logging  {
+  val ROW_FROM_SOURCE = "_row_from_source_"
+  val ROW_FROM_TARGET = "_row_from_target_"
+
+  import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan resolveOperators {
+      // rewrite all operations that require reading the table to delete records
+      case MergeIntoTable(target: DataSourceV2Relation,
+                          source: LogicalPlan, cond, actions, notActions) =>
+        val targetOutputCols = target.output
+        val newProjectCols = target.output ++
+          Seq(
+            Alias(InputFileName(), FILE_NAME_COL)(),
+            Alias(monotonically_increasing_id().expr, ROW_ID_COL)()
+          )
+        val newTargetTable = Project(newProjectCols, target)
+
+        // Construct the plan to prune target based on join condition between source and
+        // target.
+        val prunedTargetPlan = Join(source, newTargetTable, Inner, Some(cond), JoinHint.NONE)
+        val writeInfo = newWriteInfo(target.schema)
+        val mergeBuilder = target.table.asMergeable.newMergeBuilder("delete", writeInfo)
+        val targetTableScan =  buildScanPlan(spark, target.table,
+          target.output, mergeBuilder, prunedTargetPlan, isCountCheckEnabled(target.table, actions))
+
+        // Construct an outer join to help track changes in source and target.
+        // TODO : Optimize this to use LEFT ANTI or RIGHT OUTER when applicable.
+        val sourceTableProj = source.output ++ Seq(Alias(lit(true).expr, ROW_FROM_SOURCE)())
+        val targetTableProj = target.output ++ Seq(Alias(lit(true).expr, ROW_FROM_TARGET)())
+        val newTargetTableScan = Project(targetTableProj, targetTableScan)
+        val newSourceTableScan = Project(sourceTableProj, source)
+        val joinPlan = Join(newSourceTableScan, newTargetTableScan, FullOuter, Some(cond), JoinHint.NONE)
+
+        // Construct the plan to replace the data based on the output of `MergeInto`
+        val mergeParams = MergeIntoParams(
+          isSourceRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_SOURCE)),
+          isTargetRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_TARGET)),
+          matchedConditions = actions.map(getClauseCondition),
+          matchedOutputs = actions.map(actionOutput(_, targetOutputCols)),
+          notMatchedConditions = notActions.map(getClauseCondition),
+          notMatchedOutputs = notActions.map(actionOutput(_, targetOutputCols)),
+          targetOutput = targetOutputCols :+ Literal(false),
+          deleteOutput = targetOutputCols :+ Literal(true),
+          joinedAttributes = joinPlan.output
+        )
+        val joinKeysFromTarget = targetOutputCols.filter (
+          attr => cond.references.exists(attr.semanticEquals(_))
+        )
+        val writePlan = buildWritePlan(joinPlan, Project(target.output, joinPlan), target.table, joinKeysFromTarget)
+        val mergePlan = MergeInto(mergeParams, target, writePlan)
+        val batchWrite = mergeBuilder.asWriteBuilder.buildForBatch()
+        ReplaceData(target, batchWrite, mergePlan)
+    }
+  }
+
+  def buildWritePlan(childPlan: LogicalPlan,
+                     planToResolveFrom: LogicalPlan,
+                     table: Table, targetJoinAttrs: Seq[Attribute]): LogicalPlan = {
+    val iceTable = table.asInstanceOf[SparkTable].table()

Review comment:
       Rather than using `asInstanceOf`, I think this should return `childPlan` if the table isn't a `SparkTable`.
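
   A sketch of the suggested shape, written in Java for illustration (the PR code is Scala, where a `match` with a default case would serve the same purpose). `Table`, `SparkTableLike`, `OtherTable`, and the string-based plans are hypothetical stand-ins, not Spark or Iceberg classes:

```java
final class WritePlanFallbackSketch {
  // Hypothetical stand-ins for the connector's table types.
  interface Table {}

  static final class SparkTableLike implements Table {
    final String sortColumn;

    SparkTableLike(String sortColumn) {
      this.sortColumn = sortColumn;
    }
  }

  static final class OtherTable implements Table {}

  // Plans are modelled as strings to keep the sketch self-contained.
  static String buildWritePlan(String childPlan, Table table) {
    if (table instanceof SparkTableLike) {
      SparkTableLike sparkTable = (SparkTableLike) table;
      return "Sort(" + sparkTable.sortColumn + ", " + childPlan + ")";
    }
    // Unknown table type: leave the plan untouched instead of failing on a cast.
    return childPlan;
  }
}
```

   The unchecked cast would throw a `ClassCastException` for any other table implementation; the guarded version degrades to writing the rows unsorted.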






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r559055006



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
##########
@@ -269,6 +279,51 @@ public Transform unknown(int fieldId, String sourceName, int sourceId, String tr
     return transforms.toArray(new Transform[0]);
   }
 
+  public static Distribution toRequiredDistribution(PartitionSpec spec, SortOrder sortOrder, boolean inferFromSpec) {
+    if (sortOrder.isUnsorted()) {
+      if (inferFromSpec) {
+        SortOrder specOrder = Partitioning.sortOrderFor(spec);
+        return Distributions.ordered(convert(specOrder));
+      }
+
+      return Distributions.unspecified();
+    }
+
+    Schema schema = spec.schema();
+    Multimap<Integer, SortField> sortFieldIndex = Multimaps.index(sortOrder.fields(), SortField::sourceId);
+
+    // build a sort prefix of partition fields that are not already in the sort order
+    SortOrder.Builder builder = SortOrder.builderFor(schema);
+    for (PartitionField field : spec.fields()) {
+      Collection<SortField> sortFields = sortFieldIndex.get(field.sourceId());
+      boolean isSorted = sortFields.stream().anyMatch(sortField ->
+              field.transform().equals(sortField.transform()) ||
+                      sortField.transform().satisfiesOrderOf(field.transform()));
+      if (!isSorted) {
+        String sourceName = schema.findColumnName(field.sourceId());
+        builder.asc(org.apache.iceberg.expressions.Expressions.transform(sourceName, field.transform()));
+      }
+    }
+
+    // add the configured sort to the partition spec prefix sort
+    SortOrderVisitor.visit(sortOrder, new CopySortOrderFields(builder));
+
+    return Distributions.ordered(convert(builder.build()));
+  }
+
+  public static Distribution toRequiredDistribution(PartitionSpec spec) {
+    if (spec.isUnpartitioned()) {

Review comment:
       I think the cases are reversed. If the spec is _un_-partitioned then the distribution should be unspecified. Otherwise, it should be clustered by the transforms.
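
   The intended branching, as a minimal sketch. The string results stand in for `Distributions.unspecified()` and a clustered distribution; `RequiredDistributionSketch` and its list-of-transform-names input are hypothetical simplifications, not the `Spark3Util` signature:

```java
import java.util.List;

final class RequiredDistributionSketch {
  // partitionTransforms is empty for an unpartitioned spec.
  static String toRequiredDistribution(List<String> partitionTransforms) {
    if (partitionTransforms.isEmpty()) {
      // Unpartitioned: there is nothing to cluster by.
      return "unspecified";
    }
    // Partitioned: cluster rows by the partition transforms.
    return "clustered(" + String.join(", ", partitionTransforms) + ")";
  }
}
```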






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r561461806



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -142,10 +154,10 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
           targetOutput = target.output,
           joinedAttributes = joinPlan.output
         )
-        val mergePlan = MergeInto(mergeParams, target, joinPlan)
+        val mergePlan = MergeInto(mergeParams, target.output, joinPlan)
+        val writePlan = buildWritePlan(mergePlan, target.table)
         val batchWrite = mergeBuilder.asWriteBuilder.buildForBatch()
-
-        ReplaceData(target, batchWrite, mergePlan)
+        ReplaceData(target, batchWrite, writePlan)

Review comment:
       Nit: the blank line was removed here, which is an unnecessary whitespace change.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r560686019



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.iceberg.TableProperties.{MERGE_WRITE_CARDINALITY_CHECK, MERGE_WRITE_CARDINALITY_CHECK_DEFAULT, MERGE_WRITE_SORT_MODE, MERGE_WRITE_SORT_MODE_GLOBAL}
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.iceberg.spark.Spark3Util.toRequiredDistribution
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.iceberg.util.PropertyUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner}
+import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, _}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.utils.PlanHelper
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+
+case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with PlanHelper with Logging  {
+  val ROW_FROM_SOURCE = "_row_from_source_"
+  val ROW_FROM_TARGET = "_row_from_target_"
+
+  import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan resolveOperators {
+      // rewrite all operations that require reading the table to delete records
+      case MergeIntoTable(target: DataSourceV2Relation,
+                          source: LogicalPlan, cond, actions, notActions) =>
+        val targetOutputCols = target.output
+        val newProjectCols = target.output ++
+          Seq(
+            Alias(InputFileName(), FILE_NAME_COL)(),
+            Alias(monotonically_increasing_id().expr, ROW_ID_COL)()
+          )
+        val newTargetTable = Project(newProjectCols, target)
+
+        // Construct the plan to prune target based on join condition between source and
+        // target.
+        val prunedTargetPlan = Join(source, newTargetTable, Inner, Some(cond), JoinHint.NONE)
+        val writeInfo = newWriteInfo(target.schema)
+        val mergeBuilder = target.table.asMergeable.newMergeBuilder("delete", writeInfo)
+        val targetTableScan =  buildScanPlan(spark, target.table,
+          target.output, mergeBuilder, prunedTargetPlan, isCountCheckEnabled(target.table, actions))
+
+        // Construct an outer join to help track changes in source and target.
+        // TODO : Optimize this to use LEFT ANTI or RIGHT OUTER when applicable.
+        val sourceTableProj = source.output ++ Seq(Alias(lit(true).expr, ROW_FROM_SOURCE)())
+        val targetTableProj = target.output ++ Seq(Alias(lit(true).expr, ROW_FROM_TARGET)())
+        val newTargetTableScan = Project(targetTableProj, targetTableScan)
+        val newSourceTableScan = Project(sourceTableProj, source)
+        val joinPlan = Join(newSourceTableScan, newTargetTableScan, FullOuter, Some(cond), JoinHint.NONE)
+
+        // Construct the plan to replace the data based on the output of `MergeInto`
+        val mergeParams = MergeIntoParams(
+          isSourceRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_SOURCE)),
+          isTargetRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_TARGET)),
+          matchedConditions = actions.map(getClauseCondition),
+          matchedOutputs = actions.map(actionOutput(_, targetOutputCols)),
+          notMatchedConditions = notActions.map(getClauseCondition),
+          notMatchedOutputs = notActions.map(actionOutput(_, targetOutputCols)),
+          targetOutput = targetOutputCols :+ Literal(false),
+          deleteOutput = targetOutputCols :+ Literal(true),
+          joinedAttributes = joinPlan.output
+        )
+        val joinKeysFromTarget = targetOutputCols.filter (
+          attr => cond.references.exists(attr.semanticEquals(_))
+        )
+        val writePlan = buildWritePlan(joinPlan, Project(target.output, joinPlan), target.table, joinKeysFromTarget)
+        val mergePlan = MergeInto(mergeParams, target, writePlan)
+        val batchWrite = mergeBuilder.asWriteBuilder.buildForBatch()
+        ReplaceData(target, batchWrite, mergePlan)
+    }
+  }
+
+  def buildWritePlan(childPlan: LogicalPlan,
+                     planToResolveFrom: LogicalPlan,
+                     table: Table, targetJoinAttrs: Seq[Attribute]): LogicalPlan = {
+    val iceTable = table.asInstanceOf[SparkTable].table()
+    val globalSortEnabled = isGlobalSortEnabled(table)
+    val partitionExpressions = toCatalyst(toRequiredDistribution(iceTable.spec), planToResolveFrom)
+    val sortExpressions: Seq[SortOrder] =  toCatalyst(toRequiredDistribution(iceTable.spec(),
+      iceTable.sortOrder(), false), planToResolveFrom).asInstanceOf[Seq[SortOrder]]
+    val dist = toRequiredDistribution(iceTable.spec(),
+      iceTable.sortOrder(), true)
+    val globalSortExprs: Seq[SortOrder] = toCatalyst(dist, planToResolveFrom).asInstanceOf[Seq[SortOrder]]

Review comment:
       +1 here






[GitHub] [iceberg] dilipbiswal commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
dilipbiswal commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r561451604



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeIntoTable.java
##########
@@ -303,25 +305,107 @@ public void testSingleUnconditionalDeleteDisbleCountCheck() throws NoSuchTableEx
   }
 
   @Test
-  public void testSingleConditionalDeleteCountCheck() throws NoSuchTableException {
-    append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-    append(sourceName, new Employee(1, "emp-id-1"), new Employee(1, "emp-id-1"),
-           new Employee(2, "emp-id-2"), new Employee(6, "emp-id-6"));
+  public void testIdentityPartition()  {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg PARTITIONED BY (identity(dep))", targetName);
+      initTable(targetName);
+      setWriteMode(targetName, mode);
+      createAndInitSourceTable(sourceName);
+      append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
+      append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
+
+      String sqlText = "MERGE INTO " + targetName + " AS target \n" +
+              "USING " + sourceName + " AS source \n" +
+              "ON target.id = source.id \n" +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, "");
+      assertEquals("Should have expected rows",
+              ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
+              sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
+    });
+  }
 
-    String sqlText = "MERGE INTO %s AS target " +
-           "USING %s AS source " +
-           "ON target.id = source.id " +
-           "WHEN MATCHED AND target.id = 1 THEN DELETE " +
-           "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+  @Test
+  public void testDaysTransform() {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, ts timestamp) USING iceberg PARTITIONED BY (days(ts))", targetName);
+      initTable(targetName);
+      setWriteMode(targetName, mode);
+      sql("CREATE TABLE %s (id INT, ts timestamp) USING iceberg", sourceName);
+      initTable(sourceName);
+      sql("INSERT INTO " + targetName + " VALUES (1, timestamp('2001-01-01 00:00:00'))," +
+              "(6, timestamp('2001-01-06 00:00:00'))");
+      sql("INSERT INto " + sourceName + " VALUES (2, timestamp('2001-01-02 00:00:00'))," +
+              "(1, timestamp('2001-01-01 00:00:00'))," +
+              "(6, timestamp('2001-01-06 00:00:00'))");
+
+      String sqlText = "MERGE INTO " + targetName + " AS target \n" +
+              "USING " + sourceName + " AS source \n" +
+              "ON target.id = source.id \n" +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, "");
+      assertEquals("Should have expected rows",
+              ImmutableList.of(row(1, "2001-01-01 00:00:00"), row(2, "2001-01-02 00:00:00")),
+              sql("SELECT id, CAST(ts AS STRING) FROM %s ORDER BY id ASC NULLS LAST", targetName));
+    });
+  }
 
-    String tabName = catalogName + "." + "default.target";
-    String errorMsg = "The same row of target table `" + tabName + "` was identified more than\n" +
-            " once for an update, delete or insert operation of the MERGE statement.";
-    AssertHelpers.assertThrows("Should complain ambiguous row in target",
-           SparkException.class, errorMsg, () -> sql(sqlText, targetName, sourceName));
-    assertEquals("Target should be unchanged",
-           ImmutableList.of(row(1, "emp-id-one"), row(6, "emp-id-6")),
-           sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
+  @Test
+  public void testBucketExpression() {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg" +
+              " CLUSTERED BY (dep) INTO 2 BUCKETS", targetName);
+      initTable(targetName);
+      setWriteMode(targetName, mode);
+      createAndInitSourceTable(sourceName);
+      append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
+      append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
+      String sqlText = "MERGE INTO " + targetName + " AS target \n" +
+              "USING " + sourceName + " AS source \n" +
+              "ON target.id = source.id \n" +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, "");
+      assertEquals("Should have expected rows",
+              ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
+              sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
+    });
+  }
+
+  @Test
+  public void testPartitionedAndOrderedTable() {

Review comment:
       @rdblue Sorry, I didn't know about this extension before. I have made the change.






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r559055531



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/PlanHelper.scala
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.catalyst.utils
+
+import java.util.UUID
+import org.apache.spark.sql
+import org.apache.spark.sql.{catalyst, AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.expressions.{AccumulateFiles, Alias, Attribute, AttributeReference, GreaterThan, IcebergBucketTransform, IcebergDayTransform, IcebergHourTransform, IcebergMonthTransform, IcebergYearTransform, Literal, NamedExpression, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Sum}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DynamicFileFilter, DynamicFileFilterWithCountCheck, Filter, LogicalPlan, Project}
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, Expression, FieldReference, HoursTransform, IdentityTransform, Lit, MonthsTransform, NamedReference, NamedTransform, Ref, Transform, YearsTransform}
+import org.apache.spark.sql.connector.iceberg.distributions.{ClusteredDistribution, Distribution, OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.connector.iceberg.expressions.{NullOrdering, SortDirection, SortOrder}
+import org.apache.spark.sql.connector.iceberg.read.SupportsFileFilter
+import org.apache.spark.sql.connector.iceberg.write.MergeBuilder
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, LogicalWriteInfoImpl}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+
+trait PlanHelper extends PredicateHelper {
+  val FILE_NAME_COL = "_file"
+  val ROW_POS_COL = "_pos"
+  val ROW_ID_COL = "_row_id"
+  val AFFECTED_FILES_ACC_NAME = "affectedFiles"
+  val AFFECTED_FILES_ACC_ALIAS_NAME = "_affectedFiles_"
+  val SUM_ROW_ID_ALIAS_NAME = "_sum_"
+
+  def buildScanPlan(spark: SparkSession,
+                    table: Table,
+                    output: Seq[AttributeReference],
+                    mergeBuilder: MergeBuilder,
+                    prunedTargetPlan: LogicalPlan,
+                    performCountCheckForMerge: Boolean = false): LogicalPlan = {
+
+    val scanBuilder = mergeBuilder.asScanBuilder
+    val scan = scanBuilder.build()
+    val scanRelation = DataSourceV2ScanRelation(table, scan, toOutputAttrs(scan.readSchema(), output))
+
+    scan match {
+      case filterable: SupportsFileFilter =>
+        if (performCountCheckForMerge) {
+          val affectedFilesAcc = new SetAccumulator[String]()
+          spark.sparkContext.register(affectedFilesAcc, AFFECTED_FILES_ACC_NAME)
+          val planWithAccumulator = buildPlanWithFileAccumulator(affectedFilesAcc, prunedTargetPlan)
+          DynamicFileFilterWithCountCheck(scanRelation, affectedFilesAcc,
+            planWithAccumulator, filterable, table.name())
+        } else {
+          val matchingFilePlan = buildAggregatePlan(prunedTargetPlan)
+          DynamicFileFilter(scanRelation, matchingFilePlan, filterable)
+        }
+      case _ =>
+        scanRelation
+    }
+  }
+
+  private def buildAggregatePlan(prunedTargetPlan: LogicalPlan): LogicalPlan = {
+    val fileAttr = findOutputAttr(prunedTargetPlan, FILE_NAME_COL)
+    Aggregate(Seq(fileAttr), Seq(fileAttr), prunedTargetPlan)
+  }
+
+  private def buildPlanWithFileAccumulator( fileAccum: SetAccumulator[String],
+                                            prunedTargetPlan: LogicalPlan): LogicalPlan = {
+    val fileAttr = findOutputAttr(prunedTargetPlan, FILE_NAME_COL)
+    val rowIdAttr = findOutputAttr(prunedTargetPlan, ROW_ID_COL)
+    val projectList = Seq(fileAttr, rowIdAttr,
+      Alias(AccumulateFiles(fileAccum, fileAttr), AFFECTED_FILES_ACC_ALIAS_NAME )())
+    val projectPlan = Project(projectList, prunedTargetPlan)
+    val affectedFilesAttr = findOutputAttr(projectPlan, AFFECTED_FILES_ACC_ALIAS_NAME)
+    val aggSumCol =
+      Alias(AggregateExpression(Sum(affectedFilesAttr), Complete, false), SUM_ROW_ID_ALIAS_NAME)()
+    val aggPlan = Aggregate(Seq(rowIdAttr), Seq(aggSumCol), projectPlan)
+    val sumAttr = findOutputAttr(aggPlan, SUM_ROW_ID_ALIAS_NAME)
+    val havingExpr = GreaterThan(sumAttr, Literal(1L))
+    Filter(havingExpr, aggPlan)
+  }
+
+  def findOutputAttr(plan: LogicalPlan, attrName: String): Attribute = {
+    val resolver = SQLConf.get.resolver
+    plan.output.find(attr => resolver(attr.name, attrName)).getOrElse {
+      throw new AnalysisException(s"Cannot find $attrName in ${plan.output}")
+    }
+  }
+
+  def newWriteInfo(schema: StructType): LogicalWriteInfo = {
+    val uuid = UUID.randomUUID()
+    LogicalWriteInfoImpl(queryId = uuid.toString, schema, CaseInsensitiveStringMap.empty)
+  }
+
+  private def toOutputAttrs(schema: StructType, output: Seq[AttributeReference]): Seq[AttributeReference] = {
+    val nameToAttr = output.map(_.name).zip(output).toMap
+    schema.toAttributes.map {
+      a => nameToAttr.get(a.name) match {
+        case Some(ref) =>
+          // keep the attribute id if it was present in the relation
+          a.withExprId(ref.exprId)
+        case _ =>
+          // if the field is new, create a new attribute
+          AttributeReference(a.name, a.dataType, a.nullable, a.metadata)()
+      }
+    }
+  }
+
+  object BucketTransform {

Review comment:
       `private`?






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r561462123



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -215,5 +227,42 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
     }
     !(actions.size == 1 && hasUnconditionalDelete(actions.headOption))
   }
+
+  def buildWritePlan(
+     childPlan: LogicalPlan,
+     table: Table): LogicalPlan = {
+    table match {
+      case iceTable: SparkTable =>
+        val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
+        val table = iceTable.table()
+        val distributionMode: String = table.properties
+          .getOrDefault(TableProperties.WRITE_DISTRIBUTION_MODE, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE)
+        val mode = DistributionMode.fromName(distributionMode)
+        val order = toCatalyst(toOrderedDistribution(table.spec(), table.sortOrder(), true), childPlan)
+        mode match {

Review comment:
       Minor: The `mode` variable isn't really needed. It could be `DistributionMode.fromName(distributionMode) match { ... }`
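
For illustration only, the same idea can be sketched in Java (the language of this PR's tests) instead of Scala. The `DistributionMode` enum below is a hypothetical local stand-in that only mirrors the name used in the diff, and the returned descriptions are assumptions about what each branch might do, not the PR's actual plan nodes.

```java
import java.util.Locale;

class DistributionModeSketch {
  // Hypothetical stand-in for the DistributionMode referenced in the diff.
  enum DistributionMode {
    NONE, HASH, RANGE;

    static DistributionMode fromName(String name) {
      return valueOf(name.toUpperCase(Locale.ROOT));
    }
  }

  // Switch directly on the parsed value; no intermediate `mode` variable is needed.
  static String describeWrite(String distributionMode) {
    switch (DistributionMode.fromName(distributionMode)) {
      case NONE:
        return "local sort only";
      case HASH:
        return "hash-cluster by partition, then local sort";
      case RANGE:
        return "range-partition by order, then local sort";
      default:
        throw new IllegalArgumentException("Unknown mode: " + distributionMode);
    }
  }

  public static void main(String[] args) {
    // "range" is the default the quoted diff falls back to.
    System.out.println(describeWrite("range"));
  }
}
```

Inlining the call keeps the parsed value scoped to the one place that uses it, which is the point of the review comment.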






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r551544826



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.iceberg.TableProperties.MERGE_WRITE_CARDINALITY_CHECK
+import org.apache.iceberg.TableProperties.MERGE_WRITE_CARDINALITY_CHECK_DEFAULT
+import org.apache.iceberg.TableProperties.MERGE_WRITE_GLOBAL_SORT_ENABLED
+import org.apache.iceberg.TableProperties.MERGE_WRITE_GLOBAL_SORT_ENABLED_DEFAULT
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.iceberg.util.PropertyUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.utils.PlanHelper
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+
+case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with PlanHelper with Logging  {
+  val ROW_FROM_SOURCE = "_row_from_source_"
+  val ROW_FROM_TARGET = "_row_from_target_"
+
+  import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan resolveOperators {
+      // rewrite all operations that require reading the table to delete records
+      case MergeIntoTable(target: DataSourceV2Relation,
+                          source: LogicalPlan, cond, actions, notActions) =>
+        val targetOutputCols = target.output
+        val newProjectCols = target.output ++
+          Seq(
+            Alias(InputFileName(), FILE_NAME_COL)(),
+            Alias(monotonically_increasing_id().expr, ROW_ID_COL)()
+          )
+        val newTargetTable = Project(newProjectCols, target)
+
+        // Construct the plan to prune target based on join condition between source and
+        // target.
+        val prunedTargetPlan = Join(source, newTargetTable, Inner, Some(cond), JoinHint.NONE)
+        val writeInfo = newWriteInfo(target.schema)
+        val mergeBuilder = target.table.asMergeable.newMergeBuilder("delete", writeInfo)
+        val targetTableScan =  buildScanPlan(spark, target.table,
+          target.output, mergeBuilder, None, Some(prunedTargetPlan),
+          isCountCheckEnabled(target.table))
+
+        // Construct an outer join to help track changes in source and target.
+        // TODO : Optimize this to use LEFT ANTI or RIGHT OUTER when applicable.
+        val sourceTableProj = source.output ++ Seq(Alias(lit(true).expr, ROW_FROM_SOURCE)())
+        val targetTableProj = target.output ++ Seq(Alias(lit(true).expr, ROW_FROM_TARGET)())
+        val newTargetTableScan = Project(targetTableProj, targetTableScan)
+        val newSourceTableScan = Project(sourceTableProj, source)
+        val joinPlan = Join(newSourceTableScan, newTargetTableScan, FullOuter, Some(cond), JoinHint.NONE)
+
+        // Construct the plan to replace the data based on the output of `MergeInto`
+        val mergeParams = MergeIntoParams(
+          isSourceRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_SOURCE)),
+          isTargetRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_TARGET)),
+          matchedConditions = actions.map(getClauseCondition),
+          matchedOutputs = actions.map(actionOutput(_, targetOutputCols)),
+          notMatchedConditions = notActions.map(getClauseCondition),
+          notMatchedOutputs = notActions.map(actionOutput(_, targetOutputCols)),
+          targetOutput = targetOutputCols :+ Literal(false),
+          deleteOutput = targetOutputCols :+ Literal(true),
+          joinedAttributes = joinPlan.output
+        )
+        val joinKeysFromTarget = targetOutputCols.filter (
+          attr => cond.references.exists(attr.semanticEquals(_))
+        )
+        val writePlan = buildWritePlan(joinPlan, target.table, joinKeysFromTarget)
+        val mergePlan = MergeInto(mergeParams, target, writePlan)
+        val batchWrite = mergeBuilder.asWriteBuilder.buildForBatch()
+        ReplaceData(target, batchWrite, mergePlan)
+    }
+  }
+
+  def buildWritePlan(plan: LogicalPlan, table: Table, targetJoinAttrs: Seq[Attribute]): LogicalPlan = {
+     val iceTable = table.asInstanceOf[SparkTable].table()

Review comment:
       I don't think this should cast the table. Instead, it should attempt to handle non-Iceberg tables and have a branch for Iceberg tables that adds the sort.
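
A minimal sketch of the branching this comment asks for, written in Java with stand-in types. `Table`, `IcebergLikeTable`, and `OtherTable` are hypothetical placeholders (not Spark or Iceberg classes), and plans are modelled as plain strings purely to show the control flow: check the table type, add the sort only for the Iceberg-like case, and pass everything else through unchanged.

```java
class WritePlanSketch {
  // Hypothetical stand-in for Spark's catalog Table interface.
  interface Table {
    String name();
  }

  // Hypothetical stand-in for an Iceberg-backed table that carries a sort order.
  static final class IcebergLikeTable implements Table {
    private final String name;
    private final String sortOrder;

    IcebergLikeTable(String name, String sortOrder) {
      this.name = name;
      this.sortOrder = sortOrder;
    }

    public String name() {
      return name;
    }

    String sortOrder() {
      return sortOrder;
    }
  }

  // Any other table implementation; it has no sort order to apply.
  static final class OtherTable implements Table {
    public String name() {
      return "other";
    }
  }

  // Branch on the runtime type instead of casting unconditionally.
  static String buildWritePlan(String childPlan, Table table) {
    if (table instanceof IcebergLikeTable) {
      IcebergLikeTable ice = (IcebergLikeTable) table;
      return "Sort(" + ice.sortOrder() + ", " + childPlan + ")";
    }
    // Non-Iceberg tables: leave the plan as it is rather than failing on a cast.
    return childPlan;
  }

  public static void main(String[] args) {
    System.out.println(buildWritePlan("Join", new IcebergLikeTable("t", "id ASC")));
    System.out.println(buildWritePlan("Join", new OtherTable()));
  }
}
```

The later `buildWritePlan` hunk quoted elsewhere in this thread appears to take the same shape in Scala, matching on `case iceTable: SparkTable`.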






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r559052692



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpressions.scala
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.nio.ByteBuffer
+import org.apache.iceberg.spark.SparkSchemaUtil
+import org.apache.iceberg.transforms.{Transform, Transforms}
+import org.apache.iceberg.types.{Type, Types}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.types.{AbstractDataType, DataType, Decimal, DecimalType, IntegerType, StringType, TimestampType}
+import org.apache.spark.unsafe.types.UTF8String
+
+abstract class IcebergTransformExpression

Review comment:
       Can you implement `truncate` or add a TODO for it?
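
For context, a rough Java sketch of what `truncate` computes for two of the types it supports, following the Iceberg table spec's definition: integers are rounded down to a multiple of the width using a non-negative remainder, and strings keep at most the first L code points. The class and method names are hypothetical; this is not the expression class the PR would add, and decimal, long, and binary handling are omitted.

```java
class TruncateSketch {
  // truncate[W] for ints: v - (v % W), with the remainder forced to be non-negative
  // so that negative values round toward negative infinity.
  static int truncateInt(int width, int value) {
    return value - (((value % width) + width) % width);
  }

  // truncate[L] for strings: keep at most the first L Unicode code points.
  static String truncateString(int length, String value) {
    int codePoints = value.codePointCount(0, value.length());
    int end = value.offsetByCodePoints(0, Math.min(length, codePoints));
    return value.substring(0, end);
  }

  public static void main(String[] args) {
    System.out.println(truncateInt(10, -1));
    System.out.println(truncateString(3, "iceberg"));
  }
}
```

Counting code points rather than `char`s avoids splitting a surrogate pair when the input contains characters outside the Basic Multilingual Plane.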






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r561462819



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeIntoTable.java
##########
@@ -324,6 +326,111 @@ public void testSingleConditionalDeleteCountCheck() throws NoSuchTableException
            sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
   }
 
+  @Test
+  public void testIdentityPartition()  {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg PARTITIONED BY (identity(dep))", targetName);
+      initTable(targetName);
+      setDistributionMode(targetName, mode);
+      createAndInitSourceTable(sourceName);
+      append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
+      append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
+
+      String sqlText = "MERGE INTO %s AS target " +
+              "USING %s AS source " +
+              "ON target.id = source.id " +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * " +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE " +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, targetName, sourceName);
+      assertEquals("Should have expected rows",
+             ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
+             sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
+    });
+  }
+
+  @Test
+  public void testDaysTransform() {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, ts timestamp) USING iceberg PARTITIONED BY (days(ts))", targetName);
+      initTable(targetName);
+      setDistributionMode(targetName, mode);
+      sql("CREATE TABLE %s (id INT, ts timestamp) USING iceberg", sourceName);
+      initTable(sourceName);
+      sql("INSERT INTO " + targetName + " VALUES (1, timestamp('2001-01-01 00:00:00'))," +
+              "(6, timestamp('2001-01-06 00:00:00'))");
+      sql("INSERT INTO " + sourceName + " VALUES (2, timestamp('2001-01-02 00:00:00'))," +
+              "(1, timestamp('2001-01-01 00:00:00'))," +
+              "(6, timestamp('2001-01-06 00:00:00'))");
+
+      String sqlText = "MERGE INTO %s AS target \n" +
+              "USING %s AS source \n" +
+              "ON target.id = source.id \n" +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, targetName, sourceName);
+      assertEquals("Should have expected rows",
+             ImmutableList.of(row(1, "2001-01-01 00:00:00"), row(2, "2001-01-02 00:00:00")),
+             sql("SELECT id, CAST(ts AS STRING) FROM %s ORDER BY id ASC NULLS LAST", targetName));
+    });
+  }
+
+  @Test
+  public void testBucketExpression() {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg" +
+              " CLUSTERED BY (dep) INTO 2 BUCKETS", targetName);
+      initTable(targetName);
+      setDistributionMode(targetName, mode);
+      createAndInitSourceTable(sourceName);
+      append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
+      append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
+      String sqlText = "MERGE INTO %s AS target \n" +
+              "USING %s AS source \n" +
+              "ON target.id = source.id \n" +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, targetName, sourceName);
+      assertEquals("Should have expected rows",
+             ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
+             sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
+    });
+  }
+
+  @Test
+  public void testPartitionedAndOrderedTable() {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg" +
+              " PARTITIONED BY (id)", targetName);
+      sql("ALTER TABLE %s  WRITE ORDERED BY (dep)", targetName);
+      initTable(targetName);
+      setDistributionMode(targetName, mode);
+      createAndInitSourceTable(sourceName);
+      append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
+      append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
+      String sqlText = "MERGE INTO " + targetName + " AS target \n" +
+              "USING " + sourceName + " AS source \n" +
+              "ON target.id = source.id \n" +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, "");

Review comment:
       Nit: this test also passes `""` to `sql` instead of filling in the table names.
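
   To illustrate the nit: `String.format` ignores surplus arguments, so a dummy argument on a statement without placeholders works but hides which tables are involved. A minimal sketch, assuming the test helper formats its arguments with `String.format`; the `sql` method below is a hypothetical stand-in that only returns the formatted text, and the table names are made up:

   ```java
   final class SqlFormatSketch {
     // Hypothetical stand-in for the test helper: formats only, runs nothing.
     static String sql(String query, Object... args) {
       return String.format(query, args);
     }

     public static void main(String[] args) {
       String target = "target_tbl";
       String source = "source_tbl";
       // Concatenated names plus a dummy argument: the formatter ignores the "".
       String concatenated = sql("MERGE INTO " + target + " USING " + source, "");
       // Placeholders filled from arguments, as in the other tests.
       String formatted = sql("MERGE INTO %s USING %s", target, source);
       System.out.println(concatenated.equals(formatted));
     }
   }
   ```

   Both calls produce the same statement; the placeholder form keeps this test consistent with the others.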






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r561431174



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeIntoTable.java
##########
@@ -303,25 +305,107 @@ public void testSingleUnconditionalDeleteDisbleCountCheck() throws NoSuchTableEx
   }
 
   @Test
-  public void testSingleConditionalDeleteCountCheck() throws NoSuchTableException {

Review comment:
       Looks like this test case was accidentally deleted?






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r559059301



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.iceberg.TableProperties.{MERGE_WRITE_CARDINALITY_CHECK, MERGE_WRITE_CARDINALITY_CHECK_DEFAULT, MERGE_WRITE_SORT_MODE, MERGE_WRITE_SORT_MODE_GLOBAL}
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.iceberg.spark.Spark3Util.toRequiredDistribution
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.iceberg.util.PropertyUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner}
+import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, _}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.utils.PlanHelper
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+
+case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with PlanHelper with Logging  {
+  val ROW_FROM_SOURCE = "_row_from_source_"
+  val ROW_FROM_TARGET = "_row_from_target_"
+
+  import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan resolveOperators {
+      // rewrite all operations that require reading the table to delete records
+      case MergeIntoTable(target: DataSourceV2Relation,
+                          source: LogicalPlan, cond, actions, notActions) =>
+        val targetOutputCols = target.output
+        val newProjectCols = target.output ++
+          Seq(
+            Alias(InputFileName(), FILE_NAME_COL)(),
+            Alias(monotonically_increasing_id().expr, ROW_ID_COL)()
+          )
+        val newTargetTable = Project(newProjectCols, target)
+
+        // Construct the plan to prune target based on join condition between source and
+        // target.
+        val prunedTargetPlan = Join(source, newTargetTable, Inner, Some(cond), JoinHint.NONE)
+        val writeInfo = newWriteInfo(target.schema)
+        val mergeBuilder = target.table.asMergeable.newMergeBuilder("delete", writeInfo)
+        val targetTableScan =  buildScanPlan(spark, target.table,
+          target.output, mergeBuilder, prunedTargetPlan, isCountCheckEnabled(target.table, actions))
+
+        // Construct an outer join to help track changes in source and target.
+        // TODO : Optimize this to use LEFT ANTI or RIGHT OUTER when applicable.
+        val sourceTableProj = source.output ++ Seq(Alias(lit(true).expr, ROW_FROM_SOURCE)())
+        val targetTableProj = target.output ++ Seq(Alias(lit(true).expr, ROW_FROM_TARGET)())
+        val newTargetTableScan = Project(targetTableProj, targetTableScan)
+        val newSourceTableScan = Project(sourceTableProj, source)
+        val joinPlan = Join(newSourceTableScan, newTargetTableScan, FullOuter, Some(cond), JoinHint.NONE)
+
+        // Construct the plan to replace the data based on the output of `MergeInto`
+        val mergeParams = MergeIntoParams(
+          isSourceRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_SOURCE)),
+          isTargetRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_TARGET)),
+          matchedConditions = actions.map(getClauseCondition),
+          matchedOutputs = actions.map(actionOutput(_, targetOutputCols)),
+          notMatchedConditions = notActions.map(getClauseCondition),
+          notMatchedOutputs = notActions.map(actionOutput(_, targetOutputCols)),
+          targetOutput = targetOutputCols :+ Literal(false),
+          deleteOutput = targetOutputCols :+ Literal(true),
+          joinedAttributes = joinPlan.output
+        )
+        val joinKeysFromTarget = targetOutputCols.filter (
+          attr => cond.references.exists(attr.semanticEquals(_))
+        )
+        val writePlan = buildWritePlan(joinPlan, Project(target.output, joinPlan), target.table, joinKeysFromTarget)
+        val mergePlan = MergeInto(mergeParams, target, writePlan)
+        val batchWrite = mergeBuilder.asWriteBuilder.buildForBatch()
+        ReplaceData(target, batchWrite, mergePlan)
+    }
+  }
+
+  def buildWritePlan(childPlan: LogicalPlan,
+                     planToResolveFrom: LogicalPlan,
+                     table: Table, targetJoinAttrs: Seq[Attribute]): LogicalPlan = {
+    val iceTable = table.asInstanceOf[SparkTable].table()
+    val globalSortEnabled = isGlobalSortEnabled(table)
+    val partitionExpressions = toCatalyst(toRequiredDistribution(iceTable.spec), planToResolveFrom)
+    val sortExpressions: Seq[SortOrder] =  toCatalyst(toRequiredDistribution(iceTable.spec(),
+      iceTable.sortOrder(), false), planToResolveFrom).asInstanceOf[Seq[SortOrder]]
+    val dist = toRequiredDistribution(iceTable.spec(),
+      iceTable.sortOrder(), true)
+    val globalSortExprs: Seq[SortOrder] = toCatalyst(dist, planToResolveFrom).asInstanceOf[Seq[SortOrder]]

Review comment:
       I find this method really hard to follow. This creates expressions for every possible combination and then decides which ones to use.
   
   The logic below also seems more complicated than it needs to be, I think because it uses a different order to check the possible combinations of `isGlobal`, `isPartitioned` and `isSorted`. For example, if `isGlobal` is true, but there is no ordering, then it needs to create an order rather than just not ordering. I was expecting something a bit simpler, like this:
   
   ```scala
   // `order` is the table's sort order; `ordering` is the resolved Catalyst sort ordering
   val hasSortOrder = !table.sortOrder.isUnordered
   if (hasSortOrder) {
     if (distributionMode == Sort) {
       val ordering = toCatalyst(toOrderedDistribution(spec, order, true))
       // round-robin shuffle first so the join and merge are not run twice
       val roundRobin = Repartition(numShufflePartitions, shuffle = true, childPlan)
       Sort(ordering, global = true, roundRobin)
     } else if (distributionMode == Partition) {
       val ordering = toCatalyst(order)
       val hashPartitioned = RepartitionByExpression(toCatalyst(toClusteredDistribution(spec)), childPlan, numShufflePartitions)
       Sort(ordering, global = false, hashPartitioned)
     } else {
       val ordering = toCatalyst(order)
       Sort(ordering, global = false, childPlan)
     }
   } else if (distributionMode == Partition) {
     RepartitionByExpression(toCatalyst(toClusteredDistribution(spec)), childPlan, numShufflePartitions)
   } else {
     childPlan
   }
   ```
   
   That is a bit simpler because global ordering is only considered if the table has an ordering. It may still be a good idea to do what you're doing and create an order from join attributes, but I'd like to make sure that we're choosing to do that. The simplest option is to turn off sorting if there is no global order.
   
   Another alternative, if the "distribution mode" is to sort: we could sort by the partition keys _and_ the join attributes. As long as we are using a range partitioner, we should try to minimize the number of files created in each partition.
   
   Also: in my code above, I added a round-robin shuffle before the range partitioner to avoid running the join and merge twice (once to calculate skew and once to produce records).
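
   The alternative mentioned above, sorting by partition keys and then by join attributes, can be sketched as follows. This is only an illustration: columns are modeled as plain strings, and `MergeSortKeysSketch` and `sortKeys` are hypothetical names; the actual rule would work on Catalyst expressions.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.LinkedHashSet;
   import java.util.List;

   final class MergeSortKeysSketch {
     // Partition keys come first so rows for one partition stay together;
     // join attributes follow, skipping any column already used as a partition key.
     static List<String> sortKeys(List<String> partitionKeys, List<String> joinAttrs) {
       LinkedHashSet<String> keys = new LinkedHashSet<>(partitionKeys);
       keys.addAll(joinAttrs);
       return new ArrayList<>(keys);
     }

     public static void main(String[] args) {
       System.out.println(sortKeys(Arrays.asList("dep"), Arrays.asList("id", "dep")));
     }
   }
   ```

   Keeping the partition keys as the sort prefix means each range-partitioned task should see rows for only a few table partitions.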






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r559056024



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/PlanHelper.scala
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.catalyst.utils
+
+import java.util.UUID
+import org.apache.spark.sql
+import org.apache.spark.sql.{catalyst, AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.expressions.{AccumulateFiles, Alias, Attribute, AttributeReference, GreaterThan, IcebergBucketTransform, IcebergDayTransform, IcebergHourTransform, IcebergMonthTransform, IcebergYearTransform, Literal, NamedExpression, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Sum}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DynamicFileFilter, DynamicFileFilterWithCountCheck, Filter, LogicalPlan, Project}
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, Expression, FieldReference, HoursTransform, IdentityTransform, Lit, MonthsTransform, NamedReference, NamedTransform, Ref, Transform, YearsTransform}
+import org.apache.spark.sql.connector.iceberg.distributions.{ClusteredDistribution, Distribution, OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.connector.iceberg.expressions.{NullOrdering, SortDirection, SortOrder}
+import org.apache.spark.sql.connector.iceberg.read.SupportsFileFilter
+import org.apache.spark.sql.connector.iceberg.write.MergeBuilder
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, LogicalWriteInfoImpl}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+
+trait PlanHelper extends PredicateHelper {
+  val FILE_NAME_COL = "_file"
+  val ROW_POS_COL = "_pos"
+  val ROW_ID_COL = "_row_id"
+  val AFFECTED_FILES_ACC_NAME = "affectedFiles"
+  val AFFECTED_FILES_ACC_ALIAS_NAME = "_affectedFiles_"
+  val SUM_ROW_ID_ALIAS_NAME = "_sum_"
+
+  def buildScanPlan(spark: SparkSession,
+                    table: Table,
+                    output: Seq[AttributeReference],
+                    mergeBuilder: MergeBuilder,
+                    prunedTargetPlan: LogicalPlan,
+                    performCountCheckForMerge: Boolean = false): LogicalPlan = {
+
+    val scanBuilder = mergeBuilder.asScanBuilder
+    val scan = scanBuilder.build()
+    val scanRelation = DataSourceV2ScanRelation(table, scan, toOutputAttrs(scan.readSchema(), output))
+
+    scan match {
+      case filterable: SupportsFileFilter =>
+        if (performCountCheckForMerge) {
+          val affectedFilesAcc = new SetAccumulator[String]()
+          spark.sparkContext.register(affectedFilesAcc, AFFECTED_FILES_ACC_NAME)
+          val planWithAccumulator = buildPlanWithFileAccumulator(affectedFilesAcc, prunedTargetPlan)
+          DynamicFileFilterWithCountCheck(scanRelation, affectedFilesAcc,
+            planWithAccumulator, filterable, table.name())
+        } else {
+          val matchingFilePlan = buildAggregatePlan(prunedTargetPlan)
+          DynamicFileFilter(scanRelation, matchingFilePlan, filterable)
+        }
+      case _ =>
+        scanRelation
+    }
+  }
+
+  private def buildAggregatePlan(prunedTargetPlan: LogicalPlan): LogicalPlan = {
+    val fileAttr = findOutputAttr(prunedTargetPlan, FILE_NAME_COL)
+    Aggregate(Seq(fileAttr), Seq(fileAttr), prunedTargetPlan)
+  }
+
+  private def buildPlanWithFileAccumulator( fileAccum: SetAccumulator[String],
+                                            prunedTargetPlan: LogicalPlan): LogicalPlan = {
+    val fileAttr = findOutputAttr(prunedTargetPlan, FILE_NAME_COL)
+    val rowIdAttr = findOutputAttr(prunedTargetPlan, ROW_ID_COL)
+    val projectList = Seq(fileAttr, rowIdAttr,
+      Alias(AccumulateFiles(fileAccum, fileAttr), AFFECTED_FILES_ACC_ALIAS_NAME )())
+    val projectPlan = Project(projectList, prunedTargetPlan)
+    val affectedFilesAttr = findOutputAttr(projectPlan, AFFECTED_FILES_ACC_ALIAS_NAME)
+    val aggSumCol =
+      Alias(AggregateExpression(Sum(affectedFilesAttr), Complete, false), SUM_ROW_ID_ALIAS_NAME)()
+    val aggPlan = Aggregate(Seq(rowIdAttr), Seq(aggSumCol), projectPlan)
+    val sumAttr = findOutputAttr(aggPlan, SUM_ROW_ID_ALIAS_NAME)
+    val havingExpr = GreaterThan(sumAttr, Literal(1L))
+    Filter(havingExpr, aggPlan)
+  }
+
+  def findOutputAttr(plan: LogicalPlan, attrName: String): Attribute = {
+    val resolver = SQLConf.get.resolver
+    plan.output.find(attr => resolver(attr.name, attrName)).getOrElse {
+      throw new AnalysisException(s"Cannot find $attrName in ${plan.output}")
+    }
+  }
+
+  def newWriteInfo(schema: StructType): LogicalWriteInfo = {
+    val uuid = UUID.randomUUID()
+    LogicalWriteInfoImpl(queryId = uuid.toString, schema, CaseInsensitiveStringMap.empty)
+  }
+
+  private def toOutputAttrs(schema: StructType, output: Seq[AttributeReference]): Seq[AttributeReference] = {
+    val nameToAttr = output.map(_.name).zip(output).toMap
+    schema.toAttributes.map {
+      a => nameToAttr.get(a.name) match {
+        case Some(ref) =>
+          // keep the attribute id if it was present in the relation
+          a.withExprId(ref.exprId)
+        case _ =>
+          // if the field is new, create a new attribute
+          AttributeReference(a.name, a.dataType, a.nullable, a.metadata)()
+      }
+    }
+  }
+
+  object BucketTransform {
+    def unapply(transform: Transform): Option[(Int, FieldReference)] = transform match {
+      case bt: BucketTransform => bt.columns match {
+        case Seq(nf: NamedReference) =>
+          Some(bt.numBuckets.value(), FieldReference(nf.fieldNames()))
+        case _ =>
+          None
+      }
+      case _ => None
+    }
+  }
+
+  def toCatalyst(distribution: Distribution,
+                 plan: LogicalPlan): Seq[catalyst.expressions.Expression] = {
+    val resolver = SQLConf.get.resolver
+    distribution match {
+      case d: OrderedDistribution =>
+        d.ordering.map(e => toCatalyst(e, plan, resolver))
+      case d: ClusteredDistribution =>
+        d.clustering.map(e => toCatalyst(e, plan, resolver))
+      case _: UnspecifiedDistribution =>
+        Array.empty[catalyst.expressions.Expression]
+    }
+  }
+
+  private def toCatalyst(expr: Expression,
+                 query: LogicalPlan,
+                 resolver: Resolver): catalyst.expressions.Expression = {
+    def resolve(ref: FieldReference): NamedExpression = {
+      // this part is controversial as we perform resolution in the optimizer
+      // we cannot perform this step in the analyzer since we need to optimize expressions
+      // in nodes like OverwriteByExpression before constructing a logical write
+      query.resolve(ref.parts, resolver) match {
+        case Some(attr) => attr
+        case None => throw new AnalysisException(s"Cannot resolve '$ref' using ${query.output}")
+      }
+    }
+    expr match {
+      case s: SortOrder =>
+        val catalystChild = toCatalyst(s.expression(), query, resolver)
+        catalyst.expressions.SortOrder(catalystChild, toCatalyst(s.direction), toCatalyst(s.nullOrdering), Set.empty)
+      case it: IdentityTransform =>
+        resolve(FieldReference(it.ref.fieldNames()))
+      case BucketTransform(numBuckets, ref) =>
+        IcebergBucketTransform(numBuckets, resolve(ref))
+      case yt: YearsTransform =>
+        IcebergYearTransform(resolve(FieldReference(yt.ref.fieldNames())))
+      case mt: MonthsTransform =>
+        IcebergMonthTransform(resolve(FieldReference(mt.ref.fieldNames())))
+      case dt: DaysTransform =>
+        IcebergDayTransform(resolve(FieldReference(dt.ref.fieldNames())))
+      case ht: HoursTransform =>
+        IcebergHourTransform(resolve(FieldReference(ht.ref.fieldNames())))

Review comment:
       Instead of constructing `FieldReference`, I think this could just rewrite the `resolve` method to pass `parts: Seq[String]`.






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r560619673



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.iceberg.TableProperties.{MERGE_WRITE_CARDINALITY_CHECK, MERGE_WRITE_CARDINALITY_CHECK_DEFAULT, MERGE_WRITE_SORT_MODE, MERGE_WRITE_SORT_MODE_GLOBAL}
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.iceberg.spark.Spark3Util.toRequiredDistribution
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.iceberg.util.PropertyUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner}
+import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, _}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.utils.PlanHelper
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+
+case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with PlanHelper with Logging  {
+  val ROW_FROM_SOURCE = "_row_from_source_"
+  val ROW_FROM_TARGET = "_row_from_target_"
+
+  import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan resolveOperators {
+      // rewrite all operations that require reading the table to delete records
+      case MergeIntoTable(target: DataSourceV2Relation,
+                          source: LogicalPlan, cond, actions, notActions) =>
+        val targetOutputCols = target.output
+        val newProjectCols = target.output ++
+          Seq(
+            Alias(InputFileName(), FILE_NAME_COL)(),
+            Alias(monotonically_increasing_id().expr, ROW_ID_COL)()
+          )
+        val newTargetTable = Project(newProjectCols, target)
+
+        // Construct the plan to prune target based on join condition between source and
+        // target.
+        val prunedTargetPlan = Join(source, newTargetTable, Inner, Some(cond), JoinHint.NONE)
+        val writeInfo = newWriteInfo(target.schema)
+        val mergeBuilder = target.table.asMergeable.newMergeBuilder("delete", writeInfo)
+        val targetTableScan =  buildScanPlan(spark, target.table,
+          target.output, mergeBuilder, prunedTargetPlan, isCountCheckEnabled(target.table, actions))
+
+        // Construct an outer join to help track changes in source and target.
+        // TODO : Optimize this to use LEFT ANTI or RIGHT OUTER when applicable.
+        val sourceTableProj = source.output ++ Seq(Alias(lit(true).expr, ROW_FROM_SOURCE)())
+        val targetTableProj = target.output ++ Seq(Alias(lit(true).expr, ROW_FROM_TARGET)())
+        val newTargetTableScan = Project(targetTableProj, targetTableScan)
+        val newSourceTableScan = Project(sourceTableProj, source)
+        val joinPlan = Join(newSourceTableScan, newTargetTableScan, FullOuter, Some(cond), JoinHint.NONE)
+
+        // Construct the plan to replace the data based on the output of `MergeInto`
+        val mergeParams = MergeIntoParams(
+          isSourceRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_SOURCE)),
+          isTargetRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_TARGET)),
+          matchedConditions = actions.map(getClauseCondition),
+          matchedOutputs = actions.map(actionOutput(_, targetOutputCols)),
+          notMatchedConditions = notActions.map(getClauseCondition),
+          notMatchedOutputs = notActions.map(actionOutput(_, targetOutputCols)),
+          targetOutput = targetOutputCols :+ Literal(false),
+          deleteOutput = targetOutputCols :+ Literal(true),
+          joinedAttributes = joinPlan.output
+        )
+        val joinKeysFromTarget = targetOutputCols.filter (
+          attr => cond.references.exists(attr.semanticEquals(_))
+        )
+        val writePlan = buildWritePlan(joinPlan, Project(target.output, joinPlan), target.table, joinKeysFromTarget)
+        val mergePlan = MergeInto(mergeParams, target, writePlan)
+        val batchWrite = mergeBuilder.asWriteBuilder.buildForBatch()
+        ReplaceData(target, batchWrite, mergePlan)
+    }
+  }
+
+  def buildWritePlan(childPlan: LogicalPlan,
+                     planToResolveFrom: LogicalPlan,
+                     table: Table, targetJoinAttrs: Seq[Attribute]): LogicalPlan = {
+    val iceTable = table.asInstanceOf[SparkTable].table()
+    val globalSortEnabled = isGlobalSortEnabled(table)
+    val partitionExpressions = toCatalyst(toRequiredDistribution(iceTable.spec), planToResolveFrom)
+    val sortExpressions: Seq[SortOrder] =  toCatalyst(toRequiredDistribution(iceTable.spec(),
+      iceTable.sortOrder(), false), planToResolveFrom).asInstanceOf[Seq[SortOrder]]
+    val dist = toRequiredDistribution(iceTable.spec(),
+      iceTable.sortOrder(), true)
+    val globalSortExprs: Seq[SortOrder] = toCatalyst(dist, planToResolveFrom).asInstanceOf[Seq[SortOrder]]

Review comment:
       Following up on this, I think we should implement the proposal from our [discussion about Flink hash distribution](https://github.com/apache/iceberg/pull/2064#discussion_r559018257). That PR is going to add `write.distribution-mode` with 3 values: `none`, `hash`, and `range`. Here's what we would use them for here:
   
   | Spark | `none` | `hash` | `range` |
   |-|--------|-------------|--------|
   | unordered | no distribution, locally sort by partition key | hash distribute by partition key, locally sort by partition key | range distribute by partition key, locally sort by partition key |
   | ordered | no distribution, locally sorted | hash distribute by partition key, locally sorted | globally sorted |
   
   Or in terms of the methods you've added:
   
   | Spark | `none` | `hash` | `range` |
   |-|--------|-------------|--------|
   | unordered | no distribution, local sort by `toOrderedDistribution(spec, order, true)` | distribute by `toClusteredDistribution(spec)`, local sort by `toOrderedDistribution(spec, order, true)` | global sort by `toOrderedDistribution(spec, order, true)` |
   | ordered | no distribution, local sort by `toOrderedDistribution(spec, order, true)` | distribute by `toClusteredDistribution(spec)`, local sort by `toOrderedDistribution(spec, order, true)` | global sort by `toOrderedDistribution(spec, order, true)` |
   
   The result of `toOrderedDistribution(spec, order, true)` is always used to sort, whether locally or globally. If the table's sort order is unsorted, then it will infer the order from the partition spec, just as we wanted in the first table above. For hash partitioning, we can use the `toClusteredDistribution` method that you added. The only other concern is that we need to add a round-robin shuffle before the global sort.
   
   With that addition, let me revise my code from above:
   
   ```scala
   val distributionMode = table.properties.getOrDefault("write.distribution-mode", "range")
   // spec and order are the target table's partition spec and sort order
   val ordering = toCatalyst(toOrderedDistribution(spec, order, true))
   distributionMode.toLowerCase(Locale.ROOT) match {
     case "none" =>
       Sort(ordering, global = false, childPlan)
     case "hash" =>
       val clustering = toCatalyst(toClusteredDistribution(spec))
       val hashPartitioned = RepartitionByExpression(clustering, childPlan, numShufflePartitions)
       Sort(ordering, global = false, hashPartitioned)
     case "range" =>
       val roundRobin = Repartition(numShufflePartitions, shuffle = true, childPlan)
       Sort(ordering, global = true, roundRobin)
   }
   ```






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r561461677



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -85,7 +96,7 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
           joinedAttributes = joinPlan.output
         )
 
-        val mergePlan = MergeInto(mergeParams, target, joinPlan)
+        val mergePlan = MergeInto(mergeParams, target.output, joinPlan)

Review comment:
       Why not sort this case as well? If the user has requested a sort order on the table, it makes sense to enforce it even if we aren't also rewriting files.






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r561415376



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
##########
@@ -81,8 +81,8 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy {
     case ReplaceData(_, batchWrite, query) =>
       ReplaceDataExec(batchWrite, planLater(query)) :: Nil
 
-    case MergeInto(mergeIntoProcessor, targetRelation, child) =>
-      MergeIntoExec(mergeIntoProcessor, targetRelation, planLater(child)) :: Nil
+    case MergeInto(mergeIntoParms, targetAttributes, child) =>
+      MergeIntoExec(mergeIntoParms, targetAttributes, planLater(child)) :: Nil

Review comment:
       I think it would be clearer to use `output` instead of `targetAttributes` here, since that's what this is setting, but this is minor.






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r561409595



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -215,5 +227,41 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
     }
     !(actions.size == 1 && hasUnconditionalDelete(actions.headOption))
   }
+
+  def buildWritePlan(
+     childPlan: LogicalPlan,
+     table: Table): LogicalPlan = {
+    table match {
+      case iceTable: SparkTable =>
+        val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
+        val table = iceTable.table()
+        val distributionMode: String = table.properties
+          .getOrDefault("write.distribution-mode", TableProperties.WRITE_DISTRIBUTION_MODE_RANGE)
+        val order = toCatalyst(toOrderedDistribution(table.spec(), table.sortOrder(), true), childPlan)
+        distributionMode.toLowerCase(Locale.ROOT) match {

Review comment:
       The Flink commit also added a Java enum for this. We could use that instead of string matching here. It handles case-insensitive mapping, too: `DistributionMode.fromName(modeName)`
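
For illustration, here is a minimal sketch of the kind of case-insensitive lookup such an enum can provide. The `ModeSketch` enum is a hypothetical stand-in written for this example and is not Iceberg's `DistributionMode`; only the `fromName` name and the three mode names (`none`, `hash`, `range`) come from this thread.

```java
import java.util.Locale;

// Hypothetical stand-in for the enum mentioned above; not Iceberg's actual class.
enum ModeSketch {
  NONE("none"), HASH("hash"), RANGE("range");

  private final String modeName;

  ModeSketch(String modeName) {
    this.modeName = modeName;
  }

  String modeName() {
    return modeName;
  }

  // Maps a property value to a mode, ignoring case; rejects unknown names.
  static ModeSketch fromName(String name) {
    if (name == null) {
      throw new IllegalArgumentException("Mode name must not be null");
    }
    String normalized = name.toLowerCase(Locale.ROOT);
    for (ModeSketch mode : values()) {
      if (mode.modeName.equals(normalized)) {
        return mode;
      }
    }
    throw new IllegalArgumentException("Unknown distribution mode: " + name);
  }
}

final class ModeSketchDemo {
  public static void main(String[] args) {
    // Mixed-case input resolves to the same constant as lower-case input.
    System.out.println(ModeSketch.fromName("Range") == ModeSketch.fromName("range"));
  }
}
```

Matching on enum constants rather than raw strings also lets the compiler flag a misspelled mode, which string patterns would silently miss.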






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r559057530



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
##########
@@ -269,6 +279,51 @@ public Transform unknown(int fieldId, String sourceName, int sourceId, String tr
     return transforms.toArray(new Transform[0]);
   }
 
+  public static Distribution toRequiredDistribution(PartitionSpec spec, SortOrder sortOrder, boolean inferFromSpec) {
+    if (sortOrder.isUnsorted()) {
+      if (inferFromSpec) {
+        SortOrder specOrder = Partitioning.sortOrderFor(spec);
+        return Distributions.ordered(convert(specOrder));
+      }
+
+      return Distributions.unspecified();
+    }
+
+    Schema schema = spec.schema();
+    Multimap<Integer, SortField> sortFieldIndex = Multimaps.index(sortOrder.fields(), SortField::sourceId);
+
+    // build a sort prefix of partition fields that are not already in the sort order
+    SortOrder.Builder builder = SortOrder.builderFor(schema);
+    for (PartitionField field : spec.fields()) {
+      Collection<SortField> sortFields = sortFieldIndex.get(field.sourceId());
+      boolean isSorted = sortFields.stream().anyMatch(sortField ->
+              field.transform().equals(sortField.transform()) ||
+                      sortField.transform().satisfiesOrderOf(field.transform()));
+      if (!isSorted) {
+        String sourceName = schema.findColumnName(field.sourceId());
+        builder.asc(org.apache.iceberg.expressions.Expressions.transform(sourceName, field.transform()));
+      }
+    }
+
+    // add the configured sort to the partition spec prefix sort
+    SortOrderVisitor.visit(sortOrder, new CopySortOrderFields(builder));
+
+    return Distributions.ordered(convert(builder.build()));
+  }
+
+  public static Distribution toRequiredDistribution(PartitionSpec spec) {

Review comment:
       I don't think that the name `toRequiredDistribution` makes sense for the context these methods are called in. I think it would be better to use `toClusteredDistribution(PartitionSpec)` and `toOrderedDistribution(PartitionSpec, SortOrder, boolean)`.






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r560619673



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.iceberg.TableProperties.{MERGE_WRITE_CARDINALITY_CHECK, MERGE_WRITE_CARDINALITY_CHECK_DEFAULT, MERGE_WRITE_SORT_MODE, MERGE_WRITE_SORT_MODE_GLOBAL}
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.iceberg.spark.Spark3Util.toRequiredDistribution
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.iceberg.util.PropertyUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner}
+import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, _}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.utils.PlanHelper
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+
+case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with PlanHelper with Logging  {
+  val ROW_FROM_SOURCE = "_row_from_source_"
+  val ROW_FROM_TARGET = "_row_from_target_"
+
+  import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan resolveOperators {
+      // rewrite all operations that require reading the table to delete records
+      case MergeIntoTable(target: DataSourceV2Relation,
+                          source: LogicalPlan, cond, actions, notActions) =>
+        val targetOutputCols = target.output
+        val newProjectCols = target.output ++
+          Seq(
+            Alias(InputFileName(), FILE_NAME_COL)(),
+            Alias(monotonically_increasing_id().expr, ROW_ID_COL)()
+          )
+        val newTargetTable = Project(newProjectCols, target)
+
+        // Construct the plan to prune target based on join condition between source and
+        // target.
+        val prunedTargetPlan = Join(source, newTargetTable, Inner, Some(cond), JoinHint.NONE)
+        val writeInfo = newWriteInfo(target.schema)
+        val mergeBuilder = target.table.asMergeable.newMergeBuilder("delete", writeInfo)
+        val targetTableScan =  buildScanPlan(spark, target.table,
+          target.output, mergeBuilder, prunedTargetPlan, isCountCheckEnabled(target.table, actions))
+
+        // Construct an outer join to help track changes in source and target.
+        // TODO : Optimize this to use LEFT ANTI or RIGHT OUTER when applicable.
+        val sourceTableProj = source.output ++ Seq(Alias(lit(true).expr, ROW_FROM_SOURCE)())
+        val targetTableProj = target.output ++ Seq(Alias(lit(true).expr, ROW_FROM_TARGET)())
+        val newTargetTableScan = Project(targetTableProj, targetTableScan)
+        val newSourceTableScan = Project(sourceTableProj, source)
+        val joinPlan = Join(newSourceTableScan, newTargetTableScan, FullOuter, Some(cond), JoinHint.NONE)
+
+        // Construct the plan to replace the data based on the output of `MergeInto`
+        val mergeParams = MergeIntoParams(
+          isSourceRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_SOURCE)),
+          isTargetRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_TARGET)),
+          matchedConditions = actions.map(getClauseCondition),
+          matchedOutputs = actions.map(actionOutput(_, targetOutputCols)),
+          notMatchedConditions = notActions.map(getClauseCondition),
+          notMatchedOutputs = notActions.map(actionOutput(_, targetOutputCols)),
+          targetOutput = targetOutputCols :+ Literal(false),
+          deleteOutput = targetOutputCols :+ Literal(true),
+          joinedAttributes = joinPlan.output
+        )
+        val joinKeysFromTarget = targetOutputCols.filter (
+          attr => cond.references.exists(attr.semanticEquals(_))
+        )
+        val writePlan = buildWritePlan(joinPlan, Project(target.output, joinPlan), target.table, joinKeysFromTarget)
+        val mergePlan = MergeInto(mergeParams, target, writePlan)
+        val batchWrite = mergeBuilder.asWriteBuilder.buildForBatch()
+        ReplaceData(target, batchWrite, mergePlan)
+    }
+  }
+
+  def buildWritePlan(childPlan: LogicalPlan,
+                     planToResolveFrom: LogicalPlan,
+                     table: Table, targetJoinAttrs: Seq[Attribute]): LogicalPlan = {
+    val iceTable = table.asInstanceOf[SparkTable].table()
+    val globalSortEnabled = isGlobalSortEnabled(table)
+    val partitionExpressions = toCatalyst(toRequiredDistribution(iceTable.spec), planToResolveFrom)
+    val sortExpressions: Seq[SortOrder] =  toCatalyst(toRequiredDistribution(iceTable.spec(),
+      iceTable.sortOrder(), false), planToResolveFrom).asInstanceOf[Seq[SortOrder]]
+    val dist = toRequiredDistribution(iceTable.spec(),
+      iceTable.sortOrder(), true)
+    val globalSortExprs: Seq[SortOrder] = toCatalyst(dist, planToResolveFrom).asInstanceOf[Seq[SortOrder]]

Review comment:
       Following up on this, I think we should implement the proposal from our [discussion about Flink hash distribution](https://github.com/apache/iceberg/pull/2064#discussion_r559018257). That PR is going to add `write.distribution-mode` with 3 values: `none`, `hash`, and `range`. Here's what we would use them for here:
   
   | Spark | `none` | `hash` | `range` |
   |-|--------|-------------|--------|
   | unordered | no distribution, locally sort by partition key | hash distribute by partition key, locally sort by partition key | range distribute by partition key, locally sort by partition key |
   | ordered | no distribution, locally sorted | hash distribute by partition key, locally sorted | globally sorted |
   
   Or in terms of the methods you've added:
   
   | Spark | `none` | `hash` | `range` |
   |-|--------|-------------|--------|
   | unordered | no distribution, local sort by `toOrderedDistribution(spec, order, true)` | distribute by `toClusteredDistribution(spec)`, local sort by `toOrderedDistribution(spec, order, true)` | global sort by `toOrderedDistribution(spec, order, true)` |
   | ordered | no distribution, local sort by `toOrderedDistribution(spec, order, true)` | distribute by `toClusteredDistribution(spec)`, local sort by `toOrderedDistribution(spec, order, true)` | global sort by `toOrderedDistribution(spec, order, true)` |
   
   The result of `toOrderedDistribution(spec, order, true)` is always used to sort, whether locally or globally. If the table's sort order is unsorted, then it will infer the order from the partition spec, just as we wanted in the first table above. For hash partitioning, we can use the `toClusteredDistribution` method that you added. The only other concern is that we need to add a round-robin shuffle before the global sort.
   
   With that addition, let me revise my code from above:
   
   ```scala
   val distributionMode = table.properties.getOrDefault("write.distribution-mode", "range")
   // spec and order are the target table's partition spec and sort order
   val ordering = toCatalyst(toOrderedDistribution(spec, order, true))
   distributionMode.toLowerCase(Locale.ROOT) match {
     case "none" =>
       Sort(ordering, global = false, childPlan)
     case "hash" =>
       val clustering = toCatalyst(toClusteredDistribution(spec))
       val hashPartitioned = RepartitionByExpression(clustering, childPlan, numShufflePartitions)
       Sort(ordering, global = false, hashPartitioned)
     case "range" =>
       val roundRobin = Repartition(numShufflePartitions, shuffle = true, childPlan)
       Sort(ordering, global = true, roundRobin)
   }
   ```
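
The mode-to-plan mapping in the tables above can also be read as a small decision function. The following is only a sketch under stated assumptions: `WritePlanSketch` and its `steps` method are hypothetical names, and the plan steps are plain strings rather than Spark logical plan nodes. Unlike the snippet above, it rejects an unrecognized mode explicitly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical helper: lists the plan steps for each distribution mode as strings.
final class WritePlanSketch {
  static List<String> steps(String distributionMode) {
    List<String> steps = new ArrayList<>();
    switch (distributionMode.toLowerCase(Locale.ROOT)) {
      case "none":
        // No shuffle; each task sorts its own rows.
        steps.add("local sort");
        break;
      case "hash":
        // Cluster rows by partition key, then sort within each task.
        steps.add("hash repartition by partition key");
        steps.add("local sort");
        break;
      case "range":
        // Round-robin shuffle first, then a global (range-partitioned) sort.
        steps.add("round-robin repartition");
        steps.add("global sort");
        break;
      default:
        throw new IllegalArgumentException("Unknown distribution mode: " + distributionMode);
    }
    return steps;
  }

  public static void main(String[] args) {
    for (String mode : new String[] {"none", "hash", "range"}) {
      System.out.println(mode + " -> " + steps(mode));
    }
  }
}
```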






[GitHub] [iceberg] dilipbiswal commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
dilipbiswal commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r560630705



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.iceberg.TableProperties.{MERGE_WRITE_CARDINALITY_CHECK, MERGE_WRITE_CARDINALITY_CHECK_DEFAULT, MERGE_WRITE_SORT_MODE, MERGE_WRITE_SORT_MODE_GLOBAL}
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.iceberg.spark.Spark3Util.toRequiredDistribution
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.iceberg.util.PropertyUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner}
+import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, _}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.utils.PlanHelper
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+
+case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with PlanHelper with Logging  {
+  val ROW_FROM_SOURCE = "_row_from_source_"
+  val ROW_FROM_TARGET = "_row_from_target_"
+
+  import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan resolveOperators {
+      // rewrite all operations that require reading the table to delete records
+      case MergeIntoTable(target: DataSourceV2Relation,
+                          source: LogicalPlan, cond, actions, notActions) =>
+        val targetOutputCols = target.output
+        val newProjectCols = target.output ++
+          Seq(
+            Alias(InputFileName(), FILE_NAME_COL)(),
+            Alias(monotonically_increasing_id().expr, ROW_ID_COL)()
+          )
+        val newTargetTable = Project(newProjectCols, target)
+
+        // Construct the plan to prune target based on join condition between source and
+        // target.
+        val prunedTargetPlan = Join(source, newTargetTable, Inner, Some(cond), JoinHint.NONE)
+        val writeInfo = newWriteInfo(target.schema)
+        val mergeBuilder = target.table.asMergeable.newMergeBuilder("delete", writeInfo)
+        val targetTableScan =  buildScanPlan(spark, target.table,
+          target.output, mergeBuilder, prunedTargetPlan, isCountCheckEnabled(target.table, actions))
+
+        // Construct an outer join to help track changes in source and target.
+        // TODO : Optimize this to use LEFT ANTI or RIGHT OUTER when applicable.
+        val sourceTableProj = source.output ++ Seq(Alias(lit(true).expr, ROW_FROM_SOURCE)())
+        val targetTableProj = target.output ++ Seq(Alias(lit(true).expr, ROW_FROM_TARGET)())
+        val newTargetTableScan = Project(targetTableProj, targetTableScan)
+        val newSourceTableScan = Project(sourceTableProj, source)
+        val joinPlan = Join(newSourceTableScan, newTargetTableScan, FullOuter, Some(cond), JoinHint.NONE)
+
+        // Construct the plan to replace the data based on the output of `MergeInto`
+        val mergeParams = MergeIntoParams(
+          isSourceRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_SOURCE)),
+          isTargetRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_TARGET)),
+          matchedConditions = actions.map(getClauseCondition),
+          matchedOutputs = actions.map(actionOutput(_, targetOutputCols)),
+          notMatchedConditions = notActions.map(getClauseCondition),
+          notMatchedOutputs = notActions.map(actionOutput(_, targetOutputCols)),
+          targetOutput = targetOutputCols :+ Literal(false),
+          deleteOutput = targetOutputCols :+ Literal(true),
+          joinedAttributes = joinPlan.output
+        )
+        val joinKeysFromTarget = targetOutputCols.filter (
+          attr => cond.references.exists(attr.semanticEquals(_))
+        )
+        val writePlan = buildWritePlan(joinPlan, Project(target.output, joinPlan), target.table, joinKeysFromTarget)
+        val mergePlan = MergeInto(mergeParams, target, writePlan)
+        val batchWrite = mergeBuilder.asWriteBuilder.buildForBatch()
+        ReplaceData(target, batchWrite, mergePlan)
+    }
+  }
+
+  def buildWritePlan(childPlan: LogicalPlan,
+                     planToResolveFrom: LogicalPlan,
+                     table: Table, targetJoinAttrs: Seq[Attribute]): LogicalPlan = {
+    val iceTable = table.asInstanceOf[SparkTable].table()
+    val globalSortEnabled = isGlobalSortEnabled(table)
+    val partitionExpressions = toCatalyst(toRequiredDistribution(iceTable.spec), planToResolveFrom)
+    val sortExpressions: Seq[SortOrder] =  toCatalyst(toRequiredDistribution(iceTable.spec(),
+      iceTable.sortOrder(), false), planToResolveFrom).asInstanceOf[Seq[SortOrder]]
+    val dist = toRequiredDistribution(iceTable.spec(),
+      iceTable.sortOrder(), true)
+    val globalSortExprs: Seq[SortOrder] = toCatalyst(dist, planToResolveFrom).asInstanceOf[Seq[SortOrder]]

Review comment:
       @rdblue Thank you very much !!






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r561408143



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -215,5 +227,41 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
     }
     !(actions.size == 1 && hasUnconditionalDelete(actions.headOption))
   }
+
+  def buildWritePlan(
+     childPlan: LogicalPlan,
+     table: Table): LogicalPlan = {
+    table match {
+      case iceTable: SparkTable =>
+        val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
+        val table = iceTable.table()
+        val distributionMode: String = table.properties
+          .getOrDefault("write.distribution-mode", TableProperties.WRITE_DISTRIBUTION_MODE_RANGE)
+        val order = toCatalyst(toOrderedDistribution(table.spec(), table.sortOrder(), true), childPlan)
+        distributionMode.toLowerCase(Locale.ROOT) match {
+          case TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT =>

Review comment:
       We should rename this to `WRITE_DISTRIBUTION_MODE_NONE` in a follow-up, since the default depends on the engine. We can also add `WRITE_DISTRIBUTION_MODE_FLINK_DEFAULT` and `WRITE_DISTRIBUTION_MODE_SPARK_DEFAULT`.






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r561431956



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeIntoTable.java
##########
@@ -303,25 +305,107 @@ public void testSingleUnconditionalDeleteDisbleCountCheck() throws NoSuchTableEx
   }
 
   @Test
-  public void testSingleConditionalDeleteCountCheck() throws NoSuchTableException {
-    append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-    append(sourceName, new Employee(1, "emp-id-1"), new Employee(1, "emp-id-1"),
-           new Employee(2, "emp-id-2"), new Employee(6, "emp-id-6"));
+  public void testIdentityPartition()  {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg PARTITIONED BY (identity(dep))", targetName);
+      initTable(targetName);
+      setWriteMode(targetName, mode);
+      createAndInitSourceTable(sourceName);
+      append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
+      append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
+
+      String sqlText = "MERGE INTO " + targetName + " AS target \n" +
+              "USING " + sourceName + " AS source \n" +
+              "ON target.id = source.id \n" +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, "");

Review comment:
       Minor: this passes an extra empty string to `sql` and embeds the table names in the SQL text instead of passing them as format arguments.
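
As a rough illustration of the alternative, the sketch below keeps `%s` placeholders in the query text and supplies the table names as arguments. It assumes the test's `sql` helper formats its first argument with the remaining ones, as the `sql("CREATE TABLE %s ...", targetName)` calls in this diff suggest; `formatQuery` and the table names are hypothetical stand-ins, not the actual test code.

```java
import java.util.Locale;

final class SqlFormatSketch {
  // Hypothetical stand-in for a test helper that takes a format string plus arguments.
  static String formatQuery(String query, Object... args) {
    return String.format(Locale.ROOT, query, args);
  }

  public static void main(String[] args) {
    // Table names are supplied as arguments instead of being concatenated into the text.
    String text = formatQuery(
        "MERGE INTO %s AS target USING %s AS source ON target.id = source.id",
        "target_table", "source_table");
    System.out.println(text);
  }
}
```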






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r561433683



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeIntoTable.java
##########
@@ -303,25 +305,107 @@ public void testSingleUnconditionalDeleteDisbleCountCheck() throws NoSuchTableEx
   }
 
   @Test
-  public void testSingleConditionalDeleteCountCheck() throws NoSuchTableException {
-    append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-    append(sourceName, new Employee(1, "emp-id-1"), new Employee(1, "emp-id-1"),
-           new Employee(2, "emp-id-2"), new Employee(6, "emp-id-6"));
+  public void testIdentityPartition()  {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg PARTITIONED BY (identity(dep))", targetName);
+      initTable(targetName);
+      setWriteMode(targetName, mode);
+      createAndInitSourceTable(sourceName);
+      append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
+      append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
+
+      String sqlText = "MERGE INTO " + targetName + " AS target \n" +
+              "USING " + sourceName + " AS source \n" +
+              "ON target.id = source.id \n" +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, "");
+      assertEquals("Should have expected rows",
+              ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
+              sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
+    });
+  }
 
-    String sqlText = "MERGE INTO %s AS target " +
-           "USING %s AS source " +
-           "ON target.id = source.id " +
-           "WHEN MATCHED AND target.id = 1 THEN DELETE " +
-           "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+  @Test
+  public void testDaysTransform() {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, ts timestamp) USING iceberg PARTITIONED BY (days(ts))", targetName);
+      initTable(targetName);
+      setWriteMode(targetName, mode);
+      sql("CREATE TABLE %s (id INT, ts timestamp) USING iceberg", sourceName);
+      initTable(sourceName);
+      sql("INSERT INTO " + targetName + " VALUES (1, timestamp('2001-01-01 00:00:00'))," +
+              "(6, timestamp('2001-01-06 00:00:00'))");
+      sql("INSERT INTO " + sourceName + " VALUES (2, timestamp('2001-01-02 00:00:00'))," +
+              "(1, timestamp('2001-01-01 00:00:00'))," +
+              "(6, timestamp('2001-01-06 00:00:00'))");
+
+      String sqlText = "MERGE INTO " + targetName + " AS target \n" +
+              "USING " + sourceName + " AS source \n" +
+              "ON target.id = source.id \n" +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, "");
+      assertEquals("Should have expected rows",
+              ImmutableList.of(row(1, "2001-01-01 00:00:00"), row(2, "2001-01-02 00:00:00")),
+              sql("SELECT id, CAST(ts AS STRING) FROM %s ORDER BY id ASC NULLS LAST", targetName));
+    });
+  }
 
-    String tabName = catalogName + "." + "default.target";
-    String errorMsg = "The same row of target table `" + tabName + "` was identified more than\n" +
-            " once for an update, delete or insert operation of the MERGE statement.";
-    AssertHelpers.assertThrows("Should complain ambiguous row in target",
-           SparkException.class, errorMsg, () -> sql(sqlText, targetName, sourceName));
-    assertEquals("Target should be unchanged",
-           ImmutableList.of(row(1, "emp-id-one"), row(6, "emp-id-6")),
-           sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
+  @Test
+  public void testBucketExpression() {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg" +
+              " CLUSTERED BY (dep) INTO 2 BUCKETS", targetName);
+      initTable(targetName);
+      setWriteMode(targetName, mode);
+      createAndInitSourceTable(sourceName);
+      append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
+      append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
+      String sqlText = "MERGE INTO " + targetName + " AS target \n" +
+              "USING " + sourceName + " AS source \n" +
+              "ON target.id = source.id \n" +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, "");
+      assertEquals("Should have expected rows",
+              ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
+              sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
+    });
+  }
+
+  @Test
+  public void testPartitionedAndOrderedTable() {

Review comment:
       Where does this set the table ordering? I would expect it to run `ALTER TABLE %s WRITE ORDERED BY ...`
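
For illustration, the statement mentioned in this comment could be built as follows. This is only a sketch: the helper name and the sort columns used in the usage note (`dep`, `id`) are assumptions, not code from the PR. The `WRITE ORDERED BY` clause itself is the one provided by the Iceberg Spark SQL extensions.

```java
// Hypothetical illustration, not code from the PR.
final class WriteOrderSqlExample {

  // Builds the DDL text that sets a table's write order.
  static String writeOrderedBy(String tableName, String... sortColumns) {
    if (sortColumns.length == 0) {
      throw new IllegalArgumentException("At least one sort column is required");
    }
    return String.format(
        "ALTER TABLE %s WRITE ORDERED BY %s", tableName, String.join(", ", sortColumns));
  }
}
```

A test such as `testPartitionedAndOrderedTable` would then run something like `sql(WriteOrderSqlExample.writeOrderedBy(targetName, "dep", "id"))` before the MERGE, so that the table actually carries a sort order.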





[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r561417740



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/RewriteRowLevelOperationHelper.scala
##########
@@ -194,4 +220,84 @@ trait RewriteRowLevelOperationHelper extends PredicateHelper with Logging {
       }
     }
   }
+
+  private object BucketTransform {
+    def unapply(transform: Transform): Option[(Int, FieldReference)] = transform match {
+      case bt: BucketTransform => bt.columns match {
+        case Seq(nf: NamedReference) =>
+          Some(bt.numBuckets.value(), FieldReference(nf.fieldNames()))
+        case _ =>
+          None
+      }
+      case _ => None
+    }
+  }
+
+  protected def toCatalyst(
+      distribution: Distribution,
+      plan: LogicalPlan): Seq[catalyst.expressions.Expression] = {
+
+    distribution match {
+      case d: OrderedDistribution =>
+        d.ordering.map(e => toCatalyst(e, plan, resolver))
+      case d: ClusteredDistribution =>
+        d.clustering.map(e => toCatalyst(e, plan, resolver))
+      case _: UnspecifiedDistribution =>
+        Array.empty[catalyst.expressions.Expression]
+    }
+  }
+
+  private def toCatalyst(
+      expr: Expression,
+      query: LogicalPlan,
+      resolver: Resolver): catalyst.expressions.Expression = {
+
+    def resolve(parts: Seq[String]): NamedExpression = {
+      // this part is controversial as we perform resolution in the optimizer
+      // we cannot perform this step in the analyzer since we need to optimize expressions
+      // in nodes like OverwriteByExpression before constructing a logical write
+      query.resolve(parts, resolver) match {
+        case Some(attr) => attr
+        case None => throw new AnalysisException(s"Cannot resolve '${parts.map(quoteIfNeeded).mkString(".")}'" +
+          s" using ${query.output}")
+      }
+    }
+
+    expr match {
+      case s: SortOrder =>
+        val catalystChild = toCatalyst(s.expression(), query, resolver)
+        catalyst.expressions.SortOrder(catalystChild, toCatalyst(s.direction), toCatalyst(s.nullOrdering), Set.empty)
+      case it: IdentityTransform =>
+        resolve(it.ref.fieldNames())
+      case BucketTransform(numBuckets, ref) =>
+        IcebergBucketTransform(numBuckets, resolve(ref.fieldNames))
+      case yt: YearsTransform =>
+        IcebergYearTransform(resolve(yt.ref.fieldNames))
+      case mt: MonthsTransform =>
+        IcebergMonthTransform(resolve(mt.ref.fieldNames))
+      case dt: DaysTransform =>
+        IcebergDayTransform(resolve(dt.ref.fieldNames))
+      case ht: HoursTransform =>
+        IcebergHourTransform(resolve(ht.ref.fieldNames))
+      case ref: FieldReference =>
+        resolve(ref.fieldNames)
+      case _ =>
+        throw new RuntimeException(s"$expr is not currently supported")
+    }
+  }
+

Review comment:
       Nit: extra newline.





[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r559056161



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.iceberg.TableProperties.{MERGE_WRITE_CARDINALITY_CHECK, MERGE_WRITE_CARDINALITY_CHECK_DEFAULT, MERGE_WRITE_SORT_MODE, MERGE_WRITE_SORT_MODE_GLOBAL}
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.iceberg.spark.Spark3Util.toRequiredDistribution
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.iceberg.util.PropertyUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner}
+import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, _}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.utils.PlanHelper
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+
+case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with PlanHelper with Logging  {
+  val ROW_FROM_SOURCE = "_row_from_source_"
+  val ROW_FROM_TARGET = "_row_from_target_"
+
+  import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan resolveOperators {
+      // rewrite all operations that require reading the table to delete records
+      case MergeIntoTable(target: DataSourceV2Relation,
+                          source: LogicalPlan, cond, actions, notActions) =>
+        val targetOutputCols = target.output
+        val newProjectCols = target.output ++
+          Seq(
+            Alias(InputFileName(), FILE_NAME_COL)(),
+            Alias(monotonically_increasing_id().expr, ROW_ID_COL)()
+          )
+        val newTargetTable = Project(newProjectCols, target)
+
+        // Construct the plan to prune target based on join condition between source and
+        // target.
+        val prunedTargetPlan = Join(source, newTargetTable, Inner, Some(cond), JoinHint.NONE)
+        val writeInfo = newWriteInfo(target.schema)
+        val mergeBuilder = target.table.asMergeable.newMergeBuilder("delete", writeInfo)
+        val targetTableScan =  buildScanPlan(spark, target.table,
+          target.output, mergeBuilder, prunedTargetPlan, isCountCheckEnabled(target.table, actions))
+
+        // Construct an outer join to help track changes in source and target.
+        // TODO : Optimize this to use LEFT ANTI or RIGHT OUTER when applicable.
+        val sourceTableProj = source.output ++ Seq(Alias(lit(true).expr, ROW_FROM_SOURCE)())
+        val targetTableProj = target.output ++ Seq(Alias(lit(true).expr, ROW_FROM_TARGET)())
+        val newTargetTableScan = Project(targetTableProj, targetTableScan)
+        val newSourceTableScan = Project(sourceTableProj, source)
+        val joinPlan = Join(newSourceTableScan, newTargetTableScan, FullOuter, Some(cond), JoinHint.NONE)
+
+        // Construct the plan to replace the data based on the output of `MergeInto`
+        val mergeParams = MergeIntoParams(
+          isSourceRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_SOURCE)),
+          isTargetRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_TARGET)),
+          matchedConditions = actions.map(getClauseCondition),
+          matchedOutputs = actions.map(actionOutput(_, targetOutputCols)),
+          notMatchedConditions = notActions.map(getClauseCondition),
+          notMatchedOutputs = notActions.map(actionOutput(_, targetOutputCols)),
+          targetOutput = targetOutputCols :+ Literal(false),
+          deleteOutput = targetOutputCols :+ Literal(true),
+          joinedAttributes = joinPlan.output
+        )
+        val joinKeysFromTarget = targetOutputCols.filter (
+          attr => cond.references.exists(attr.semanticEquals(_))
+        )
+        val writePlan = buildWritePlan(joinPlan, Project(target.output, joinPlan), target.table, joinKeysFromTarget)
+        val mergePlan = MergeInto(mergeParams, target, writePlan)
+        val batchWrite = mergeBuilder.asWriteBuilder.buildForBatch()
+        ReplaceData(target, batchWrite, mergePlan)
+    }
+  }
+
+  def buildWritePlan(childPlan: LogicalPlan,
+                     planToResolveFrom: LogicalPlan,
+                     table: Table, targetJoinAttrs: Seq[Attribute]): LogicalPlan = {
+    val iceTable = table.asInstanceOf[SparkTable].table()
+    val globalSortEnabled = isGlobalSortEnabled(table)
+    val partitionExpressions = toCatalyst(toRequiredDistribution(iceTable.spec), planToResolveFrom)

Review comment:
       I think that if `MergeInto` is passed in, it will no longer be necessary to use `planToResolveFrom`. That could just be the child plan.





[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r559055832



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/PlanHelper.scala
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.catalyst.utils
+
+import java.util.UUID
+import org.apache.spark.sql
+import org.apache.spark.sql.{catalyst, AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.expressions.{AccumulateFiles, Alias, Attribute, AttributeReference, GreaterThan, IcebergBucketTransform, IcebergDayTransform, IcebergHourTransform, IcebergMonthTransform, IcebergYearTransform, Literal, NamedExpression, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Sum}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DynamicFileFilter, DynamicFileFilterWithCountCheck, Filter, LogicalPlan, Project}
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, Expression, FieldReference, HoursTransform, IdentityTransform, Lit, MonthsTransform, NamedReference, NamedTransform, Ref, Transform, YearsTransform}
+import org.apache.spark.sql.connector.iceberg.distributions.{ClusteredDistribution, Distribution, OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.connector.iceberg.expressions.{NullOrdering, SortDirection, SortOrder}
+import org.apache.spark.sql.connector.iceberg.read.SupportsFileFilter
+import org.apache.spark.sql.connector.iceberg.write.MergeBuilder
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, LogicalWriteInfoImpl}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+
+trait PlanHelper extends PredicateHelper {
+  val FILE_NAME_COL = "_file"
+  val ROW_POS_COL = "_pos"
+  val ROW_ID_COL = "_row_id"
+  val AFFECTED_FILES_ACC_NAME = "affectedFiles"
+  val AFFECTED_FILES_ACC_ALIAS_NAME = "_affectedFiles_"
+  val SUM_ROW_ID_ALIAS_NAME = "_sum_"
+
+  def buildScanPlan(spark: SparkSession,
+                    table: Table,
+                    output: Seq[AttributeReference],
+                    mergeBuilder: MergeBuilder,
+                    prunedTargetPlan: LogicalPlan,
+                    performCountCheckForMerge: Boolean = false): LogicalPlan = {
+
+    val scanBuilder = mergeBuilder.asScanBuilder
+    val scan = scanBuilder.build()
+    val scanRelation = DataSourceV2ScanRelation(table, scan, toOutputAttrs(scan.readSchema(), output))
+
+    scan match {
+      case filterable: SupportsFileFilter =>
+        if (performCountCheckForMerge) {
+          val affectedFilesAcc = new SetAccumulator[String]()
+          spark.sparkContext.register(affectedFilesAcc, AFFECTED_FILES_ACC_NAME)
+          val planWithAccumulator = buildPlanWithFileAccumulator(affectedFilesAcc, prunedTargetPlan)
+          DynamicFileFilterWithCountCheck(scanRelation, affectedFilesAcc,
+            planWithAccumulator, filterable, table.name())
+        } else {
+          val matchingFilePlan = buildAggregatePlan(prunedTargetPlan)
+          DynamicFileFilter(scanRelation, matchingFilePlan, filterable)
+        }
+      case _ =>
+        scanRelation
+    }
+  }
+
+  private def buildAggregatePlan(prunedTargetPlan: LogicalPlan): LogicalPlan = {
+    val fileAttr = findOutputAttr(prunedTargetPlan, FILE_NAME_COL)
+    Aggregate(Seq(fileAttr), Seq(fileAttr), prunedTargetPlan)
+  }
+
+  private def buildPlanWithFileAccumulator( fileAccum: SetAccumulator[String],
+                                            prunedTargetPlan: LogicalPlan): LogicalPlan = {
+    val fileAttr = findOutputAttr(prunedTargetPlan, FILE_NAME_COL)
+    val rowIdAttr = findOutputAttr(prunedTargetPlan, ROW_ID_COL)
+    val projectList = Seq(fileAttr, rowIdAttr,
+      Alias(AccumulateFiles(fileAccum, fileAttr), AFFECTED_FILES_ACC_ALIAS_NAME )())
+    val projectPlan = Project(projectList, prunedTargetPlan)
+    val affectedFilesAttr = findOutputAttr(projectPlan, AFFECTED_FILES_ACC_ALIAS_NAME)
+    val aggSumCol =
+      Alias(AggregateExpression(Sum(affectedFilesAttr), Complete, false), SUM_ROW_ID_ALIAS_NAME)()
+    val aggPlan = Aggregate(Seq(rowIdAttr), Seq(aggSumCol), projectPlan)
+    val sumAttr = findOutputAttr(aggPlan, SUM_ROW_ID_ALIAS_NAME)
+    val havingExpr = GreaterThan(sumAttr, Literal(1L))
+    Filter(havingExpr, aggPlan)
+  }
+
+  def findOutputAttr(plan: LogicalPlan, attrName: String): Attribute = {
+    val resolver = SQLConf.get.resolver
+    plan.output.find(attr => resolver(attr.name, attrName)).getOrElse {
+      throw new AnalysisException(s"Cannot find $attrName in ${plan.output}")
+    }
+  }
+
+  def newWriteInfo(schema: StructType): LogicalWriteInfo = {
+    val uuid = UUID.randomUUID()
+    LogicalWriteInfoImpl(queryId = uuid.toString, schema, CaseInsensitiveStringMap.empty)
+  }
+
+  private def toOutputAttrs(schema: StructType, output: Seq[AttributeReference]): Seq[AttributeReference] = {
+    val nameToAttr = output.map(_.name).zip(output).toMap
+    schema.toAttributes.map {
+      a => nameToAttr.get(a.name) match {
+        case Some(ref) =>
+          // keep the attribute id if it was present in the relation
+          a.withExprId(ref.exprId)
+        case _ =>
+          // if the field is new, create a new attribute
+          AttributeReference(a.name, a.dataType, a.nullable, a.metadata)()
+      }
+    }
+  }
+
+  object BucketTransform {
+    def unapply(transform: Transform): Option[(Int, FieldReference)] = transform match {
+      case bt: BucketTransform => bt.columns match {
+        case Seq(nf: NamedReference) =>
+          Some(bt.numBuckets.value(), FieldReference(nf.fieldNames()))
+        case _ =>
+          None
+      }
+      case _ => None
+    }
+  }
+
+  def toCatalyst(distribution: Distribution,

Review comment:
       I think this should probably be `private` as well.





[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r561425445



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -215,5 +227,41 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
     }
     !(actions.size == 1 && hasUnconditionalDelete(actions.headOption))
   }
+
+  def buildWritePlan(
+     childPlan: LogicalPlan,
+     table: Table): LogicalPlan = {
+    table match {
+      case iceTable: SparkTable =>
+        val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
+        val table = iceTable.table()
+        val distributionMode: String = table.properties
+          .getOrDefault("write.distribution-mode", TableProperties.WRITE_DISTRIBUTION_MODE_RANGE)

Review comment:
       Isn't `write.distribution-mode` listed in `TableProperties`? We should use the constant.
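
A small sketch of the suggested change, written against local stand-in constants so that it compiles on its own. The names mirror `TableProperties.WRITE_DISTRIBUTION_MODE` and `TableProperties.WRITE_DISTRIBUTION_MODE_RANGE`, but the values shown here are assumptions; real code would import the constants from `org.apache.iceberg.TableProperties` rather than redefine them.

```java
import java.util.Map;

// Hypothetical illustration, not code from the PR.
final class DistributionModeExample {

  // Local stand-ins (assumed values) for the Iceberg TableProperties constants.
  static final String WRITE_DISTRIBUTION_MODE = "write.distribution-mode";
  static final String WRITE_DISTRIBUTION_MODE_RANGE = "range";

  // Looks the mode up by constant instead of repeating the string literal,
  // falling back to range distribution when the property is not set.
  static String distributionMode(Map<String, String> tableProperties) {
    return tableProperties.getOrDefault(WRITE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE);
  }
}
```

Using the constant for the key keeps the lookup consistent with the default value, which the diff already takes from `TableProperties`.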





[GitHub] [iceberg] dilipbiswal commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
dilipbiswal commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r561474354



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -85,7 +96,7 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
           joinedAttributes = joinPlan.output
         )
 
-        val mergePlan = MergeInto(mergeParams, target, joinPlan)
+        val mergePlan = MergeInto(mergeParams, target.output, joinPlan)

Review comment:
       @rdblue Wouldn't AppendData have logic to do the right thing based on the child's distribution and ordering? I remember seeing something from Anton on this. I will go ahead and change it for now.





[GitHub] [iceberg] rdblue merged pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022


   



[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r559057674



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.iceberg.TableProperties.{MERGE_WRITE_CARDINALITY_CHECK, MERGE_WRITE_CARDINALITY_CHECK_DEFAULT, MERGE_WRITE_SORT_MODE, MERGE_WRITE_SORT_MODE_GLOBAL}
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.iceberg.spark.Spark3Util.toRequiredDistribution
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.iceberg.util.PropertyUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner}
+import org.apache.spark.sql.catalyst.plans.logical.{MergeAction, _}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.utils.PlanHelper
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+
+case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with PlanHelper with Logging  {
+  val ROW_FROM_SOURCE = "_row_from_source_"
+  val ROW_FROM_TARGET = "_row_from_target_"
+
+  import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Implicits._
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan resolveOperators {
+      // rewrite all operations that require reading the table to delete records
+      case MergeIntoTable(target: DataSourceV2Relation,
+                          source: LogicalPlan, cond, actions, notActions) =>
+        val targetOutputCols = target.output
+        val newProjectCols = target.output ++
+          Seq(
+            Alias(InputFileName(), FILE_NAME_COL)(),
+            Alias(monotonically_increasing_id().expr, ROW_ID_COL)()
+          )
+        val newTargetTable = Project(newProjectCols, target)
+
+        // Construct the plan to prune target based on join condition between source and
+        // target.
+        val prunedTargetPlan = Join(source, newTargetTable, Inner, Some(cond), JoinHint.NONE)
+        val writeInfo = newWriteInfo(target.schema)
+        val mergeBuilder = target.table.asMergeable.newMergeBuilder("delete", writeInfo)
+        val targetTableScan =  buildScanPlan(spark, target.table,
+          target.output, mergeBuilder, prunedTargetPlan, isCountCheckEnabled(target.table, actions))
+
+        // Construct an outer join to help track changes in source and target.
+        // TODO : Optimize this to use LEFT ANTI or RIGHT OUTER when applicable.
+        val sourceTableProj = source.output ++ Seq(Alias(lit(true).expr, ROW_FROM_SOURCE)())
+        val targetTableProj = target.output ++ Seq(Alias(lit(true).expr, ROW_FROM_TARGET)())
+        val newTargetTableScan = Project(targetTableProj, targetTableScan)
+        val newSourceTableScan = Project(sourceTableProj, source)
+        val joinPlan = Join(newSourceTableScan, newTargetTableScan, FullOuter, Some(cond), JoinHint.NONE)
+
+        // Construct the plan to replace the data based on the output of `MergeInto`
+        val mergeParams = MergeIntoParams(
+          isSourceRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_SOURCE)),
+          isTargetRowNotPresent = IsNull(findOutputAttr(joinPlan, ROW_FROM_TARGET)),
+          matchedConditions = actions.map(getClauseCondition),
+          matchedOutputs = actions.map(actionOutput(_, targetOutputCols)),
+          notMatchedConditions = notActions.map(getClauseCondition),
+          notMatchedOutputs = notActions.map(actionOutput(_, targetOutputCols)),
+          targetOutput = targetOutputCols :+ Literal(false),
+          deleteOutput = targetOutputCols :+ Literal(true),
+          joinedAttributes = joinPlan.output
+        )
+        val joinKeysFromTarget = targetOutputCols.filter (
+          attr => cond.references.exists(attr.semanticEquals(_))
+        )
+        val writePlan = buildWritePlan(joinPlan, Project(target.output, joinPlan), target.table, joinKeysFromTarget)
+        val mergePlan = MergeInto(mergeParams, target, writePlan)
+        val batchWrite = mergeBuilder.asWriteBuilder.buildForBatch()
+        ReplaceData(target, batchWrite, mergePlan)
+    }
+  }
+
+  def buildWritePlan(childPlan: LogicalPlan,
+                     planToResolveFrom: LogicalPlan,
+                     table: Table, targetJoinAttrs: Seq[Attribute]): LogicalPlan = {
+    val iceTable = table.asInstanceOf[SparkTable].table()
+    val globalSortEnabled = isGlobalSortEnabled(table)
+    val partitionExpressions = toCatalyst(toRequiredDistribution(iceTable.spec), planToResolveFrom)
+    val sortExpressions: Seq[SortOrder] =  toCatalyst(toRequiredDistribution(iceTable.spec(),
+      iceTable.sortOrder(), false), planToResolveFrom).asInstanceOf[Seq[SortOrder]]
+    val dist = toRequiredDistribution(iceTable.spec(),
+      iceTable.sortOrder(), true)
+    val globalSortExprs: Seq[SortOrder] = toCatalyst(dist, planToResolveFrom).asInstanceOf[Seq[SortOrder]]
+
+    /*
+    val globalSortExprs: Seq[SortOrder] = toCatalyst(toRequiredDistribution(iceTable.spec(),
+      iceTable.sortOrder(), true), planToResolveFrom).asInstanceOf[Seq[SortOrder]]
+
+     */
+    val numShufflePartitions = SQLConf.get.numShufflePartitions
+
+    (globalSortEnabled, partitionExpressions.isEmpty, sortExpressions.isEmpty) match {
+      case (true, _, _) =>
+        // If global sorting is preferred then build a sort order based on partition and
+        // sort specification of the table. If none is present, then sort based on the
+        // target join key attributes.
+        if (globalSortExprs.nonEmpty) {
+          Sort(globalSortExprs, true, childPlan)
+        } else if (targetJoinAttrs.nonEmpty) {
+          Sort(buildSortOrder(targetJoinAttrs), true, childPlan)
+        } else {
+          childPlan
+        }
+      case (_, true, true) =>
+        // If no partition spec or sort order is defined for the table then repartition and
+        // locally sort the data based on the join key attributes.
+        if (targetJoinAttrs.nonEmpty) {
+          val repartition = RepartitionByExpression(targetJoinAttrs, childPlan, numShufflePartitions)
+          Sort(buildSortOrder(targetJoinAttrs), global = globalSortEnabled, repartition)
+        } else {
+          childPlan
+        }
+      case (_, true, false) =>
+        // Only sort order is specified but no partition spec is defined. In this case
+        // Reparttion the data by sort order expression and then locally sort the data

Review comment:
       Typo: `Reparttion` -> `Repartition`






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r561415024



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeIntoExec.scala
##########
@@ -32,10 +32,10 @@ import org.apache.spark.sql.execution.UnaryExecNode
 
 case class MergeIntoExec(
     mergeIntoParams: MergeIntoParams,
-    @transient targetRelation: DataSourceV2Relation,
+    targetOutput: Seq[Attribute],

Review comment:
       Same here, using `output` would make the method definition unnecessary.






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r559055221



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
##########
@@ -269,6 +279,51 @@ public Transform unknown(int fieldId, String sourceName, int sourceId, String tr
     return transforms.toArray(new Transform[0]);
   }
 
+  public static Distribution toRequiredDistribution(PartitionSpec spec, SortOrder sortOrder, boolean inferFromSpec) {
+    if (sortOrder.isUnsorted()) {
+      if (inferFromSpec) {
+        SortOrder specOrder = Partitioning.sortOrderFor(spec);
+        return Distributions.ordered(convert(specOrder));
+      }
+
+      return Distributions.unspecified();
+    }
+
+    Schema schema = spec.schema();
+    Multimap<Integer, SortField> sortFieldIndex = Multimaps.index(sortOrder.fields(), SortField::sourceId);
+
+    // build a sort prefix of partition fields that are not already in the sort order
+    SortOrder.Builder builder = SortOrder.builderFor(schema);
+    for (PartitionField field : spec.fields()) {
+      Collection<SortField> sortFields = sortFieldIndex.get(field.sourceId());
+      boolean isSorted = sortFields.stream().anyMatch(sortField ->
+              field.transform().equals(sortField.transform()) ||
+                      sortField.transform().satisfiesOrderOf(field.transform()));

Review comment:
       I think this would fit on one line if the indentation were fixed. A continuing indent should be 2 indents, 4 spaces.
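
       For reference, a minimal Java sketch of the statement with the continuation indent reduced to 4 spaces. `SortPrefixSketch`, `Field`, and `requiredOrdering` are hypothetical stand-ins, not Iceberg classes. The transform check is simplified to plain equality (no `satisfiesOrderOf`), and the sketch assumes that the part of the method not shown in the diff appends the table's sort fields after the prefix.

```java
import java.util.ArrayList;
import java.util.List;

class SortPrefixSketch {
  // Hypothetical simplified field: a source column id plus a transform name.
  static final class Field {
    final int sourceId;
    final String transform;

    Field(int sourceId, String transform) {
      this.sourceId = sourceId;
      this.transform = transform;
    }

    @Override
    public String toString() {
      return transform + "(" + sourceId + ")";
    }
  }

  // Returns the partition fields not already covered by the sort order,
  // followed by the sort fields themselves (assumed behavior, see lead-in).
  static List<Field> requiredOrdering(List<Field> partitionFields, List<Field> sortFields) {
    List<Field> result = new ArrayList<>();
    for (Field field : partitionFields) {
      // The lambda body is a continuation line: 2 indents, 4 spaces.
      boolean isSorted = sortFields.stream().anyMatch(sortField ->
          sortField.sourceId == field.sourceId && sortField.transform.equals(field.transform));
      if (!isSorted) {
        result.add(field);
      }
    }
    result.addAll(sortFields);
    return result;
  }
}
```

       With partition fields `identity(1), day(2)` and sort fields `day(2), identity(3)`, the sketch yields `identity(1), day(2), identity(3)`: only the partition field missing from the sort order is added as a prefix.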






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r561410691



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeInto.scala
##########
@@ -21,13 +21,12 @@ package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 
 case class MergeInto(
     mergeIntoProcessor: MergeIntoParams,
-    targetRelation: DataSourceV2Relation,
+    targetOutput: Seq[Attribute],

Review comment:
       I think this could just be `output` and you wouldn't need to override `def output` below.






[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r559055552



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/PlanHelper.scala
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.catalyst.utils
+
+import java.util.UUID
+import org.apache.spark.sql
+import org.apache.spark.sql.{catalyst, AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.expressions.{AccumulateFiles, Alias, Attribute, AttributeReference, GreaterThan, IcebergBucketTransform, IcebergDayTransform, IcebergHourTransform, IcebergMonthTransform, IcebergYearTransform, Literal, NamedExpression, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Sum}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, DynamicFileFilter, DynamicFileFilterWithCountCheck, Filter, LogicalPlan, Project}
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, Expression, FieldReference, HoursTransform, IdentityTransform, Lit, MonthsTransform, NamedReference, NamedTransform, Ref, Transform, YearsTransform}
+import org.apache.spark.sql.connector.iceberg.distributions.{ClusteredDistribution, Distribution, OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.connector.iceberg.expressions.{NullOrdering, SortDirection, SortOrder}
+import org.apache.spark.sql.connector.iceberg.read.SupportsFileFilter
+import org.apache.spark.sql.connector.iceberg.write.MergeBuilder
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, LogicalWriteInfoImpl}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+
+trait PlanHelper extends PredicateHelper {
+  val FILE_NAME_COL = "_file"
+  val ROW_POS_COL = "_pos"
+  val ROW_ID_COL = "_row_id"
+  val AFFECTED_FILES_ACC_NAME = "affectedFiles"
+  val AFFECTED_FILES_ACC_ALIAS_NAME = "_affectedFiles_"
+  val SUM_ROW_ID_ALIAS_NAME = "_sum_"
+
+  def buildScanPlan(spark: SparkSession,
+                    table: Table,
+                    output: Seq[AttributeReference],
+                    mergeBuilder: MergeBuilder,
+                    prunedTargetPlan: LogicalPlan,
+                    performCountCheckForMerge: Boolean = false): LogicalPlan = {
+
+    val scanBuilder = mergeBuilder.asScanBuilder
+    val scan = scanBuilder.build()
+    val scanRelation = DataSourceV2ScanRelation(table, scan, toOutputAttrs(scan.readSchema(), output))
+
+    scan match {
+      case filterable: SupportsFileFilter =>
+        if (performCountCheckForMerge) {
+          val affectedFilesAcc = new SetAccumulator[String]()
+          spark.sparkContext.register(affectedFilesAcc, AFFECTED_FILES_ACC_NAME)
+          val planWithAccumulator = buildPlanWithFileAccumulator(affectedFilesAcc, prunedTargetPlan)
+          DynamicFileFilterWithCountCheck(scanRelation, affectedFilesAcc,
+            planWithAccumulator, filterable, table.name())
+        } else {
+          val matchingFilePlan = buildAggregatePlan(prunedTargetPlan)
+          DynamicFileFilter(scanRelation, matchingFilePlan, filterable)
+        }
+      case _ =>
+        scanRelation
+    }
+  }
+
+  private def buildAggregatePlan(prunedTargetPlan: LogicalPlan): LogicalPlan = {
+    val fileAttr = findOutputAttr(prunedTargetPlan, FILE_NAME_COL)
+    Aggregate(Seq(fileAttr), Seq(fileAttr), prunedTargetPlan)
+  }
+
+  private def buildPlanWithFileAccumulator( fileAccum: SetAccumulator[String],
+                                            prunedTargetPlan: LogicalPlan): LogicalPlan = {
+    val fileAttr = findOutputAttr(prunedTargetPlan, FILE_NAME_COL)
+    val rowIdAttr = findOutputAttr(prunedTargetPlan, ROW_ID_COL)
+    val projectList = Seq(fileAttr, rowIdAttr,
+      Alias(AccumulateFiles(fileAccum, fileAttr), AFFECTED_FILES_ACC_ALIAS_NAME )())
+    val projectPlan = Project(projectList, prunedTargetPlan)
+    val affectedFilesAttr = findOutputAttr(projectPlan, AFFECTED_FILES_ACC_ALIAS_NAME)
+    val aggSumCol =
+      Alias(AggregateExpression(Sum(affectedFilesAttr), Complete, false), SUM_ROW_ID_ALIAS_NAME)()
+    val aggPlan = Aggregate(Seq(rowIdAttr), Seq(aggSumCol), projectPlan)
+    val sumAttr = findOutputAttr(aggPlan, SUM_ROW_ID_ALIAS_NAME)
+    val havingExpr = GreaterThan(sumAttr, Literal(1L))
+    Filter(havingExpr, aggPlan)
+  }
+
+  def findOutputAttr(plan: LogicalPlan, attrName: String): Attribute = {
+    val resolver = SQLConf.get.resolver
+    plan.output.find(attr => resolver(attr.name, attrName)).getOrElse {
+      throw new AnalysisException(s"Cannot find $attrName in ${plan.output}")
+    }
+  }
+
+  def newWriteInfo(schema: StructType): LogicalWriteInfo = {
+    val uuid = UUID.randomUUID()
+    LogicalWriteInfoImpl(queryId = uuid.toString, schema, CaseInsensitiveStringMap.empty)
+  }
+
+  private def toOutputAttrs(schema: StructType, output: Seq[AttributeReference]): Seq[AttributeReference] = {
+    val nameToAttr = output.map(_.name).zip(output).toMap
+    schema.toAttributes.map {
+      a => nameToAttr.get(a.name) match {
+        case Some(ref) =>
+          // keep the attribute id if it was present in the relation
+          a.withExprId(ref.exprId)
+        case _ =>
+          // if the field is new, create a new attribute
+          AttributeReference(a.name, a.dataType, a.nullable, a.metadata)()
+      }
+    }
+  }
+
+  object BucketTransform {
+    def unapply(transform: Transform): Option[(Int, FieldReference)] = transform match {
+      case bt: BucketTransform => bt.columns match {
+        case Seq(nf: NamedReference) =>
+          Some(bt.numBuckets.value(), FieldReference(nf.fieldNames()))
+        case _ =>
+          None
+      }
+      case _ => None
+    }
+  }
+
+  def toCatalyst(distribution: Distribution,
+                 plan: LogicalPlan): Seq[catalyst.expressions.Expression] = {
+    val resolver = SQLConf.get.resolver

Review comment:
       Need to remember to get the resolver from the `def resolver` method when rebased.
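
       To illustrate the point, a minimal Java sketch of a lookup that receives its resolver from the caller instead of reading the session conf at each call site. `ResolverSketch` and its members are hypothetical. Spark's `Resolver` is a function from two names to a boolean, which this sketch approximates with a `BiPredicate<String, String>`.

```java
import java.util.List;
import java.util.function.BiPredicate;

class ResolverSketch {
  // Hypothetical resolvers mirroring case-sensitive and case-insensitive name matching.
  static final BiPredicate<String, String> CASE_SENSITIVE = String::equals;
  static final BiPredicate<String, String> CASE_INSENSITIVE = String::equalsIgnoreCase;

  // Finds the first output name accepted by the resolver, or fails with a descriptive error.
  static String findOutputAttr(List<String> output, String attrName, BiPredicate<String, String> resolver) {
    return output.stream()
        .filter(name -> resolver.test(name, attrName))
        .findFirst()
        .orElseThrow(() -> new IllegalArgumentException("Cannot find " + attrName + " in " + output));
  }
}
```

       Passing the resolver in keeps every lookup consistent with whichever single accessor supplies it.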






[GitHub] [iceberg] rdblue merged pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022


   




[GitHub] [iceberg] rdblue commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r559055341



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/OrderField.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import org.apache.iceberg.NullOrder;
+import org.apache.spark.sql.connector.expressions.Expression;
+import org.apache.spark.sql.connector.expressions.Expressions;
+import org.apache.spark.sql.connector.iceberg.expressions.NullOrdering;
+import org.apache.spark.sql.connector.iceberg.expressions.SortDirection;
+import org.apache.spark.sql.connector.iceberg.expressions.SortOrder;
+
+class OrderField implements SortOrder {
+  static OrderField column(String fieldName, org.apache.iceberg.SortDirection direction, NullOrder nullOrder) {
+    return new OrderField(Expressions.column(fieldName), toSpark(direction), toSpark(nullOrder));
+  }
+
+  static OrderField bucket(String fieldName, int numBuckets,
+                           org.apache.iceberg.SortDirection direction, NullOrder nullOrder) {
+    return new OrderField(Expressions.bucket(numBuckets, fieldName), toSpark(direction), toSpark(nullOrder));
+  }
+
+  static OrderField truncate(String fieldName, int width,
+                             org.apache.iceberg.SortDirection direction, NullOrder nullOrder) {
+    return new OrderField(Expressions.apply(
+               "truncate", Expressions.column(fieldName), Expressions.literal(width)),
+               toSpark(direction), toSpark(nullOrder));

Review comment:
       Indentation is off in this file as well.






[GitHub] [iceberg] dilipbiswal commented on a change in pull request #2022: Implement logic to group and sort rows before writing rows for MERGE INTO.

Posted by GitBox <gi...@apache.org>.
dilipbiswal commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r561451604



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeIntoTable.java
##########
@@ -303,25 +305,107 @@ public void testSingleUnconditionalDeleteDisbleCountCheck() throws NoSuchTableEx
   }
 
   @Test
-  public void testSingleConditionalDeleteCountCheck() throws NoSuchTableException {
-    append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
-    append(sourceName, new Employee(1, "emp-id-1"), new Employee(1, "emp-id-1"),
-           new Employee(2, "emp-id-2"), new Employee(6, "emp-id-6"));
+  public void testIdentityPartition()  {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg PARTITIONED BY (identity(dep))", targetName);
+      initTable(targetName);
+      setWriteMode(targetName, mode);
+      createAndInitSourceTable(sourceName);
+      append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
+      append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
+
+      String sqlText = "MERGE INTO " + targetName + " AS target \n" +
+              "USING " + sourceName + " AS source \n" +
+              "ON target.id = source.id \n" +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, "");
+      assertEquals("Should have expected rows",
+              ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
+              sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
+    });
+  }
 
-    String sqlText = "MERGE INTO %s AS target " +
-           "USING %s AS source " +
-           "ON target.id = source.id " +
-           "WHEN MATCHED AND target.id = 1 THEN DELETE " +
-           "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+  @Test
+  public void testDaysTransform() {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, ts timestamp) USING iceberg PARTITIONED BY (days(ts))", targetName);
+      initTable(targetName);
+      setWriteMode(targetName, mode);
+      sql("CREATE TABLE %s (id INT, ts timestamp) USING iceberg", sourceName);
+      initTable(sourceName);
+      sql("INSERT INTO " + targetName + " VALUES (1, timestamp('2001-01-01 00:00:00'))," +
+              "(6, timestamp('2001-01-06 00:00:00'))");
+      sql("INSERT INto " + sourceName + " VALUES (2, timestamp('2001-01-02 00:00:00'))," +
+              "(1, timestamp('2001-01-01 00:00:00'))," +
+              "(6, timestamp('2001-01-06 00:00:00'))");
+
+      String sqlText = "MERGE INTO " + targetName + " AS target \n" +
+              "USING " + sourceName + " AS source \n" +
+              "ON target.id = source.id \n" +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, "");
+      assertEquals("Should have expected rows",
+              ImmutableList.of(row(1, "2001-01-01 00:00:00"), row(2, "2001-01-02 00:00:00")),
+              sql("SELECT id, CAST(ts AS STRING) FROM %s ORDER BY id ASC NULLS LAST", targetName));
+    });
+  }
 
-    String tabName = catalogName + "." + "default.target";
-    String errorMsg = "The same row of target table `" + tabName + "` was identified more than\n" +
-            " once for an update, delete or insert operation of the MERGE statement.";
-    AssertHelpers.assertThrows("Should complain ambiguous row in target",
-           SparkException.class, errorMsg, () -> sql(sqlText, targetName, sourceName));
-    assertEquals("Target should be unchanged",
-           ImmutableList.of(row(1, "emp-id-one"), row(6, "emp-id-6")),
-           sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
+  @Test
+  public void testBucketExpression() {
+    writeModes.forEach(mode -> {
+      removeTables();
+      sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg" +
+              " CLUSTERED BY (dep) INTO 2 BUCKETS", targetName);
+      initTable(targetName);
+      setWriteMode(targetName, mode);
+      createAndInitSourceTable(sourceName);
+      append(targetName, new Employee(1, "emp-id-one"), new Employee(6, "emp-id-6"));
+      append(sourceName, new Employee(2, "emp-id-2"), new Employee(1, "emp-id-1"), new Employee(6, "emp-id-6"));
+      String sqlText = "MERGE INTO " + targetName + " AS target \n" +
+              "USING " + sourceName + " AS source \n" +
+              "ON target.id = source.id \n" +
+              "WHEN MATCHED AND target.id = 1 THEN UPDATE SET * \n" +
+              "WHEN MATCHED AND target.id = 6 THEN DELETE \n" +
+              "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * ";
+
+      sql(sqlText, "");
+      assertEquals("Should have expected rows",
+              ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
+              sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
+    });
+  }
+
+  @Test
+  public void testPartitionedAndOrderedTable() {

Review comment:
       @rdblue Sorry, I didn't know about this extension before. I have made the change.

##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -85,7 +96,7 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
           joinedAttributes = joinPlan.output
         )
 
-        val mergePlan = MergeInto(mergeParams, target, joinPlan)
+        val mergePlan = MergeInto(mergeParams, target.output, joinPlan)

Review comment:
       @rdblue Wouldn't AppendData have logic to do the right thing based on the child's distribution and ordering? I remember seeing something from Anton on this. I will go ahead and change it for now.



