Posted to commits@iceberg.apache.org by yu...@apache.org on 2022/10/27 17:15:50 UTC

[iceberg] branch master updated: Spark 3.1: Ensure rowStartPosInBatch in ColumnarBatchReader is set correctly (#6046)

This is an automated email from the ASF dual-hosted git repository.

yufei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 342d10bb1a Spark 3.1: Ensure rowStartPosInBatch in ColumnarBatchReader is set correctly (#6046)
342d10bb1a is described below

commit 342d10bb1a58cf491e660d54fbde4f75b246c922
Author: Wing Yew Poon <wy...@apache.org>
AuthorDate: Thu Oct 27 10:15:42 2022 -0700

    Spark 3.1: Ensure rowStartPosInBatch in ColumnarBatchReader is set correctly (#6046)
---
 .../iceberg/spark/source/BatchDataReader.java      |  7 +-
 .../spark/source/TestSparkReaderDeletes.java       | 93 +++++++++++++++++++++-
 2 files changed, 97 insertions(+), 3 deletions(-)

diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 35d0a9cbac..68e98ba913 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -77,14 +77,17 @@ class BatchDataReader extends BaseDataReader<ColumnarBatch> {
     if (task.file().format() == FileFormat.PARQUET) {
       SparkDeleteFilter deleteFilter = deleteFilter(task);
 
+      // get required schema if there are deletes
+      Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema;
+
       Parquet.ReadBuilder builder =
           Parquet.read(location)
-              .project(expectedSchema)
+              .project(requiredSchema)
               .split(task.start(), task.length())
               .createBatchedReaderFunc(
                   fileSchema ->
                       VectorizedSparkParquetReaders.buildReader(
-                          expectedSchema,
+                          requiredSchema,
                           fileSchema, /* setArrowValidityVector */
                           NullCheckingForGet.NULL_CHECKING_ENABLED,
                           idToConstant,
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 575555d745..bbc4171b42 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -20,12 +20,17 @@ package org.apache.iceberg.spark.source;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.PartitionSpec;
@@ -45,19 +50,28 @@ import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.hive.TestHiveMetastore;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkStructLike;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.data.SparkParquetWriters;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.CharSequenceSet;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeSet;
 import org.apache.iceberg.util.TableScanUtil;
+import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.StructType;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -117,16 +131,26 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
     spark = null;
   }
 
+  @After
+  @Override
+  public void cleanup() throws IOException {
+    super.cleanup();
+    dropTable("test3");
+  }
+
   @Override
   protected Table createTable(String name, Schema schema, PartitionSpec spec) {
     Table table = catalog.createTable(TableIdentifier.of("default", name), schema);
     TableOperations ops = ((BaseTable) table).operations();
     TableMetadata meta = ops.current();
     ops.commit(meta, meta.upgradeToFormatVersion(2));
+    table
+        .updateProperties()
+        .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, String.valueOf(vectorized))
+        .commit();
     if (vectorized) {
       table
           .updateProperties()
-          .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true")
           .set(
               TableProperties.PARQUET_BATCH_SIZE,
               "4") // split 7 records to two batches to cover more code paths
@@ -293,4 +317,71 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
 
     Assert.assertEquals("Table should contain expected rows", expected, actual);
   }
+
+  @Test
+  public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOException {
+    String tblName = "test3";
+    Table tbl = createTable(tblName, SCHEMA, PartitionSpec.unpartitioned());
+
+    List<Path> fileSplits = Lists.newArrayList();
+    StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
+    Configuration conf = new Configuration();
+    File testFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", testFile.delete());
+    Path testFilePath = new Path(testFile.getAbsolutePath());
+
+    // Write a Parquet file with more than one row group
+    ParquetFileWriter parquetFileWriter =
+        new ParquetFileWriter(conf, ParquetSchemaUtil.convert(SCHEMA, "test3Schema"), testFilePath);
+    parquetFileWriter.start();
+    for (int i = 0; i < 2; i += 1) {
+      File split = temp.newFile();
+      Assert.assertTrue("Delete should succeed", split.delete());
+      Path splitPath = new Path(split.getAbsolutePath());
+      fileSplits.add(splitPath);
+      try (FileAppender<InternalRow> writer =
+          Parquet.write(Files.localOutput(split))
+              .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(sparkSchema, msgType))
+              .schema(SCHEMA)
+              .overwrite()
+              .build()) {
+        Iterable<InternalRow> records = RandomData.generateSpark(SCHEMA, 100, 34 * i + 37);
+        writer.addAll(records);
+      }
+      parquetFileWriter.appendFile(
+          org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(splitPath, conf));
+    }
+    parquetFileWriter.end(
+        ParquetFileWriter.mergeMetadataFiles(fileSplits, conf)
+            .getFileMetaData()
+            .getKeyValueMetaData());
+
+    // Add the file to the table
+    DataFile dataFile =
+        DataFiles.builder(PartitionSpec.unpartitioned())
+            .withInputFile(org.apache.iceberg.hadoop.HadoopInputFile.fromPath(testFilePath, conf))
+            .withFormat("parquet")
+            .withRecordCount(200)
+            .build();
+    tbl.newAppend().appendFile(dataFile).commit();
+
+    // Add positional deletes to the table
+    List<Pair<CharSequence, Long>> deletes =
+        Lists.newArrayList(
+            Pair.of(dataFile.path(), 97L),
+            Pair.of(dataFile.path(), 98L),
+            Pair.of(dataFile.path(), 99L),
+            Pair.of(dataFile.path(), 101L),
+            Pair.of(dataFile.path(), 103L),
+            Pair.of(dataFile.path(), 107L),
+            Pair.of(dataFile.path(), 109L));
+    Pair<DeleteFile, CharSequenceSet> posDeletes =
+        FileHelpers.writeDeleteFile(table, Files.localOutput(temp.newFile()), deletes);
+    tbl.newRowDelta()
+        .addDeletes(posDeletes.first())
+        .validateDataFilesExist(posDeletes.second())
+        .commit();
+
+    Assert.assertEquals(193, rowSet(tblName, tbl, "*").size());
+  }
 }
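
Editor's note, appended for context and not part of the commit above: the BatchDataReader change reduces to a single projection rule -- when a delete filter exists, its required schema is projected into the vectorized Parquet reader instead of the plain expected schema. A compilable sketch of that rule is below; DeleteFilterLike is a hypothetical stand-in for SparkDeleteFilter, and only its requiredSchema() method is assumed.

    import org.apache.iceberg.Schema;

    /**
     * Sketch only (not part of the commit). Mirrors the projection rule
     * introduced in the BatchDataReader diff above.
     * DeleteFilterLike is a hypothetical stand-in for SparkDeleteFilter;
     * only requiredSchema() is used here.
     */
    class ProjectionChoiceSketch {

      interface DeleteFilterLike {
        Schema requiredSchema();
      }

      static Schema projectedSchema(DeleteFilterLike deleteFilter, Schema expectedSchema) {
        // Matches the diff: project the filter's required schema when deletes are
        // present, and fall back to the expected schema otherwise.
        return deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema;
      }
    }

For reference, the new test appends two 100-row splits into one Parquet file (two row groups, 200 rows total) and position-deletes 7 rows (97, 98, 99, 101, 103, 107, 109), so the scan is asserted to return 200 - 7 = 193 rows. Because the deleted positions straddle the row-group boundary at 100, the test exercises rowStartPosInBatch tracking across row groups, which is the subject of the fix.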