Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/04/20 18:39:32 UTC

[iceberg] branch master updated: Spark: Add SparkFilesScan for querying a set of files (#2472)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 4113c51  Spark: Add SparkFilesScan for querying a set of files (#2472)
4113c51 is described below

commit 4113c5170fcbe12334ce3f0f1c0956204ab020c9
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Tue Apr 20 11:39:19 2021 -0700

    Spark: Add SparkFilesScan for querying a set of files (#2472)
---
 .../org/apache/iceberg/spark/SparkReadOptions.java |   3 +
 .../iceberg/spark/FileScanTaskSetManager.java      |  66 +++++++++++
 .../iceberg/spark/source/SparkFilesScan.java       | 122 +++++++++++++++++++++
 .../spark/source/SparkFilesScanBuilder.java        |  46 ++++++++
 .../apache/iceberg/spark/source/SparkTable.java    |   6 +
 .../iceberg/spark/source/TestSparkFilesScan.java   | 121 ++++++++++++++++++++
 6 files changed, 364 insertions(+)
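
In short: this commit lets a caller plan file scan tasks itself, stage them under a set ID, and then have Spark read back exactly those files, skipping normal job planning. A minimal end-to-end sketch, modeled on the tests below; spark, table, and tableName are placeholders for a live SparkSession, a loaded Iceberg Table, and its catalog identifier:

    import java.util.UUID;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
    import org.apache.iceberg.spark.FileScanTaskSetManager;
    import org.apache.iceberg.spark.SparkReadOptions;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // plan the files to read and stage them under a unique set ID
    try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
      String setID = UUID.randomUUID().toString();
      FileScanTaskSetManager.get().stageTasks(table, setID, ImmutableList.copyOf(fileScanTasks));

      // read back exactly the staged files; no scan planning happens here
      Dataset<Row> scanDF = spark.read().format("iceberg")
          .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, setID)
          .load(tableName);
    }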

diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index e0242e0..bce0bf4 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -47,4 +47,7 @@ public class SparkReadOptions {
 
   // Overrides the table's read.parquet.vectorization.batch-size
   public static final String VECTORIZATION_BATCH_SIZE = "batch-size";
+
+  // Set ID that is used to fetch file scan tasks
+  public static final String FILE_SCAN_TASK_SET_ID = "file-scan-task-set-id";
 }
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/FileScanTaskSetManager.java b/spark3/src/main/java/org/apache/iceberg/spark/FileScanTaskSetManager.java
new file mode 100644
index 0000000..b43f1c6
--- /dev/null
+++ b/spark3/src/main/java/org/apache/iceberg/spark/FileScanTaskSetManager.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Pair;
+
+public class FileScanTaskSetManager {
+
+  private static final FileScanTaskSetManager INSTANCE = new FileScanTaskSetManager();
+
+  private final Map<Pair<String, String>, List<FileScanTask>> tasksMap = Maps.newConcurrentMap();
+
+  private FileScanTaskSetManager() {
+  }
+
+  public static FileScanTaskSetManager get() {
+    return INSTANCE;
+  }
+
+  public void stageTasks(Table table, String setID, List<FileScanTask> tasks) {
+    Preconditions.checkArgument(tasks != null && !tasks.isEmpty(), "Cannot stage null or empty tasks");
+    Pair<String, String> id = toID(table, setID);
+    tasksMap.put(id, tasks);
+  }
+
+  public List<FileScanTask> fetchTasks(Table table, String setID) {
+    Pair<String, String> id = toID(table, setID);
+    return tasksMap.get(id);
+  }
+
+  public List<FileScanTask> removeTasks(Table table, String setID) {
+    Pair<String, String> id = toID(table, setID);
+    return tasksMap.remove(id);
+  }
+
+  private Pair<String, String> toID(Table table, String setID) {
+    TableOperations ops = ((HasTableOperations) table).operations();
+    String tableUUID = ops.current().uuid();
+    return Pair.of(tableUUID, setID);
+  }
+}
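
The manager keys staged tasks by the pair (table UUID, set ID), so set IDs cannot collide across tables. A short sketch of the intended lifecycle, assuming table is a loaded Iceberg Table (it must implement HasTableOperations) and tasks is its planned FileScanTask list:

    FileScanTaskSetManager manager = FileScanTaskSetManager.get();
    String setID = UUID.randomUUID().toString();

    manager.stageTasks(table, setID, tasks);                       // publish the planned tasks
    List<FileScanTask> staged = manager.fetchTasks(table, setID);  // returns null if nothing is staged
    manager.removeTasks(table, setID);                             // drop the set once it is consumed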
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkFilesScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkFilesScan.java
new file mode 100644
index 0000000..20d2269
--- /dev/null
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkFilesScan.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+class SparkFilesScan extends SparkBatchScan {
+  private final String taskSetID;
+  private final Long splitSize;
+  private final Integer splitLookback;
+  private final Long splitOpenFileCost;
+
+  private List<CombinedScanTask> tasks = null; // lazy cache of tasks
+
+  SparkFilesScan(SparkSession spark, Table table, boolean caseSensitive, CaseInsensitiveStringMap options) {
+    super(spark, table, caseSensitive, table.schema(), ImmutableList.of(), options);
+
+    this.taskSetID = options.get(SparkReadOptions.FILE_SCAN_TASK_SET_ID);
+
+    Map<String, String> props = table.properties();
+
+    long tableSplitSize = PropertyUtil.propertyAsLong(props, SPLIT_SIZE, SPLIT_SIZE_DEFAULT);
+    this.splitSize = Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, tableSplitSize);
+
+    int tableSplitLookback = PropertyUtil.propertyAsInt(props, SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT);
+    this.splitLookback = Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, tableSplitLookback);
+
+    long tableOpenFileCost = PropertyUtil.propertyAsLong(props, SPLIT_OPEN_FILE_COST, SPLIT_OPEN_FILE_COST_DEFAULT);
+    this.splitOpenFileCost = Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, tableOpenFileCost);
+  }
+
+  @Override
+  protected List<CombinedScanTask> tasks() {
+    if (tasks == null) {
+      FileScanTaskSetManager taskSetManager = FileScanTaskSetManager.get();
+      List<FileScanTask> files = taskSetManager.fetchTasks(table(), taskSetID);
+      ValidationException.check(files != null,
+          "Task set manager has no tasks for table %s with id %s",
+          table(), taskSetID);
+
+      CloseableIterable<FileScanTask> splitFiles = TableScanUtil.splitFiles(
+          CloseableIterable.withNoopClose(files),
+          splitSize);
+      CloseableIterable<CombinedScanTask> scanTasks = TableScanUtil.planTasks(
+          splitFiles, splitSize,
+          splitLookback, splitOpenFileCost);
+      this.tasks = Lists.newArrayList(scanTasks);
+    }
+
+    return tasks;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+
+    if (other == null || getClass() != other.getClass()) {
+      return false;
+    }
+
+    SparkFilesScan that = (SparkFilesScan) other;
+    return table().name().equals(that.table().name()) &&
+        Objects.equals(taskSetID, that.taskSetID) &&
+        Objects.equals(splitSize, that.splitSize) &&
+        Objects.equals(splitLookback, that.splitLookback) &&
+        Objects.equals(splitOpenFileCost, that.splitOpenFileCost);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(table().name(), taskSetID, splitSize, splitLookback, splitOpenFileCost);
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "IcebergFilesScan(table=%s, type=%s, taskSetID=%s, caseSensitive=%s)",
+        table(), expectedSchema().asStruct(), taskSetID, caseSensitive());
+  }
+}
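
Note that the scan honors the standard split read options (split-size, lookback, file-open-cost) and falls back to the table's split properties when they are unset. A sketch of tuning a staged read, assuming setID was staged as above; the byte values are illustrative only:

    Dataset<Row> scanDF = spark.read().format("iceberg")
        .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, setID)
        .option(SparkReadOptions.SPLIT_SIZE, 134217728L)      // 128 MB target split size
        .option(SparkReadOptions.LOOKBACK, 10)                // bin-packing lookback
        .option(SparkReadOptions.FILE_OPEN_COST, 4194304L)    // per-file open cost in bytes
        .load(tableName);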
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkFilesScanBuilder.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkFilesScanBuilder.java
new file mode 100644
index 0000000..a7c0a6c
--- /dev/null
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkFilesScanBuilder.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.Table;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+class SparkFilesScanBuilder implements ScanBuilder {
+
+  private final SparkSession spark;
+  private final Table table;
+  private final boolean caseSensitive;
+  private final CaseInsensitiveStringMap options;
+
+  SparkFilesScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
+    this.spark = spark;
+    this.table = table;
+    this.caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
+    this.options = options;
+  }
+
+  @Override
+  public Scan build() {
+    return new SparkFilesScan(spark, table, caseSensitive, options);
+  }
+}
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 93531a3..62940e4 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkFilters;
+import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.SparkSession;
@@ -159,6 +160,11 @@ public class SparkTable implements org.apache.spark.sql.connector.catalog.Table,
 
   @Override
   public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+    if (options.containsKey(SparkReadOptions.FILE_SCAN_TASK_SET_ID)) {
+      // skip planning the job and fetch already staged file scan tasks
+      return new SparkFilesScanBuilder(sparkSession(), icebergTable, options);
+    }
+
     if (refreshEagerly) {
       icebergTable.refresh();
     }
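
With this dispatch in place, the presence of the read option alone decides which scan builder is used; ordinary reads are unaffected. A small sketch of both paths, with the same placeholders as above:

    // regular read: plans the scan against the current table state
    Dataset<Row> df = spark.read().format("iceberg").load(tableName);

    // staged read: routed to SparkFilesScanBuilder, no planning
    Dataset<Row> staged = spark.read().format("iceberg")
        .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, setID)
        .load(tableName);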
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkFilesScan.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkFilesScan.java
new file mode 100644
index 0000000..63195cf
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkFilesScan.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.FileScanTaskSetManager;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestSparkFilesScan extends SparkCatalogTestBase {
+
+  public TestSparkFilesScan(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testTaskSetLoading() throws NoSuchTableException, IOException {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+
+    List<SimpleRecord> records = ImmutableList.of(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b")
+    );
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+    df.writeTo(tableName).append();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertEquals("Should produce 1 snapshot", 1, Iterables.size(table.snapshots()));
+
+    try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
+      FileScanTaskSetManager taskSetManager = FileScanTaskSetManager.get();
+      String setID = UUID.randomUUID().toString();
+      taskSetManager.stageTasks(table, setID, ImmutableList.copyOf(fileScanTasks));
+
+      // load the staged file set
+      Dataset<Row> scanDF = spark.read().format("iceberg")
+          .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, setID)
+          .load(tableName);
+
+      // write the records back, essentially duplicating the data
+      scanDF.writeTo(tableName).append();
+    }
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1, "a"), row(1, "a"), row(2, "b"), row(2, "b")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testTaskSetPlanning() throws NoSuchTableException, IOException {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+
+    List<SimpleRecord> records = ImmutableList.of(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b")
+    );
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+    df.coalesce(1).writeTo(tableName).append();
+    df.coalesce(1).writeTo(tableName).append();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertEquals("Should produce 2 snapshots", 2, Iterables.size(table.snapshots()));
+
+    try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
+      FileScanTaskSetManager taskSetManager = FileScanTaskSetManager.get();
+      String setID = UUID.randomUUID().toString();
+      List<FileScanTask> tasks = ImmutableList.copyOf(fileScanTasks);
+      taskSetManager.stageTasks(table, setID, tasks);
+
+      // load the staged file set and make sure each file is in a separate split
+      Dataset<Row> scanDF = spark.read().format("iceberg")
+          .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, setID)
+          .option(SparkReadOptions.SPLIT_SIZE, tasks.get(0).file().fileSizeInBytes())
+          .load(tableName);
+      Assert.assertEquals("Num partitions should match", 2, scanDF.javaRDD().getNumPartitions());
+
+      // load the staged file set and make sure we combine both files into a single split
+      scanDF = spark.read().format("iceberg")
+          .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, setID)
+          .option(SparkReadOptions.SPLIT_SIZE, Long.MAX_VALUE)
+          .load(tableName);
+      Assert.assertEquals("Num partitions should match", 1, scanDF.javaRDD().getNumPartitions());
+    }
+  }
+}