Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/12/18 01:17:02 UTC

[GitHub] [iceberg] rdblue opened a new pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

rdblue opened a new pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955


   This updates Spark's `DELETE FROM` command to sort the retained rows by original file and position to ensure that the original data clustering is preserved by the command.
   
   Because Spark does not yet support metadata columns, this exposes `_file` and `_pos` by adding them automatically to all merge scans. Projecting both columns was mostly supported, with only minor changes needed to project `_file` using the constants map supported by Avro, Parquet, and ORC.
   
   This also required refactoring `DynamicFileFilter`. When projecting `_file` and `_pos` but only using `_file`, the optimizer would throw an exception that the node could not be copied, because it was attempting to rewrite the node with a projection to remove the unused `_pos`. The fix is to update `DynamicFileFilter` so that the `SupportsFileFilter` is passed separately. Then the scan can be passed as a logical plan that can be rewritten by the planner. This also required updating the conversion to a physical plan, because the scan plan may be more complicated than a single scan node. The conversion ensures that the scan becomes an extended scan by using a new logical plan wrapper, so that `planLater` can be used in conversion like normal.
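
To make the ordering concrete, here is a minimal Java sketch of sorting retained rows by source file and then by position within that file. It is only an illustration, not the Spark plan from this PR: the `Row` class, its fields, and `sortByFileAndPos` are hypothetical names invented for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class SortRetainedRows {
  // Hypothetical stand-in for a retained row carrying its _file and _pos values.
  static final class Row {
    final String file;
    final long pos;

    Row(String file, long pos) {
      this.file = file;
      this.pos = pos;
    }

    @Override
    public String toString() {
      return file + "@" + pos;
    }
  }

  // Order rows by original file first, then by row position inside that file,
  // so rows that were adjacent before the delete stay adjacent afterwards.
  static List<Row> sortByFileAndPos(List<Row> rows) {
    List<Row> sorted = new ArrayList<>(rows);
    sorted.sort(Comparator.comparing((Row r) -> r.file).thenComparingLong(r -> r.pos));
    return sorted;
  }

  public static void main(String[] args) {
    List<Row> rows = Arrays.asList(
        new Row("b.parquet", 1L),
        new Row("a.parquet", 2L),
        new Row("a.parquet", 0L));
    System.out.println(sortByFileAndPos(rows));
  }
}
```

The sort key is the pair (file, position), so any clustering that existed inside each original file survives the rewrite.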


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] chenjunjiedada commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r546184999



##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
##########
@@ -381,9 +382,13 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) {
         for (int i = 0; i < numValsToRead; i += 1) {
           vec.getDataBuffer().setLong(i * Long.BYTES, rowStart + i);
         }
+        for (int i = 0; i < numValsToRead; i += 1) {
+          BitVectorHelper.setValidityBitToOne(vec.getValidityBuffer(), i);
+        }
         nulls = new NullabilityHolder(numValsToRead);
       }
 
+      rowStart += numValsToRead;

Review comment:
       Great catch! Let me update the unit test as well.






[GitHub] [iceberg] rdblue commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r545510376



##########
File path: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
##########
@@ -39,13 +40,17 @@ private PartitionUtil() {
   }
 
   public static Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type, Object, Object> convertConstant) {
-    return constantsMap(task.spec(), task.file().partition(), convertConstant);
-  }
+    PartitionSpec spec = task.spec();
+    StructLike partitionData = task.file().partition();
 
-  private static Map<Integer, ?> constantsMap(PartitionSpec spec, StructLike partitionData,
-                                              BiFunction<Type, Object, Object> convertConstant) {
     // use java.util.HashMap because partition data may contain null values
     Map<Integer, Object> idToConstant = new HashMap<>();
+
+    // add _file
+    idToConstant.put(
+        MetadataColumns.FILE_PATH.fieldId(),
+        convertConstant.apply(Types.StringType.get(), task.file().path()));

Review comment:
       This adds `_file` to the constants map so it is set in records like a partition value.
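
A rough sketch of the idea, using plain Java collections rather than the Iceberg classes: the file path is stored in the same ID-keyed map as the partition values, so it is present even when no partition values exist. `FILE_PATH_ID` and the `constantsMap` signature below are placeholders chosen for the example, not the real `PartitionUtil` API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class ConstantsMapSketch {
  // Placeholder field ID for the _file column, chosen only for this example.
  static final int FILE_PATH_ID = Integer.MAX_VALUE - 1;

  // Build the map of field ID to constant value for one file.
  static Map<Integer, Object> constantsMap(Map<Integer, Object> partitionValues, String filePath) {
    // java.util.HashMap is used because partition values may be null.
    Map<Integer, Object> idToConstant = new HashMap<>(partitionValues);
    // _file is added unconditionally, like any other per-file constant.
    idToConstant.put(FILE_PATH_ID, filePath);
    return idToConstant;
  }

  public static void main(String[] args) {
    // Even with no partition values, the file path is still available to readers.
    Map<Integer, Object> constants =
        constantsMap(Collections.<Integer, Object>emptyMap(), "s3://bucket/a.parquet");
    System.out.println(constants.get(FILE_PATH_ID));
  }
}
```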






[GitHub] [iceberg] rdblue commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r546158023



##########
File path: spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
##########
@@ -68,18 +68,7 @@
     // update the current file for Spark's filename() function
     InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
 
-    // schema or rows returned by readers
-    PartitionSpec spec = task.spec();
-    Set<Integer> idColumns = spec.identitySourceIds();
-    Schema partitionSchema = TypeUtil.select(expectedSchema, idColumns);
-    boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty();
-
-    Map<Integer, ?> idToConstant;
-    if (projectsIdentityPartitionColumns) {
-      idToConstant = PartitionUtil.constantsMap(task, BatchDataReader::convertConstant);
-    } else {
-      idToConstant = ImmutableMap.of();
-    }
+    Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, BatchDataReader::convertConstant);

Review comment:
       It isn't necessary to check whether there are projected ID columns. The code is shorter if the values are available by default, even if they aren't used. This fixes the problem where there are constants to add (like `_file`) but no identity partition values are projected.






[GitHub] [iceberg] chenjunjiedada commented on pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
chenjunjiedada commented on pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#issuecomment-748106333


   You may want this PR: https://github.com/apache/iceberg/pull/1356.




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r545702382



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
##########
@@ -77,33 +101,30 @@ object RewriteDelete extends Rule[LogicalPlan] with PredicateHelper with Logging
     PushDownUtils.pushFilters(scanBuilder, normalizedPredicates)
 
     val scan = scanBuilder.build()
-    val scanRelation = DataSourceV2ScanRelation(table, scan, output)
+    val scanRelation = DataSourceV2ScanRelation(table, scan, toOutputAttrs(scan.readSchema(), output))
 
     val scanPlan = scan match {
-      case _: SupportsFileFilter =>
+      case filterable: SupportsFileFilter =>
         val matchingFilePlan = buildFileFilterPlan(cond, scanRelation)
-        val dynamicFileFilter = DynamicFileFilter(scanRelation, matchingFilePlan)
+        val dynamicFileFilter = DynamicFileFilter(ExtendedScanRelation(scanRelation), matchingFilePlan, filterable)
         dynamicFileFilter
       case _ =>
         scanRelation
     }
 
-    // include file name so that we can group data back
-    val fileNameExpr = Alias(InputFileName(), FILE_NAME_COL)()
-    Project(scanPlan.output :+ fileNameExpr, scanPlan)
+    scanPlan
   }
 
   private def buildWritePlan(
       remainingRowsPlan: LogicalPlan,
       output: Seq[AttributeReference]): LogicalPlan = {
 
-    // TODO: sort by _pos to keep the original ordering of rows
-    // TODO: consider setting a file size limit
-
     val fileNameCol = findOutputAttr(remainingRowsPlan, FILE_NAME_COL)
+    val rowPosCol = findOutputAttr(remainingRowsPlan, ROW_POS_COL)
+    val order = Seq(SortOrder(fileNameCol, Ascending), SortOrder(rowPosCol, Ascending))

Review comment:
       +1






[GitHub] [iceberg] rdblue commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r545510234



##########
File path: core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
##########
@@ -96,7 +96,7 @@ public Schema record(Schema record, List<String> names, Iterable<Schema.Field> s
 
       } else {
         Preconditions.checkArgument(
-            field.isOptional() || field.fieldId() == MetadataColumns.ROW_POSITION.fieldId(),
+            field.isOptional() || MetadataColumns.metadataFieldIds().contains(field.fieldId()),

Review comment:
       Needed to allow projecting `_file` even though it isn't in the data file.
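
The relaxed check can be pictured with a small Java sketch, assuming a field that is missing from the data file is acceptable when it is either optional or one of the metadata columns. `META_IDS`, the ID values, and `checkMissingField` are hypothetical and only mirror the shape of the condition in the hunk above.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class ProjectionCheckSketch {
  // Placeholder IDs standing in for the metadata columns (_file and _pos).
  static final Set<Integer> META_IDS =
      new HashSet<>(Arrays.asList(Integer.MAX_VALUE - 1, Integer.MAX_VALUE - 2));

  // A projected field that is absent from the file must be optional or a metadata column.
  static void checkMissingField(String name, int fieldId, boolean isOptional) {
    if (!(isOptional || META_IDS.contains(fieldId))) {
      throw new IllegalArgumentException("Missing required field: " + name);
    }
  }

  public static void main(String[] args) {
    checkMissingField("_file", Integer.MAX_VALUE - 1, false); // accepted: metadata column
    checkMissingField("note", 7, true);                       // accepted: optional field
    try {
      checkMissingField("id", 1, false);                      // rejected: required data column
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```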






[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r546033284



##########
File path: core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
##########
@@ -96,7 +96,7 @@ public Schema record(Schema record, List<String> names, Iterable<Schema.Field> s
 
       } else {
         Preconditions.checkArgument(
-            field.isOptional() || field.fieldId() == MetadataColumns.ROW_POSITION.fieldId(),
+            field.isOptional() || MetadataColumns.metadataFieldIds().contains(field.fieldId()),

Review comment:
       This reminds me that we need to fix that projection / selection bug.






[GitHub] [iceberg] rdblue commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r545985486



##########
File path: core/src/main/java/org/apache/iceberg/MetadataColumns.java
##########
@@ -55,4 +55,16 @@ private MetadataColumns() {
   public static Set<Integer> metadataFieldIds() {
     return META_IDS;
   }
+
+  public static NestedField get(String name) {
+    return META_COLUMNS.get(name);
+  }
+
+  public static boolean isMetadataColumn(String name) {
+    return META_COLUMNS.containsKey(name);
+  }
+
+  public static boolean nonMetadataColumn(String name) {

Review comment:
       Yeah, we could do that. I was following the slightly shorter Scala convention, where there are methods like `nonEmpty`. I prefer this way, but if others agree I'm happy to change it.






[GitHub] [iceberg] rdblue commented on pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#issuecomment-748383960


   Rebased and fixed a couple of bugs with `_file` and `_pos` in vectorized Parquet reads.




[GitHub] [iceberg] rdblue commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r546870175



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ExtendedScanRelation.scala
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+
+case class ExtendedScanRelation(relation: DataSourceV2ScanRelation) extends LogicalPlan {

Review comment:
       Scan nodes shouldn't be reused in general, but it is usually okay if there is not a join. The problem is reused attribute IDs.






[GitHub] [iceberg] rdblue commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r545511060



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
##########
@@ -77,33 +101,30 @@ object RewriteDelete extends Rule[LogicalPlan] with PredicateHelper with Logging
     PushDownUtils.pushFilters(scanBuilder, normalizedPredicates)
 
     val scan = scanBuilder.build()
-    val scanRelation = DataSourceV2ScanRelation(table, scan, output)
+    val scanRelation = DataSourceV2ScanRelation(table, scan, toOutputAttrs(scan.readSchema(), output))

Review comment:
       Spark's contract is that the scan's schema is the one that should be used, not the original table schema. This allows the merge scan to return the extra `_file` and `_pos` columns and matches the behavior of normal scans that are configured with `PushDownUtils.pruneColumns`.
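
One way to picture deriving output attributes from the scan's read schema is the Java sketch below. It assumes that attributes already present in the table output are reused by name and that extra columns such as `_file` and `_pos` get fresh ones. The `Attr` class and this `toOutputAttrs` signature are hypothetical stand-ins and may not match the helper in the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class OutputAttrsSketch {
  // Hypothetical stand-in for a Catalyst attribute: a name plus a unique expression ID.
  static final class Attr {
    final String name;
    final long exprId;

    Attr(String name, long exprId) {
      this.name = name;
      this.exprId = exprId;
    }

    @Override
    public String toString() {
      return name + "#" + exprId;
    }
  }

  // Walk the read schema in order. Reuse the table attribute with the same name
  // when there is one, otherwise create a new attribute with a fresh ID.
  static List<Attr> toOutputAttrs(List<String> readSchema, List<Attr> tableOutput, long firstFreshId) {
    List<Attr> result = new ArrayList<>();
    long nextId = firstFreshId;
    for (String field : readSchema) {
      Attr match = null;
      for (Attr attr : tableOutput) {
        if (attr.name.equals(field)) {
          match = attr;
          break;
        }
      }
      if (match == null) {
        match = new Attr(field, nextId);
        nextId += 1;
      }
      result.add(match);
    }
    return result;
  }

  public static void main(String[] args) {
    List<Attr> tableOutput = Arrays.asList(new Attr("id", 1L), new Attr("data", 2L));
    System.out.println(toOutputAttrs(Arrays.asList("id", "_file", "_pos"), tableOutput, 100L));
  }
}
```

The output follows the read schema, so pruned table columns drop out and the extra metadata columns appear.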






[GitHub] [iceberg] rdblue commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r546157599



##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
##########
@@ -381,9 +382,13 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) {
         for (int i = 0; i < numValsToRead; i += 1) {
           vec.getDataBuffer().setLong(i * Long.BYTES, rowStart + i);
         }
+        for (int i = 0; i < numValsToRead; i += 1) {
+          BitVectorHelper.setValidityBitToOne(vec.getValidityBuffer(), i);
+        }

Review comment:
       This is needed for cases where Arrow checks the validity buffer.
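
As an illustration without the Arrow dependency, the Java sketch below fills a batch of row positions, marks every slot as valid in a bitmap, and advances the starting row so the next batch continues where this one ended. The `Batch` and `Reader` classes are hypothetical. The bitmap layout (one bit per value, least-significant bit first) follows the Arrow format, but the code does not call any Arrow API.

```java
class PositionVectorSketch {
  // Hypothetical holder for one batch of positions and its validity bitmap.
  static final class Batch {
    final long[] positions;
    final byte[] validity; // one bit per value, least-significant bit first

    Batch(long[] positions, byte[] validity) {
      this.positions = positions;
      this.validity = validity;
    }

    boolean isValid(int i) {
      return (validity[i >> 3] & (1 << (i & 7))) != 0;
    }
  }

  static final class Reader {
    private long rowStart = 0L;

    Batch read(int numValsToRead) {
      long[] positions = new long[numValsToRead];
      byte[] validity = new byte[(numValsToRead + 7) / 8];
      for (int i = 0; i < numValsToRead; i += 1) {
        positions[i] = rowStart + i;
        // Positions are never null, so every validity bit is set to one.
        validity[i >> 3] |= (byte) (1 << (i & 7));
      }
      // Advance the start so the next batch does not restart from the same row.
      rowStart += numValsToRead;
      return new Batch(positions, validity);
    }
  }

  public static void main(String[] args) {
    Reader reader = new Reader();
    Batch first = reader.read(3);
    Batch second = reader.read(2);
    System.out.println(first.positions[2] + " then " + second.positions[0]);
  }
}
```

A consumer that checks the bitmap before reading a value would treat unset bits as nulls, which is why the bits have to be set even though the data buffer is fully populated.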






[GitHub] [iceberg] rdblue commented on pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#issuecomment-748251420


   Merged #1356. Thanks for fixing that, @chenjunjiedada! Sorry I didn't get back to review that before now.




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r545709695



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
##########
@@ -131,28 +138,52 @@ public SparkScanBuilder caseSensitive(boolean isCaseSensitive) {
 
   @Override
   public void pruneColumns(StructType requestedSchema) {
-    this.requestedProjection = requestedSchema;
+    this.requestedProjection = new StructType(Stream.of(requestedSchema.fields())
+        .filter(field -> MetadataColumns.nonMetadataColumn(field.name()))
+        .toArray(StructField[]::new));
+
+    Stream.of(requestedSchema.fields())

Review comment:
       I see we call distinct in `schemaWithMetadataColumns`, never mind.
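
The split in `pruneColumns` can be sketched in plain Java over column names, assuming the requested columns are divided into data columns and metadata columns, with duplicates among the metadata columns collapsed. `META_COLUMNS`, `dataColumns`, and `metaColumns` are hypothetical names for this example. The real code works on Spark `StructType` fields.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class PruneColumnsSketch {
  // Names treated as metadata columns in this example.
  static final Set<String> META_COLUMNS = new HashSet<>(Arrays.asList("_file", "_pos"));

  // Columns that go into the requested projection of table data.
  static List<String> dataColumns(List<String> requested) {
    return requested.stream()
        .filter(name -> !META_COLUMNS.contains(name))
        .collect(Collectors.toList());
  }

  // Metadata columns to add to the scan. distinct() drops repeated requests
  // while keeping the order of first appearance.
  static List<String> metaColumns(List<String> requested) {
    return requested.stream()
        .filter(META_COLUMNS::contains)
        .distinct()
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> requested = Arrays.asList("id", "_file", "data", "_pos", "_file");
    System.out.println(dataColumns(requested));
    System.out.println(metaColumns(requested));
  }
}
```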






[GitHub] [iceberg] rdblue commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r546838823



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
##########
@@ -48,32 +48,32 @@ public SparkRowLevelOperationsTestBase(String catalogName, String implementation
   @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, format = {3}, vectorized = {4}")
   public static Object[][] parameters() {
     return new Object[][] {
-        { "testhive", SparkCatalog.class.getName(),
-            ImmutableMap.of(
-                "type", "hive",
-                "default-namespace", "default"
-            ),
-            "orc",
-            true
-        },
+//        { "testhive", SparkCatalog.class.getName(),
+//            ImmutableMap.of(
+//                "type", "hive",
+//                "default-namespace", "default"

Review comment:
       Yes, will remove. Sorry about that, I usually look through the PR to catch these before review!






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r545694599



##########
File path: core/src/main/java/org/apache/iceberg/MetadataColumns.java
##########
@@ -55,4 +55,16 @@ private MetadataColumns() {
   public static Set<Integer> metadataFieldIds() {
     return META_IDS;
   }
+
+  public static NestedField get(String name) {
+    return META_COLUMNS.get(name);
+  }
+
+  public static boolean isMetadataColumn(String name) {
+    return META_COLUMNS.containsKey(name);
+  }
+
+  public static boolean nonMetadataColumn(String name) {

Review comment:
       nit: should it be `isNonMetadataColumn` to indicate it is a boolean flag?






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r546633194



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ExtendedScanRelation.scala
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+
+case class ExtendedScanRelation(relation: DataSourceV2ScanRelation) extends LogicalPlan {

Review comment:
       I get the intention here now. I took a quick look and saw that Spark injects the extended planner strategies before the data source V2 strategy. That got me thinking that we could get rid of this extra logical plan, use the scan node directly, and apply the new logic only if the scan implements `SupportsFileFilter`.
   
   ```
   case PhysicalOperation(project, filters, DataSourceV2ScanRelation(_, scan: SupportsFileFilter, output)) =>
     // projection and filters were already pushed down in the optimizer.
     // this uses PhysicalOperation to get the projection and ensure that if the batch scan does
     // not support columnar, a projection is added to convert the rows to UnsafeRow.
     val batchExec = ExtendedBatchScanExec(output, scan)
     withProjectAndFilter(project, filters, batchExec, !batchExec.supportsColumnar) :: Nil
   
     ....
   
   private def withProjectAndFilter(
       project: Seq[NamedExpression],
       filters: Seq[Expression],
       scan: LeafExecNode,
       needsUnsafeConversion: Boolean): SparkPlan = {
     val filterCondition = filters.reduceLeftOption(And)
     val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
   
     if (withFilter.output != project || needsUnsafeConversion) {
       ProjectExec(project, withFilter)
     } else {
       withFilter
     }
   }
   ```
   
   This also takes care of adding a projection if needed.






[GitHub] [iceberg] aokolnychyi commented on pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#issuecomment-748067794


   I can confirm that the tests for the non-vectorized Parquet path are working fine. It seems we don't populate the position in the vectorized path.




[GitHub] [iceberg] rdblue commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r546891704



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ExtendedScanRelation.scala
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+
+case class ExtendedScanRelation(relation: DataSourceV2ScanRelation) extends LogicalPlan {

Review comment:
       +1 to using this rule that gets rid of the extra node.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r546611648



##########
File path: core/src/main/java/org/apache/iceberg/MetadataColumns.java
##########
@@ -55,4 +55,16 @@ private MetadataColumns() {
   public static Set<Integer> metadataFieldIds() {
     return META_IDS;
   }
+
+  public static NestedField get(String name) {
+    return META_COLUMNS.get(name);
+  }
+
+  public static boolean isMetadataColumn(String name) {
+    return META_COLUMNS.containsKey(name);
+  }
+
+  public static boolean nonMetadataColumn(String name) {

Review comment:
       Let's leave it. I don't mind as long as it wasn't an oversight and there is a convention being followed.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r545709279



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
##########
@@ -131,28 +138,52 @@ public SparkScanBuilder caseSensitive(boolean isCaseSensitive) {
 
   @Override
   public void pruneColumns(StructType requestedSchema) {
-    this.requestedProjection = requestedSchema;
+    this.requestedProjection = new StructType(Stream.of(requestedSchema.fields())
+        .filter(field -> MetadataColumns.nonMetadataColumn(field.name()))
+        .toArray(StructField[]::new));
+
+    Stream.of(requestedSchema.fields())

Review comment:
       Are there any cases when `pruneColumns` is going to be called multiple times? Should we worry about it at all?
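   
   A minimal sketch of one way to keep repeated calls harmless, assuming a hypothetical `PruneColumnsSketch` class that tracks plain column names instead of Spark's `StructType`, and assuming `_file` and `_pos` are the only metadata columns. Both lists are rebuilt on every call, so a second call replaces the earlier state instead of appending to it:
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;

   /** Hypothetical stand-in for a scan builder; not the Iceberg class. */
   class PruneColumnsSketch {
     // Assumed metadata column names, taken from the PR description.
     private static final Set<String> META_COLUMNS =
         new HashSet<>(Arrays.asList("_file", "_pos"));

     private List<String> requestedProjection = new ArrayList<>();
     private List<String> metaColumns = new ArrayList<>();

     void pruneColumns(List<String> requestedFields) {
       // Build fresh lists on every call so state from an earlier call cannot leak.
       List<String> data = new ArrayList<>();
       List<String> meta = new ArrayList<>();
       for (String name : requestedFields) {
         if (META_COLUMNS.contains(name)) {
           meta.add(name);
         } else {
           data.add(name);
         }
       }
       this.requestedProjection = data;
       this.metaColumns = meta;
     }

     List<String> requestedProjection() {
       return requestedProjection;
     }

     List<String> metaColumns() {
       return metaColumns;
     }
   }
   ```
   
   Whether Spark actually calls `pruneColumns` more than once is not settled here; the sketch only shows that the split can be made idempotent.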






[GitHub] [iceberg] aokolnychyi edited a comment on pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
aokolnychyi edited a comment on pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#issuecomment-748067794


   I confirm tests for the non-vectorized Parquet path are working fine. Seems like we don't populate the position in the vectorized path.




[GitHub] [iceberg] aokolnychyi commented on pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#issuecomment-749214871


   Thanks @rdblue and everyone who reviewed. 




[GitHub] [iceberg] rdblue commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r545986815



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ExtendedScanRelation.scala
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+
+case class ExtendedScanRelation(relation: DataSourceV2ScanRelation) extends LogicalPlan {

Review comment:
       I think the more general problem is that Spark couldn't rewrite the `DynamicFileFilter` node at all, which led to some situations where the plan worked fine and others where it would fail. I think we should generally fit the pattern of being able to rewrite nodes.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r546635446



##########
File path: spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
##########
@@ -48,32 +48,32 @@ public SparkRowLevelOperationsTestBase(String catalogName, String implementation
   @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, format = {3}, vectorized = {4}")
   public static Object[][] parameters() {
     return new Object[][] {
-        { "testhive", SparkCatalog.class.getName(),
-            ImmutableMap.of(
-                "type", "hive",
-                "default-namespace", "default"
-            ),
-            "orc",
-            true
-        },
+//        { "testhive", SparkCatalog.class.getName(),
+//            ImmutableMap.of(
+//                "type", "hive",
+//                "default-namespace", "default"

Review comment:
       Used for local testing?






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r546634262



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ExtendedScanRelation.scala
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+
+case class ExtendedScanRelation(relation: DataSourceV2ScanRelation) extends LogicalPlan {

Review comment:
       I think that's close to how a native Spark solution would work.







[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r546633917



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
##########
@@ -77,33 +101,30 @@ object RewriteDelete extends Rule[LogicalPlan] with PredicateHelper with Logging
     PushDownUtils.pushFilters(scanBuilder, normalizedPredicates)
 
     val scan = scanBuilder.build()
-    val scanRelation = DataSourceV2ScanRelation(table, scan, output)
+    val scanRelation = DataSourceV2ScanRelation(table, scan, toOutputAttrs(scan.readSchema(), output))
 
     val scanPlan = scan match {

Review comment:
       I think this block can be simplified a bit.
   
   ```
       scan match {
         case filterable: SupportsFileFilter =>
           val matchingFilePlan = buildFileFilterPlan(cond, scanRelation)
           DynamicFileFilter(scanRelation, matchingFilePlan, filterable)
         case _ =>
           scanRelation
       }
   ```






[GitHub] [iceberg] aokolnychyi commented on pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#issuecomment-747960726


   It looks like vectorized Parquet is failing, @rdblue.




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r545810195



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ExtendedScanRelation.scala
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+
+case class ExtendedScanRelation(relation: DataSourceV2ScanRelation) extends LogicalPlan {

Review comment:
       I am a bit worried about adding this class, as I am not sure we want to maintain it in Spark later. There is another way to solve the rewrite problem: we can simply disable column pruning for `DynamicFileFilter` nodes. I think it should be sufficient to extend the node's references to also cover all output attributes of the scan.
   
   ```
   AttributeSet(scanRelation.output ++ fileFilterPlan.output)
   ```
   
   I've submitted a [PR](https://github.com/rdblue/iceberg/pull/2) with this idea to your branch, @rdblue. Feel free to discard/modify as needed.
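   
   To illustrate why widening the references would stop the optimizer from dropping `_pos`, here is a toy model in plain Java. It is not Spark's column pruning rule; `ReferencesSketch` and `keptColumns` are hypothetical names, and columns are plain strings. The toy pruner keeps a child column only if something above needs it or the node itself references it:
   
   ```java
   import java.util.LinkedHashSet;
   import java.util.List;
   import java.util.Set;

   /** Toy model of reference-driven column pruning; not Spark code. */
   class ReferencesSketch {
     /**
      * Returns the child columns a toy pruner would keep: those required by
      * operators above the node, plus those the node declares as references.
      */
     static Set<String> keptColumns(
         List<String> childOutput,
         Set<String> requiredAbove,
         Set<String> nodeReferences) {
       Set<String> kept = new LinkedHashSet<>();
       for (String col : childOutput) {
         if (requiredAbove.contains(col) || nodeReferences.contains(col)) {
           kept.add(col);
         }
       }
       return kept;
     }
   }
   ```
   
   With empty references and only `id` and `_file` required above, the toy pruner drops `_pos`. Passing the full scan output as references keeps every column, which is the effect the `AttributeSet` expression above aims for.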






[GitHub] [iceberg] rdblue merged pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955


   




[GitHub] [iceberg] rdblue commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r546853370



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDelete.scala
##########
@@ -77,33 +101,30 @@ object RewriteDelete extends Rule[LogicalPlan] with PredicateHelper with Logging
     PushDownUtils.pushFilters(scanBuilder, normalizedPredicates)
 
     val scan = scanBuilder.build()
-    val scanRelation = DataSourceV2ScanRelation(table, scan, output)
+    val scanRelation = DataSourceV2ScanRelation(table, scan, toOutputAttrs(scan.readSchema(), output))
 
     val scanPlan = scan match {

Review comment:
       Good catch. Updated.






[GitHub] [iceberg] aokolnychyi edited a comment on pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
aokolnychyi edited a comment on pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#issuecomment-748067794


   I confirm tests for the non-vectorized Parquet path are working fine. Seems like we don't populate the position in the vectorized path only.




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r546633194



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ExtendedScanRelation.scala
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+
+case class ExtendedScanRelation(relation: DataSourceV2ScanRelation) extends LogicalPlan {

Review comment:
       I get the intention here now. I took a quick look and saw that Spark injects the extended planner strategies before the data source V2 strategy. That got me thinking that we could get rid of this extra logical plan, use the scan node directly, and apply the new logic only when the scan supports file filtering.
   
   ```
   case PhysicalOperation(project, filters, DataSourceV2ScanRelation(_, scan: SupportsFileFilter, output)) =>
     // projection and filters were already pushed down in the optimizer.
     // this uses PhysicalOperation to get the projection and ensure that if the batch scan does
     // not support columnar, a projection is added to convert the rows to UnsafeRow.
     val batchExec = ExtendedBatchScanExec(output, scan)
     withProjectAndFilter(project, filters, batchExec, !batchExec.supportsColumnar) :: Nil
   
     ....
   
   private def withProjectAndFilter(
       project: Seq[NamedExpression],
       filters: Seq[Expression],
       scan: LeafExecNode,
       needsUnsafeConversion: Boolean): SparkPlan = {
     val filterCondition = filters.reduceLeftOption(And)
     val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
   
     if (withFilter.output != project || needsUnsafeConversion) {
       ProjectExec(project, withFilter)
     } else {
       withFilter
     }
   }
   ```
   
   This also takes care of adding a projection if needed.
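   
   For readers less familiar with `reduceLeftOption(And)`, the same fold can be sketched in plain Java. This is only an analogy under stated assumptions: `FilterFoldSketch` is a hypothetical class, `java.util.function.Predicate` stands in for Catalyst expressions, and a list of rows stands in for the scan node. An empty filter list yields no condition, so the input is returned unchanged, mirroring the `getOrElse(scan)` branch:
   
   ```java
   import java.util.List;
   import java.util.Optional;
   import java.util.function.Predicate;
   import java.util.stream.Collectors;

   /** Plain-Java analogy for folding filters with AND; not Spark code. */
   class FilterFoldSketch {
     /** Folds predicates with AND; empty when there are no filters. */
     static <T> Optional<Predicate<T>> combine(List<Predicate<T>> filters) {
       return filters.stream().reduce(Predicate::and);
     }

     /** Applies the combined filter if one exists; otherwise returns rows as-is. */
     static <T> List<T> withFilter(List<T> rows, List<Predicate<T>> filters) {
       return combine(filters)
           .map(cond -> rows.stream().filter(cond).collect(Collectors.toList()))
           .orElse(rows);
     }
   }
   ```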






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r546634831



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ExtendedScanRelation.scala
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
+
+case class ExtendedScanRelation(relation: DataSourceV2ScanRelation) extends LogicalPlan {

Review comment:
       There are no issues if we use the same scan node in multiple places, given the new logic and allowing rewrites, right?






[GitHub] [iceberg] aokolnychyi commented on pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#issuecomment-748907607


   I cloned this PR locally and tests seem to work. Thanks, @chenjunjiedada and @rdblue.
   
   I left a few questions but the rest looks great!




[GitHub] [iceberg] rdblue commented on a change in pull request #1955: Spark: Sort retained rows in DELETE FROM by file and position

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1955:
URL: https://github.com/apache/iceberg/pull/1955#discussion_r546157664



##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
##########
@@ -381,9 +382,13 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) {
         for (int i = 0; i < numValsToRead; i += 1) {
           vec.getDataBuffer().setLong(i * Long.BYTES, rowStart + i);
         }
+        for (int i = 0; i < numValsToRead; i += 1) {
+          BitVectorHelper.setValidityBitToOne(vec.getValidityBuffer(), i);
+        }
         nulls = new NullabilityHolder(numValsToRead);
       }
 
+      rowStart += numValsToRead;

Review comment:
       Looks like this was an oversight in the original PR. FYI @chenjunjiedada.
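   
   A minimal sketch of why the `rowStart += numValsToRead` line matters, assuming a hypothetical `PositionBatchSketch` class that fills a plain `long[]` instead of an Arrow vector. Without the increment, every batch would restart at the same offset and positions would repeat across batches:
   
   ```java
   /** Toy stand-in for a batch reader that synthesizes row positions. */
   class PositionBatchSketch {
     // Position of the first row in the next batch.
     private long rowStart = 0L;

     /** Returns positions for the next batch and advances the running offset. */
     long[] nextBatch(int numValsToRead) {
       long[] positions = new long[numValsToRead];
       for (int i = 0; i < numValsToRead; i += 1) {
         positions[i] = rowStart + i;
       }
       // Advance the offset so the next batch continues where this one ended.
       rowStart += numValsToRead;
       return positions;
     }
   }
   ```
   
   Reading a batch of 3 followed by a batch of 2 yields positions 0 to 2 and then 3 to 4. The validity-bit loop in the diff has no counterpart here because a primitive array has no null mask.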



