You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2022/09/01 03:49:16 UTC
[iceberg] branch master updated: Spark 3.2: Add row-based changelog reader (#5682)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 84f40cff9b Spark 3.2: Add row-based changelog reader (#5682)
84f40cff9b is described below
commit 84f40cff9b98ee15b706289e551078355bb8a7a5
Author: Yufei Gu <yu...@apache.org>
AuthorDate: Wed Aug 31 20:49:10 2022 -0700
Spark 3.2: Add row-based changelog reader (#5682)
---
.../iceberg/spark/source/ChangelogRowReader.java | 154 +++++++++++++
.../iceberg/spark/source/TestChangelogReader.java | 255 +++++++++++++++++++++
2 files changed, 409 insertions(+)
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
new file mode 100644
index 0000000000..82ab8f360e
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.AddedRowsScanTask;
+import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.DeletedDataFileScanTask;
+import org.apache.iceberg.DeletedRowsScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.JoinedRow;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Row-based reader for groups of {@link ChangelogScanTask}s.
+ *
+ * <p>Every data row produced for a task is joined with three metadata values taken from that
+ * task, in this order: the operation name, the change ordinal and the commit snapshot ID.
+ *
+ * <p>{@link AddedRowsScanTask} and {@link DeletedDataFileScanTask} are supported; {@link
+ * DeletedRowsScanTask} is rejected with an {@link UnsupportedOperationException}.
+ */
+class ChangelogRowReader extends BaseRowReader<ChangelogScanTask> {
+  ChangelogRowReader(
+      Table table,
+      ScanTaskGroup<ChangelogScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+    super(table, taskGroup, expectedSchema, caseSensitive);
+  }
+
+  /**
+   * Opens the rows of one changelog task with the task's change metadata appended on the right.
+   *
+   * <p>One {@link JoinedRow} is created per task and its {@code withLeft} method is applied to
+   * every data row. NOTE(review): {@code JoinedRow} looks like a mutable wrapper that is reused
+   * across rows, so consumers that buffer rows should copy them - confirm against Spark.
+   *
+   * @param task the changelog task to read
+   * @return an iterator over the data rows joined with the change metadata
+   */
+  @Override
+  protected CloseableIterator<InternalRow> open(ChangelogScanTask task) {
+    JoinedRow cdcRow = new JoinedRow();
+
+    // the metadata is constant for the whole task, so it is set once on the right side
+    cdcRow.withRight(changelogMetadata(task));
+
+    CloseableIterable<InternalRow> rows = openChangelogScanTask(task);
+    CloseableIterable<InternalRow> cdcRows = CloseableIterable.transform(rows, cdcRow::withLeft);
+
+    return cdcRows.iterator();
+  }
+
+  /**
+   * Builds the three-field metadata row for a task: operation name (as a {@link UTF8String}),
+   * change ordinal and commit snapshot ID.
+   */
+  private static InternalRow changelogMetadata(ChangelogScanTask task) {
+    InternalRow metadataRow = new GenericInternalRow(3);
+
+    metadataRow.update(0, UTF8String.fromString(task.operation().name()));
+    metadataRow.update(1, task.changeOrdinal());
+    metadataRow.update(2, task.commitSnapshotId());
+
+    return metadataRow;
+  }
+
+  /**
+   * Dispatches on the concrete task type and returns the data rows for the task.
+   *
+   * @throws UnsupportedOperationException for {@link DeletedRowsScanTask}
+   * @throws IllegalArgumentException for any other unknown task type
+   */
+  private CloseableIterable<InternalRow> openChangelogScanTask(ChangelogScanTask task) {
+    if (task instanceof AddedRowsScanTask) {
+      return openAddedRowsScanTask((AddedRowsScanTask) task);
+
+    } else if (task instanceof DeletedRowsScanTask) {
+      throw new UnsupportedOperationException("Deleted rows scan task is not supported yet");
+
+    } else if (task instanceof DeletedDataFileScanTask) {
+      return openDeletedDataFileScanTask((DeletedDataFileScanTask) task);
+
+    } else {
+      throw new IllegalArgumentException(
+          "Unsupported changelog scan task type: " + task.getClass().getName());
+    }
+  }
+
+  /** Reads the task's data file, filtered by the delete files returned by {@code deletes()}. */
+  CloseableIterable<InternalRow> openAddedRowsScanTask(AddedRowsScanTask task) {
+    return openWithDeletes(task, task.deletes());
+  }
+
+  /**
+   * Reads the task's data file, filtered by the delete files returned by {@code
+   * existingDeletes()}.
+   */
+  private CloseableIterable<InternalRow> openDeletedDataFileScanTask(DeletedDataFileScanTask task) {
+    return openWithDeletes(task, task.existingDeletes());
+  }
+
+  /**
+   * Reads the task's data file with the schema required by the delete filter and applies the
+   * filter to the result.
+   *
+   * @param task the content task whose data file is read
+   * @param deleteFiles the delete files handed to the {@code SparkDeleteFilter}
+   */
+  private CloseableIterable<InternalRow> openWithDeletes(
+      ContentScanTask<DataFile> task, List<DeleteFile> deleteFiles) {
+    String filePath = task.file().path().toString();
+    SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, deleteFiles);
+    return deletes.filter(rows(task, deletes.requiredSchema()));
+  }
+
+  /**
+   * Opens the task's data file for the split described by the task (start, length, residual) and
+   * projects it to {@code readSchema}.
+   *
+   * @throws NullPointerException if no input file can be resolved for the task's file path
+   */
+  private CloseableIterable<InternalRow> rows(ContentScanTask<DataFile> task, Schema readSchema) {
+    Map<Integer, ?> idToConstant = constantsMap(task, readSchema);
+
+    String filePath = task.file().path().toString();
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(filePath, task.start(), task.length());
+
+    InputFile location = getInputFile(filePath);
+    // include the path so a failure identifies which file could not be resolved
+    Preconditions.checkNotNull(location, "Could not find InputFile for %s", filePath);
+    return newIterable(
+        location,
+        task.file().format(),
+        task.start(),
+        task.length(),
+        task.residual(),
+        readSchema,
+        idToConstant);
+  }
+
+  /**
+   * Returns the data file and delete files referenced by a task.
+   *
+   * @throws UnsupportedOperationException for {@link DeletedRowsScanTask}
+   * @throws IllegalArgumentException for any other unknown task type
+   */
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(ChangelogScanTask task) {
+    if (task instanceof AddedRowsScanTask) {
+      return addedRowsScanTaskFiles((AddedRowsScanTask) task);
+
+    } else if (task instanceof DeletedRowsScanTask) {
+      throw new UnsupportedOperationException("Deleted rows scan task is not supported yet");
+
+    } else if (task instanceof DeletedDataFileScanTask) {
+      return deletedDataFileScanTaskFiles((DeletedDataFileScanTask) task);
+
+    } else {
+      throw new IllegalArgumentException(
+          "Unsupported changelog scan task type: " + task.getClass().getName());
+    }
+  }
+
+  /** Streams the task's data file followed by its existing delete files. */
+  private static Stream<ContentFile<?>> deletedDataFileScanTaskFiles(DeletedDataFileScanTask task) {
+    DataFile file = task.file();
+    List<DeleteFile> existingDeletes = task.existingDeletes();
+    return Stream.concat(Stream.of(file), existingDeletes.stream());
+  }
+
+  /** Streams the task's data file followed by its delete files. */
+  private static Stream<ContentFile<?>> addedRowsScanTaskFiles(AddedRowsScanTask task) {
+    DataFile file = task.file();
+    List<DeleteFile> deletes = task.deletes();
+    return Stream.concat(Stream.of(file), deletes.stream());
+  }
+}
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java
new file mode 100644
index 0000000000..3fd8718aed
--- /dev/null
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ChangelogOperation;
+import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.IncrementalChangelogScan;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests for {@link ChangelogRowReader} covering inserts, whole-file deletes, data file rewrites
+ * and a mix of inserts and deletes.
+ *
+ * <p>Rows are compared as {@code (id, data, operation, changeOrdinal, snapshotId)}.
+ */
+public class TestChangelogReader extends SparkTestBase {
+  private static final Schema SCHEMA =
+      new Schema(
+          required(1, "id", Types.IntegerType.get()), optional(2, "data", Types.StringType.get()));
+  private static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build();
+  private final List<Record> records1 = Lists.newArrayList();
+  private final List<Record> records2 = Lists.newArrayList();
+
+  private Table table;
+  private DataFile dataFile1;
+  private DataFile dataFile2;
+
+  @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+  @Before
+  public void before() throws IOException {
+    table = catalog.createTable(TableIdentifier.of("default", "test"), SCHEMA, SPEC);
+    // create some data
+    GenericRecord record = GenericRecord.create(table.schema());
+    records1.add(record.copy("id", 29, "data", "a"));
+    records1.add(record.copy("id", 43, "data", "b"));
+    records1.add(record.copy("id", 61, "data", "c"));
+    records1.add(record.copy("id", 89, "data", "d"));
+
+    records2.add(record.copy("id", 100, "data", "e"));
+    records2.add(record.copy("id", 121, "data", "f"));
+    records2.add(record.copy("id", 122, "data", "g"));
+
+    // write data to files
+    dataFile1 = writeDataFile(records1);
+    dataFile2 = writeDataFile(records2);
+  }
+
+  @After
+  public void after() {
+    catalog.dropTable(TableIdentifier.of("default", "test"));
+  }
+
+  @Test
+  public void testInsert() throws IOException {
+    table.newAppend().appendFile(dataFile1).commit();
+    long snapshotId1 = table.currentSnapshot().snapshotId();
+
+    table.newAppend().appendFile(dataFile2).commit();
+    long snapshotId2 = table.currentSnapshot().snapshotId();
+
+    List<InternalRow> rows = readRows(newScan());
+
+    // order by id; Integer.compare avoids the overflow risk of subtracting ints
+    rows.sort((r1, r2) -> Integer.compare(r1.getInt(0), r2.getInt(0)));
+
+    List<Object[]> expectedRows = Lists.newArrayList();
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1);
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, records2);
+
+    assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows));
+  }
+
+  @Test
+  public void testDelete() throws IOException {
+    table.newAppend().appendFile(dataFile1).commit();
+    long snapshotId1 = table.currentSnapshot().snapshotId();
+
+    table.newDelete().deleteFile(dataFile1).commit();
+    long snapshotId2 = table.currentSnapshot().snapshotId();
+
+    // only look at changes made after the initial append
+    List<InternalRow> rows = readRows(newScan().fromSnapshotExclusive(snapshotId1));
+
+    // order by id
+    rows.sort((r1, r2) -> Integer.compare(r1.getInt(0), r2.getInt(0)));
+
+    List<Object[]> expectedRows = Lists.newArrayList();
+    addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId2, 0, records1);
+
+    assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows));
+  }
+
+  @Test
+  public void testDataFileRewrite() throws IOException {
+    table.newAppend().appendFile(dataFile1).commit();
+    table.newAppend().appendFile(dataFile2).commit();
+    long snapshotId2 = table.currentSnapshot().snapshotId();
+
+    table
+        .newRewrite()
+        .rewriteFiles(ImmutableSet.of(dataFile1), ImmutableSet.of(dataFile2))
+        .commit();
+
+    // the rewrite operation should generate no Changelog rows
+    List<InternalRow> rows = readRows(newScan().fromSnapshotExclusive(snapshotId2));
+
+    Assert.assertEquals("Should have no rows", 0, rows.size());
+  }
+
+  @Test
+  public void testMixDeleteAndInsert() throws IOException {
+    table.newAppend().appendFile(dataFile1).commit();
+    long snapshotId1 = table.currentSnapshot().snapshotId();
+
+    table.newDelete().deleteFile(dataFile1).commit();
+    long snapshotId2 = table.currentSnapshot().snapshotId();
+
+    table.newAppend().appendFile(dataFile2).commit();
+    long snapshotId3 = table.currentSnapshot().snapshotId();
+
+    List<InternalRow> rows = readRows(newScan());
+
+    // order by the change ordinal, then by id within the same ordinal
+    rows.sort(
+        (r1, r2) -> {
+          int byOrdinal = Integer.compare(r1.getInt(3), r2.getInt(3));
+          return byOrdinal != 0 ? byOrdinal : Integer.compare(r1.getInt(0), r2.getInt(0));
+        });
+
+    List<Object[]> expectedRows = Lists.newArrayList();
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1);
+    addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId2, 1, records1);
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId3, 2, records2);
+
+    assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows));
+  }
+
+  private IncrementalChangelogScan newScan() {
+    return table.newIncrementalChangelogScan();
+  }
+
+  /**
+   * Plans the given scan and reads every task group with a {@link ChangelogRowReader}.
+   *
+   * <p>Each row is copied before it is collected. The planned task groups and every reader are
+   * closed via try-with-resources, so they are released even when reading fails.
+   *
+   * @param scan the configured changelog scan to plan and read
+   * @return copies of all rows produced by the scan, in read order
+   */
+  private List<InternalRow> readRows(IncrementalChangelogScan scan) throws IOException {
+    List<InternalRow> rows = Lists.newArrayList();
+
+    try (CloseableIterable<ScanTaskGroup<ChangelogScanTask>> taskGroups = scan.planTasks()) {
+      for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
+        try (ChangelogRowReader reader =
+            new ChangelogRowReader(table, taskGroup, table.schema(), false)) {
+          while (reader.next()) {
+            rows.add(reader.get().copy());
+          }
+        }
+      }
+    }
+
+    return rows;
+  }
+
+  /**
+   * Appends one expected row per record, made of the record's first two fields followed by the
+   * operation name, the change ordinal and the snapshot ID.
+   *
+   * @return the same {@code expectedRows} list that was passed in
+   */
+  private List<Object[]> addExpectedRows(
+      List<Object[]> expectedRows,
+      ChangelogOperation operation,
+      long snapshotId,
+      int changeOrdinal,
+      List<Record> records) {
+    records.forEach(
+        r ->
+            expectedRows.add(row(r.get(0), r.get(1), operation.name(), changeOrdinal, snapshotId)));
+    return expectedRows;
+  }
+
+  /** Converts each internal row to an {@code Object[]} via {@link #toJava(InternalRow)}. */
+  protected List<Object[]> internalRowsToJava(List<InternalRow> rows) {
+    return rows.stream().map(this::toJava).collect(Collectors.toList());
+  }
+
+  /**
+   * Extracts the five changelog columns from a row: id (int), data (string), operation (string),
+   * change ordinal (int) and snapshot ID (long).
+   */
+  private Object[] toJava(InternalRow row) {
+    Object[] values = new Object[row.numFields()];
+    values[0] = row.getInt(0);
+    values[1] = row.getString(1);
+    values[2] = row.getString(2);
+    values[3] = row.getInt(3);
+    values[4] = row.getLong(4);
+    return values;
+  }
+
+  /** Writes the records to a new temporary data file under partition value 0. */
+  private DataFile writeDataFile(List<Record> records) throws IOException {
+    // all records are written under partition value 0; the spec buckets on "data"
+    // NOTE(review): the records' real bucket values are not checked here - confirm this is intended
+    return FileHelpers.writeDataFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), records);
+  }
+}