Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/08/16 20:25:20 UTC

[GitHub] [hudi] sathyaprakashg opened a new issue #1971: Schema evoluation causes issue when using kafka source in hudi deltastreamer

sathyaprakashg opened a new issue #1971:
URL: https://github.com/apache/hudi/issues/1971


   Hi, I am facing the issues below in DeltaStreamer due to schema evolution in Kafka. In the schema registry, version 1 has 52 attributes. We added 2 additional attributes with default values in version 2, but our producer is still sending events using the version 1 schema ID.
   
   ### Problem 1
   I ran DeltaStreamer with the config `hoodie.deltastreamer.schemaprovider.registry.url=https://schema-registry.url.net/subjects/topicname-value/versions/latest` and got the error below:
   
   ```
   20/08/16 12:35:05 ERROR io.HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=12345 partitionPath=Jan}, currentLocation='null', newLocation='null'}
   java.io.EOFException
           at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
           at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
           at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:423)
           at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
           at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
           at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
           at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
           at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
           at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:119)
           at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:70)
           at org.apache.hudi.execution.LazyInsertIterable$HoodieInsertValueGenResult.<init>(LazyInsertIterable.java:92)
           at org.apache.hudi.execution.LazyInsertIterable.lambda$getTransformFunction$0(LazyInsertIterable.java:105)
           at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:170)
           at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
           at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   ```
   
   I debugged the Hudi code and found that the RDD generic record's schema has only 52 fields, since the producer is still producing with version 1. The RDD's schema is used to convert Avro records to bytes in HoodieAvroUtils.avroToBytes.
   
   But to convert them back from bytes to Avro, the HoodieAvroUtils.bytesToAvro method uses the schema provided in the property file. Since we specified the latest version in `hoodie.deltastreamer.schemaprovider.registry.url`, and this schema is not the same as the one used to convert to bytes, we get the above error.
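
The failure mode described above can be reproduced without Avro or Hudi. The following is a minimal sketch, not Hudi code: it assumes a toy encoding (plain `java.io` data streams) in which, like Avro's binary format, values are written back to back with no field names or counts, so the reader must already know the layout. The names `SchemaMismatchDemo`, `write`, `read` and `failsWithEof` are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

// Toy stand-in for a schemaless binary encoding (an assumption for illustration,
// not Avro's actual wire format): int fields are written one after another.
class SchemaMismatchDemo {

  // Serialize `fieldCount` int fields; the "writer schema" is just this count.
  static byte[] write(int fieldCount) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(buffer)) {
      for (int i = 0; i < fieldCount; i++) {
        out.writeInt(i);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return buffer.toByteArray();
  }

  // Deserialize assuming the payload holds `fieldCount` int fields.
  static int[] read(byte[] payload, int fieldCount) throws IOException {
    int[] values = new int[fieldCount];
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
      for (int i = 0; i < fieldCount; i++) {
        values[i] = in.readInt(); // throws EOFException once the bytes run out
      }
    }
    return values;
  }

  // Returns true if reading with `readerFields` hits the end of the payload.
  static boolean failsWithEof(int writerFields, int readerFields) {
    byte[] payload = write(writerFields);
    try {
      read(payload, readerFields);
      return false;
    } catch (EOFException e) {
      return true;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("write 52, read 52 -> EOF? " + failsWithEof(52, 52));
    System.out.println("write 52, read 54 -> EOF? " + failsWithEof(52, 54));
  }
}
```

Writing 52 fields and reading 54 runs off the end of the buffer, which has the same shape as the `EOFException` raised from `BinaryDecoder.ensureBounds` in the trace above.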
   
   I noticed there is a `targetUrl` property, so I specified version 1 there and the job started running fine. The problem is that when the producer starts publishing events with the latest version, we will have to manually change the version in `targetUrl`. My suggestion: can we use the RDD schema for both HoodieAvroUtils.avroToBytes and HoodieAvroUtils.bytesToAvro, so that we can be sure the same schema is used for both write and read?
   
   `hoodie.deltastreamer.schemaprovider.registry.url=https://schema-registry.url.net/subjects/topicname-value/versions/latest`
   `hoodie.deltastreamer.schemaprovider.registry.targetUrl=https://schema-registry.url.net/subjects/topicname-value/versions/1`
   
   ### Problem 2
   After setting the `targetUrl` property as mentioned above, Hudi DeltaStreamer was working fine. We then tried to add a transformation by passing `--transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer` and got the error below:
   
   ```
   java.lang.ArrayIndexOutOfBoundsException: 52
           at org.apache.avro.generic.GenericData$Record.get(GenericData.java:212)
           at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToRow$9(AvroConversionHelper.scala:170)
           at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:69)
           at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
           at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
           at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
           at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
           at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
           at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
           at scala.collection.Iterator.isEmpty(Iterator.scala:385)
           at scala.collection.Iterator.isEmpty$(Iterator.scala:385)
           at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1429)
           at org.apache.hudi.AvroConversionUtils$.$anonfun$createRdd$2(AvroConversionUtils.scala:47)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
           at org.apache.spark.scheduler.Task.run(Task.scala:127)
           at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   ```
   
   This is because we have 54 converters in [AvroConversionHelper](https://github.com/apache/hudi/blob/4226d7514400d86761e39639e9554809b72b627c/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala#L170), as per the latest schema, but only 52 fields in the RDD record.
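
The count mismatch can be illustrated with a small sketch. This is not the AvroConversionHelper code: it assumes a simplified model in which a record is a plain array and one "converter" per schema field copies the value at the same index. `ConverterCountDemo` and both methods are hypothetical, and the guarded variant (filling missing positions with `null`) is only one possible way to tolerate fewer fields.

```java
// Simplified model (an assumption for illustration): one converter per schema
// field, each reading the record value at the same position.
class ConverterCountDemo {

  // Mirrors the failing pattern: iterate over the schema's field count and
  // index into the record without checking how many fields it actually has.
  static Object[] convertUnguarded(Object[] record, int schemaFieldCount) {
    Object[] row = new Object[schemaFieldCount];
    for (int i = 0; i < schemaFieldCount; i++) {
      row[i] = record[i]; // out of bounds at index 52 for a 52-field record
    }
    return row;
  }

  // Hypothetical guard: positions the record does not have are left as null.
  static Object[] convertGuarded(Object[] record, int schemaFieldCount) {
    Object[] row = new Object[schemaFieldCount];
    for (int i = 0; i < schemaFieldCount; i++) {
      row[i] = i < record.length ? record[i] : null;
    }
    return row;
  }

  public static void main(String[] args) {
    Object[] record = new Object[52];
    for (int i = 0; i < record.length; i++) {
      record[i] = "v" + i;
    }
    System.out.println(convertGuarded(record, 54).length);
    try {
      convertUnguarded(record, 54);
    } catch (ArrayIndexOutOfBoundsException e) {
      System.out.println("unguarded conversion failed: " + e.getMessage());
    }
  }
}
```

With a 52-field record and 54 converters, the unguarded loop fails on the first position the record lacks, which matches the index `52` reported in the exception above.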
   
   If we set `hoodie.deltastreamer.schemaprovider.registry.url=https://schema-registry.url.net/subjects/topicname-value/versions/1`, then it works fine.
   
   So, in general, is it possible to set the latest schema in the property file and have DeltaStreamer run without issue? Or, even better, could we avoid specifying the schema registry URL in the property file and use the schema from the RDD instead?
   
   **Environment Description**
   
   * Hudi version : 0.5.3 & 0.6.0 rc1
   
   * Spark version : 2.4.4
   
   * Running on Docker? (yes/no) : no
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on issue #1971: Schema evoluation causes issue when using kafka source in hudi deltastreamer

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #1971:
URL: https://github.com/apache/hudi/issues/1971#issuecomment-767208636


   @jingweiz2017 : can you please check the above response and let us know if you need anything more from the Hudi community?





[GitHub] [hudi] nsivabalan closed issue #1971: Schema evoluation causes issue when using kafka source in hudi deltastreamer

Posted by GitBox <gi...@apache.org>.
nsivabalan closed issue #1971:
URL: https://github.com/apache/hudi/issues/1971


   





[GitHub] [hudi] poiyyq commented on issue #1971: Schema evoluation causes issue when using kafka source in hudi deltastreamer

Posted by GitBox <gi...@apache.org>.
poiyyq commented on issue #1971:
URL: https://github.com/apache/hudi/issues/1971#issuecomment-678086882


   Hi, I am facing the same problem as you.
   
   The exception is thrown at `reader.read(null, decoder);` while converting serialized bytes back into an Avro record. Is it a bug? How can it be fixed?
   ```
   /**
    * Convert serialized bytes back into avro record.
    */
   public static GenericRecord bytesToAvro(byte[] bytes, Schema schema) throws IOException {
     BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, reuseDecoder.get());
     reuseDecoder.set(decoder);
     GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
     return reader.read(null, decoder);
   }
   ```
   
   ```
   581408 [pool-17-thread-2] ERROR org.apache.hudi.io.HoodieWriteHandle  - Error writing record HoodieRecord{key=HoodieKey { recordKey=10001 partitionPath=driver001}, currentLocation='null', newLocation='null'}
   java.io.EOFException
           at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
           at org.apache.avro.io.BinaryDecoder.readDouble(BinaryDecoder.java:243)
           at org.apache.avro.io.ResolvingDecoder.readDouble(ResolvingDecoder.java:190)
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:186)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
           at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
           at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
           at org.apache.hudi.common.util.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:89)
           at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:80)
           at org.apache.hudi.execution.CopyOnWriteLazyInsertIterable$HoodieInsertValueGenResult.<init>(CopyOnWriteLazyInsertIterable.java:74)
           at org.apache.hudi.execution.CopyOnWriteLazyInsertIterable.lambda$getTransformFunction$0(CopyOnWriteLazyInsertIterable.java:87)
           at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:170)
           at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
           at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
           at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
           at java.util.concurrent.FutureTask.run(FutureTask.java)
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
           at java.util.concurrent.FutureTask.run(FutureTask.java)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
           at java.lang.Thread.run(Thread.java:748)
   ```





[GitHub] [hudi] jingweiz2017 commented on issue #1971: Schema evoluation causes issue when using kafka source in hudi deltastreamer

Posted by GitBox <gi...@apache.org>.
jingweiz2017 commented on issue #1971:
URL: https://github.com/apache/hudi/issues/1971#issuecomment-767242422


   @nsivabalan @bvaradar , thanks for the reply. The commit mentioned by bvaradar should work for my case.





[GitHub] [hudi] bvaradar commented on issue #1971: Schema evoluation causes issue when using kafka source in hudi deltastreamer

Posted by GitBox <gi...@apache.org>.
bvaradar commented on issue #1971:
URL: https://github.com/apache/hudi/issues/1971#issuecomment-739735068


   @jingweiz2017 :  I believe the recent commit https://github.com/apache/hudi/commit/62b392b49c13455199e0372204dedf8a371b452c  addresses the issue with transformers and null ordering. 





[GitHub] [hudi] nsivabalan commented on issue #1971: Schema evoluation causes issue when using kafka source in hudi deltastreamer

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #1971:
URL: https://github.com/apache/hudi/issues/1971#issuecomment-674950044


   Thanks for reporting this. I have filed a ticket: https://issues.apache.org/jira/browse/HUDI-1195
   @bvaradar @bhasudha : fyi.
   @sathyaprakashg : would you be interested in putting in a fix for this? Either way, let's get the idea/fix approved by @bvaradar or @bhasudha.





[GitHub] [hudi] sathyaprakashg commented on issue #1971: Schema evoluation causes issue when using kafka source in hudi deltastreamer

Posted by GitBox <gi...@apache.org>.
sathyaprakashg commented on issue #1971:
URL: https://github.com/apache/hudi/issues/1971#issuecomment-675072299


   @nsivabalan Happy to work on a fix once we get some suggestions from @bvaradar or @bhasudha. Thanks





[GitHub] [hudi] sathyaprakashg commented on issue #1971: Schema evoluation causes issue when using kafka source in hudi deltastreamer

Posted by GitBox <gi...@apache.org>.
sathyaprakashg commented on issue #1971:
URL: https://github.com/apache/hudi/issues/1971#issuecomment-678117088


   The `java.io.EOFException` error means that the schema you provided has more fields than are available in the RDD/Dataset. If your source is also Kafka, find out which schema version the Kafka producer uses and, as a workaround, set that version in `hoodie.deltastreamer.schemaprovider.registry.url` instead of the latest schema.
   
   I think the solution is to use the same schema (taken from the RDD/Dataset) both to convert Avro to bytes and to convert bytes back to Avro. Once we have a suggestion from @bvaradar or @bhasudha, I will see if I can create a PR to get it fixed.
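
The idea of decoding with the schema that was used for encoding can be sketched as follows. This is a toy model under stated assumptions, not a proposed Hudi patch: the payload class, its `writerFields` member and the default-filling rule are all hypothetical, and int fields stand in for real Avro types. In Avro itself, the analogous mechanism is constructing `GenericDatumReader` with both a writer schema and a reader schema, so that fields missing from the written data are filled from the reader schema's defaults.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

class WriterSchemaDemo {

  // Hypothetical payload that remembers how many fields the writer emitted,
  // standing in for "carry the writer schema along with the bytes".
  static final class Payload {
    final byte[] bytes;
    final int writerFields;

    Payload(byte[] bytes, int writerFields) {
      this.bytes = bytes;
      this.writerFields = writerFields;
    }
  }

  // Encode the values back to back and record the writer's field count.
  static Payload write(int[] values) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(buffer)) {
      for (int value : values) {
        out.writeInt(value);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return new Payload(buffer.toByteArray(), values.length);
  }

  // Decode exactly what the writer wrote, then project onto the reader's
  // (possibly wider) layout, filling fields the writer did not know about.
  static int[] read(Payload payload, int readerFields, int defaultValue) {
    int[] result = new int[readerFields];
    Arrays.fill(result, defaultValue);
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload.bytes))) {
      for (int i = 0; i < payload.writerFields; i++) {
        int value = in.readInt();
        if (i < readerFields) {
          result[i] = value;
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return result;
  }

  public static void main(String[] args) {
    int[] source = new int[52];
    for (int i = 0; i < source.length; i++) {
      source[i] = i;
    }
    int[] decoded = read(write(source), 54, -1);
    System.out.println(decoded.length + " fields, last two = " + decoded[52] + ", " + decoded[53]);
  }
}
```

Because the decode loop is driven by the writer's field count rather than the reader's, a 52-field payload can be read into a 54-field layout without running past the end of the bytes.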





[GitHub] [hudi] nsivabalan commented on issue #1971: Schema evoluation causes issue when using kafka source in hudi deltastreamer

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #1971:
URL: https://github.com/apache/hudi/issues/1971#issuecomment-767568399


   @sathyaprakashg : since we have jiras open, I have closed this out. If you have any other questions/asks, feel free to open a new issue.





[GitHub] [hudi] jingweiz2017 commented on issue #1971: Schema evoluation causes issue when using kafka source in hudi deltastreamer

Posted by GitBox <gi...@apache.org>.
jingweiz2017 commented on issue #1971:
URL: https://github.com/apache/hudi/issues/1971#issuecomment-737794951


   Hello, [#1972](https://github.com/apache/hudi/issues/1972) brought me here. Since that one has been closed, I want to bring up my question here and hope someone can help answer it. The conclusion for #1972 seems to be that the solution is to support schema evolution. But to me, #1972 is more about where "null" should be placed in a "union", if it is present at all. Avro 1.9.2 requires null to be the first schema in a union for nullability. Would it be better for the Spark team to upgrade their support to Avro 1.9.2, and in turn change the method toAvroType in SchemaConverters to put nullSchema first when creating a union? To me, this looks more like breaking a rule than schema evolution.





[GitHub] [hudi] poiyyq commented on issue #1971: Schema evoluation causes issue when using kafka source in hudi deltastreamer

Posted by GitBox <gi...@apache.org>.
poiyyq commented on issue #1971:
URL: https://github.com/apache/hudi/issues/1971#issuecomment-678131112


   You're right. I ran the demo program, but it failed.
   
   The target schema has more fields than the source schema. After I deleted the "haversine_distance" field, it ran OK.
   
   











[GitHub] [hudi] bvaradar commented on issue #1971: Schema evoluation causes issue when using kafka source in hudi deltastreamer

Posted by GitBox <gi...@apache.org>.
bvaradar commented on issue #1971:
URL: https://github.com/apache/hudi/issues/1971#issuecomment-678251962


   @sathyaprakashg : Sorry for the delay in responding. This issue was reported earlier, and we suggested one workaround here: https://github.com/apache/hudi/issues/1845#issuecomment-665180775
   
   There are a couple of jiras filed on this: https://issues.apache.org/jira/browse/HUDI-1128 and https://issues.apache.org/jira/browse/HUDI-1129
   
   Regarding your approach to fixing bytesToAvro -> avroToBytes, it is a tricky change, as the implementation could break the public API constructor of the payload. But if it works, let's open a PR and discuss how best to fix it.
   
   Regarding the issue https://issues.apache.org/jira/browse/HUDI-1129, we would need to fix AvroConversionUtils (which is mostly from spark-avro) to handle the case where there are fewer fields. Is this something you can take a look at to see whether it can be fixed?





[GitHub] [hudi] nsivabalan commented on issue #1971: Schema evoluation causes issue when using kafka source in hudi deltastreamer

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #1971:
URL: https://github.com/apache/hudi/issues/1971#issuecomment-767567967


   thanks. 

