You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/04/20 18:39:32 UTC
[iceberg] branch master updated: Spark: Add SparkFilesScan for
querying a set of files (#2472)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 4113c51 Spark: Add SparkFilesScan for querying a set of files (#2472)
4113c51 is described below
commit 4113c5170fcbe12334ce3f0f1c0956204ab020c9
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Tue Apr 20 11:39:19 2021 -0700
Spark: Add SparkFilesScan for querying a set of files (#2472)
---
.../org/apache/iceberg/spark/SparkReadOptions.java | 3 +
.../iceberg/spark/FileScanTaskSetManager.java | 66 +++++++++++
.../iceberg/spark/source/SparkFilesScan.java | 122 +++++++++++++++++++++
.../spark/source/SparkFilesScanBuilder.java | 46 ++++++++
.../apache/iceberg/spark/source/SparkTable.java | 6 +
.../iceberg/spark/source/TestSparkFilesScan.java | 121 ++++++++++++++++++++
6 files changed, 364 insertions(+)
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index e0242e0..bce0bf4 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -47,4 +47,7 @@ public class SparkReadOptions {
// Overrides the table's read.parquet.vectorization.batch-size
public static final String VECTORIZATION_BATCH_SIZE = "batch-size";
+
+ // Set ID that is used to fetch file scan tasks
+ public static final String FILE_SCAN_TASK_SET_ID = "file-scan-task-set-id";
}
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/FileScanTaskSetManager.java b/spark3/src/main/java/org/apache/iceberg/spark/FileScanTaskSetManager.java
new file mode 100644
index 0000000..b43f1c6
--- /dev/null
+++ b/spark3/src/main/java/org/apache/iceberg/spark/FileScanTaskSetManager.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Pair;
+
+/**
+ * Singleton registry of staged {@link FileScanTask} lists.
+ * <p>
+ * A task set is keyed by the table UUID (from the table's current metadata) and a caller-chosen set ID,
+ * so a later read can fetch exactly the staged tasks instead of planning a new scan. Staged tasks are
+ * held in a concurrent map until removed with {@link #removeTasks(Table, String)} or replaced.
+ */
+public class FileScanTaskSetManager {
+
+  private static final FileScanTaskSetManager INSTANCE = new FileScanTaskSetManager();
+
+  private final Map<Pair<String, String>, List<FileScanTask>> tasksMap = Maps.newConcurrentMap();
+
+  private FileScanTaskSetManager() {
+  }
+
+  /**
+   * Returns the singleton task set manager.
+   */
+  public static FileScanTaskSetManager get() {
+    return INSTANCE;
+  }
+
+  /**
+   * Stages file scan tasks for a table, replacing any tasks already staged under the same set ID.
+   * <p>
+   * An immutable copy of {@code tasks} is stored, so later changes to the caller's list have no effect.
+   *
+   * @param table the table the tasks belong to; must implement {@link HasTableOperations}
+   * @param setID the ID to stage the tasks under
+   * @param tasks the tasks to stage; must be non-null, non-empty and free of null elements
+   * @throws IllegalArgumentException if {@code tasks} is null or empty, or if {@code table} does not
+   *     implement {@link HasTableOperations}
+   */
+  public void stageTasks(Table table, String setID, List<FileScanTask> tasks) {
+    Preconditions.checkArgument(tasks != null && !tasks.isEmpty(), "Cannot stage null or empty tasks");
+    Pair<String, String> id = toID(table, setID);
+    tasksMap.put(id, ImmutableList.copyOf(tasks));
+  }
+
+  /**
+   * Returns the tasks staged for a table under the given set ID.
+   *
+   * @param table the table the tasks were staged for
+   * @param setID the ID the tasks were staged under
+   * @return the staged tasks, or null if no tasks are staged under this ID
+   */
+  public List<FileScanTask> fetchTasks(Table table, String setID) {
+    Pair<String, String> id = toID(table, setID);
+    return tasksMap.get(id);
+  }
+
+  /**
+   * Removes the tasks staged for a table under the given set ID.
+   *
+   * @param table the table the tasks were staged for
+   * @param setID the ID the tasks were staged under
+   * @return the removed tasks, or null if no tasks were staged under this ID
+   */
+  public List<FileScanTask> removeTasks(Table table, String setID) {
+    Pair<String, String> id = toID(table, setID);
+    return tasksMap.remove(id);
+  }
+
+  // builds the map key; the table UUID is only reachable through the table's operations
+  private Pair<String, String> toID(Table table, String setID) {
+    Preconditions.checkArgument(table instanceof HasTableOperations,
+        "Cannot resolve UUID of table %s: it does not implement HasTableOperations", table);
+    TableOperations ops = ((HasTableOperations) table).operations();
+    String tableUUID = ops.current().uuid();
+    return Pair.of(tableUUID, setID);
+  }
+}
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkFilesScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkFilesScan.java
new file mode 100644
index 0000000..20d2269
--- /dev/null
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkFilesScan.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+/**
+ * Batch scan over file scan tasks that were staged in {@link FileScanTaskSetManager} under the ID given
+ * by the {@link SparkReadOptions#FILE_SCAN_TASK_SET_ID} read option, instead of planning a table scan.
+ * <p>
+ * The staged tasks are split and combined using the split size, lookback and open file cost read options,
+ * with the corresponding table properties as defaults.
+ */
+class SparkFilesScan extends SparkBatchScan {
+  private final String taskSetID;
+  private final Long splitSize;
+  private final Integer splitLookback;
+  private final Long splitOpenFileCost;
+
+  private List<CombinedScanTask> tasks = null; // lazy cache of tasks
+
+  SparkFilesScan(SparkSession spark, Table table, boolean caseSensitive, CaseInsensitiveStringMap options) {
+    super(spark, table, caseSensitive, table.schema(), ImmutableList.of(), options);
+
+    this.taskSetID = options.get(SparkReadOptions.FILE_SCAN_TASK_SET_ID);
+
+    Map<String, String> props = table.properties();
+
+    // each read option falls back to the matching table property (or its default) when not set
+    long tableSplitSize = PropertyUtil.propertyAsLong(props, SPLIT_SIZE, SPLIT_SIZE_DEFAULT);
+    this.splitSize = Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, tableSplitSize);
+
+    int tableSplitLookback = PropertyUtil.propertyAsInt(props, SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT);
+    this.splitLookback = Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, tableSplitLookback);
+
+    long tableOpenFileCost = PropertyUtil.propertyAsLong(props, SPLIT_OPEN_FILE_COST, SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.splitOpenFileCost = Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, tableOpenFileCost);
+  }
+
+  /**
+   * Returns the combined tasks for the staged task set; they are computed on the first call and cached.
+   *
+   * @throws ValidationException if no tasks are staged for this table under the task set ID
+   */
+  @Override
+  protected List<CombinedScanTask> tasks() {
+    if (tasks == null) {
+      FileScanTaskSetManager taskSetManager = FileScanTaskSetManager.get();
+      List<FileScanTask> files = taskSetManager.fetchTasks(table(), taskSetID);
+      ValidationException.check(files != null,
+          "Task set manager has no tasks for table %s with id %s",
+          table(), taskSetID);
+
+      // split the staged files by the split size, then group the splits into combined tasks
+      CloseableIterable<FileScanTask> splitFiles = TableScanUtil.splitFiles(
+          CloseableIterable.withNoopClose(files),
+          splitSize);
+      CloseableIterable<CombinedScanTask> scanTasks = TableScanUtil.planTasks(
+          splitFiles, splitSize,
+          splitLookback, splitOpenFileCost);
+      this.tasks = Lists.newArrayList(scanTasks);
+    }
+
+    return tasks;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+
+    if (other == null || getClass() != other.getClass()) {
+      return false;
+    }
+
+    SparkFilesScan that = (SparkFilesScan) other;
+    return table().name().equals(that.table().name()) &&
+        Objects.equals(taskSetID, that.taskSetID) &&
+        Objects.equals(splitSize, that.splitSize) &&
+        Objects.equals(splitLookback, that.splitLookback) &&
+        Objects.equals(splitOpenFileCost, that.splitOpenFileCost);
+  }
+
+  @Override
+  public int hashCode() {
+    // hash exactly the fields compared in equals()
+    return Objects.hash(table().name(), taskSetID, splitSize, splitLookback, splitOpenFileCost);
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "IcebergFilesScan(table=%s, type=%s, taskSetID=%s, caseSensitive=%s)",
+        table(), expectedSchema().asStruct(), taskSetID, caseSensitive());
+  }
+}
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkFilesScanBuilder.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkFilesScanBuilder.java
new file mode 100644
index 0000000..a7c0a6c
--- /dev/null
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkFilesScanBuilder.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.Table;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * Builds {@link SparkFilesScan} instances that read previously staged file scan tasks.
+ * <p>
+ * Case sensitivity is resolved from the session's {@code spark.sql.caseSensitive} setting once,
+ * when the builder is constructed, and reused for every scan it builds.
+ */
+class SparkFilesScanBuilder implements ScanBuilder {
+
+  private final SparkSession session;
+  private final Table icebergTable;
+  private final CaseInsensitiveStringMap readOptions;
+  private final boolean caseSensitive;
+
+  SparkFilesScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
+    this.session = spark;
+    this.icebergTable = table;
+    this.readOptions = options;
+    this.caseSensitive = sessionIsCaseSensitive(spark);
+  }
+
+  @Override
+  public Scan build() {
+    return new SparkFilesScan(session, icebergTable, caseSensitive, readOptions);
+  }
+
+  // Boolean.parseBoolean treats any value other than "true" (ignoring case) as false
+  private static boolean sessionIsCaseSensitive(SparkSession spark) {
+    String flag = spark.conf().get("spark.sql.caseSensitive");
+    return Boolean.parseBoolean(flag);
+  }
+}
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 93531a3..62940e4 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkFilters;
+import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.SparkSession;
@@ -159,6 +160,11 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
@Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+ if (options.containsKey(SparkReadOptions.FILE_SCAN_TASK_SET_ID)) {
+ // skip planning the job and fetch already staged file scan tasks
+ return new SparkFilesScanBuilder(sparkSession(), icebergTable, options);
+ }
+
if (refreshEagerly) {
icebergTable.refresh();
}
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkFilesScan.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkFilesScan.java
new file mode 100644
index 0000000..63195cf
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkFilesScan.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for reading staged file scan task sets via {@link SparkReadOptions#FILE_SCAN_TASK_SET_ID}.
+ */
+public class TestSparkFilesScan extends SparkCatalogTestBase {
+
+  public TestSparkFilesScan(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testTaskSetLoading() throws NoSuchTableException, IOException {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+
+    List<SimpleRecord> records = ImmutableList.of(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b")
+    );
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+    df.writeTo(tableName).append();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertEquals("Should produce 1 snapshot", 1, Iterables.size(table.snapshots()));
+
+    FileScanTaskSetManager taskSetManager = FileScanTaskSetManager.get();
+    String setID = UUID.randomUUID().toString();
+
+    try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
+      taskSetManager.stageTasks(table, setID, ImmutableList.copyOf(fileScanTasks));
+
+      // load the staged file set
+      Dataset<Row> scanDF = spark.read().format("iceberg")
+          .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, setID)
+          .load(tableName);
+
+      // write the records back essentially duplicating data
+      scanDF.writeTo(tableName).append();
+    } finally {
+      // the manager is a singleton shared by all tests, so drop the staged set explicitly
+      taskSetManager.removeTasks(table, setID);
+    }
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1, "a"), row(1, "a"), row(2, "b"), row(2, "b")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testTaskSetPlanning() throws NoSuchTableException, IOException {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+
+    List<SimpleRecord> records = ImmutableList.of(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b")
+    );
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+    df.coalesce(1).writeTo(tableName).append();
+    df.coalesce(1).writeTo(tableName).append();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertEquals("Should produce 2 snapshots", 2, Iterables.size(table.snapshots()));
+
+    FileScanTaskSetManager taskSetManager = FileScanTaskSetManager.get();
+    String setID = UUID.randomUUID().toString();
+
+    try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
+      List<FileScanTask> tasks = ImmutableList.copyOf(fileScanTasks);
+      taskSetManager.stageTasks(table, setID, tasks);
+
+      // load the staged file set and make sure each file is in a separate split
+      Dataset<Row> scanDF = spark.read().format("iceberg")
+          .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, setID)
+          .option(SparkReadOptions.SPLIT_SIZE, tasks.get(0).file().fileSizeInBytes())
+          .load(tableName);
+      Assert.assertEquals("Num partitions should match", 2, scanDF.javaRDD().getNumPartitions());
+
+      // load the staged file set and make sure we combine both files into a single split
+      scanDF = spark.read().format("iceberg")
+          .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, setID)
+          .option(SparkReadOptions.SPLIT_SIZE, Long.MAX_VALUE)
+          .load(tableName);
+      Assert.assertEquals("Num partitions should match", 1, scanDF.javaRDD().getNumPartitions());
+    } finally {
+      // the manager is a singleton shared by all tests, so drop the staged set explicitly
+      taskSetManager.removeTasks(table, setID);
+    }
+  }
+}