Posted to commits@iceberg.apache.org by yu...@apache.org on 2022/10/27 17:15:50 UTC
[iceberg] branch master updated: Spark 3.1: Ensure rowStartPosInBatch in ColumnarBatchReader is set correctly (#6046)
This is an automated email from the ASF dual-hosted git repository.
yufei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 342d10bb1a Spark 3.1: Ensure rowStartPosInBatch in ColumnarBatchReader is set correctly (#6046)
342d10bb1a is described below
commit 342d10bb1a58cf491e660d54fbde4f75b246c922
Author: Wing Yew Poon <wy...@apache.org>
AuthorDate: Thu Oct 27 10:15:42 2022 -0700
Spark 3.1: Ensure rowStartPosInBatch in ColumnarBatchReader is set correctly (#6046)
---
.../iceberg/spark/source/BatchDataReader.java | 7 +-
.../spark/source/TestSparkReaderDeletes.java | 93 +++++++++++++++++++++-
2 files changed, 97 insertions(+), 3 deletions(-)
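
For context on the fix: Iceberg applies positional deletes by matching (file, position) pairs against each row's absolute position in the data file. In the vectorized Parquet path, ColumnarBatchReader therefore has to know the absolute position of the first row of every batch (rowStartPosInBatch). Batches never span Parquet row groups, so that offset must include all rows of the preceding row groups; if it does not, deletes that fall beyond the first row group are applied to the wrong rows. Below is a self-contained sketch of the arithmetic, using the shape of the new test (two 100-row row groups, batch size 4); the loop structure is illustrative, not Iceberg's actual reader code:

    public class RowStartPosSketch {
      public static void main(String[] args) {
        int[] rowGroupSizes = {100, 100}; // two row groups, as in the new test below
        int batchSize = 4;                // the PARQUET_BATCH_SIZE the test fixture sets
        long rowGroupStartPos = 0;        // rows contributed by earlier row groups
        for (int rowsInGroup : rowGroupSizes) {
          for (int offset = 0; offset < rowsInGroup; offset += batchSize) {
            long rowStartPosInBatch = rowGroupStartPos + offset;
            int rowsInBatch = Math.min(batchSize, rowsInGroup - offset);
            // a delete at absolute position p hits this batch iff
            // rowStartPosInBatch <= p && p < rowStartPosInBatch + rowsInBatch
            System.out.printf(
                "batch starts at absolute row %d (%d rows)%n", rowStartPosInBatch, rowsInBatch);
          }
          // without this line, every batch in the second row group would be off
          // by 100 rows, which is the class of bug the commit guards against
          rowGroupStartPos += rowsInGroup;
        }
      }
    }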
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 35d0a9cbac..68e98ba913 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -77,14 +77,17 @@ class BatchDataReader extends BaseDataReader<ColumnarBatch> {
     if (task.file().format() == FileFormat.PARQUET) {
       SparkDeleteFilter deleteFilter = deleteFilter(task);
 
+      // get required schema if there are deletes
+      Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema;
+
       Parquet.ReadBuilder builder =
           Parquet.read(location)
-              .project(expectedSchema)
+              .project(requiredSchema)
               .split(task.start(), task.length())
               .createBatchedReaderFunc(
                   fileSchema ->
                       VectorizedSparkParquetReaders.buildReader(
-                          expectedSchema,
+                          requiredSchema,
                           fileSchema, /* setArrowValidityVector */
                           NullCheckingForGet.NULL_CHECKING_ENABLED,
                           idToConstant,
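
The substance of the change above: when the scan task carries deletes, the reader projects the delete filter's required schema instead of the bare expected schema, so that columns the filter needs (for positional deletes, the _pos row-position metadata column) are actually materialized in each batch. A rough illustration of how a required schema can extend an expected schema follows; TypeUtil.join and MetadataColumns.ROW_POSITION are real Iceberg classes, but the construction below is a simplified stand-in for what DeleteFilter.requiredSchema() computes, not its actual code:

    import org.apache.iceberg.MetadataColumns;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.TypeUtil;
    import org.apache.iceberg.types.Types;

    public class RequiredSchemaSketch {
      public static void main(String[] args) {
        // the columns the query asked for
        Schema expectedSchema =
            new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
        // positional deletes also need the row position to match (file, pos) pairs
        Schema requiredSchema =
            TypeUtil.join(expectedSchema, new Schema(MetadataColumns.ROW_POSITION));
        System.out.println(requiredSchema); // id plus the _pos metadata column
      }
    }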
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 575555d745..bbc4171b42 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -20,12 +20,17 @@ package org.apache.iceberg.spark.source;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.PartitionSpec;
@@ -45,19 +50,28 @@ import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.hive.TestHiveMetastore;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkStructLike;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.data.SparkParquetWriters;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.CharSequenceSet;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeSet;
 import org.apache.iceberg.util.TableScanUtil;
+import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.StructType;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -117,16 +131,26 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
     spark = null;
   }
 
+  @After
+  @Override
+  public void cleanup() throws IOException {
+    super.cleanup();
+    dropTable("test3");
+  }
+
   @Override
   protected Table createTable(String name, Schema schema, PartitionSpec spec) {
     Table table = catalog.createTable(TableIdentifier.of("default", name), schema);
     TableOperations ops = ((BaseTable) table).operations();
     TableMetadata meta = ops.current();
     ops.commit(meta, meta.upgradeToFormatVersion(2));
+    table
+        .updateProperties()
+        .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, String.valueOf(vectorized))
+        .commit();
     if (vectorized) {
       table
           .updateProperties()
-          .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true")
           .set(
               TableProperties.PARQUET_BATCH_SIZE,
               "4") // split 7 records to two batches to cover more code paths
@@ -293,4 +317,71 @@ public class TestSparkReaderDeletes extends DeleteReadTests {
Assert.assertEquals("Table should contain expected rows", expected, actual);
}
+
+ @Test
+ public void testPosDeletesOnParquetFileWithMultipleRowGroups() throws IOException {
+ String tblName = "test3";
+ Table tbl = createTable(tblName, SCHEMA, PartitionSpec.unpartitioned());
+
+ List<Path> fileSplits = Lists.newArrayList();
+ StructType sparkSchema = SparkSchemaUtil.convert(SCHEMA);
+ Configuration conf = new Configuration();
+ File testFile = temp.newFile();
+ Assert.assertTrue("Delete should succeed", testFile.delete());
+ Path testFilePath = new Path(testFile.getAbsolutePath());
+
+ // Write a Parquet file with more than one row group
+ ParquetFileWriter parquetFileWriter =
+ new ParquetFileWriter(conf, ParquetSchemaUtil.convert(SCHEMA, "test3Schema"), testFilePath);
+ parquetFileWriter.start();
+ for (int i = 0; i < 2; i += 1) {
+ File split = temp.newFile();
+ Assert.assertTrue("Delete should succeed", split.delete());
+ Path splitPath = new Path(split.getAbsolutePath());
+ fileSplits.add(splitPath);
+ try (FileAppender<InternalRow> writer =
+ Parquet.write(Files.localOutput(split))
+ .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(sparkSchema, msgType))
+ .schema(SCHEMA)
+ .overwrite()
+ .build()) {
+ Iterable<InternalRow> records = RandomData.generateSpark(SCHEMA, 100, 34 * i + 37);
+ writer.addAll(records);
+ }
+ parquetFileWriter.appendFile(
+ org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(splitPath, conf));
+ }
+ parquetFileWriter.end(
+ ParquetFileWriter.mergeMetadataFiles(fileSplits, conf)
+ .getFileMetaData()
+ .getKeyValueMetaData());
+
+ // Add the file to the table
+ DataFile dataFile =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withInputFile(org.apache.iceberg.hadoop.HadoopInputFile.fromPath(testFilePath, conf))
+ .withFormat("parquet")
+ .withRecordCount(200)
+ .build();
+ tbl.newAppend().appendFile(dataFile).commit();
+
+ // Add positional deletes to the table
+ List<Pair<CharSequence, Long>> deletes =
+ Lists.newArrayList(
+ Pair.of(dataFile.path(), 97L),
+ Pair.of(dataFile.path(), 98L),
+ Pair.of(dataFile.path(), 99L),
+ Pair.of(dataFile.path(), 101L),
+ Pair.of(dataFile.path(), 103L),
+ Pair.of(dataFile.path(), 107L),
+ Pair.of(dataFile.path(), 109L));
+ Pair<DeleteFile, CharSequenceSet> posDeletes =
+ FileHelpers.writeDeleteFile(table, Files.localOutput(temp.newFile()), deletes);
+ tbl.newRowDelta()
+ .addDeletes(posDeletes.first())
+ .validateDataFilesExist(posDeletes.second())
+ .commit();
+
+ Assert.assertEquals(193, rowSet(tblName, tbl, "*").size());
+ }
}
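
The test's arithmetic: two appended 100-row splits produce a single 200-row Parquet file with two row groups; seven positions are deleted, three in the first row group (97, 98, 99) and four in the second (101, 103, 107, 109), so 200 - 7 = 193 rows must survive the read. Before the fix, a vectorized read could mis-apply the second row group's deletes because batch start positions ignored the first row group's rows. A hypothetical companion check (not part of the commit) that the stitched file really contains two row groups, using Parquet's standard metadata API:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    public class RowGroupCountSketch {
      static int rowGroupCount(Path path, Configuration conf) throws IOException {
        try (ParquetFileReader reader =
            ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
          return reader.getRowGroups().size(); // expected: 2, one per appended split
        }
      }
    }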