Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/08/23 18:02:35 UTC

[GitHub] [hudi] sathyaprakashg opened a new pull request #2012: HUDI-1129 Deltastreamer Add support for schema evolution

sathyaprakashg opened a new pull request #2012:
URL: https://github.com/apache/hudi/pull/2012


   ## What is the purpose of the pull request
   
   When the schema has evolved but the producer is still producing events using an older version of the schema, the Hudi delta streamer fails. This fix makes sure the delta streamer works correctly with schema evolution.
   
   Related issues: #1845, #1971, #1972 
   
   ## Brief change log
   
     - Update the Avro-to-Spark conversion method `AvroConversionHelper.createConverterToRow` to handle the scenario where the provided schema has more fields than the data (i.e. the producer is still sending events with the old schema)
     - Introduce a new schema provider class called `SchemaBasedSchemaProvider`, used to set the schema based on the schema of the data. Currently, `HoodieAvroUtils.avroToBytes` uses the schema of the data to convert to bytes, but `HoodieAvroUtils.bytesToAvro` uses the provided schema. Since the two may not always match, this results in an error. By using the data's schema via the new schema provider, we ensure the same schema is used for converting Avro to bytes and bytes back to Avro (see the sketch below).
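
   For context, a minimal sketch of the writer/reader-schema resolution involved here (class and method names are illustrative, not Hudi's actual implementation): serialize with the schema the record was produced with, and deserialize with both that writer schema and the evolved reader schema so Avro can resolve the difference.

   ```
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;

   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericDatumReader;
   import org.apache.avro.generic.GenericDatumWriter;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.avro.io.BinaryEncoder;
   import org.apache.avro.io.DecoderFactory;
   import org.apache.avro.io.EncoderFactory;

   public class AvroBytesSketch {

     // Serialize with the schema the record was actually produced with (writer schema).
     public static byte[] avroToBytes(GenericRecord record, Schema writerSchema) throws IOException {
       ByteArrayOutputStream out = new ByteArrayOutputStream();
       BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
       new GenericDatumWriter<GenericRecord>(writerSchema).write(record, encoder);
       encoder.flush();
       return out.toByteArray();
     }

     // Deserialize with BOTH schemas: Avro's schema resolution then fills fields that
     // exist only in the reader schema from their defaults instead of failing.
     public static GenericRecord bytesToAvro(byte[] bytes, Schema writerSchema, Schema readerSchema)
         throws IOException {
       GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
       return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));
     }
   }
   ```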
   
   
   ## Verify this pull request
   
   This change added tests and can be verified as follows:
   
     - *Added unit test to verify schema evolution* (thanks @sbernauer for the unit test)
   
   ## Committer checklist
   
    - [x] Has a corresponding JIRA in PR title & commit
    
    - [x] Commit message is descriptive of the change
    
    - [x] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-834118960


   I actually see that @n3nash approved this several months ago, pending the perf issue that @sathyaprakashg himself has pointed out. Agree with you. Let me gather some context and see how we can move forward. I also wonder if we had landed other PRs around this.
   
   Any reason we have not landed this, @n3nash? Any open items that you see besides the perf issue? (Storing the schema in the payload should be avoided at all costs; this is what a schema registry solves for.)





[GitHub] [hudi] n3nash edited a comment on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
n3nash edited a comment on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-841601332


   @vinothchandar We have found a way to avoid `BaseAvroPayloadWithSchema`. We are validating some more corner cases with respect to schema evolution in the PR @nsivabalan has pointed out before landing it. ETA is May 17th.





[GitHub] [hudi] n3nash commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
n3nash commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-809974270


   @sathyaprakashg Can you rebase this PR? I'll wait for some time before giving it a shot myself.





[GitHub] [hudi] sathyaprakashg commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sathyaprakashg commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r526563594



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -364,4 +366,40 @@ object AvroConversionHelper {
         }
     }
   }
+
+  /**
+   * Remove namespace from fixed field.
+   * org.apache.spark.sql.avro.SchemaConverters.toAvroType method adds namespace to fixed avro field
+   * https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L177
+   * So we need to remove that namespace so that a reader schema without a namespace does not throw an error like this one:
+   * org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
+   *
+   * @param schema Schema from which namespace needs to be removed for fixed fields
+   * @return input schema with namespace removed for fixed fields, if any
+   */
+  def removeNamespaceFromFixedFields(schema: Schema): Schema = {

Review comment:
       @bvaradar @n3nash Yes, it will break existing MOR tables that have log records written with the old namespace. One way this can be avoided is by doing a one-time compaction before running the job with the version of Hudi that this change ships with. 
   
   There are three different flows in the delta streamer, as I explained in one of my previous comments, and I would like to reiterate that this will affect only those using the third flow (transformation without userProvidedSchema), and only when the schema has fixed fields. Even without this change, if a user wants to switch from the third flow to any of the other flows, they will still face this issue. So, by implementing this change, we make all three flows produce the same output.







[GitHub] [hudi] n3nash commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
n3nash commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r487165310



##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -127,12 +128,59 @@ public static GenericRecord bytesToAvro(byte[] bytes, Schema schema) throws IOEx
    * Convert serialized bytes back into avro record.
    */
   public static GenericRecord bytesToAvro(byte[] bytes, Schema writerSchema, Schema readerSchema) throws IOException {

Review comment:
       @sathyaprakashg But like you mentioned, if you don't add the namespace, you see the type mismatch exception, right? How are you planning to avoid adding the namespace but still not run into the type exception?







[GitHub] [hudi] vinothchandar commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-834087051


   Actually pretty bummed to hear that. @nsivabalan @n3nash can we do something here to prioritize this? Or at least source more help from dev list/slack? 





[GitHub] [hudi] nsivabalan commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r618643401



##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
##########
@@ -696,6 +697,44 @@ public void testBulkInsertsAndUpsertsWithBootstrap() throws Exception {
     assertTrue(fieldNames.containsAll(expectedFieldNames));
   }
 
+  @Test
+  public void testSchemaEvolution() throws Exception {
+    String tableBasePath = dfsBasePath + "/test_table_schema_evolution";
+
+    // Insert data produced with Schema A, pass Schema A
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.INSERT, Collections.singletonList(IdentityTransformer.class.getName()));
+    cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + dfsBasePath + "/source.avsc");
+    cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + dfsBasePath + "/source.avsc");
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+
+    // Upsert data produced with Schema B, pass Schema B
+    cfg = TestHelpers.makeConfig(tableBasePath, Operation.UPSERT, Collections.singletonList(TripsWithEvolvedOptionalFieldTransformer.class.getName()));
+    cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + dfsBasePath + "/source_evolved.avsc");
+    cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + dfsBasePath + "/source_evolved.avsc");
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    TestHelpers.assertRecordCount(1450, tableBasePath + "/*/*.parquet", sqlContext);
+    TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
+    List<Row> counts = TestHelpers.countsPerCommit(tableBasePath + "/*/*.parquet", sqlContext);
+    assertEquals(1450, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());

Review comment:
       Can you add a comment here as to how 1450 is expected?
   // batch 1: 1000 inserts
   // batch 2: 1000 total records generated; 50% inserts, 50% updates, with 50 of those updates being deletes. So 500 new inserts, 450 updates of batch 1 records, and 50 deletes: 1000 + 500 - 50 = 1450. 
   

##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
##########
@@ -696,6 +697,44 @@ public void testBulkInsertsAndUpsertsWithBootstrap() throws Exception {
     assertTrue(fieldNames.containsAll(expectedFieldNames));
   }
 
+  @Test
+  public void testSchemaEvolution() throws Exception {
+    String tableBasePath = dfsBasePath + "/test_table_schema_evolution";
+
+    // Insert data produced with Schema A, pass Schema A
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, Operation.INSERT, Collections.singletonList(IdentityTransformer.class.getName()));
+    cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + dfsBasePath + "/source.avsc");
+    cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + dfsBasePath + "/source.avsc");
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    TestHelpers.assertRecordCount(1000, tableBasePath + "/*/*.parquet", sqlContext);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+
+    // Upsert data produced with Schema B, pass Schema B
+    cfg = TestHelpers.makeConfig(tableBasePath, Operation.UPSERT, Collections.singletonList(TripsWithEvolvedOptionalFieldTransformer.class.getName()));
+    cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + dfsBasePath + "/source_evolved.avsc");
+    cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + dfsBasePath + "/source_evolved.avsc");
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    TestHelpers.assertRecordCount(1450, tableBasePath + "/*/*.parquet", sqlContext);
+    TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
+    List<Row> counts = TestHelpers.countsPerCommit(tableBasePath + "/*/*.parquet", sqlContext);
+    assertEquals(1450, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
+
+    sqlContext.read().format("org.apache.hudi").load(tableBasePath + "/*/*.parquet").createOrReplaceTempView("tmp_trips");
+    long recordCount =
+            sqlContext.sparkSession().sql("select * from tmp_trips where evoluted_optional_union_field is not NULL").count();
+    assertEquals(950, recordCount);
+
+    // Upsert data produced with Schema A, pass Schema B
+    cfg = TestHelpers.makeConfig(tableBasePath, Operation.UPSERT, Collections.singletonList(IdentityTransformer.class.getName()));
+    cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + dfsBasePath + "/source_evolved.avsc");
+    cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + dfsBasePath + "/source_evolved.avsc");
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    TestHelpers.assertRecordCount(1900, tableBasePath + "/*/*.parquet", sqlContext);
+    TestHelpers.assertCommitMetadata("00002", tableBasePath, dfs, 3);
+    counts = TestHelpers.countsPerCommit(tableBasePath + "/*/*.parquet", sqlContext);
+    assertEquals(1900, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());

Review comment:
       Similarly, it would be good to add a comment here as to how 1900 is expected. 

##########
File path: hudi-utilities/src/test/resources/delta-streamer-config/source_evolved.avsc
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "type" : "record",
+  "name" : "triprec",
+  "fields" : [
+  {
+    "name" : "timestamp",
+    "type" : "long"
+  }, {
+    "name" : "_row_key",
+    "type" : "string"
+  }, {
+    "name" : "rider",
+    "type" : "string"
+  }, {
+    "name" : "driver",
+    "type" : "string"
+  }, {
+    "name" : "begin_lat",
+    "type" : "double"
+  }, {
+    "name" : "begin_lon",
+    "type" : "double"
+  }, {
+    "name" : "end_lat",
+    "type" : "double"
+  }, {
+    "name" : "end_lon",
+    "type" : "double"
+  }, {
+    "name" : "distance_in_meters",
+    "type" : "int"
+  }, {
+    "name" : "seconds_since_epoch",
+    "type" : "long"
+  }, {
+    "name" : "weight",
+    "type" : "float"
+  },{
+    "name" : "nation",
+    "type" : "bytes"
+  },{
+    "name" : "current_date",
+    "type" : {
+      "type" : "int",
+      "logicalType" : "date"
+      }
+  },{
+    "name" : "current_ts",
+    "type" : {
+      "type" : "long"
+      }
+  },{
+    "name" : "height",
+    "type" : {
+      "type" : "fixed",
+      "name" : "fixed",
+      "size" : 5,
+      "logicalType" : "decimal",
+      "precision" : 10,
+      "scale": 6
+      }
+  }, {
+    "name" :"city_to_state",
+    "type" : {
+      "type" : "map",
+      "values": "string"
+    }
+  },
+  {
+    "name" : "fare",
+    "type" : {
+      "type" : "record",
+      "name" : "fare",
+      "fields" : [
+        {
+         "name" : "amount",
+         "type" : "double"
+        },
+        {
+         "name" : "currency",
+         "type" : "string"
+        }
+      ]
+    }
+  },
+  {

Review comment:
       May I know why this new column is not added at the end of the schema?

##########
File path: hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
##########
@@ -70,7 +70,7 @@
     "name" : "height",
     "type" : {
       "type" : "fixed",
-      "name" : "abc",

Review comment:
       Also, can you help me understand why we change the name here to `fixed`? The name could be anything in general, right?









[GitHub] [hudi] nsivabalan commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-841681910


   @sathyaprakashg and others: Can you folks check out https://github.com/apache/hudi/pull/2927 and see if it solves the schema evolution problem you guys are facing. 





[GitHub] [hudi] bvaradar commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
bvaradar commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r520056512



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -364,4 +366,40 @@ object AvroConversionHelper {
         }
     }
   }
+
+  /**
+   * Remove namespace from fixed field.
+   * org.apache.spark.sql.avro.SchemaConverters.toAvroType method adds namespace to fixed avro field
+   * https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L177
+   * So we need to remove that namespace so that a reader schema without a namespace does not throw an error like this one:
+   * org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
+   *
+   * @param schema Schema from which namespace needs to be removed for fixed fields
+   * @return input schema with namespace removed for fixed fields, if any
+   */
+  def removeNamespaceFromFixedFields(schema: Schema): Schema = {

Review comment:
       @n3nash @sathyaprakashg: The fundamental problem is that this would break existing MOR tables that have log records written with the old namespace, right? So this would be unsafe. @n3nash: Thoughts? 







[GitHub] [hudi] sathyaprakashg commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sathyaprakashg commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r485249027



##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -127,12 +128,59 @@ public static GenericRecord bytesToAvro(byte[] bytes, Schema schema) throws IOEx
    * Convert serialized bytes back into avro record.
    */
   public static GenericRecord bytesToAvro(byte[] bytes, Schema writerSchema, Schema readerSchema) throws IOException {

Review comment:
       @n3nash Yes, that is a very valid point. Adding the namespace to the reader schema might make it incompatible with data already written without the namespace. 
   
   I am thinking we should probably remove the namespace from the writer schema's fixed fields (sketched below) instead of adding a namespace to the reader schema's fixed fields. Since we are not altering the reader schema in this approach, there shouldn't be an incompatibility issue with data already written.
   
   If you also agree with this approach, I will update the `AvroConversionHelper.createConverterToAvro` method to remove the namespace from the writer schema's fixed fields and revert the changes made in `HoodieAvroUtils`.
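
   For illustration, a rough sketch of the writer-schema rewrite being proposed, written against the plain Avro Schema API (the class name is hypothetical, the actual change lives in the Scala `AvroConversionHelper`, this only walks top-level fields where a real version would recurse into nested types, and it assumes an Avro version with the `Field(name, schema, doc, defaultVal)` constructor):

   ```
   import java.util.ArrayList;
   import java.util.List;

   import org.apache.avro.LogicalType;
   import org.apache.avro.Schema;

   public class FixedNamespaceStripper {

     // Rebuild a record schema so every FIXED field type is recreated without a
     // namespace; all other fields are copied over unchanged.
     public static Schema removeNamespaceFromFixedFields(Schema schema) {
       List<Schema.Field> fields = new ArrayList<>();
       for (Schema.Field field : schema.getFields()) {
         Schema fieldSchema = field.schema();
         if (fieldSchema.getType() == Schema.Type.FIXED && fieldSchema.getNamespace() != null) {
           // Recreate the fixed type with a null namespace, carrying over any logical type.
           Schema stripped = Schema.createFixed(
               fieldSchema.getName(), fieldSchema.getDoc(), null, fieldSchema.getFixedSize());
           LogicalType logicalType = fieldSchema.getLogicalType();
           if (logicalType != null) {
             logicalType.addToSchema(stripped);
           }
           fields.add(new Schema.Field(field.name(), stripped, field.doc(), field.defaultVal()));
         } else {
           fields.add(new Schema.Field(field.name(), fieldSchema, field.doc(), field.defaultVal()));
         }
       }
       Schema result = Schema.createRecord(
           schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError());
       result.setFields(fields);
       return result;
     }
   }
   ```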







[GitHub] [hudi] vinothchandar edited a comment on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
vinothchandar edited a comment on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-838077879


   @n3nash Is `BaseAvroPayloadWithSchema`, which keeps the schema in each record, still needed? What are the concrete next steps here? 





[GitHub] [hudi] sathyaprakashg closed pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sathyaprakashg closed pull request #2012:
URL: https://github.com/apache/hudi/pull/2012


   





[GitHub] [hudi] sbernauer edited a comment on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sbernauer edited a comment on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-834085407


   Hi all,
   
   we sadly haven't been able to do schema evolution for 10 months now (https://github.com/apache/hudi/issues/1845) and have to rely on ugly workarounds.
   Many thanks for working together to find a solution!
   We have tested this patch in our test systems and everything worked fine. But when we rolled it out to production, we noticed that memory consumption increased severalfold. This caused our executors to spill to disk and crash, and we had to roll back to a previous version.
   So I would like to highlight the comment from @sathyaprakashg:
   > @n3nash I am working on fixing the build issue and will have that fix pushed soon. I would like to point out that with this new approach, we are storing the writer schema as part of the payload, which means the size of the dataframe would increase to store the same schema information with each record. Any suggestion on optimizing this?
   
   Regards,
   Sebastian





[GitHub] [hudi] nsivabalan commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-835429596


   Yes, thanks for clarifying. I guess embedding the schema in every payload might be detrimental, as you have experienced. So I have thought of a different approach: regenerate records with the new schema at the Spark datasource level (sketched below). Only a batch that is ingested with the old schema after the table's schema has evolved takes a hit from this conversion. 
   
   https://github.com/apache/hudi/pull/2927
   
   Also, as I have mentioned earlier, if others (@n3nash, @bvaradar) confirm that the schema post-processor is not required as a mandatory step with this [fix](https://github.com/apache/hudi/pull/2765) for default values, we don't need any changes in the delta streamer as such; https://github.com/apache/hudi/pull/2927 alone would suffice. 
   
   @n3nash is doing more testing around this as well, so I will wait for him to comment on the patch.
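
   As a rough illustration of the regeneration idea above (a hedged sketch under the assumption that evolved fields are nullable with a null default; this is not the actual #2927 implementation, and the names are made up):

   ```
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericData;
   import org.apache.avro.generic.GenericRecord;

   public class RecordUpgradeSketch {

     // Rewrite a record produced with an old schema into the table's evolved schema:
     // copy fields present in both, and null out fields that only exist in the new schema.
     public static GenericRecord upgrade(GenericRecord oldRecord, Schema evolvedSchema) {
       GenericRecord upgraded = new GenericData.Record(evolvedSchema);
       for (Schema.Field field : evolvedSchema.getFields()) {
         if (oldRecord.getSchema().getField(field.name()) != null) {
           upgraded.put(field.name(), oldRecord.get(field.name()));
         } else {
           // Field added by evolution and absent from the old record; assumes it is a
           // nullable union with "default": null, so null is a valid value here.
           upgraded.put(field.name(), null);
         }
       }
       return upgraded;
     }
   }
   ```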





[GitHub] [hudi] sathyaprakashg commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sathyaprakashg commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-835222588


   > > @sathyaprakashg and others: trying to understand the use-case here. I understand its related to deltastreamer receiving events in old schema after Hudi's dataset schema got evolved. what's the schema from schema provider when source is producing events in old schema (after schema got evolved w/ hudi dataset)? if the schema provider's schema is updated, I guess there is no need to store the writer schema w/ payload.
   > > AvroConversionUtils.createDataFrame() will ensure to convert the JavaRDD w/ old schema to Dataset w/ new schema if schemaProvider.SourceSchema() has the evolved schema.
   
   The issue with schema evolution happens in `HoodieAvroUtils.avroToBytes` and `HoodieAvroUtils.bytesToAvro`. Let us consider a scenario where there are two versions of the schema in the schema registry. In the 2nd (latest) version, a new field is added, but data is still arriving with schema version 1. 
   
   `HoodieAvroUtils.avroToBytes` uses the schema embedded in the data (i.e. the version 1 schema) to convert Avro to bytes. `HoodieAvroUtils.bytesToAvro` uses the latest schema registry schema (version 2) to convert the bytes back to Avro. This will fail because the v1 schema was used to convert to bytes, but the v2 schema is being used to convert bytes back to Avro. In order to solve this, we need both v1 (writer schema) and v2 (reader schema) to convert bytes back to Avro. We can get the v2 schema from the schema registry, but to get the v1 schema, we were trying to store the writer schema as part of the payload itself (a concrete round-trip example follows below).
   
   Please let me know if it is still not clear.
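
   To make the failure mode concrete, a small self-contained example (the trip schemas are invented for illustration): bytes written with v1 decode fine when both the v1 (writer) and v2 (reader) schemas are supplied, with the new field falling back to its default.

   ```
   import java.io.ByteArrayOutputStream;

   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericData;
   import org.apache.avro.generic.GenericDatumReader;
   import org.apache.avro.generic.GenericDatumWriter;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.avro.io.BinaryEncoder;
   import org.apache.avro.io.DecoderFactory;
   import org.apache.avro.io.EncoderFactory;

   public class SchemaVersionsExample {
     public static void main(String[] args) throws Exception {
       // v1 is the schema the producer still writes with; v2 adds an optional field.
       Schema v1 = new Schema.Parser().parse(
           "{\"type\":\"record\",\"name\":\"trip\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");
       Schema v2 = new Schema.Parser().parse(
           "{\"type\":\"record\",\"name\":\"trip\",\"fields\":["
               + "{\"name\":\"id\",\"type\":\"string\"},"
               + "{\"name\":\"tip\",\"type\":[\"null\",\"double\"],\"default\":null}]}");

       GenericRecord rec = new GenericData.Record(v1);
       rec.put("id", "trip-1");
       ByteArrayOutputStream out = new ByteArrayOutputStream();
       BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
       new GenericDatumWriter<GenericRecord>(v1).write(rec, encoder);
       encoder.flush();

       // Decoding with v2 alone fails (the decoder expects bytes for the extra field);
       // decoding with (v1 writer, v2 reader) resolves "tip" to its null default.
       GenericRecord decoded = new GenericDatumReader<GenericRecord>(v1, v2)
           .read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
       System.out.println(decoded); // {"id": "trip-1", "tip": null}
     }
   }
   ```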





[GitHub] [hudi] n3nash commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
n3nash commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-693155102


   @sathyaprakashg thanks for the explanation, please update the PR accordingly, rebase and squash the commits.





[GitHub] [hudi] n3nash commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
n3nash commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-841601332


   @vinothchandar We have found a way to avoid `BaseAvroPayloadWithSchema`. We are validating some more corner cases with respect to schema evolution in the PR @nsivabalan has pointed out. 





[GitHub] [hudi] bvaradar commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
bvaradar commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r513742129



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -364,4 +366,40 @@ object AvroConversionHelper {
         }
     }
   }
+
+  /**
+   * Remove namespace from fixed field.
+   * org.apache.spark.sql.avro.SchemaConverters.toAvroType method adds namespace to fixed avro field
+   * https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L177
+   * So we need to remove that namespace so that a reader schema without a namespace does not throw an error like this one:
+   * org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
+   *
+   * @param schema Schema from which namespace needs to be removed for fixed fields
+   * @return input schema with namespace removed for fixed fields, if any
+   */
+  def removeNamespaceFromFixedFields(schema: Schema): Schema = {

Review comment:
       @sathyaprakashg: Thanks for the detailed write-up. Sorry for missing this part. Regarding the 3rd flow (transformation without user-provided schema), the exception indicates https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java#L157, but per your observation, HoodieAvroUtils.bytesToAvro() works without issue. Can you see if you can use HoodieAvroUtils.bytesToAvro() in HoodieAvroDataBlock? Does this solve the issue w.r.t. schema evolution handling?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java
##########
@@ -39,13 +40,19 @@
    */
   protected final Comparable orderingVal;
 
+  /**
+   * Schema used to convert avro to bytes.
+   */
+  protected final Schema writerSchema;

Review comment:
       This would increase the memory footprint (and increase I/O in shuffle stages) if we introduce the schema at the payload level. 
   You may want to introduce another class (similar to BaseAvroPayload) that also tracks the schema at the record level. cc @n3nash
   







[GitHub] [hudi] sathyaprakashg commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sathyaprakashg commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r519230640



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -364,4 +366,40 @@ object AvroConversionHelper {
         }
     }
   }
+
+  /**
+   * Remove namespace from fixed field.
+   * org.apache.spark.sql.avro.SchemaConverters.toAvroType method adds namespace to fixed avro field
+   * https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L177
+   * So we need to remove that namespace so that a reader schema without a namespace does not throw an error like this one:
+   * org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
+   *
+   * @param schema Schema from which namespace needs to be removed for fixed fields
+   * @return input schema with namespace removed for fixed fields, if any
+   */
+  def removeNamespaceFromFixedFields(schema: Schema): Schema = {

Review comment:
       @n3nash @bvaradar I checked the three steps you mentioned, and it works fine when the reader and writer schemas have the same set of fields (and the writer schema has a namespace in the fixed field). 
   
   If the reader schema has an extra field, this approach does not work. Here is an [example](https://gist.github.com/sathyaprakashg/f423291be7be6f9d96b9cb850fc72edf) that has an extra field in the reader schema and gives an error. When the schema evolves, the table schema (reader schema) may have more or fewer fields than the writer schema (the MOR log file schema). So if we implement this approach, it would work only when the schemas are the same (except for the extra namespace information in the writer schema). Please let me know how to handle this, or correct me if the approach I took is wrong.
   
   Just to recap, the issue we are trying to solve is that in the existing code, when we write a fixed Avro field to the MOR log file, it gets written with extra namespace information in one of the flows (transformation without userProvidedSchema) but not in the other two; with this PR, the extra namespace information will no longer be written. 
   
   Since this extra namespace information is written only to the MOR log file and not to the parquet file, one possible solution is for the user to run compaction before running the job with this upgraded version of Hudi. Compaction is not mandatory for upgrading to this version; it only needs to be done if the schema has fixed fields and they were using the transformation-without-userProvidedSchema flow.
   
   







[GitHub] [hudi] sathyaprakashg commented on pull request #2012: HUDI-1129 Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sathyaprakashg commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-684007672


   @vinothchandar Sure, I will start looking into using the writer and reader schemas. Once that looks good, I will remove the new schema provider class I added.





[GitHub] [hudi] sathyaprakashg edited a comment on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sathyaprakashg edited a comment on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-835222588


   > > @sathyaprakashg and others: trying to understand the use-case here. I understand its related to deltastreamer receiving events in old schema after Hudi's dataset schema got evolved. what's the schema from schema provider when source is producing events in old schema (after schema got evolved w/ hudi dataset)? if the schema provider's schema is updated, I guess there is no need to store the writer schema w/ payload.
   > > AvroConversionUtils.createDataFrame() will ensure to convert the JavaRDD w/ old schema to Dataset w/ new schema if schemaProvider.SourceSchema() has the evolved schema.
   
   The issue with schema evolution happens in `HoodieAvroUtils.avroToBytes` and `HoodieAvroUtils.bytesToAvro`. Let us consider a scenario where there are two versions of the schema in the schema registry, and in the 2nd (latest) version a new field is added. But data is still coming in with schema version 1. 
   
   `HoodieAvroUtils.avroToBytes` uses the schema embedded in the data (i.e. the version 1 schema) to convert Avro to bytes. `HoodieAvroUtils.bytesToAvro` uses the latest schema registry schema (version 2) to convert the bytes back to Avro. This will fail because the v1 schema was used to convert to bytes, but the v2 schema is being used to convert bytes back to Avro. In order to solve this, we need both v1 (writer schema) and v2 (reader schema) to convert bytes back to Avro. We can get the v2 schema from the schema registry, but to get the v1 schema, we were trying to store the writer schema as part of the payload itself.
   
   Please let me know if it is still not clear.





[GitHub] [hudi] giaosudau commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
giaosudau commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r543093343



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -364,4 +366,40 @@ object AvroConversionHelper {
         }
     }
   }
+
+  /**
+   * Remove namespace from fixed field.
+   * org.apache.spark.sql.avro.SchemaConverters.toAvroType method adds namespace to fixed avro field
+   * https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L177
+   * So we need to remove that namespace so that a reader schema without a namespace does not throw an error like this one:
+   * org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
+   *
+   * @param schema Schema from which namespace needs to be removed for fixed fields
+   * @return input schema with namespace removed for fixed fields, if any
+   */
+  def removeNamespaceFromFixedFields(schema: Schema): Schema = {

Review comment:
       It didn't work with:
   ```
       {
         "name": "fixed_type_field",
         "type": {
           "type": "fixed",
           "name": "fixed",
           "namespace": "hoodie.source.hoodie_source.fixed_type_field",
           "size": 7,
           "logicalType": "decimal",
           "precision": 15,
           "scale": 8
         }
       }
   ```
   ```
   Exception in thread "main" org.apache.avro.SchemaParseException: Can't redefine: fixed
   	at org.apache.avro.Schema$Names.put(Schema.java:1128)
   	at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
   	at org.apache.avro.Schema$FixedSchema.toJson(Schema.java:907)
   	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
   	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
   	at org.apache.avro.Schema.toString(Schema.java:324)
   	at org.apache.hudi.utilities.schema.DebeziumSchemaRegistryProvider.getTargetSchema(DebeziumSchemaRegistryProvider.java:77)
   
   ```







[GitHub] [hudi] dirksan28 edited a comment on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
dirksan28 edited a comment on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-834091378


   I've been observing this issue for several months now. From my point of view, working schema evolution is fundamental to the meaningful use of Hudi. I am amazed that this is not classified as mission critical. 





[GitHub] [hudi] nsivabalan edited a comment on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
nsivabalan edited a comment on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-825077766


   I spent some time understanding this PR; thanks for putting it up @sathyaprakashg. I have a few clarifications. 
   
   1. Can you fix the description w.r.t. the latest status? I don't see SchemaBasedSchemaProvider etc. 
   2. FYI, we landed a [fix](https://github.com/apache/hudi/pull/2765) w.r.t. default values and null in unions. If the schema post-processing is not required at all with this fix, it would simplify things. I guess the namespace fix in this PR may not be required if the post-processing step is not required. @bvaradar @n3nash: can you folks chime in here please? [fixed datatype jira](https://issues.apache.org/jira/browse/HUDI-1607).
   3. Also, I pulled the test locally and was trying to verify things. Looks like the test is not generating records as intended in the 3rd step. Here is what is happening:
       - TestDataSource generates data with the intended (old) schema.
       - But in SourceFormatAdapter, when we do AvroConversionUtils.createDataFrame(...), the evolved schema is passed in, and so the InputBatch<Dataset<Row>> returned from here has the new column set to null for all records. 
       - I also verified this from within the IdentityTransformer, which was showing the evolved schema and records having the new column as well. 
   So, essentially, the test also needs to be fixed. 
   





[GitHub] [hudi] sathyaprakashg commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sathyaprakashg commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r497257812



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -364,4 +366,40 @@ object AvroConversionHelper {
         }
     }
   }
+
+  /**
+   * Remove namespace from fixed field.
+   * org.apache.spark.sql.avro.SchemaConverters.toAvroType method adds namespace to fixed avro field
+   * https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L177
+   * So we need to remove that namespace so that a reader schema without a namespace does not throw an error like this one:
+   * org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
+   *
+   * @param schema Schema from which namespace needs to be removed for fixed fields
+   * @return input schema with namespace removed for fixed fields, if any
+   */
+  def removeNamespaceFromFixedFields(schema: Schema): Schema = {

Review comment:
       @bvaradar 
   
   In the delta streamer, we currently have the three flows below:
   1) [No transformation](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L335)
   2) [Transformation with userProvidedSchema](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L315)
   3) [Transformation without userProvidedSchema](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L323)
   
   Only a schema converted from a Spark data type to an Avro schema has this namespace added to fixed fields. In the delta streamer, we currently use the user-provided schema ([userProvidedSchemaProvider.targetSchema](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L568)) to convert [bytes to avro](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java#L126), except for the third flow (transformation without userProvidedSchema). In that case, we [derive the schema](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L327) from the Spark data type. So the backward compatibility issue arises only when we use a transformer and no user-provided schema.
   
   Below is an example of an Avro fixed field with and without a namespace.
   
   `{"name":"height","type":{"type":"fixed","name":"fixed","size":5,"logicalType":"decimal","precision":10,"scale":6}}`
   
   `{"name":"height","type":{"type":"fixed","name":"fixed","namespace":"hoodie.source.hoodie_source.height","size":5,"logicalType":"decimal","precision":10,"scale":6}}`
   
   Both of these result in same parquet schema
   `required fixed_len_byte_array(5) height (DECIMAL(10,6));`
   
   As we can see here, the namespace in the fixed field does not seem to have any impact on the parquet schema. So maybe the HoodieFileReader in the MergeHelper file you referred to shouldn't have any issue?  
   
   In general, it looks like parquet files in an existing Hudi dataset would not have an issue, so we can rule out issues with the COPY_ON_WRITE table type. But in the case of a MERGE_ON_READ table, I could see an issue for the third flow (transformation without userProvidedSchema). Below is the stack trace.
   
   ```
   51511 [Executor task launch worker for task 502] ERROR org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner  - Got exception when reading log file
   org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
   	at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
   	at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
   	at org.apache.avro.io.ValidatingDecoder.checkFixed(ValidatingDecoder.java:135)
   	at org.apache.avro.io.ValidatingDecoder.readFixed(ValidatingDecoder.java:146)
   	at org.apache.avro.generic.GenericDatumReader.readFixed(GenericDatumReader.java:342)
   	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
   	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
   	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
   	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
   	at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock.deserializeRecords(HoodieAvroDataBlock.java:157)
   	at org.apache.hudi.common.table.log.block.HoodieDataBlock.createRecordsFromContentBytes(HoodieDataBlock.java:128)
   	at org.apache.hudi.common.table.log.block.HoodieDataBlock.getRecords(HoodieDataBlock.java:106)
   	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processDataBlock(AbstractHoodieLogRecordScanner.java:275)
   	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processQueuedBlocksForInstant(AbstractHoodieLogRecordScanner.java:308)
   	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:241)
   	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
   	at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:259)
   	at org.apache.hudi.HoodieMergeOnReadRDD$$anon$2.<init>(HoodieMergeOnReadRDD.scala:164)
   	at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:154)
   	at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:67)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
   	at org.apache.spark.scheduler.Task.run(Task.scala:123)
   	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   ```
   
   In summary, it looks like the third flow (transformation without userProvidedSchema) produces a different output schema in the log file compared to the other two flows if there are fixed fields. This means that if we want to change from the third flow to, say, the first flow (by removing the transformation), we already have a problem, since log files in the MERGE_ON_READ table will have a different schema if there are fixed fields. This PR may cause a backward compatibility issue for the third flow, but it would make sure we produce the same schema regardless of which flow we use.
   
   In case you have a better suggestion to make this work without causing issues in existing datasets for the third flow, please let me know; happy to update the PR.







[GitHub] [hudi] n3nash commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
n3nash commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-835486764


   At a high level, my understanding is there are 3 issues:
   
   1. Removing the namespace could break backwards compatibility -> validated with [this](https://github.com/apache/hudi/pull/2928) PR that it should be OK to make this change. 
   2. Records with an older schema received without a user-defined schema. This breaks the `getInsertRecord` API, since the writer schema is smaller than the reader schema and the bytes-to-GenericRecord conversion breaks; addressed by @nsivabalan's [diff](https://github.com/apache/hudi/pull/2927).
   3. For general schema evolution to work when converting from DataFrame to Avro, we need to ensure that new elements added in the schema have a default value "null" using the UNION type as follows: `name: a, type ["null", <type>]`. Spark's internal converter breaks the Avro spec for creating such a UNION by reversing the order of null, as follows: `name: a, type [<type>, "null"]`. @nsivabalan fixed this with [this](https://github.com/apache/hudi/pull/2765) PR by relying on Hudi's internal ConversionHelper to ensure that "null" is appended first (see the sketch after this list).
   
   With all 3 of these fixes, we should be able to land this PR. @sathyaprakashg Let us know if there were any other concerns that you came across. 
   
   @nsivabalan I can confirm that (3) should be good to land. I've approved the diff and @bvaradar will be taking a look at this later today; once he confirms, we can land (3) and then (2), with the assumption that (1) validates both. 
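
   For reference on (3), a small standalone sketch of the union-ordering rule using the plain Avro API (the class name is invented here): per the Avro spec, a union field's default value must match the first branch, so a "null" default requires the null branch first.

   ```
   import java.util.Arrays;

   import org.apache.avro.Schema;

   public class UnionOrderSketch {
     public static void main(String[] args) {
       // Valid for "default": null, because the null branch comes first.
       Schema nullFirst = Schema.createUnion(Arrays.asList(
           Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.DOUBLE)));
       // The reversed order that Spark's converter can emit; a "default": null does not
       // match the first (double) branch here, which is what breaks evolution.
       Schema nullLast = Schema.createUnion(Arrays.asList(
           Schema.create(Schema.Type.DOUBLE), Schema.create(Schema.Type.NULL)));
       System.out.println(nullFirst); // ["null","double"]
       System.out.println(nullLast);  // ["double","null"]
     }
   }
   ```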





[GitHub] [hudi] vinothchandar edited a comment on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
vinothchandar edited a comment on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-834118960


   I actually see that @n3nash approved this several months ago, pending the perf issue that @sathyaprakashg himself has pointed out. Agree with you. Let me gather some context and see how we can move forward. I also wonder if we had landed other PRs around this. @nsivabalan, do you remember landing something around it, or am I mistaken?
   
   Any reason we have not landed this, @n3nash? Any open items that you see besides the perf issue? (Storing the schema in the payload should be avoided at all costs; this is what a schema registry solves for.)





[GitHub] [hudi] sathyaprakashg commented on pull request #2012: HUDI-1129 Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sathyaprakashg commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-685308504


   @n3nash I am working on fixing the build issue and will have that fix pushed soon. I would like to point out that with this new approach, we are storing the writer schema as part of the payload, which means the size of the dataframe would increase to store the same schema information with each record. Any suggestion on optimizing this?





[GitHub] [hudi] n3nash commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
n3nash commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r487165310



##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -127,12 +128,59 @@ public static GenericRecord bytesToAvro(byte[] bytes, Schema schema) throws IOEx
    * Convert serialized bytes back into avro record.
    */
   public static GenericRecord bytesToAvro(byte[] bytes, Schema writerSchema, Schema readerSchema) throws IOException {

Review comment:
       @sathyaprakashg But like you mentioned, if you don't add the namespace, you see the type mismatch exception, right?

##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -127,12 +128,59 @@ public static GenericRecord bytesToAvro(byte[] bytes, Schema schema) throws IOEx
    * Convert serialized bytes back into avro record.
    */
   public static GenericRecord bytesToAvro(byte[] bytes, Schema writerSchema, Schema readerSchema) throws IOException {

Review comment:
       @sathyaprakashg But like you mentioned, if you don't add the namespace, you see the type mismatch exception, right? How are you planning to avoid adding the namespace but still not run into the type exception?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-847846030


   @sbernauer: sorry, I might need your email to invite you to the Apache Hudi Slack workspace. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] n3nash edited a comment on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
n3nash edited a comment on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-835486764


   At a high level, my understanding is that there are 3 issues:
   
   1. Removing the namespace could break backwards compatibility -> Validated with [this](https://github.com/apache/hudi/pull/2928) PR that it should be OK to make this change. 
   2. Records with an older schema are received without a user-defined schema. This breaks the `getInsertRecord` API, since the writer schema is smaller than the reader schema and the bytes-to-GenericRecord conversion breaks; addressed by @nsivabalan in [this diff](https://github.com/apache/hudi/pull/2927).
   3. For general schema evolution to work when converting from DataFrame to Avro, we need to ensure that new elements added to the schema have a default value of "null", using a UNION type as follows: `name: a, type ["null", <type>]`. Spark's internal converter breaks the Avro convention for such a UNION by reversing the order of null, as follows: `name: a, type [<type>, "null"]`. @nsivabalan fixed this with [this](https://github.com/apache/hudi/pull/2911) PR by relying on Hudi's internal ConversionHelper to ensure that "null" is appended first. Also, [this](https://github.com/apache/hudi/pull/2765) PR addressed some other issues as well. 
   
   With all of these 3 fixes, we should be able to land this PR. @sathyaprakashg Let us know if there were any other concerns that you came across. 
   
   @nsivabalan I can confirm that (3) should be good to land. I've approved the diff and @bvaradar will be taking a look at this later today, once he confirms, we can land both (3) and then (2) with the assumption that (1) validates both. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-869777243


   From my sync up with @sbernauer, https://github.com/apache/hudi/pull/2927 and https://github.com/apache/hudi/pull/3111 are required. 3111 is already merged, and 2927 is up for review. 
   We can close this one (2012), though, as 2927 is a replacement. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sathyaprakashg commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sathyaprakashg commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-835216648


   > @sathyaprakashg and others: trying to understand the use-case here. I understand its related to deltastreamer receiving events in old schema after Hudi's dataset schema got evolved. what's the schema from schema provider when source is producing events in old schema (after schema got evolved w/ hudi dataset)? if the schema provider's schema is updated, I guess there is no need to store the writer schema w/ payload.
   > AvroConversionUtils.createDataFrame() will ensure to convert the JavaRDD w/ old schema to Dataset w/ new schema if schemaProvider.SourceSchema() has the evolved schema.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sathyaprakashg commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sathyaprakashg commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-824608661


   @n3nash I rebased the PR. Thanks @sbernauer for help


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] tandonraghavs commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
tandonraghavs commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-834497022


   @sathyaprakashg @vinothchandar Can this PR handle [this](https://github.com/apache/hudi/issues/2919) issue as well? 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sbernauer commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sbernauer commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-847940734


   Hi @nsivabalan,
   
   We have multiple schema versions of the events we consume. We use Kafka and the Confluent Schema Registry. I think all the events in Kafka are written with schema version 9.
   My test case is to read some events with schema version 8, switch to schema version 9, and consume some evolved events. We use a COW table and INSERTs only (with dropping of duplicates).
   
   With the patch in https://github.com/apache/hudi/pull/2927, starting from an empty directory, the ingestion throws the exception below in the executors. Reading with schema version 9 works fine.
   
   ```
   schemaRegistryUrl: https://eventbus-schema-bs-qa.server.lan/subjects/MyEvent-v1/versions/8
   # Sets
   --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
   # and
   curl --silent $SCHEMA_REGISTRY_URL | jq -r -c '.schema' | jq '.' > /tmp/schema_source.json
   cp /tmp/schema_source.json /tmp/schema_target.json
   
   21/05/25 14:45:55 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=sourceEventHeader.happenedTimestamp:1621953763077,sourceEventHeader.eventId:143d1259-01c2-4346-a3c4-85b2e3325ff3 partitionPath=2021/05/25}, currentLocation='null', newLocation='null'}
    java.lang.ArrayIndexOutOfBoundsException: 22
           at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
           at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
           at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
           at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
           at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
           at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
           at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
           at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
           at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:136)
           at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:126)
           at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:69)
           at org.apache.hudi.execution.HoodieLazyInsertIterable$HoodieInsertValueGenResult.<init>(HoodieLazyInsertIterable.java:88)
           at org.apache.hudi.execution.HoodieLazyInsertIterable.lambda$getTransformFunction$0(HoodieLazyInsertIterable.java:101)
           at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:190)
           at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
           at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   ```
   
   The schema difference (the new field is nested multiple levels deep):
   ```
   $ curl https://eventbus-schema-bs-qa.server.lan/subjects/MyEvent-v1/versions/8 | jq -r '.schema' | jq > 8
   $ curl https://eventbus-schema-bs-qa.server.lan/subjects/MyEvent-v1/versions/9 | jq -r '.schema' | jq > 9
   
   $ diff -U 5 8 9
   --- 8   2021-05-25 16:51:21.416603077 +0200
   +++ 9   2021-05-25 16:51:25.072629744 +0200
   @@ -326,10 +326,22 @@
                    "type": "string",
                    "avro.java.string": "String"
                  }
                },
                "doc": "* List of optional claim names"
   +          },
   +          {
   +            "name": "voluntary",
   +            "type": {
   +              "type": "array",
   +              "items": {
   +                "type": "string",
   +                "avro.java.string": "String"
   +              }
   +            },
   +            "doc": "* List of voluntary claim names",
   +            "default": []
              }
            ],
            "version": "1.0.0"
          },
          "doc": "* Info about the requested claims"
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on a change in pull request #2012: HUDI-1129 Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r480250733



##########
File path: hudi-utilities/src/test/resources/delta-streamer-config/source_evoluted.avsc
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{

Review comment:
       rename this file as well? 

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaBasedSchemaProvider.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.schema;
+
+import org.apache.avro.Schema;
+
+/**
+ * A simple schema provider, that returns schema provided in the constructor.
+ */
+public class SchemaBasedSchemaProvider extends SchemaProvider {

Review comment:
       rename this a bit. Maybe `InMemorySchemaProvider`? (We have ones that fetch from DFS or a registry.) 

##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
##########
@@ -1124,6 +1163,30 @@ public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Ro
     }
   }
 
+  /**
+   * Adds a new field evoluted_optional_union_field with the value of the field rider.
+   */
+  public static class TripsWithEvolutedOptionalFieldTransformer implements Transformer {

Review comment:
       rename: TripsWithEvolvedOptional... ? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bvaradar commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
bvaradar commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r516082122



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -364,4 +366,40 @@ object AvroConversionHelper {
         }
     }
   }
+
+  /**
+   * Remove namespace from fixed field.
+   * org.apache.spark.sql.avro.SchemaConverters.toAvroType method adds namespace to fixed avro field
+   * https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L177
+   * So, we need to remove that namespace so that a reader schema without the namespace does not throw an error like this one:
+   * org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
+   *
+   * @param schema Schema from which namespace needs to be removed for fixed fields
+   * @return input schema with namespace removed for fixed fields, if any
+   */
+  def removeNamespaceFromFixedFields(schema: Schema): Schema = {

Review comment:
       @n3nash: This might require a holistic look at how schema evolution is handled.
   
   As a last option, before I let @n3nash decide on how to best take in this change, @sathyaprakashg: since this is not a backwards-incompatible change in the true sense (the underlying type is the same), can you try adding an additional step where we do a variant of https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkMergeHelper.java#L73 
   
   In HoodieAvroDataBlock: 
   1. Use a genericReader with only the old schema. This avoids schema-evolution handling.
   2. Create a genericWriter and write the record back to bytes, this time with the new (updated) schema.
   3. Then use a genericReader (as in 1) to read, but with the updated schema.
   
   Can you see if this works around the issue? If it does, then this needs to be a configuration-controlled feature when reading records from the logs. A rough sketch of these three steps follows below.
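   
   A rough sketch of those three steps, assuming plain Avro APIs (a `GenericRecordBuilder` is used for step 2 so that fields added in the updated schema pick up their declared defaults; treat this as an illustration, not the final implementation):
   
   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericDatumReader;
   import org.apache.avro.generic.GenericDatumWriter;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.avro.generic.GenericRecordBuilder;
   import org.apache.avro.io.BinaryDecoder;
   import org.apache.avro.io.BinaryEncoder;
   import org.apache.avro.io.DecoderFactory;
   import org.apache.avro.io.EncoderFactory;
   
   public class RewriteWithUpdatedSchema {
     static GenericRecord rewrite(byte[] bytes, Schema oldSchema, Schema newSchema)
         throws IOException {
       // Step 1: read with the old schema only, so no schema resolution runs.
       BinaryDecoder dec = DecoderFactory.get().binaryDecoder(bytes, null);
       GenericRecord oldRec =
           new GenericDatumReader<GenericRecord>(oldSchema).read(null, dec);
   
       // Step 2: rebuild the record under the new schema and re-serialize it.
       // Fields absent from the old schema fall back to their declared defaults
       // (simplified: copies top-level fields only, and assumes defaults exist).
       GenericRecordBuilder builder = new GenericRecordBuilder(newSchema);
       for (Schema.Field f : newSchema.getFields()) {
         if (oldSchema.getField(f.name()) != null) {
           builder.set(f, oldRec.get(f.name()));
         }
       }
       ByteArrayOutputStream out = new ByteArrayOutputStream();
       BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
       new GenericDatumWriter<GenericRecord>(newSchema).write(builder.build(), enc);
       enc.flush();
   
       // Step 3: read back using the updated schema as both writer and reader.
       BinaryDecoder dec2 = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
       return new GenericDatumReader<GenericRecord>(newSchema).read(null, dec2);
     }
   }
   ```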

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java
##########
@@ -39,13 +40,19 @@
    */
   protected final Comparable orderingVal;
 
+  /**
+   * Schema used to convert avro to bytes.
+   */
+  protected final Schema writerSchema;

Review comment:
       You can introduce another base class BaseAvroPayloadWithSchema which extends from BaseAvroPayload and stores the schema. This will be the base class for any new implementation which needs to store the schema as part of the payload.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bvaradar commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
bvaradar commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r492198773



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -364,4 +366,40 @@ object AvroConversionHelper {
         }
     }
   }
+
+  /**
+   * Remove namespace from fixed field.
+   * org.apache.spark.sql.avro.SchemaConverters.toAvroType method adds namespace to fixed avro field
+   * https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L177
+   * So, we need to remove that namespace so that a reader schema without the namespace does not throw an error like this one:
+   * org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
+   *
+   * @param schema Schema from which namespace needs to be removed for fixed fields
+   * @return input schema with namespace removed for fixed fields, if any
+   */
+  def removeNamespaceFromFixedFields(schema: Schema): Schema = {

Review comment:
       @sathyaprakashg: What happens to existing records in the hudi dataset which have a namespace for fixed fields? 
   https://github.com/apache/hudi/blob/master/hudi-client/src/main/java/org/apache/hudi/table/action/commit/MergeHelper.java#L70




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar closed pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
vinothchandar closed pull request #2012:
URL: https://github.com/apache/hudi/pull/2012


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sathyaprakashg commented on pull request #2012: HUDI-1129 Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sathyaprakashg commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-686046387


   The issue below was causing the build failure, and I have fixed it now by implementing a method `HoodieAvroUtils.addNamespaceToFixedField`. Not sure whether this is the best way to fix it, so let me know if you have any suggestions.
   
   Below is the height field definition in the `triprec` schema:
   ``` 
     {
         "name": "height",
         "type": {
           "type": "fixed",
           "name": "abc",
           "size": 5,
           "logicalType": "decimal",
           "precision": 10,
           "scale": 6
         }
       }
   ```
   When we use the org.apache.spark.sql.avro.SchemaConverters.toAvroType method in `AvroConversionHelper` to convert a Spark data type to an Avro type, spark-avro names the field `fixed` and also adds a namespace to it, as per the code [here](https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L177):
   ```
     {
         "name": "height",
         "type": {
           "type": "fixed",
           "name": "fixed",
           "namespace": "hoodie.source.hoodie_source.height",
           "size": 5,
           "logicalType": "decimal",
           "precision": 10,
           "scale": 6
         }
       }
   ```
   The first schema is used as the reader schema and the second one as the writer schema in `HoodieAvroUtils.bytesToAvro`. This results in the error below:
   ```
   ERROR org.apache.hudi.io.HoodieWriteHandle  - Error writing record HoodieRecord{key=HoodieKey { recordKey=c597b0c2-6f07-4c8e-b239-2752b60e0449 partitionPath=default}, currentLocation='null', newLocation='null'}
   org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting abc
     at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
     at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
     at org.apache.avro.io.ValidatingDecoder.checkFixed(ValidatingDecoder.java:135)
     at org.apache.avro.io.ValidatingDecoder.readFixed(ValidatingDecoder.java:146)
     at org.apache.avro.generic.GenericDatumReader.readFixed(GenericDatumReader.java:342)
     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
     at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
     at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:142)
     at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:70)
     at org.apache.hudi.execution.LazyInsertIterable$HoodieInsertValueGenResult.<init>(LazyInsertIterable.java:92)
     at org.apache.hudi.execution.LazyInsertIterable.lambda$getTransformFunction$0(LazyInsertIterable.java:105)
     at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:170)
     at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     at java.lang.Thread.run(Thread.java:748)
   ```
   In order to fix this issue, the approach I took is to change the field name from `abc` to `fixed` in all the schemas used in tests, and also to add the namespace to the reader schema if it exists in the writer schema for a fixed field (see the sketch below).
   
   If we just rename the field from `abc` to `fixed` but don't add the namespace, then we get the error below:
   `org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed`
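   
   For illustration, a hypothetical helper along these lines (not the actual `HoodieAvroUtils.addNamespaceToFixedField` implementation) could rebuild the reader's fixed type with the writer's full name, assuming Avro 1.8+:
   
   ```java
   import java.util.Map;
   import org.apache.avro.Schema;
   
   public class FixedFieldNamespaceSketch {
     // Rebuild the reader's FIXED type with the writer's name and namespace so
     // that resolution no longer fails with
     // "Found hoodie.source.hoodie_source.height.fixed, expecting fixed".
     static Schema alignFixed(Schema readerFixed, Schema writerFixed) {
       Schema aligned = Schema.createFixed(
           writerFixed.getName(), readerFixed.getDoc(),
           writerFixed.getNamespace(), readerFixed.getFixedSize());
       // Carry over extra properties such as logicalType, precision, scale.
       for (Map.Entry<String, Object> e : readerFixed.getObjectProps().entrySet()) {
         aligned.addProp(e.getKey(), e.getValue());
       }
       return aligned;
     }
   }
   ```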


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] n3nash commented on pull request #2012: HUDI-1129 Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
n3nash commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-685269655


   @sathyaprakashg Looks good to me. Can you please see why the build is failing?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-847855344


   @sbernauer @giaosudau @dirksan28 @sathyaprakashg : There are quite a few flows or use cases in general w.r.t. schema evolution. Would you mind helping us by explaining your use case? 
   
   Let me call out a few of them: 
   1. The existing hudi table is in schema1 with 3 cols and you are trying to ingest a new batch with schema2 with 4 cols. 
   2. The existing hudi table is in schema2 with 4 cols (after the schema evolved from schema1). The new batch of ingest has records in the old schema (schema1). 
   For both (1) and (2), there could be different flows in deltastreamer. 
   a. no transformer and no schema provider. 
   b. no transformer and user provides a schema provider with non null target schema.
   c. no transformer and user provides a schema provider with NULL target schema.
   d. has transformer and no schema provider. 
   e. has transformer and user provides a schema provider with non null target schema.
   f. has transformer and user provides a schema provider with NULL target schema.
   
   Can you call out whether your use case is 1a or 2e, etc.? The patch we have put up solves most of the above use cases, but we would like to better understand what exactly your use case is. Simple schema evolution as in case 1b should already work in hudi w/o any fix. 
   If your use case does not belong to any of the above categories, please help us understand it so that we can work towards a fix. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sathyaprakashg commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sathyaprakashg commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r628721147



##########
File path: hudi-utilities/src/test/resources/delta-streamer-config/source_evolved.avsc
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "type" : "record",
+  "name" : "triprec",
+  "fields" : [
+  {
+    "name" : "timestamp",
+    "type" : "long"
+  }, {
+    "name" : "_row_key",
+    "type" : "string"
+  }, {
+    "name" : "rider",
+    "type" : "string"
+  }, {
+    "name" : "driver",
+    "type" : "string"
+  }, {
+    "name" : "begin_lat",
+    "type" : "double"
+  }, {
+    "name" : "begin_lon",
+    "type" : "double"
+  }, {
+    "name" : "end_lat",
+    "type" : "double"
+  }, {
+    "name" : "end_lon",
+    "type" : "double"
+  }, {
+    "name" : "distance_in_meters",
+    "type" : "int"
+  }, {
+    "name" : "seconds_since_epoch",
+    "type" : "long"
+  }, {
+    "name" : "weight",
+    "type" : "float"
+  },{
+    "name" : "nation",
+    "type" : "bytes"
+  },{
+    "name" : "current_date",
+    "type" : {
+      "type" : "int",
+      "logicalType" : "date"
+      }
+  },{
+    "name" : "current_ts",
+    "type" : {
+      "type" : "long"
+      }
+  },{
+    "name" : "height",
+    "type" : {
+      "type" : "fixed",
+      "name" : "fixed",
+      "size" : 5,
+      "logicalType" : "decimal",
+      "precision" : 10,
+      "scale": 6
+      }
+  }, {
+    "name" :"city_to_state",
+    "type" : {
+      "type" : "map",
+      "values": "string"
+    }
+  },
+  {
+    "name" : "fare",
+    "type" : {
+      "type" : "record",
+      "name" : "fare",
+      "fields" : [
+        {
+         "name" : "amount",
+         "type" : "double"
+        },
+        {
+         "name" : "currency",
+         "type" : "string"
+        }
+      ]
+    }
+  },
+  {

Review comment:
       Please refer to [this comment](https://github.com/apache/hudi/pull/2012#issuecomment-679910464). 
   
   I think it is to handle the case where the new field is not at the end; it should still work even then. 
   
   cc @sbernauer




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sathyaprakashg commented on pull request #2012: HUDI-1129 Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sathyaprakashg commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-684687937


   @vinothchandar I updated the code to use reader and writer schema for avro payload. Please review it


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-751283111


   @n3nash : Looks like you folks (you and Balaji) have a lot of context w.r.t. this patch. I am afraid that if I review this, I might miss some corner cases or might have to wait for your review anyways. Can you take this up and get it landed for the upcoming release? 
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hudi-bot commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-869215376


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "f7e53eb22bf7a1e109ba258c7ac21d983d807738",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "f7e53eb22bf7a1e109ba258c7ac21d983d807738",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f7e53eb22bf7a1e109ba258c7ac21d983d807738 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run travis` re-run the last Travis build
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] n3nash commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
n3nash commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r485009679



##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -127,12 +128,59 @@ public static GenericRecord bytesToAvro(byte[] bytes, Schema schema) throws IOEx
    * Convert serialized bytes back into avro record.
    */
   public static GenericRecord bytesToAvro(byte[] bytes, Schema writerSchema, Schema readerSchema) throws IOException {

Review comment:
       How many places is this method being used? Is adding the namespace to the reader schema backwards compatible for older data/schemas written without the namespace?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hudi-bot edited a comment on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
hudi-bot edited a comment on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-869215376


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "f7e53eb22bf7a1e109ba258c7ac21d983d807738",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=470",
       "triggerID" : "f7e53eb22bf7a1e109ba258c7ac21d983d807738",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f7e53eb22bf7a1e109ba258c7ac21d983d807738 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=470) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run travis` re-run the last Travis build
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sbernauer commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sbernauer commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-847796132


   Sure @nsivabalan, I will try out the fix in https://github.com/apache/hudi/pull/2927 and give feedback.
   Thanks for the invitation to Slack, I appreciate it! My memberId is U022VCGBVLM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sathyaprakashg edited a comment on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sathyaprakashg edited a comment on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-835222588


   > > @sathyaprakashg and others: trying to understand the use-case here. I understand its related to deltastreamer receiving events in old schema after Hudi's dataset schema got evolved. what's the schema from schema provider when source is producing events in old schema (after schema got evolved w/ hudi dataset)? if the schema provider's schema is updated, I guess there is no need to store the writer schema w/ payload.
   > > AvroConversionUtils.createDataFrame() will ensure to convert the JavaRDD w/ old schema to Dataset w/ new schema if schemaProvider.SourceSchema() has the evolved schema.
   
   The issue with schema evolution happens in `HoodieAvroUtils.avroToBytes` and `HoodieAvroUtils.bytesToAvro`. Let us consider a scenario where there are two versions of a schema in the schema registry, and in the 2nd (latest) version there is a new field added, but data is still coming in with schema version 1. 
   
   `HoodieAvroUtils.avroToBytes` uses the schema embedded in the data (i.e. the v1 schema) to convert Avro to bytes. `HoodieAvroUtils.bytesToAvro` uses the latest schema (v2) from the schema registry to convert the bytes back to Avro. This fails because the v1 schema was used to serialize, but the v2 schema is being used to deserialize. 
   
   In order to solve this, we need both v1 (the writer schema) and v2 (the reader schema) to convert bytes back to Avro. We can get the v2 schema from the schema registry, but to get the v1 schema we were storing the writer schema as part of the payload itself. A minimal sketch of the two-schema read follows below.
   
   Please let me know if it is still not clear.
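   
   For reference, a sketch of the two-schema read being described, using plain Avro APIs (illustrative only, not the exact Hudi method):
   
   ```java
   import java.io.IOException;
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericDatumReader;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.avro.io.BinaryDecoder;
   import org.apache.avro.io.DecoderFactory;
   
   public class TwoSchemaReadSketch {
     // Deserialize bytes written with the v1 (writer) schema while projecting
     // onto the v2 (reader) schema; Avro's schema resolution fills fields that
     // are new in v2 with their declared defaults.
     static GenericRecord bytesToAvro(byte[] bytes, Schema writerSchema, Schema readerSchema)
         throws IOException {
       BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
       return new GenericDatumReader<GenericRecord>(writerSchema, readerSchema)
           .read(null, decoder);
     }
   }
   ```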


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sbernauer commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sbernauer commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-847848890


   No problem. It is bernauerse@web.de


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] dirksan28 edited a comment on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
dirksan28 edited a comment on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-834091378


   I've been observing this issue for several months now. From my point of view, working schema evolution is fundamental to meaningful use of hudi. I am amazed that this is not classified as mission critical. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sathyaprakashg commented on pull request #2012: HUDI-1129 Deltastreamer Add support for schema evaluation

Posted by GitBox <gi...@apache.org>.
sathyaprakashg commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-680502038


   Thanks @sbernauer for the code example. I fixed it now


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-834436516


   @sathyaprakashg and others: trying to understand the use-case here. I understand its related to deltastreamer receiving events in old schema after Hudi's dataset schema got evolved. what's the schema from schema provider when source is producing events in old schema (after schema got evolved w/ hudi dataset)? if the schema provider's schema is updated, I guess there is no need to store the writer schema w/ payload. 
   AvroConversionUtils.createDataFrame() will ensure to convert the JavaRDD<GenRec> w/ old schema to Dataset<Row> w/ new schema if schemaProvider.SourceSchema() has the evolved schema. 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-851585795


   @sbernauer : based on our offline sync-up, I guess you are explicitly setting the target schema to the old schema even though Hudi's table schema got evolved to the new schema. Not sure if hudi can do much here. If you hadn't set any target schema, or had set it to RowBased, hudi could try to evolve your old records to the latest table schema. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sathyaprakashg commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sathyaprakashg commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r487415553



##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -127,12 +128,59 @@ public static GenericRecord bytesToAvro(byte[] bytes, Schema schema) throws IOEx
    * Convert serialized bytes back into avro record.
    */
   public static GenericRecord bytesToAvro(byte[] bytes, Schema writerSchema, Schema readerSchema) throws IOException {

Review comment:
       @n3nash This is the exception we get when we don't add the namespace to the reader schema: `org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed`. 
   
   This exception is raised because we have the namespace in the writer schema and not in the reader schema. As per my proposed approach above, if we remove the namespace from the writer schema, then we wouldn't get the type exception, because the namespace would then be absent from the fixed fields in both the writer and reader schemas.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] n3nash commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
n3nash commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-688954187


   @sathyaprakashg Thanks for looking into this. I see that `org.apache.spark.sql.avro.SchemaConverters` uses the `fixed` name, so it's difficult to work around it. Your approach sounds fine to me as long as it does not break any existing tests. Left a couple of comments for changes.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hudi-bot edited a comment on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
hudi-bot edited a comment on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-869215376


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "f7e53eb22bf7a1e109ba258c7ac21d983d807738",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=470",
       "triggerID" : "f7e53eb22bf7a1e109ba258c7ac21d983d807738",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f7e53eb22bf7a1e109ba258c7ac21d983d807738 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=470) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run travis` re-run the last Travis build
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sathyaprakashg commented on pull request #2012: HUDI-1129 Deltastreamer Add support for schema evaluation

Posted by GitBox <gi...@apache.org>.
sathyaprakashg commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-678806335


   @bvaradar @sbernauer 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sbernauer commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sbernauer commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-823214232


   @sathyaprakashg @n3nash and others, thanks for your work! I have rebased the commit onto the current master and resolved all the conflicts here https://github.com/sbernauer/hudi/commit/b383883742ad63899fa43584ab7a10cd72d533fe
   @sathyaprakashg this may help you while rebasing


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] giaosudau commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
giaosudau commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r543093343



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -364,4 +366,40 @@ object AvroConversionHelper {
         }
     }
   }
+
+  /**
+   * Remove namespace from fixed field.
+   * org.apache.spark.sql.avro.SchemaConverters.toAvroType method adds namespace to fixed avro field
+   * https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L177
+ * So, we need to remove that namespace so that a reader schema without the namespace does not throw an error like this one
+   * org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
+   *
+   * @param schema Schema from which namespace needs to be removed for fixed fields
+   * @return input schema with namespace removed for fixed fields, if any
+   */
+  def removeNamespaceFromFixedFields(schema: Schema): Schema  ={

Review comment:
       it didn't work with a table with more than **2 fixed-type fields.**
   
   ```
   {
         "name": "fixed_type_field",
         "type": {
           "type": "fixed",
           "name": "fixed",
           "namespace": "hoodie.source.hoodie_source.fixed_type_field",
           "size": 7,
           "logicalType": "decimal",
           "precision": 15,
           "scale": 8
         }
       }
   ```
   ```
   Exception in thread "main" org.apache.avro.SchemaParseException: Can't redefine: fixed
   	at org.apache.avro.Schema$Names.put(Schema.java:1128)
   	at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
   	at org.apache.avro.Schema$FixedSchema.toJson(Schema.java:907)
   	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
   	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
   	at org.apache.avro.Schema.toString(Schema.java:324)
   	at org.apache.hudi.utilities.schema.DebeziumSchemaRegistryProvider.getTargetSchema(DebeziumSchemaRegistryProvider.java:77)
   
   ```
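   
   A minimal sketch (hypothetical, not from the PR) of why this happens: once the per-field namespaces are stripped, both decimal columns define a fixed type with the same full name `fixed`, and Avro refuses to parse or serialize a schema that defines the same name twice.
   
   ```
   import org.apache.avro.Schema;
   
   // Two fields whose fixed types were only distinguished by their namespaces;
   // with the namespaces removed, the name "fixed" is defined twice.
   String json =
       "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
     + "{\"name\":\"fixed_type_field\",\"type\":"
     + "{\"type\":\"fixed\",\"name\":\"fixed\",\"size\":7}},"
     + "{\"name\":\"fixed_type_field2\",\"type\":"
     + "{\"type\":\"fixed\",\"name\":\"fixed\",\"size\":7}}]}";
   new Schema.Parser().parse(json);
   // -> org.apache.avro.SchemaParseException: Can't redefine: fixed
   ```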




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-834312959


   > I spent some time to understand this PR. Thanks for putting it up @sathyaprakashg. I have a few clarifications.
   > 
   > 1. Can you fix the description wrt latest status. I don't see SchemaBasedSchemaProvider etc.
   > 2. FYI We landed a [fix](https://github.com/apache/hudi/pull/2765) wrt default vals and null in unions. In case the schema post-processing is not required at all w/ this fix, it would simplify things. Guess the namespace fix in this PR may not be required if the post-processing step is not required. @bvaradar @n3nash : can you folks chime in here please. Another related [fixed datatype jira](https://issues.apache.org/jira/browse/HUDI-1607). The backwards incompatibility may not be an issue if we go this route. 
   > 3. Also, I pulled the test locally and was trying to verify things. Looks like the test is not generating records as intended in 3rd step. Here is what is happening.
   >    
   >    * TestDataSource generates data w/ intended schema(old)
   >    * But in SourceFormatAdapter, when we do AvroConversionUtils.createDataFrame(...), the evolved schema is passed in, and so the InputBatch<Dataset<Row>> returned from here has the new column set to null for all records.
   >    * I also verified this from within the IdentityTransformer which was showing evolved schema and record having new column as well.
   >      so, essentially the test also needs to be fixed.
   
   @vinothchandar : We need to iron out the perf issue. But these were my comments earlier. It could simplify the backwards compatibility issue which was being discussed. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-846037950


   @sbernauer : We would appreciate it if you could try out our fix https://github.com/apache/hudi/pull/2927 and let us know if it works for you. We can sync up via slack if you are available, so that we can have a quick turnaround or clarify any doubts you have along the way. If you are open to it, let us know your slackId; we can create a group and go from there. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-825077766


   I spent some time to understand this PR. Thanks for putting it up @sathyaprakashg. I have a few clarifications. 
   
   1. Can you fix the description wrt latest status. I don't see SchemaBasedSchemaProvider etc. 
   2. FYI We landed a [fix](https://github.com/apache/hudi/pull/2765) wrt default vals and null in unions. In case the schema post-processing is not required at all, it would simplify things. Guess the namespace fix in this PR may not be required. @bvaradar @n3nash . [fixed datatype jira](https://issues.apache.org/jira/browse/HUDI-1607).
   3.  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sbernauer edited a comment on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sbernauer edited a comment on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-847940734


   Hi @nsivabalan,
   
   we have multiple schema versions of the events we consume. We use kafka and Confluent Schema Registry. I think all the events in kafka are written with schema version 9.
   My testcase would be to read some events with schema version 8, switch to schema version 9 and consume some evolved events. We use a COW table and INSERTs only (with dropping of duplicates) and no transformation (for most of our applications).
   
   With the patch in https://github.com/apache/hudi/pull/2927 starting from an empty directory the ingestion throws this exception in the executors. Reading with schema version 9 works fine.
   
   ```
   schemaRegistryUrl: https://eventbus-schema-bs-qa.server.lan/subjects/MyEvent-v1/versions/8
   # Sets
   --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
   # and
   curl --silent $SCHEMA_REGISTRY_URL | jq -r -c '.schema' | jq '.' > /tmp/schema_source.json
   cp /tmp/schema_source.json /tmp/schema_target.json
   
   21/05/25 14:45:55 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=sourceEventHeader.happenedTimestamp:1621953763077,sourceEventHeader.eventId:143d1259-01c2-4346-a3c4-85b2e3325ff3 partitionPath=2021/05/25}, currentLocation='null', newLocation='null'}
    java.lang.ArrayIndexOutOfBoundsException: 22
           at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:424)
           at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
           at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
           at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
           at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
           at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
           at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
           at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
           at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:136)
           at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:126)
           at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:69)
           at org.apache.hudi.execution.HoodieLazyInsertIterable$HoodieInsertValueGenResult.<init>(HoodieLazyInsertIterable.java:88)
           at org.apache.hudi.execution.HoodieLazyInsertIterable.lambda$getTransformFunction$0(HoodieLazyInsertIterable.java:101)
           at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:190)
           at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
           at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   ```
   
   The schema difference (the field is nested multiple times):
   ```
   $ curl https://eventbus-schema-bs-qa.server.lan/subjects/MyEvent-v1/versions/8 | jq -r '.schema' | jq > 8
   $ curl https://eventbus-schema-bs-qa.server.lan/subjects/MyEvent-v1/versions/9 | jq -r '.schema' | jq > 9
   
   $ diff -U 5 8 9
   --- 8   2021-05-25 16:51:21.416603077 +0200
   +++ 9   2021-05-25 16:51:25.072629744 +0200
   @@ -326,10 +326,22 @@
                    "type": "string",
                    "avro.java.string": "String"
                  }
                },
                "doc": "* List of optional claim names"
   +          },
   +          {
   +            "name": "voluntary",
   +            "type": {
   +              "type": "array",
   +              "items": {
   +                "type": "string",
   +                "avro.java.string": "String"
   +              }
   +            },
   +            "doc": "* List of voluntary claim names",
   +            "default": []
              }
            ],
            "version": "1.0.0"
          },
          "doc": "* Info about the requested claims"
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-896331643


   can we close this, given #2927 is landed?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sathyaprakashg commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sathyaprakashg commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r512905076



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -364,4 +366,40 @@ object AvroConversionHelper {
         }
     }
   }
+
+  /**
+   * Remove namespace from fixed field.
+   * org.apache.spark.sql.avro.SchemaConverters.toAvroType method adds namespace to fixed avro field
+   * https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L177
+ * So, we need to remove that namespace so that a reader schema without the namespace does not throw an error like this one
+   * org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
+   *
+   * @param schema Schema from which namespace needs to be removed for fixed fields
+   * @return input schema with namespace removed for fixed fields, if any
+   */
+  def removeNamespaceFromFixedFields(schema: Schema): Schema  ={

Review comment:
       @bvaradar @n3nash @pratyakshsharma Just checking to see if you guys had a chance to review my last comment




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] n3nash commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
n3nash commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r487165310



##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -127,12 +128,59 @@ public static GenericRecord bytesToAvro(byte[] bytes, Schema schema) throws IOEx
    * Convert serialized bytes back into avro record.
    */
   public static GenericRecord bytesToAvro(byte[] bytes, Schema writerSchema, Schema readerSchema) throws IOException {

Review comment:
       @sathyaprakashg But like you mentioned, if you don't add namespace, you see the type mismatch exception, right ?

##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -127,12 +128,59 @@ public static GenericRecord bytesToAvro(byte[] bytes, Schema schema) throws IOEx
    * Convert serialized bytes back into avro record.
    */
   public static GenericRecord bytesToAvro(byte[] bytes, Schema writerSchema, Schema readerSchema) throws IOException {

Review comment:
       @sathyaprakashg But like you mentioned, if you don't add namespace, you see the type mismatch exception, right ? How are you planning to avoid adding namespace but still not run into the type exception ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] n3nash commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
n3nash commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r485009679



##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -127,12 +128,59 @@ public static GenericRecord bytesToAvro(byte[] bytes, Schema schema) throws IOEx
    * Convert serialized bytes back into avro record.
    */
   public static GenericRecord bytesToAvro(byte[] bytes, Schema writerSchema, Schema readerSchema) throws IOException {

Review comment:
       How many places is this method being used ? Is adding namespace to the reader schema backwards compatible for data/schema written without the namespace ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sathyaprakashg commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sathyaprakashg commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r487415553



##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -127,12 +128,59 @@ public static GenericRecord bytesToAvro(byte[] bytes, Schema schema) throws IOEx
    * Convert serialized bytes back into avro record.
    */
   public static GenericRecord bytesToAvro(byte[] bytes, Schema writerSchema, Schema readerSchema) throws IOException {

Review comment:
       @n3nash This is the exception we get when we don't add the namespace to the reader schema: `org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed`. 
   
   This exception is raised because we have the namespace in the writer schema but not in the reader schema. As per my proposed approach above, if we remove the namespace from the writer schema, then we wouldn't get the type exception, because the namespace no longer exists in the fixed fields of either the writer or the reader schema.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] pratyakshsharma commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-689476777


   Lagging a bit, will circle back on this. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-838077879


   @n3nash `BaseAvroPayloadWithSchema` - is this still needed, given that it keeps the schema in each record?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bvaradar commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
bvaradar commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r492198773



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -364,4 +366,40 @@ object AvroConversionHelper {
         }
     }
   }
+
+  /**
+   * Remove namespace from fixed field.
+   * org.apache.spark.sql.avro.SchemaConverters.toAvroType method adds namespace to fixed avro field
+   * https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L177
+ * So, we need to remove that namespace so that a reader schema without the namespace does not throw an error like this one
+   * org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
+   *
+   * @param schema Schema from which namespace needs to be removed for fixed fields
+   * @return input schema with namespace removed for fixed fields, if any
+   */
+  def removeNamespaceFromFixedFields(schema: Schema): Schema  ={

Review comment:
       @sathyaprakashg : What happens to existing records in hudi dataset which have namespace for fixed fields ? 
   https://github.com/apache/hudi/blob/master/hudi-client/src/main/java/org/apache/hudi/table/action/commit/MergeHelper.java#L70




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sathyaprakashg commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sathyaprakashg commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-694081153


   @n3nash Changes are done. Please review it when you get time


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] giaosudau commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
giaosudau commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r543093343



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -364,4 +366,40 @@ object AvroConversionHelper {
         }
     }
   }
+
+  /**
+   * Remove namespace from fixed field.
+   * org.apache.spark.sql.avro.SchemaConverters.toAvroType method adds namespace to fixed avro field
+   * https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L177
+ * So, we need to remove that namespace so that a reader schema without the namespace does not throw an error like this one
+   * org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
+   *
+   * @param schema Schema from which namespace needs to be removed for fixed fields
+   * @return input schema with namespace removed for fixed fields, if any
+   */
+  def removeNamespaceFromFixedFields(schema: Schema): Schema  ={

Review comment:
       it didn't work with a table with more than **2 fixed-type fields.**
   
   ```
   {
      "name": "fixed_type_field",
      "type": {
        "type": "fixed",
        "name": "fixed",
        "namespace": "hoodie.source.hoodie_source.fixed_type_field",
        "size": 7,
        "logicalType": "decimal",
        "precision": 15,
        "scale": 8
      }
    },
    {
      "name": "fixed_type_field2",
      "type": {
        "type": "fixed",
        "name": "fixed",
        "namespace": "hoodie.source.hoodie_source.fixed_type_field2",
        "size": 7,
        "logicalType": "decimal",
        "precision": 15,
        "scale": 8
      }
    }
   ```
   ```
   Exception in thread "main" org.apache.avro.SchemaParseException: Can't redefine: fixed
   	at org.apache.avro.Schema$Names.put(Schema.java:1128)
   	at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
   	at org.apache.avro.Schema$FixedSchema.toJson(Schema.java:907)
   	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
   	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
   	at org.apache.avro.Schema.toString(Schema.java:324)
   	at org.apache.hudi.utilities.schema.DebeziumSchemaRegistryProvider.getTargetSchema(DebeziumSchemaRegistryProvider.java:77)
   
   ```
   
   With the namespace kept, it hits another error:
   ```
   Caused by: java.lang.ClassCastException: java.nio.HeapByteBuffer cannot be cast to org.apache.avro.generic.GenericFixed
   	at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToRow$5(AvroConversionHelper.scala:98)
   	at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToRow$9(AvroConversionHelper.scala:173)
   	at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToRow$9(AvroConversionHelper.scala:173)
   	at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:43)
   ```
   
   Fixed by reading the value as bytes instead of `GenericFixed`, i.e. using the same conversion as the `case (d: DecimalType, BYTES) =>` branch:
   ```
   case (d: DecimalType, FIXED) =>
         (item: AnyRef) =>
           if (item == null) {
             null
           } else {
             val decimalConversion = new DecimalConversion
             // previously: decimalConversion.fromFixed(item.asInstanceOf[GenericFixed], avroSchema,
             //   LogicalTypes.decimal(d.precision, d.scale))
             val bigDecimal = decimalConversion.fromBytes(item.asInstanceOf[ByteBuffer], avroSchema,
               LogicalTypes.decimal(d.precision, d.scale))
             createDecimal(bigDecimal, d.precision, d.scale)
           }
   ```
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sbernauer commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sbernauer commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-834085407


   Hi all,
   
   we sadly haven't been able to do schema evolution for 10 months now (https://github.com/apache/hudi/issues/1845) and have to rely on ugly workarounds.
   Many thanks for working together to find a solution!
   We have tested this patch in our test systems and everything worked fine. When we rolled it out to production, we noticed that the memory consumption increased several times over. This caused our executors to spill to disk and crash.
   So I would like to highlight the comment of @sathyaprakashg:
   > @n3nash I am working on fixing the build issue and will have that fix pushed soon. I would like to point out that with this new approach, we are storing the writer schema as part of the payload, which means the size of the dataframe would increase to store the same schema information with each record. Any suggestion on optimizing this?
   
   Regards,
   Sebastian


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] hudi-bot edited a comment on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
hudi-bot edited a comment on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-869215376


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "f7e53eb22bf7a1e109ba258c7ac21d983d807738",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=470",
       "triggerID" : "f7e53eb22bf7a1e109ba258c7ac21d983d807738",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f7e53eb22bf7a1e109ba258c7ac21d983d807738 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=470) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run travis` re-run the last Travis build
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan edited a comment on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
nsivabalan edited a comment on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-835429596


   yes, thanks for clarifying. I guess embedding the schema in every payload might be detrimental, as you have experienced. So, we have thought of a different approach: regenerate records w/ the new schema at the spark datasource layer. Only a batch that is ingested w/ the old schema after the table's schema has evolved will take a hit from this conversion. 
   
   https://github.com/apache/hudi/pull/2927
   
   Also, as I have mentioned earlier, if others (@n3nash , @bvaradar ) confirm that schema post processor is not required as a mandatory step with this [fix](https://github.com/apache/hudi/pull/2765) for default vals, we don't need any changes in delta streamer as such, just https://github.com/apache/hudi/pull/2927 would suffice. 
   
   @n3nash is doing more testing around this as well. So, will wait for him to comment on the patch as well. 
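   
   For anyone following along, here is a rough sketch (a hypothetical helper, not the actual code; the real change lives in the PR linked above) of what "regenerating records w/ the new schema" amounts to: project each record written with the old schema onto the evolved schema, filling new fields from their declared defaults.
   
   ```
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericData;
   import org.apache.avro.generic.GenericRecord;
   
   static GenericRecord rewriteWithEvolvedSchema(GenericRecord old, Schema evolved) {
     GenericRecord rewritten = new GenericData.Record(evolved);
     for (Schema.Field field : evolved.getFields()) {
       if (old.getSchema().getField(field.name()) != null) {
         // Field existed in the old schema: carry the value over.
         rewritten.put(field.name(), old.get(field.name()));
       } else {
         // New field: fall back to the default declared in the evolved schema.
         rewritten.put(field.name(), GenericData.get().getDefaultValue(field));
       }
     }
     return rewritten;
   }
   ```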


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sbernauer commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sbernauer commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r629070078



##########
File path: hudi-utilities/src/test/resources/delta-streamer-config/source_evolved.avsc
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "type" : "record",
+  "name" : "triprec",
+  "fields" : [
+  {
+    "name" : "timestamp",
+    "type" : "long"
+  }, {
+    "name" : "_row_key",
+    "type" : "string"
+  }, {
+    "name" : "rider",
+    "type" : "string"
+  }, {
+    "name" : "driver",
+    "type" : "string"
+  }, {
+    "name" : "begin_lat",
+    "type" : "double"
+  }, {
+    "name" : "begin_lon",
+    "type" : "double"
+  }, {
+    "name" : "end_lat",
+    "type" : "double"
+  }, {
+    "name" : "end_lon",
+    "type" : "double"
+  }, {
+    "name" : "distance_in_meters",
+    "type" : "int"
+  }, {
+    "name" : "seconds_since_epoch",
+    "type" : "long"
+  }, {
+    "name" : "weight",
+    "type" : "float"
+  },{
+    "name" : "nation",
+    "type" : "bytes"
+  },{
+    "name" : "current_date",
+    "type" : {
+      "type" : "int",
+      "logicalType" : "date"
+      }
+  },{
+    "name" : "current_ts",
+    "type" : {
+      "type" : "long"
+      }
+  },{
+    "name" : "height",
+    "type" : {
+      "type" : "fixed",
+      "name" : "fixed",
+      "size" : 5,
+      "logicalType" : "decimal",
+      "precision" : 10,
+      "scale": 6
+      }
+  }, {
+    "name" :"city_to_state",
+    "type" : {
+      "type" : "map",
+      "values": "string"
+    }
+  },
+  {
+    "name" : "fare",
+    "type" : {
+      "type" : "record",
+      "name" : "fare",
+      "fields" : [
+        {
+         "name" : "amount",
+         "type" : "double"
+        },
+        {
+         "name" : "currency",
+         "type" : "string"
+        }
+      ]
+    }
+  },
+  {

Review comment:
       Yes exactly @sathyaprakashg




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-838539415


   @vinothchandar : we have raised another PR which fixes this use-case: https://github.com/apache/hudi/pull/2927. We don't need to store the schema with the payload. Feel free to review the fix.  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sathyaprakashg commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sathyaprakashg commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r497257812



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -364,4 +366,40 @@ object AvroConversionHelper {
         }
     }
   }
+
+  /**
+   * Remove namespace from fixed field.
+   * org.apache.spark.sql.avro.SchemaConverters.toAvroType method adds namespace to fixed avro field
+   * https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L177
+ * So, we need to remove that namespace so that a reader schema without the namespace does not throw an error like this one
+   * org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
+   *
+   * @param schema Schema from which namespace needs to be removed for fixed fields
+   * @return input schema with namespace removed for fixed fields, if any
+   */
+  def removeNamespaceFromFixedFields(schema: Schema): Schema  ={

Review comment:
       @bvaradar 
   
   In delta streamer, we currently have the below three flows:
   1) [No transformation](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L335)
   2) [Transformation with userProvidedSchema](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L315)
   3) [Transformation without userProvidedSchema](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L323)
   
   Only a schema converted from a spark data type to an avro schema has this namespace added to fixed fields. In delta streamer, currently we use the user-provided schema ([userProvidedSchemaProvider.targetSchema](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L568)) to convert [bytes to avro](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java#L126), except for the third flow (Transformation without userProvidedSchema). In that case, we [derive the schema](https://github.com/apache/hudi/blob/a99e93bed542c8ae30a641d1df616cc2cd5798e1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L327) from the spark data type. So, the backward compatibility issue arises only when we use a transformer and no user-provided schema.
   
   Below is an example of an avro fixed field with and without a namespace.
   
   `{"name":"height","type":{"type":"fixed","name":"fixed","size":5,"logicalType":"decimal","precision":10,"scale":6}}`
   
   `{"name":"height","type":{"type":"fixed","name":"fixed","namespace":"hoodie.source.hoodie_source.height","size":5,"logicalType":"decimal","precision":10,"scale":6}}`
   
   Both of these result in the same parquet schema:
   `required fixed_len_byte_array(5) height (DECIMAL(10,6));`
   
   As we can see here, the namespace in the fixed field does not seem to have any impact on the parquet schema. So maybe the HoodieFileReader in the MergeHelper file you referred to shouldn't have any issue?  
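   
   A quick way to sanity-check that (a hypothetical snippet, not from the PR) is to convert both variants with parquet-avro's `AvroSchemaConverter` and compare the results:
   
   ```
   import org.apache.avro.Schema;
   import org.apache.parquet.avro.AvroSchemaConverter;
   
   Schema withNs = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"rec\","
       + "\"fields\":[{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"fixed\","
       + "\"namespace\":\"hoodie.source.hoodie_source.height\",\"size\":5,"
       + "\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}}]}");
   Schema withoutNs = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"rec\","
       + "\"fields\":[{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"fixed\","
       + "\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}}]}");
   
   AvroSchemaConverter converter = new AvroSchemaConverter();
   // Both variants should yield: required fixed_len_byte_array(5) height (DECIMAL(10,6))
   System.out.println(converter.convert(withNs).equals(converter.convert(withoutNs)));
   ```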
   
   In general, it looks like parquet files in an existing hudi dataset would not have an issue. I tested with the COPY ON WRITE table type and couldn't see any issue. But in the case of a MERGE ON READ table, I could see an issue for the third flow (Transformation without userProvidedSchema). Below is the stack trace.
   
   ```
   51511 [Executor task launch worker for task 502] ERROR org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner  - Got exception when reading log file
   org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed
   	at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
   	at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
   	at org.apache.avro.io.ValidatingDecoder.checkFixed(ValidatingDecoder.java:135)
   	at org.apache.avro.io.ValidatingDecoder.readFixed(ValidatingDecoder.java:146)
   	at org.apache.avro.generic.GenericDatumReader.readFixed(GenericDatumReader.java:342)
   	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
   	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
   	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
   	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
   	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
   	at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock.deserializeRecords(HoodieAvroDataBlock.java:157)
   	at org.apache.hudi.common.table.log.block.HoodieDataBlock.createRecordsFromContentBytes(HoodieDataBlock.java:128)
   	at org.apache.hudi.common.table.log.block.HoodieDataBlock.getRecords(HoodieDataBlock.java:106)
   	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processDataBlock(AbstractHoodieLogRecordScanner.java:275)
   	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.processQueuedBlocksForInstant(AbstractHoodieLogRecordScanner.java:308)
   	at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:241)
   	at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
   	at org.apache.hudi.HoodieMergeOnReadRDD$.scanLog(HoodieMergeOnReadRDD.scala:259)
   	at org.apache.hudi.HoodieMergeOnReadRDD$$anon$2.<init>(HoodieMergeOnReadRDD.scala:164)
   	at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:154)
   	at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:67)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
   	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
   	at org.apache.spark.scheduler.Task.run(Task.scala:123)
   	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   ```
   
   In summary, it looks like the third flow (Transformation without userProvidedSchema) produces a different output schema in the log file compared to the other two flows if there are fixed fields. This means that if we want to switch from the third flow to, say, the first flow (by removing the transformation), we already have a problem, since log files in a MERGE ON READ table will have a different schema if there are fixed fields. This PR may cause a backward compatibility issue for the third flow, but it would make sure we produce the same schema regardless of which flow we use.
   
   In case you have a better suggestion to make this work without causing issues in existing datasets for the third flow, please let me know; happy to update the PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sathyaprakashg commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sathyaprakashg commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r519230695



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java
##########
@@ -39,13 +40,19 @@
    */
   protected final Comparable orderingVal;
 
+  /**
+   * Schema used to convert avro to bytes.
+   */
+  protected final Schema writerSchema;

Review comment:
       Thanks. I have updated it now




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-856887968


   Just to update everyone in the thread. 
   Regular schema evolution (adding new columns) does not have any issues w/ deltastreamer. This is a less common use-case wherein, once hudi's schema has evolved, users wish to route records w/ the older schema and expect hudi to handle them. The [patch](https://github.com/apache/hudi/pull/2927) that has been put up addresses this particular issue. Wanted to clarify with everyone that basic schema evolution is working as expected with deltastreamer. 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan edited a comment on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
nsivabalan edited a comment on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-847855344


   @sbernauer @giaosudau @dirksan28 @sathyaprakashg : There are quite a few flows or use-cases in general wrt schema evolution. Would you mind helping us by explaining your use-case? 
   
   Let me call out a few of them: 
   1. Existing hudi table is in schema1 with 3 cols and you are trying to ingest new batch with schema2 with 4 cols. 
   2. Existing hudi table is in schema2 with 4 cols (after schema got evolved from schema1). new batch of ingest has records in old schema(schema1). 
   For both (1) and (2), there could be different flows in deltastreamer. 
   a. no transformer and no schema provider. 
   b. no transformer and user provides a schema provider with non null target schema.
   c. no transformer and user provides a schema provider with NULL target schema.
   d. has transformer and no schema provider. 
   e. has transformer and user provides a schema provider with non null target schema.
   f. has transformer and user provides a schema provider with NULL target schema.
   
   Can you call out whether your use case is 1a or 2e, etc. The patch we have put up solves most of the above use-cases, but we would like to better understand what exactly your use-case is. And simple schema evolution cases like 1a and 1b should already work in hudi w/o any fix. 
   If your use-case does not belong to any of the above categories, do help us explain it so that we can work towards a fix. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] dirksan28 commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
dirksan28 commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-834091378


   I've been observing this issue for several months now. From my point of view, this is a big problem for the meaningful use of hudi. I am amazed that this is not classified as mission critical. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] giaosudau commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
giaosudau commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-834140226


   Yes, I have been following this PR for the last 6 months. 
   When I presented it to my teammates, I thought it would work out of the box, but it didn't. Kind of sad.
   Please help put more focus on it.
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan edited a comment on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
nsivabalan edited a comment on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-825077766


   I spent some time to understand this PR. Thanks for putting it up @sathyaprakashg. I have a few clarifications. 
   
   1. Can you fix the description wrt latest status. I don't see SchemaBasedSchemaProvider etc. 
   2. FYI We landed a [fix](https://github.com/apache/hudi/pull/2765) wrt default vals and null in unions. In case the schema post-processing is not required at all w/ this fix, it would simplify things. Guess the namespace fix in this PR may not be required if the post-processing step is not required. @bvaradar @n3nash : can you folks chime in here please. [fixed datatype jira](https://issues.apache.org/jira/browse/HUDI-1607).
   3. Also, I pulled the test locally and was trying to verify things. Looks like the test is not generating records as intended in 3rd step. Here is what is happening. 
       - TestDataSource generates data w/ intended schema(old)
       - But in SourceFormatAdapter, when we do AvroConversionUtils.createDataFrame(...), evolved schema is passed in. and so InputBatch<Dataset<Row>> returned from here has new column set to null for all records. 
       - I also verified this from within the IdentityTransformer which was showing evolved schema and record having new column as well. 
   so, essentially the test also needs to be fixed. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sathyaprakashg commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sathyaprakashg commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r628720757



##########
File path: hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
##########
@@ -70,7 +70,7 @@
     "name" : "height",
     "type" : {
       "type" : "fixed",
-      "name" : "abc",

Review comment:
       Because when we convert Spark schema to avro schema [here](https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala#L177), it always name it as `fixed`. If we name it different, then this avro schema and avro schema created from spark schema would not match




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sathyaprakashg commented on a change in pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
sathyaprakashg commented on a change in pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#discussion_r487415553



##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -127,12 +128,59 @@ public static GenericRecord bytesToAvro(byte[] bytes, Schema schema) throws IOEx
    * Convert serialized bytes back into avro record.
    */
   public static GenericRecord bytesToAvro(byte[] bytes, Schema writerSchema, Schema readerSchema) throws IOException {

Review comment:
       @n3nash This is the execption we get when we don't add namespace to reader schema. `org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed`. 
   
   This exception is raised because we have namespace in writer schema and not in reader schema. As per my proposed approach above, if we remove the namespace from writer schema, then we wouldn't get type exception, because now namespace does not exist in fixed fields for both writer and reader schema

##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -127,12 +128,59 @@ public static GenericRecord bytesToAvro(byte[] bytes, Schema schema) throws IOEx
    * Convert serialized bytes back into avro record.
    */
   public static GenericRecord bytesToAvro(byte[] bytes, Schema writerSchema, Schema readerSchema) throws IOException {

Review comment:
       @n3nash This is the execption we get when we don't add namespace to reader schema. `org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed`. 
   
   This exception is raised because we have namespace in writer schema and not in reader schema. As per my proposed approach above, if we remove the namespace from writer schema, then we wouldn't get type exception, because now namespace does not exist in fixed fields for both writer and reader schema

##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -127,12 +128,59 @@ public static GenericRecord bytesToAvro(byte[] bytes, Schema schema) throws IOEx
    * Convert serialized bytes back into avro record.
    */
   public static GenericRecord bytesToAvro(byte[] bytes, Schema writerSchema, Schema readerSchema) throws IOException {

Review comment:
       @n3nash This is the execption we get when we don't add namespace to reader schema. `org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed`. 
   
   This exception is raised because we have namespace in writer schema and not in reader schema. As per my proposed approach above, if we remove the namespace from writer schema, then we wouldn't get type exception, because now namespace does not exist in fixed fields for both writer and reader schema

##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -127,12 +128,59 @@ public static GenericRecord bytesToAvro(byte[] bytes, Schema schema) throws IOEx
    * Convert serialized bytes back into avro record.
    */
   public static GenericRecord bytesToAvro(byte[] bytes, Schema writerSchema, Schema readerSchema) throws IOException {

Review comment:
       @n3nash This is the execption we get when we don't add namespace to reader schema. `org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed`. 
   
   This exception is raised because we have namespace in writer schema and not in reader schema. As per my proposed approach above, if we remove the namespace from writer schema, then we wouldn't get type exception, because now namespace does not exist in fixed fields for both writer and reader schema

##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -127,12 +128,59 @@ public static GenericRecord bytesToAvro(byte[] bytes, Schema schema) throws IOEx
    * Convert serialized bytes back into avro record.
    */
   public static GenericRecord bytesToAvro(byte[] bytes, Schema writerSchema, Schema readerSchema) throws IOException {

Review comment:
       @n3nash This is the execption we get when we don't add namespace to reader schema. `org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source.height.fixed, expecting fixed`. 
   
   This exception is raised because we have namespace in writer schema and not in reader schema. As per my proposed approach above, if we remove the namespace from writer schema, then we wouldn't get type exception, because now namespace does not exist in fixed fields for both writer and reader schema




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #2012: [HUDI-1129] Deltastreamer Add support for schema evolution

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-869215210


   @nsivabalan whats the next step for this pr


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] sbernauer commented on pull request #2012: HUDI-1129 Deltastreamer Add support for schema evaluation

Posted by GitBox <gi...@apache.org>.
sbernauer commented on pull request #2012:
URL: https://github.com/apache/hudi/pull/2012#issuecomment-679910464


   Hi @sathyaprakashg, thanks for your work!
   
   When i move the new field `evoluted_optional_union_field` to a place not at the end of the schema (somewhere in the middle) i get the following exception:
   `java.lang.ClassCastException: java.lang.Boolean cannot be cast to java.lang.Iterable.`
   This makes sense, since now the ids of the fields dont line up any more. So reducing the length to the minimum is not sufficient here.
   
   I suggest using the field names instead of the ids (which dont match up any more after a new field in the middle). See https://github.com/sbernauer/hudi/commit/1adbc7bddac431bb060880efab2d3979840765da#diff-3c046573a91f36ba0f12dad0e3395dc9R139
   
   Should i open a PR for this change or do you want to modify yours?
   
   Cheers,
   Sebastian


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org