Posted to commits@iceberg.apache.org by ao...@apache.org on 2022/09/01 03:49:16 UTC

[iceberg] branch master updated: Spark 3.2: Add row-based changelog reader (#5682)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 84f40cff9b Spark 3.2: Add row-based changelog reader (#5682)
84f40cff9b is described below

commit 84f40cff9b98ee15b706289e551078355bb8a7a5
Author: Yufei Gu <yu...@apache.org>
AuthorDate: Wed Aug 31 20:49:10 2022 -0700

    Spark 3.2: Add row-based changelog reader (#5682)
---
 .../iceberg/spark/source/ChangelogRowReader.java   | 154 +++++++++++++
 .../iceberg/spark/source/TestChangelogReader.java  | 255 +++++++++++++++++++++
 2 files changed, 409 insertions(+)
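
For context, each row produced by the new reader is the data row joined on the right with a
three-column metadata row: the operation name, the change ordinal, and the commit snapshot id
(see changelogMetadata() in the diff below). The following is a minimal sketch of driving the
reader, modeled on the TestChangelogReader test added in this commit; the ChangelogReadSketch
class name is illustrative only and is not part of the commit:

    package org.apache.iceberg.spark.source;

    import java.io.IOException;
    import java.util.List;
    import org.apache.iceberg.ChangelogScanTask;
    import org.apache.iceberg.ScanTaskGroup;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.relocated.com.google.common.collect.Lists;
    import org.apache.spark.sql.catalyst.InternalRow;

    class ChangelogReadSketch {
      // Collects all changelog rows for a table by planning an incremental changelog scan
      // and reading each task group with the new ChangelogRowReader.
      static List<InternalRow> readChangelog(Table table) throws IOException {
        List<InternalRow> rows = Lists.newArrayList();
        CloseableIterable<ScanTaskGroup<ChangelogScanTask>> taskGroups =
            table.newIncrementalChangelogScan().planTasks();

        for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
          ChangelogRowReader reader =
              new ChangelogRowReader(table, taskGroup, table.schema(), false /* caseSensitive */);
          while (reader.next()) {
            // copy() because the reader may reuse the underlying row object
            rows.add(reader.get().copy());
          }
          reader.close();
        }

        return rows;
      }
    }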

diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
new file mode 100644
index 0000000000..82ab8f360e
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.AddedRowsScanTask;
+import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.DeletedDataFileScanTask;
+import org.apache.iceberg.DeletedRowsScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.expressions.JoinedRow;
+import org.apache.spark.unsafe.types.UTF8String;
+
+class ChangelogRowReader extends BaseRowReader<ChangelogScanTask> {
+  ChangelogRowReader(
+      Table table,
+      ScanTaskGroup<ChangelogScanTask> taskGroup,
+      Schema expectedSchema,
+      boolean caseSensitive) {
+    super(table, taskGroup, expectedSchema, caseSensitive);
+  }
+
+  @Override
+  protected CloseableIterator<InternalRow> open(ChangelogScanTask task) {
+    JoinedRow cdcRow = new JoinedRow();
+
+    cdcRow.withRight(changelogMetadata(task));
+
+    CloseableIterable<InternalRow> rows = openChangelogScanTask(task);
+    CloseableIterable<InternalRow> cdcRows = CloseableIterable.transform(rows, cdcRow::withLeft);
+
+    return cdcRows.iterator();
+  }
+
+  private static InternalRow changelogMetadata(ChangelogScanTask task) {
+    InternalRow metadataRow = new GenericInternalRow(3);
+
+    metadataRow.update(0, UTF8String.fromString(task.operation().name()));
+    metadataRow.update(1, task.changeOrdinal());
+    metadataRow.update(2, task.commitSnapshotId());
+
+    return metadataRow;
+  }
+
+  private CloseableIterable<InternalRow> openChangelogScanTask(ChangelogScanTask task) {
+    if (task instanceof AddedRowsScanTask) {
+      return openAddedRowsScanTask((AddedRowsScanTask) task);
+
+    } else if (task instanceof DeletedRowsScanTask) {
+      throw new UnsupportedOperationException("Deleted rows scan task is not supported yet");
+
+    } else if (task instanceof DeletedDataFileScanTask) {
+      return openDeletedDataFileScanTask((DeletedDataFileScanTask) task);
+
+    } else {
+      throw new IllegalArgumentException(
+          "Unsupported changelog scan task type: " + task.getClass().getName());
+    }
+  }
+
+  CloseableIterable<InternalRow> openAddedRowsScanTask(AddedRowsScanTask task) {
+    String filePath = task.file().path().toString();
+    SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.deletes());
+    return deletes.filter(rows(task, deletes.requiredSchema()));
+  }
+
+  private CloseableIterable<InternalRow> openDeletedDataFileScanTask(DeletedDataFileScanTask task) {
+    String filePath = task.file().path().toString();
+    SparkDeleteFilter deletes = new SparkDeleteFilter(filePath, task.existingDeletes());
+    return deletes.filter(rows(task, deletes.requiredSchema()));
+  }
+
+  private CloseableIterable<InternalRow> rows(ContentScanTask<DataFile> task, Schema readSchema) {
+    Map<Integer, ?> idToConstant = constantsMap(task, readSchema);
+
+    String filePath = task.file().path().toString();
+
+    // update the current file for Spark's filename() function
+    InputFileBlockHolder.set(filePath, task.start(), task.length());
+
+    InputFile location = getInputFile(filePath);
+    Preconditions.checkNotNull(location, "Could not find InputFile");
+    return newIterable(
+        location,
+        task.file().format(),
+        task.start(),
+        task.length(),
+        task.residual(),
+        readSchema,
+        idToConstant);
+  }
+
+  @Override
+  protected Stream<ContentFile<?>> referencedFiles(ChangelogScanTask task) {
+    if (task instanceof AddedRowsScanTask) {
+      return addedRowsScanTaskFiles((AddedRowsScanTask) task);
+
+    } else if (task instanceof DeletedRowsScanTask) {
+      throw new UnsupportedOperationException("Deleted rows scan task is not supported yet");
+
+    } else if (task instanceof DeletedDataFileScanTask) {
+      return deletedDataFileScanTaskFiles((DeletedDataFileScanTask) task);
+
+    } else {
+      throw new IllegalArgumentException(
+          "Unsupported changelog scan task type: " + task.getClass().getName());
+    }
+  }
+
+  private static Stream<ContentFile<?>> deletedDataFileScanTaskFiles(DeletedDataFileScanTask task) {
+    DeletedDataFileScanTask deletedDataFileScanTask = task;
+    DataFile file = deletedDataFileScanTask.file();
+    List<DeleteFile> existingDeletes = deletedDataFileScanTask.existingDeletes();
+    return Stream.concat(Stream.of(file), existingDeletes.stream());
+  }
+
+  private static Stream<ContentFile<?>> addedRowsScanTaskFiles(AddedRowsScanTask task) {
+    AddedRowsScanTask addedRowsScanTask = task;
+    DataFile file = addedRowsScanTask.file();
+    List<DeleteFile> deletes = addedRowsScanTask.deletes();
+    return Stream.concat(Stream.of(file), deletes.stream());
+  }
+}
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java
new file mode 100644
index 0000000000..3fd8718aed
--- /dev/null
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ChangelogOperation;
+import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.IncrementalChangelogScan;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestChangelogReader extends SparkTestBase {
+  private static final Schema SCHEMA =
+      new Schema(
+          required(1, "id", Types.IntegerType.get()), optional(2, "data", Types.StringType.get()));
+  private static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build();
+  private final List<Record> records1 = Lists.newArrayList();
+  private final List<Record> records2 = Lists.newArrayList();
+
+  private Table table;
+  private DataFile dataFile1;
+  private DataFile dataFile2;
+
+  @Rule public TemporaryFolder temp = new TemporaryFolder();
+
+  @Before
+  public void before() throws IOException {
+    table = catalog.createTable(TableIdentifier.of("default", "test"), SCHEMA, SPEC);
+    // create some data
+    GenericRecord record = GenericRecord.create(table.schema());
+    records1.add(record.copy("id", 29, "data", "a"));
+    records1.add(record.copy("id", 43, "data", "b"));
+    records1.add(record.copy("id", 61, "data", "c"));
+    records1.add(record.copy("id", 89, "data", "d"));
+
+    records2.add(record.copy("id", 100, "data", "e"));
+    records2.add(record.copy("id", 121, "data", "f"));
+    records2.add(record.copy("id", 122, "data", "g"));
+
+    // write data to files
+    dataFile1 = writeDataFile(records1);
+    dataFile2 = writeDataFile(records2);
+  }
+
+  @After
+  public void after() {
+    catalog.dropTable(TableIdentifier.of("default", "test"));
+  }
+
+  @Test
+  public void testInsert() throws IOException {
+    table.newAppend().appendFile(dataFile1).commit();
+    long snapshotId1 = table.currentSnapshot().snapshotId();
+
+    table.newAppend().appendFile(dataFile2).commit();
+    long snapshotId2 = table.currentSnapshot().snapshotId();
+
+    CloseableIterable<ScanTaskGroup<ChangelogScanTask>> taskGroups = newScan().planTasks();
+
+    List<InternalRow> rows = Lists.newArrayList();
+
+    for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
+      ChangelogRowReader reader = new ChangelogRowReader(table, taskGroup, table.schema(), false);
+      while (reader.next()) {
+        rows.add(reader.get().copy());
+      }
+      reader.close();
+    }
+
+    rows.sort((r1, r2) -> r1.getInt(0) - r2.getInt(0));
+
+    List<Object[]> expectedRows = Lists.newArrayList();
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1);
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, records2);
+
+    assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows));
+  }
+
+  @Test
+  public void testDelete() throws IOException {
+    table.newAppend().appendFile(dataFile1).commit();
+    long snapshotId1 = table.currentSnapshot().snapshotId();
+
+    table.newDelete().deleteFile(dataFile1).commit();
+    long snapshotId2 = table.currentSnapshot().snapshotId();
+
+    CloseableIterable<ScanTaskGroup<ChangelogScanTask>> taskGroups =
+        newScan().fromSnapshotExclusive(snapshotId1).planTasks();
+
+    List<InternalRow> rows = Lists.newArrayList();
+
+    for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
+      ChangelogRowReader reader = new ChangelogRowReader(table, taskGroup, table.schema(), false);
+      while (reader.next()) {
+        rows.add(reader.get().copy());
+      }
+      reader.close();
+    }
+
+    rows.sort((r1, r2) -> r1.getInt(0) - r2.getInt(0));
+
+    List<Object[]> expectedRows = Lists.newArrayList();
+    addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId2, 0, records1);
+
+    assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows));
+  }
+
+  @Test
+  public void testDataFileRewrite() throws IOException {
+    table.newAppend().appendFile(dataFile1).commit();
+    table.newAppend().appendFile(dataFile2).commit();
+    long snapshotId2 = table.currentSnapshot().snapshotId();
+
+    table
+        .newRewrite()
+        .rewriteFiles(ImmutableSet.of(dataFile1), ImmutableSet.of(dataFile2))
+        .commit();
+
+    // the rewrite operation should generate no Changelog rows
+    CloseableIterable<ScanTaskGroup<ChangelogScanTask>> taskGroups =
+        newScan().fromSnapshotExclusive(snapshotId2).planTasks();
+
+    List<InternalRow> rows = Lists.newArrayList();
+
+    for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
+      ChangelogRowReader reader = new ChangelogRowReader(table, taskGroup, table.schema(), false);
+      while (reader.next()) {
+        rows.add(reader.get().copy());
+      }
+      reader.close();
+    }
+
+    Assert.assertEquals("Should have no rows", 0, rows.size());
+  }
+
+  @Test
+  public void testMixDeleteAndInsert() throws IOException {
+    table.newAppend().appendFile(dataFile1).commit();
+    long snapshotId1 = table.currentSnapshot().snapshotId();
+
+    table.newDelete().deleteFile(dataFile1).commit();
+    long snapshotId2 = table.currentSnapshot().snapshotId();
+
+    table.newAppend().appendFile(dataFile2).commit();
+    long snapshotId3 = table.currentSnapshot().snapshotId();
+
+    CloseableIterable<ScanTaskGroup<ChangelogScanTask>> taskGroups = newScan().planTasks();
+
+    List<InternalRow> rows = Lists.newArrayList();
+
+    for (ScanTaskGroup<ChangelogScanTask> taskGroup : taskGroups) {
+      ChangelogRowReader reader = new ChangelogRowReader(table, taskGroup, table.schema(), false);
+      while (reader.next()) {
+        rows.add(reader.get().copy());
+      }
+      reader.close();
+    }
+
+    // order by the change ordinal
+    rows.sort(
+        (r1, r2) -> {
+          if (r1.getInt(3) != r2.getInt(3)) {
+            return r1.getInt(3) - r2.getInt(3);
+          } else {
+            return r1.getInt(0) - r2.getInt(0);
+          }
+        });
+
+    List<Object[]> expectedRows = Lists.newArrayList();
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1);
+    addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId2, 1, records1);
+    addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId3, 2, records2);
+
+    assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows));
+  }
+
+  private IncrementalChangelogScan newScan() {
+    return table.newIncrementalChangelogScan();
+  }
+
+  private List<Object[]> addExpectedRows(
+      List<Object[]> expectedRows,
+      ChangelogOperation operation,
+      long snapshotId,
+      int changeOrdinal,
+      List<Record> records) {
+    records.forEach(
+        r ->
+            expectedRows.add(row(r.get(0), r.get(1), operation.name(), changeOrdinal, snapshotId)));
+    return expectedRows;
+  }
+
+  protected List<Object[]> internalRowsToJava(List<InternalRow> rows) {
+    return rows.stream().map(this::toJava).collect(Collectors.toList());
+  }
+
+  private Object[] toJava(InternalRow row) {
+    Object[] values = new Object[row.numFields()];
+    values[0] = row.getInt(0);
+    values[1] = row.getString(1);
+    values[2] = row.getString(2);
+    values[3] = row.getInt(3);
+    values[4] = row.getLong(4);
+    return values;
+  }
+
+  private DataFile writeDataFile(List<Record> records) throws IOException {
+    // records all use IDs that are in bucket id_bucket=0
+    return FileHelpers.writeDataFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), records);
+  }
+}