Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/07/20 00:40:00 UTC

[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5248: Spark: Adopt the new Scan Task APIs in Spark Readers

aokolnychyi commented on code in PR #5248:
URL: https://github.com/apache/iceberg/pull/5248#discussion_r925062229


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java:
##########
@@ -60,35 +64,54 @@
  *
  * @param <T> is the Java class returned by this reader whose objects contain one or more rows.
  */
-abstract class BaseDataReader<T> implements Closeable {
+abstract class BaseDataReader<T, CST extends ContentScanTask<?>, G extends ScanTaskGroup<CST>>

Review Comment:
   To be honest, I am not convinced we have to restrict this to `ContentScanTask`. Also, keep in mind that different changelog tasks can be packed into the same task group. That's why the reader should not be restricted to a single task type.
   
   I'd consider using `ScanTask` and changing the hierarchy a bit. This class can become `BaseReader`. I also don't think we need the second type parameter; we can just work with `ScanTaskGroup<ST>`.
   
   ```
   abstract class BaseReader<T, ST extends ScanTask> implements Closeable {
   
     ...
   
     BaseReader(Table table, ScanTaskGroup<ST> taskGroup) {
       this.table = table;
       this.tasks = taskGroup.tasks().iterator();
       this.inputFiles = inputFiles(taskGroup);
       this.currentIterator = CloseableIterator.empty();
     }
   
     ...
   }
   ```
   
   Then I'd consider adding `BaseRowReader` like this (also, no "Data" in the name).
   
   ```
   abstract class BaseRowReader<ST extends ScanTask> extends BaseReader<InternalRow, ST> {
   
     private final Schema tableSchema;
     private final Schema expectedSchema;
     private final String nameMapping;
     private final boolean caseSensitive;
   
     BaseRowReader(Table table, ScanTaskGroup<ST> taskGroup, Schema expectedSchema, boolean caseSensitive) {
       super(table, taskGroup);
       this.tableSchema = table.schema();
       this.expectedSchema = expectedSchema;
       this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
       this.caseSensitive = caseSensitive;
     }
   
     protected Schema tableSchema() {
       return tableSchema;
     }
   
     protected Schema expectedSchema() {
       return expectedSchema;
     }
   
     protected CloseableIterable<InternalRow> newIterable(InputFile file, FileFormat format, long start, long length,
                                                          Expression residual, Schema projection,
                                                          Map<Integer, ?> idToConstant) {
       switch (format) {
         case PARQUET:
           return newParquetIterable(file, start, length, residual, projection, idToConstant);
   
         case AVRO:
           return newAvroIterable(file, start, length, projection, idToConstant);
   
         case ORC:
           return newOrcIterable(file, start, length, residual, projection, idToConstant);
   
         default:
           throw new UnsupportedOperationException("Cannot read unknown format: " + format);
       }
     }
   
     ...
   }
   ```
   
   Then the existing `RowDataReader` shouldn't need to change much.
   
   ```
   class RowDataReader extends BaseRowReader<FileScanTask> {
     ...
   }
   ```
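   For illustration only, a rough sketch of the adapted reader (it assumes `BaseReader` keeps an abstract `open(ST task)` hook like the current `open(FileScanTask)`, and reuses the `constantsMap`/`getInputFile` helpers; delete filter wiring is omitted):
   
   ```
   class RowDataReader extends BaseRowReader<FileScanTask> {
   
     RowDataReader(Table table, ScanTaskGroup<FileScanTask> taskGroup, Schema expectedSchema, boolean caseSensitive) {
       super(table, taskGroup, expectedSchema, caseSensitive);
     }
   
     @Override
     protected CloseableIterator<InternalRow> open(FileScanTask task) {
       // constants for identity partition columns, then open the data file via the shared helper
       Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());
       InputFile inputFile = getInputFile(task.file().path().toString());
       return newIterable(inputFile, task.file().format(), task.start(), task.length(),
           task.residual(), expectedSchema(), idToConstant).iterator();
     }
   }
   ```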
   
   Finally, we will have `ChangelogRowReader` capable of reading all types of changelog tasks like this.
   
   ```
   class ChangelogRowReader extends BaseRowReader<ChangelogScanTask> {
     ...
   }
   ```
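   To make the "one reader, multiple task types" point concrete, a very rough sketch (the `openAddedRows`/`openDeletedRows`/`openDeletedDataFile` helpers are placeholders, assuming the same abstract `open(ST task)` hook as above):
   
   ```
   class ChangelogRowReader extends BaseRowReader<ChangelogScanTask> {
   
     @Override
     protected CloseableIterator<InternalRow> open(ChangelogScanTask task) {
       // a single task group can mix changelog task types, so dispatch per task
       if (task instanceof AddedRowsScanTask) {
         return openAddedRows((AddedRowsScanTask) task);
       } else if (task instanceof DeletedRowsScanTask) {
         return openDeletedRows((DeletedRowsScanTask) task);
       } else if (task instanceof DeletedDataFileScanTask) {
         return openDeletedDataFile((DeletedDataFileScanTask) task);
       } else {
         throw new UnsupportedOperationException("Unsupported changelog task: " + task.getClass().getName());
       }
     }
   
     // the open* helpers would build the per-file iterators via newIterable(...)
     // and attach the change type and change ordinal metadata
   }
   ```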



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.AddedRowsScanTask;
+import org.apache.iceberg.BaseScanTaskGroup;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeletedDataFileScanTask;
+import org.apache.iceberg.DeletedRowsScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+public class ChangelogRowReader {
+
+  class ChangelogAddedRowReader extends RowDataReader<AddedRowsScanTask, BaseScanTaskGroup<AddedRowsScanTask>> {

Review Comment:
   We can't have a reader per changelog scan task as a single task group can contain multiple task types.



##########
core/src/main/java/org/apache/iceberg/util/PartitionUtil.java:
##########
@@ -39,12 +40,34 @@ private PartitionUtil() {
     return constantsMap(task, null, (type, constant) -> constant);
   }
 
+  public static Map<Integer, ?> constantsMap(ContentScanTask task, BiFunction<Type, Object, Object> convertConstant) {

Review Comment:
   nit: raw use of parameterized class, `ContentScanTask` -> `ContentScanTask<?>`
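   i.e. something like:
   
   ```
   public static Map<Integer, ?> constantsMap(ContentScanTask<?> task, BiFunction<Type, Object, Object> convertConstant)
   ```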



##########
core/src/main/java/org/apache/iceberg/util/PartitionUtil.java:
##########
@@ -39,12 +40,34 @@ private PartitionUtil() {
     return constantsMap(task, null, (type, constant) -> constant);
   }
 
+  public static Map<Integer, ?> constantsMap(ContentScanTask task, BiFunction<Type, Object, Object> convertConstant) {
+    return constantsMapInternal(task, null, convertConstant);
+  }
+
+  /**
+   * @deprecated Replaced by {@link PartitionUtil#constantsMap(ContentScanTask, BiFunction)}
+   */
+  @Deprecated
   public static Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type, Object, Object> convertConstant) {
-    return constantsMap(task, null, convertConstant);
+    return constantsMapInternal(task, null, convertConstant);
   }
 
+  /**
+   * @deprecated Replaced by {@link PartitionUtil#constantsMap(ContentScanTask, Types.StructType, BiFunction)}
+   */
+  @Deprecated
   public static Map<Integer, ?> constantsMap(FileScanTask task, Types.StructType partitionType,
                                              BiFunction<Type, Object, Object> convertConstant) {
+    return constantsMapInternal(task, partitionType, convertConstant);
+  }
+
+  public static Map<Integer, ?> constantsMap(ContentScanTask task, Types.StructType partitionType,
+                                             BiFunction<Type, Object, Object> convertConstant) {
+    return constantsMapInternal(task, partitionType, convertConstant);
+  }
+
+  private static Map<Integer, ?> constantsMapInternal(ContentScanTask task, Types.StructType partitionType,

Review Comment:
   Do we really have to add new methods and deprecate the old ones? Can't we just accept `ContentScanTask`? That should not break existing usages since `FileScanTask` extends `ContentScanTask`.
   
   I think it should be sufficient to just change the arg type, since we are replacing it with a parent type.
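   Roughly, just changing the existing signatures in place instead of adding new overloads (sketch):
   
   ```
   public static Map<Integer, ?> constantsMap(ContentScanTask<?> task, BiFunction<Type, Object, Object> convertConstant) {
     return constantsMap(task, null, convertConstant);
   }
   
   public static Map<Integer, ?> constantsMap(ContentScanTask<?> task, Types.StructType partitionType,
                                              BiFunction<Type, Object, Object> convertConstant) {
     // same body as the current FileScanTask variant
     ...
   }
   ```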



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java:
##########
@@ -146,7 +169,7 @@ protected InputFile getInputFile(String location) {
     return inputFiles.get(location);
   }
 
-  protected Map<Integer, ?> constantsMap(FileScanTask task, Schema readSchema) {
+  protected Map<Integer, ?> constantsMap(CST task, Schema readSchema) {

Review Comment:
   If we change the boundary to `ScanTask`, then this method can accept `ContentScanTask<?>`.
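   i.e.
   
   ```
   protected Map<Integer, ?> constantsMap(ContentScanTask<?> task, Schema readSchema) {
     ...
   }
   ```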



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java:
##########
@@ -60,35 +64,54 @@
  *
  * @param <T> is the Java class returned by this reader whose objects contain one or more rows.
  */
-abstract class BaseDataReader<T> implements Closeable {
+abstract class BaseDataReader<T, CST extends ContentScanTask<?>, G extends ScanTaskGroup<CST>>
+    implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(BaseDataReader.class);
 
   private final Table table;
-  private final Iterator<FileScanTask> tasks;
+  private final Iterator<CST> tasks;
   private final Map<String, InputFile> inputFiles;
 
   private CloseableIterator<T> currentIterator;
   private T current = null;
-  private FileScanTask currentTask = null;
+  private CST currentTask = null;
 
-  BaseDataReader(Table table, CombinedScanTask task) {
+  BaseDataReader(Table table, G task) {
     this.table = table;
-    this.tasks = task.files().iterator();
+    this.tasks = task.tasks().iterator();
+    this.inputFiles = inputFiles(task);
+    this.currentIterator = CloseableIterator.empty();
+  }
+
+  private Map<String, InputFile> inputFiles(G task) {
     Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
-    task.files().stream()
-        .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
+    Stream<ContentFile> dataFileStream = task.tasks().stream()
+        .flatMap(contentScanTask -> {
+          Stream<ContentFile> stream = Stream.of(contentScanTask.file());
+          if (contentScanTask.isFileScanTask()) {
+            stream = Stream.concat(stream, contentScanTask.asFileScanTask().deletes().stream());
+          } else if (contentScanTask instanceof AddedRowsScanTask) {
+            stream = Stream.concat(stream, ((AddedRowsScanTask) contentScanTask).deletes().stream());
+          } else if (contentScanTask instanceof DeletedDataFileScanTask) {
+            stream = Stream.concat(stream, ((DeletedDataFileScanTask) contentScanTask).existingDeletes().stream());
+          } else if (contentScanTask instanceof DeletedRowsScanTask) {
+            stream = Stream.concat(stream, ((DeletedRowsScanTask) contentScanTask).addedDeletes().stream());
+            stream = Stream.concat(stream, ((DeletedRowsScanTask) contentScanTask).existingDeletes().stream());
+          }

Review Comment:
   Yeah, this is not good. I'd be up for exposing something like `referencedDataFiles` or `dataFiles` together with `referencedDeleteFiles` or `deleteFiles` to avoid the `?` in the public API.
   
   Maybe we can even add those methods to `ScanTask`?
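   For example, if `ScanTask` exposed accessors along those lines (the names and return types below are just placeholders, not an existing API), the stream logic above would collapse to something like:
   
   ```
   // assuming the placeholder accessors return Collection<ContentFile<?>>
   Stream<ContentFile<?>> referencedFiles = taskGroup.tasks().stream()
       .flatMap(task -> Stream.concat(
           task.referencedDataFiles().stream(),
           task.referencedDeleteFiles().stream()));
   ```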



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

