Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/02/17 16:30:04 UTC

[GitHub] [iceberg] rdblue opened a new pull request #4154: Spark 3.2: Support mergeSchema option on write

rdblue opened a new pull request #4154:
URL: https://github.com/apache/iceberg/pull/4154


   Some Spark tables support an option, `mergeSchema`, that updates the table schema on write when the incoming data has extra columns. This PR shows how Iceberg could support the `mergeSchema` option.
   
   Right now, this would not fully work: `SparkTable` would need to return `TableCapability.ACCEPT_ANY_SCHEMA` to disable Spark's schema validation and rely entirely on Iceberg's. We would probably want to add a `SparkConf` setting to enable that.
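   
   For context, here is a rough sketch of what a write using this option could look like from the Spark side; the table name and input path below are placeholders, and the exact option spelling is part of what this PR discusses.
   
   import org.apache.spark.sql.Dataset;
   import org.apache.spark.sql.Row;
   import org.apache.spark.sql.SparkSession;
   
   public class MergeSchemaWriteExample {
     public static void main(String[] args) throws Exception {
       SparkSession spark = SparkSession.builder()
           .appName("merge-schema-example")
           .getOrCreate();
   
       // incoming data that may contain columns the Iceberg table does not have yet
       Dataset<Row> df = spark.read().parquet("/tmp/incoming");  // placeholder path
   
       // with mergeSchema enabled, the sink would add the new columns instead of
       // failing schema validation
       df.writeTo("db.events")  // placeholder table name
           .option("mergeSchema", "true")
           .append();
     }
   }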




[GitHub] [iceberg] rdblue commented on a change in pull request #4154: Spark 3.2: Support mergeSchema option on write

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #4154:
URL: https://github.com/apache/iceberg/pull/4154#discussion_r812483633



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
##########
@@ -171,11 +173,13 @@ public ThisT tableProperty(String name) {
     }
 
     protected T parse(Function<String, T> conversion, T defaultValue) {
-      if (optionName != null) {
-        // use lower case comparison as DataSourceOptions.asMap() in Spark 2 returns a lower case map
-        String optionValue = options.get(optionName.toLowerCase(Locale.ROOT));
-        if (optionValue != null) {
-          return conversion.apply(optionValue);
+      if (!optionNames.isEmpty()) {

Review comment:
       @aokolnychyi, I updated this to accept multiple calls to `option` and prefer the first one.
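
       For anyone reading along, the shape of that change is roughly the following; this is a simplified stand-in, and the field and method names are guesses based on the diff rather than the exact Iceberg code.

       // simplified sketch of the multi-option lookup; it falls back to a default
       // instead of the real parser's session-conf and table-property checks
       import java.util.List;
       import java.util.Locale;
       import java.util.Map;
       import java.util.function.Function;

       class OptionParserSketch<T> {
         private final Map<String, String> options;   // lower-cased keys, per the diff's comment
         private final List<String> optionNames;      // filled by repeated option(...) calls

         OptionParserSketch(Map<String, String> options, List<String> optionNames) {
           this.options = options;
           this.optionNames = optionNames;
         }

         T parse(Function<String, T> conversion, T defaultValue) {
           for (String optionName : optionNames) {
             // use lower case comparison because Spark's option map is case-insensitive
             String optionValue = options.get(optionName.toLowerCase(Locale.ROOT));
             if (optionValue != null) {
               return conversion.apply(optionValue);  // the first option name with a value wins
             }
           }
           return defaultValue;
         }
       }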






[GitHub] [iceberg] rdblue commented on a change in pull request #4154: Spark 3.2: Support mergeSchema option on write

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #4154:
URL: https://github.com/apache/iceberg/pull/4154#discussion_r810541149



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
##########
@@ -131,7 +132,19 @@ public Write build() {
         SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
 
     Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema);
-    TypeUtil.validateWriteSchema(table.schema(), writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+    boolean mergeSchema = writeInfo.options().getBoolean("mergeSchema", false);

Review comment:
       This is a property name used by other sources. We could use both, though.






[GitHub] [iceberg] rdblue commented on a change in pull request #4154: Spark 3.2: Support mergeSchema option on write

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #4154:
URL: https://github.com/apache/iceberg/pull/4154#discussion_r812474210



##########
File path: api/src/main/java/org/apache/iceberg/Schema.java
##########
@@ -102,7 +103,7 @@ public Schema(int schemaId, List<NestedField> columns, Map<String, Integer> alia
 
     this.identifierFieldIds = identifierFieldIds != null ? Ints.toArray(identifierFieldIds) : new int[0];
 
-    lazyIdToName();

Review comment:
       The purpose of this call was to ensure that `lazyIdToName()` runs to index the schema, which fails if there are duplicate IDs. Since that index is also what we use to find the highest field ID, the replacement line now does both.
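
       In other words, building the id-to-name index visits every field ID once, so the same pass can both reject duplicates and track the maximum. A simplified illustration of that idea (not the actual `Schema` internals):

       import java.util.HashMap;
       import java.util.Map;

       class IdIndexSketch {
         // builds an id -> name index, failing on duplicates, and returns the highest id seen
         static int indexAndFindHighest(Map<Integer, String> index, int[] ids, String[] names) {
           int highest = 0;
           for (int i = 0; i < ids.length; i++) {
             String previous = index.put(ids[i], names[i]);
             if (previous != null) {
               throw new IllegalArgumentException("Duplicate field id: " + ids[i]);
             }
             highest = Math.max(highest, ids[i]);
           }
           return highest;
         }

         public static void main(String[] args) {
           Map<Integer, String> index = new HashMap<>();
           int highest = indexAndFindHighest(index, new int[] {1, 2, 5}, new String[] {"id", "data", "ts"});
           System.out.println(highest);  // prints 5
         }
       }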






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #4154: Spark 3.2: Support mergeSchema option on write

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #4154:
URL: https://github.com/apache/iceberg/pull/4154#discussion_r812553469



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
##########
@@ -171,11 +173,13 @@ public ThisT tableProperty(String name) {
     }
 
     protected T parse(Function<String, T> conversion, T defaultValue) {
-      if (optionName != null) {
-        // use lower case comparison as DataSourceOptions.asMap() in Spark 2 returns a lower case map
-        String optionValue = options.get(optionName.toLowerCase(Locale.ROOT));
-        if (optionValue != null) {
-          return conversion.apply(optionValue);
+      if (!optionNames.isEmpty()) {

Review comment:
       Looks correct to me.






[GitHub] [iceberg] rdblue commented on a change in pull request #4154: Spark 3.2: Support mergeSchema option on write

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #4154:
URL: https://github.com/apache/iceberg/pull/4154#discussion_r812476038



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
##########
@@ -130,8 +131,31 @@ public Write build() {
     Preconditions.checkArgument(handleTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(table.schema()),
         SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
 
-    Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema);
-    TypeUtil.validateWriteSchema(table.schema(), writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+    Schema writeSchema;
+    boolean mergeSchema = writeInfo.options().getBoolean("mergeSchema",
+        writeInfo.options().getBoolean("merge-schema", false));
+    if (mergeSchema) {

Review comment:
       Yes, but if the change is a no-op, the commit will return without modifying the table. I think that's better than trying to detect ahead of time whether the schema will change.






[GitHub] [iceberg] rdblue commented on a change in pull request #4154: Spark 3.2: Support mergeSchema option on write

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #4154:
URL: https://github.com/apache/iceberg/pull/4154#discussion_r812479852



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
##########
@@ -130,8 +131,31 @@ public Write build() {
     Preconditions.checkArgument(handleTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(table.schema()),
         SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
 
-    Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema);
-    TypeUtil.validateWriteSchema(table.schema(), writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+    Schema writeSchema;
+    boolean mergeSchema = writeInfo.options().getBoolean("mergeSchema",

Review comment:
       Moved this to `writeConf`.
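
       Something like the following accessor in `SparkWriteConf`, perhaps; the method and option names here are assumptions based on this thread, not necessarily the merged code.

       // sketch of a SparkWriteConf accessor for the flag; exact names are assumptions
       public boolean mergeSchema() {
         return confParser.booleanConf()
             .option("mergeSchema")     // Spark's conventional camel-case spelling
             .option("merge-schema")    // Iceberg-style spelling, checked second
             .defaultValue(false)
             .parse();
       }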






[GitHub] [iceberg] rdblue commented on a change in pull request #4154: Spark 3.2: Support mergeSchema option on write

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #4154:
URL: https://github.com/apache/iceberg/pull/4154#discussion_r812483258



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
##########
@@ -130,8 +131,31 @@ public Write build() {
     Preconditions.checkArgument(handleTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(table.schema()),
         SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
 
-    Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema);
-    TypeUtil.validateWriteSchema(table.schema(), writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+    Schema writeSchema;
+    boolean mergeSchema = writeInfo.options().getBoolean("mergeSchema",
+        writeInfo.options().getBoolean("merge-schema", false));
+    if (mergeSchema) {
+      // convert the dataset schema and assign fresh ids for new fields
+      Schema newSchema = SparkSchemaUtil.convertWithFreshIds(table.schema(), dsSchema);
+
+      // update the table to get final id assignments and validate the changes
+      UpdateSchema update = table.updateSchema().unionByNameWith(newSchema);
+      Schema mergedSchema = update.apply();
+
+      // reconvert the dsSchema without assignment to use the ids assigned by UpdateSchema
+      writeSchema = SparkSchemaUtil.convert(mergedSchema, dsSchema);
+
+      TypeUtil.validateWriteSchema(
+          mergedSchema, writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+
+      // if the validation passed, update the table schema
+      update.commit();
+    } else {
+      writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema);

Review comment:
       Done!
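
       For readers following the thread, the factored-out version might look roughly like the helper below; the method name is hypothetical, and the body simply mirrors the logic in the diff above.

       // hypothetical helper extracted from build(); mirrors the diff above
       private Schema mergedWriteSchema() {
         // convert the dataset schema and assign fresh ids for new fields
         Schema newSchema = SparkSchemaUtil.convertWithFreshIds(table.schema(), dsSchema);

         // update the table to get final id assignments and validate the changes
         UpdateSchema update = table.updateSchema().unionByNameWith(newSchema);
         Schema mergedSchema = update.apply();

         // reconvert the dsSchema without assignment to use the ids assigned by UpdateSchema
         Schema writeSchema = SparkSchemaUtil.convert(mergedSchema, dsSchema);

         TypeUtil.validateWriteSchema(
             mergedSchema, writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());

         // if the validation passed, update the table schema
         update.commit();

         return writeSchema;
       }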







[GitHub] [iceberg] aokolnychyi commented on pull request #4154: Spark 3.2: Support mergeSchema option on write

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #4154:
URL: https://github.com/apache/iceberg/pull/4154#issuecomment-1048341174


   @rdblue, there are some checkstyle errors.




[GitHub] [iceberg] aokolnychyi edited a comment on pull request #4154: Spark 3.2: Support mergeSchema option on write

Posted by GitBox <gi...@apache.org>.
aokolnychyi edited a comment on pull request #4154:
URL: https://github.com/apache/iceberg/pull/4154#issuecomment-1043377253


   I think this feature is useful for operations like MERGE INTO, where we populate changes from a source table whose schema may have evolved. We have to be careful with assignment alignment in UPDATE and MERGE. I think Spark disables a few checks/rules if a table accepts any schema, which means we would have to reimplement them in Iceberg.
   
   We will also have to modify our row-level rules in extensions.




[GitHub] [iceberg] rdblue commented on a change in pull request #4154: Spark 3.2: Support mergeSchema option on write

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #4154:
URL: https://github.com/apache/iceberg/pull/4154#discussion_r810554664



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
##########
@@ -131,7 +132,19 @@ public Write build() {
         SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
 
     Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema);
-    TypeUtil.validateWriteSchema(table.schema(), writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+    boolean mergeSchema = writeInfo.options().getBoolean("mergeSchema", false);

Review comment:
       I implemented both.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #4154: Spark 3.2: Support mergeSchema option on write

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #4154:
URL: https://github.com/apache/iceberg/pull/4154#discussion_r812552945



##########
File path: api/src/main/java/org/apache/iceberg/types/TypeUtil.java
##########
@@ -273,7 +273,13 @@ public static Schema assignIncreasingFreshIds(Schema schema) {
    * @throws IllegalArgumentException if a field cannot be found (by name) in the source schema
    */
   public static Schema reassignIds(Schema schema, Schema idSourceSchema) {
-    Types.StructType struct = visit(schema, new ReassignIds(idSourceSchema)).asStructType();
+    Types.StructType struct = visit(schema, new ReassignIds(idSourceSchema, null)).asStructType();
+    return new Schema(struct.fields(), refreshIdentifierFields(struct, schema));
+  }
+
+  public static Schema reassignOrRefreshIds(Schema schema, Schema idSourceSchema) {
+    AtomicInteger highest = new AtomicInteger(schema.highestFieldId());

Review comment:
       Sounds good.






[GitHub] [iceberg] rdblue commented on a change in pull request #4154: Spark 3.2: Support mergeSchema option on write

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #4154:
URL: https://github.com/apache/iceberg/pull/4154#discussion_r812476404



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
##########
@@ -130,8 +131,31 @@ public Write build() {
     Preconditions.checkArgument(handleTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(table.schema()),
         SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
 
-    Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema);
-    TypeUtil.validateWriteSchema(table.schema(), writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+    Schema writeSchema;
+    boolean mergeSchema = writeInfo.options().getBoolean("mergeSchema",

Review comment:
       `mergeSchema` is the Spark option that is used elsewhere. It is supported here for compatibility with Hive-like table formats.






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #4154: Spark 3.2: Support mergeSchema option on write

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #4154:
URL: https://github.com/apache/iceberg/pull/4154#discussion_r809427756



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
##########
@@ -131,7 +132,19 @@ public Write build() {
         SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
 
     Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema);
-    TypeUtil.validateWriteSchema(table.schema(), writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+    boolean mergeSchema = writeInfo.options().getBoolean("mergeSchema", false);

Review comment:
       Is it a commonly defined option name? Or should it be `merge-schema`?






[GitHub] [iceberg] rdblue commented on a change in pull request #4154: Spark 3.2: Support mergeSchema option on write

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #4154:
URL: https://github.com/apache/iceberg/pull/4154#discussion_r812475776



##########
File path: api/src/main/java/org/apache/iceberg/types/TypeUtil.java
##########
@@ -273,7 +273,13 @@ public static Schema assignIncreasingFreshIds(Schema schema) {
    * @throws IllegalArgumentException if a field cannot be found (by name) in the source schema
    */
   public static Schema reassignIds(Schema schema, Schema idSourceSchema) {
-    Types.StructType struct = visit(schema, new ReassignIds(idSourceSchema)).asStructType();
+    Types.StructType struct = visit(schema, new ReassignIds(idSourceSchema, null)).asStructType();
+    return new Schema(struct.fields(), refreshIdentifierFields(struct, schema));
+  }
+
+  public static Schema reassignOrRefreshIds(Schema schema, Schema idSourceSchema) {
+    AtomicInteger highest = new AtomicInteger(schema.highestFieldId());

Review comment:
       Yes, that's exactly right. The assigned IDs are not the final IDs that are assigned to the table. They are just temporary IDs that will be replaced by `UpdateSchema` in `unionByNameWith`.








[GitHub] [iceberg] aokolnychyi commented on a change in pull request #4154: Spark 3.2: Support mergeSchema option on write

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #4154:
URL: https://github.com/apache/iceberg/pull/4154#discussion_r812552741



##########
File path: api/src/main/java/org/apache/iceberg/Schema.java
##########
@@ -102,7 +103,7 @@ public Schema(int schemaId, List<NestedField> columns, Map<String, Integer> alia
 
     this.identifierFieldIds = identifierFieldIds != null ? Ints.toArray(identifierFieldIds) : new int[0];
 
-    lazyIdToName();

Review comment:
       Oops, I did not see a call to `lazyIdToName()`.






[GitHub] [iceberg] aokolnychyi commented on pull request #4154: Spark 3.2: Support mergeSchema option on write

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #4154:
URL: https://github.com/apache/iceberg/pull/4154#issuecomment-1043377253


   I think this feature is useful for operations like MERGE INTO, where we populate changes from a source table whose schema may have evolved. We have to be careful with assignment alignment in UPDATE and MERGE. I think Spark disables a few checks/rules if a table accepts any schema, which means we would have to reimplement them here.




[GitHub] [iceberg] aokolnychyi commented on pull request #4154: Spark 3.2: Support mergeSchema option on write

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on pull request #4154:
URL: https://github.com/apache/iceberg/pull/4154#issuecomment-1048445263


   Thanks, @rdblue!




[GitHub] [iceberg] aokolnychyi commented on a change in pull request #4154: Spark 3.2: Support mergeSchema option on write

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #4154:
URL: https://github.com/apache/iceberg/pull/4154#discussion_r812432890



##########
File path: api/src/main/java/org/apache/iceberg/types/TypeUtil.java
##########
@@ -273,7 +273,13 @@ public static Schema assignIncreasingFreshIds(Schema schema) {
    * @throws IllegalArgumentException if a field cannot be found (by name) in the source schema
    */
   public static Schema reassignIds(Schema schema, Schema idSourceSchema) {
-    Types.StructType struct = visit(schema, new ReassignIds(idSourceSchema)).asStructType();
+    Types.StructType struct = visit(schema, new ReassignIds(idSourceSchema, null)).asStructType();
+    return new Schema(struct.fields(), refreshIdentifierFields(struct, schema));
+  }
+
+  public static Schema reassignOrRefreshIds(Schema schema, Schema idSourceSchema) {
+    AtomicInteger highest = new AtomicInteger(schema.highestFieldId());

Review comment:
       Can there be a mismatch between the highest field ID in the schema and `last-column-id` in the metadata? For instance, what if I add a column, then drop it, and call this method with a new column? Could that cause the same column ID to be used twice?

##########
File path: api/src/main/java/org/apache/iceberg/Schema.java
##########
@@ -102,7 +103,7 @@ public Schema(int schemaId, List<NestedField> columns, Map<String, Integer> alia
 
     this.identifierFieldIds = identifierFieldIds != null ? Ints.toArray(identifierFieldIds) : new int[0];
 
-    lazyIdToName();

Review comment:
       Did we remove this call on purpose? Seems unrelated.

##########
File path: api/src/main/java/org/apache/iceberg/types/ReassignIds.java
##########
@@ -43,6 +45,17 @@ public Type schema(Schema schema, Supplier<Type> future) {
     }
   }
 
+  private int id(Types.StructType sourceStruct, String name) {
+    if (sourceStruct != null) {
+      Types.NestedField sourceField = sourceStruct.field(name);
+      if (sourceField != null) {
+        return sourceField.fieldId();
+      }
+    }
+
+    return assignId.get();

Review comment:
       I see that `assignId` is set to null in `reassignIds`. Can this result in an NPE when called in `struct`?

##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
##########
@@ -130,8 +131,31 @@ public Write build() {
     Preconditions.checkArgument(handleTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(table.schema()),
         SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
 
-    Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema);
-    TypeUtil.validateWriteSchema(table.schema(), writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+    Schema writeSchema;
+    boolean mergeSchema = writeInfo.options().getBoolean("mergeSchema",
+        writeInfo.options().getBoolean("merge-schema", false));
+    if (mergeSchema) {

Review comment:
       Will we always attempt to merge the schema if enabled? Even if the incoming schema doesn't include new columns?

##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
##########
@@ -130,8 +131,31 @@ public Write build() {
     Preconditions.checkArgument(handleTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(table.schema()),
         SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
 
-    Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema);
-    TypeUtil.validateWriteSchema(table.schema(), writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+    Schema writeSchema;
+    boolean mergeSchema = writeInfo.options().getBoolean("mergeSchema",
+        writeInfo.options().getBoolean("merge-schema", false));
+    if (mergeSchema) {
+      // convert the dataset schema and assign fresh ids for new fields
+      Schema newSchema = SparkSchemaUtil.convertWithFreshIds(table.schema(), dsSchema);
+
+      // update the table to get final id assignments and validate the changes
+      UpdateSchema update = table.updateSchema().unionByNameWith(newSchema);
+      Schema mergedSchema = update.apply();
+
+      // reconvert the dsSchema without assignment to use the ids assigned by UpdateSchema
+      writeSchema = SparkSchemaUtil.convert(mergedSchema, dsSchema);
+
+      TypeUtil.validateWriteSchema(
+          mergedSchema, writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+
+      // if the validation passed, update the table schema
+      update.commit();
+    } else {
+      writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema);

Review comment:
       nit: What about putting the new logic into a separate method, since `build` is already complicated?

##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
##########
@@ -130,8 +131,31 @@ public Write build() {
     Preconditions.checkArgument(handleTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(table.schema()),
         SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
 
-    Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema);
-    TypeUtil.validateWriteSchema(table.schema(), writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+    Schema writeSchema;
+    boolean mergeSchema = writeInfo.options().getBoolean("mergeSchema",

Review comment:
       What about moving this to `SparkWriteConf`? We already have an instance of it and all other configs are parsed through it.
   
   Also, I am not sure about `mergeSchema`. It is the only option name that uses camel case. I'd say we should either just support `merge-schema`, which aligns with all other option names, or support camel case for all options (which could be done in a follow-up PR).

##########
File path: api/src/main/java/org/apache/iceberg/types/TypeUtil.java
##########
@@ -273,7 +273,13 @@ public static Schema assignIncreasingFreshIds(Schema schema) {
    * @throws IllegalArgumentException if a field cannot be found (by name) in the source schema
    */
   public static Schema reassignIds(Schema schema, Schema idSourceSchema) {
-    Types.StructType struct = visit(schema, new ReassignIds(idSourceSchema)).asStructType();
+    Types.StructType struct = visit(schema, new ReassignIds(idSourceSchema, null)).asStructType();
+    return new Schema(struct.fields(), refreshIdentifierFields(struct, schema));
+  }
+
+  public static Schema reassignOrRefreshIds(Schema schema, Schema idSourceSchema) {
+    AtomicInteger highest = new AtomicInteger(schema.highestFieldId());

Review comment:
       I guess we assume `UpdateSchema` will take care of the conflicting IDs?








[GitHub] [iceberg] rdblue commented on pull request #4154: Spark 3.2: Support mergeSchema option on write

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #4154:
URL: https://github.com/apache/iceberg/pull/4154#issuecomment-1046135149


   @aokolnychyi, can you have another look? I got this working and added tests.
   
   I had to update a couple of the classes that handle Spark types to support dataset schemas with extra fields, but it was a very simple update.




[GitHub] [iceberg] rdblue commented on a change in pull request #4154: Spark 3.2: Support mergeSchema option on write

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #4154:
URL: https://github.com/apache/iceberg/pull/4154#discussion_r812474737



##########
File path: api/src/main/java/org/apache/iceberg/types/ReassignIds.java
##########
@@ -43,6 +45,17 @@ public Type schema(Schema schema, Supplier<Type> future) {
     }
   }
 
+  private int id(Types.StructType sourceStruct, String name) {
+    if (sourceStruct != null) {
+      Types.NestedField sourceField = sourceStruct.field(name);
+      if (sourceField != null) {
+        return sourceField.fieldId();
+      }
+    }
+
+    return assignId.get();

Review comment:
       Yeah, this is from a previous version where I was going to throw an exception from `assignId`. I'll fix it up.
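
       One way the null case could be handled, as a sketch of the idea only; the actual fix in the PR may differ.

       // sketch: only use assignId when a field is missing from the source struct,
       // and fail loudly if no assigner was provided (actual fix may differ)
       private int id(Types.StructType sourceStruct, String name) {
         if (sourceStruct != null) {
           Types.NestedField sourceField = sourceStruct.field(name);
           if (sourceField != null) {
             return sourceField.fieldId();
           }
         }

         Preconditions.checkState(assignId != null,
             "Field %s is not in the source schema and no id assigner was provided", name);
         return assignId.get();
       }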






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #4154: Spark 3.2: Support mergeSchema option on write

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #4154:
URL: https://github.com/apache/iceberg/pull/4154#discussion_r812555356



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
##########
@@ -130,8 +131,31 @@ public Write build() {
     Preconditions.checkArgument(handleTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(table.schema()),
         SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
 
-    Schema writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema);
-    TypeUtil.validateWriteSchema(table.schema(), writeSchema, writeConf.checkNullability(), writeConf.checkOrdering());
+    Schema writeSchema;
+    boolean mergeSchema = writeInfo.options().getBoolean("mergeSchema",
+        writeInfo.options().getBoolean("merge-schema", false));
+    if (mergeSchema) {

Review comment:
       As long as it behaves that way, I agree.






[GitHub] [iceberg] aokolnychyi merged pull request #4154: Spark 3.2: Support mergeSchema option on write

Posted by GitBox <gi...@apache.org>.
aokolnychyi merged pull request #4154:
URL: https://github.com/apache/iceberg/pull/4154


   

