Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/12/11 01:19:47 UTC
[incubator-iceberg] 02/03: Code review updates.
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch spark-3
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
commit 176393ca8d9d3461d1e697e54e87b8b89c12ffad
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Mon Dec 9 16:54:25 2019 -0800
Code review updates.
---
api/src/main/java/org/apache/iceberg/Table.java | 9 -
build.gradle | 6 -
.../org/apache/iceberg/spark/SparkFilters.java | 19 +
.../java/org/apache/iceberg/spark/SparkUtil.java | 242 +++++++++++
.../java/org/apache/iceberg/spark/SparkUtils.java | 92 ----
...cebergTableProvider.java => IcebergSource.java} | 62 ++-
.../apache/iceberg/spark/source/IcebergTable.java | 222 ----------
.../{IcebergBatchScan.java => SparkBatchScan.java} | 482 +++++++++------------
...cebergBatchWriter.java => SparkBatchWrite.java} | 77 ++--
.../iceberg/spark/source/SparkScanBuilder.java | 102 +++++
...reamingWriter.java => SparkStreamingWrite.java} | 54 +--
.../apache/iceberg/spark/source/SparkTable.java | 118 +++++
.../iceberg/spark/source/SparkWriteBuilder.java | 173 ++++++++
...org.apache.spark.sql.sources.DataSourceRegister | 2 +-
.../iceberg/spark/source/TestDataFrameWrites.java | 12 +-
.../iceberg/spark/source/TestFilteredScan.java | 64 +--
.../iceberg/spark/source/TestIcebergSource.java | 5 +-
.../iceberg/spark/source/TestParquetWrite.java | 21 +-
.../spark/source/TestStructuredStreaming.java | 4 +-
19 files changed, 991 insertions(+), 775 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java
index e0fa525..63c55e5 100644
--- a/api/src/main/java/org/apache/iceberg/Table.java
+++ b/api/src/main/java/org/apache/iceberg/Table.java
@@ -228,13 +228,4 @@ public interface Table {
* @return a {@link LocationProvider} to provide locations for new data files
*/
LocationProvider locationProvider();
-
- /**
- * Return the name of this table.
- *
- * @return this table's name
- */
- default String name() {
- return "table(" + hashCode() + ")";
- }
}
diff --git a/build.gradle b/build.gradle
index 6835cbe..cc0f666 100644
--- a/build.gradle
+++ b/build.gradle
@@ -240,15 +240,9 @@ project(':iceberg-spark') {
compile project(':iceberg-hive')
compileOnly "org.apache.avro:avro"
- compileOnly("org.apache.spark:spark-sql_2.12") {
- exclude group: 'org.apache.avro', module: 'avro'
- }
compileOnly("org.apache.spark:spark-hive_2.12") {
exclude group: 'org.apache.avro', module: 'avro'
}
- compileOnly("org.apache.spark:spark-catalyst_2.12") {
- exclude group: 'org.apache.avro', module: 'avro'
- }
testCompile "org.apache.hadoop:hadoop-hdfs::tests"
testCompile "org.apache.hadoop:hadoop-common::tests"
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java b/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java
index c6f8f52..6c43b87 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java
@@ -25,7 +25,10 @@ import java.sql.Date;
import java.sql.Timestamp;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expression.Operation;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
+import org.apache.spark.sql.sources.AlwaysFalse$;
+import org.apache.spark.sql.sources.AlwaysTrue$;
import org.apache.spark.sql.sources.And;
import org.apache.spark.sql.sources.EqualNullSafe;
import org.apache.spark.sql.sources.EqualTo;
@@ -60,6 +63,8 @@ public class SparkFilters {
private static final ImmutableMap<Class<? extends Filter>, Operation> FILTERS = ImmutableMap
.<Class<? extends Filter>, Operation>builder()
+ .put(AlwaysTrue$.class, Operation.TRUE)
+ .put(AlwaysFalse$.class, Operation.FALSE)
.put(EqualTo.class, Operation.EQ)
.put(EqualNullSafe.class, Operation.EQ)
.put(GreaterThan.class, Operation.GT)
@@ -75,11 +80,25 @@ public class SparkFilters {
.put(StringStartsWith.class, Operation.STARTS_WITH)
.build();
+ public static Expression convert(Filter[] filters) {
+ Expression expression = Expressions.alwaysTrue();
+ for (Filter filter : filters) {
+ expression = Expressions.and(expression, convert(filter));
+ }
+ return expression;
+ }
+
public static Expression convert(Filter filter) {
// avoid using a chain of if instanceof statements by mapping to the expression enum.
Operation op = FILTERS.get(filter.getClass());
if (op != null) {
switch (op) {
+ case TRUE:
+ return Expressions.alwaysTrue();
+
+ case FALSE:
+ return Expressions.alwaysFalse();
+
case IS_NULL:
IsNull isNullFilter = (IsNull) filter;
return isNull(isNullFilter.attribute());
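The new convert(Filter[]) overload above folds an array of pushed Spark filters into a single Iceberg expression, starting from alwaysTrue() and AND-ing each converted filter. A minimal usage sketch (the column names and values are illustrative only):

    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.spark.SparkFilters;
    import org.apache.spark.sql.sources.EqualTo;
    import org.apache.spark.sql.sources.Filter;
    import org.apache.spark.sql.sources.GreaterThan;

    public class SparkFiltersExample {
      public static void main(String[] args) {
        // filters as Spark would push them down for: WHERE category = 'books' AND price > 10
        Filter[] pushed = new Filter[] {
            new EqualTo("category", "books"),
            new GreaterThan("price", 10)
        };

        // equivalent to and(equal("category", "books"), greaterThan("price", 10))
        Expression expr = SparkFilters.convert(pushed);
        System.out.println(expr);
      }
    }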
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java b/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
new file mode 100644
index 0000000..62b83f2
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.transforms.PartitionSpecVisitor;
+import org.apache.iceberg.types.Type;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.expressions.Expression;
+import org.apache.spark.sql.connector.expressions.Expressions;
+import org.apache.spark.sql.connector.expressions.Literal;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+
+public class SparkUtil {
+
+ private SparkUtil() {
+ }
+
+ private static final Joiner DOT = Joiner.on(".");
+
+ /**
+ * Applies a list of Spark table changes to an {@link UpdateProperties} operation.
+ * <p>
+ * All non-property changes in the list are ignored.
+ *
+ * @param pendingUpdate an uncommitted UpdateProperties operation to configure
+ * @param changes a list of Spark table changes
+ * @return the UpdateProperties operation configured with the changes
+ */
+ public static UpdateProperties applyPropertyChanges(UpdateProperties pendingUpdate, List<TableChange> changes) {
+ for (TableChange change : changes) {
+ if (change instanceof TableChange.SetProperty) {
+ TableChange.SetProperty set = (TableChange.SetProperty) change;
+ pendingUpdate.set(set.property(), set.value());
+
+ } else if (change instanceof TableChange.RemoveProperty) {
+ TableChange.RemoveProperty remove = (TableChange.RemoveProperty) change;
+ pendingUpdate.remove(remove.property());
+ }
+ }
+
+ return pendingUpdate;
+ }
+
+ /**
+ * Applies a list of Spark table changes to an {@link UpdateSchema} operation.
+ * <p>
+ * All non-schema changes in the list are ignored.
+ *
+ * @param pendingUpdate an uncommitted UpdateSchema operation to configure
+ * @param changes a list of Spark table changes
+ * @return the UpdateSchema operation configured with the changes
+ */
+ public static UpdateSchema applySchemaChanges(UpdateSchema pendingUpdate, List<TableChange> changes) {
+ for (TableChange change : changes) {
+ if (change instanceof TableChange.AddColumn) {
+ TableChange.AddColumn add = (TableChange.AddColumn) change;
+ Type type = SparkSchemaUtil.convert(add.dataType());
+ pendingUpdate.addColumn(parentName(add.fieldNames()), leafName(add.fieldNames()), type, add.comment());
+
+ } else if (change instanceof TableChange.UpdateColumnType) {
+ TableChange.UpdateColumnType update = (TableChange.UpdateColumnType) change;
+ Type newType = SparkSchemaUtil.convert(update.newDataType());
+ Preconditions.checkArgument(newType.isPrimitiveType(),
+ "Cannot update '%s', not a primitive type: %s", DOT.join(update.fieldNames()), update.newDataType());
+ pendingUpdate.updateColumn(DOT.join(update.fieldNames()), newType.asPrimitiveType());
+
+ } else if (change instanceof TableChange.UpdateColumnComment) {
+ TableChange.UpdateColumnComment update = (TableChange.UpdateColumnComment) change;
+ pendingUpdate.updateColumnDoc(DOT.join(update.fieldNames()), update.newComment());
+
+ } else if (change instanceof TableChange.RenameColumn) {
+ TableChange.RenameColumn rename = (TableChange.RenameColumn) change;
+ pendingUpdate.renameColumn(DOT.join(rename.fieldNames()), rename.newName());
+
+ } else if (change instanceof TableChange.DeleteColumn) {
+ TableChange.DeleteColumn delete = (TableChange.DeleteColumn) change;
+ pendingUpdate.deleteColumn(DOT.join(delete.fieldNames()));
+ }
+ }
+
+ return pendingUpdate;
+ }
+
+ /**
+ * Converts a PartitionSpec to Spark transforms.
+ *
+ * @param spec a PartitionSpec
+ * @return an array of Transforms
+ */
+ public static Transform[] toTransforms(PartitionSpec spec) {
+ List<Transform> transforms = PartitionSpecVisitor.visit(spec.schema(), spec,
+ new PartitionSpecVisitor<Transform>() {
+ @Override
+ public Transform identity(String sourceName, int sourceId) {
+ return Expressions.identity(sourceName);
+ }
+
+ @Override
+ public Transform bucket(String sourceName, int sourceId, int width) {
+ return Expressions.bucket(width, sourceName);
+ }
+
+ @Override
+ public Transform truncate(String sourceName, int sourceId, int width) {
+ return Expressions.apply("truncate", Expressions.column(sourceName), Expressions.literal(width));
+ }
+
+ @Override
+ public Transform year(String sourceName, int sourceId) {
+ return Expressions.years(sourceName);
+ }
+
+ @Override
+ public Transform month(String sourceName, int sourceId) {
+ return Expressions.months(sourceName);
+ }
+
+ @Override
+ public Transform day(String sourceName, int sourceId) {
+ return Expressions.days(sourceName);
+ }
+
+ @Override
+ public Transform hour(String sourceName, int sourceId) {
+ return Expressions.hours(sourceName);
+ }
+ });
+
+ return transforms.toArray(new Transform[0]);
+ }
+
+ /**
+ * Converts Spark transforms into a {@link PartitionSpec}.
+ *
+ * @param schema the table schema
+ * @param partitioning Spark Transforms
+ * @return a PartitionSpec
+ */
+ public static PartitionSpec toPartitionSpec(Schema schema, Transform[] partitioning) {
+ if (partitioning == null || partitioning.length == 0) {
+ return PartitionSpec.unpartitioned();
+ }
+
+ PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
+ for (Transform transform : partitioning) {
+ Preconditions.checkArgument(transform.references().length == 1,
+ "Cannot convert transform with more than one column reference: %s", transform);
+ String colName = DOT.join(transform.references()[0].fieldNames());
+ switch (transform.name()) {
+ case "identity":
+ builder.identity(colName);
+ break;
+ case "bucket":
+ builder.bucket(colName, findWidth(transform));
+ break;
+ case "years":
+ builder.year(colName);
+ break;
+ case "months":
+ builder.month(colName);
+ break;
+ case "days":
+ builder.day(colName);
+ break;
+ case "hours":
+ builder.hour(colName);
+ break;
+ case "truncate":
+ builder.truncate(colName, findWidth(transform));
+ break;
+ default:
+ throw new UnsupportedOperationException("Transform is not supported: " + transform);
+ }
+ }
+
+ return builder.build();
+ }
+
+ @SuppressWarnings("unchecked")
+ private static int findWidth(Transform transform) {
+ for (Expression expr : transform.arguments()) {
+ if (expr instanceof Literal) {
+ if (((Literal) expr).dataType() instanceof IntegerType) {
+ Literal<Integer> lit = (Literal<Integer>) expr;
+ Preconditions.checkArgument(lit.value() > 0,
+ "Unsupported width for transform: %s", transform.describe());
+ return lit.value();
+
+ } else if (((Literal) expr).dataType() instanceof LongType) {
+ Literal<Long> lit = (Literal<Long>) expr;
+ Preconditions.checkArgument(lit.value() > 0 && lit.value() < Integer.MAX_VALUE,
+ "Unsupported width for transform: %s", transform.describe());
+ if (lit.value() > Integer.MAX_VALUE) {
+ throw new IllegalArgumentException();
+ }
+ return lit.value().intValue();
+ }
+ }
+ }
+
+ throw new IllegalArgumentException("Cannot find width for transform: " + transform.describe());
+ }
+
+ private static String leafName(String[] fieldNames) {
+ Preconditions.checkArgument(fieldNames.length > 0, "Invalid field name: at least one name is required");
+ return fieldNames[fieldNames.length - 1];
+ }
+
+ private static String parentName(String[] fieldNames) {
+ if (fieldNames.length > 1) {
+ return DOT.join(Arrays.copyOfRange(fieldNames, 0, fieldNames.length - 1));
+ }
+ return null;
+ }
+}
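The two helpers above round-trip a partition spec through Spark's DSv2 transform representation. A sketch with an illustrative schema and partition spec:

    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.spark.SparkUtil;
    import org.apache.iceberg.types.Types;
    import org.apache.spark.sql.connector.expressions.Transform;

    public class SparkUtilExample {
      public static void main(String[] args) {
        Schema schema = new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.required(2, "ts", Types.TimestampType.withZone()),
            Types.NestedField.optional(3, "data", Types.StringType.get()));

        PartitionSpec spec = PartitionSpec.builderFor(schema)
            .day("ts")
            .bucket("id", 16)
            .build();

        // Iceberg spec -> Spark DSv2 transforms: days(ts), bucket(16, id)
        Transform[] transforms = SparkUtil.toTransforms(spec);

        // Spark transforms -> Iceberg spec, reproducing the original partitioning
        PartitionSpec roundTripped = SparkUtil.toPartitionSpec(schema, transforms);
        System.out.println(roundTripped);
      }
    }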
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkUtils.java b/spark/src/main/java/org/apache/iceberg/spark/SparkUtils.java
deleted file mode 100644
index 64e81cb..0000000
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkUtils.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.spark;
-
-
-import com.google.common.collect.Lists;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.PartitionField;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.connector.expressions.Expressions;
-import org.apache.spark.sql.connector.expressions.Transform;
-
-public final class SparkUtils {
-
- private static final Pattern HAS_WIDTH = Pattern.compile("(\\w+)\\[(\\d+)\\]");
-
- private SparkUtils() {}
-
- public static SparkSession getSparkSession() {
- return SparkSession.builder().getOrCreate();
- }
-
- public static Configuration getBaseConf() {
- return getSparkSession().sparkContext().hadoopConfiguration();
- }
-
-
- public static Transform[] toTransforms(PartitionSpec spec) {
- List<Transform> transforms = Lists.newArrayList();
- int numBuckets = 0;
- List<String> bucketColumns = Lists.newArrayList();
-
- for (PartitionField f : spec.fields()) {
- Matcher widthMatcher = HAS_WIDTH.matcher(f.transform().toString());
- if (widthMatcher.matches()) {
- String name = widthMatcher.group(1);
- if (name.equalsIgnoreCase("truncate")) {
- throw new UnsupportedOperationException("Spark doesn't support truncate transform");
-
- } else if (name.equalsIgnoreCase("bucket")) {
- numBuckets = Integer.parseInt(widthMatcher.group(2));
- bucketColumns.add(spec.schema().findColumnName(f.sourceId()));
-
- } else if (f.transform().toString().equalsIgnoreCase("identity")) {
- transforms.add(Expressions.identity(spec.schema().findColumnName(f.sourceId())));
-
- } else if (f.transform().toString().equalsIgnoreCase("year")) {
- transforms.add(Expressions.years(spec.schema().findColumnName(f.sourceId())));
-
- } else if (f.transform().toString().equalsIgnoreCase("month")) {
- transforms.add(Expressions.months(spec.schema().findColumnName(f.sourceId())));
-
- } else if (f.transform().toString().equalsIgnoreCase("day")) {
- transforms.add(Expressions.days(spec.schema().findColumnName(f.sourceId())));
-
- } else if (f.transform().toString().equalsIgnoreCase("hour")) {
- transforms.add(Expressions.hours(spec.schema().findColumnName(f.sourceId())));
-
- } else {
- throw new UnsupportedOperationException("Spark doesn't support transform " + f.transform());
- }
- }
- }
-
- if (!bucketColumns.isEmpty()) {
- transforms.add(Expressions.bucket(numBuckets, bucketColumns.toArray(new String[0])));
- }
-
- return transforms.toArray(new Transform[0]);
- }
-}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergTableProvider.java b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
similarity index 60%
rename from spark/src/main/java/org/apache/iceberg/spark/source/IcebergTableProvider.java
rename to spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index 9f7e1d9..0b1cb7a 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergTableProvider.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -21,47 +21,59 @@ package org.apache.iceberg.spark.source;
import com.google.common.base.Preconditions;
import java.util.Map;
-import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.HiveCatalogs;
-import org.apache.iceberg.spark.SparkUtils;
-import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.catalog.TableProvider;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-public class IcebergTableProvider implements DataSourceRegister, TableProvider {
+public class IcebergSource implements DataSourceRegister, TableProvider {
@Override
public String shortName() {
return "iceberg";
}
@Override
- public Table getTable(CaseInsensitiveStringMap options) {
+ public SparkTable getTable(CaseInsensitiveStringMap options) {
return getTable(options, null);
}
@Override
- public Table getTable(CaseInsensitiveStringMap options, StructType readSchema) {
+ public SparkTable getTable(CaseInsensitiveStringMap options, StructType readSchema) {
// Get Iceberg table from options
- Configuration conf = new Configuration(SparkUtils.getBaseConf());
- org.apache.iceberg.Table tableInIceberg = getTableAndResolveHadoopConfiguration(options, conf);
+ Configuration conf = new Configuration(SparkSession.active().sparkContext().hadoopConfiguration());
+ Table icebergTable = getTableAndResolveHadoopConfiguration(options, conf);
// Build Spark table based on Iceberg table, and return it
- return new IcebergTable(tableInIceberg, readSchema);
+ return new SparkTable(icebergTable, readSchema);
}
- protected org.apache.iceberg.Table getTableAndResolveHadoopConfiguration(
+ protected Table findTable(CaseInsensitiveStringMap options, Configuration conf) {
+ Preconditions.checkArgument(options.containsKey("path"), "Cannot open table: path is not set");
+ String path = options.get("path");
+
+ if (path.contains("/")) {
+ HadoopTables tables = new HadoopTables(conf);
+ return tables.load(path);
+ } else {
+ HiveCatalog hiveCatalog = HiveCatalogs.loadCatalog(conf);
+ TableIdentifier tableIdentifier = TableIdentifier.parse(path);
+ return hiveCatalog.loadTable(tableIdentifier);
+ }
+ }
+
+ private Table getTableAndResolveHadoopConfiguration(
CaseInsensitiveStringMap options, Configuration conf) {
// Overwrite configurations from the Spark Context with configurations from the options.
mergeIcebergHadoopConfs(conf, options);
- // Find table (in Iceberg) based on the given path
- org.apache.iceberg.Table table = findTable(options, conf);
+ Table table = findTable(options, conf);
// Set confs from table properties
mergeIcebergHadoopConfs(conf, table.properties());
@@ -72,30 +84,10 @@ public class IcebergTableProvider implements DataSourceRegister, TableProvider {
return table;
}
- /**
- * Merge delta options into base conf
- *
- * @param baseConf the base conf
- * @param options the delta options to merge into base
- */
- private void mergeIcebergHadoopConfs(Configuration baseConf, Map<String, String> options) {
+ private static void mergeIcebergHadoopConfs(
+ Configuration baseConf, Map<String, String> options) {
options.keySet().stream()
- .filter(key -> key.startsWith("hadoop.")) /* filter all keys staring with "hadoop." */
+ .filter(key -> key.startsWith("hadoop."))
.forEach(key -> baseConf.set(key.replaceFirst("hadoop.", ""), options.get(key)));
- /* Modify the key by removing the prefix of "hadoop." and merge into base */
- }
-
- protected org.apache.iceberg.Table findTable(CaseInsensitiveStringMap options, Configuration conf) {
- Optional<String> path = Optional.ofNullable(options.get("path"));
- Preconditions.checkArgument(path.isPresent(), "Cannot open table: path is not set");
-
- if (path.get().contains("/")) { // hadoop table
- HadoopTables tables = new HadoopTables(conf);
- return tables.load(path.get());
- } else { // hive table
- HiveCatalog hiveCatalog = HiveCatalogs.loadCatalog(conf);
- TableIdentifier tableIdentifier = TableIdentifier.parse(path.get());
- return hiveCatalog.loadTable(tableIdentifier);
- }
}
}
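With the source renamed to IcebergSource, a "path" option containing '/' is loaded through HadoopTables, while a dotted identifier is resolved through the Hive catalog. A minimal read sketch (the locations, table name, and snapshot id below are hypothetical):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class IcebergSourceExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("iceberg-source-example")
            .getOrCreate();

        // a path containing '/' is loaded as a Hadoop table
        Dataset<Row> fromPath = spark.read()
            .format("iceberg")
            .load("hdfs://nn:8020/warehouse/db/events");

        // a dotted identifier is resolved through the Hive catalog;
        // snapshot-id is one of the read options handled by SparkBatchScan
        Dataset<Row> fromHive = spark.read()
            .format("iceberg")
            .option("snapshot-id", 10963874102873L)
            .load("db.events");

        fromPath.show();
        fromHive.show();
      }
    }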
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergTable.java b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergTable.java
deleted file mode 100644
index d02645d..0000000
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergTable.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.spark.source;
-
-import com.google.common.collect.ImmutableSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.apache.iceberg.PartitionField;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.spark.SparkUtils;
-import org.apache.iceberg.transforms.UnknownTransform;
-import org.apache.iceberg.types.CheckCompatibility;
-import org.apache.spark.sql.connector.catalog.SupportsRead;
-import org.apache.spark.sql.connector.catalog.SupportsWrite;
-import org.apache.spark.sql.connector.catalog.Table;
-import org.apache.spark.sql.connector.catalog.TableCapability;
-import org.apache.spark.sql.connector.expressions.Transform;
-import org.apache.spark.sql.connector.read.ScanBuilder;
-import org.apache.spark.sql.connector.write.BatchWrite;
-import org.apache.spark.sql.connector.write.SupportsDynamicOverwrite;
-import org.apache.spark.sql.connector.write.SupportsTruncate;
-import org.apache.spark.sql.connector.write.WriteBuilder;
-import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-
-final class IcebergTable implements Table, SupportsRead, SupportsWrite {
-
- private static final Set<TableCapability> CAPABILITIES = ImmutableSet.of(
- TableCapability.BATCH_READ,
- TableCapability.BATCH_WRITE,
- TableCapability.MICRO_BATCH_READ,
- TableCapability.STREAMING_WRITE,
- TableCapability.TRUNCATE,
- TableCapability.OVERWRITE_DYNAMIC);
-
- private final org.apache.iceberg.Table tableInIceberg;
- private StructType requestSchema;
-
- IcebergTable(org.apache.iceberg.Table tableInIceberg, StructType requestSchema) {
- this.tableInIceberg = tableInIceberg;
-
- if (requestSchema != null) {
- SparkSchemaUtil.convert(tableInIceberg.schema(), requestSchema);
- this.requestSchema = requestSchema;
- }
- }
-
- @Override
- public String name() {
- return tableInIceberg.name();
- }
-
- @Override
- public StructType schema() {
- if (requestSchema != null) {
- return requestSchema;
- }
- return SparkSchemaUtil.convert(tableInIceberg.schema());
- }
-
- @Override
- public Transform[] partitioning() {
- return SparkUtils.toTransforms(tableInIceberg.spec());
- }
-
- @Override
- public Map<String, String> properties() {
- return tableInIceberg.properties();
- }
-
- @Override
- public Set<TableCapability> capabilities() {
- return CAPABILITIES;
- }
-
- @Override
- public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
- return () -> new IcebergBatchScan(tableInIceberg, options, requestSchema);
- }
-
- @Override
- public WriteBuilder newWriteBuilder(CaseInsensitiveStringMap options) {
- return new IcebergWriteBuilder(tableInIceberg, options);
- }
-
- static class IcebergWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, SupportsTruncate {
-
- private org.apache.iceberg.Table table;
- private CaseInsensitiveStringMap writeOptions;
- private TableCapability writeBehavior = TableCapability.BATCH_WRITE;
- private String writeQueryId = null;
- private StructType dsStruct = null;
-
- IcebergWriteBuilder(org.apache.iceberg.Table table, CaseInsensitiveStringMap options) {
- this.table = table;
- this.writeOptions = options;
- }
-
- @Override
- public WriteBuilder withQueryId(String queryId) {
- this.writeQueryId = queryId;
- return this;
- }
-
- @Override
- public WriteBuilder withInputDataSchema(StructType schemaInput) {
- this.dsStruct = schemaInput;
- return this;
- }
-
- @Override
- public WriteBuilder overwriteDynamicPartitions() {
- this.writeBehavior = TableCapability.OVERWRITE_DYNAMIC;
- return this;
- }
-
- @Override
- public WriteBuilder truncate() {
- this.writeBehavior = TableCapability.TRUNCATE;
- return this;
- }
-
- @Override
- public BatchWrite buildForBatch() {
- // TODO. Check queryId and schema before build?
-
- // Validate
- Schema dsSchema = SparkSchemaUtil.convert(table.schema(), dsStruct);
- validateWriteSchema(table.schema(), dsSchema, checkNullability(writeOptions));
- validatePartitionTransforms(table.spec());
-
- // Get application id
- String appId = SparkUtils.getSparkSession().sparkContext().applicationId();
-
- // Get write-audit-publish id
- String wapId = SparkUtils.getSparkSession().conf().get("spark.wap.id", null);
-
- return new IcebergBatchWriter(table, writeOptions, writeBehavior, appId, wapId, dsSchema);
- }
-
- @Override
- public StreamingWrite buildForStreaming() {
- // TODO. Check queryId and schema before build?
-
- // Validate
- Schema dsSchema = SparkSchemaUtil.convert(table.schema(), dsStruct);
- validateWriteSchema(table.schema(), dsSchema, checkNullability(writeOptions));
- validatePartitionTransforms(table.spec());
-
- // Change to streaming write if it is just append
- if (writeBehavior.equals(TableCapability.BATCH_WRITE)) {
- writeBehavior = TableCapability.STREAMING_WRITE;
- }
-
- // Get application id
- String appId = SparkUtils.getSparkSession().sparkContext().applicationId();
- String wapId = SparkUtils.getSparkSession().conf().get("spark.wap.id", null);
- return new IcebergStreamingWriter(table, writeOptions, writeQueryId, writeBehavior, appId, wapId, table.schema());
- }
-
- private void validateWriteSchema(Schema tableSchema, Schema dsSchema, Boolean checkNullability) {
- List<String> errors;
- if (checkNullability) {
- errors = CheckCompatibility.writeCompatibilityErrors(tableSchema, dsSchema);
- } else {
- errors = CheckCompatibility.typeCompatibilityErrors(tableSchema, dsSchema);
- }
- if (!errors.isEmpty()) {
- StringBuilder sb = new StringBuilder();
- sb.append("Cannot write incompatible dataset to table with schema:\n")
- .append(tableSchema)
- .append("\nProblems:");
- for (String error : errors) {
- sb.append("\n* ").append(error);
- }
- throw new IllegalArgumentException(sb.toString());
- }
- }
-
- private void validatePartitionTransforms(PartitionSpec spec) {
- if (spec.fields().stream().anyMatch(field -> field.transform() instanceof UnknownTransform)) {
- String unsupported = spec.fields().stream()
- .map(PartitionField::transform)
- .filter(transform -> transform instanceof UnknownTransform)
- .map(org.apache.iceberg.transforms.Transform::toString)
- .collect(Collectors.joining(", "));
-
- throw new UnsupportedOperationException(
- String.format("Cannot write using unsupported transforms: %s", unsupported));
- }
- }
-
- private boolean checkNullability(CaseInsensitiveStringMap options) {
- boolean sparkCheckNullability = Boolean.parseBoolean(SparkUtils.getSparkSession().conf()
- .get("spark.sql.iceberg.check-nullability", "true"));
- boolean dataFrameCheckNullability = options.getBoolean("check-nullability", true);
- return sparkCheckNullability && dataFrameCheckNullability;
- }
- }
-}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergBatchScan.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
similarity index 77%
rename from spark/src/main/java/org/apache/iceberg/spark/source/IcebergBatchScan.java
rename to spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
index b3fef6d..4e04749 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergBatchScan.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
@@ -28,6 +28,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -56,7 +57,6 @@ import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.SparkAvroReader;
import org.apache.iceberg.spark.data.SparkOrcReader;
@@ -76,10 +76,7 @@ import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.Statistics;
-import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
-import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.connector.read.SupportsReportStatistics;
-import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
@@ -91,14 +88,12 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.unsafe.types.UTF8String;
import scala.collection.JavaConverters;
-public class IcebergBatchScan implements Scan,
- Batch,
- SupportsPushDownFilters,
- SupportsPushDownRequiredColumns,
- SupportsReportStatistics {
- private static final Filter[] NO_FILTERS = new Filter[0];
+class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
private final Table table;
+ private final boolean caseSensitive;
+ private final Schema expectedSchema;
+ private final List<Expression> filterExpressions;
private final Long snapshotId;
private final Long asOfTimestamp;
private final Long splitSize;
@@ -106,18 +101,16 @@ public class IcebergBatchScan implements Scan,
private final Long splitOpenFileCost;
private final FileIO fileIo;
private final EncryptionManager encryptionManager;
- private final boolean caseSensitive;
- private StructType requestedSchema = null;
- private List<Expression> filterExpressions = null;
- private Filter[] pushedFilters = NO_FILTERS;
// lazy variables
- private Schema schema = null;
- private StructType type = null; // cached because Spark accesses it multiple times
private List<CombinedScanTask> tasks = null; // lazy cache of tasks
- public IcebergBatchScan(Table table, Boolean caseSensitive, CaseInsensitiveStringMap options) {
+ SparkBatchScan(Table table, boolean caseSensitive, Schema expectedSchema, List<Expression> filters,
+ CaseInsensitiveStringMap options) {
this.table = table;
+ this.caseSensitive = caseSensitive;
+ this.expectedSchema = expectedSchema;
+ this.filterExpressions = filters;
this.snapshotId = options.containsKey("snapshot-id") ? options.getLong("snapshot-id", 0) : null;
this.asOfTimestamp = options.containsKey("as-of-timestamp") ? options.getLong("as-of-timestamp", 0) : null;
@@ -134,22 +127,53 @@ public class IcebergBatchScan implements Scan,
this.splitOpenFileCost = options.containsKey("file-open-cost") ? options.getLong("file-open-cost",
TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT) : null;
- this.schema = table.schema();
this.fileIo = table.io();
this.encryptionManager = table.encryption();
- this.caseSensitive = caseSensitive;
}
- public IcebergBatchScan(Table table, CaseInsensitiveStringMap options) {
- this(table, true, options);
+ @Override
+ public Batch toBatch() {
+ return this;
}
- public IcebergBatchScan(Table table, CaseInsensitiveStringMap options, StructType requestedSchema) {
- this(table, true, options);
+ @Override
+ public StructType readSchema() {
+ return SparkSchemaUtil.convert(expectedSchema);
+ }
- if (requestedSchema != null) {
- pruneColumns(requestedSchema);
+ @Override
+ public InputPartition[] planInputPartitions() {
+ String tableSchemaString = SchemaParser.toJson(table.schema());
+ String expectedSchemaString = SchemaParser.toJson(expectedSchema);
+
+ List<CombinedScanTask> scanTasks = tasks();
+ InputPartition[] readTasks = new InputPartition[scanTasks.size()];
+ for (int i = 0; i < scanTasks.size(); i++) {
+ readTasks[i] = new ReadTask(
+ scanTasks.get(i), tableSchemaString, expectedSchemaString, fileIo, encryptionManager, caseSensitive);
}
+
+ return readTasks;
+ }
+
+ @Override
+ public ReaderFactory createReaderFactory() {
+ return new ReaderFactory();
+ }
+
+ @Override
+ public Statistics estimateStatistics() {
+ long sizeInBytes = 0L;
+ long numRows = 0L;
+
+ for (CombinedScanTask task : tasks()) {
+ for (FileScanTask file : task.files()) {
+ sizeInBytes += file.length();
+ numRows += file.file().recordCount();
+ }
+ }
+
+ return new Stats(sizeInBytes, numRows);
}
private List<CombinedScanTask> tasks() {
@@ -157,7 +181,7 @@ public class IcebergBatchScan implements Scan,
TableScan scan = table
.newScan()
.caseSensitive(caseSensitive)
- .project(lazySchema());
+ .project(expectedSchema);
if (snapshotId != null) {
scan = scan.useSnapshot(snapshotId);
@@ -196,301 +220,96 @@ public class IcebergBatchScan implements Scan,
}
@Override
- public Filter[] pushFilters(Filter[] filters) {
- this.tasks = null; // invalidate cached tasks, if present
-
- List<Expression> expressions = Lists.newArrayListWithExpectedSize(filters.length);
- List<Filter> pushed = Lists.newArrayListWithExpectedSize(filters.length);
-
- for (Filter filter : filters) {
- Expression expr = SparkFilters.convert(filter);
- if (expr != null) {
- expressions.add(expr);
- pushed.add(filter);
- }
- }
-
- this.filterExpressions = expressions;
- this.pushedFilters = pushed.toArray(new Filter[0]);
-
- // invalidate the schema that will be projected
- this.schema = null;
- this.type = null;
-
- // Spark doesn't support residuals per task, so return all filters
- // to get Spark to handle record-level filtering
- return filters;
- }
-
- @Override
- public Batch toBatch() {
- return this;
- }
-
- @Override
- public Filter[] pushedFilters() {
- return pushedFilters;
- }
-
- @Override
- public void pruneColumns(StructType newRequestedSchema) {
- this.requestedSchema = newRequestedSchema;
-
- // invalidate the schema that will be projected
- this.schema = null;
- this.type = null;
+ public String toString() {
+ return String.format(
+ "IcebergScan(table=%s, type=%s, filters=%s, caseSensitive=%s)",
+ table, expectedSchema.asStruct(), filterExpressions, caseSensitive);
}
- @Override
- public Scan build() {
- return this;
- }
-
- @Override
- public Statistics estimateStatistics() {
- long sizeInBytes = 0L;
- long numRows = 0L;
-
- for (CombinedScanTask task : tasks()) {
- for (FileScanTask file : task.files()) {
- sizeInBytes += file.length();
- numRows += file.file().recordCount();
+ private static class ReaderFactory implements PartitionReaderFactory {
+ @Override
+ public PartitionReader<InternalRow> createReader(InputPartition inputPartition) {
+ if (inputPartition instanceof ReadTask) {
+ return new TaskDataReader((ReadTask) inputPartition);
+ } else {
+ throw new UnsupportedOperationException("Incorrect input partition type: " + inputPartition);
}
}
-
- return new Stats(sizeInBytes, numRows);
}
- public static class BatchReadInputPartition implements InputPartition, Serializable {
+ private static class ReadTask implements InputPartition, Serializable {
private final CombinedScanTask task;
private final String tableSchemaString;
private final String expectedSchemaString;
- private final FileIO fileIo;
+ private final FileIO io;
private final EncryptionManager encryptionManager;
private final boolean caseSensitive;
private transient Schema tableSchema = null;
private transient Schema expectedSchema = null;
- BatchReadInputPartition(
- CombinedScanTask task,
- String tableSchemaString,
- String expectedSchemaString,
- FileIO fileIo,
- EncryptionManager encryptionManager,
- boolean caseSensitive) {
+ ReadTask(CombinedScanTask task, String tableSchemaString, String expectedSchemaString,
+ FileIO io, EncryptionManager encryptionManager, boolean caseSensitive) {
this.task = task;
this.tableSchemaString = tableSchemaString;
this.expectedSchemaString = expectedSchemaString;
- this.fileIo = fileIo;
+ this.io = io;
this.encryptionManager = encryptionManager;
this.caseSensitive = caseSensitive;
}
- private Schema lazyTableSchema() {
- if (tableSchema == null) {
- this.tableSchema = SchemaParser.fromJson(tableSchemaString);
- }
- return tableSchema;
+ public Collection<FileScanTask> files() {
+ return task.files();
}
- private Schema lazyExpectedSchema() {
- if (expectedSchema == null) {
- this.expectedSchema = SchemaParser.fromJson(expectedSchemaString);
- }
- return expectedSchema;
- }
- }
-
- @Override
- public InputPartition[] planInputPartitions() {
- String tableSchemaString = SchemaParser.toJson(table.schema());
- String expectedSchemaString = SchemaParser.toJson(lazySchema());
-
- List<CombinedScanTask> scanTasks = tasks();
- InputPartition[] readTasks = new InputPartition[scanTasks.size()];
- for (int i = 0; i < scanTasks.size(); i++) {
- readTasks[i] = new BatchReadInputPartition(scanTasks.get(i), tableSchemaString, expectedSchemaString, fileIo,
- encryptionManager, caseSensitive);
- }
-
- return readTasks;
- }
-
- @Override
- public IcebergRowReaderFactory createReaderFactory() {
- return new IcebergRowReaderFactory();
- }
-
- private Schema lazySchema() {
- if (schema == null) {
- if (requestedSchema != null) {
- this.schema = SparkSchemaUtil.prune(table.schema(), requestedSchema);
- } else {
- this.schema = table.schema();
- }
+ public FileIO io() {
+ return io;
}
- return schema;
- }
-
- private StructType lazyType() {
- if (type == null) {
- this.type = SparkSchemaUtil.convert(lazySchema());
- }
- return type;
- }
-
- @Override
- public StructType readSchema() {
- return lazyType();
- }
-
-
- public static class PartitionRowConverter implements Function<StructLike, InternalRow> {
- private final DataType[] types;
- private final int[] positions;
- private final Class<?>[] javaTypes;
- private final GenericInternalRow reusedRow;
-
- PartitionRowConverter(Schema partitionSchema, PartitionSpec spec) {
- StructType partitionType = SparkSchemaUtil.convert(partitionSchema);
- StructField[] fields = partitionType.fields();
-
- this.types = new DataType[fields.length];
- this.positions = new int[types.length];
- this.javaTypes = new Class<?>[types.length];
- this.reusedRow = new GenericInternalRow(types.length);
-
- List<PartitionField> partitionFields = spec.fields();
- for (int rowIndex = 0; rowIndex < fields.length; rowIndex += 1) {
- this.types[rowIndex] = fields[rowIndex].dataType();
- int sourceId = partitionSchema.columns().get(rowIndex).fieldId();
- for (int specIndex = 0; specIndex < partitionFields.size(); specIndex += 1) {
- PartitionField field = spec.fields().get(specIndex);
- if (field.sourceId() == sourceId && "identity".equals(field.transform().toString())) {
- positions[rowIndex] = specIndex;
- javaTypes[rowIndex] = spec.javaClasses()[specIndex];
- break;
- }
- }
- }
+ public EncryptionManager encryptionManager() {
+ return encryptionManager;
}
- @Override
- public InternalRow apply(StructLike tuple) {
- for (int i = 0; i < types.length; i += 1) {
- Object value = tuple.get(positions[i], javaTypes[i]);
- if (value != null) {
- reusedRow.update(i, convert(value, types[i]));
- } else {
- reusedRow.setNullAt(i);
- }
- }
-
- return reusedRow;
+ public boolean isCaseSensitive() {
+ return caseSensitive;
}
- /**
- * Converts the objects into instances used by Spark's InternalRow.
- *
- * @param value a data value
- * @param type the Spark data type
- * @return the value converted to the representation expected by Spark's InternalRow.
- */
- private static Object convert(Object value, DataType type) {
- if (type instanceof StringType) {
- return UTF8String.fromString(value.toString());
- } else if (type instanceof BinaryType) {
- return ByteBuffers.toByteArray((ByteBuffer) value);
- } else if (type instanceof DecimalType) {
- return Decimal.fromDecimal(value);
+ private Schema tableSchema() {
+ if (tableSchema == null) {
+ this.tableSchema = SchemaParser.fromJson(tableSchemaString);
}
- return value;
+ return tableSchema;
}
- }
-
- public static class StructLikeInternalRow implements StructLike {
- private final DataType[] types;
- private InternalRow row = null;
- StructLikeInternalRow(StructType struct) {
- this.types = new DataType[struct.size()];
- StructField[] fields = struct.fields();
- for (int i = 0; i < fields.length; i += 1) {
- types[i] = fields[i].dataType();
+ private Schema expectedSchema() {
+ if (expectedSchema == null) {
+ this.expectedSchema = SchemaParser.fromJson(expectedSchemaString);
}
- }
-
- public StructLikeInternalRow setRow(InternalRow row) {
- this.row = row;
- return this;
- }
-
- @Override
- public int size() {
- return types.length;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public <T> T get(int pos, Class<T> javaClass) {
- return javaClass.cast(row.get(pos, types[pos]));
- }
-
- @Override
- public <T> void set(int pos, T value) {
- throw new UnsupportedOperationException("Not implemented: set");
- }
- }
-
-
- private static class IcebergRowReaderFactory implements PartitionReaderFactory {
- IcebergRowReaderFactory() {
- }
-
- @Override
- public PartitionReader<InternalRow> createReader(InputPartition inputPartition) {
- return new TaskDataReader(inputPartition);
+ return expectedSchema;
}
}
- public static class TaskDataReader implements PartitionReader<InternalRow> {
+ private static class TaskDataReader implements PartitionReader<InternalRow> {
// for some reason, the apply method can't be called from Java without reflection
private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
.impl(UnsafeProjection.class, InternalRow.class)
.build();
+ private final ReadTask split;
private final Iterator<FileScanTask> tasks;
- private final Schema tableSchema;
- private final Schema expectedSchema;
- private final FileIO fileIo;
private final Map<String, InputFile> inputFiles;
- private final boolean caseSensitive;
private Iterator<InternalRow> currentIterator = null;
private Closeable currentCloseable = null;
private InternalRow current = null;
- TaskDataReader(InputPartition inputPartition) {
- BatchReadInputPartition batchReadInputPartition = (BatchReadInputPartition) inputPartition;
+ TaskDataReader(ReadTask task) {
+ this.split = task;
+ this.tasks = task.files().iterator();
+ this.inputFiles = batchDecrypt(task.files());
- this.fileIo = batchReadInputPartition.fileIo;
- this.tableSchema = batchReadInputPartition.lazyTableSchema();
- this.expectedSchema = batchReadInputPartition.lazyExpectedSchema();
- this.tasks = batchReadInputPartition.task.files().iterator();
- Iterable<InputFile> decryptedFiles = batchReadInputPartition.encryptionManager.decrypt(
- Iterables.transform(batchReadInputPartition.task.files(),
- fileScanTask ->
- EncryptedFiles.encryptedInput(
- this.fileIo.newInputFile(fileScanTask.file().path().toString()),
- fileScanTask.file().keyMetadata())));
- ImmutableMap.Builder<String, InputFile> inputFileBuilder = ImmutableMap.builder();
- decryptedFiles.forEach(decrypted -> inputFileBuilder.put(decrypted.location(), decrypted));
- this.inputFiles = inputFileBuilder.build();
- // open last because the schemas and fileIo must be set
+ // open last because the split must be set
this.currentIterator = open(tasks.next());
- this.caseSensitive = batchReadInputPartition.caseSensitive;
}
@Override
@@ -526,17 +345,31 @@ public class IcebergBatchScan implements Scan,
}
}
+ private Map<String, InputFile> batchDecrypt(Collection<FileScanTask> files) {
+ Iterable<InputFile> decryptedFiles = split.encryptionManager().decrypt(
+ Iterables.transform(files,
+ fileScanTask ->
+ EncryptedFiles.encryptedInput(
+ split.io().newInputFile(fileScanTask.file().path().toString()),
+ fileScanTask.file().keyMetadata())));
+
+ ImmutableMap.Builder<String, InputFile> inputFileBuilder = ImmutableMap.builder();
+ decryptedFiles.forEach(decrypted -> inputFileBuilder.put(decrypted.location(), decrypted));
+ return inputFileBuilder.build();
+ }
+
private Iterator<InternalRow> open(FileScanTask task) {
DataFile file = task.file();
// schema or rows returned by readers
- Schema finalSchema = expectedSchema;
+ Schema finalSchema = split.expectedSchema();
PartitionSpec spec = task.spec();
Set<Integer> idColumns = spec.identitySourceIds();
// schema needed for the projection and filtering
StructType sparkType = SparkSchemaUtil.convert(finalSchema);
- Schema requiredSchema = SparkSchemaUtil.prune(tableSchema, sparkType, task.residual(), caseSensitive);
+ Schema requiredSchema = SparkSchemaUtil.prune(
+ split.tableSchema(), sparkType, task.residual(), split.isCaseSensitive());
boolean hasJoinedPartitionColumns = !idColumns.isEmpty();
boolean hasExtraFilterColumns = requiredSchema.columns().size() != finalSchema.columns().size();
@@ -647,7 +480,7 @@ public class IcebergBatchScan implements Scan,
.split(task.start(), task.length())
.createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema))
.filter(task.residual())
- .caseSensitive(caseSensitive)
+ .caseSensitive(split.isCaseSensitive())
.build();
}
@@ -658,17 +491,114 @@ public class IcebergBatchScan implements Scan,
.schema(readSchema)
.split(task.start(), task.length())
.createReaderFunc(SparkOrcReader::new)
- .caseSensitive(caseSensitive)
+ .caseSensitive(split.isCaseSensitive())
.build();
}
private CloseableIterable<InternalRow> newDataIterable(DataTask task, Schema readSchema) {
- StructInternalRow row = new StructInternalRow(tableSchema.asStruct());
+ StructInternalRow row = new StructInternalRow(split.tableSchema().asStruct());
CloseableIterable<InternalRow> asSparkRows = CloseableIterable.transform(
task.asDataTask().rows(), row::setStruct);
return CloseableIterable.transform(
- asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, tableSchema))::invoke);
+ asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, split.tableSchema()))::invoke);
+ }
+ }
+
+ private static class PartitionRowConverter implements Function<StructLike, InternalRow> {
+ private final DataType[] types;
+ private final int[] positions;
+ private final Class<?>[] javaTypes;
+ private final GenericInternalRow reusedRow;
+
+ PartitionRowConverter(Schema partitionSchema, PartitionSpec spec) {
+ StructType partitionType = SparkSchemaUtil.convert(partitionSchema);
+ StructField[] fields = partitionType.fields();
+
+ this.types = new DataType[fields.length];
+ this.positions = new int[types.length];
+ this.javaTypes = new Class<?>[types.length];
+ this.reusedRow = new GenericInternalRow(types.length);
+
+ List<PartitionField> partitionFields = spec.fields();
+ for (int rowIndex = 0; rowIndex < fields.length; rowIndex += 1) {
+ this.types[rowIndex] = fields[rowIndex].dataType();
+
+ int sourceId = partitionSchema.columns().get(rowIndex).fieldId();
+ for (int specIndex = 0; specIndex < partitionFields.size(); specIndex += 1) {
+ PartitionField field = spec.fields().get(specIndex);
+ if (field.sourceId() == sourceId && "identity".equals(field.transform().toString())) {
+ positions[rowIndex] = specIndex;
+ javaTypes[rowIndex] = spec.javaClasses()[specIndex];
+ break;
+ }
+ }
+ }
+ }
+
+ @Override
+ public InternalRow apply(StructLike tuple) {
+ for (int i = 0; i < types.length; i += 1) {
+ Object value = tuple.get(positions[i], javaTypes[i]);
+ if (value != null) {
+ reusedRow.update(i, convert(value, types[i]));
+ } else {
+ reusedRow.setNullAt(i);
+ }
+ }
+
+ return reusedRow;
+ }
+
+ /**
+ * Converts the objects into instances used by Spark's InternalRow.
+ *
+ * @param value a data value
+ * @param type the Spark data type
+ * @return the value converted to the representation expected by Spark's InternalRow.
+ */
+ private static Object convert(Object value, DataType type) {
+ if (type instanceof StringType) {
+ return UTF8String.fromString(value.toString());
+ } else if (type instanceof BinaryType) {
+ return ByteBuffers.toByteArray((ByteBuffer) value);
+ } else if (type instanceof DecimalType) {
+ return Decimal.fromDecimal(value);
+ }
+ return value;
}
}
+ private static class StructLikeInternalRow implements StructLike {
+ private final DataType[] types;
+ private InternalRow row = null;
+
+ StructLikeInternalRow(StructType struct) {
+ this.types = new DataType[struct.size()];
+ StructField[] fields = struct.fields();
+ for (int i = 0; i < fields.length; i += 1) {
+ types[i] = fields[i].dataType();
+ }
+ }
+
+ public StructLikeInternalRow setRow(InternalRow row) {
+ this.row = row;
+ return this;
+ }
+
+ @Override
+ public int size() {
+ return types.length;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T> T get(int pos, Class<T> javaClass) {
+ return javaClass.cast(row.get(pos, types[pos]));
+ }
+
+ @Override
+ public <T> void set(int pos, T value) {
+ throw new UnsupportedOperationException("Not implemented: set");
+ }
+ }
}
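SparkBatchScan no longer implements SupportsPushDownFilters or SupportsPushDownRequiredColumns; that state now arrives through its constructor from the new SparkScanBuilder (only partially shown in this excerpt). As a rough sketch of the division of labor, a DSv2 scan builder in the same package could collect the pushdown state as below; this class is illustrative only and is not the actual SparkScanBuilder:

    package org.apache.iceberg.spark.source;

    import com.google.common.collect.Lists;
    import java.util.List;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.spark.SparkFilters;
    import org.apache.iceberg.spark.SparkSchemaUtil;
    import org.apache.spark.sql.connector.read.Scan;
    import org.apache.spark.sql.connector.read.ScanBuilder;
    import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
    import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
    import org.apache.spark.sql.sources.Filter;
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.sql.util.CaseInsensitiveStringMap;

    class ExampleScanBuilder implements ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns {
      private final Table table;
      private final CaseInsensitiveStringMap options;
      private final List<Expression> filterExpressions = Lists.newArrayList();
      private Filter[] pushedFilters = new Filter[0];
      private StructType requestedSchema = null;

      ExampleScanBuilder(Table table, CaseInsensitiveStringMap options) {
        this.table = table;
        this.options = options;
      }

      @Override
      public Filter[] pushFilters(Filter[] filters) {
        List<Filter> pushed = Lists.newArrayListWithExpectedSize(filters.length);
        for (Filter filter : filters) {
          Expression expr = SparkFilters.convert(filter);
          if (expr != null) {
            filterExpressions.add(expr);
            pushed.add(filter);
          }
        }
        this.pushedFilters = pushed.toArray(new Filter[0]);
        // Spark doesn't support residuals per task, so return all filters for record-level filtering
        return filters;
      }

      @Override
      public Filter[] pushedFilters() {
        return pushedFilters;
      }

      @Override
      public void pruneColumns(StructType newRequestedSchema) {
        this.requestedSchema = newRequestedSchema;
      }

      @Override
      public Scan build() {
        Schema expectedSchema = requestedSchema != null
            ? SparkSchemaUtil.prune(table.schema(), requestedSchema)
            : table.schema();
        return new SparkBatchScan(table, true /* caseSensitive */, expectedSchema, filterExpressions, options);
      }
    }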
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergBatchWriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchWrite.java
similarity index 90%
rename from spark/src/main/java/org/apache/iceberg/spark/source/IcebergBatchWriter.java
rename to spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchWrite.java
index dfdf6b1..b0c1ded 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergBatchWriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchWrite.java
@@ -48,7 +48,7 @@ import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
@@ -59,11 +59,11 @@ import org.apache.iceberg.spark.data.SparkParquetWriters;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.DataWriterFactory;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
+import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,31 +81,30 @@ import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
-class IcebergBatchWriter implements BatchWrite {
- private static final Logger LOG = LoggerFactory.getLogger(IcebergBatchWriter.class);
+class SparkBatchWrite implements BatchWrite {
+ private static final Logger LOG = LoggerFactory.getLogger(SparkBatchWrite.class);
private final Table table;
private final FileFormat format;
private final FileIO fileIo;
private final EncryptionManager encryptionManager;
- private final TableCapability writeBehavior;
+ private final boolean overwriteDynamic;
+ private final boolean overwriteByFilter;
+ private final Expression overwriteExpr;
private final String applicationId;
private final String wapId;
private final long targetFileSize;
private final Schema dsSchema;
- IcebergBatchWriter(
- Table table,
- CaseInsensitiveStringMap options,
- TableCapability writeBehavior,
- String applicationId,
- String wapId,
- Schema dsSchema) {
+ SparkBatchWrite(Table table, CaseInsensitiveStringMap options, boolean overwriteDynamic, boolean overwriteByFilter,
+ Expression overwriteExpr, String applicationId, String wapId, Schema dsSchema) {
this.table = table;
this.format = getFileFormat(table.properties(), options);
this.fileIo = table.io();
this.encryptionManager = table.encryption();
- this.writeBehavior = writeBehavior;
+ this.overwriteDynamic = overwriteDynamic;
+ this.overwriteByFilter = overwriteByFilter;
+ this.overwriteExpr = overwriteExpr;
this.applicationId = applicationId;
this.wapId = wapId;
this.dsSchema = dsSchema;
@@ -128,7 +127,7 @@ class IcebergBatchWriter implements BatchWrite {
}
@Override
- public DataWriterFactory createBatchWriterFactory() {
+ public WriterFactory createBatchWriterFactory() {
return new WriterFactory(
table.spec(), format, table.locationProvider(), table.properties(), fileIo, encryptionManager, targetFileSize,
dsSchema);
@@ -136,14 +135,12 @@ class IcebergBatchWriter implements BatchWrite {
@Override
public void commit(WriterCommitMessage[] messages) {
- if (writeBehavior.equals(TableCapability.OVERWRITE_DYNAMIC)) {
+ if (overwriteDynamic) {
replacePartitions(messages);
- } else if (writeBehavior.equals(TableCapability.BATCH_WRITE)) {
- append(messages);
- } else if (writeBehavior.equals(TableCapability.TRUNCATE)) {
+ } else if (overwriteByFilter) {
overwrite(messages);
} else {
- throw new IllegalArgumentException("Iceberg doen't support write behavior " + writeBehavior + " for now");
+ append(messages);
}
}
@@ -192,7 +189,7 @@ class IcebergBatchWriter implements BatchWrite {
private void overwrite(WriterCommitMessage[] messages) {
OverwriteFiles overwriteFiles = table.newOverwrite();
- overwriteFiles.overwriteByRowFilter(Expressions.alwaysTrue());
+ overwriteFiles.overwriteByRowFilter(overwriteExpr);
int numFiles = 0;
for (DataFile file : files(messages)) {
@@ -200,7 +197,7 @@ class IcebergBatchWriter implements BatchWrite {
overwriteFiles.addFile(file);
}
- commitOperation(overwriteFiles, numFiles, "overwrite by filter or truncate");
+ commitOperation(overwriteFiles, numFiles, "overwrite by filter");
}
@Override
@@ -266,7 +263,7 @@ class IcebergBatchWriter implements BatchWrite {
}
}
- protected static class WriterFactory implements DataWriterFactory {
+ private static class WriterFactory implements DataWriterFactory, StreamingDataWriterFactory {
private final PartitionSpec spec;
private final FileFormat format;
private final LocationProvider locations;
@@ -276,15 +273,9 @@ class IcebergBatchWriter implements BatchWrite {
private final long targetFileSize;
private final Schema dsSchema;
- WriterFactory(
- PartitionSpec spec,
- FileFormat format,
- LocationProvider locations,
- Map<String, String> properties,
- FileIO fileIo,
- EncryptionManager encryptionManager,
- long targetFileSize,
- Schema dsSchema) {
+ WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider locations,
+ Map<String, String> properties, FileIO fileIo, EncryptionManager encryptionManager,
+ long targetFileSize, Schema dsSchema) {
this.spec = spec;
this.format = format;
this.locations = locations;
@@ -295,20 +286,13 @@ class IcebergBatchWriter implements BatchWrite {
this.dsSchema = dsSchema;
}
- public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
- OutputFileFactory fileFactory = new OutputFileFactory(partitionId, taskId, epochId);
- AppenderFactory<InternalRow> appenderFactory = new SparkAppenderFactory();
- if (spec.fields().isEmpty()) {
- return new UnpartitionedWriter(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
- } else {
- return new PartitionedWriter(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
- }
+ public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
+ return createWriter(partitionId, taskId, 0);
}
- public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
- OutputFileFactory fileFactory = new OutputFileFactory(partitionId, taskId, 0);
+ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
+ OutputFileFactory fileFactory = new OutputFileFactory(partitionId, taskId, epochId);
AppenderFactory<InternalRow> appenderFactory = new SparkAppenderFactory();
-
if (spec.fields().isEmpty()) {
return new UnpartitionedWriter(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
} else {
@@ -406,13 +390,8 @@ class IcebergBatchWriter implements BatchWrite {
private EncryptedOutputFile currentFile = null;
private long currentRows = 0;
- BaseWriter(
- PartitionSpec spec,
- FileFormat format,
- AppenderFactory<InternalRow> appenderFactory,
- WriterFactory.OutputFileFactory fileFactory,
- FileIO fileIo,
- long targetFileSize) {
+ BaseWriter(PartitionSpec spec, FileFormat format, AppenderFactory<InternalRow> appenderFactory,
+ WriterFactory.OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize) {
this.spec = spec;
this.format = format;
this.appenderFactory = appenderFactory;
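As context for the commit() dispatch above: the two overwrite flags and the expression map directly onto Iceberg's pending-update APIs. A minimal sketch, not part of this patch, assuming a loaded Table and an already-collected list of DataFiles (the commitFiles helper name is illustrative):

    import java.util.List;
    import org.apache.iceberg.AppendFiles;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.OverwriteFiles;
    import org.apache.iceberg.ReplacePartitions;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.expressions.Expression;

    static void commitFiles(Table table, List<DataFile> files, boolean overwriteDynamic,
                            boolean overwriteByFilter, Expression overwriteExpr) {
      if (overwriteDynamic) {
        // replace every partition that receives at least one new file
        ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
        files.forEach(dynamicOverwrite::addFile);
        dynamicOverwrite.commit();
      } else if (overwriteByFilter) {
        // delete rows matching the pushed-down expression, then add the new files in one commit
        OverwriteFiles overwrite = table.newOverwrite();
        overwrite.overwriteByRowFilter(overwriteExpr);
        files.forEach(overwrite::addFile);
        overwrite.commit();
      } else {
        // plain append of the new files
        AppendFiles append = table.newAppend();
        files.forEach(append::appendFile);
        append.commit();
      }
    }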
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
new file mode 100644
index 0000000..cb932ff
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.spark.SparkFilters;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
+import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns {
+ private static final Filter[] NO_FILTERS = new Filter[0];
+
+ private final SparkSession spark;
+ private final Table table;
+ private final CaseInsensitiveStringMap options;
+
+ private Schema schema;
+ private boolean caseSensitive;
+ private List<Expression> filterExpressions = null;
+ private Filter[] pushedFilters = NO_FILTERS;
+
+ SparkScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
+ this.spark = spark;
+ this.table = table;
+ this.options = options;
+ this.schema = table.schema();
+ this.caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
+ }
+
+ public SparkScanBuilder caseSensitive(boolean isCaseSensitive) {
+ this.caseSensitive = isCaseSensitive;
+ return this;
+ }
+
+ @Override
+ public Filter[] pushFilters(Filter[] filters) {
+ List<Expression> expressions = Lists.newArrayListWithExpectedSize(filters.length);
+ List<Filter> pushed = Lists.newArrayListWithExpectedSize(filters.length);
+
+ for (Filter filter : filters) {
+ Expression expr = SparkFilters.convert(filter);
+ if (expr != null) {
+ expressions.add(expr);
+ pushed.add(filter);
+ }
+ }
+
+ this.filterExpressions = expressions;
+ this.pushedFilters = pushed.toArray(new Filter[0]);
+
+ // Spark doesn't support residuals per task, so return all filters
+ // to get Spark to handle record-level filtering
+ return filters;
+ }
+
+ @Override
+ public Filter[] pushedFilters() {
+ return pushedFilters;
+ }
+
+ @Override
+ public void pruneColumns(StructType requestedSchema) {
+ this.schema = SparkSchemaUtil.prune(table.schema(), requestedSchema);
+ }
+
+ public StructType readSchema() {
+ return SparkSchemaUtil.convert(schema);
+ }
+
+ @Override
+ public Scan build() {
+ return new SparkBatchScan(table, caseSensitive, schema, filterExpressions, options);
+ }
+}
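The new builder is driven the same way the updated tests below drive it: construct it with a SparkSession, an Iceberg Table, and the read options, push Spark Filters, optionally prune columns, then build the Scan and convert it to a Batch. A minimal sketch from within the same package (the Hadoop table path is a placeholder):

    import com.google.common.collect.ImmutableMap;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.hadoop.HadoopTables;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.connector.read.Batch;
    import org.apache.spark.sql.connector.read.InputPartition;
    import org.apache.spark.sql.sources.EqualTo;
    import org.apache.spark.sql.sources.Filter;
    import org.apache.spark.sql.util.CaseInsensitiveStringMap;

    SparkSession spark = SparkSession.active();
    CaseInsensitiveStringMap options =
        new CaseInsensitiveStringMap(ImmutableMap.of("path", "/tmp/warehouse/logs"));
    Table table = new HadoopTables().load(options.get("path"));

    SparkScanBuilder builder = new SparkScanBuilder(spark, table, options)
        .caseSensitive(false);
    builder.pushFilters(new Filter[] { EqualTo.apply("id", 5) });  // unconvertible filters are not pushed
    Batch scan = builder.build().toBatch();
    InputPartition[] partitions = scan.planInputPartitions();      // read tasks after filter and column pruning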
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergStreamingWriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkStreamingWrite.java
similarity index 64%
rename from spark/src/main/java/org/apache/iceberg/spark/source/IcebergStreamingWriter.java
rename to spark/src/main/java/org/apache/iceberg/spark/source/SparkStreamingWrite.java
index e1d0dfb..5781534 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergStreamingWriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkStreamingWrite.java
@@ -35,7 +35,6 @@ import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory;
@@ -44,58 +43,31 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+public class SparkStreamingWrite extends SparkBatchWrite implements StreamingWrite {
-public class IcebergStreamingWriter extends IcebergBatchWriter implements StreamingWrite {
-
- private static final Logger LOG = LoggerFactory.getLogger(IcebergStreamingWriter.class);
+ private static final Logger LOG = LoggerFactory.getLogger(SparkStreamingWrite.class);
private static final String QUERY_ID_PROPERTY = "spark.sql.streaming.queryId";
private static final String EPOCH_ID_PROPERTY = "spark.sql.streaming.epochId";
+ private final boolean truncateBatches;
private final String queryId;
- private final TableCapability writeBehavior;
- private final Table table;
- private final long targetFileSize;
- private final FileFormat format;
- private final Schema dsSchema;
- IcebergStreamingWriter(Table table, CaseInsensitiveStringMap options, String queryId, TableCapability writeBehavior,
- String applicationId, String wapId, Schema dsSchema) {
- super(table, options, writeBehavior, applicationId, wapId, dsSchema);
+ SparkStreamingWrite(Table table, CaseInsensitiveStringMap options, boolean truncateBatches, String queryId,
+ String applicationId, String wapId, Schema dsSchema) {
+ super(table, options, false, truncateBatches, Expressions.alwaysTrue(), applicationId, wapId, dsSchema);
+ this.truncateBatches = truncateBatches;
this.queryId = queryId;
- this.writeBehavior = writeBehavior;
- this.table = table;
- this.format = getFileFormat(table.properties(), options);
- long tableTargetFileSize = PropertyUtil.propertyAsLong(
- table.properties(), WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
- this.targetFileSize = options.getLong("target-file-size-bytes", tableTargetFileSize);
- this.dsSchema = dsSchema;
}
@Override
public StreamingDataWriterFactory createStreamingWriterFactory() {
- return new StreamingWriterFactory(table.spec(), format, table.locationProvider(),
- table.properties(), table.io(), table.encryption(), targetFileSize, dsSchema);
- }
-
- private static class StreamingWriterFactory extends WriterFactory implements StreamingDataWriterFactory {
-
- StreamingWriterFactory(PartitionSpec spec, FileFormat format, LocationProvider locations,
- Map<String, String> properties, FileIO fileIo, EncryptionManager encryptionManager,
- long targetFileSize, Schema dsSchema) {
- super(spec, format, locations, properties, fileIo, encryptionManager, targetFileSize, dsSchema);
- }
-
- @Override
- public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
- return super.createWriter(partitionId, taskId, epochId);
- }
+ // the writer factory works for both batch and streaming
+ return createBatchWriterFactory();
}
@Override
public void commit(long epochId, WriterCommitMessage[] messages) {
- LOG.info("Committing epoch {} for query {} in {} mode", epochId, queryId, writeBehavior);
+ LOG.info("Committing epoch {} for query {} in {} mode", epochId, queryId, truncateBatches ? "complete" : "append");
table().refresh();
Long lastCommittedEpochId = getLastCommittedEpochId();
@@ -104,7 +76,7 @@ public class IcebergStreamingWriter extends IcebergBatchWriter implements Stream
return;
}
- if (writeBehavior.equals(TableCapability.TRUNCATE)) {
+ if (truncateBatches) {
OverwriteFiles overwriteFiles = table().newOverwrite();
overwriteFiles.overwriteByRowFilter(Expressions.alwaysTrue());
int numFiles = 0;
@@ -113,7 +85,7 @@ public class IcebergStreamingWriter extends IcebergBatchWriter implements Stream
numFiles++;
}
commit(overwriteFiles, epochId, numFiles, "streaming complete overwrite");
- } else if (writeBehavior.equals(TableCapability.STREAMING_WRITE)) {
+ } else {
AppendFiles append = table().newFastAppend();
int numFiles = 0;
for (DataFile file : files(messages)) {
@@ -121,8 +93,6 @@ public class IcebergStreamingWriter extends IcebergBatchWriter implements Stream
numFiles++;
}
commit(append, epochId, numFiles, "streaming append");
- } else {
- throw new IllegalArgumentException("Iceberg doen't support write behavior " + writeBehavior + " for now");
}
}
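On the user side, truncateBatches follows the query's output mode: complete mode truncates the table at each epoch, append mode does a fast append. A rough sketch of a streaming write, with placeholder paths and an assumed upstream aggregation:

    import java.util.concurrent.TimeoutException;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.streaming.StreamingQuery;

    static StreamingQuery startWordCount(Dataset<Row> counts) throws TimeoutException {
      return counts.writeStream()
          .format("iceberg")
          .outputMode("complete")  // "complete" -> truncateBatches = true; "append" -> fast appends per epoch
          .option("checkpointLocation", "/tmp/checkpoints/word-count")
          .option("path", "/tmp/warehouse/word_count")
          .start();
    }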
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
new file mode 100644
index 0000000..f85aac2
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.google.common.collect.ImmutableSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.SupportsWrite;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+class SparkTable implements org.apache.spark.sql.connector.catalog.Table, SupportsRead, SupportsWrite {
+
+ private static final Set<TableCapability> CAPABILITIES = ImmutableSet.of(
+ TableCapability.BATCH_READ,
+ TableCapability.BATCH_WRITE,
+ TableCapability.STREAMING_WRITE,
+ TableCapability.OVERWRITE_BY_FILTER,
+ TableCapability.OVERWRITE_DYNAMIC);
+
+ private final Table icebergTable;
+ private final StructType requestedSchema;
+ private StructType lazyTableSchema = null;
+ private SparkSession lazySpark = null;
+
+ SparkTable(Table icebergTable, StructType requestedSchema) {
+ this.icebergTable = icebergTable;
+ this.requestedSchema = requestedSchema;
+
+ if (requestedSchema != null) {
+ // convert the requested schema to throw an exception if any requested fields are unknown
+ SparkSchemaUtil.convert(icebergTable.schema(), requestedSchema);
+ }
+ }
+
+ private SparkSession sparkSession() {
+ if (lazySpark == null) {
+ this.lazySpark = SparkSession.active();
+ }
+
+ return lazySpark;
+ }
+
+ @Override
+ public String name() {
+ return icebergTable.toString();
+ }
+
+ @Override
+ public StructType schema() {
+ if (lazyTableSchema == null) {
+ if (requestedSchema != null) {
+ this.lazyTableSchema = SparkSchemaUtil.convert(SparkSchemaUtil.prune(icebergTable.schema(), requestedSchema));
+ } else {
+ this.lazyTableSchema = SparkSchemaUtil.convert(icebergTable.schema());
+ }
+ }
+
+ return lazyTableSchema;
+ }
+
+ @Override
+ public Transform[] partitioning() {
+ return SparkUtil.toTransforms(icebergTable.spec());
+ }
+
+ @Override
+ public Map<String, String> properties() {
+ return icebergTable.properties();
+ }
+
+ @Override
+ public Set<TableCapability> capabilities() {
+ return CAPABILITIES;
+ }
+
+ @Override
+ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+ SparkScanBuilder scanBuilder = new SparkScanBuilder(sparkSession(), icebergTable, options);
+ if (requestedSchema != null) {
+ scanBuilder.pruneColumns(requestedSchema);
+ }
+
+ return scanBuilder;
+ }
+
+ @Override
+ public WriteBuilder newWriteBuilder(CaseInsensitiveStringMap options) {
+ return new SparkWriteBuilder(sparkSession(), icebergTable, options);
+ }
+
+}
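SparkTable is what IcebergSource hands back to Spark, so once the DataSourceRegister entry below points at IcebergSource, path-based reads resolve through it. A short usage sketch with a placeholder warehouse path:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.active();

    // resolves to IcebergSource -> SparkTable; filters and projections are pushed
    // down through SparkScanBuilder into the Iceberg scan
    Dataset<Row> df = spark.read()
        .format("iceberg")
        .load("/tmp/warehouse/logs");

    df.select("id", "data").filter("id > 100").show();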
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
new file mode 100644
index 0000000..3f1b48b
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.spark.SparkFilters;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.transforms.UnknownTransform;
+import org.apache.iceberg.types.CheckCompatibility;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.write.BatchWrite;
+import org.apache.spark.sql.connector.write.SupportsDynamicOverwrite;
+import org.apache.spark.sql.connector.write.SupportsOverwrite;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
+import org.apache.spark.sql.sources.Filter;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, SupportsOverwrite {
+
+ private final SparkSession spark;
+ private final Table table;
+ private final CaseInsensitiveStringMap options;
+ private final String overwriteMode;
+ private boolean overwriteDynamic = false;
+ private boolean overwriteByFilter = false;
+ private Expression overwriteExpr = null;
+ private String writeQueryId = null;
+ private StructType writeSchema = null;
+
+ SparkWriteBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
+ this.spark = spark;
+ this.table = table;
+ this.options = options;
+ this.overwriteMode = options.containsKey("overwrite-mode") ?
+ options.get("overwrite-mode").toLowerCase(Locale.ROOT) : null;
+ }
+
+ @Override
+ public WriteBuilder withQueryId(String queryId) {
+ this.writeQueryId = queryId;
+ return this;
+ }
+
+ @Override
+ public WriteBuilder withInputDataSchema(StructType schemaInput) {
+ this.writeSchema = schemaInput;
+ return this;
+ }
+
+ @Override
+ public WriteBuilder overwriteDynamicPartitions() {
+ Preconditions.checkState(!overwriteByFilter, "Cannot overwrite dynamically and by filter: %s", overwriteExpr);
+ this.overwriteDynamic = true;
+ return this;
+ }
+
+ @Override
+ public WriteBuilder overwrite(Filter[] filters) {
+ this.overwriteExpr = SparkFilters.convert(filters);
+ if (overwriteExpr == Expressions.alwaysTrue() && "dynamic".equals(overwriteMode)) {
+ // the overwrite-mode write option turns a truncate (overwrite all) into a dynamic partition overwrite
+ this.overwriteDynamic = true;
+ } else {
+ Preconditions.checkState(!overwriteDynamic, "Cannot overwrite dynamically and by filter: %s", overwriteExpr);
+ this.overwriteByFilter = true;
+ }
+ return this;
+ }
+
+ @Override
+ public BatchWrite buildForBatch() {
+ // Validate
+ Schema dsSchema = SparkSchemaUtil.convert(table.schema(), writeSchema);
+ validateWriteSchema(table.schema(), dsSchema, checkNullability(options));
+ validatePartitionTransforms(table.spec());
+
+ // Get application id
+ String appId = spark.sparkContext().applicationId();
+
+ // Get write-audit-publish id
+ String wapId = spark.conf().get("spark.wap.id", null);
+
+ return new SparkBatchWrite(table,
+ options, overwriteDynamic, overwriteByFilter, overwriteExpr, appId, wapId, dsSchema);
+ }
+
+ @Override
+ public StreamingWrite buildForStreaming() {
+ // Validate
+ Schema dsSchema = SparkSchemaUtil.convert(table.schema(), writeSchema);
+ validateWriteSchema(table.schema(), dsSchema, checkNullability(options));
+ validatePartitionTransforms(table.spec());
+
+ // Streaming writes support only append or complete-mode (truncate) output
+ Preconditions.checkState(!overwriteDynamic,
+ "Unsupported streaming operation: dynamic partition overwrite");
+ Preconditions.checkState(!overwriteByFilter || overwriteExpr == Expressions.alwaysTrue(),
+ "Unsupported streaming operation: overwrite by filter: %s", overwriteExpr);
+
+ // Get application id
+ String appId = spark.sparkContext().applicationId();
+ String wapId = spark.conf().get("spark.wap.id", null);
+
+ return new SparkStreamingWrite(table, options, overwriteByFilter, writeQueryId, appId, wapId, dsSchema);
+ }
+
+ private void validateWriteSchema(Schema tableSchema, Schema dsSchema, Boolean checkNullability) {
+ List<String> errors;
+ if (checkNullability) {
+ errors = CheckCompatibility.writeCompatibilityErrors(tableSchema, dsSchema);
+ } else {
+ errors = CheckCompatibility.typeCompatibilityErrors(tableSchema, dsSchema);
+ }
+ if (!errors.isEmpty()) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Cannot write incompatible dataset to table with schema:\n")
+ .append(tableSchema)
+ .append("\nProblems:");
+ for (String error : errors) {
+ sb.append("\n* ").append(error);
+ }
+ throw new IllegalArgumentException(sb.toString());
+ }
+ }
+
+ private void validatePartitionTransforms(PartitionSpec spec) {
+ if (spec.fields().stream().anyMatch(field -> field.transform() instanceof UnknownTransform)) {
+ String unsupported = spec.fields().stream()
+ .map(PartitionField::transform)
+ .filter(transform -> transform instanceof UnknownTransform)
+ .map(org.apache.iceberg.transforms.Transform::toString)
+ .collect(Collectors.joining(", "));
+
+ throw new UnsupportedOperationException(
+ String.format("Cannot write using unsupported transforms: %s", unsupported));
+ }
+ }
+
+ private boolean checkNullability(CaseInsensitiveStringMap options) {
+ boolean sparkCheckNullability = Boolean.parseBoolean(spark.conf()
+ .get("spark.sql.iceberg.check-nullability", "true"));
+ boolean dataFrameCheckNullability = options.getBoolean("check-nullability", true);
+ return sparkCheckNullability && dataFrameCheckNullability;
+ }
+}
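The overwrite-mode option is read by this builder when Spark calls overwrite(Filter[]) for SaveMode.Overwrite: an alwaysTrue filter normally truncates the table, but with overwrite-mode=dynamic it becomes a dynamic partition overwrite, which is what the TestParquetWrite change below relies on. A minimal sketch with a placeholder path:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    static void writeLogs(Dataset<Row> df) {
      // append new data files only
      df.write().format("iceberg").mode("append").save("/tmp/warehouse/logs");

      // mode("overwrite") alone arrives here as an alwaysTrue filter (a truncate);
      // the write option switches it to replacing only the partitions with new data
      df.write()
          .format("iceberg")
          .mode("overwrite")
          .option("overwrite-mode", "dynamic")
          .save("/tmp/warehouse/logs");
    }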
diff --git a/spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 04e5f32..01a6c4e 100644
--- a/spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -17,4 +17,4 @@
# under the License.
#
-org.apache.iceberg.spark.source.IcebergTableProvider
+org.apache.iceberg.spark.source.IcebergSource
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
index 9ff583d..85d2627 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
@@ -53,8 +53,8 @@ import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -221,9 +221,10 @@ public class TestDataFrameWrites extends AvroDataTest {
return spark.internalCreateDataFrame(JavaRDD.toRDD(rdd), convert(schema), false);
}
- // This fails due to SPARK-28730
- @Ignore
+ @Test
public void testNullableWithWriteOption() throws IOException {
+ Assume.assumeTrue("Spark 3.0 rejects writing nulls to a required column", spark.version().startsWith("2"));
+
File location = new File(temp.newFolder("parquet"), "test");
String sourcePath = String.format("%s/nullable_poc/sourceFolder/", location.toString());
String targetPath = String.format("%s/nullable_poc/targetFolder/", location.toString());
@@ -266,9 +267,10 @@ public class TestDataFrameWrites extends AvroDataTest {
}
- // This fails due to SPARK-28730
- @Ignore
+ @Test
public void testNullableWithSparkSqlOption() throws IOException {
+ Assume.assumeTrue("Spark 3.0 rejects writing nulls to a required column", spark.version().startsWith("2"));
+
File location = new File(temp.newFolder("parquet"), "test");
String sourcePath = String.format("%s/nullable_poc/sourceFolder/", location.toString());
String targetPath = String.format("%s/nullable_poc/targetFolder/", location.toString());
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
index 991e01d..ead9a27 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
@@ -51,8 +51,10 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
import org.apache.spark.sql.sources.And;
import org.apache.spark.sql.sources.EqualTo;
@@ -212,10 +214,11 @@ public class TestFilteredScan {
CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of(
"path", unpartitioned.toString())
);
- IcebergBatchScan scan = new IcebergBatchScan(TABLES.load(options.get("path")), options);
+ SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
for (int i = 0; i < 10; i += 1) {
- pushFilters(scan, EqualTo.apply("id", i));
+ pushFilters(builder, EqualTo.apply("id", i));
+ Batch scan = builder.build().toBatch();
InputPartition[] partitions = scan.planInputPartitions();
Assert.assertEquals("Should only create one task for a small file", 1, partitions.length);
@@ -239,9 +242,11 @@ public class TestFilteredScan {
try {
for (int i = 0; i < 10; i += 1) {
- IcebergBatchScan scan = new IcebergBatchScan(TABLES.load(options.get("path")), false, options);
+ SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options)
+ .caseSensitive(false);
- pushFilters(scan, EqualTo.apply("ID", i)); // note lower(ID) == lower(id), so there must be a match
+ pushFilters(builder, EqualTo.apply("ID", i)); // note lower(ID) == lower(id), so there must be a match
+ Batch scan = builder.build().toBatch();
InputPartition[] tasks = scan.planInputPartitions();
Assert.assertEquals("Should only create one task for a small file", 1, tasks.length);
@@ -262,9 +267,10 @@ public class TestFilteredScan {
"path", unpartitioned.toString())
);
- IcebergBatchScan scan = new IcebergBatchScan(TABLES.load(options.get("path")), options);
+ SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
- pushFilters(scan, LessThan.apply("ts", "2017-12-22T00:00:00+00:00"));
+ pushFilters(builder, LessThan.apply("ts", "2017-12-22T00:00:00+00:00"));
+ Batch scan = builder.build().toBatch();
InputPartition[] tasks = scan.planInputPartitions();
Assert.assertEquals("Should only create one task for a small file", 1, tasks.length);
@@ -278,14 +284,15 @@ public class TestFilteredScan {
Table table = buildPartitionedTable("bucketed_by_id", BUCKET_BY_ID, "bucket4", "id");
CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of("path", table.location()));
- IcebergBatchScan unfiltered = new IcebergBatchScan(table, options);
+ Batch unfiltered = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options).build().toBatch();
Assert.assertEquals("Unfiltered table should created 4 read tasks",
4, unfiltered.planInputPartitions().length);
for (int i = 0; i < 10; i += 1) {
- IcebergBatchScan scan = new IcebergBatchScan(table, options);
+ SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
- pushFilters(scan, EqualTo.apply("id", i));
+ pushFilters(builder, EqualTo.apply("id", i));
+ Batch scan = builder.build().toBatch();
InputPartition[] tasks = scan.planInputPartitions();
@@ -302,15 +309,16 @@ public class TestFilteredScan {
public void testDayPartitionedTimestampFilters() {
Table table = buildPartitionedTable("partitioned_by_day", PARTITION_BY_DAY, "ts_day", "ts");
CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of("path", table.location()));
- IcebergBatchScan unfiltered = new IcebergBatchScan(table, options);
+ Batch unfiltered = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options).build().toBatch();
Assert.assertEquals("Unfiltered table should created 2 read tasks",
2, unfiltered.planInputPartitions().length);
{
- IcebergBatchScan scan = new IcebergBatchScan(table, options);
+ SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
- pushFilters(scan, LessThan.apply("ts", "2017-12-22T00:00:00+00:00"));
+ pushFilters(builder, LessThan.apply("ts", "2017-12-22T00:00:00+00:00"));
+ Batch scan = builder.build().toBatch();
InputPartition[] tasks = scan.planInputPartitions();
Assert.assertEquals("Should create one task for 2017-12-21", 1, tasks.length);
@@ -320,11 +328,12 @@ public class TestFilteredScan {
}
{
- IcebergBatchScan scan = new IcebergBatchScan(table, options);
+ SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
- pushFilters(scan, And.apply(
+ pushFilters(builder, And.apply(
GreaterThan.apply("ts", "2017-12-22T06:00:00+00:00"),
LessThan.apply("ts", "2017-12-22T08:00:00+00:00")));
+ Batch scan = builder.build().toBatch();
InputPartition[] tasks = scan.planInputPartitions();
Assert.assertEquals("Should create one task for 2017-12-22", 1, tasks.length);
@@ -341,29 +350,31 @@ public class TestFilteredScan {
Table table = buildPartitionedTable("partitioned_by_hour", PARTITION_BY_HOUR, "ts_hour", "ts");
CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of("path", table.location()));
- IcebergBatchScan unfiltered = new IcebergBatchScan(table, options);
+ Batch unfiltered = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options).build().toBatch();
Assert.assertEquals("Unfiltered table should created 9 read tasks",
9, unfiltered.planInputPartitions().length);
{
- IcebergBatchScan scan = new IcebergBatchScan(table, options);
+ SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
- pushFilters(scan, LessThan.apply("ts", "2017-12-22T00:00:00+00:00"));
+ pushFilters(builder, LessThan.apply("ts", "2017-12-22T00:00:00+00:00"));
+ Batch scan = builder.build().toBatch();
InputPartition[] tasks = scan.planInputPartitions();
Assert.assertEquals("Should create 4 tasks for 2017-12-21: 15, 17, 21, 22", 4, tasks.length);
assertEqualsSafe(SCHEMA.asStruct(), expected(8, 9, 7, 6, 5),
- read(table.location().toString(), "ts < cast('2017-12-22 00:00:00+00:00' as timestamp)"));
+ read(table.location(), "ts < cast('2017-12-22 00:00:00+00:00' as timestamp)"));
}
{
- IcebergBatchScan scan = new IcebergBatchScan(table, options);
+ SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
- pushFilters(scan, And.apply(
+ pushFilters(builder, And.apply(
GreaterThan.apply("ts", "2017-12-22T06:00:00+00:00"),
LessThan.apply("ts", "2017-12-22T08:00:00+00:00")));
+ Batch scan = builder.build().toBatch();
InputPartition[] tasks = scan.planInputPartitions();
Assert.assertEquals("Should create 2 tasks for 2017-12-22: 6, 7", 2, tasks.length);
@@ -412,9 +423,10 @@ public class TestFilteredScan {
Table table = buildPartitionedTable("partitioned_by_data", PARTITION_BY_DATA, "data_ident", "data");
CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of("path", table.location()));
- IcebergBatchScan scan = new IcebergBatchScan(table, options);
+ SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
- pushFilters(scan, new StringStartsWith("data", "junc"));
+ pushFilters(builder, new StringStartsWith("data", "junc"));
+ Batch scan = builder.build().toBatch();
Assert.assertEquals(1, scan.planInputPartitions().length);
}
@@ -427,8 +439,10 @@ public class TestFilteredScan {
"path", table.location())
);
- IcebergBatchScan scan = new IcebergBatchScan(table, options);
- pushFilters(scan, new StringStartsWith("data", "junc"));
+ SparkScanBuilder builder = new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
+
+ pushFilters(builder, new StringStartsWith("data", "junc"));
+ Batch scan = builder.build().toBatch();
Assert.assertEquals(1, scan.planInputPartitions().length);
}
@@ -487,7 +501,7 @@ public class TestFilteredScan {
return expected;
}
- private void pushFilters(Scan scan, Filter... filters) {
+ private void pushFilters(ScanBuilder scan, Filter... filters) {
Assert.assertTrue(scan instanceof SupportsPushDownFilters);
SupportsPushDownFilters filterable = (SupportsPushDownFilters) scan;
filterable.pushFilters(filters);
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSource.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSource.java
index fdf5daa..dd820fa 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSource.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSource.java
@@ -20,16 +20,17 @@
package org.apache.iceberg.spark.source;
import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.Table;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-public class TestIcebergSource extends IcebergTableProvider {
+public class TestIcebergSource extends IcebergSource {
@Override
public String shortName() {
return "iceberg-test";
}
@Override
- public org.apache.iceberg.Table findTable(CaseInsensitiveStringMap options, Configuration conf) {
+ protected Table findTable(CaseInsensitiveStringMap options, Configuration conf) {
return TestTables.load(options.get("iceberg.table.name"));
}
}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
index da936ed..31ce025 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java
@@ -39,8 +39,8 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -164,8 +164,7 @@ public class TestParquetWrite {
Assert.assertEquals("Result rows should match", expected, actual);
}
- // ignore due to spark default use static
- @Ignore
+ @Test
public void testOverwrite() throws IOException {
File parent = temp.newFolder("parquet");
File location = new File(parent, "test");
@@ -197,7 +196,7 @@ public class TestParquetWrite {
// overwrite with 2*id to replace record 2, append 4 and 6
df.withColumn("id", df.col("id").multiply(2)).select("id", "data").write()
- .option("partitionOverwriteMode", "dynamic")
+ .option("overwrite-mode", "dynamic")
.format("iceberg")
.mode("overwrite")
.save(location.toString());
@@ -342,9 +341,12 @@ public class TestParquetWrite {
Assert.assertTrue("All DataFiles contain 1000 rows", files.stream().allMatch(d -> d.recordCount() == 1000));
}
- // This fails due to SPARK-28730
- @Ignore
+ @Test
public void testWriteProjection() throws IOException {
+ Assume.assumeTrue(
+ "Not supported in Spark 3.0; analysis requires all columns are present",
+ spark.version().startsWith("2"));
+
File parent = temp.newFolder("parquet");
File location = new File(parent, "test");
@@ -376,9 +378,12 @@ public class TestParquetWrite {
Assert.assertEquals("Result rows should match", expected, actual);
}
- // This fails due to SPARK-28730
- @Ignore
+ @Test
public void testWriteProjectionWithMiddle() throws IOException {
+ Assume.assumeTrue(
+ "Not supported in Spark 3.0; analysis requires all columns are present",
+ spark.version().startsWith("2"));
+
File parent = temp.newFolder("parquet");
File location = new File(parent, "test");
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
index 9550d00..b32ce7f 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
@@ -41,7 +41,6 @@ import org.apache.spark.sql.streaming.StreamingQueryException;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -200,8 +199,7 @@ public class TestStructuredStreaming {
}
}
- // This fails due to SPARK-28730
- @Ignore
+ @Test
public void testStreamingWriteCompleteModeWithProjection() throws IOException {
File parent = temp.newFolder("parquet");
File location = new File(parent, "test-table");