You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by "aokolnychyi (via GitHub)" <gi...@apache.org> on 2023/05/08 18:34:51 UTC

[GitHub] [iceberg] aokolnychyi opened a new pull request, #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

aokolnychyi opened a new pull request, #7558:
URL: https://github.com/apache/iceberg/pull/7558

   This PR avoids an extra local sort to perform the cardinality check in MERGE. Instead, we can use a bitmap to check for matches within a partition as we rely on `MonotonicallyIncreasingID` expression that generates 64 bit longs. This change would be important when the size of a task is big and we may spill to disk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi merged pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi merged PR #7558:
URL: https://github.com/apache/iceberg/pull/7558


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "RussellSpitzer (via GitHub)" <gi...@apache.org>.
RussellSpitzer commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1187813930


##########
spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/spark/MergeCardinalityCheckBenchmark.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.apache.spark.sql.functions.current_date;
+import static org.apache.spark.sql.functions.date_add;
+import static org.apache.spark.sql.functions.expr;
+
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * A benchmark that evaluates the performance of the cardinality check in MERGE operations.
+ *
+ * <p>To run this benchmark for spark-3.3: <code>
+ *   ./gradlew -DsparkVersions=3.4 :iceberg-spark:iceberg-spark-extensions-3.4_2.12:jmh
+ *       -PjmhIncludeRegex=MergeCardinalityCheckBenchmark
+ *       -PjmhOutputPath=benchmark/iceberg-merge-cardinality-check-benchmark.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class MergeCardinalityCheckBenchmark {

Review Comment:
   Any chance you can put that into a spreadsheet or bar charts? Very hard to read atm



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1187803191


##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala:
##########
@@ -257,8 +259,17 @@ object RewriteMergeIntoTable extends RewriteRowLevelIcebergCommand with Predicat
 
     val (targetCond, joinCond) = splitMergeCond(cond, readRelation)
 
+    val performCardinalityCheck = isCardinalityCheckNeeded(matchedActions)
+
     // project an extra column to check if a target row exists after the join
-    val targetTableProjExprs = readAttrs :+ Alias(TrueLiteral, ROW_FROM_TARGET)()
+    // if needed, project a synthetic row ID to perform the cardinality check
+    val rowFromTarget = Alias(TrueLiteral, ROW_FROM_TARGET)()

Review Comment:
   I initially added a method for this logic but it made the method harder to read by hiding the actual projection.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1187803997


##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##########
@@ -148,24 +138,27 @@ case class MergeRowsExec(
       }
     }
 
