You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/01/27 06:09:52 UTC

[GitHub] [iceberg] aokolnychyi opened a new pull request #3992: Spark 3.2: Fix cardinality check for alternative join implementations

aokolnychyi opened a new pull request #3992:
URL: https://github.com/apache/iceberg/pull/3992


   This PR fixes the cardinality check for alternative join implementations.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3992: Spark 3.2: Fix cardinality check for alternative join implementations

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3992:
URL: https://github.com/apache/iceberg/pull/3992#discussion_r796805400



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
##########
@@ -186,7 +187,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand {
     // disable broadcasts for the target table to perform the cardinality check
     val joinType = if (notMatchedActions.isEmpty) LeftOuter else FullOuter
     val joinHint = JoinHint(leftHint = Some(HintInfo(Some(NO_BROADCAST_HASH))), rightHint = None)
-    val joinPlan = Join(targetTableProj, sourceTableProj, joinType, Some(cond), joinHint)
+    val joinPlan = Join(NoStatsUnaryNode(targetTableProj), sourceTableProj, joinType, Some(cond), joinHint)

Review comment:
       Let me see. We handle predicate pushdown for row-level operations differently but it is definitely worth checking.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3992: Spark 3.2: Fix cardinality check for alternative join implementations

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3992:
URL: https://github.com/apache/iceberg/pull/3992#discussion_r794072817



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/NoStatsUnaryNode.scala
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+case class NoStatsUnaryNode(child: LogicalPlan) extends UnaryNode {

Review comment:
       Yeah, here is the logic to pick the smaller side.
   
   ```
     def getSmallerSide(left: LogicalPlan, right: LogicalPlan): BuildSide = {
       if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft
     }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3992: Spark 3.2: Fix cardinality check for alternative join implementations

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3992:
URL: https://github.com/apache/iceberg/pull/3992#discussion_r793280025



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/NoStatsUnaryNode.scala
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+case class NoStatsUnaryNode(child: LogicalPlan) extends UnaryNode {

Review comment:
       As much as I don't like this solution, I don't have a better one for fixing broadcast nested loop joins. Those may happen if the ON condition does not have an equality predicate (see added tests below). Extremely edge case...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3992: Spark 3.2: Fix cardinality check for alternative join implementations

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3992:
URL: https://github.com/apache/iceberg/pull/3992#discussion_r793994519



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/NoStatsUnaryNode.scala
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+case class NoStatsUnaryNode(child: LogicalPlan) extends UnaryNode {

Review comment:
       Got it. So this forces the source to be broadcasted.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] singhpk234 commented on a change in pull request #3992: Spark 3.2: Fix cardinality check for alternative join implementations

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on a change in pull request #3992:
URL: https://github.com/apache/iceberg/pull/3992#discussion_r796278084



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
##########
@@ -186,7 +187,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand {
     // disable broadcasts for the target table to perform the cardinality check
     val joinType = if (notMatchedActions.isEmpty) LeftOuter else FullOuter
     val joinHint = JoinHint(leftHint = Some(HintInfo(Some(NO_BROADCAST_HASH))), rightHint = None)
-    val joinPlan = Join(targetTableProj, sourceTableProj, joinType, Some(cond), joinHint)
+    val joinPlan = Join(NoStatsUnaryNode(targetTableProj), sourceTableProj, joinType, Some(cond), joinHint)

Review comment:
       [doubt] Can adding a new UnaryNode fiddle with predicatePushDown ?
    
   At present predicatePushDown rule maintains the list of unary logical operator from which it's safe to push down the filter's from. so when a new node we introduce since it's not present in the allow-list and hence filter will get stuck at the parent of that node ... and will not be pushed through, which in turn cause perf regression. 
   
   Not sure if this is the case what's the best possible way to handle it. [Spark 3.2 code pointer](https://github.com/apache/spark/blob/v3.2.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1528-L1550)
   
   ```
   
       case filter @ Filter(_, u: UnaryNode)
           if canPushThrough(u) && u.expressions.forall(_.deterministic) =>
         pushDownPredicate(filter, u.child) { predicate =>
           u.withNewChildren(Seq(Filter(predicate, u.child)))
         }
     }
   
     def canPushThrough(p: UnaryNode): Boolean = p match {
       // Note that some operators (e.g. project, aggregate, union) are being handled separately
       // (earlier in this rule).
       case _: AppendColumns => true
       case _: Distinct => true
       case _: Generate => true
       case _: Pivot => true
       case _: RepartitionByExpression => true
       case _: Repartition => true
       case _: ScriptTransformation => true
       case _: Sort => true
       case _: BatchEvalPython => true
       case _: ArrowEvalPython => true
       case _: Expand => true
       case _ => false
     }
   ```
   
   I recently hit something similar (not in iceberg), hence posting here just it case it's applicable here as well.
   
   cc @aokolnychyi @rdblue @RussellSpitzer 
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on pull request #3992: Spark 3.2: Fix cardinality check for alternative join implementations

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on pull request #3992:
URL: https://github.com/apache/iceberg/pull/3992#issuecomment-1023630724


   Could you explain at a high level what the bug is and how it is fixed here? It looks like we are just subbing in a join node that can't be replaced by normal optimization pathways?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3992: Spark 3.2: Fix cardinality check for alternative join implementations

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3992:
URL: https://github.com/apache/iceberg/pull/3992#discussion_r793983226



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
##########
@@ -46,6 +48,16 @@ case class MergeRowsExec(
     output: Seq[Attribute],
     child: SparkPlan) extends UnaryExecNode {
 
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+    // request a local sort by the row ID attrs if shuffle hash joins are enabled
+    // this is needed to co-locate matches for the same target row after the shuffle
+    if (performCardinalityCheck && !conf.preferSortMergeJoin) {

Review comment:
       I checked AQE and I think it would be safer for us to always request a local sort. In many cases, the incoming records would be implicitly sorted so it shouldn't be too expensive.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3992: Spark 3.2: Fix cardinality check for alternative join implementations

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3992:
URL: https://github.com/apache/iceberg/pull/3992#discussion_r794071636



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
##########
@@ -46,6 +48,15 @@ case class MergeRowsExec(
     output: Seq[Attribute],
     child: SparkPlan) extends UnaryExecNode {
 
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+    if (performCardinalityCheck) {
+      // request a local sort by the row ID attrs to co-locate matches for the same target row
+      Seq(rowIdAttrs.map(attr => SortOrder(attr, Ascending)))

Review comment:
       We used to rely on a separate query and an accumulator, which was a source of issues too. I think we will have more options when we move this to Spark (e.g. we will be able to insert a rule at the query prep stage when we already know the physical plan).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #3992: Spark 3.2: Fix cardinality check for alternative join implementations

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #3992:
URL: https://github.com/apache/iceberg/pull/3992#discussion_r794072229



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
##########
@@ -46,6 +48,15 @@ case class MergeRowsExec(
     output: Seq[Attribute],
     child: SparkPlan) extends UnaryExecNode {
 
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+    if (performCardinalityCheck) {
+      // request a local sort by the row ID attrs to co-locate matches for the same target row
+      Seq(rowIdAttrs.map(attr => SortOrder(attr, Ascending)))

Review comment:
       Yeah our old approach was also not great, I guess if we have things built into spark it will be a lot safer




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3992: Spark 3.2: Fix cardinality check for alternative join implementations

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3992:
URL: https://github.com/apache/iceberg/pull/3992#discussion_r796831705



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
##########
@@ -186,7 +187,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand {
     // disable broadcasts for the target table to perform the cardinality check
     val joinType = if (notMatchedActions.isEmpty) LeftOuter else FullOuter
     val joinHint = JoinHint(leftHint = Some(HintInfo(Some(NO_BROADCAST_HASH))), rightHint = None)
-    val joinPlan = Join(targetTableProj, sourceTableProj, joinType, Some(cond), joinHint)
+    val joinPlan = Join(NoStatsUnaryNode(targetTableProj), sourceTableProj, joinType, Some(cond), joinHint)

Review comment:
       Yeah, I think we should be fine as `RowLevelCommandScanRelationPushDown` takes the condition directly and applies it to the scan relation. That being said, there is probably another issue I missed.  I'll submit a separate PR for that.
   
   Thanks for checking, though, @singhpk234!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #3992: Spark 3.2: Fix cardinality check for alternative join implementations

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #3992:
URL: https://github.com/apache/iceberg/pull/3992#discussion_r794065099



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
##########
@@ -46,6 +48,15 @@ case class MergeRowsExec(
     output: Seq[Attribute],
     child: SparkPlan) extends UnaryExecNode {
 
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+    if (performCardinalityCheck) {
+      // request a local sort by the row ID attrs to co-locate matches for the same target row
+      Seq(rowIdAttrs.map(attr => SortOrder(attr, Ascending)))

Review comment:
       Feels like this is all leaning heavily on implementation details to actually do our check. While I think this fix is fine for now, maybe we should consider actually doing a group by the cardinality check or a custom aggregation which just throws an error on duplicate records?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] singhpk234 commented on a change in pull request #3992: Spark 3.2: Fix cardinality check for alternative join implementations

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on a change in pull request #3992:
URL: https://github.com/apache/iceberg/pull/3992#discussion_r794194711



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
##########
@@ -46,6 +48,16 @@ case class MergeRowsExec(
     output: Seq[Attribute],
     child: SparkPlan) extends UnaryExecNode {
 
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+    // request a local sort by the row ID attrs if shuffle hash joins are enabled
+    // this is needed to co-locate matches for the same target row after the shuffle
+    if (performCardinalityCheck && !conf.preferSortMergeJoin) {

Review comment:
       Thank you @aokolnychyi for checking, it makes perfect sense :) !!!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3992: Spark 3.2: Fix cardinality check for alternative join implementations

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3992:
URL: https://github.com/apache/iceberg/pull/3992#discussion_r793850856



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
##########
@@ -46,6 +48,16 @@ case class MergeRowsExec(
     output: Seq[Attribute],
     child: SparkPlan) extends UnaryExecNode {
 
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+    // request a local sort by the row ID attrs if shuffle hash joins are enabled
+    // this is needed to co-locate matches for the same target row after the shuffle
+    if (performCardinalityCheck && !conf.preferSortMergeJoin) {

Review comment:
       Good point, @singhpk234. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #3992: Spark 3.2: Fix cardinality check for alternative join implementations

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3992:
URL: https://github.com/apache/iceberg/pull/3992#discussion_r793968478



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/NoStatsUnaryNode.scala
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+case class NoStatsUnaryNode(child: LogicalPlan) extends UnaryNode {

Review comment:
       How does this address broadcasted nested loop joins? Does it make Spark fail if one is required because it doesn't think the data can be broadcasted?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue merged pull request #3992: Spark 3.2: Fix cardinality check for alternative join implementations

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #3992:
URL: https://github.com/apache/iceberg/pull/3992


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] singhpk234 commented on a change in pull request #3992: Spark 3.2: Fix cardinality check for alternative join implementations

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on a change in pull request #3992:
URL: https://github.com/apache/iceberg/pull/3992#discussion_r796278084



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
##########
@@ -186,7 +187,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand {
     // disable broadcasts for the target table to perform the cardinality check
     val joinType = if (notMatchedActions.isEmpty) LeftOuter else FullOuter
     val joinHint = JoinHint(leftHint = Some(HintInfo(Some(NO_BROADCAST_HASH))), rightHint = None)
-    val joinPlan = Join(targetTableProj, sourceTableProj, joinType, Some(cond), joinHint)
+    val joinPlan = Join(NoStatsUnaryNode(targetTableProj), sourceTableProj, joinType, Some(cond), joinHint)

Review comment:
       [doubt] Hi All, Can adding a new UnaryNode mess with predicatePushDown rule ?
    
   At present predicatePushDown rule maintains the list of unary logical operator from which it's safe to push down the filter's from. so when a new node we introduce since it's not present in the allow-list and hence filter will get stuck at the parent of that node ... and will not be pushed through, which in turn cause perf regression. 
   
   Not sure if this is the case what's the best possible way to handle it. [Spark 3.2 code pointer](https://github.com/apache/spark/blob/v3.2.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1528-L1550)
   
   ```
   
       case filter @ Filter(_, u: UnaryNode)
           if canPushThrough(u) && u.expressions.forall(_.deterministic) =>
         pushDownPredicate(filter, u.child) { predicate =>
           u.withNewChildren(Seq(Filter(predicate, u.child)))
         }
     }
   
     def canPushThrough(p: UnaryNode): Boolean = p match {
       // Note that some operators (e.g. project, aggregate, union) are being handled separately
       // (earlier in this rule).
       case _: AppendColumns => true
       case _: Distinct => true
       case _: Generate => true
       case _: Pivot => true
       case _: RepartitionByExpression => true
       case _: Repartition => true
       case _: ScriptTransformation => true
       case _: Sort => true
       case _: BatchEvalPython => true
       case _: ArrowEvalPython => true
       case _: Expand => true
       case _ => false
     }
   ```
   
   I recently hit something similar (not in iceberg), hence posting here just in case it's applicable here as well.
   
   cc @aokolnychyi @rdblue @RussellSpitzer 
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] singhpk234 commented on a change in pull request #3992: Spark 3.2: Fix cardinality check for alternative join implementations

Posted by GitBox <gi...@apache.org>.
singhpk234 commented on a change in pull request #3992:
URL: https://github.com/apache/iceberg/pull/3992#discussion_r793328800



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
##########
@@ -46,6 +48,16 @@ case class MergeRowsExec(
     output: Seq[Attribute],
     child: SparkPlan) extends UnaryExecNode {
 
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+    // request a local sort by the row ID attrs if shuffle hash joins are enabled
+    // this is needed to co-locate matches for the same target row after the shuffle
+    if (performCardinalityCheck && !conf.preferSortMergeJoin) {

Review comment:
       Hello Anton, 
   
   I have a small doubt, as per my understanding, at present not preferingSMJ doesn't implies SHJ being selected it's conjuncted with canBuildLocalHashMapBySize threshold as well as muchSmaller check's ([CP](https://github.com/apache/spark/blob/v3.2.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala#L277)).
   
   Also there are conf's such as "spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold" in AQE which even works slightly orthogonal by adding internal "PREFER_SHUFFLE_HASH" hint
   
   Any pointer's how are we thinking of handling it or is the handling even required (appologies I am very new to iceberg)  ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3992: Spark 3.2: Fix cardinality check for alternative join implementations

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3992:
URL: https://github.com/apache/iceberg/pull/3992#discussion_r793979296



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/NoStatsUnaryNode.scala
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+case class NoStatsUnaryNode(child: LogicalPlan) extends UnaryNode {

Review comment:
       We have a problem only if the target is broadcasted. Otherwise, broadcast nested loop joins are fine.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3992: Spark 3.2: Fix cardinality check for alternative join implementations

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3992:
URL: https://github.com/apache/iceberg/pull/3992#discussion_r793984876



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
##########
@@ -46,6 +48,15 @@ case class MergeRowsExec(
     output: Seq[Attribute],
     child: SparkPlan) extends UnaryExecNode {
 
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+    if (performCardinalityCheck) {
+      // request a local sort by the row ID attrs to co-locate matches for the same target row
+      Seq(rowIdAttrs.map(attr => SortOrder(attr, Ascending)))

Review comment:
       Instead of requesting a sort, we could keep track of all seen row IDs in a task (we keep only the last one right now). That will potentially require more memory.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3992: Spark 3.2: Fix cardinality check for alternative join implementations

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3992:
URL: https://github.com/apache/iceberg/pull/3992#discussion_r793280624



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
##########
@@ -46,6 +48,16 @@ case class MergeRowsExec(
     output: Seq[Attribute],
     child: SparkPlan) extends UnaryExecNode {
 
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+    // request a local sort by the row ID attrs if shuffle hash joins are enabled
+    // this is needed to co-locate matches for the same target row after the shuffle
+    if (performCardinalityCheck && !conf.preferSortMergeJoin) {

Review comment:
       It is only the shuffle hash join that needs to be fixed. In case of sort-merge joins, the incoming and outcoming records are already sorted by the attributes in the ON condition, which is sufficient.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3992: Spark 3.2: Fix cardinality check for alternative join implementations

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3992:
URL: https://github.com/apache/iceberg/pull/3992#discussion_r793849424



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
##########
@@ -46,6 +48,16 @@ case class MergeRowsExec(
     output: Seq[Attribute],
     child: SparkPlan) extends UnaryExecNode {
 
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+    // request a local sort by the row ID attrs if shuffle hash joins are enabled
+    // this is needed to co-locate matches for the same target row after the shuffle
+    if (performCardinalityCheck && !conf.preferSortMergeJoin) {

Review comment:
       Yeah, the problem is that only Spark knows which join type it is going to select and we have almost no control over it. I knew we won’t have a shuffle hash join if that config is set in the non-AQE path. I need to take a closer look at AQE. If it can add a shuffle hash join, then we probably have to always request a sort.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #3992: Spark 3.2: Fix cardinality check for alternative join implementations

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #3992:
URL: https://github.com/apache/iceberg/pull/3992#issuecomment-1023652086


   @RussellSpitzer, added some description. Added tests used to fail before too.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3992: Spark 3.2: Fix cardinality check for alternative join implementations

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3992:
URL: https://github.com/apache/iceberg/pull/3992#discussion_r793285394



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
##########
@@ -46,6 +48,16 @@ case class MergeRowsExec(
     output: Seq[Attribute],
     child: SparkPlan) extends UnaryExecNode {
 
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+    // request a local sort by the row ID attrs if shuffle hash joins are enabled
+    // this is needed to co-locate matches for the same target row after the shuffle
+    if (performCardinalityCheck && !conf.preferSortMergeJoin) {

Review comment:
       Joins implementations that involve a broadcast are handled differently too.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3992: Spark 3.2: Fix cardinality check for alternative join implementations

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3992:
URL: https://github.com/apache/iceberg/pull/3992#discussion_r793984876



##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
##########
@@ -46,6 +48,15 @@ case class MergeRowsExec(
     output: Seq[Attribute],
     child: SparkPlan) extends UnaryExecNode {
 
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+    if (performCardinalityCheck) {
+      // request a local sort by the row ID attrs to co-locate matches for the same target row
+      Seq(rowIdAttrs.map(attr => SortOrder(attr, Ascending)))

Review comment:
       Instead of requesting a sort, we could keep track of all seen row IDs in a task (we keep only the last one right now). That will require more memory but we will be able to avoid the local sort.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org