Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/10/25 19:44:41 UTC

[GitHub] [iceberg] aokolnychyi opened a new pull request, #6055: Spark 3.3: Use separate scan during file filtering in copy-on-write operations

aokolnychyi opened a new pull request, #6055:
URL: https://github.com/apache/iceberg/pull/6055

   This PR enables using a separate scan during file filtering in row-level copy-on-write operations. This means the runtime filter subquery will now be able to push down filters into row groups as well as prune columns. This should make the initial filtering phase more efficient.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6055: Spark 3.3: Use separate scan during file filtering in copy-on-write operations

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #6055:
URL: https://github.com/apache/iceberg/pull/6055#discussion_r1004901728


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java:
##########
@@ -764,12 +765,18 @@ public synchronized void testDeleteWithSerializableIsolation() throws Interrupte
     Future<?> appendFuture =
         executorService.submit(
             () -> {
+              SparkSession sparkSession = spark.cloneSession();
               for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
                 while (barrier.get() < numOperations * 2) {
                   sleep(10);
                 }
 
                 try {
+                  Dataset<Row> inputDF =

Review Comment:
   I was running into some weird thread-local issues without cloning the session in the thread.





[GitHub] [iceberg] aokolnychyi commented on pull request #6055: Spark 3.3: Use separate scan during file filtering in copy-on-write operations

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on PR #6055:
URL: https://github.com/apache/iceberg/pull/6055#issuecomment-1291625026

   I will need to figure out what to do with the test. It doesn’t fail locally. Ignore the changes around the client pool size.




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6055: Spark 3.3: Use separate scan during file filtering in copy-on-write operations

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #6055:
URL: https://github.com/apache/iceberg/pull/6055#discussion_r1005232712


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java:
##########
@@ -807,6 +815,20 @@ public synchronized void testDeleteWithSnapshotIsolation()
         "ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')",
         tableName, DELETE_ISOLATION_LEVEL, "snapshot");
 
+    // pre-populate the table to force it to use the Spark writers instead of metadata-only delete

Review Comment:
   Copied from a similar test above.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6055: Spark 3.3: Use separate scan during file filtering in copy-on-write operations

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #6055:
URL: https://github.com/apache/iceberg/pull/6055#discussion_r1004901122


##########
spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala:
##########
@@ -69,27 +73,34 @@ case class RowLevelCommandDynamicPruning(spark: SparkSession) extends Rule[Logic
       // use reference equality to find exactly the required scan relations
       val newRewritePlan = rewritePlan transformUp {
         case r: DataSourceV2ScanRelation if r.scan eq scan =>
-          val pruningKeys = ExtendedV2ExpressionUtils.resolveRefs[Attribute](
-            ArraySeq.unsafeWrapArray(scan.filterAttributes), r)
-          val dynamicPruningCond = buildDynamicPruningCondition(r, command, pruningKeys)
-          val filter = Filter(dynamicPruningCond, r)
-          // always optimize dynamic filtering subqueries for row-level commands as it is important
-          // to rewrite introduced predicates as joins because Spark recently stopped optimizing
-          // dynamic subqueries to facilitate broadcast reuse
-          optimizeSubquery(filter)
+          // use the original table instance that was loaded for this row-level operation

Review Comment:
   I adapted this place to match what we did in Spark (including comments).





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6055: Spark 3.3: Use separate scan during file filtering in copy-on-write operations

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on code in PR #6055:
URL: https://github.com/apache/iceberg/pull/6055#discussion_r1004904674


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java:
##########
@@ -102,6 +103,14 @@ public NamedReference[] filterAttributes() {
 
   @Override
   public void filter(Filter[] filters) {
+    Preconditions.checkState(
+        Objects.equals(snapshotId(), currentSnapshotId()),
+        "Runtime file filtering is not possible: the table has been concurrently refreshed. "

Review Comment:
   I initially thought about skipping runtime filtering if the table has been refreshed, to avoid failing the query. However, we are changing such a critical place that I think it is better to let the user know rather than proceed with the query. Otherwise, jobs that leverage multiple threads may consume far more resources. If runtime filtering stops working, jobs will simply start taking tons of time.
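
The fail-fast check discussed in this comment can be sketched in plain Java. This is only an illustration, not the Iceberg implementation: `SnapshotGuardedScan`, its constructor, and the `Supplier`-based view of the current snapshot are hypothetical names, `String[]` stands in for Spark's `Filter[]`, and the exception message reuses only the part of the message visible in the diff above.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for a copy-on-write scan (not the Iceberg class).
final class SnapshotGuardedScan {
  // Snapshot ID captured when the scan was planned (may be null for an empty table).
  private final Long scanSnapshotId;
  // Hypothetical view of the table's current snapshot ID at call time.
  private final Supplier<Long> currentSnapshotId;
  private boolean filtered = false;

  SnapshotGuardedScan(Long scanSnapshotId, Supplier<Long> currentSnapshotId) {
    this.scanSnapshotId = scanSnapshotId;
    this.currentSnapshotId = currentSnapshotId;
  }

  // Fails fast instead of silently skipping runtime filtering when the table moved on.
  void filter(String[] filters) {
    if (!Objects.equals(scanSnapshotId, currentSnapshotId.get())) {
      throw new IllegalStateException(
          "Runtime file filtering is not possible: the table has been concurrently refreshed.");
    }
    // A real scan would narrow its file list here; the sketch only records the call.
    filtered = true;
  }

  boolean isFiltered() {
    return filtered;
  }

  public static void main(String[] args) {
    AtomicReference<Long> current = new AtomicReference<>(1L);

    // Snapshot unchanged: filtering proceeds.
    SnapshotGuardedScan scan = new SnapshotGuardedScan(1L, current::get);
    scan.filter(new String[0]);
    System.out.println("filtered = " + scan.isFiltered());

    // Simulate a concurrent refresh: filtering is rejected.
    current.set(2L);
    try {
      new SnapshotGuardedScan(1L, current::get).filter(new String[0]);
    } catch (IllegalStateException e) {
      System.out.println("failed fast: " + e.getMessage());
    }
  }
}
```

Throwing here trades availability for predictability: the alternative of quietly skipping the filter would keep the query running but could hide a large slowdown, which is the concern raised in the comment.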









[GitHub] [iceberg] aokolnychyi closed pull request #6055: Spark 3.3: Use separate scan during file filtering in copy-on-write operations

Posted by GitBox <gi...@apache.org>.
aokolnychyi closed pull request #6055: Spark 3.3: Use separate scan during file filtering in copy-on-write operations
URL: https://github.com/apache/iceberg/pull/6055

