Posted to issues@iceberg.apache.org by "aokolnychyi (via GitHub)" <gi...@apache.org> on 2023/05/08 18:34:51 UTC
[GitHub] [iceberg] aokolnychyi opened a new pull request, #7558: Spark 3.4: Avoid local sort for MERGE cardinality check
aokolnychyi opened a new pull request, #7558:
URL: https://github.com/apache/iceberg/pull/7558
This PR avoids an extra local sort when performing the cardinality check in MERGE. Instead, we can use a bitmap to check for matches within a partition, as we rely on the `MonotonicallyIncreasingID` expression, which generates 64-bit longs. This change is important when a task is large and the sort may spill to disk.
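As a rough illustration of the row IDs involved: Spark's documentation for `monotonically_increasing_id` states that the current implementation puts the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits, so each ID is a unique 64-bit long. The sketch below only mirrors that documented layout. The class and method names are hypothetical and are not Spark or Iceberg APIs.

```java
// Hypothetical illustration of the documented ID layout; not Spark or Iceberg code.
final class RowIdLayoutSketch {
  // Number of low bits that hold the record number within a partition.
  static final int RECORD_BITS = 33;

  // Combines a partition index (upper bits) with a per-partition record number (lower bits).
  static long rowId(int partitionIndex, long recordNumber) {
    return ((long) partitionIndex << RECORD_BITS) + recordNumber;
  }

  // Recovers the partition index from the upper bits of an ID.
  static int partitionIndex(long rowId) {
    return (int) (rowId >>> RECORD_BITS);
  }

  // Recovers the record number from the lower 33 bits of an ID.
  static long recordNumber(long rowId) {
    return rowId & ((1L << RECORD_BITS) - 1);
  }
}
```

Because the value is a plain long, it can be read directly from the row and used as a key in a 64-bit bitmap without building a separate row ID projection.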
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org
[GitHub] [iceberg] aokolnychyi merged pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check
Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi merged PR #7558:
URL: https://github.com/apache/iceberg/pull/7558
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check
Posted by "RussellSpitzer (via GitHub)" <gi...@apache.org>.
RussellSpitzer commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1187813930
##########
spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/spark/MergeCardinalityCheckBenchmark.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.apache.spark.sql.functions.current_date;
+import static org.apache.spark.sql.functions.date_add;
+import static org.apache.spark.sql.functions.expr;
+
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * A benchmark that evaluates the performance of the cardinality check in MERGE operations.
+ *
+ * <p>To run this benchmark for spark-3.4: <code>
+ * ./gradlew -DsparkVersions=3.4 :iceberg-spark:iceberg-spark-extensions-3.4_2.12:jmh
+ * -PjmhIncludeRegex=MergeCardinalityCheckBenchmark
+ * -PjmhOutputPath=benchmark/iceberg-merge-cardinality-check-benchmark.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class MergeCardinalityCheckBenchmark {
Review Comment:
Any chance you can put those benchmark results into a spreadsheet or bar charts? They are very hard to read at the moment.
[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check
Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1187803191
##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala:
##########
@@ -257,8 +259,17 @@ object RewriteMergeIntoTable extends RewriteRowLevelIcebergCommand with Predicat
val (targetCond, joinCond) = splitMergeCond(cond, readRelation)
+ val performCardinalityCheck = isCardinalityCheckNeeded(matchedActions)
+
// project an extra column to check if a target row exists after the join
- val targetTableProjExprs = readAttrs :+ Alias(TrueLiteral, ROW_FROM_TARGET)()
+ // if needed, project a synthetic row ID to perform the cardinality check
+ val rowFromTarget = Alias(TrueLiteral, ROW_FROM_TARGET)()
Review Comment:
I initially extracted this logic into a separate helper method, but that made the enclosing method harder to read by hiding the actual projection.
[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check
Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1187803997
##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##########
@@ -148,24 +138,27 @@ case class MergeRowsExec(
}
}
- var lastMatchedRowId: InternalRow = null
+ val rowIdAttrOrdinal = if (performCardinalityCheck) {
+ child.output.indexWhere(attr => conf.resolver(attr.name, ROW_ID))
+ } else {
+ -1
+ }
+ val matchedRowIds = new Roaring64Bitmap()
def processRowWithCardinalityCheck(inputRow: InternalRow): InternalRow = {
val isSourceRowPresent = isSourceRowPresentPred.eval(inputRow)
val isTargetRowPresent = isTargetRowPresentPred.eval(inputRow)
if (isSourceRowPresent && isTargetRowPresent) {
- val currentRowId = rowIdProj.apply(inputRow)
- if (currentRowId == lastMatchedRowId) {
+ val currentRowId = inputRow.getLong(rowIdAttrOrdinal)
+ if (matchedRowIds.contains(currentRowId)) {
Review Comment:
We have existing tests that cover this logic.
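For readers following the diff: the removed `lastMatchedRowId` comparison can only notice a repeated row ID when equal IDs arrive back to back, which is presumably why the earlier plan needed the rows sorted by row ID. A membership check does not depend on input order. Below is a minimal sketch of that difference, assuming a `HashSet<Long>` as a stand-in for the `Roaring64Bitmap` used in the PR. The class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical comparison of the two detection strategies; not Iceberg code.
final class DuplicateMatchSketch {
  // Old style: remember only the last matched ID, so a repeat is seen only if it is adjacent.
  static boolean adjacentCheckFindsDuplicate(long[] matchedRowIds) {
    boolean hasLast = false;
    long last = 0L;
    for (long rowId : matchedRowIds) {
      if (hasLast && rowId == last) {
        return true;
      }
      last = rowId;
      hasLast = true;
    }
    return false;
  }

  // New style: remember every matched ID, so a repeat is seen in any input order.
  // A HashSet stands in for the bitmap here (assumption for a self-contained example).
  static boolean membershipCheckFindsDuplicate(long[] matchedRowIds) {
    Set<Long> seen = new HashSet<>();
    for (long rowId : matchedRowIds) {
      if (!seen.add(rowId)) {
        return true;
      }
    }
    return false;
  }
}
```

With unsorted input such as `{5, 8, 5}`, only the membership check reports the repeated ID, while both agree once the input is sorted. The trade-off is that the membership check keeps state for every matched row in the task, which is where a compressed bitmap helps.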
[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check
Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1187801505
##########
spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/spark/MergeCardinalityCheckBenchmark.java:
##########
@@ -0,0 +1,227 @@
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class MergeCardinalityCheckBenchmark {
Review Comment:
I ran this benchmark with a limited amount of memory to trigger a spill.
After this PR:
```
Benchmark Mode Cnt Score Error Units
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates ss 5 14.008 ± 1.114 s/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·async ss NaN ---
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.alloc.rate ss 5 624.164 ± 1334.312 MB/sec
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.alloc.rate.norm ss 5 9505459424.000 ± 20288960578.627 B/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Eden_Space ss 5 780.734 ± 92.906 MB/sec
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Eden_Space.norm ss 5 11950350313.600 ± 694094571.732 B/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Old_Gen ss 5 31.881 ± 6.943 MB/sec
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Old_Gen.norm ss 5 488349792.000 ± 124003813.363 B/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Survivor_Space ss 5 15.077 ± 12.701 MB/sec
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Survivor_Space.norm ss 5 231542206.400 ± 207651939.834 B/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.count ss 5 472.000 counts
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.time ss 5 2582.000 ms
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates ss 5 14.293 ± 2.359 s/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·async ss NaN ---
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.alloc.rate ss 5 659.724 ± 1410.072 MB/sec
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.alloc.rate.norm ss 5 10049619374.400 ± 21458040059.395 B/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Eden_Space ss 5 806.934 ± 173.684 MB/sec
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Eden_Space.norm ss 5 12564385644.800 ± 513377912.659 B/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Old_Gen ss 5 29.238 ± 42.954 MB/sec
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Old_Gen.norm ss 5 460825030.400 ± 745650149.127 B/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Survivor_Space ss 5 14.464 ± 18.159 MB/sec
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Survivor_Space.norm ss 5 223303942.400 ± 256535347.399 B/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.count ss 5 482.000 counts
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.time ss 5 2816.000 ms
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates ss 5 14.536 ± 1.344 s/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·async ss NaN ---
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.alloc.rate ss 5 730.682 ± 1562.804 MB/sec
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.alloc.rate.norm ss 5 11538325014.400 ± 24664038783.803 B/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Eden_Space ss 5 912.245 ± 52.012 MB/sec
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Eden_Space.norm ss 5 14474100220.800 ± 866377294.592 B/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Old_Gen ss 5 30.659 ± 8.003 MB/sec
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Old_Gen.norm ss 5 486578179.200 ± 138192193.973 B/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Survivor_Space ss 5 14.411 ± 16.066 MB/sec
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Survivor_Space.norm ss 5 229303304.000 ± 263701209.435 B/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.count ss 5 546.000 counts
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.time ss 5 2852.000 ms
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates ss 5 9.585 ± 0.467 s/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·async ss NaN ---
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.alloc.rate ss 5 403.703 ± 858.176 MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.alloc.rate.norm ss 5 4268313212.800 ± 9069692042.323 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Eden_Space ss 5 506.239 ± 18.015 MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Eden_Space.norm ss 5 5401220540.800 ± 311378551.093 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Old_Gen ss 5 47.000 ± 7.876 MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Old_Gen.norm ss 5 501325486.400 ± 73440119.656 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Survivor_Space ss 5 35.291 ± 7.181 MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Survivor_Space.norm ss 5 376348009.600 ± 59329430.340 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.count ss 5 283.000 counts
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.time ss 5 2046.000 ms
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates ss 5 14.910 ± 0.264 s/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·async ss NaN ---
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.alloc.rate ss 5 471.898 ± 1008.648 MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.alloc.rate.norm ss 5 7642251948.800 ± 16332182468.282 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.Compressed_Class_Space ss 5 ≈ 10⁻⁵ MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.Compressed_Class_Space.norm ss 5 340.800 ± 2934.391 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.Metaspace ss 5 ≈ 10⁻⁴ MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.Metaspace.norm ss 5 2308.800 ± 19879.464 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Eden_Space ss 5 590.454 ± 19.922 MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Eden_Space.norm ss 5 9599305955.200 ± 356788651.250 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Old_Gen ss 5 36.759 ± 22.159 MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Old_Gen.norm ss 5 597412112.000 ± 351814646.900 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Survivor_Space ss 5 19.864 ± 20.618 MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Survivor_Space.norm ss 5 322619472.000 ± 330982316.720 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.count ss 5 413.000 counts
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.time ss 5 2768.000 ms
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates ss 5 30.746 ± 0.237 s/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·async ss NaN ---
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.alloc.rate ss 5 558.963 ± 1199.621 MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.alloc.rate.norm ss 5 18319455320.000 ± 39313853255.399 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.Compressed_Class_Space ss 5 ≈ 10⁻⁵ MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.Compressed_Class_Space.norm ss 5 227.200 ± 1956.261 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.Metaspace ss 5 ≈ 10⁻⁴ MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.Metaspace.norm ss 5 1532.800 ± 13197.870 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Eden_Space ss 5 699.587 ± 19.525 MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Eden_Space.norm ss 5 22995350668.800 ± 217073351.921 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Old_Gen ss 5 18.798 ± 15.458 MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Old_Gen.norm ss 5 617572155.200 ± 501348578.823 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Survivor_Space ss 5 6.876 ± 14.164 MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Survivor_Space.norm ss 5 226223860.800 ± 466613461.679 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.count ss 5 782.000 counts
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.time ss 5 4540.000 ms
```
Prior to this PR:
```
Benchmark Mode Cnt Score Error Units
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates ss 5 16.711 ± 0.288 s/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·async ss NaN ---
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.alloc.rate ss 5 475.484 ± 1014.578 MB/sec
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.alloc.rate.norm ss 5 8583528067.200 ± 18305445243.355 B/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Eden_Space ss 5 590.190 ± 55.152 MB/sec
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Eden_Space.norm ss 5 10712825942.400 ± 743358360.806 B/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Old_Gen ss 5 31.562 ± 26.607 MB/sec
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Old_Gen.norm ss 5 572645452.800 ± 476142338.417 B/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Survivor_Space ss 5 22.599 ± 18.581 MB/sec
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Survivor_Space.norm ss 5 409911558.400 ± 329191091.194 B/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.count ss 5 500.000 counts
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck10PercentUpdates:·gc.time ss 5 3326.000 ms
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates ss 5 17.242 ± 2.855 s/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·async ss NaN ---
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.alloc.rate ss 5 492.190 ± 1054.126 MB/sec
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.alloc.rate.norm ss 5 8992843612.800 ± 19250288178.246 B/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Eden_Space ss 5 610.326 ± 116.058 MB/sec
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Eden_Space.norm ss 5 11411591486.400 ± 2581947180.157 B/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Old_Gen ss 5 34.300 ± 16.759 MB/sec
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Old_Gen.norm ss 5 642631184.000 ± 364011738.560 B/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Survivor_Space ss 5 30.416 ± 53.728 MB/sec
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Survivor_Space.norm ss 5 575864764.800 ± 1089214428.030 B/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.count ss 5 540.000 counts
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck30PercentUpdates:·gc.time ss 5 3560.000 ms
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates ss 5 18.966 ± 6.966 s/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·async ss NaN ---
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.alloc.rate ss 5 513.704 ± 1103.472 MB/sec
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.alloc.rate.norm ss 5 10075823168.000 ± 21520293879.410 B/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Eden_Space ss 5 624.511 ± 179.942 MB/sec
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Eden_Space.norm ss 5 12742282712.000 ± 1322795064.801 B/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Old_Gen ss 5 29.553 ± 19.751 MB/sec
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Old_Gen.norm ss 5 604876179.200 ± 409265589.171 B/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Survivor_Space ss 5 23.897 ± 27.556 MB/sec
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Survivor_Space.norm ss 5 498747096.000 ± 747887624.405 B/op
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.count ss 5 573.000 counts
MergeCardinalityCheckBenchmark.copyOnWriteMergeCardinalityCheck90PercentUpdates:·gc.time ss 5 3785.000 ms
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates ss 5 10.493 ± 5.130 s/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·async ss NaN ---
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.alloc.rate ss 5 366.271 ± 802.929 MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.alloc.rate.norm ss 5 4243684422.400 ± 9020075458.315 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.Compressed_Class_Space ss 5 ≈ 10⁻⁵ MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.Compressed_Class_Space.norm ss 5 113.600 ± 978.130 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.Metaspace ss 5 ≈ 10⁻⁴ MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.Metaspace.norm ss 5 763.200 ± 6571.382 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Eden_Space ss 5 464.535 ± 194.300 MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Eden_Space.norm ss 5 5346972729.600 ± 267965489.668 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Old_Gen ss 5 41.309 ± 14.946 MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Old_Gen.norm ss 5 476511608.000 ± 81677371.186 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Survivor_Space ss 5 34.097 ± 21.026 MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.churn.PS_Survivor_Space.norm ss 5 391727609.600 ± 148897428.365 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.count ss 5 285.000 counts
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck10PercentUpdates:·gc.time ss 5 2674.000 ms
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates ss 5 15.171 ± 0.275 s/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·async ss NaN ---
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.alloc.rate ss 5 454.574 ± 971.471 MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.alloc.rate.norm ss 5 7464753795.200 ± 15949181369.907 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.Compressed_Class_Space ss 5 ≈ 10⁻⁵ MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.Compressed_Class_Space.norm ss 5 340.800 ± 2934.391 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.Metaspace ss 5 ≈ 10⁻⁴ MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.Metaspace.norm ss 5 2289.600 ± 19714.147 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Eden_Space ss 5 569.495 ± 24.852 MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Eden_Space.norm ss 5 9415747144.000 ± 409317072.829 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Old_Gen ss 5 39.776 ± 28.493 MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Old_Gen.norm ss 5 658224896.000 ± 482483321.484 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Survivor_Space ss 5 26.547 ± 12.463 MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.churn.PS_Survivor_Space.norm ss 5 439423488.000 ± 227418830.538 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.count ss 5 440.000 counts
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck30PercentUpdates:·gc.time ss 5 2880.000 ms
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates ss 5 32.714 ± 0.544 s/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·async ss NaN ---
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.alloc.rate ss 5 532.731 ± 1143.312 MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.alloc.rate.norm ss 5 18556670550.400 ± 39822371528.195 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Eden_Space ss 5 666.734 ± 18.715 MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Eden_Space.norm ss 5 23297075236.800 ± 342988031.685 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Old_Gen ss 5 21.372 ± 3.232 MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Old_Gen.norm ss 5 746814603.200 ± 117162314.172 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Survivor_Space ss 5 9.224 ± 19.190 MB/sec
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.churn.PS_Survivor_Space.norm ss 5 323181336.000 ± 679009937.529 B/op
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.count ss 5 813.000 counts
MergeCardinalityCheckBenchmark.mergeOnReadMergeCardinalityCheck90PercentUpdates:·gc.time ss 5 7027.000 ms
```
The new solution is a bit faster and more memory efficient in this benchmark, but it could make a much bigger difference on real workloads where the data is larger (i.e. the spill is a big issue) and the disk is slower. Also, keep in mind that this benchmark still does a local sort that triggers a spill before the write, as the table is ordered. If we disabled that local sort too, the difference would be much bigger as the new solution would avoid the spill completely.
[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check
Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1187796353
##########
spark/v3.4/build.gradle:
##########
@@ -135,6 +135,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer
dependencies {
implementation("org.scala-lang.modules:scala-collection-compat_${scalaVersion}")
+ implementation "org.roaringbitmap:RoaringBitmap"
Review Comment:
We already depend on `RoaringBitmap` for vectorized reads with deletes. It is already relocated.
[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check
Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1187802164
##########
spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/spark/MergeCardinalityCheckBenchmark.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.apache.spark.sql.functions.current_date;
+import static org.apache.spark.sql.functions.date_add;
+import static org.apache.spark.sql.functions.expr;
+
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * A benchmark that evaluates the performance of the cardinality check in MERGE operations.
+ *
+ * <p>To run this benchmark for spark-3.4: <code>
+ * ./gradlew -DsparkVersions=3.4 :iceberg-spark:iceberg-spark-extensions-3.4_2.12:jmh
+ * -PjmhIncludeRegex=MergeCardinalityCheckBenchmark
+ * -PjmhOutputPath=benchmark/iceberg-merge-cardinality-check-benchmark.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class MergeCardinalityCheckBenchmark {
Review Comment:
My initial idea was to use a set but that degraded the performance for cases with 90% matches as the GC time became an issue. Using a bitmap solved the problem.
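As a rough illustration of the difference, here is a minimal Java sketch of duplicate detection over 64-bit row IDs that does not box every ID. It is not the code in this PR: `MatchedRowIds` and `markMatched` are hypothetical names, and a map of plain `java.util.BitSet` buckets stands in for `Roaring64Bitmap` so the example needs only the JDK. A `HashSet<Long>` would allocate a boxed `Long` plus a hash node per matched row, which is presumably where the GC time came from.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a 64-bit bitmap such as Roaring64Bitmap:
// IDs are bucketed by their high 33 bits, and each bucket tracks the low 31 bits in a BitSet.
final class MatchedRowIds {
  private static final long LOW_MASK = (1L << 31) - 1;
  private final Map<Long, BitSet> buckets = new HashMap<>();

  // Marks the ID as matched; returns false if the same ID had already been matched.
  boolean markMatched(long rowId) {
    long high = rowId >>> 31;
    int low = (int) (rowId & LOW_MASK);
    BitSet bits = buckets.computeIfAbsent(high, key -> new BitSet());
    if (bits.get(low)) {
      return false;
    }
    bits.set(low);
    return true;
  }
}
```

Unlike a Roaring bitmap, a plain `BitSet` is not compressed, so each bucket grows with the largest low index it has seen. The sketch only shows the shape of the check: one `markMatched` call per matched row, and a `false` result means the cardinality check failed.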
[GitHub] [iceberg] aokolnychyi commented on pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check
Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#issuecomment-1538918349
cc @singhpk234 @amogh-jahagirdar @jackye1995 @flyrain @RussellSpitzer @szehon-ho @rdblue @nastra @Fokko
[GitHub] [iceberg] aokolnychyi commented on pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check
Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#issuecomment-1541336715
I took a closer look. If I remember correctly, we added `NoStatsUnaryNode` to prevent issues with broadcast nested loop joins when the target would be the broadcasted side. It is an extreme edge case as the target must be smaller and there must be no equality condition in the ON clause. That means we should probably keep it for now unless it causes a performance degradation. If it does, we can work around it.
I am reluctant to provide an explicit hint to prefer a shuffle hash join as such joins can lead to OOM if the size of each partition after the shuffle on the build side is too big to fit into memory. We highly advise our users to configure Spark to use shuffle joins but that's not always possible. That said, let me think more about this tomorrow.
[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check
Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1187836896
##########
spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/spark/MergeCardinalityCheckBenchmark.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.apache.spark.sql.functions.current_date;
+import static org.apache.spark.sql.functions.date_add;
+import static org.apache.spark.sql.functions.expr;
+
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * A benchmark that evaluates the performance of the cardinality check in MERGE operations.
+ *
+ * <p>To run this benchmark for spark-3.4: <code>
+ * ./gradlew -DsparkVersions=3.4 :iceberg-spark:iceberg-spark-extensions-3.4_2.12:jmh
+ * -PjmhIncludeRegex=MergeCardinalityCheckBenchmark
+ * -PjmhOutputPath=benchmark/iceberg-merge-cardinality-check-benchmark.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class MergeCardinalityCheckBenchmark {
Review Comment:
```
Benchmark                                               Mode  Cnt   Score   Error  Units
[NEW] copyOnWriteMergeCardinalityCheck10PercentUpdates    ss    5  14.008 ± 1.114  s/op
[OLD] copyOnWriteMergeCardinalityCheck10PercentUpdates    ss    5  16.711 ± 0.288  s/op
[NEW] copyOnWriteMergeCardinalityCheck30PercentUpdates    ss    5  14.293 ± 2.359  s/op
[OLD] copyOnWriteMergeCardinalityCheck30PercentUpdates    ss    5  17.242 ± 2.855  s/op
[NEW] copyOnWriteMergeCardinalityCheck90PercentUpdates    ss    5  14.536 ± 1.344  s/op
[OLD] copyOnWriteMergeCardinalityCheck90PercentUpdates    ss    5  18.966 ± 6.966  s/op
[NEW] mergeOnReadMergeCardinalityCheck10PercentUpdates    ss    5   9.585 ± 0.467  s/op
[OLD] mergeOnReadMergeCardinalityCheck10PercentUpdates    ss    5  10.493 ± 5.130  s/op
[NEW] mergeOnReadMergeCardinalityCheck30PercentUpdates    ss    5  14.910 ± 0.264  s/op
[OLD] mergeOnReadMergeCardinalityCheck30PercentUpdates    ss    5  15.171 ± 0.275  s/op
[NEW] mergeOnReadMergeCardinalityCheck90PercentUpdates    ss    5  30.746 ± 0.237  s/op
[OLD] mergeOnReadMergeCardinalityCheck90PercentUpdates    ss    5  32.714 ± 0.544  s/op
```
Here is a shorter representation. There is a small reduction in memory usage too. As mentioned above, this run still does a local sort and triggers a spill; we just reduce the amount of spilled data. If we skip the final sort, we can actually avoid the local sort altogether.
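To read the scores in relative terms, a small Java sketch of the arithmetic follows. `BenchmarkDelta` and `relativeReduction` are hypothetical names used only for illustration, and the inputs are the s/op scores from the table above. Some of the [OLD] error margins are wide, so the resulting percentages should be taken as indicative only.

```java
// Hypothetical helper for comparing two single-shot scores from the table above.
final class BenchmarkDelta {

  // Relative reduction in time per operation: (old - new) / old.
  static double relativeReduction(double oldSecondsPerOp, double newSecondsPerOp) {
    if (oldSecondsPerOp <= 0) {
      throw new IllegalArgumentException("old score must be positive");
    }
    return (oldSecondsPerOp - newSecondsPerOp) / oldSecondsPerOp;
  }
}
```

For example, `BenchmarkDelta.relativeReduction(16.711, 14.008)` for copy-on-write with 10% updates comes out at roughly 16%, while `BenchmarkDelta.relativeReduction(32.714, 30.746)` for merge-on-read with 90% updates is roughly 6%.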
[GitHub] [iceberg] aokolnychyi commented on pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check
Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#issuecomment-1539307223
Let me take a closer look, I forgot the details.
[GitHub] [iceberg] aokolnychyi commented on pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check
Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#issuecomment-1550351186
Thanks for reviewing, @singhpk234 @RussellSpitzer @amogh-jahagirdar!
[GitHub] [iceberg] aokolnychyi commented on pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check
Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#issuecomment-1539000223
@singhpk234, it is still critical to ensure the target table is not broadcasted to execute the cardinality check. If we broadcast the target table, we will not be able to detect multiple matches from different source partitions. That is usually not a problem as the target is much bigger.
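A toy Java model of why broadcasting the target hides duplicates is sketched below. It is not Spark code: `BroadcastCardinalityDemo` and its methods are hypothetical, and each inner list simply holds the target row IDs matched by one task. With a broadcast target, every task joins its own slice of the source against the whole target, so a target row matched once in two different tasks looks fine to both.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model, not Spark: each inner list is the target row IDs matched by one task.
final class BroadcastCardinalityDemo {

  // Each task checks only the matches it sees, as with a broadcast target.
  static boolean detectedPerTask(List<List<Long>> matchesPerTask) {
    for (List<Long> taskMatches : matchesPerTask) {
      if (hasDuplicate(taskMatches)) {
        return true;
      }
    }
    return false;
  }

  // Simplification: all matches are checked together, as if every match
  // for a given target row were routed to the same task.
  static boolean detectedWhenColocated(List<List<Long>> matchesPerTask) {
    List<Long> all = new ArrayList<>();
    for (List<Long> taskMatches : matchesPerTask) {
      all.addAll(taskMatches);
    }
    return hasDuplicate(all);
  }

  private static boolean hasDuplicate(List<Long> rowIds) {
    Set<Long> seen = new HashSet<>();
    for (Long rowId : rowIds) {
      if (!seen.add(rowId)) {
        return true;
      }
    }
    return false;
  }
}
```

If one task matches target rows 1 and 2 and another matches rows 2 and 3, the per-task check passes even though row 2 was matched twice; only the colocated check reports the violation.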
[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check
Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1195535191
##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##########
@@ -148,24 +138,27 @@ case class MergeRowsExec(
}
}
- var lastMatchedRowId: InternalRow = null
+ val rowIdAttrOrdinal = if (performCardinalityCheck) {
+ child.output.indexWhere(attr => conf.resolver(attr.name, ROW_ID))
+ } else {
+ -1
+ }
+ val matchedRowIds = new Roaring64Bitmap()
def processRowWithCardinalityCheck(inputRow: InternalRow): InternalRow = {
val isSourceRowPresent = isSourceRowPresentPred.eval(inputRow)
val isTargetRowPresent = isTargetRowPresentPred.eval(inputRow)
if (isSourceRowPresent && isTargetRowPresent) {
- val currentRowId = rowIdProj.apply(inputRow)
- if (currentRowId == lastMatchedRowId) {
+ val currentRowId = inputRow.getLong(rowIdAttrOrdinal)
+ if (matchedRowIds.contains(currentRowId)) {
Review Comment:
Should we have a sanity check that `rowIdAttrOrdinal` is not -1 at this point? It should not be, since we only hit this case when we want to perform the cardinality check, but a check would make sure there is a valid row ID here and surface a clear error message if there is not.
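Something along these lines is what I have in mind, sketched in Java rather than the Scala of `MergeRowsExec`. `RowIdOrdinals` and `requireRowIdOrdinal` are hypothetical names, and the case-insensitive comparison is only an assumption standing in for `conf.resolver`.

```java
import java.util.List;

// Hypothetical helper: resolves the row ID column position and fails fast if it is missing.
final class RowIdOrdinals {

  static int requireRowIdOrdinal(List<String> outputColumnNames, String rowIdColumnName) {
    for (int ordinal = 0; ordinal < outputColumnNames.size(); ordinal++) {
      // Assumption: case-insensitive name resolution.
      if (outputColumnNames.get(ordinal).equalsIgnoreCase(rowIdColumnName)) {
        return ordinal;
      }
    }
    throw new IllegalStateException(
        "Cannot perform cardinality check: row ID column '" + rowIdColumnName
            + "' not found in " + outputColumnNames);
  }
}
```

That way a missing row ID attribute fails with a descriptive message up front instead of an out-of-range read at -1 for the first matched row.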
[GitHub] [iceberg] szehon-ho commented on pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check
Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#issuecomment-1550486478
Just a note for a later PR: the new benchmark produces an ErrorProne warning:
```
> Task :iceberg-spark:iceberg-spark-extensions-3.4_2.12:compileJmhJava
/Users/szehon/repos/apache-iceberg/iceberg/spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/spark/MergeCardinalityCheckBenchmark.java:224: warning: [AnnotateFormatMethod] This method passes a pair of parameters through to String.format, but the enclosing method wasn't annotated @FormatMethod. Doing so gives compile-time rather than run-time protection against malformed format strings.
private void sql(String query, Object... args) {
^
(see https://errorprone.info/bugpattern/AnnotateFormatMethod)
```
[GitHub] [iceberg] aokolnychyi commented on pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check
Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#issuecomment-1548886089
I took another look. I think providing a hint would always trigger a shuffle hash join so that's not safe. I'd keep it as is for now.
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check
Posted by "RussellSpitzer (via GitHub)" <gi...@apache.org>.
RussellSpitzer commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1195550649
##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##########
@@ -148,24 +138,27 @@ case class MergeRowsExec(
}
}
- var lastMatchedRowId: InternalRow = null
+ val rowIdAttrOrdinal = if (performCardinalityCheck) {
Review Comment:
Actually maybe not... Mainly I just want to remove the branch if we can. Maybe just make this an option?
[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check
Posted by "RussellSpitzer (via GitHub)" <gi...@apache.org>.
RussellSpitzer commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1195549503
##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##########
@@ -148,24 +138,27 @@ case class MergeRowsExec(
}
}
- var lastMatchedRowId: InternalRow = null
+ val rowIdAttrOrdinal = if (performCardinalityCheck) {
Review Comment:
Can we move this into
`def processRowWithCardinalityCheck`?
[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check
Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1187836896
##########
spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/spark/MergeCardinalityCheckBenchmark.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.apache.spark.sql.functions.current_date;
+import static org.apache.spark.sql.functions.date_add;
+import static org.apache.spark.sql.functions.expr;
+
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * A benchmark that evaluates the performance of the cardinality check in MERGE operations.
+ *
+ * <p>To run this benchmark for spark-3.4: <code>
+ * ./gradlew -DsparkVersions=3.4 :iceberg-spark:iceberg-spark-extensions-3.4_2.12:jmh
+ * -PjmhIncludeRegex=MergeCardinalityCheckBenchmark
+ * -PjmhOutputPath=benchmark/iceberg-merge-cardinality-check-benchmark.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class MergeCardinalityCheckBenchmark {
Review Comment:
```
Benchmark                                               Mode  Cnt   Score   Error  Units
[NEW] copyOnWriteMergeCardinalityCheck10PercentUpdates    ss    5  14.008 ± 1.114  s/op
[OLD] copyOnWriteMergeCardinalityCheck10PercentUpdates    ss    5  16.711 ± 0.288  s/op
[NEW] copyOnWriteMergeCardinalityCheck30PercentUpdates    ss    5  14.293 ± 2.359  s/op
[OLD] copyOnWriteMergeCardinalityCheck30PercentUpdates    ss    5  17.242 ± 2.855  s/op
[NEW] copyOnWriteMergeCardinalityCheck90PercentUpdates    ss    5  14.536 ± 1.344  s/op
[OLD] copyOnWriteMergeCardinalityCheck90PercentUpdates    ss    5  18.966 ± 6.966  s/op
[NEW] mergeOnReadMergeCardinalityCheck10PercentUpdates    ss    5   9.585 ± 0.467  s/op
[OLD] mergeOnReadMergeCardinalityCheck10PercentUpdates    ss    5  10.493 ± 5.130  s/op
[NEW] mergeOnReadMergeCardinalityCheck30PercentUpdates    ss    5  14.910 ± 0.264  s/op
[OLD] mergeOnReadMergeCardinalityCheck30PercentUpdates    ss    5  15.171 ± 0.275  s/op
[NEW] mergeOnReadMergeCardinalityCheck90PercentUpdates    ss    5  30.746 ± 0.237  s/op
[OLD] mergeOnReadMergeCardinalityCheck90PercentUpdates    ss    5  32.714 ± 0.544  s/op
```
Here is a shorter representation. There is a small reduction in memory too. As mentioned above, this run still does a local sort and triggers a spill; we just reduce the amount of spilled data. If we skip the final sort, we can actually avoid the local sort altogether, which was not possible before. A slower disk would also increase the difference.
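To put the scores above in relative terms, here is a minimal sketch in Java. The class and method names are hypothetical and not part of the PR; the helper only turns a pair of OLD/NEW scores from the table into a percentage reduction and ignores the reported error margins.

```java
final class MergeBenchmarkDelta {
  private MergeBenchmarkDelta() {}

  /** Relative reduction in time, in percent, when going from oldSeconds to newSeconds. */
  static double reductionPercent(double oldSeconds, double newSeconds) {
    return 100.0 * (oldSeconds - newSeconds) / oldSeconds;
  }
}
```

For example, `MergeBenchmarkDelta.reductionPercent(16.711, 14.008)` gives roughly 16% for the copy-on-write run with 10% updates. Several of the error margins in the table overlap, so such figures are only indicative.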
[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check
Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1187803191
##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala:
##########
@@ -257,8 +259,17 @@ object RewriteMergeIntoTable extends RewriteRowLevelIcebergCommand with Predicat
val (targetCond, joinCond) = splitMergeCond(cond, readRelation)
+ val performCardinalityCheck = isCardinalityCheckNeeded(matchedActions)
+
// project an extra column to check if a target row exists after the join
- val targetTableProjExprs = readAttrs :+ Alias(TrueLiteral, ROW_FROM_TARGET)()
+ // if needed, project a synthetic row ID to perform the cardinality check
+ val rowFromTarget = Alias(TrueLiteral, ROW_FROM_TARGET)()
Review Comment:
I initially added a method for this logic but it made the main method harder to read by hiding the actual projection expressions.
[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check
Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1187844125
##########
spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/spark/MergeCardinalityCheckBenchmark.java:
##########
@@ -0,0 +1,227 @@
+/**
+ * A benchmark that evaluates the performance of the cardinality check in MERGE operations.
+ *
+ * <p>To run this benchmark for spark-3.4: <code>
+ * ./gradlew -DsparkVersions=3.4 :iceberg-spark:iceberg-spark-extensions-3.4_2.12:jmh
+ * -PjmhIncludeRegex=MergeCardinalityCheckBenchmark
+ * -PjmhOutputPath=benchmark/iceberg-merge-cardinality-check-benchmark.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class MergeCardinalityCheckBenchmark {
Review Comment:
Also, MoR is slower than CoW with 90% updates because we do twice the work, marking almost all records as deleted. I just tested that to prove there is no memory or performance degradation.
[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check
Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1195611995
##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##########
@@ -148,24 +138,27 @@ case class MergeRowsExec(
}
}
- var lastMatchedRowId: InternalRow = null
+ val rowIdAttrOrdinal = if (performCardinalityCheck) {
+ child.output.indexWhere(attr => conf.resolver(attr.name, ROW_ID))
+ } else {
+ -1
+ }
+ val matchedRowIds = new Roaring64Bitmap()
def processRowWithCardinalityCheck(inputRow: InternalRow): InternalRow = {
val isSourceRowPresent = isSourceRowPresentPred.eval(inputRow)
val isTargetRowPresent = isTargetRowPresentPred.eval(inputRow)
if (isSourceRowPresent && isTargetRowPresent) {
- val currentRowId = rowIdProj.apply(inputRow)
- if (currentRowId == lastMatchedRowId) {
+ val currentRowId = inputRow.getLong(rowIdAttrOrdinal)
+ if (matchedRowIds.contains(currentRowId)) {
Review Comment:
I can add validation, let me see.
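For readers following the hunk above, the idea of the check can be sketched in Java as follows. This is an illustration only, not the PR's Scala code: `CardinalityCheckSketch` and `checkMatch` are hypothetical names, and a `java.util.HashSet<Long>` stands in for the `Roaring64Bitmap` so the example needs no external library. Because the set remembers every matched row ID seen in the task, the input does not have to be sorted by row ID, unlike the old comparison against `lastMatchedRowId`.

```java
import java.util.HashSet;
import java.util.Set;

final class CardinalityCheckSketch {
  // Stand-in for Roaring64Bitmap: row IDs of target rows already matched in this task.
  private final Set<Long> matchedRowIds = new HashSet<>();

  /** Records a matched target row ID and fails if that row was already matched. */
  void checkMatch(long rowId) {
    // Set.add returns false when the ID is already present, i.e. a second source row
    // matched the same target row.
    if (!matchedRowIds.add(rowId)) {
      throw new IllegalStateException(
          "A single target row was matched by more than one source row: " + rowId);
    }
  }
}
```

A compressed bitmap serves the same purpose as the set here while typically using far less memory for dense 64-bit IDs.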
[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check
Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1195611262
##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##########
@@ -148,24 +138,27 @@ case class MergeRowsExec(
}
}
- var lastMatchedRowId: InternalRow = null
+ val rowIdAttrOrdinal = if (performCardinalityCheck) {
Review Comment:
Unlike `processRowWithCardinalityCheck`, this is not called for every row. It is called once per task, so having a branch should not impact the performance.
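The distinction between per-task and per-row work can be sketched in Java like this. The names (`RowIdOrdinalSketch`, `resolveOrdinal`, `rowId`), the `long[]` row representation, and the case-insensitive name comparison are all assumptions made for illustration; the PR itself uses `child.output.indexWhere` with `conf.resolver` in Scala.

```java
import java.util.List;

final class RowIdOrdinalSketch {
  private RowIdOrdinalSketch() {}

  /** Resolved once per task: position of the row ID column, or -1 when no check is needed. */
  static int resolveOrdinal(List<String> columns, String rowIdName, boolean performCheck) {
    if (!performCheck) {
      return -1;
    }
    for (int i = 0; i < columns.size(); i++) {
      // Assumption: names are compared case-insensitively
      if (columns.get(i).equalsIgnoreCase(rowIdName)) {
        return i;
      }
    }
    return -1;
  }

  /** Called for every row: a plain positional read with no name lookup and no branch. */
  static long rowId(long[] row, int ordinal) {
    return row[ordinal];
  }
}
```

The branch and the name lookup are paid once when the task starts, while the hot path only indexes into the row.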
[GitHub] [iceberg] singhpk234 commented on pull request #7558: Spark 3.4: Avoid local sort for MERGE cardinality check
Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#issuecomment-1539032218
Agreed, we definitely require shuffling. It looks like the NoStatsUnary node handling was only to avoid BNLJ and not just for BHJ; to avoid BHJ we still need the NO_BROADCAST_HASH hint, as someone can set the BHJ threshold > Long.MaxValue. Is this understanding correct?