-    var lastMatchedRowId: InternalRow = null
+    val rowIdAttrOrdinal = if (performCardinalityCheck) {
+      child.output.indexWhere(attr => conf.resolver(attr.name, ROW_ID))
+    } else {
+      -1
+    }
+    val matchedRowIds = new Roaring64Bitmap()
 
     def processRowWithCardinalityCheck(inputRow: InternalRow): InternalRow = {
       val isSourceRowPresent = isSourceRowPresentPred.eval(inputRow)
       val isTargetRowPresent = isTargetRowPresentPred.eval(inputRow)
 
       if (isSourceRowPresent && isTargetRowPresent) {
-        val currentRowId = rowIdProj.apply(inputRow)
-        if (currentRowId == lastMatchedRowId) {
+        val currentRowId = inputRow.getLong(rowIdAttrOrdinal)
+        if (matchedRowIds.contains(currentRowId)) {

Review Comment:
   We have existing tests that cover this logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1187801505


##########
spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/spark/MergeCardinalityCheckBenchmark.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.apache.spark.sql.functions.current_date;
+import static org.apache.spark.sql.functions.date_add;
+import static org.apache.spark.sql.functions.expr;
+
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * A benchmark that evaluates the performance of the cardinality check in MERGE operations.
+ *
+ * <p>To run this benchmark for spark-3.3: <code>
+ *   ./gradlew -DsparkVersions=3.4 :iceberg-spark:iceberg-spark-extensions-3.4_2.12:jmh
+ *       -PjmhIncludeRegex=MergeCardinalityCheckBenchmark
+ *       -PjmhOutputPath=benchmark/iceberg-merge-cardinality-check-benchmark.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class MergeCardinalityCheckBenchmark {

Review Comment:
   I ran this benchmark with limited amount of memory to trigger a spill.
   
   After this PR:
   
   ```
   Benchmark                                                                                                              Mode  Cnt            Score             Error   Units
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates                                          ss    5           14.008 ±           1.114    s/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·async                                   ss                   NaN                       ---
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.alloc.rate                           ss    5          624.164 ±        1334.312  MB/sec
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.alloc.rate.norm                      ss    5   9505459424.000 ± 20288960578.627    B/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Eden_Space                  ss    5          780.734 ±          92.906  MB/sec
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Eden_Space.norm             ss    5  11950350313.600 ±   694094571.732    B/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Old_Gen                     ss    5           31.881 ±           6.943  MB/sec
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Old_Gen.norm                ss    5    488349792.000 ±   124003813.363    B/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Survivor_Space              ss    5           15.077 ±          12.701  MB/sec
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Survivor_Space.norm         ss    5    231542206.400 ±   207651939.834    B/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.count                                ss    5          472.000                    counts
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.time                                 ss    5         2582.000                        ms
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates                                          ss    5           14.293 ±           2.359    s/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·async                                   ss                   NaN                       ---
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.alloc.rate                           ss    5          659.724 ±        1410.072  MB/sec
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.alloc.rate.norm                      ss    5  10049619374.400 ± 21458040059.395    B/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Eden_Space                  ss    5          806.934 ±         173.684  MB/sec
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Eden_Space.norm             ss    5  12564385644.800 ±   513377912.659    B/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Old_Gen                     ss    5           29.238 ±          42.954  MB/sec
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Old_Gen.norm                ss    5    460825030.400 ±   745650149.127    B/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Survivor_Space              ss    5           14.464 ±          18.159  MB/sec
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Survivor_Space.norm         ss    5    223303942.400 ±   256535347.399    B/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.count                                ss    5          482.000                    counts
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.time                                 ss    5         2816.000                        ms
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates                                          ss    5           14.536 ±           1.344    s/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·async                                   ss                   NaN                       ---
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.alloc.rate                           ss    5          730.682 ±        1562.804  MB/sec
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.alloc.rate.norm                      ss    5  11538325014.400 ± 24664038783.803    B/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Eden_Space                  ss    5          912.245 ±          52.012  MB/sec
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Eden_Space.norm             ss    5  14474100220.800 ±   866377294.592    B/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Old_Gen                     ss    5           30.659 ±           8.003  MB/sec
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Old_Gen.norm                ss    5    486578179.200 ±   138192193.973    B/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Survivor_Space              ss    5           14.411 ±          16.066  MB/sec
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Survivor_Space.norm         ss    5    229303304.000 ±   263701209.435    B/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.count                                ss    5          546.000                    counts
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.time                                 ss    5         2852.000                        ms
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates                                          ss    5            9.585 ±           0.467    s/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·async                                   ss                   NaN                       ---
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.alloc.rate                           ss    5          403.703 ±         858.176  MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.alloc.rate.norm                      ss    5   4268313212.800 ±  9069692042.323    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Eden_Space                  ss    5          506.239 ±          18.015  MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Eden_Space.norm             ss    5   5401220540.800 ±   311378551.093    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Old_Gen                     ss    5           47.000 ±           7.876  MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Old_Gen.norm                ss    5    501325486.400 ±    73440119.656    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Survivor_Space              ss    5           35.291 ±           7.181  MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Survivor_Space.norm         ss    5    376348009.600 ±    59329430.340    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.count                                ss    5          283.000                    counts
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.time                                 ss    5         2046.000                        ms
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates                                          ss    5           14.910 ±           0.264    s/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·async                                   ss                   NaN                       ---
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.alloc.rate                           ss    5          471.898 ±        1008.648  MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.alloc.rate.norm                      ss    5   7642251948.800 ± 16332182468.282    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.Compressed_Class_Space         ss    5           ≈ 10⁻⁵                    MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.Compressed_Class_Space.norm    ss    5          340.800 ±        2934.391    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.Metaspace                      ss    5           ≈ 10⁻⁴                    MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.Metaspace.norm                 ss    5         2308.800 ±       19879.464    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Eden_Space                  ss    5          590.454 ±          19.922  MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Eden_Space.norm             ss    5   9599305955.200 ±   356788651.250    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Old_Gen                     ss    5           36.759 ±          22.159  MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Old_Gen.norm                ss    5    597412112.000 ±   351814646.900    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Survivor_Space              ss    5           19.864 ±          20.618  MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Survivor_Space.norm         ss    5    322619472.000 ±   330982316.720    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.count                                ss    5          413.000                    counts
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.time                                 ss    5         2768.000                        ms
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates                                          ss    5           30.746 ±           0.237    s/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·async                                   ss                   NaN                       ---
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.alloc.rate                           ss    5          558.963 ±        1199.621  MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.alloc.rate.norm                      ss    5  18319455320.000 ± 39313853255.399    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.Compressed_Class_Space         ss    5           ≈ 10⁻⁵                    MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.Compressed_Class_Space.norm    ss    5          227.200 ±        1956.261    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.Metaspace                      ss    5           ≈ 10⁻⁴                    MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.Metaspace.norm                 ss    5         1532.800 ±       13197.870    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Eden_Space                  ss    5          699.587 ±          19.525  MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Eden_Space.norm             ss    5  22995350668.800 ±   217073351.921    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Old_Gen                     ss    5           18.798 ±          15.458  MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Old_Gen.norm                ss    5    617572155.200 ±   501348578.823    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Survivor_Space              ss    5            6.876 ±          14.164  MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Survivor_Space.norm         ss    5    226223860.800 ±   466613461.679    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.count                                ss    5          782.000                    counts
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.time                                 ss    5         4540.000                        ms
   ```
   
   Prior to this PR:
   
   ```
   Benchmark                                                                                                              Mode  Cnt            Score             Error   Units
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates                                          ss    5           16.711 ±           0.288    s/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·async                                   ss                   NaN                       ---
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.alloc.rate                           ss    5          475.484 ±        1014.578  MB/sec
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.alloc.rate.norm                      ss    5   8583528067.200 ± 18305445243.355    B/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Eden_Space                  ss    5          590.190 ±          55.152  MB/sec
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Eden_Space.norm             ss    5  10712825942.400 ±   743358360.806    B/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Old_Gen                     ss    5           31.562 ±          26.607  MB/sec
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Old_Gen.norm                ss    5    572645452.800 ±   476142338.417    B/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Survivor_Space              ss    5           22.599 ±          18.581  MB/sec
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Survivor_Space.norm         ss    5    409911558.400 ±   329191091.194    B/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.count                                ss    5          500.000                    counts
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.time                                 ss    5         3326.000                        ms
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates                                          ss    5           17.242 ±           2.855    s/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·async                                   ss                   NaN                       ---
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.alloc.rate                           ss    5          492.190 ±        1054.126  MB/sec
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.alloc.rate.norm                      ss    5   8992843612.800 ± 19250288178.246    B/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Eden_Space                  ss    5          610.326 ±         116.058  MB/sec
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Eden_Space.norm             ss    5  11411591486.400 ±  2581947180.157    B/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Old_Gen                     ss    5           34.300 ±          16.759  MB/sec
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Old_Gen.norm                ss    5    642631184.000 ±   364011738.560    B/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Survivor_Space              ss    5           30.416 ±          53.728  MB/sec
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Survivor_Space.norm         ss    5    575864764.800 ±  1089214428.030    B/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.count                                ss    5          540.000                    counts
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.time                                 ss    5         3560.000                        ms
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates                                          ss    5           18.966 ±           6.966    s/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·async                                   ss                   NaN                       ---
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.alloc.rate                           ss    5          513.704 ±        1103.472  MB/sec
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.alloc.rate.norm                      ss    5  10075823168.000 ± 21520293879.410    B/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Eden_Space                  ss    5          624.511 ±         179.942  MB/sec
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Eden_Space.norm             ss    5  12742282712.000 ±  1322795064.801    B/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Old_Gen                     ss    5           29.553 ±          19.751  MB/sec
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Old_Gen.norm                ss    5    604876179.200 ±   409265589.171    B/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Survivor_Space              ss    5           23.897 ±          27.556  MB/sec
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Survivor_Space.norm         ss    5    498747096.000 ±   747887624.405    B/op
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.count                                ss    5          573.000                    counts
   MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.time                                 ss    5         3785.000                        ms
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates                                          ss    5           10.493 ±           5.130    s/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·async                                   ss                   NaN                       ---
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.alloc.rate                           ss    5          366.271 ±         802.929  MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.alloc.rate.norm                      ss    5   4243684422.400 ±  9020075458.315    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.Compressed_Class_Space         ss    5           ≈ 10⁻⁵                    MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.Compressed_Class_Space.norm    ss    5          113.600 ±         978.130    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.Metaspace                      ss    5           ≈ 10⁻⁴                    MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.Metaspace.norm                 ss    5          763.200 ±        6571.382    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Eden_Space                  ss    5          464.535 ±         194.300  MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Eden_Space.norm             ss    5   5346972729.600 ±   267965489.668    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Old_Gen                     ss    5           41.309 ±          14.946  MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Old_Gen.norm                ss    5    476511608.000 ±    81677371.186    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Survivor_Space              ss    5           34.097 ±          21.026  MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Survivor_Space.norm         ss    5    391727609.600 ±   148897428.365    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.count                                ss    5          285.000                    counts
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.time                                 ss    5         2674.000                        ms
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates                                          ss    5           15.171 ±           0.275    s/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·async                                   ss                   NaN                       ---
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.alloc.rate                           ss    5          454.574 ±         971.471  MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.alloc.rate.norm                      ss    5   7464753795.200 ± 15949181369.907    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.Compressed_Class_Space         ss    5           ≈ 10⁻⁵                    MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.Compressed_Class_Space.norm    ss    5          340.800 ±        2934.391    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.Metaspace                      ss    5           ≈ 10⁻⁴                    MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.Metaspace.norm                 ss    5         2289.600 ±       19714.147    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Eden_Space                  ss    5          569.495 ±          24.852  MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Eden_Space.norm             ss    5   9415747144.000 ±   409317072.829    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Old_Gen                     ss    5           39.776 ±          28.493  MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Old_Gen.norm                ss    5    658224896.000 ±   482483321.484    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Survivor_Space              ss    5           26.547 ±          12.463  MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Survivor_Space.norm         ss    5    439423488.000 ±   227418830.538    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.count                                ss    5          440.000                    counts
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.time                                 ss    5         2880.000                        ms
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates                                          ss    5           32.714 ±           0.544    s/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·async                                   ss                   NaN                       ---
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.alloc.rate                           ss    5          532.731 ±        1143.312  MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.alloc.rate.norm                      ss    5  18556670550.400 ± 39822371528.195    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Eden_Space                  ss    5          666.734 ±          18.715  MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Eden_Space.norm             ss    5  23297075236.800 ±   342988031.685    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Old_Gen                     ss    5           21.372 ±           3.232  MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Old_Gen.norm                ss    5    746814603.200 ±   117162314.172    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Survivor_Space              ss    5            9.224 ±          19.190  MB/sec
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Survivor_Space.norm         ss    5    323181336.000 ±   679009937.529    B/op
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.count                                ss    5          813.000                    counts
   MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.time                                 ss    5         7027.000                        ms
   ```
   
   The new solution is a bit faster and memory efficient but it could make a huge difference in real examples when the size of data is bigger (i.e. the spill is a big issue) and the disk is slower. Also, keep in mind this benchmark still does a local sort triggering a spill before the write as the table is ordered. If we disable that local sort too, the difference would be much bigger as the new solution would avoid the spill completely.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1187796353


##########
spark/v3.4/build.gradle:
##########
@@ -135,6 +135,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
 
   dependencies {
     implementation("org.scala-lang.modules:scala-collection-compat_${scalaVersion}")
+    implementation "org.roaringbitmap:RoaringBitmap"

Review Comment:
   We already depend on `RoaringBitmap` for vectorized reads with deletes. It is already relocated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1187802164


##########
spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/spark/MergeCardinalityCheckBenchmark.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.apache.spark.sql.functions.current_date;
+import static org.apache.spark.sql.functions.date_add;
+import static org.apache.spark.sql.functions.expr;
+
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * A benchmark that evaluates the performance of the cardinality check in MERGE operations.
+ *
+ * <p>To run this benchmark for spark-3.3: <code>
+ *   ./gradlew -DsparkVersions=3.4 :iceberg-spark:iceberg-spark-extensions-3.4_2.12:jmh
+ *       -PjmhIncludeRegex=MergeCardinalityCheckBenchmark
+ *       -PjmhOutputPath=benchmark/iceberg-merge-cardinality-check-benchmark.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class MergeCardinalityCheckBenchmark {

Review Comment:
   My initial idea was to use a set but that degraded the performance for cases with 90% matches as the GC time became an issue. Using a bitmap solved the problem.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#issuecomment-1538918349

   cc @singhpk234 @amogh-jahagirdar @jackye1995 @flyrain @RussellSpitzer @szehon-ho @rdblue @nastra @Fokko 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#issuecomment-1541336715

   I took a closer look. If I remember correctly, we added `NoStatsUnaryNode` to prevent issues with broadcast nested loop joins when the target would be the broadcasted side. It is an extremely edge case as the target must be smaller and there must be no equality condition in the ON clause. That means we should probably keep it for now unless it causes a performance degradation. Then we may workaround it.
   
   I am reluctant to provide an explicit hint to prefer a shuffle hash join as such joins can lead to OOM if the size of each partition after the shuffle on the build size is too big to fit into memory. We highly advise our users to configure Spark to use shuffle joins but that's not always possible. That said, let me think more about this tomorrow.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1187836896


##########
spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/spark/MergeCardinalityCheckBenchmark.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.apache.spark.sql.functions.current_date;
+import static org.apache.spark.sql.functions.date_add;
+import static org.apache.spark.sql.functions.expr;
+
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * A benchmark that evaluates the performance of the cardinality check in MERGE operations.
+ *
+ * <p>To run this benchmark for spark-3.3: <code>
+ *   ./gradlew -DsparkVersions=3.4 :iceberg-spark:iceberg-spark-extensions-3.4_2.12:jmh
+ *       -PjmhIncludeRegex=MergeCardinalityCheckBenchmark
+ *       -PjmhOutputPath=benchmark/iceberg-merge-cardinality-check-benchmark.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class MergeCardinalityCheckBenchmark {

Review Comment:
   ```
   Benchmark                                                                                     Mode  Cnt            Score             Error   Units
   [NEW] copyOnWriteMergeCardinalityCheck10PercentUpdates                                          ss    5           14.008 ±           1.114    s/op
   [OLD] copyOnWriteMergeCardinalityCheck10PercentUpdates                                          ss    5           16.711 ±           0.288    s/op
   
   [NEW] copyOnWriteMergeCardinalityCheck30PercentUpdates                                          ss    5           14.293 ±           2.359    s/op
   [OLD] copyOnWriteMergeCardinalityCheck30PercentUpdates                                          ss    5           17.242 ±           2.855    s/op
   
   [NEW] copyOnWriteMergeCardinalityCheck90PercentUpdates                                          ss    5           14.536 ±           1.344    s/op
   [OLD] copyOnWriteMergeCardinalityCheck90PercentUpdates                                          ss    5           18.966 ±           6.966    s/op
   
   [NEW] mergeOnReadMergeCardinalityCheck10PercentUpdates                                          ss    5            9.585 ±           0.467    s/op
   [OLD] mergeOnReadMergeCardinalityCheck10PercentUpdates                                          ss    5           10.493 ±           5.130    s/op
   
   [NEW] mergeOnReadMergeCardinalityCheck30PercentUpdates                                          ss    5           14.910 ±           0.264    s/op
   [OLD] mergeOnReadMergeCardinalityCheck30PercentUpdates                                          ss    5           15.171 ±           0.275    s/op
   
   [NEW] mergeOnReadMergeCardinalityCheck90PercentUpdates                                          ss    5           30.746 ±           0.237    s/op
   [OLD] mergeOnReadMergeCardinalityCheck90PercentUpdates                                          ss    5           32.714 ±           0.544    s/op
   ```
   
   Here is a shorter representation. There is a small reduction in terms of memory too. Like said above, this run still does a local sort and trigger a spill, we just reduce the amount of spilled data. If we skip the final sort, we can actually avoid the local sort altogether.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1187836896


##########
spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/spark/MergeCardinalityCheckBenchmark.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.apache.spark.sql.functions.current_date;
+import static org.apache.spark.sql.functions.date_add;
+import static org.apache.spark.sql.functions.expr;
+
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * A benchmark that evaluates the performance of the cardinality check in MERGE operations.
+ *
+ * <p>To run this benchmark for spark-3.3: <code>
+ *   ./gradlew -DsparkVersions=3.4 :iceberg-spark:iceberg-spark-extensions-3.4_2.12:jmh
+ *       -PjmhIncludeRegex=MergeCardinalityCheckBenchmark
+ *       -PjmhOutputPath=benchmark/iceberg-merge-cardinality-check-benchmark.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class MergeCardinalityCheckBenchmark {

Review Comment:
   ```
   Benchmark                                                                                     Mode  Cnt            Score             Error   Units
   [NEW] copyOnWriteMergeCardinalityCheck10PercentUpdates                                          ss    5           14.008 ±           1.114    s/op
   [OLD] copyOnWriteMergeCardinalityCheck10PercentUpdates                                          ss    5           16.711 ±           0.288    s/op
   
   [NEW] copyOnWriteMergeCardinalityCheck30PercentUpdates                                          ss    5           14.293 ±           2.359    s/op
   [OLD] copyOnWriteMergeCardinalityCheck30PercentUpdates                                          ss    5           17.242 ±           2.855    s/op
   
   [NEW] copyOnWriteMergeCardinalityCheck90PercentUpdates                                          ss    5           14.536 ±           1.344    s/op
   [OLD] copyOnWriteMergeCardinalityCheck90PercentUpdates                                          ss    5           18.966 ±           6.966    s/op
   
   [NEW] mergeOnReadMergeCardinalityCheck10PercentUpdates                                          ss    5            9.585 ±           0.467    s/op
   [OLD] mergeOnReadMergeCardinalityCheck10PercentUpdates                                          ss    5           10.493 ±           5.130    s/op
   
   [NEW] mergeOnReadMergeCardinalityCheck30PercentUpdates                                          ss    5           14.910 ±           0.264    s/op
   [OLD] mergeOnReadMergeCardinalityCheck30PercentUpdates                                          ss    5           15.171 ±           0.275    s/op
   
   [NEW] mergeOnReadMergeCardinalityCheck90PercentUpdates                                          ss    5           30.746 ±           0.237    s/op
   [OLD] mergeOnReadMergeCardinalityCheck90PercentUpdates                                          ss    5           32.714 ±           0.544    s/op
   ```
   
   Here is a shorter representation. There is a small reduction in terms memory too. Like said above, this run still does a local sort and trigger a spill, we just reduce the amount of spilled data. If we skip the final sort, we can actually avoid the local sort altogether.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#issuecomment-1539307223

   Let me take a closer look, I forgot the details.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#issuecomment-1550351186

   Thanks for reviewing, @singhpk234 @RussellSpitzer @amogh-jahagirdar!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1187802164


##########
spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/spark/MergeCardinalityCheckBenchmark.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.apache.spark.sql.functions.current_date;
+import static org.apache.spark.sql.functions.date_add;
+import static org.apache.spark.sql.functions.expr;
+
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * A benchmark that evaluates the performance of the cardinality check in MERGE operations.
+ *
+ * <p>To run this benchmark for spark-3.3: <code>
+ *   ./gradlew -DsparkVersions=3.4 :iceberg-spark:iceberg-spark-extensions-3.4_2.12:jmh
+ *       -PjmhIncludeRegex=MergeCardinalityCheckBenchmark
+ *       -PjmhOutputPath=benchmark/iceberg-merge-cardinality-check-benchmark.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class MergeCardinalityCheckBenchmark {

Review Comment:
   My initial idea was to use a set but that degraded the performance for cases with 90% matches as the GC time became an issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#issuecomment-1539000223

   @singhpk234, it is still critical to ensure the target table is not broadcasted to execute the cardinality check. If we broadcast the target table, we will not be able to detect multiple matches from different source partitions. That is usually not a problem as the target is much bigger.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1195535191


##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##########
@@ -148,24 +138,27 @@ case class MergeRowsExec(
       }
     }
 
-    var lastMatchedRowId: InternalRow = null
+    val rowIdAttrOrdinal = if (performCardinalityCheck) {
+      child.output.indexWhere(attr => conf.resolver(attr.name, ROW_ID))
+    } else {
+      -1
+    }
+    val matchedRowIds = new Roaring64Bitmap()
 
     def processRowWithCardinalityCheck(inputRow: InternalRow): InternalRow = {
       val isSourceRowPresent = isSourceRowPresentPred.eval(inputRow)
       val isTargetRowPresent = isTargetRowPresentPred.eval(inputRow)
 
       if (isSourceRowPresent && isTargetRowPresent) {
-        val currentRowId = rowIdProj.apply(inputRow)
-        if (currentRowId == lastMatchedRowId) {
+        val currentRowId = inputRow.getLong(rowIdAttrOrdinal)
+        if (matchedRowIds.contains(currentRowId)) {

Review Comment:
   Should we have a sanity check that `rowIdAttrOrdinal` is not -1 at this point? Should not be since we only hit this case when we want to perform the cardinality check but just to make sure that there's a valid row ID at this point, and in case there's not a clear error message is surfaced.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] szehon-ho commented on pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#issuecomment-1550486478

   Just a note for later pr : the new benchmark makes a ErrorProne warning:
   
   ```
   > Task :iceberg-spark:iceberg-spark-extensions-3.4_2.12:compileJmhJava
   /Users/szehon/repos/apache-iceberg/iceberg/spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/spark/MergeCardinalityCheckBenchmark.java:224: warning: [AnnotateFormatMethod] This method passes a pair of parameters through to String.format, but the enclosing method wasn't annotated @FormatMethod. Doing so gives compile-time rather than run-time protection against malformed format strings.
     private void sql(String query, Object... args) {
                  ^
       (see https://errorprone.info/bugpattern/AnnotateFormatMethod)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#issuecomment-1548886089

   I took another look. I think providing a hint would always trigger a shuffle hash join so that's not safe. I'd keep it as is for now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "RussellSpitzer (via GitHub)" <gi...@apache.org>.
RussellSpitzer commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1195550649


##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##########
@@ -148,24 +138,27 @@ case class MergeRowsExec(
       }
     }
 
-    var lastMatchedRowId: InternalRow = null
+    val rowIdAttrOrdinal = if (performCardinalityCheck) {

Review Comment:
   Actually maybe not ... Mainly i just want to remove the branch if we can. Maybe just make this an option?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "RussellSpitzer (via GitHub)" <gi...@apache.org>.
RussellSpitzer commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1195549503


##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##########
@@ -148,24 +138,27 @@ case class MergeRowsExec(
       }
     }
 
-    var lastMatchedRowId: InternalRow = null
+    val rowIdAttrOrdinal = if (performCardinalityCheck) {

Review Comment:
   Can we move this into
   `def processRowWithCadinalityCheck`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1187836896


##########
spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/spark/MergeCardinalityCheckBenchmark.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.apache.spark.sql.functions.current_date;
+import static org.apache.spark.sql.functions.date_add;
+import static org.apache.spark.sql.functions.expr;
+
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * A benchmark that evaluates the performance of the cardinality check in MERGE operations.
+ *
+ * <p>To run this benchmark for spark-3.3: <code>
+ *   ./gradlew -DsparkVersions=3.4 :iceberg-spark:iceberg-spark-extensions-3.4_2.12:jmh
+ *       -PjmhIncludeRegex=MergeCardinalityCheckBenchmark
+ *       -PjmhOutputPath=benchmark/iceberg-merge-cardinality-check-benchmark.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class MergeCardinalityCheckBenchmark {

Review Comment:
   ```
   Benchmark                                                                                     Mode  Cnt            Score             Error   Units
   [NEW] copyOnWriteMergeCardinalityCheck10PercentUpdates                                          ss    5           14.008 ±           1.114    s/op
   [OLD] copyOnWriteMergeCardinalityCheck10PercentUpdates                                          ss    5           16.711 ±           0.288    s/op
   
   [NEW] copyOnWriteMergeCardinalityCheck30PercentUpdates                                          ss    5           14.293 ±           2.359    s/op
   [OLD] copyOnWriteMergeCardinalityCheck30PercentUpdates                                          ss    5           17.242 ±           2.855    s/op
   
   [NEW] copyOnWriteMergeCardinalityCheck90PercentUpdates                                          ss    5           14.536 ±           1.344    s/op
   [OLD] copyOnWriteMergeCardinalityCheck90PercentUpdates                                          ss    5           18.966 ±           6.966    s/op
   
   [NEW] mergeOnReadMergeCardinalityCheck10PercentUpdates                                          ss    5            9.585 ±           0.467    s/op
   [OLD] mergeOnReadMergeCardinalityCheck10PercentUpdates                                          ss    5           10.493 ±           5.130    s/op
   
   [NEW] mergeOnReadMergeCardinalityCheck30PercentUpdates                                          ss    5           14.910 ±           0.264    s/op
   [OLD] mergeOnReadMergeCardinalityCheck30PercentUpdates                                          ss    5           15.171 ±           0.275    s/op
   
   [NEW] mergeOnReadMergeCardinalityCheck90PercentUpdates                                          ss    5           30.746 ±           0.237    s/op
   [OLD] mergeOnReadMergeCardinalityCheck90PercentUpdates                                          ss    5           32.714 ±           0.544    s/op
   ```
   
   Here is a shorter representation. There is a small reduction in terms of memory too. Like said above, this run still does a local sort and trigger a spill, we just reduce the amount of spilled data. If we skip the final sort, we can actually avoid the local sort altogether, which was not possible before. A slower disk would also increase the difference.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1187803191


##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala:
##########
@@ -257,8 +259,17 @@ object RewriteMergeIntoTable extends RewriteRowLevelIcebergCommand with Predicat
 
     val (targetCond, joinCond) = splitMergeCond(cond, readRelation)
 
+    val performCardinalityCheck = isCardinalityCheckNeeded(matchedActions)
+
     // project an extra column to check if a target row exists after the join
-    val targetTableProjExprs = readAttrs :+ Alias(TrueLiteral, ROW_FROM_TARGET)()
+    // if needed, project a synthetic row ID to perform the cardinality check
+    val rowFromTarget = Alias(TrueLiteral, ROW_FROM_TARGET)()

Review Comment:
   I initially added a method for this logic but it made the main method harder to read by hiding the actual projection expressions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1187844125


##########
spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/spark/MergeCardinalityCheckBenchmark.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.apache.spark.sql.functions.current_date;
+import static org.apache.spark.sql.functions.date_add;
+import static org.apache.spark.sql.functions.expr;
+
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * A benchmark that evaluates the performance of the cardinality check in MERGE operations.
+ *
+ * <p>To run this benchmark for spark-3.3: <code>
+ *   ./gradlew -DsparkVersions=3.4 :iceberg-spark:iceberg-spark-extensions-3.4_2.12:jmh
+ *       -PjmhIncludeRegex=MergeCardinalityCheckBenchmark
+ *       -PjmhOutputPath=benchmark/iceberg-merge-cardinality-check-benchmark.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class MergeCardinalityCheckBenchmark {

Review Comment:
   Also, MoR is slower than CoW for 90% of updates as we are doing twice the work marking almost all records as deleted. I just tested that to provide there is no memory or performance degradation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1195611995


##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##########
@@ -148,24 +138,27 @@ case class MergeRowsExec(
       }
     }
 
-    var lastMatchedRowId: InternalRow = null
+    val rowIdAttrOrdinal = if (performCardinalityCheck) {
+      child.output.indexWhere(attr => conf.resolver(attr.name, ROW_ID))
+    } else {
+      -1
+    }
+    val matchedRowIds = new Roaring64Bitmap()
 
     def processRowWithCardinalityCheck(inputRow: InternalRow): InternalRow = {
       val isSourceRowPresent = isSourceRowPresentPred.eval(inputRow)
       val isTargetRowPresent = isTargetRowPresentPred.eval(inputRow)
 
       if (isSourceRowPresent && isTargetRowPresent) {
-        val currentRowId = rowIdProj.apply(inputRow)
-        if (currentRowId == lastMatchedRowId) {
+        val currentRowId = inputRow.getLong(rowIdAttrOrdinal)
+        if (matchedRowIds.contains(currentRowId)) {

Review Comment:
   I can add validation, let me see.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1195611262


##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##########
@@ -148,24 +138,27 @@ case class MergeRowsExec(
       }
     }
 
-    var lastMatchedRowId: InternalRow = null
+    val rowIdAttrOrdinal = if (performCardinalityCheck) {

Review Comment:
   Unlike `processRowWithCadinalityCheck`, this is not called per every row. It is called once per task so having a branch should not impact the performance.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] singhpk234 commented on pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#issuecomment-1539032218

   Agree, we definitely require shuffling, looks like NoStatsUnary node handling was only to avoid BNLJ and not just for BHJ, for avoiding BHJ we still need NO_BROADCAST_HASH hint as some one can set the bhj threshold > Long.MaxValue is this understanding correct ? 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org