Posted to commits@iceberg.apache.org by ao...@apache.org on 2022/10/18 23:16:40 UTC
[iceberg] branch master updated: Spark 3.2: Add SparkChangelogTable (#6013)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new c8ce0c9551 Spark 3.2: Add SparkChangelogTable (#6013)
c8ce0c9551 is described below
commit c8ce0c955141a93162d9bfcbbf38d9c975d6ed1b
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Tue Oct 18 16:16:34 2022 -0700
Spark 3.2: Add SparkChangelogTable (#6013)
---
.../spark/extensions/TestChangelogBatchReads.java | 248 +++++++++++++++++++++
.../org/apache/iceberg/spark/SparkCatalog.java | 73 +++---
.../iceberg/spark/source/ChangelogRowReader.java | 3 +-
.../apache/iceberg/spark/source/SparkBatch.java | 11 +-
.../{SparkBatch.java => SparkChangelogBatch.java} | 115 +++++-----
.../iceberg/spark/source/SparkChangelogScan.java | 163 ++++++++++++++
.../iceberg/spark/source/SparkChangelogTable.java | 102 +++++++++
.../iceberg/spark/source/SparkInputPartition.java | 84 +++++++
.../spark/source/SparkMicroBatchStream.java | 12 +-
.../org/apache/iceberg/spark/source/SparkScan.java | 122 ++++------
.../iceberg/spark/source/SparkScanBuilder.java | 37 ++-
.../actions/TestRemoveOrphanFilesAction3.java | 6 +-
.../iceberg/spark/source/TestPathIdentifier.java | 5 +-
13 files changed, 798 insertions(+), 183 deletions(-)
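For reference (not part of the patch), the tests added below query the new changelog metadata table by appending ".changes" to the table identifier. A minimal usage sketch, assuming an active SparkSession named spark, imports for java.util.List and org.apache.spark.sql.Row, and a placeholder table db.tbl:

// Illustrative sketch only; the metadata columns are the ones asserted on in
// TestChangelogBatchReads, and "db.tbl" is a placeholder table name.
List<Row> rows =
    spark
        .sql(
            "SELECT id, data, _change_type, _change_ordinal, _commit_snapshot_id "
                + "FROM db.tbl.changes ORDER BY _change_ordinal, id")
        .collectAsList();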
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogBatchReads.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogBatchReads.java
new file mode 100644
index 0000000000..530abe67c9
--- /dev/null
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogBatchReads.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
+import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
+import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Row;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runners.Parameterized.Parameters;
+
+public class TestChangelogBatchReads extends SparkExtensionsTestBase {
+
+ @Parameters(name = "formatVersion = {0}, catalogName = {1}, implementation = {2}, config = {3}")
+ public static Object[][] parameters() {
+ return new Object[][] {
+ {
+ 1,
+ SparkCatalogConfig.SPARK.catalogName(),
+ SparkCatalogConfig.SPARK.implementation(),
+ SparkCatalogConfig.SPARK.properties()
+ },
+ {
+ 2,
+ SparkCatalogConfig.HIVE.catalogName(),
+ SparkCatalogConfig.HIVE.implementation(),
+ SparkCatalogConfig.HIVE.properties()
+ }
+ };
+ }
+
+ private final int formatVersion;
+
+ public TestChangelogBatchReads(
+ int formatVersion, String catalogName, String implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ this.formatVersion = formatVersion;
+ }
+
+ @After
+ public void removeTables() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ @Test
+ public void testDataFilters() {
+ sql(
+ "CREATE TABLE %s (id INT, data STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (data) "
+ + "TBLPROPERTIES ( "
+ + " '%s' = '%d' "
+ + ")",
+ tableName, FORMAT_VERSION, formatVersion);
+
+ sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+ sql("INSERT INTO %s VALUES (3, 'c')", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ Snapshot snap3 = table.currentSnapshot();
+
+ sql("DELETE FROM %s WHERE id = 3", tableName);
+
+ table.refresh();
+
+ Snapshot snap4 = table.currentSnapshot();
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(
+ row(3, "c", "INSERT", 2, snap3.snapshotId()),
+ row(3, "c", "DELETE", 3, snap4.snapshotId())),
+ sql("SELECT * FROM %s.changes WHERE id = 3 ORDER BY _change_ordinal, id", tableName));
+ }
+
+ @Test
+ public void testOverwrites() {
+ sql(
+ "CREATE TABLE %s (id INT, data STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (data) "
+ + "TBLPROPERTIES ( "
+ + " '%s' = '%d' "
+ + ")",
+ tableName, FORMAT_VERSION, formatVersion);
+
+ sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ Snapshot snap2 = table.currentSnapshot();
+
+ sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+
+ table.refresh();
+
+ Snapshot snap3 = table.currentSnapshot();
+
+ assertEquals(
+ "Rows should match",
+ ImmutableList.of(
+ row(2, "b", "DELETE", 0, snap3.snapshotId()),
+ row(-2, "b", "INSERT", 0, snap3.snapshotId())),
+ changelogRecords(snap2, snap3));
+ }
+
+ @Test
+ public void testMetadataDeletes() {
+ sql(
+ "CREATE TABLE %s (id INT, data STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (data) "
+ + "TBLPROPERTIES ( "
+ + " '%s' = '%d' "
+ + ")",
+ tableName, FORMAT_VERSION, formatVersion);
+
+ sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ Snapshot snap2 = table.currentSnapshot();
+
+ sql("DELETE FROM %s WHERE data = 'a'", tableName);
+
+ table.refresh();
+
+ Snapshot snap3 = table.currentSnapshot();
+ Assert.assertEquals("Operation must match", DataOperations.DELETE, snap3.operation());
+
+ assertEquals(
+ "Rows should match",
+ ImmutableList.of(row(1, "a", "DELETE", 0, snap3.snapshotId())),
+ changelogRecords(snap2, snap3));
+ }
+
+ @Test
+ public void testExistingEntriesInNewDataManifestsAreIgnored() {
+ sql(
+ "CREATE TABLE %s (id INT, data STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (data) "
+ + "TBLPROPERTIES ( "
+ + " '%s' = '%d', "
+ + " '%s' = '1', "
+ + " '%s' = 'true' "
+ + ")",
+ tableName, FORMAT_VERSION, formatVersion, MANIFEST_MIN_MERGE_COUNT, MANIFEST_MERGE_ENABLED);
+
+ sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ Snapshot snap1 = table.currentSnapshot();
+
+ sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+ table.refresh();
+
+ Snapshot snap2 = table.currentSnapshot();
+ Assert.assertEquals("Manifest number must match", 1, snap2.dataManifests(table.io()).size());
+
+ assertEquals(
+ "Rows should match",
+ ImmutableList.of(row(2, "b", "INSERT", 0, snap2.snapshotId())),
+ changelogRecords(snap1, snap2));
+ }
+
+ @Test
+ public void testManifestRewritesAreIgnored() {
+ sql(
+ "CREATE TABLE %s (id INT, data STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (data) "
+ + "TBLPROPERTIES ( "
+ + " '%s' = '%d' "
+ + ")",
+ tableName, FORMAT_VERSION, formatVersion);
+
+ sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+ sql("CALL %s.system.rewrite_manifests('%s')", catalogName, tableIdent);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Assert.assertEquals("Num snapshots must match", 3, Iterables.size(table.snapshots()));
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(row(1, "INSERT"), row(2, "INSERT")),
+ sql("SELECT id, _change_type FROM %s.changes ORDER BY id", tableName));
+ }
+
+ private List<Object[]> changelogRecords(Snapshot startSnapshot, Snapshot endSnapshot) {
+ DataFrameReader reader = spark.read();
+
+ if (startSnapshot != null) {
+ reader = reader.option(SparkReadOptions.START_SNAPSHOT_ID, startSnapshot.snapshotId());
+ }
+
+ if (endSnapshot != null) {
+ reader = reader.option(SparkReadOptions.END_SNAPSHOT_ID, endSnapshot.snapshotId());
+ }
+
+ return rowsToJava(collect(reader));
+ }
+
+ private List<Row> collect(DataFrameReader reader) {
+ return reader
+ .table(tableName + "." + SparkChangelogTable.TABLE_NAME)
+ .orderBy("_change_ordinal", "_commit_snapshot_id", "_change_type", "id")
+ .collectAsList();
+ }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index 835a395f6e..b3a514dd21 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -35,7 +35,6 @@ import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
@@ -54,6 +53,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.actions.SparkActions;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.iceberg.spark.source.StagedSparkTable;
import org.apache.iceberg.util.Pair;
@@ -67,6 +67,7 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
import org.apache.spark.sql.connector.catalog.StagedTable;
+import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.connector.catalog.TableChange.ColumnChange;
@@ -137,23 +138,22 @@ public class SparkCatalog extends BaseCatalog {
}
@Override
- public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
+ public Table loadTable(Identifier ident) throws NoSuchTableException {
try {
- Pair<Table, Long> icebergTable = load(ident);
- return new SparkTable(icebergTable.first(), icebergTable.second(), !cacheEnabled);
+ return load(ident);
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
throw new NoSuchTableException(ident);
}
}
@Override
- public SparkTable createTable(
+ public Table createTable(
Identifier ident, StructType schema, Transform[] transforms, Map<String, String> properties)
throws TableAlreadyExistsException {
Schema icebergSchema = SparkSchemaUtil.convert(schema, useTimestampsWithoutZone);
try {
Catalog.TableBuilder builder = newBuilder(ident, icebergSchema);
- Table icebergTable =
+ org.apache.iceberg.Table icebergTable =
builder
.withPartitionSpec(Spark3Util.toPartitionSpec(icebergSchema, transforms))
.withLocation(properties.get("location"))
@@ -218,8 +218,7 @@ public class SparkCatalog extends BaseCatalog {
}
@Override
- public SparkTable alterTable(Identifier ident, TableChange... changes)
- throws NoSuchTableException {
+ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
SetProperty setLocation = null;
SetProperty setSnapshotId = null;
SetProperty pickSnapshotId = null;
@@ -252,7 +251,7 @@ public class SparkCatalog extends BaseCatalog {
}
try {
- Table table = load(ident).first();
+ org.apache.iceberg.Table table = icebergCatalog.loadTable(buildIdentifier(ident));
commitChanges(
table, setLocation, setSnapshotId, pickSnapshotId, propertyChanges, schemaChanges);
return new SparkTable(table, true /* refreshEagerly */);
@@ -269,7 +268,7 @@ public class SparkCatalog extends BaseCatalog {
@Override
public boolean purgeTable(Identifier ident) {
try {
- Table table = load(ident).first();
+ org.apache.iceberg.Table table = icebergCatalog.loadTable(buildIdentifier(ident));
ValidationException.check(
PropertyUtil.propertyAsBoolean(table.properties(), GC_ENABLED, GC_ENABLED_DEFAULT),
"Cannot purge table: GC is disabled (deleting files may corrupt other tables)");
@@ -493,7 +492,7 @@ public class SparkCatalog extends BaseCatalog {
}
private static void commitChanges(
- Table table,
+ org.apache.iceberg.Table table,
SetProperty setLocation,
SetProperty setSnapshotId,
SetProperty pickSnapshotId,
@@ -546,23 +545,24 @@ public class SparkCatalog extends BaseCatalog {
}
}
- private Pair<Table, Long> load(Identifier ident) {
+ private Table load(Identifier ident) {
if (isPathIdentifier(ident)) {
return loadFromPathIdentifier((PathIdentifier) ident);
}
try {
- return Pair.of(icebergCatalog.loadTable(buildIdentifier(ident)), null);
+ org.apache.iceberg.Table table = icebergCatalog.loadTable(buildIdentifier(ident));
+ return new SparkTable(table, !cacheEnabled);
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
if (ident.namespace().length == 0) {
throw e;
}
- // if the original load didn't work, the identifier may be extended and include a snapshot
- // selector
+ // if the original load didn't work, try using the namespace as an identifier because
+ // the original identifier may include a snapshot selector or may point to the changelog
TableIdentifier namespaceAsIdent = buildIdentifier(namespaceToIdentifier(ident.namespace()));
- Table table;
+ org.apache.iceberg.Table table;
try {
table = icebergCatalog.loadTable(namespaceAsIdent);
} catch (Exception ignored) {
@@ -572,19 +572,27 @@ public class SparkCatalog extends BaseCatalog {
}
// loading the namespace as a table worked, check the name to see if it is a valid selector
+ // or if the name points to the changelog
+
+ if (ident.name().equalsIgnoreCase(SparkChangelogTable.TABLE_NAME)) {
+ return new SparkChangelogTable(table, !cacheEnabled);
+ }
+
Matcher at = AT_TIMESTAMP.matcher(ident.name());
if (at.matches()) {
long asOfTimestamp = Long.parseLong(at.group(1));
- return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
+ long snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp);
+ return new SparkTable(table, snapshotId, !cacheEnabled);
}
Matcher id = SNAPSHOT_ID.matcher(ident.name());
if (id.matches()) {
long snapshotId = Long.parseLong(id.group(1));
- return Pair.of(table, snapshotId);
+ return new SparkTable(table, snapshotId, !cacheEnabled);
}
- // the name wasn't a valid snapshot selector. throw the original exception
+ // the name wasn't a valid snapshot selector and did not point to the changelog
+ // throw the original exception
throw e;
}
}
@@ -600,13 +608,21 @@ public class SparkCatalog extends BaseCatalog {
}
}
- private Pair<Table, Long> loadFromPathIdentifier(PathIdentifier ident) {
+ @SuppressWarnings("CyclomaticComplexity")
+ private Table loadFromPathIdentifier(PathIdentifier ident) {
Pair<String, List<String>> parsed = parseLocationString(ident.location());
String metadataTableName = null;
Long asOfTimestamp = null;
Long snapshotId = null;
+ boolean isChangelog = false;
+
for (String meta : parsed.second()) {
+ if (meta.equalsIgnoreCase(SparkChangelogTable.TABLE_NAME)) {
+ isChangelog = true;
+ continue;
+ }
+
if (MetadataTableType.from(meta) != null) {
metadataTableName = meta;
continue;
@@ -629,15 +645,22 @@ public class SparkCatalog extends BaseCatalog {
"Cannot specify both snapshot-id and as-of-timestamp: %s",
ident.location());
- Table table =
+ Preconditions.checkArgument(
+ !isChangelog || (snapshotId == null && asOfTimestamp == null),
+ "Cannot specify snapshot-id and as-of-timestamp for changelogs");
+
+ org.apache.iceberg.Table table =
tables.load(parsed.first() + (metadataTableName != null ? "#" + metadataTableName : ""));
- if (snapshotId != null) {
- return Pair.of(table, snapshotId);
+ if (isChangelog) {
+ return new SparkChangelogTable(table, !cacheEnabled);
+
} else if (asOfTimestamp != null) {
- return Pair.of(table, SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp));
+ long snapshotIdAsOfTime = SnapshotUtil.snapshotIdAsOfTime(table, asOfTimestamp);
+ return new SparkTable(table, snapshotIdAsOfTime, !cacheEnabled);
+
} else {
- return Pair.of(table, null);
+ return new SparkTable(table, snapshotId, !cacheEnabled);
}
}
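In short, load(Identifier) now resolves the trailing name component itself and returns a Spark connector Table instead of a Pair<Table, Long>. A rough summary of the resolution order, partly assumed from the rest of SparkCatalog since the snapshot-selector patterns (AT_TIMESTAMP, SNAPSHOT_ID) are referenced but not defined in these hunks:

// Assumed resolution behaviour after this change:
//   db.tbl                 -> SparkTable
//   db.tbl.changes         -> SparkChangelogTable (new in this commit)
//   db.tbl.<at-timestamp>  -> SparkTable pinned to the snapshot as of that time
//   db.tbl.<snapshot-id>   -> SparkTable pinned to that snapshot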
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
index 20f0893bcc..a1111eda12 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.stream.Stream;
import org.apache.iceberg.AddedRowsScanTask;
import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.ChangelogUtil;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ContentScanTask;
import org.apache.iceberg.DataFile;
@@ -48,7 +49,7 @@ class ChangelogRowReader extends BaseRowReader<ChangelogScanTask> {
ScanTaskGroup<ChangelogScanTask> taskGroup,
Schema expectedSchema,
boolean caseSensitive) {
- super(table, taskGroup, expectedSchema, caseSensitive);
+ super(table, taskGroup, ChangelogUtil.dropChangelogMetadata(expectedSchema), caseSensitive);
}
@Override
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index 6d85047943..19a1dce3c9 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -25,7 +25,6 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.SparkReadConf;
-import org.apache.iceberg.spark.source.SparkScan.ReadTask;
import org.apache.iceberg.spark.source.SparkScan.ReaderFactory;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.iceberg.util.Tasks;
@@ -62,22 +61,22 @@ abstract class SparkBatch implements Batch {
sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
String expectedSchemaString = SchemaParser.toJson(expectedSchema);
- InputPartition[] readTasks = new InputPartition[tasks().size()];
+ InputPartition[] partitions = new InputPartition[tasks().size()];
- Tasks.range(readTasks.length)
+ Tasks.range(partitions.length)
.stopOnFailure()
.executeWith(localityEnabled ? ThreadPools.getWorkerPool() : null)
.run(
index ->
- readTasks[index] =
- new ReadTask(
+ partitions[index] =
+ new SparkInputPartition(
tasks().get(index),
tableBroadcast,
expectedSchemaString,
caseSensitive,
localityEnabled));
- return readTasks;
+ return partitions;
}
protected abstract List<CombinedScanTask> tasks();
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogBatch.java
similarity index 50%
copy from spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
copy to spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogBatch.java
index 6d85047943..38e25e9989 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogBatch.java
@@ -19,115 +19,120 @@
package org.apache.iceberg.spark.source;
import java.util.List;
-import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.FileFormat;
+import java.util.Objects;
+import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.SparkReadConf;
-import org.apache.iceberg.spark.source.SparkScan.ReadTask;
-import org.apache.iceberg.spark.source.SparkScan.ReaderFactory;
-import org.apache.iceberg.util.TableScanUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
-abstract class SparkBatch implements Batch {
+class SparkChangelogBatch implements Batch {
private final JavaSparkContext sparkContext;
private final Table table;
- private final SparkReadConf readConf;
+ private final List<ScanTaskGroup<ChangelogScanTask>> taskGroups;
private final Schema expectedSchema;
private final boolean caseSensitive;
private final boolean localityEnabled;
-
- SparkBatch(SparkSession spark, Table table, SparkReadConf readConf, Schema expectedSchema) {
+ private final int scanHashCode;
+
+ SparkChangelogBatch(
+ SparkSession spark,
+ Table table,
+ SparkReadConf readConf,
+ List<ScanTaskGroup<ChangelogScanTask>> taskGroups,
+ Schema expectedSchema,
+ int scanHashCode) {
this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
this.table = table;
- this.readConf = readConf;
+ this.taskGroups = taskGroups;
this.expectedSchema = expectedSchema;
this.caseSensitive = readConf.caseSensitive();
this.localityEnabled = readConf.localityEnabled();
+ this.scanHashCode = scanHashCode;
}
@Override
public InputPartition[] planInputPartitions() {
- // broadcast the table metadata as input partitions will be sent to executors
- Broadcast<Table> tableBroadcast =
- sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
+ Table serializableTable = SerializableTableWithSize.copyOf(table);
+ Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);
String expectedSchemaString = SchemaParser.toJson(expectedSchema);
- InputPartition[] readTasks = new InputPartition[tasks().size()];
+ InputPartition[] partitions = new InputPartition[taskGroups.size()];
- Tasks.range(readTasks.length)
+ Tasks.range(partitions.length)
.stopOnFailure()
.executeWith(localityEnabled ? ThreadPools.getWorkerPool() : null)
.run(
index ->
- readTasks[index] =
- new ReadTask(
- tasks().get(index),
+ partitions[index] =
+ new SparkInputPartition(
+ taskGroups.get(index),
tableBroadcast,
expectedSchemaString,
caseSensitive,
localityEnabled));
- return readTasks;
- }
-
- protected abstract List<CombinedScanTask> tasks();
-
- protected JavaSparkContext sparkContext() {
- return sparkContext;
+ return partitions;
}
@Override
public PartitionReaderFactory createReaderFactory() {
- return new ReaderFactory(batchSize());
+ return new ReaderFactory();
}
- private int batchSize() {
- if (parquetOnly() && parquetBatchReadsEnabled()) {
- return readConf.parquetBatchSize();
- } else if (orcOnly() && orcBatchReadsEnabled()) {
- return readConf.orcBatchSize();
- } else {
- return 0;
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
}
- }
- private boolean parquetOnly() {
- return tasks().stream()
- .allMatch(task -> !task.isDataTask() && onlyFileFormat(task, FileFormat.PARQUET));
- }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
- private boolean parquetBatchReadsEnabled() {
- return readConf.parquetVectorizationEnabled()
- && // vectorization enabled
- expectedSchema.columns().size() > 0
- && // at least one column is projected
- expectedSchema.columns().stream()
- .allMatch(c -> c.type().isPrimitiveType()); // only primitives
+ SparkChangelogBatch that = (SparkChangelogBatch) o;
+ return table.name().equals(that.table.name()) && scanHashCode == that.scanHashCode;
}
- private boolean orcOnly() {
- return tasks().stream()
- .allMatch(task -> !task.isDataTask() && onlyFileFormat(task, FileFormat.ORC));
+ @Override
+ public int hashCode() {
+ return Objects.hash(table.name(), scanHashCode);
}
- private boolean orcBatchReadsEnabled() {
- return readConf.orcVectorizationEnabled()
- && // vectorization enabled
- tasks().stream().noneMatch(TableScanUtil::hasDeletes); // no delete files
+ private static class ReaderFactory implements PartitionReaderFactory {
+ @Override
+ public PartitionReader<InternalRow> createReader(InputPartition partition) {
+ Preconditions.checkArgument(
+ partition instanceof SparkInputPartition,
+ "Unknown input partition type: %s",
+ partition.getClass().getName());
+
+ return new RowReader((SparkInputPartition) partition);
+ }
}
- private boolean onlyFileFormat(CombinedScanTask task, FileFormat fileFormat) {
- return task.files().stream()
- .allMatch(fileScanTask -> fileScanTask.file().format().equals(fileFormat));
+ private static class RowReader extends ChangelogRowReader
+ implements PartitionReader<InternalRow> {
+
+ RowReader(SparkInputPartition partition) {
+ super(
+ partition.table(),
+ partition.taskGroup(),
+ partition.expectedSchema(),
+ partition.isCaseSensitive());
+ }
}
}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
new file mode 100644
index 0000000000..3f7927c6d6
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ChangelogScanTask;
+import org.apache.iceberg.IncrementalChangelogScan;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.Statistics;
+import org.apache.spark.sql.connector.read.SupportsReportStatistics;
+import org.apache.spark.sql.types.StructType;
+
+class SparkChangelogScan implements Scan, SupportsReportStatistics {
+
+ private final SparkSession spark;
+ private final Table table;
+ private final IncrementalChangelogScan scan;
+ private final SparkReadConf readConf;
+ private final Schema expectedSchema;
+ private final List<Expression> filters;
+ private final Long startSnapshotId;
+ private final Long endSnapshotId;
+ private final boolean readTimestampWithoutZone;
+
+ // lazy variables
+ private List<ScanTaskGroup<ChangelogScanTask>> taskGroups = null;
+ private StructType expectedSparkType = null;
+
+ SparkChangelogScan(
+ SparkSession spark,
+ Table table,
+ IncrementalChangelogScan scan,
+ SparkReadConf readConf,
+ Schema expectedSchema,
+ List<Expression> filters) {
+
+ SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), expectedSchema);
+
+ this.spark = spark;
+ this.table = table;
+ this.scan = scan;
+ this.readConf = readConf;
+ this.expectedSchema = expectedSchema;
+ this.filters = filters != null ? filters : Collections.emptyList();
+ this.startSnapshotId = readConf.startSnapshotId();
+ this.endSnapshotId = readConf.endSnapshotId();
+ this.readTimestampWithoutZone = readConf.handleTimestampWithoutZone();
+ }
+
+ @Override
+ public Statistics estimateStatistics() {
+ long rowsCount = taskGroups().stream().mapToLong(ScanTaskGroup::estimatedRowsCount).sum();
+ long sizeInBytes = SparkSchemaUtil.estimateSize(readSchema(), rowsCount);
+ return new Stats(sizeInBytes, rowsCount);
+ }
+
+ @Override
+ public StructType readSchema() {
+ if (expectedSparkType == null) {
+ Preconditions.checkArgument(
+ readTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(expectedSchema),
+ SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
+
+ this.expectedSparkType = SparkSchemaUtil.convert(expectedSchema);
+ }
+
+ return expectedSparkType;
+ }
+
+ @Override
+ public Batch toBatch() {
+ return new SparkChangelogBatch(
+ spark, table, readConf, taskGroups(), expectedSchema, hashCode());
+ }
+
+ private List<ScanTaskGroup<ChangelogScanTask>> taskGroups() {
+ if (taskGroups == null) {
+ try (CloseableIterable<ScanTaskGroup<ChangelogScanTask>> groups = scan.planTasks()) {
+ this.taskGroups = Lists.newArrayList(groups);
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to close changelog scan: " + scan, e);
+ }
+ }
+
+ return taskGroups;
+ }
+
+ @Override
+ public String description() {
+ return String.format(
+ "%s [fromSnapshotId=%d, toSnapshotId=%d, filters=%s]",
+ table, startSnapshotId, endSnapshotId, filtersAsString());
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "IcebergChangelogScan(table=%s, type=%s, fromSnapshotId=%d, toSnapshotId=%d, filters=%s)",
+ table, expectedSchema.asStruct(), startSnapshotId, endSnapshotId, filtersAsString());
+ }
+
+ private String filtersAsString() {
+ return filters.stream().map(Spark3Util::describe).collect(Collectors.joining(", "));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ SparkChangelogScan that = (SparkChangelogScan) o;
+ return table.name().equals(that.table.name())
+ && readSchema().equals(that.readSchema()) // compare Spark schemas to ignore field IDs
+ && filters.toString().equals(that.filters.toString())
+ && Objects.equals(startSnapshotId, that.startSnapshotId)
+ && Objects.equals(endSnapshotId, that.endSnapshotId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ table.name(), readSchema(), filters.toString(), startSnapshotId, endSnapshotId);
+ }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogTable.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogTable.java
new file mode 100644
index 0000000000..645a583e5d
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogTable.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Set;
+import org.apache.iceberg.ChangelogUtil;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class SparkChangelogTable implements Table, SupportsRead {
+
+ public static final String TABLE_NAME = "changes";
+
+ private static final Set<TableCapability> CAPABILITIES =
+ ImmutableSet.of(TableCapability.BATCH_READ);
+
+ private final org.apache.iceberg.Table icebergTable;
+ private final boolean refreshEagerly;
+
+ private SparkSession lazySpark = null;
+ private StructType lazyTableSparkType = null;
+ private Schema lazyChangelogSchema = null;
+
+ public SparkChangelogTable(org.apache.iceberg.Table icebergTable, boolean refreshEagerly) {
+ this.icebergTable = icebergTable;
+ this.refreshEagerly = refreshEagerly;
+ }
+
+ @Override
+ public String name() {
+ return icebergTable.name() + "." + TABLE_NAME;
+ }
+
+ @Override
+ public StructType schema() {
+ if (lazyTableSparkType == null) {
+ this.lazyTableSparkType = SparkSchemaUtil.convert(changelogSchema());
+ }
+
+ return lazyTableSparkType;
+ }
+
+ @Override
+ public Set<TableCapability> capabilities() {
+ return CAPABILITIES;
+ }
+
+ @Override
+ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+ if (refreshEagerly) {
+ icebergTable.refresh();
+ }
+
+ return new SparkScanBuilder(spark(), icebergTable, changelogSchema(), options) {
+ @Override
+ public Scan build() {
+ return buildChangelogScan();
+ }
+ };
+ }
+
+ private Schema changelogSchema() {
+ if (lazyChangelogSchema == null) {
+ this.lazyChangelogSchema = ChangelogUtil.changelogSchema(icebergTable.schema());
+ }
+
+ return lazyChangelogSchema;
+ }
+
+ private SparkSession spark() {
+ if (lazySpark == null) {
+ this.lazySpark = SparkSession.active();
+ }
+
+ return lazySpark;
+ }
+}
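SparkChangelogTable registers the changelog under the fixed name "changes" (TABLE_NAME) and routes newScanBuilder() to buildChangelogScan(). A minimal read sketch mirroring the changelogRecords()/collect() helpers in the new test, assuming an active SparkSession named spark, a placeholder table db.tbl, and placeholder snapshot IDs:

// Illustrative sketch only. START_SNAPSHOT_ID is an exclusive lower bound and
// END_SNAPSHOT_ID an inclusive upper bound, matching fromSnapshotExclusive()/
// toSnapshot() in buildChangelogScan() further below; both options may be omitted.
Dataset<Row> changes =
    spark
        .read()
        .option(SparkReadOptions.START_SNAPSHOT_ID, startSnapshotId)
        .option(SparkReadOptions.END_SNAPSHOT_ID, endSnapshotId)
        .table("db.tbl." + SparkChangelogTable.TABLE_NAME);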
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
new file mode 100644
index 0000000000..7786ee0eaf
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkInputPartition.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.Serializable;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+
+class SparkInputPartition implements InputPartition, Serializable {
+ private final ScanTaskGroup<?> taskGroup;
+ private final Broadcast<Table> tableBroadcast;
+ private final String expectedSchemaString;
+ private final boolean caseSensitive;
+
+ private transient Schema expectedSchema = null;
+ private transient String[] preferredLocations = null;
+
+ SparkInputPartition(
+ ScanTaskGroup<?> taskGroup,
+ Broadcast<Table> tableBroadcast,
+ String expectedSchemaString,
+ boolean caseSensitive,
+ boolean localityPreferred) {
+ this.taskGroup = taskGroup;
+ this.tableBroadcast = tableBroadcast;
+ this.expectedSchemaString = expectedSchemaString;
+ this.caseSensitive = caseSensitive;
+ if (localityPreferred) {
+ Table table = tableBroadcast.value();
+ this.preferredLocations = Util.blockLocations(table.io(), taskGroup);
+ } else {
+ this.preferredLocations = HadoopInputFile.NO_LOCATION_PREFERENCE;
+ }
+ }
+
+ @Override
+ public String[] preferredLocations() {
+ return preferredLocations;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T extends ScanTask> ScanTaskGroup<T> taskGroup() {
+ return (ScanTaskGroup<T>) taskGroup;
+ }
+
+ public Table table() {
+ return tableBroadcast.value();
+ }
+
+ public boolean isCaseSensitive() {
+ return caseSensitive;
+ }
+
+ public Schema expectedSchema() {
+ if (expectedSchema == null) {
+ this.expectedSchema = SchemaParser.fromJson(expectedSchemaString);
+ }
+
+ return expectedSchema;
+ }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index d3c299aa8b..972988b6b2 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -47,7 +47,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkReadOptions;
-import org.apache.iceberg.spark.source.SparkScan.ReadTask;
import org.apache.iceberg.spark.source.SparkScan.ReaderFactory;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SnapshotUtil;
@@ -152,22 +151,23 @@ public class SparkMicroBatchStream implements MicroBatchStream {
List<CombinedScanTask> combinedScanTasks =
Lists.newArrayList(
TableScanUtil.planTasks(splitTasks, splitSize, splitLookback, splitOpenFileCost));
- InputPartition[] readTasks = new InputPartition[combinedScanTasks.size()];
- Tasks.range(readTasks.length)
+ InputPartition[] partitions = new InputPartition[combinedScanTasks.size()];
+
+ Tasks.range(partitions.length)
.stopOnFailure()
.executeWith(localityPreferred ? ThreadPools.getWorkerPool() : null)
.run(
index ->
- readTasks[index] =
- new ReadTask(
+ partitions[index] =
+ new SparkInputPartition(
combinedScanTasks.get(index),
tableBroadcast,
expectedSchema,
caseSensitive,
localityPreferred));
- return readTasks;
+ return partitions;
}
@Override
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
index 8dc2b47d27..162333e602 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java
@@ -18,8 +18,6 @@
*/
package org.apache.iceberg.spark.source;
-import java.io.Serializable;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -27,13 +25,10 @@ import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.hadoop.HadoopInputFile;
-import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkReadConf;
@@ -44,7 +39,6 @@ import org.apache.iceberg.spark.source.metrics.NumSplits;
import org.apache.iceberg.spark.source.metrics.TaskNumDeletes;
import org.apache.iceberg.spark.source.metrics.TaskNumSplits;
import org.apache.iceberg.util.PropertyUtil;
-import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.metric.CustomMetric;
@@ -178,20 +172,22 @@ abstract class SparkScan extends SparkBatch implements Scan, SupportsReportStati
@Override
public PartitionReader<InternalRow> createReader(InputPartition partition) {
- if (partition instanceof ReadTask) {
- return new RowReader((ReadTask) partition);
- } else {
- throw new UnsupportedOperationException("Incorrect input partition type: " + partition);
- }
+ Preconditions.checkArgument(
+ partition instanceof SparkInputPartition,
+ "Unknown input partition type: %s",
+ partition.getClass().getName());
+
+ return new RowReader((SparkInputPartition) partition);
}
@Override
public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition partition) {
- if (partition instanceof ReadTask) {
- return new BatchReader((ReadTask) partition, batchSize);
- } else {
- throw new UnsupportedOperationException("Incorrect input partition type: " + partition);
- }
+ Preconditions.checkArgument(
+ partition instanceof SparkInputPartition,
+ "Unknown input partition type: %s",
+ partition.getClass().getName());
+
+ return new BatchReader((SparkInputPartition) partition, batchSize);
}
@Override
@@ -201,13 +197,19 @@ abstract class SparkScan extends SparkBatch implements Scan, SupportsReportStati
}
private static class RowReader extends RowDataReader implements PartitionReader<InternalRow> {
- private long numSplits;
+ private static final Logger LOG = LoggerFactory.getLogger(RowReader.class);
+
+ private final long numSplits;
+
+ RowReader(SparkInputPartition partition) {
+ super(
+ partition.taskGroup(),
+ partition.table(),
+ partition.expectedSchema(),
+ partition.isCaseSensitive());
- RowReader(ReadTask task) {
- super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive());
- numSplits = task.task.files().size();
- LOG.debug(
- "Reading {} file split(s) for table {} using RowReader", numSplits, task.table().name());
+ numSplits = partition.taskGroup().tasks().size();
+ LOG.debug("Reading {} file split(s) for table {}", numSplits, partition.table().name());
}
@Override
@@ -220,15 +222,21 @@ abstract class SparkScan extends SparkBatch implements Scan, SupportsReportStati
private static class BatchReader extends BatchDataReader
implements PartitionReader<ColumnarBatch> {
- private long numSplits;
-
- BatchReader(ReadTask task, int batchSize) {
- super(task.task, task.table(), task.expectedSchema(), task.isCaseSensitive(), batchSize);
- numSplits = task.task.files().size();
- LOG.debug(
- "Reading {} file split(s) for table {} using BatchReader",
- numSplits,
- task.table().name());
+
+ private static final Logger LOG = LoggerFactory.getLogger(BatchReader.class);
+
+ private final long numSplits;
+
+ BatchReader(SparkInputPartition partition, int batchSize) {
+ super(
+ partition.taskGroup(),
+ partition.table(),
+ partition.expectedSchema(),
+ partition.isCaseSensitive(),
+ batchSize);
+
+ numSplits = partition.taskGroup().tasks().size();
+ LOG.debug("Reading {} file split(s) for table {}", numSplits, partition.table().name());
}
@Override
@@ -238,56 +246,4 @@ abstract class SparkScan extends SparkBatch implements Scan, SupportsReportStati
};
}
}
-
- static class ReadTask implements InputPartition, Serializable {
- private final CombinedScanTask task;
- private final Broadcast<Table> tableBroadcast;
- private final String expectedSchemaString;
- private final boolean caseSensitive;
-
- private transient Schema expectedSchema = null;
- private transient String[] preferredLocations = null;
-
- ReadTask(
- CombinedScanTask task,
- Broadcast<Table> tableBroadcast,
- String expectedSchemaString,
- boolean caseSensitive,
- boolean localityPreferred) {
- this.task = task;
- this.tableBroadcast = tableBroadcast;
- this.expectedSchemaString = expectedSchemaString;
- this.caseSensitive = caseSensitive;
- if (localityPreferred) {
- Table table = tableBroadcast.value();
- this.preferredLocations = Util.blockLocations(table.io(), task);
- } else {
- this.preferredLocations = HadoopInputFile.NO_LOCATION_PREFERENCE;
- }
- }
-
- @Override
- public String[] preferredLocations() {
- return preferredLocations;
- }
-
- public Collection<FileScanTask> files() {
- return task.files();
- }
-
- public Table table() {
- return tableBroadcast.value();
- }
-
- public boolean isCaseSensitive() {
- return caseSensitive;
- }
-
- private Schema expectedSchema() {
- if (expectedSchema == null) {
- this.expectedSchema = SchemaParser.fromJson(expectedSchemaString);
- }
- return expectedSchema;
- }
- }
}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 21c34ed6f6..b291a8e267 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.source;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.iceberg.IncrementalChangelogScan;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
@@ -238,6 +239,38 @@ public class SparkScanBuilder
return new SparkBatchQueryScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
}
+ public Scan buildChangelogScan() {
+ Preconditions.checkArgument(
+ readConf.snapshotId() == null && readConf.asOfTimestamp() == null,
+ "Cannot set neither %s nor %s for changelogs",
+ SparkReadOptions.SNAPSHOT_ID,
+ SparkReadOptions.AS_OF_TIMESTAMP);
+
+ Long startSnapshotId = readConf.startSnapshotId();
+ Long endSnapshotId = readConf.endSnapshotId();
+
+ Schema expectedSchema = schemaWithMetadataColumns();
+
+ IncrementalChangelogScan scan =
+ table
+ .newIncrementalChangelogScan()
+ .caseSensitive(caseSensitive)
+ .filter(filterExpression())
+ .project(expectedSchema);
+
+ if (startSnapshotId != null) {
+ scan = scan.fromSnapshotExclusive(startSnapshotId);
+ }
+
+ if (endSnapshotId != null) {
+ scan = scan.toSnapshot(endSnapshotId);
+ }
+
+ scan = configureSplitPlanning(scan);
+
+ return new SparkChangelogScan(spark, table, scan, readConf, expectedSchema, filterExpressions);
+ }
+
public Scan buildMergeOnReadScan() {
Preconditions.checkArgument(
readConf.snapshotId() == null && readConf.asOfTimestamp() == null,
@@ -306,8 +339,8 @@ public class SparkScanBuilder
spark, table, scan, snapshot, readConf, expectedSchema, filterExpressions);
}
- private TableScan configureSplitPlanning(TableScan scan) {
- TableScan configuredScan = scan;
+ private <T extends org.apache.iceberg.Scan<T, ?, ?>> T configureSplitPlanning(T scan) {
+ T configuredScan = scan;
Long splitSize = readConf.splitSizeOption();
if (splitSize != null) {
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
index e3699eaede..0abfd79d5d 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction3.java
@@ -46,7 +46,7 @@ public class TestRemoveOrphanFilesAction3 extends TestRemoveOrphanFilesAction {
Map<String, String> options = Maps.newHashMap();
Transform[] transforms = {};
cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms, options);
- SparkTable table = cat.loadTable(id);
+ SparkTable table = (SparkTable) cat.loadTable(id);
spark.sql("INSERT INTO mycat.default.table VALUES (1,1,1)");
@@ -76,7 +76,7 @@ public class TestRemoveOrphanFilesAction3 extends TestRemoveOrphanFilesAction {
Map<String, String> options = Maps.newHashMap();
Transform[] transforms = {};
cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms, options);
- SparkTable table = cat.loadTable(id);
+ SparkTable table = (SparkTable) cat.loadTable(id);
spark.sql("INSERT INTO hadoop.default.table VALUES (1,1,1)");
@@ -106,7 +106,7 @@ public class TestRemoveOrphanFilesAction3 extends TestRemoveOrphanFilesAction {
Map<String, String> options = Maps.newHashMap();
Transform[] transforms = {};
cat.createTable(id, SparkSchemaUtil.convert(SCHEMA), transforms, options);
- SparkTable table = cat.loadTable(id);
+ SparkTable table = (SparkTable) cat.loadTable(id);
spark.sql("INSERT INTO hive.default.table VALUES (1,1,1)");
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestPathIdentifier.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestPathIdentifier.java
index f58451296c..5baf607123 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestPathIdentifier.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestPathIdentifier.java
@@ -71,8 +71,9 @@ public class TestPathIdentifier extends SparkTestBase {
@Test
public void testPathIdentifier() throws TableAlreadyExistsException, NoSuchTableException {
SparkTable table =
- sparkCatalog.createTable(
- identifier, SparkSchemaUtil.convert(SCHEMA), new Transform[0], ImmutableMap.of());
+ (SparkTable)
+ sparkCatalog.createTable(
+ identifier, SparkSchemaUtil.convert(SCHEMA), new Transform[0], ImmutableMap.of());
Assert.assertEquals(table.table().location(), tableLocation.getAbsolutePath());
Assertions.assertThat(table.table()).isInstanceOf(BaseTable.class);