You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "danielfordfc (via GitHub)" <gi...@apache.org> on 2023/02/06 14:19:56 UTC

[GitHub] [hudi] danielfordfc opened a new issue, #7867: [SUPPORT] Errors ingesting topics containing "enum" data types

danielfordfc opened a new issue, #7867:
URL: https://github.com/apache/hudi/issues/7867

   **Describe the problem you faced**
   
   We are using the DeltaStreamer on EMR 6.9.0, sourcing data from Confluent Kafka Avro topics and using our Confluent Schema Registry to deserialize the messages, which we write to the Glue Data Catalog and query with Athena.
   
   For the majority of topics this works well, however, we noticed deserialisation errors when topics have Avro `enum` types in the schema.
   
   Errors come in two forms, based on whether we use the default `KafkaAvroDeserializer`, or the `KafkaAvroSchemaDeserializer `
   
   1. **Scala.MatchError**
   ```
   Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4) (ip-10-154-13-123.eu-west-1.compute.internal executor 1): scala.MatchError: processing (of class org.apache.avro.generic.GenericData$EnumSymbol)
   	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$13(AvroDeserializer.scala:178)
   	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$13$adapted(AvroDeserializer.scala:177)
   	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:379)
   	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:375)
   	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$3(AvroDeserializer.scala:389)
   	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$3$adapted(AvroDeserializer.scala:385)
   	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.$anonfun$converter$4(AvroDeserializer.scala:87)
   	at org.apache.hudi.org.apache.spark.sql.avro.AvroDeserializer.deserialize(AvroDeserializer.scala:105)
   	at org.apache.hudi.org.apache.spark.sql.avro.HoodieSpark3_3AvroDeserializer.deserialize(HoodieSpark3_3AvroDeserializer.scala:30)
   ```
   
   2. **org.apache.avro.AvroTypeException**
   ```
   Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition {topic}-0 at offset 7202. If needed, please seek past the record to continue consumption.
   Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 4144
   Caused by: org.apache.avro.AvroTypeException: Found {avro_record_namespace}.{enum_name}, expecting string
   ```
   
   for an enum field resembling the following (note that sometimes there are `default:` added to the field, but never a `default:` at the symbol level...
   ```
   {
         "name": "status",
         "type": {
           "type": "enum",
           "name": "status_options",
           "symbols": [
             "processing",
             "completed",
             "error"
           ]
   }
   },
   ```
   
   For instance, we recieved org.apache.avro.AvroTypeException: Found {avro_record_namespace}.status_options, expecting string, and without the `KafkaAvroSchemaDeserializer`, we receive scala.MatchError: `{one_of_the_enum_symbols}` (of class org.apache.avro.generic.GenericData$EnumSymbol
   
   **To Reproduce**
   
   **Scala.MatchError**
   
   With a duplicate environment to the one mentioned at the beginning, our Spark command is:
   
   ```
   "spark-submit",
   "--class", "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer",
   "--conf", "spark.scheduler.mode=FAIR",
   "--conf", "spark.serializer=org.apache.spark.serializer.KryoSerializer",
   "--conf", "spark.sql.catalogImplementation=hive",
   "--conf", "spark.sql.hive.convertMetastoreParquet=false",
   "--conf", "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
   "--conf", "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog",
   "--conf", "spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
   "--conf", "spark.streaming.kafka.allowNonConsecutiveOffsets=true",
   # IMPORTANT: hudi-utilities-bundle must be declared immediately before any Hudi spark commands
   "/usr/lib/hudi/hudi-utilities-bundle.jar",
   "--source-class", "org.apache.hudi.utilities.sources.{{ source_type }}",
   "--source-ordering-field", "{{ timestamp_field }}",
   "--table-type", "COPY_ON_WRITE",
   "--op", "UPSERT",
   "--enable-sync",
   "--continuous",
   # Hudi write config
   
   "--target-base-path", f"s3://{bucket}/raw/{{ table }}",
   "--target-table", "{{ table }}",
   "--hoodie-conf", "hoodie.database.name={{ database }}_raw",
   "--hoodie-conf", "hoodie.table.name={{ table }}",
   "--hoodie-conf", "hoodie.datasource.write.recordkey.field={{ primary_key }}",
   "--hoodie-conf", "hoodie.datasource.write.precombine.field={{ timestamp_field }}",
   "--hoodie-conf", "hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator",
   "--hoodie-conf", "hoodie.datasource.write.partitionpath.field={{ timestamp_field }}",
   "--hoodie-conf", "hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS",
   "--hoodie-conf", "hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd",
   # Filter invalid records
   "--transformer-class", "org.apache.hudi.utilities.transform.SqlQueryBasedTransformer",
   "--hoodie-conf", "hoodie.deltastreamer.transformer.sql=SELECT * FROM <SRC> WHERE {{ primary_key }} is not null AND {{ timestamp_field }} is not null",
   # AWS Glue Data Catalog config
   "--hoodie-conf", "hoodie.datasource.hive_sync.enable=true",
   "--hoodie-conf", "hoodie.datasource.hive_sync.database={{ database }}_raw",
   "--hoodie-conf", "hoodie.datasource.hive_sync.table={{ table }}",
   "--hoodie-conf", "hoodie.datasource.hive_sync.partition_fields=_event_date",
   "--hoodie-conf", f"hoodie.deltastreamer.source.kafka.topic={self.kafka_topic}",
   "--hoodie-conf", "auto.offset.reset=earliest",
   "--hoodie-conf", "sasl.mechanism=PLAIN",
   "--hoodie-conf", "security.protocol=SASL_SSL",
   "--hoodie-conf", f"bootstrap.servers={self.kafka_bootstrap_servers}",
   "--hoodie-conf", f'sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required 
   username="{self.kafka_sasl_username}" password="{self.kafka_sasl_password}";',
   "--schemaprovider-class", "org.apache.hudi.utilities.schema.SchemaRegistryProvider",
   "--hoodie-conf", f"schema.registry.url=https://{self.schema_registry_url}",
   "--hoodie-conf", "basic.auth.credentials.source=USER_INFO",
   "--hoodie-conf", f"schema.registry.basic.auth.user.info={self.schema_registry_auth}",
   "--hoodie-conf", f"hoodie.deltastreamer.schemaprovider.registry.url=https://{self.schema_registry_auth}@{self.schema_registry_url}//subjects/{self.kafka_topic}-value/versions/latest",
   ```
   
   **Most notably, removing the SQL Transformer we are using below seems to remove the error for many of the topics for both error cases.**
   ```
   "--transformer-class", "org.apache.hudi.utilities.transform.SqlQueryBasedTransformer",
   "--hoodie-conf", "hoodie.deltastreamer.transformer.sql=SELECT * FROM <SRC> WHERE {{ primary_key }} is not null AND {{ timestamp_field }} is not null",
   ```
   
   We are using this transformer to attempt to drop null records from being ingested, to avoid the below error which appears to be caused by empty data in our selected hoodie.datasource.write.partitionpath.field, although we cannot find any evidence of this in our topics:
   
   ```
   Caused by: org.apache.hudi.exception.HoodieException: The value of {timestamp_field} can not be null
   	at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:534)
   	at org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$fetchFromSource$b741bfe4$1(DeltaSync.java:490)
   	at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
   ```
   But based on the fact that this appears to be causing the errors, there may be a better way of doing this? 
   One thing to note is that this error only seems to be occurring on topics that do not start at an offset of 0, so this error may be a red herring, or bad data on our end which we don't seem to able to confirm.
   
   
   **org.apache.avro.AvroTypeException**
   
   The only difference in this test setup is the addition of                     
   
   ```"--hoodie-conf", "hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer".```
   
   The error is different, but the fix appears to be removing the SQL transformer, and the working topics from the previous test again work, but the failing ones with `The value of {timestamp_field} can not be null` remains.
   
   
   **Expected behavior**
   
   I expected the enum data types to be able to be ingested, even when including an SQL transformer to only select records that fit the query description, [based on this documentation](https://spark.apache.org/docs/latest/sql-data-sources-avro.html#supported-types-for-avro---spark-sql-conversion)
   
   **Answers Required**
   
   If the above is an incorrect assumption, the cause is known and this is expected behaviour, then my question becomes "How can I filter out potentially null records that will break the primary key / partition path field handling i'm using for my `TimestampBasedKeyGenerator` and `UPSERT` behaviour"
   
   &&
   
   Is there an answer to why I may be receiving this `The value of {timestamp_field} can not be null` error on seemingly OK topics? The source system apparently guarantees that these fields are always populated, so the only thing I can think is an offset scanning problem as this failing topic is the only topic of the ones I tested that didn't start at an offset 0.
   
   **Environment Description**
   
   * Hudi version : Deltastreamer on EMR 6.9.0 running Hudi 0.12.1
   * Spark version : 3.3.0
   * Hive version : 3.1.3 
   * Hadoop version : 3.3.3
   * Storage (HDFS/S3/GCS..) : S3
   * Running on Docker? (yes/no) : No
   
   **Additional context**
   
   I have visited the Hudi office hours before writing this issue, where it was suggested that I investigate and write up my findings here if the root cause is still unknown.
   
   If there's any detail missing from these tests please let me know and I can recreate and provide more clarity!
   
   **Stacktraces**
   
   - These 3 traces are all processing the same topic with the different configs discussed, one that throws the `The value of {timestamp_field} can not be null` error when the transformer is removed.
   - Before each test or investigation, if s3 data or tables have been populated, I cancelled running EMR jobs, dropped any created tables and deleted underlying S3 data.
   
   **Potentially useful information about the topic:**
   - This schema has 5 versions and although the schema registry only enforces BACKWARDS compatibility, I believe the current evolutions to adhere also to BACKWARDS_TRANSITIVE.
   - The Enum failing in both the Scala Match Error and the org.apache.avro.AvroTypeException  message **“status”** has been unchanged the entire time, and is the first enum field in the schema
   - The topic appears to start at offset 7202, which is the offset declared in the org.apache.avro.AvroTypeException stack trace
   
   
   Using Default Deserializer - Scala Match Error
   [scala_match_error_stacktrace.txt](https://github.com/apache/hudi/files/10663859/scala_match_afmas_st_clean.txt)
   Using KafkaAvroSchemaDeserializer -  org.apache.avro.AvroTypeException 
   [avro_type_error_stacktrace.txt](https://github.com/apache/hudi/files/10663860/avro_type_afmas_st_clean.txt)
   Removing the SQL Transformer  - `The value of {timestamp_field} can not be null`
   [timestamp_field_cantbe_null_trace.txt](https://github.com/apache/hudi/files/10663925/timestamp_field_cantbe_null_trace.txt)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] nfarah86 commented on issue #7867: [SUPPORT] [WIP] [PLEASE DO NOT TRIAGE] Errors ingesting topics containing "enum" data types

Posted by "nfarah86 (via GitHub)" <gi...@apache.org>.
nfarah86 commented on issue #7867:
URL: https://github.com/apache/hudi/issues/7867#issuecomment-1432238186

   cc @danielfordfc -> wdym with "please do not triage"? if you can clarify. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danielfordfc commented on issue #7867: [SUPPORT] [WIP] [PLEASE DO NOT TRIAGE] Errors ingesting topics containing "enum" data types

Posted by "danielfordfc (via GitHub)" <gi...@apache.org>.
danielfordfc commented on issue #7867:
URL: https://github.com/apache/hudi/issues/7867#issuecomment-1446039886

   Replaced with https://github.com/apache/hudi/issues/8042


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danielfordfc closed issue #7867: [SUPPORT] [WIP] [PLEASE DO NOT TRIAGE] Errors ingesting topics containing "enum" data types

Posted by "danielfordfc (via GitHub)" <gi...@apache.org>.
danielfordfc closed issue #7867: [SUPPORT] [WIP] [PLEASE DO NOT TRIAGE] Errors ingesting topics containing "enum" data types
URL: https://github.com/apache/hudi/issues/7867


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] danielfordfc commented on issue #7867: [SUPPORT] [WIP] [PLEASE DO NOT TRIAGE] Errors ingesting topics containing "enum" data types

Posted by "danielfordfc (via GitHub)" <gi...@apache.org>.
danielfordfc commented on issue #7867:
URL: https://github.com/apache/hudi/issues/7867#issuecomment-1436818551

   I am looking to rewrite this ticket to focus on the SQL Transformer being unable to handle enum data types. The sections in which I discuss `The value of {timestamp_field} can not be null` error has since been identified as a data error in our topics.
   
   There is a separate question I have as to why the KafkaAvroDeserializer doesn't work but the KafkaAvroSchemaDeserializer one does for certain topics that have undergone an evolution, but I will open up another ticket for this.
   
   Should I edit the ticket to remove the `The value of {timestamp_field} can not be null` discussion?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org