Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/11/13 01:39:57 UTC
[incubator-iceberg] branch master updated: Spark: Add option to allow writing optional to required fields (#514)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new cd7cd7b Spark: Add option to allow writing optional to required fields (#514)
cd7cd7b is described below
commit cd7cd7b7169c04c15c71f147a37fbe8b23ff95a2
Author: Andrei Ionescu <we...@gmail.com>
AuthorDate: Wed Nov 13 03:39:48 2019 +0200
Spark: Add option to allow writing optional to required fields (#514)
---
.../apache/iceberg/types/CheckCompatibility.java | 25 +++-
.../iceberg/types/TestReadabilityChecks.java | 22 ++++
site/docs/configuration.md | 1 +
.../apache/iceberg/spark/source/IcebergSource.java | 20 +++-
.../iceberg/spark/source/TestDataFrameWrites.java | 129 +++++++++++++++++++++
5 files changed, 189 insertions(+), 8 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/types/CheckCompatibility.java b/api/src/main/java/org/apache/iceberg/types/CheckCompatibility.java
index 07a84f2..89ea1b0 100644
--- a/api/src/main/java/org/apache/iceberg/types/CheckCompatibility.java
+++ b/api/src/main/java/org/apache/iceberg/types/CheckCompatibility.java
@@ -32,13 +32,28 @@ import org.apache.iceberg.Schema;
public class CheckCompatibility extends TypeUtil.CustomOrderSchemaVisitor<List<String>> {
/**
* Returns a list of compatibility errors for writing with the given write schema.
+ * This includes nullability: writing optional (nullable) values to a required field is an error.
*
* @param readSchema a read schema
* @param writeSchema a write schema
* @return a list of error details, or an empty list if there are no compatibility problems
*/
public static List<String> writeCompatibilityErrors(Schema readSchema, Schema writeSchema) {
- return TypeUtil.visit(readSchema, new CheckCompatibility(writeSchema, true));
+ return TypeUtil.visit(readSchema, new CheckCompatibility(writeSchema, true, true));
+ }
+
+ /**
+ * Returns a list of compatibility errors for writing with the given write schema.
+ * This checks type compatibility and not nullability: writing optional (nullable) values
+ * to a required field is not an error. To check nullability as well as types,
+ * use {@link #writeCompatibilityErrors(Schema, Schema)}.
+ *
+ * @param readSchema a read schema
+ * @param writeSchema a write schema
+ * @return a list of error details, or an empty list if there are no compatibility problems
+ */
+ public static List<String> typeCompatibilityErrors(Schema readSchema, Schema writeSchema) {
+ return TypeUtil.visit(readSchema, new CheckCompatibility(writeSchema, true, false));
}
/**
@@ -49,20 +64,22 @@ public class CheckCompatibility extends TypeUtil.CustomOrderSchemaVisitor<List<S
* @return a list of error details, or an empty list if there are no compatibility problems
*/
public static List<String> readCompatibilityErrors(Schema readSchema, Schema writeSchema) {
- return TypeUtil.visit(readSchema, new CheckCompatibility(writeSchema, false));
+ return TypeUtil.visit(readSchema, new CheckCompatibility(writeSchema, false, true));
}
private static final ImmutableList<String> NO_ERRORS = ImmutableList.of();
private final Schema schema;
private final boolean checkOrdering;
+ private final boolean checkNullability;
// the current file schema, maintained while traversing a write schema
private Type currentType;
- private CheckCompatibility(Schema schema, boolean checkOrdering) {
+ private CheckCompatibility(Schema schema, boolean checkOrdering, boolean checkNullability) {
this.schema = schema;
this.checkOrdering = checkOrdering;
+ this.checkNullability = checkNullability;
}
@Override
@@ -132,7 +149,7 @@ public class CheckCompatibility extends TypeUtil.CustomOrderSchemaVisitor<List<S
this.currentType = field.type();
try {
- if (readField.isRequired() && field.isOptional()) {
+ if (checkNullability && readField.isRequired() && field.isOptional()) {
errors.add(readField.name() + " should be required, but is optional");
}
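For illustration, a minimal sketch of how the two entry points differ, using the method signatures from the diff above (the class name and the "id" field are placeholders, not part of the commit):

import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.CheckCompatibility;
import org.apache.iceberg.types.Types;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

public class NullabilityCheckSketch {
  public static void main(String[] args) {
    // The table (read) schema requires the field, but the incoming (write) schema marks it optional.
    Schema tableSchema = new Schema(required(1, "id", Types.IntegerType.get()));
    Schema writeSchema = new Schema(optional(1, "id", Types.IntegerType.get()));

    // Full write check: nullability is enforced, so this reports
    // "id should be required, but is optional".
    List<String> strictErrors = CheckCompatibility.writeCompatibilityErrors(tableSchema, writeSchema);

    // Type-only check added by this commit: nullability is ignored, so the list is empty.
    List<String> typeErrors = CheckCompatibility.typeCompatibilityErrors(tableSchema, writeSchema);

    System.out.println("strict: " + strictErrors + ", type-only: " + typeErrors);
  }
}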
diff --git a/api/src/test/java/org/apache/iceberg/types/TestReadabilityChecks.java b/api/src/test/java/org/apache/iceberg/types/TestReadabilityChecks.java
index c2af6bf..cb5325d 100644
--- a/api/src/test/java/org/apache/iceberg/types/TestReadabilityChecks.java
+++ b/api/src/test/java/org/apache/iceberg/types/TestReadabilityChecks.java
@@ -385,4 +385,26 @@ public class TestReadabilityChecks {
Assert.assertNotNull(schema.caseInsensitiveSelect("LoCaTiOnS.LaT").findField(1));
Assert.assertNotNull(schema.caseInsensitiveSelect("locations.LONG").findField(2));
}
+
+ @Test
+ public void testCheckNullabilityRequiredSchemaField() {
+ Schema write = new Schema(optional(1, "from_field", Types.IntegerType.get()));
+ Schema read = new Schema(required(1, "to_field", Types.IntegerType.get()));
+
+ List<String> errors = CheckCompatibility.typeCompatibilityErrors(read, write);
+ Assert.assertEquals("Should produce no error messages", 0, errors.size());
+ }
+
+ @Test
+ public void testCheckNullabilityRequiredStructField() {
+ Schema write = new Schema(required(0, "nested", Types.StructType.of(
+ optional(1, "from_field", Types.IntegerType.get())
+ )));
+ Schema read = new Schema(required(0, "nested", Types.StructType.of(
+ required(1, "to_field", Types.IntegerType.get())
+ )));
+
+ List<String> errors = CheckCompatibility.typeCompatibilityErrors(read, write);
+ Assert.assertEquals("Should produce no error messages", 0, errors.size());
+ }
}
diff --git a/site/docs/configuration.md b/site/docs/configuration.md
index bc412d7..abec8ea 100644
--- a/site/docs/configuration.md
+++ b/site/docs/configuration.md
@@ -94,4 +94,5 @@ df.write
| ------------ | -------------------------- | ------------------------------------------------------------ |
| write-format | Table write.format.default | File format to use for this write operation; parquet or avro |
| target-file-size-bytes | As per table property | Overrides this table's write.target-file-size-bytes |
+| check-nullability | true | Checks field nullability before the write; set to false to allow writing nullable values to required fields |
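For the per-write option documented above, a hedged sketch of usage from a Spark job (the dataset "df" and the table path are placeholders; the option name and default come from the table row):

// Sketch only: "df" is an existing Dataset<Row> whose schema allows nulls, and the
// save path below is a placeholder for an existing Iceberg table location.
df.write()
    .format("iceberg")
    .option("check-nullability", false)   // skip the optional-to-required check for this write only
    .mode(SaveMode.Append)
    .save("/path/to/iceberg/table");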
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index 6cb83d0..7fe25a8 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -78,7 +78,7 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
"Save mode %s is not supported", mode);
Configuration conf = new Configuration(lazyBaseConf());
Table table = getTableAndResolveHadoopConfiguration(options, conf);
- validateWriteSchema(table.schema(), dsStruct);
+ validateWriteSchema(table.schema(), dsStruct, checkNullability(options));
validatePartitionTransforms(table.spec());
String appId = lazySparkSession().sparkContext().applicationId();
String wapId = lazySparkSession().conf().get("spark.wap.id", null);
@@ -93,7 +93,7 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
"Output mode %s is not supported", mode);
Configuration conf = new Configuration(lazyBaseConf());
Table table = getTableAndResolveHadoopConfiguration(options, conf);
- validateWriteSchema(table.schema(), dsStruct);
+ validateWriteSchema(table.schema(), dsStruct, checkNullability(options));
validatePartitionTransforms(table.spec());
// Spark 2.4.x passes runId to createStreamWriter instead of real queryId,
// so we fetch it directly from sparkContext to make writes idempotent
@@ -149,9 +149,14 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
.forEach(key -> baseConf.set(key.replaceFirst("hadoop.", ""), options.get(key)));
}
- private void validateWriteSchema(Schema tableSchema, StructType dsStruct) {
+ private void validateWriteSchema(Schema tableSchema, StructType dsStruct, Boolean checkNullability) {
Schema dsSchema = SparkSchemaUtil.convert(tableSchema, dsStruct);
- List<String> errors = CheckCompatibility.writeCompatibilityErrors(tableSchema, dsSchema);
+ List<String> errors;
+ if (checkNullability) {
+ errors = CheckCompatibility.writeCompatibilityErrors(tableSchema, dsSchema);
+ } else {
+ errors = CheckCompatibility.typeCompatibilityErrors(tableSchema, dsSchema);
+ }
if (!errors.isEmpty()) {
StringBuilder sb = new StringBuilder();
sb.append("Cannot write incompatible dataset to table with schema:\n")
@@ -176,4 +181,11 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
String.format("Cannot write using unsupported transforms: %s", unsupported));
}
}
+
+ private boolean checkNullability(DataSourceOptions options) {
+ boolean sparkCheckNullability = Boolean.parseBoolean(lazySpark.conf()
+ .get("spark.sql.iceberg.check-nullability", "true"));
+ boolean dataFrameCheckNullability = options.getBoolean("check-nullability", true);
+ return sparkCheckNullability && dataFrameCheckNullability;
+ }
}
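The session-level switch read by checkNullability() above can be set when building the SparkSession; a minimal sketch follows (master and app name are placeholders). Because the two settings are combined with a logical AND, the nullability check runs only when both the session config and the per-write option are true:

import org.apache.spark.sql.SparkSession;

// Disabling the check at the session level applies to every Iceberg write in this
// session; a per-write option("check-nullability", true) cannot re-enable it, since
// checkNullability() ANDs the session config with the write option.
SparkSession spark = SparkSession.builder()
    .master("local[2]")
    .appName("nullability-example")
    .config("spark.sql.iceberg.check-nullability", "false")
    .getOrCreate();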
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
index 22bd419..4f4843e 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
@@ -19,11 +19,14 @@
package org.apache.iceberg.spark.source;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.net.URI;
+import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import org.apache.avro.generic.GenericData.Record;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Files;
@@ -35,14 +38,17 @@ import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroIterable;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.AvroDataTest;
import org.apache.iceberg.spark.data.RandomData;
import org.apache.iceberg.spark.data.SparkAvroReader;
+import org.apache.iceberg.types.Types;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.junit.AfterClass;
@@ -77,6 +83,35 @@ public class TestDataFrameWrites extends AvroDataTest {
private static SparkSession spark = null;
private static JavaSparkContext sc = null;
+ private Map<String, String> tableProperties;
+
+ private org.apache.spark.sql.types.StructType sparkSchema = new org.apache.spark.sql.types.StructType(
+ new org.apache.spark.sql.types.StructField[] {
+ new org.apache.spark.sql.types.StructField(
+ "optionalField",
+ org.apache.spark.sql.types.DataTypes.StringType,
+ true,
+ org.apache.spark.sql.types.Metadata.empty()),
+ new org.apache.spark.sql.types.StructField(
+ "requiredField",
+ org.apache.spark.sql.types.DataTypes.StringType,
+ false,
+ org.apache.spark.sql.types.Metadata.empty())
+ });
+
+ private Schema icebergSchema = new Schema(
+ Types.NestedField.optional(1, "optionalField", Types.StringType.get()),
+ Types.NestedField.required(2, "requiredField", Types.StringType.get()));
+
+ private List<String> data0 = Arrays.asList(
+ "{\"optionalField\": \"a1\", \"requiredField\": \"bid_001\"}",
+ "{\"optionalField\": \"a2\", \"requiredField\": \"bid_002\"}");
+ private List<String> data1 = Arrays.asList(
+ "{\"optionalField\": \"d1\", \"requiredField\": \"bid_101\"}",
+ "{\"optionalField\": \"d2\", \"requiredField\": \"bid_102\"}",
+ "{\"optionalField\": \"d3\", \"requiredField\": \"bid_103\"}",
+ "{\"optionalField\": \"d4\", \"requiredField\": \"bid_104\"}");
+
@BeforeClass
public static void startSpark() {
TestDataFrameWrites.spark = SparkSession.builder().master("local[2]").getOrCreate();
@@ -184,4 +219,98 @@ public class TestDataFrameWrites extends AvroDataTest {
JavaRDD<InternalRow> rdd = sc.parallelize(rows);
return spark.internalCreateDataFrame(JavaRDD.toRDD(rdd), convert(schema), false);
}
+
+ @Test
+ public void testNullableWithWriteOption() throws IOException {
+ File location = new File(temp.newFolder("parquet"), "test");
+ String sourcePath = String.format("%s/nullable_poc/sourceFolder/", location.toString());
+ String targetPath = String.format("%s/nullable_poc/targetFolder/", location.toString());
+
+ tableProperties = ImmutableMap.of(TableProperties.WRITE_NEW_DATA_LOCATION, targetPath);
+
+ spark = SparkSession.builder()
+ .master("local[2]")
+ .appName("NullableTest")
+ .getOrCreate();
+
+ // read this and append to iceberg dataset
+ spark
+ .read().schema(sparkSchema).json(
+ JavaSparkContext.fromSparkContext(spark.sparkContext()).parallelize(data1))
+ .write().parquet(sourcePath);
+
+ // this is our iceberg dataset to which we will append data
+ new HadoopTables(spark.sparkContext().hadoopConfiguration())
+ .create(
+ icebergSchema,
+ PartitionSpec.builderFor(icebergSchema).identity("requiredField").build(),
+ tableProperties,
+ targetPath);
+
+ // this is the initial data inside the iceberg dataset
+ spark
+ .read().schema(sparkSchema).json(
+ JavaSparkContext.fromSparkContext(spark.sparkContext()).parallelize(data0))
+ .write().format("iceberg").mode(SaveMode.Append).save(targetPath);
+
+ // read from parquet and append to iceberg w/ nullability check disabled
+ spark
+ .read().schema(SparkSchemaUtil.convert(icebergSchema)).parquet(sourcePath)
+ .write().format("iceberg").option("check-nullability", false).mode(SaveMode.Append).save(targetPath);
+
+ // read all data
+ List<Row> rows = spark.read().format("iceberg").load(targetPath).collectAsList();
+ Assert.assertEquals("Should contain 6 rows", 6, rows.size());
+
+ }
+
+ @Test
+ public void testNullableWithSparkSqlOption() throws IOException {
+ File location = new File(temp.newFolder("parquet"), "test");
+ String sourcePath = String.format("%s/nullable_poc/sourceFolder/", location.toString());
+ String targetPath = String.format("%s/nullable_poc/targetFolder/", location.toString());
+
+ tableProperties = ImmutableMap.of(TableProperties.WRITE_NEW_DATA_LOCATION, targetPath);
+
+ spark = SparkSession.builder()
+ .master("local[2]")
+ .appName("NullableTest")
+ .getOrCreate();
+
+ // read this and append to iceberg dataset
+ spark
+ .read().schema(sparkSchema).json(
+ JavaSparkContext.fromSparkContext(spark.sparkContext()).parallelize(data1))
+ .write().parquet(sourcePath);
+
+ SparkSession newSparkSession = SparkSession.builder()
+ .master("local[2]")
+ .appName("NullableTest")
+ .config("spark.sql.iceberg.check-nullability", false)
+ .getOrCreate();
+
+ // this is our iceberg dataset to which we will append data
+ new HadoopTables(newSparkSession.sparkContext().hadoopConfiguration())
+ .create(
+ icebergSchema,
+ PartitionSpec.builderFor(icebergSchema).identity("requiredField").build(),
+ tableProperties,
+ targetPath);
+
+ // this is the initial data inside the iceberg dataset
+ newSparkSession
+ .read().schema(sparkSchema).json(
+ JavaSparkContext.fromSparkContext(spark.sparkContext()).parallelize(data0))
+ .write().format("iceberg").mode(SaveMode.Append).save(targetPath);
+
+ // read from parquet and append to iceberg
+ newSparkSession
+ .read().schema(SparkSchemaUtil.convert(icebergSchema)).parquet(sourcePath)
+ .write().format("iceberg").mode(SaveMode.Append).save(targetPath);
+
+ // read all data
+ List<Row> rows = newSparkSession.read().format("iceberg").load(targetPath).collectAsList();
+ Assert.assertEquals("Should contain 6 rows", 6, rows.size());
+
+ }
}