You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/11/13 01:39:57 UTC

[incubator-iceberg] branch master updated: Spark: Add option to allow writing optional to required fields (#514)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new cd7cd7b  Spark: Add option to allow writing optional to required fields (#514)
cd7cd7b is described below

commit cd7cd7b7169c04c15c71f147a37fbe8b23ff95a2
Author: Andrei Ionescu <we...@gmail.com>
AuthorDate: Wed Nov 13 03:39:48 2019 +0200

    Spark: Add option to allow writing optional to required fields (#514)
---
 .../apache/iceberg/types/CheckCompatibility.java   |  25 +++-
 .../iceberg/types/TestReadabilityChecks.java       |  22 ++++
 site/docs/configuration.md                         |   1 +
 .../apache/iceberg/spark/source/IcebergSource.java |  20 +++-
 .../iceberg/spark/source/TestDataFrameWrites.java  | 129 +++++++++++++++++++++
 5 files changed, 189 insertions(+), 8 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/types/CheckCompatibility.java b/api/src/main/java/org/apache/iceberg/types/CheckCompatibility.java
index 07a84f2..89ea1b0 100644
--- a/api/src/main/java/org/apache/iceberg/types/CheckCompatibility.java
+++ b/api/src/main/java/org/apache/iceberg/types/CheckCompatibility.java
@@ -32,13 +32,28 @@ import org.apache.iceberg.Schema;
 public class CheckCompatibility extends TypeUtil.CustomOrderSchemaVisitor<List<String>> {
   /**
    * Returns a list of compatibility errors for writing with the given write schema.
+   * This includes nullability: writing optional (nullable) values to a required field is an error.
    *
    * @param readSchema a read schema
    * @param writeSchema a write schema
    * @return a list of error details, or an empty list if there are no compatibility problems
    */
   public static List<String> writeCompatibilityErrors(Schema readSchema, Schema writeSchema) {
-    return TypeUtil.visit(readSchema, new CheckCompatibility(writeSchema, true));
+    return TypeUtil.visit(readSchema, new CheckCompatibility(writeSchema, true, true));
+  }
+
+  /**
+   * Returns a list of compatibility errors for writing with the given write schema.
+   * This checks type compatibility and not nullability: writing optional (nullable) values
+   * to a required field is not an error. To check nullability as well as types,
+   * use {@link #writeCompatibilityErrors(Schema, Schema)}.
+   *
+   * @param readSchema a read schema
+   * @param writeSchema a write schema
+   * @return a list of error details, or an empty list if there are no compatibility problems
+   */
+  public static List<String> typeCompatibilityErrors(Schema readSchema, Schema writeSchema) {
+    return TypeUtil.visit(readSchema, new CheckCompatibility(writeSchema, true, false));
   }
 
   /**
@@ -49,20 +64,22 @@ public class CheckCompatibility extends TypeUtil.CustomOrderSchemaVisitor<List<S
    * @return a list of error details, or an empty list if there are no compatibility problems
    */
   public static List<String> readCompatibilityErrors(Schema readSchema, Schema writeSchema) {
-    return TypeUtil.visit(readSchema, new CheckCompatibility(writeSchema, false));
+    return TypeUtil.visit(readSchema, new CheckCompatibility(writeSchema, false, true));
   }
 
   private static final ImmutableList<String> NO_ERRORS = ImmutableList.of();
 
   private final Schema schema;
   private final boolean checkOrdering;
+  private final boolean checkNullability;
 
   // the current file schema, maintained while traversing a write schema
   private Type currentType;
 
-  private CheckCompatibility(Schema schema, boolean checkOrdering) {
+  private CheckCompatibility(Schema schema, boolean checkOrdering, boolean checkNullability) {
     this.schema = schema;
     this.checkOrdering = checkOrdering;
+    this.checkNullability = checkNullability;
   }
 
   @Override
@@ -132,7 +149,7 @@ public class CheckCompatibility extends TypeUtil.CustomOrderSchemaVisitor<List<S
 
     this.currentType = field.type();
     try {
-      if (readField.isRequired() && field.isOptional()) {
+      if (checkNullability && readField.isRequired() && field.isOptional()) {
         errors.add(readField.name() + " should be required, but is optional");
       }
 
diff --git a/api/src/test/java/org/apache/iceberg/types/TestReadabilityChecks.java b/api/src/test/java/org/apache/iceberg/types/TestReadabilityChecks.java
index c2af6bf..cb5325d 100644
--- a/api/src/test/java/org/apache/iceberg/types/TestReadabilityChecks.java
+++ b/api/src/test/java/org/apache/iceberg/types/TestReadabilityChecks.java
@@ -385,4 +385,26 @@ public class TestReadabilityChecks {
     Assert.assertNotNull(schema.caseInsensitiveSelect("LoCaTiOnS.LaT").findField(1));
     Assert.assertNotNull(schema.caseInsensitiveSelect("locations.LONG").findField(2));
   }
+
+  @Test
+  public void testCheckNullabilityRequiredSchemaField() {
+    Schema write = new Schema(optional(1, "from_field", Types.IntegerType.get()));
+    Schema read = new Schema(required(1, "to_field", Types.IntegerType.get()));
+
+    List<String> errors = CheckCompatibility.typeCompatibilityErrors(read, write);
+    Assert.assertEquals("Should produce no error messages", 0, errors.size());
+  }
+
+  @Test
+  public void testCheckNullabilityRequiredStructField() {
+    Schema write = new Schema(required(0, "nested", Types.StructType.of(
+        optional(1, "from_field", Types.IntegerType.get())
+    )));
+    Schema read = new Schema(required(0, "nested", Types.StructType.of(
+        required(1, "to_field", Types.IntegerType.get())
+    )));
+
+    List<String> errors = CheckCompatibility.typeCompatibilityErrors(read, write);
+    Assert.assertEquals("Should produce no error messages", 0, errors.size());
+  }
 }
diff --git a/site/docs/configuration.md b/site/docs/configuration.md
index bc412d7..abec8ea 100644
--- a/site/docs/configuration.md
+++ b/site/docs/configuration.md
@@ -94,4 +94,5 @@ df.write
 | ------------ | -------------------------- | ------------------------------------------------------------ |
 | write-format | Table write.format.default | File format to use for this write operation; parquet or avro |
 | target-file-size-bytes | As per table property | Overrides this table's write.target-file-size-bytes     |
+| check-nullability | true         | Set to false to allow writing optional (nullable) columns to required table fields |
 
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index 6cb83d0..7fe25a8 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -78,7 +78,7 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
         "Save mode %s is not supported", mode);
     Configuration conf = new Configuration(lazyBaseConf());
     Table table = getTableAndResolveHadoopConfiguration(options, conf);
-    validateWriteSchema(table.schema(), dsStruct);
+    validateWriteSchema(table.schema(), dsStruct, checkNullability(options));
     validatePartitionTransforms(table.spec());
     String appId = lazySparkSession().sparkContext().applicationId();
     String wapId = lazySparkSession().conf().get("spark.wap.id", null);
@@ -93,7 +93,7 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
         "Output mode %s is not supported", mode);
     Configuration conf = new Configuration(lazyBaseConf());
     Table table = getTableAndResolveHadoopConfiguration(options, conf);
-    validateWriteSchema(table.schema(), dsStruct);
+    validateWriteSchema(table.schema(), dsStruct, checkNullability(options));
     validatePartitionTransforms(table.spec());
     // Spark 2.4.x passes runId to createStreamWriter instead of real queryId,
     // so we fetch it directly from sparkContext to make writes idempotent
@@ -149,9 +149,14 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
         .forEach(key -> baseConf.set(key.replaceFirst("hadoop.", ""), options.get(key)));
   }
 
-  private void validateWriteSchema(Schema tableSchema, StructType dsStruct) {
+  private void validateWriteSchema(Schema tableSchema, StructType dsStruct, Boolean checkNullability) {
     Schema dsSchema = SparkSchemaUtil.convert(tableSchema, dsStruct);
-    List<String> errors = CheckCompatibility.writeCompatibilityErrors(tableSchema, dsSchema);
+    List<String> errors;
+    if (checkNullability) {
+      errors = CheckCompatibility.writeCompatibilityErrors(tableSchema, dsSchema);
+    } else {
+      errors = CheckCompatibility.typeCompatibilityErrors(tableSchema, dsSchema);
+    }
     if (!errors.isEmpty()) {
       StringBuilder sb = new StringBuilder();
       sb.append("Cannot write incompatible dataset to table with schema:\n")
@@ -176,4 +181,11 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
           String.format("Cannot write using unsupported transforms: %s", unsupported));
     }
   }
+
+  private boolean checkNullability(DataSourceOptions options) {
+    boolean sparkCheckNullability = Boolean.parseBoolean(lazySparkSession().conf()
+        .get("spark.sql.iceberg.check-nullability", "true"));
+    boolean dataFrameCheckNullability = options.getBoolean("check-nullability", true);
+    return sparkCheckNullability && dataFrameCheckNullability; // off if either setting is false
+  }
 }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
index 22bd419..4f4843e 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
@@ -19,11 +19,14 @@
 
 package org.apache.iceberg.spark.source;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.Files;
@@ -35,14 +38,17 @@ import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.AvroIterable;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.AvroDataTest;
 import org.apache.iceberg.spark.data.RandomData;
 import org.apache.iceberg.spark.data.SparkAvroReader;
+import org.apache.iceberg.types.Types;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.DataFrameWriter;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.junit.AfterClass;
@@ -77,6 +83,35 @@ public class TestDataFrameWrites extends AvroDataTest {
   private static SparkSession spark = null;
   private static JavaSparkContext sc = null;
 
+  private Map<String, String> tableProperties;
+
+  private org.apache.spark.sql.types.StructType sparkSchema = new org.apache.spark.sql.types.StructType(
+      new org.apache.spark.sql.types.StructField[] {
+          new org.apache.spark.sql.types.StructField(
+              "optionalField",
+              org.apache.spark.sql.types.DataTypes.StringType,
+              true,
+              org.apache.spark.sql.types.Metadata.empty()),
+          new org.apache.spark.sql.types.StructField(
+              "requiredField",
+              org.apache.spark.sql.types.DataTypes.StringType,
+              false,
+              org.apache.spark.sql.types.Metadata.empty())
+      });
+
+  private Schema icebergSchema = new Schema(
+      Types.NestedField.optional(1, "optionalField", Types.StringType.get()),
+      Types.NestedField.required(2, "requiredField", Types.StringType.get()));
+
+  private List<String> data0 = Arrays.asList(
+      "{\"optionalField\": \"a1\", \"requiredField\": \"bid_001\"}",
+      "{\"optionalField\": \"a2\", \"requiredField\": \"bid_002\"}");
+  private List<String> data1 = Arrays.asList(
+      "{\"optionalField\": \"d1\", \"requiredField\": \"bid_101\"}",
+      "{\"optionalField\": \"d2\", \"requiredField\": \"bid_102\"}",
+      "{\"optionalField\": \"d3\", \"requiredField\": \"bid_103\"}",
+      "{\"optionalField\": \"d4\", \"requiredField\": \"bid_104\"}");
+
   @BeforeClass
   public static void startSpark() {
     TestDataFrameWrites.spark = SparkSession.builder().master("local[2]").getOrCreate();
@@ -184,4 +219,98 @@ public class TestDataFrameWrites extends AvroDataTest {
     JavaRDD<InternalRow> rdd = sc.parallelize(rows);
     return spark.internalCreateDataFrame(JavaRDD.toRDD(rdd), convert(schema), false);
   }
+
+  @Test
+  public void testNullableWithWriteOption() throws IOException {
+    File location = new File(temp.newFolder("parquet"), "test");
+    String sourcePath = String.format("%s/nullable_poc/sourceFolder/", location.toString());
+    String targetPath = String.format("%s/nullable_poc/targetFolder/", location.toString());
+
+    tableProperties = ImmutableMap.of(TableProperties.WRITE_NEW_DATA_LOCATION, targetPath);
+
+    spark = SparkSession.builder()
+        .master("local[2]")
+        .appName("NullableTest")
+        .getOrCreate();
+
+    // read this and append to iceberg dataset
+    spark
+        .read().schema(sparkSchema).json(
+        JavaSparkContext.fromSparkContext(spark.sparkContext()).parallelize(data1))
+        .write().parquet(sourcePath);
+
+    // this is our iceberg dataset to which we will append data
+    new HadoopTables(spark.sparkContext().hadoopConfiguration())
+        .create(
+            icebergSchema,
+            PartitionSpec.builderFor(icebergSchema).identity("requiredField").build(),
+            tableProperties,
+            targetPath);
+
+    // this is the initial data inside the iceberg dataset
+    spark
+        .read().schema(sparkSchema).json(
+        JavaSparkContext.fromSparkContext(spark.sparkContext()).parallelize(data0))
+        .write().format("iceberg").mode(SaveMode.Append).save(targetPath);
+
+    // read from parquet and append to iceberg w/ nullability check disabled
+    spark
+        .read().schema(SparkSchemaUtil.convert(icebergSchema)).parquet(sourcePath)
+        .write().format("iceberg").option("check-nullability", false).mode(SaveMode.Append).save(targetPath);
+
+    // read all data
+    List<Row> rows = spark.read().format("iceberg").load(targetPath).collectAsList();
+    Assert.assertEquals("Should contain 6 rows", 6, rows.size());
+
+  }
+
+  @Test
+  public void testNullableWithSparkSqlOption() throws IOException {
+    File location = new File(temp.newFolder("parquet"), "test");
+    String sourcePath = String.format("%s/nullable_poc/sourceFolder/", location.toString());
+    String targetPath = String.format("%s/nullable_poc/targetFolder/", location.toString());
+
+    tableProperties = ImmutableMap.of(TableProperties.WRITE_NEW_DATA_LOCATION, targetPath);
+
+    spark = SparkSession.builder()
+        .master("local[2]")
+        .appName("NullableTest")
+        .getOrCreate();
+
+    // read this and append to iceberg dataset
+    spark
+        .read().schema(sparkSchema).json(
+        JavaSparkContext.fromSparkContext(spark.sparkContext()).parallelize(data1))
+        .write().parquet(sourcePath);
+
+    SparkSession newSparkSession = SparkSession.builder()
+        .master("local[2]")
+        .appName("NullableTest")
+        .config("spark.sql.iceberg.check-nullability", false)
+        .getOrCreate();
+
+    // this is our iceberg dataset to which we will append data
+    new HadoopTables(newSparkSession.sparkContext().hadoopConfiguration())
+        .create(
+            icebergSchema,
+            PartitionSpec.builderFor(icebergSchema).identity("requiredField").build(),
+            tableProperties,
+            targetPath);
+
+    // this is the initial data inside the iceberg dataset
+    newSparkSession
+        .read().schema(sparkSchema).json(
+        JavaSparkContext.fromSparkContext(newSparkSession.sparkContext()).parallelize(data0))
+        .write().format("iceberg").mode(SaveMode.Append).save(targetPath);
+
+    // read from parquet and append to iceberg
+    newSparkSession
+        .read().schema(SparkSchemaUtil.convert(icebergSchema)).parquet(sourcePath)
+        .write().format("iceberg").mode(SaveMode.Append).save(targetPath);
+
+    // read all data; getOrCreate() applied the option to the shared session, so unset it before asserting
+    List<Row> rows = newSparkSession.read().format("iceberg").load(targetPath).collectAsList();
+    newSparkSession.conf().unset("spark.sql.iceberg.check-nullability");
+    Assert.assertEquals("Should contain 6 rows", 6, rows.size());
+  }
 }