Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/06/18 23:57:31 UTC

[GitHub] [hudi] jkdll opened a new issue #3113: [SUPPORT] Deltastreamer Error when reading AVRO Schema with UNION Types

jkdll opened a new issue #3113:
URL: https://github.com/apache/hudi/issues/3113


   **Problem**
   
   I am running DeltaStreamer (spark-submit below) with the schema registry provider `SchemaRegistryProvider` and the source class `AvroKafkaSource`. The schema I am reading contains UNION Avro types (sample below). While running, DeltaStreamer does not seem to be able to handle UNION types that contain nulls, and fails with: `ERROR Client: Application diagnostics message: User class threw exception: org.apache.spark.sql.avro.IncompatibleSchemaException: Unsupported type NULL`.
   ```
   spark-submit \
   --master yarn \
   --deploy-mode cluster \
   --files "/home/workspace/configs/stage/*" \
   --packages org.apache.hudi:hudi-utilities-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:2.4.7 \
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /lib/hudi/hudi-utilities-bundle_2.11-0.7.0-amzn-1.jar` \
   --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
   --op UPSERT \
   --source-ordering-field timestamp \
   --table-type COPY_ON_WRITE \
   --target-table "$1" \
   --target-base-path "s3a://aws-hudi-data/data/stage/data/$1" \
   --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
   --continuous \
   --enable-sync \
   --min-sync-interval-seconds 5 \
   --hoodie-conf "group.id=test" \
   --hoodie-conf "auto.offset.reset=earliest" \
   --hoodie-conf "hoodie.datasource.write.recordkey.field=body.id" \
   --hoodie-conf "hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator" \
   --hoodie-conf "hoodie.deltastreamer.source.kafka.topic=$2" \
   --hoodie-conf "hoodie.deltastreamer.schemaprovider.registry.url=https://schema-registry-url.com/subjects/$2-value/versions/latest" \
   --hoodie-conf "hoodie.datasource.write.partitionpath.field=timestamp"
   ```
   
   **The Schema Contains Fields with UNION Structs and Arrays such as the following:**
   ```
   {
                               "name": "Title",
                               "type": [
                                 "null",
                                 {
                                   "type": "record",
                                   "name": "body",
                                   "namespace": "Title.additional.Payload",
                                   "fields": [
                                     {
                                       "name": "Name",
                                       "type": "string"
                                     },
                                     {
                                       "name": "value",
                                       "type": [
                                         "null",
                                         "string"
                                       ]
                                     }
                                   ]
                                 }
                               ],
                               "default": null
                             }
   ```
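
   For reference, the failing conversion can be exercised outside DeltaStreamer. Below is a minimal sketch (the wrapper record and object names are made up for illustration) that calls the same `SchemaConverters.toSqlType` used by Hudi's `AvroConversionUtils.convertAvroSchemaToStructType` (see the stack trace below); it lets the schema-to-StructType conversion be tested in isolation and, by trimming fields, narrowed down to the offending one:
   ```
   import org.apache.avro.Schema
   import org.apache.spark.sql.avro.SchemaConverters

   object ConvertSchemaCheck extends App {
     // Illustrative wrapper record so the sample field above parses as a complete schema.
     val avroJson =
       """{
         |  "type": "record", "name": "Envelope", "fields": [
         |    {"name": "Title", "type": ["null", {
         |        "type": "record", "name": "body", "namespace": "Title.additional.Payload",
         |        "fields": [
         |          {"name": "Name", "type": "string"},
         |          {"name": "value", "type": ["null", "string"]}
         |        ]}],
         |     "default": null}
         |  ]
         |}""".stripMargin

     val avroSchema = new Schema.Parser().parse(avroJson)
     // Same conversion DeltaStreamer performs while registering schemas; on failure it
     // throws org.apache.spark.sql.avro.IncompatibleSchemaException.
     println(SchemaConverters.toSqlType(avroSchema).dataType.prettyJson)
   }
   ```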
   
   **Expected behavior**
   
   DeltaStreamer should ingest records whose Avro schema contains UNION types with `null` members (like the sample above) without throwing `IncompatibleSchemaException`.
   
   **Environment Description**
   
   The DeltaStreamer is running on AWS EMR 5.33. Details of the distribution are below:
   
   * Hudi version : 0.7.0
   
   * Spark version : 2.4.x (Scala 2.11.12)
   
   * Hive version : 2.3.7-amzn-4
   
   * Hadoop version : 2.10.1-amzn-1
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   **Stacktrace**
   
   ```
   21/06/18 23:47:53 ERROR Client: Application diagnostics message: User class threw exception: org.apache.spark.sql.avro.IncompatibleSchemaException: Unsupported type NULL
           at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:130)
           at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
           at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
           at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
           at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
           at scala.collection.Iterator$class.foreach(Iterator.scala:891)
           at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
           at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
           at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
           at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
           at scala.collection.AbstractTraversable.map(Traversable.scala:104)
           at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
           at org.apache.spark.sql.avro.SchemaConverters$.toSqlType(SchemaConverters.scala:46)
           at org.apache.hudi.AvroConversionUtils$.convertAvroSchemaToStructType(AvroConversionUtils.scala:56)
           at org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType(AvroConversionUtils.scala)
           at org.apache.hudi.utilities.schema.SparkAvroPostProcessor.processSchema(SparkAvroPostProcessor.java:44)
           at org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor.lambda$getSourceSchema$0(SchemaProviderWithPostProcessor.java:41)
           at org.apache.hudi.common.util.Option.map(Option.java:107)
           at org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor.getSourceSchema(SchemaProviderWithPostProcessor.java:41)
           at org.apache.hudi.utilities.deltastreamer.DeltaSync.registerAvroSchemas(DeltaSync.java:660)
           at org.apache.hudi.utilities.deltastreamer.DeltaSync.<init>(DeltaSync.java:207)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:560)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:138)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:102)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:470)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:688)
   ```
   
   





[GitHub] [hudi] n3nash commented on issue #3113: [SUPPORT] Deltastreamer Error when reading AVRO Schema with UNION Types

Posted by GitBox <gi...@apache.org>.
n3nash commented on issue #3113:
URL: https://github.com/apache/hudi/issues/3113#issuecomment-865410086


   @jkdll Can you try changing your schema as follows : 
   
   ```
   {
                               "name": "Title",
                               "type": [
                                 "null",
                                 {
                                   "type": "record",
                                   "name": "body",
                                   "namespace": "Title.additional.Payload",
                                   "fields": [
                                     {
                                       "name": "Name",
                                       "type": [
                                         "null",
                                         "string"
                                       ]
                                     },
                                     {
                                       "name": "value",
                                       "type": [
                                         "null",
                                         "string"
                                       ]
                                     }
                                   ]
                                 }
                               ],
                               "default": null
                             }
   ```
   
   The "Name" field did not have a default value NULL before. 
   
   To be able to debug this further, could you trim your schema down to a minimal one with which we can reproduce this behavior?





[GitHub] [hudi] jkdll commented on issue #3113: [SUPPORT] Deltastreamer Error when reading AVRO Schema with UNION Types

Posted by GitBox <gi...@apache.org>.
jkdll commented on issue #3113:
URL: https://github.com/apache/hudi/issues/3113#issuecomment-879860606


   @n3nash - after further analysis, this issue was solved by upgrading from Spark 2 to Spark 3.x, which handles the various Avro types better.
   Closing this ticket.
   
   Thanks





[GitHub] [hudi] jkdll commented on issue #3113: [SUPPORT] Deltastreamer Error when reading AVRO Schema with UNION Types

Posted by GitBox <gi...@apache.org>.
jkdll commented on issue #3113:
URL: https://github.com/apache/hudi/issues/3113#issuecomment-871620908


   @n3nash For the following error:
   ```
   ERROR Client: Application diagnostics message: User class threw exception: org.apache.spark.sql.avro.IncompatibleSchemaException: Unsupported type NULL
   ```
   I dug a bit deeper and found that the Spark Avro version we were running, 2.4.8, does not support attributes of the Avro `null` type, as shown here:
   https://github.com/apache/spark/blob/branch-2.4/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
   
   However, updated versions of the library, specifically 3.x do:
   https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
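
   As a minimal standalone check (a hypothetical snippet, not DeltaStreamer code), the difference shows up with a record containing a single field whose Avro type is literally `null`, like the `messageKey` field referenced in the stack trace below:
   ```
   import org.apache.avro.Schema
   import org.apache.spark.sql.avro.SchemaConverters

   object NullTypeCheck extends App {
     val schema = new Schema.Parser().parse(
       """{"type":"record","name":"t","fields":[{"name":"messageKey","type":"null"}]}""")
     // spark-avro 2.4.x: throws IncompatibleSchemaException("Unsupported type NULL")
     // spark-avro 3.x:   returns a struct containing a NullType column
     println(SchemaConverters.toSqlType(schema).dataType)
   }
   ```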
   
   Thus we upgraded our EMR cluster to run Spark 3, still with Hudi 0.7.0; after this, the error disappeared. I ran a spark-submit command using the SQL transformer, with a simple query: `SELECT CAST(body.Id as STRING) as id, CAST(COALESCE(body.Payload.creation.time,'') as STRING) as body_Payload_creation_time FROM <SRC>;`
   
   However, we are met with another error:
   ```
   Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, ip-10-102-8-124.eu-central-1.compute.internal, executor 1): org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro schema to catalyst type because schema at path messageKey is not compatible (avroType = NullType, sqlType = NULL).
   Source Avro Schema: ...
   Target Catalyst type: ...
           at org.apache.hudi.AvroConversionHelper$.createConverter$1(AvroConversionHelper.scala:265)
           at org.apache.hudi.AvroConversionHelper$.createConverter$1(AvroConversionHelper.scala:146)
           at org.apache.hudi.AvroConversionHelper$.createConverterToRow(AvroConversionHelper.scala:273)
           at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$1(AvroConversionUtils.scala:42)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.sql.execution.SQLExecutionRDD.$anonfun$compute$1(SQLExecutionRDD.scala:52)
           at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:100)
           at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
           at org.apache.spark.scheduler.Task.run(Task.scala:127)
           at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   
   Driver stacktrace:
           at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2175)
           at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2124)
           at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2123)
           at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
           at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
           at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
           at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2123)
           at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:990)
           at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:990)
           at scala.Option.foreach(Option.scala:407)
           at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:990)
           at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2355)
           at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2304)
           at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2293)
           at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
           at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:792)
           at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
           at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
           at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
           at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1423)
           at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
           at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
           at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
           at org.apache.spark.rdd.RDD.take(RDD.scala:1396)
           at org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1531)
           at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
           at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
           at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
           at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
           at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1531)
           at org.apache.spark.api.java.JavaRDDLike.isEmpty(JavaRDDLike.scala:544)
           at org.apache.spark.api.java.JavaRDDLike.isEmpty$(JavaRDDLike.scala:544)
           at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
           at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:380)
           at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:255)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:587)
           ... 4 more
   Caused by: org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro schema to catalyst type because schema at path routingKey is not compatible (avroType = NullType, sqlType = NULL).
    at org.apache.hudi.AvroConversionHelper$.createConverter$1(AvroConversionHelper.scala:265)
           at org.apache.hudi.AvroConversionHelper$.createConverter$1(AvroConversionHelper.scala:146)
           at org.apache.hudi.AvroConversionHelper$.createConverterToRow(AvroConversionHelper.scala:273)
           at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$1(AvroConversionUtils.scala:42)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.sql.execution.SQLExecutionRDD.$anonfun$compute$1(SQLExecutionRDD.scala:52)
           at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:100)
           at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
           at org.apache.spark.scheduler.Task.run(Task.scala:127)
           at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
           ... 3 more
   ```
   
   Spark Version: version 3.0.0-amzn-0
   Scala version: 2.12.10
   Java Version: OpenJDK 1.8.0_252
   Hudi Version: 0.7.0
   Spark-Avro Version: org.apache.spark:spark-avro_2.12:3.0.0
   
   Meanwhile, without the SQL transform, the errors are still the same as the original problem reported in this ticket.
   
   
   





[GitHub] [hudi] jkdll edited a comment on issue #3113: [SUPPORT] Deltastreamer Error when reading AVRO Schema with UNION Types

Posted by GitBox <gi...@apache.org>.
jkdll edited a comment on issue #3113:
URL: https://github.com/apache/hudi/issues/3113#issuecomment-870598881


   After further investigation, it seems that the first error (`java.lang.ArrayIndexOutOfBoundsException`) is related to data types being changed and new fields being introduced in the middle of the schema. The topic I am reading from contains a lot of data whose structure has changed quite drastically. It is backward compatible, but fields were introduced in the middle of the schema, so I have concluded that, as per the documentation, such schema evolution will not work.
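
   As a hedged illustration of the suspected mechanism (not Hudi's exact code path): Avro's `GenericDatumWriter` looks record values up by field position, so writing a record that was built against an older schema using a newer schema with a field inserted in the middle reads past the record's value array. A toy example:
   ```
   import java.io.ByteArrayOutputStream

   import org.apache.avro.Schema
   import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
   import org.apache.avro.io.EncoderFactory

   object PositionalWriteMismatch extends App {
     val oldSchema = new Schema.Parser().parse(
       """{"type":"record","name":"r","fields":[
         |  {"name":"id","type":"long"},{"name":"name","type":"string"}]}""".stripMargin)
     // Newer, backward-compatible schema with a field inserted in the middle.
     val newSchema = new Schema.Parser().parse(
       """{"type":"record","name":"r","fields":[
         |  {"name":"id","type":"long"},
         |  {"name":"added","type":["null","string"],"default":null},
         |  {"name":"name","type":"string"}]}""".stripMargin)

     val rec = new GenericData.Record(oldSchema) // record built against the old schema
     rec.put("id", 1L); rec.put("name", "x")

     val enc = EncoderFactory.get().binaryEncoder(new ByteArrayOutputStream(), null)
     // Writing that record with the new schema shifts field positions; here the third
     // field looks up index 2 in a 2-element value array and throws
     // ArrayIndexOutOfBoundsException -- the same symptom HoodieWriteHandle logged.
     try new GenericDatumWriter[GenericRecord](newSchema).write(rec, enc)
     catch { case e: Exception => println(s"write with evolved schema failed: $e") }
   }
   ```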
   
   Instead, I have opted for a different setup, but I am still experiencing the same error as reported (Unsupported type NULL):
   
   1. Using `org.apache.hudi.utilities.schema.SchemaRegistryProvider`, where I define the source and target schema registry URLs with the following configs:
   ```
   --hoodie-conf "hoodie.deltastreamer.schemaprovider.registry.targetUrl=
   --hoodie-conf "hoodie.deltastreamer.schemaprovider.registry.url=
   ```
   2. The schema specified for `targetURL` is a reduced version of the schema, containing just two fields. 
   3. Within the deltastreamer command, I use an SQLQueryBasedTransformer with the following properties:
   ```
   --transformer-class "org.apache.hudi.utilities.transform.SqlQueryBasedTransformer"
   --hoodie-conf "hoodie.deltastreamer.transformer.sql=\
   SELECT CAST(id as STRING) as id,\
   CAST(COALESCE(body.Payload.creation.time,'') as STRING) as body_Payload_creation_time\
   FROM <SRC>"
   ```
   4. This directly maps to the target schema registry schema:
   ```
   "{\"type\":\"record\",\"name\":\"test_table_schema\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"body_Payload_creation_time\",\"type\":\"string\"}]}"
   ```
   5. But when using deltastreamer, I am met with this error (same as above):
   ```
   ERROR Client: Application diagnostics message: User class threw exception: org.apache.spark.sql.avro.IncompatibleSchemaException: Unsupported type NULL
           at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:130)
           at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
           at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
           at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
           at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
           at scala.collection.Iterator$class.foreach(Iterator.scala:891)
           at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
           at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
           at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
           at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
           at scala.collection.AbstractTraversable.map(Traversable.scala:104)
           at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
           at org.apache.spark.sql.avro.SchemaConverters$.toSqlType(SchemaConverters.scala:46)
           at org.apache.hudi.AvroConversionUtils$.convertAvroSchemaToStructType(AvroConversionUtils.scala:115)
           at org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType(AvroConversionUtils.scala)
           at org.apache.hudi.utilities.schema.SparkAvroPostProcessor.processSchema(SparkAvroPostProcessor.java:44)
           at org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor.lambda$getSourceSchema$0(SchemaProviderWithPostProcessor.java:41)
           at org.apache.hudi.common.util.Option.map(Option.java:107)
           at org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor.getSourceSchema(SchemaProviderWithPostProcessor.java:41)
           at org.apache.hudi.utilities.deltastreamer.DeltaSync.registerAvroSchemas(DeltaSync.java:680)
           at org.apache.hudi.utilities.deltastreamer.DeltaSync.<init>(DeltaSync.java:209)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:571)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:138)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:102)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:480)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:688)
   ```
   
   Given that I am explicitly using a smaller target schema (for which I do not expect to read any NULL values, especially since I cast them in the query above), I don't know why I am getting this error. My reasoning is that since I am now using the transformer, and casting null values in the query, I should not be facing this error when writing to Hudi. **Could you please clarify whether this reasoning is correct?**
   
   The error above is the same one the original ticket hit when using the FlatteningTransformer, so either I have misunderstood something or the issue does not lie with the target table.





[GitHub] [hudi] jkdll edited a comment on issue #3113: [SUPPORT] Deltastreamer Error when reading AVRO Schema with UNION Types

Posted by GitBox <gi...@apache.org>.
jkdll edited a comment on issue #3113:
URL: https://github.com/apache/hudi/issues/3113#issuecomment-871620908


   @n3nash For the following error:
   ```
   ERROR Client: Application diagnostics message: User class threw exception: org.apache.spark.sql.avro.IncompatibleSchemaException: Unsupported type NULL
   ```
   I dug a bit deeper and found that the Spark Avro version we were running, 2.4.8, does not support attributes of the Avro `null` type, as shown here:
   https://github.com/apache/spark/blob/branch-2.4/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
   
   However, updated versions of the library, specifically 3.x do:
   https://github.com/apache/spark/blob/master/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
   
   Thus we upgraded our EMR cluster to run Spark 3, still with Hudi 0.7.0; after this, the error disappeared. I ran a spark-submit command using the SQL transformer, with a simple query: `SELECT CAST(body.Id as STRING) as id, CAST(COALESCE(body.Payload.creation.time,'') as STRING) as body_Payload_creation_time FROM <SRC>;`
   
   However, we are met with another error:
   ```
   Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, ip-10-102-8-124.eu-central-1.compute.internal, executor 1): org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro schema to catalyst type because schema at path messageKey is not compatible (avroType = NullType, sqlType = NULL).
   Source Avro Schema: ...
   Target Catalyst type: ...
           at org.apache.hudi.AvroConversionHelper$.createConverter$1(AvroConversionHelper.scala:265)
           at org.apache.hudi.AvroConversionHelper$.createConverter$1(AvroConversionHelper.scala:146)
           at org.apache.hudi.AvroConversionHelper$.createConverterToRow(AvroConversionHelper.scala:273)
           at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$1(AvroConversionUtils.scala:42)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.sql.execution.SQLExecutionRDD.$anonfun$compute$1(SQLExecutionRDD.scala:52)
           at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:100)
           at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
           at org.apache.spark.scheduler.Task.run(Task.scala:127)
           at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   
   Driver stacktrace:
           at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2175)
           at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2124)
           at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2123)
           at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
           at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
           at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
           at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2123)
           at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:990)
           at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:990)
           at scala.Option.foreach(Option.scala:407)
           at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:990)
           at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2355)
           at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2304)
           at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2293)
           at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
           at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:792)
           at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
           at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
           at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
           at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1423)
           at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
           at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
           at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
           at org.apache.spark.rdd.RDD.take(RDD.scala:1396)
           at org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1531)
           at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
           at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
           at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
           at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
           at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1531)
           at org.apache.spark.api.java.JavaRDDLike.isEmpty(JavaRDDLike.scala:544)
           at org.apache.spark.api.java.JavaRDDLike.isEmpty$(JavaRDDLike.scala:544)
           at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
           at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:380)
           at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:255)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:587)
           ... 4 more
   Caused by: org.apache.spark.sql.avro.IncompatibleSchemaException: Cannot convert Avro schema to catalyst type because schema at path routingKey is not compatible (avroType = NullType, sqlType = NULL).
    at org.apache.hudi.AvroConversionHelper$.createConverter$1(AvroConversionHelper.scala:265)
           at org.apache.hudi.AvroConversionHelper$.createConverter$1(AvroConversionHelper.scala:146)
           at org.apache.hudi.AvroConversionHelper$.createConverterToRow(AvroConversionHelper.scala:273)
           at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$1(AvroConversionUtils.scala:42)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.sql.execution.SQLExecutionRDD.$anonfun$compute$1(SQLExecutionRDD.scala:52)
           at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:100)
           at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
           at org.apache.spark.scheduler.Task.run(Task.scala:127)
           at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
           ... 3 more
   ```
   
   Looking at the source schema, it seems to be failing where the Avro schema contains a plain field of type `null`:
   ```
   {
         "name": "messageKey",
         "type": "null"
       },
   ```
   
   Unfortunately I don't have the capability to change the schema, as I am not the producer. 
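
   For reference, a rough sketch (purely illustrative: the helper is made up and nested records are not handled) of how a fetched schema could be copied with null-typed fields dropped, e.g. from a custom schema provider or schema post-processor, if the producer schema cannot change:
   ```
   import org.apache.avro.Schema
   import org.apache.avro.Schema.Field

   import scala.collection.JavaConverters._

   object DropNullTypedFields {
     // Copy a record schema, keeping every top-level field except those whose Avro
     // type is literally "null" (e.g. messageKey / routingKey above).
     def apply(record: Schema): Schema = {
       val kept = record.getFields.asScala
         .filterNot(_.schema().getType == Schema.Type.NULL)
         .map(f => new Field(f.name(), f.schema(), f.doc(), f.defaultVal()))
         .asJava
       Schema.createRecord(record.getName, record.getDoc, record.getNamespace, record.isError, kept)
     }
   }
   ```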
   
   Spark Version: version 3.0.0-amzn-0
   Scala version: 2.12.10
   Java Version: OpenJDK 1.8.0_252
   Hudi Version: 0.7.0
   Spark-Avro Version: org.apache.spark:spark-avro_2.12:3.0.0
   
   Meanwhile, without the SQL transform, the errors are still the same as the original problem reported in this ticket.
   
   
   





[GitHub] [hudi] jkdll commented on issue #3113: [SUPPORT] Deltastreamer Error when reading AVRO Schema with UNION Types

Posted by GitBox <gi...@apache.org>.
jkdll commented on issue #3113:
URL: https://github.com/apache/hudi/issues/3113#issuecomment-869186682


   @n3nash apologies for the late reply. I have been trying to replicate this error using a different data set; however, I have not managed to do so.
   
   Could you please clarify why such an error (as shown in the ticket, and seemingly the original error) would occur?
   ```
   21/06/19 01:10:36 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=39662 partitionPath=1616684167}, currentLocation='null', newLocation='null'}
   java.lang.ArrayIndexOutOfBoundsException
   ```
   
   I will continue trying to replicate the error with a smaller schema.





[GitHub] [hudi] jkdll commented on issue #3113: [SUPPORT] Deltastreamer Error when reading AVRO Schema with UNION Types

Posted by GitBox <gi...@apache.org>.
jkdll commented on issue #3113:
URL: https://github.com/apache/hudi/issues/3113#issuecomment-871786169


   @n3nash I believe I found the issue, and added this code: https://github.com/jkdll/hudi/commit/8c3aabca64b2f65061864bac2fb1816df2fb9d89
   
   I am unable to test right now because the build seems to be broken on release branches 0.8.0 and 0.7.0; it cannot find this dependency: https://mvnrepository.com/artifact/org.pentaho/pentaho-aggdesigner-algorithm/5.1.5-jhyde





[GitHub] [hudi] jkdll closed issue #3113: [SUPPORT] Deltastreamer Error when reading AVRO Schema with UNION Types

Posted by GitBox <gi...@apache.org>.
jkdll closed issue #3113:
URL: https://github.com/apache/hudi/issues/3113


   





[GitHub] [hudi] jkdll commented on issue #3113: [SUPPORT] Deltastreamer Error when reading AVRO Schema with UNION Types

Posted by GitBox <gi...@apache.org>.
jkdll commented on issue #3113:
URL: https://github.com/apache/hudi/issues/3113#issuecomment-864336980


   I have also tested with the latest version on the `master` branch. Same error.
   
   Moreover, without `--transformer-class org.apache.hudi.utilities.transform.FlatteningTransformer` I get the error below, which I believe is triggered by UNION structs. I believe the FlatteningTransformer "fixes" this by flattening the struct, but it is only a workaround:
   ```
   21/06/19 01:10:36 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=39662 partitionPath=1616684167}, currentLocation='null', newLocation='null'}
   java.lang.ArrayIndexOutOfBoundsException
   21/06/19 01:10:36 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=39576 partitionPath=1616419975}, currentLocation='null', newLocation='null'}
   java.lang.ArrayIndexOutOfBoundsException
   21/06/19 01:10:36 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=39575 partitionPath=1616419975}, currentLocation='null', newLocation='null'}
   java.lang.ArrayIndexOutOfBoundsException
   21/06/19 01:10:36 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=39589 partitionPath=1616426425}, currentLocation='null', newLocation='null'}
   java.lang.ArrayIndexOutOfBoundsException
   21/06/19 01:10:36 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=37234 partitionPath=1616426425}, currentLocation='null', newLocation='null'}
   ```
   








[GitHub] [hudi] jkdll removed a comment on issue #3113: [SUPPORT] Deltastreamer Error when reading AVRO Schema with UNION Types

Posted by GitBox <gi...@apache.org>.
jkdll removed a comment on issue #3113:
URL: https://github.com/apache/hudi/issues/3113#issuecomment-864336980


   I have also tested with the latest version on the `master` branch. Same error.
   
   Moreover, without `--transformer-class org.apache.hudi.utilities.transform.FlatteningTransformer` I get this error within the application logs: 
   ```
   21/06/19 01:10:36 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=39662 partitionPath=1616684167}, currentLocation='null', newLocation='null'}
   java.lang.ArrayIndexOutOfBoundsException
   21/06/19 01:10:36 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=39576 partitionPath=1616419975}, currentLocation='null', newLocation='null'}
   java.lang.ArrayIndexOutOfBoundsException
   21/06/19 01:10:36 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=39575 partitionPath=1616419975}, currentLocation='null', newLocation='null'}
   java.lang.ArrayIndexOutOfBoundsException
   21/06/19 01:10:36 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=39589 partitionPath=1616426425}, currentLocation='null', newLocation='null'}
   java.lang.ArrayIndexOutOfBoundsException
   21/06/19 01:10:36 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=37234 partitionPath=1616426425}, currentLocation='null', newLocation='null'}
   ```
   I believe this is triggered by UNION structs. I believe the FlatteningTransformer "fixes" this by flattening the struct, but it is only a workaround.





[GitHub] [hudi] jkdll edited a comment on issue #3113: [SUPPORT] Deltastreamer Error when reading AVRO Schema with UNION Types

Posted by GitBox <gi...@apache.org>.
jkdll edited a comment on issue #3113:
URL: https://github.com/apache/hudi/issues/3113#issuecomment-864336980


   I have also tested with the latest version on the `master` branch. Same error.
   
   Moreover, without `--transformer-class org.apache.hudi.utilities.transform.FlatteningTransformer` I get this error within the application logs: 
   ```
   21/06/19 01:10:36 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=39662 partitionPath=1616684167}, currentLocation='null', newLocation='null'}
   java.lang.ArrayIndexOutOfBoundsException
   21/06/19 01:10:36 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=39576 partitionPath=1616419975}, currentLocation='null', newLocation='null'}
   java.lang.ArrayIndexOutOfBoundsException
   21/06/19 01:10:36 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=39575 partitionPath=1616419975}, currentLocation='null', newLocation='null'}
   java.lang.ArrayIndexOutOfBoundsException
   21/06/19 01:10:36 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=39589 partitionPath=1616426425}, currentLocation='null', newLocation='null'}
   java.lang.ArrayIndexOutOfBoundsException
   21/06/19 01:10:36 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=37234 partitionPath=1616426425}, currentLocation='null', newLocation='null'}
   ```
   I believe this is triggered by UNION structs. I believe the FlatteningTransformer "fixes" this by flattening the struct, but it is only a workaround.





[GitHub] [hudi] jkdll commented on issue #3113: [SUPPORT] Deltastreamer Error when reading AVRO Schema with UNION Types

Posted by GitBox <gi...@apache.org>.
jkdll commented on issue #3113:
URL: https://github.com/apache/hudi/issues/3113#issuecomment-871790513


   Added PR: https://github.com/apache/hudi/pull/3195





[GitHub] [hudi] jkdll commented on issue #3113: [SUPPORT] Deltastreamer Error when reading AVRO Schema with UNION Types

Posted by GitBox <gi...@apache.org>.
jkdll commented on issue #3113:
URL: https://github.com/apache/hudi/issues/3113#issuecomment-870598881


   After further investigation, it seems that the first error (`java.lang.ArrayIndexOutOfBoundsException`) is related to data types being changed and new fields being introduced in the middle of the schema. The topic I am reading from contains a lot of data whose structure has changed quite drastically. It is backward compatible, but fields were introduced in the middle of the schema, so I have concluded that, as per the documentation, such schema evolution will not work.
   
   Instead, I have opted for the following setup:
   
   1. Using `org.apache.hudi.utilities.schema.SchemaRegistryProvider`, where I define the source and target schema registry URLs with the following configs:
   ```
   --hoodie-conf "hoodie.deltastreamer.schemaprovider.registry.targetUrl=
   --hoodie-conf "hoodie.deltastreamer.schemaprovider.registry.url=
   ```
   2. The schema specified for `targetURL` is a reduced version of the schema, containing just two fields. 
   3. Within the deltastreamer command, I use an SQLQueryBasedTransformer with the following properties:
   ```
   --transformer-class "org.apache.hudi.utilities.transform.SqlQueryBasedTransformer"
   --hoodie-conf "hoodie.deltastreamer.transformer.sql=\
   SELECT CAST(id as STRING) as id,\
   CAST(COALESCE(body.Payload.creation.time,'') as STRING) as body_Payload_creation_time\
   FROM <SRC>"
   ```
   4. This directly maps to the target schema registry schema:
   ```
   "{\"type\":\"record\",\"name\":\"test_table_schema\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"body_Payload_creation_time\",\"type\":\"string\"}]}"
   ```
   5. But when using deltastreamer, I am met with this error (same as above):
   ```
   ERROR Client: Application diagnostics message: User class threw exception: org.apache.spark.sql.avro.IncompatibleSchemaException: Unsupported type NULL
           at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:130)
           at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
           at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
           at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
           at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
           at scala.collection.Iterator$class.foreach(Iterator.scala:891)
           at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
           at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
           at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
           at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
           at scala.collection.AbstractTraversable.map(Traversable.scala:104)
           at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
           at org.apache.spark.sql.avro.SchemaConverters$.toSqlType(SchemaConverters.scala:46)
           at org.apache.hudi.AvroConversionUtils$.convertAvroSchemaToStructType(AvroConversionUtils.scala:115)
           at org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType(AvroConversionUtils.scala)
           at org.apache.hudi.utilities.schema.SparkAvroPostProcessor.processSchema(SparkAvroPostProcessor.java:44)
           at org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor.lambda$getSourceSchema$0(SchemaProviderWithPostProcessor.java:41)
           at org.apache.hudi.common.util.Option.map(Option.java:107)
           at org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor.getSourceSchema(SchemaProviderWithPostProcessor.java:41)
           at org.apache.hudi.utilities.deltastreamer.DeltaSync.registerAvroSchemas(DeltaSync.java:680)
           at org.apache.hudi.utilities.deltastreamer.DeltaSync.<init>(DeltaSync.java:209)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:571)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:138)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:102)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:480)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:688)
   ```
   
   Given that I am explicitly using a smaller target schema (for which I do not expect to read any NULL values, especially since I cast them in the query above), I don't know why I am getting this error. My reasoning is that since I am now using the transformer, and casting null values in the query, I should not be facing this error when writing to Hudi. **Could you please clarify whether this reasoning is correct?**
   
   The error above is the same one the original ticket hit when using the FlatteningTransformer, so either I have misunderstood something or the issue does not lie with the target table.

