Posted to commits@hudi.apache.org by "sydneyhoran (via GitHub)" <gi...@apache.org> on 2023/04/20 17:22:23 UTC

[GitHub] [hudi] sydneyhoran opened a new issue, #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

sydneyhoran opened a new issue, #8519:
URL: https://github.com/apache/hudi/issues/8519

   **Describe the problem you faced**
   
   We are using Deltastreamer with a PostgresDebeziumSource, consuming data from Confluent Kafka with Confluent Schema Registry as the schema provider. The job runs fine for some time and then suddenly fails with a NullPointerException.
   
   We believe this is caused by Kafka messages with empty/null values, such as Debezium tombstone records. We do not have the ability to modify the Debezium connectors to turn off tombstone records at this time.
   
   Looking for a solution to have Deltastreamer ignore/skip over tombstone Kafka messages that contain a null value.
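
A minimal sketch of the skip-on-null behaviour requested above, in plain Python. It is not Hudi's or the Kafka client's implementation: `KafkaMessage` and `drop_tombstones` are hypothetical names used only for illustration, and the byte strings are placeholders rather than real Avro payloads.

```python
from typing import Iterable, Iterator, NamedTuple, Optional


class KafkaMessage(NamedTuple):
    """Hypothetical stand-in for a consumed Kafka record (not a Hudi or Kafka class)."""
    key: Optional[bytes]
    value: Optional[bytes]
    offset: int


def drop_tombstones(messages: Iterable[KafkaMessage]) -> Iterator[KafkaMessage]:
    """Yield only messages that carry a payload; a tombstone has value=None."""
    for message in messages:
        if message.value is None:
            # Nothing to deserialize, so skip it instead of passing None downstream.
            continue
        yield message


batch = [
    KafkaMessage(key=b"1", value=b"<placeholder: delete event>", offset=10),
    KafkaMessage(key=b"1", value=None, offset=11),  # tombstone following the delete
    KafkaMessage(key=b"2", value=b"<placeholder: insert event>", offset=12),
]
kept = list(drop_tombstones(batch))
print([m.offset for m in kept])
```

The filter runs before any deserialization, so a null value never reaches code that expects a record. Skipped messages still count as consumed, so offsets continue to advance past them.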
   
   Thanks for any input!
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Use Debezium Kafka Connector to publish data from Postgres server to Kafka.
   2. Use PostgresDebeziumSource and Confluent Schema Registry to consume data.
   3. It runs fine for some records and stores the data into files in storage.
   4. Run a delete record operation on the Postgres DB to emit a tombstone record.
   5. After some time it fails with a NullPointerException and little further detail.
   
   **Expected behavior**
   
   The job should run without errors on empty Kafka message values.
   
   **Environment Description**
   
   * Hudi version : 0.13.0
   
   * Spark version : 3.1
   
   * Hive version : N/A
   
   * Hadoop version : N/A
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : both
   
   
   **Additional context**
   
   **hoodie configs:**
   ```
   --target-base-path s3a://{{ bucket }}/{{ table_path }}
   --target-table {{ table_name }}
   --continuous
   --props gs://path/to/tablename.properties
   --min-sync-interval-seconds 15
   --source-ordering-field updated_at
   --source-limit 5000
   --table-type COPY_ON_WRITE
   --op UPSERT
   --source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource
   --payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload
   ```
   
   **tablename.properties**
   ```
   hoodie.deltastreamer.schemaprovider.registry.url={{ schema_url }}.confluent.cloud/subjects/{{ topic }}-value/versions/latest
   hoodie.deltastreamer.source.kafka.topic=some.topic
   hoodie.datasource.write.recordkey.field=id
   hoodie.datasource.write.partitionpath.field=inserted_at
   hoodie.datasource.write.precombine.field=updated_at
   schema.registry.url={{ schema_url }}
   schema.registry.basic.auth.user.info={{ schema_user }}:{{ schema_key }}
   sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ kafka_user }}' password='{{ kafka_key }}';
   bootstrap.servers={{ bootstrap_server }}
   hoodie.embed.timeline.server=false
   hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
   hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
   hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
   ```
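
To make the key generator settings above concrete, here is a rough Python sketch of what an `EPOCHMILLISECONDS` input combined with a `yyyy/MM/dd` output format amounts to. It is an illustration only, not Hudi's `TimestampBasedKeyGenerator`; the function name is hypothetical, UTC is an assumption, and the sample value is arbitrary.

```python
from datetime import datetime, timezone


def partition_path_from_epoch_millis(epoch_millis: int) -> str:
    """Format an epoch-milliseconds value as yyyy/MM/dd, assuming UTC."""
    moment = datetime.fromtimestamp(epoch_millis / 1000, tz=timezone.utc)
    return moment.strftime("%Y/%m/%d")


# Arbitrary sample value for the partition path field (inserted_at in the config above).
print(partition_path_from_epoch_millis(1681984800000))
```

A record whose partition path field is null has no value to format, which is one reason a message without a row image cannot be assigned a partition.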
   
   **Stacktrace**
   
   
   ```
   Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 14) (10.253.229.42 executor 1): java.lang.NullPointerException
   Driver stacktrace:
   	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2304)
   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2253)
   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2252)
   	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
   	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
   	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
   	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2252)
   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
   	at scala.Option.foreach(Option.scala:407)
   	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2491)
   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2433)
   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2422)
   	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
   	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:902)
   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2204)
   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2225)
   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2244)
   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269)
   	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
   	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
   	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
   	at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362)
   	at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361)
   	at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
   	at org.apache.hudi.data.HoodieJavaRDD.collectAsList(HoodieJavaRDD.java:177)
   	at org.apache.hudi.index.simple.HoodieSimpleIndex.fetchRecordLocationsForAffectedPartitions(HoodieSimpleIndex.java:142)
   	at org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocationInternal(HoodieSimpleIndex.java:113)
   	at org.apache.hudi.index.simple.HoodieSimpleIndex.tagLocation(HoodieSimpleIndex.java:91)
   	at org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:54)
   	at org.apache.hudi.table.action.commit.HoodieWriteHelper.tag(HoodieWriteHelper.java:36)
   	at org.apache.hudi.table.action.commit.BaseWriteHelper.write(BaseWriteHelper.java:64)
   	... 24 more
   Caused by: java.lang.NullPointerException
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] the-other-tim-brown commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "the-other-tim-brown (via GitHub)" <gi...@apache.org>.
the-other-tim-brown commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1542645730

   @Sam-Serpoosh for the first issue regarding the schema, this is because we are fetching that schema as a string. If that class is not defined in the string, we won't know how it is defined. Maybe there is some arg to pass to the API to get the schemas that this schema relies on as well?
   
   For the second, it is hard to tell without looking at your data. If you pull the data locally and step through, you may have a better shot at understanding. The main thing I have seen trip people up is the requirements for the delete records in the topic. You can also try out the same patch Sydney posted above for filtering out the tombstones in Kafka.




[GitHub] [hudi] Sam-Serpoosh commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "Sam-Serpoosh (via GitHub)" <gi...@apache.org>.
Sam-Serpoosh commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1555344231

   @the-other-tim-brown @ad1happy2go Since this issue is getting somewhat cluttered and long, I opened a new, dedicated issue at #8761 to continue the discussion there. Thanks!




[GitHub] [hudi] samserpoosh commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "samserpoosh (via GitHub)" <gi...@apache.org>.
samserpoosh commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1530731595

   Thanks @the-other-tim-brown! Here's the stack-trace (which looks quite similar to what Sydney has in her issue's description):
   
   ```
   23/05/01 21:09:27 INFO DeltaSync: Shutting down embedded timeline server
   23/05/01 21:09:27 INFO SparkUI: Stopped Spark web UI at http://spark-deltastreamer-driver.spark-deltastreamer-driver-headless.simian-dev8.svc.cluster.local:8090/
   23/05/01 21:09:27 INFO StandaloneSchedulerBackend: Shutting down all executors
   23/05/01 21:09:27 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
   23/05/01 21:09:27 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
   23/05/01 21:09:27 INFO MemoryStore: MemoryStore cleared
   23/05/01 21:09:27 INFO BlockManager: BlockManager stopped
   23/05/01 21:09:27 INFO BlockManagerMaster: BlockManagerMaster stopped
   23/05/01 21:09:27 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
   23/05/01 21:09:27 INFO SparkContext: Successfully stopped SparkContext
   Exception in thread "main" org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:191)
           at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:186)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:553)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
           at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
           at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
           at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
           at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
           at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
           at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
           at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException
           at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
           at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
           at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:189)
           ... 15 more
   Caused by: org.apache.hudi.exception.HoodieException
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:713)
           at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.NullPointerException
           at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:301)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:679)
           ... 4 more
   ```
   
   The stack trace looks incomplete or possibly inaccurate: I'm using version `0.11.1`, and when I check [the **line 301**](https://github.com/apache/hudi/blob/release-0.11.1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L301) in `DeltaSync`, it's a blank line 😲
   
   > you can confirm whether this is the case by deserializing the data with org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer without the extra postgres debezium logic.
   
   Good point ... Let me see if I can get a vanilla Kafka consumption, Avro deserialization and sinking into S3 working.




[GitHub] [hudi] the-other-tim-brown commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "the-other-tim-brown (via GitHub)" <gi...@apache.org>.
the-other-tim-brown commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1526004722

   @sydneyhoran I'm still trying to come up to speed on the errors you are seeing but I can chime in on the behavior for the PostgresDebeziumSource and the payload. The [source](https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/PostgresDebeziumSource.java#L79) is going to be doing something similar to `before.*` or `after.*` along with pulling out some metadata from the debezium record. The payload will be marking the row for deletion if the `op` is `d`. In order to properly delete the record, the `before` field needs to be set for deletions so you can extract the proper `id`, `inserted_at`, and `updated_at` values so Hudi knows which record to delete, which partition it is in, and whether it is the latest update for that record.
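
The behaviour described above can be sketched roughly as follows. This is a simplified Python illustration, not the actual `PostgresDebeziumSource` or payload code: the function name and the `_op` and `_is_deleted` column names are hypothetical, and the event is modelled as a plain dict rather than an Avro record.

```python
from typing import Any, Dict, Optional


def flatten_change_event(event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return one flat row per change event, or None when no row image is present."""
    is_delete = event.get("op") == "d"
    # Deletes carry the last known row image in `before`; other operations use `after`.
    row_image = event.get("before") if is_delete else event.get("after")
    if row_image is None:
        # Without a row image there is no key, partition, or ordering value to act on.
        return None
    row = dict(row_image)
    row["_op"] = event.get("op")    # hypothetical metadata column
    row["_is_deleted"] = is_delete  # hypothetical delete marker
    return row


delete_event = {
    "op": "d",
    "before": {"id": 7, "inserted_at": 1681984800000, "updated_at": 1681988400000},
    "after": None,
}
print(flatten_change_event(delete_event))
print(flatten_change_event({"op": "d", "before": None, "after": None}))
```

The second call shows why `before` matters for deletions: with no row image, there is nothing from which to extract `id`, `inserted_at`, or `updated_at`.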




[GitHub] [hudi] the-other-tim-brown commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "the-other-tim-brown (via GitHub)" <gi...@apache.org>.
the-other-tim-brown commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1544025115

   @Sam-Serpoosh nice find! This is not what we've seen in practice when using Debezium. You could work around this, but let's see if we can get rid of this nesting so the data is easier to work with. I don't see anything in the Debezium docs about this: https://debezium.io/documentation/reference/stable/connectors/postgresql.html
   
   Can you let me know which versions of Debezium and Postgres you are running?




[GitHub] [hudi] sydneyhoran commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "sydneyhoran (via GitHub)" <gi...@apache.org>.
sydneyhoran commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1533511853

   Thanks to help from Aditya, @rmahindra123  and @nsivabalan , this was the fix that worked for us to filter out tombstones: https://github.com/sydneyhoran/hudi/commit/b864a69e27d50424b6984f28a31c3bd99a025762




[GitHub] [hudi] Sam-Serpoosh commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "Sam-Serpoosh (via GitHub)" <gi...@apache.org>.
Sam-Serpoosh commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1550062066

   @the-other-tim-brown Below I've laid out some information that you might find helpful ...
   
   ### DB Table I'm Testing With in PostgreSQL
   
   ```
   \d samser_customers
                                           Table "public.samser_customers"
      Column   |            Type             | Collation | Nullable |                   Default
   ------------+-----------------------------+-----------+----------+----------------------------------------------
    id         | integer                     |           | not null | nextval('samser_customers_id_seq'::regclass)
    name       | character varying(50)       |           | not null |
    age        | integer                     |           | not null |
    created_at | timestamp without time zone |           |          |
    event_ts   | bigint                      |           |          |
   Indexes:
       "samser_customers_pkey" PRIMARY KEY, btree (id)
   Referenced by:
       TABLE "samser_orders" CONSTRAINT "fk_customer" FOREIGN KEY (customer_id) REFERENCES samser_customers(id)
   Publications:
       "dbz_publication"
   ```
   
   The dummy data currently in the table:
   
   ```
   SELECT * FROM samser_customers;
    id |   name   | age |         created_at         |   event_ts
   ----+----------+-----+----------------------------+---------------
     1 | Bob      |  40 | 2023-05-12 00:05:27.204463 | 1681984800000
     2 | Alice    |  30 | 2023-05-12 00:05:27.204463 | 1681988400000
     3 | John     |  37 | 2023-05-12 00:05:27.204463 | 1681992000000
     4 | Jon      |  25 | 2023-05-12 00:05:27.204463 | 1681995600000
     5 | David    |  20 | 2023-05-12 00:05:27.204463 | 1681999200000
     6 | Jack     |  70 | 2023-05-12 00:05:27.204463 | 1682002800000
     8 | Ali      |  35 | 2023-05-15 18:03:11.675954 | 1683712800000
     9 | Jonathan |  30 | 2023-05-15 18:03:11.675954 | 1683716400000
    10 | Daniel   |  37 | 2023-05-15 18:03:11.675954 | 1683720000000
    11 | Taylor   |  25 | 2023-05-15 18:03:11.675954 | 1683723600000
    12 | Lex      |  32 | 2023-05-15 18:03:11.675954 | 1683727200000
    13 | Shane    |  45 | 2023-05-15 18:03:11.675954 | 1683730800000
   (12 rows)
   ```
   
   ### Debezium Setup
   
   I'm leveraging the [**Strimzi K8S Kafka Operator**](https://strimzi.io/), which includes `KafkaConnect` in its suite. I then build a Docker image as described in [this doc](https://strimzi.io/blog/2020/01/27/deploying-debezium-with-kafkaconnector-resource/) to set up the Debezium connector within that KafkaConnect.
   
   I also include **Confluent's relevant JARs** in that image under the `libs` directory (more on Debezium <> Confluent [here](https://debezium.io/documentation/reference/2.1/configuration/avro.html#confluent-schema-registry)). Essentially this is my `Dockerfile`:
   
   ```Dockerfile
   FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
   USER root:root
   RUN mkdir -p /opt/kafka/plugins/debezium
   RUN mkdir -p /opt/kafka/libs
   COPY ./debezium-connector-postgres-2.2.0.Final/ /opt/kafka/plugins/debezium/
   COPY ./confluent-jars-7.3.3/*.jar /opt/kafka/libs/
   USER 1001
   ```
   
   ### Debezium Kafka Connector Configuration
   
   You can find some information in [that same doc](https://strimzi.io/blog/2020/01/27/deploying-debezium-with-kafkaconnector-resource/) under the section titled "Create The Connector". My Debezium configuration is laid out below. The **inferred/registered schema** and **sample messages consumed** are in [my older comment](https://github.com/apache/hudi/issues/8519#issuecomment-1542967885), as you know.
   
   ```
   plugin.name: pgoutput
   database.hostname: <DB_HOST>
   database.port: 5432
   database.user: <DB_USERNAME>
   database.password: <DB_PWD>
   database.dbname: <DB_NAME>
   topic.prefix: <KAFKA_TOPIC_PREFIX>
   schema.include.list: public
   key.converter: io.confluent.connect.avro.AvroConverter
   key.converter.schema.registry.url: http://<REGISTRY_URL>:8081
   value.converter: io.confluent.connect.avro.AvroConverter
   value.converter.schema.registry.url: http://<REGISTRY_URL>:8081
   table.include.list: public.samser_customers
   topic.creation.enable: true
   topic.creation.default.replication.factor: 1
   topic.creation.default.partitions: 1
   topic.creation.default.cleanup.policy: compact
   topic.creation.default.compression.type: lz4
   decimal.handling.mode: double
   tombstones.on.delete: false
   ```
   
   Thanks again @the-other-tim-brown for the help, I really appreciate it and please let me know in case there is anything else you need.




[GitHub] [hudi] ad1happy2go commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1578489646

   This issue is being tracked as part of https://github.com/apache/hudi/issues/8268#issuecomment-1578486469
   
   Closing this issue as a duplicate, since it can be tracked with this JIRA: https://issues.apache.org/jira/browse/HUDI-6321
   
   




[GitHub] [hudi] sydneyhoran commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "sydneyhoran (via GitHub)" <gi...@apache.org>.
sydneyhoran commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1533514539

   I was getting an error when deleting records without tombstones because we were testing by starting from a midpoint of a Kafka topic, so I suspect Deltastreamer didn't know what to do with `"d"` messages for records that didn't exist. I had to add a few more logging lines and check the executor logs to find out what was going on.
   
   We'll make sure jobs are starting fresh from the start of topics and turn on `commit-on-errors` if needed to get around non-existent deletes (until there is a possible fix for that).
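
To illustrate why starting from a midpoint of a topic produces deletes for records the table has never seen, here is a small Python sketch. It uses an in-memory dict in place of a Hudi table and a hypothetical `apply_changes` helper; it makes no claim about how Deltastreamer handles such deletes internally.

```python
from typing import Any, Dict, Iterable, List


def apply_changes(table: Dict[int, Dict[str, Any]],
                  events: Iterable[Dict[str, Any]]) -> List[int]:
    """Apply flat change rows to an in-memory table keyed by id.

    Returns the ids of deletes that referenced rows the table never saw.
    """
    unmatched_deletes = []
    for event in events:
        key = event["id"]
        if event["op"] == "d":
            if key in table:
                del table[key]
            else:
                unmatched_deletes.append(key)
        else:
            # Inserts and updates both overwrite the row for this key.
            table[key] = {k: v for k, v in event.items() if k != "op"}
    return unmatched_deletes


events = [
    {"op": "c", "id": 1, "name": "Bob"},
    {"op": "c", "id": 2, "name": "Alice"},
    {"op": "d", "id": 1},
]

full_table: Dict[int, Dict[str, Any]] = {}
from_start = apply_changes(full_table, events)      # replay from the beginning
mid_table: Dict[int, Dict[str, Any]] = {}
from_middle = apply_changes(mid_table, events[2:])  # start after the inserts
print(from_start, from_middle)
```

Replaying from the beginning matches every delete to an earlier insert, while starting after the inserts leaves the delete for id 1 with nothing to remove.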




[GitHub] [hudi] samserpoosh commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "samserpoosh (via GitHub)" <gi...@apache.org>.
samserpoosh commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1542618638

   Apicurio's Avro serialization, and more importantly its **inferred schema**, was indeed inaccurate, as detailed in [this comment](https://github.com/apache/hudi/issues/8519#issuecomment-1540961546). So I ended up using **Confluent's Schema Registry (SR)** instead, especially since Hudi's source code mentions in multiple places that it's closely tied to Confluent's SR-style APIs/responses.
   
   Their SR did the right thing and I verified that the `source` field is properly serialized and its schema/type is inferred accurately now. However, when trying to run the `DeltaStreamer` job, I'm still getting the **same** error:
   
   ```
   Exception in thread "main" org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:191)
           at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:186)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:553)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
           at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
           at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
           at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
           at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
           at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
           at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
           at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException
           at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
           at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
           at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:189)
           ... 15 more
   Caused by: org.apache.hudi.exception.HoodieException
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:713)
           at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.NullPointerException
           at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:301)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:679)
           ... 4 more
   ```
   
   The command I'm running is:
   
   ```
   spark-submit \
   --jars "opt/spark/jars/hudi-utilities-bundle.jar,<other_jars>" \
   --master spark://<SPARK_MASTER_URL>:<PORT> \
   --total-executor-cores 1 \
   --executor-memory 4g \
   --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
   --conf spark.hadoop.fs.s3a.connection.maximum=10000 \
   --conf spark.scheduler.mode=FAIR \
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer opt/spark/jars/hudi-utilities-bundle.jar \
   --table-type COPY_ON_WRITE \
   --target-base-path s3a://path/to/directory \
   --target-table <TABLE_NAME> \
   --min-sync-interval-seconds 30 \
   --source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \
   --payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload \
   --source-ordering-field _event_lsn \
   --op UPSERT \
   --continuous \
   --source-limit 5000 \
   --hoodie-conf bootstrap.servers=<KAFKA_BOOTSTRAP_SERVER>:9092 \                                                                                                                                                                                               
   --hoodie-conf schema.registry.url=http://<SCHEMA_REGISTRY_URL>:8081 \                                                                                                                                                                                         
   --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://<SCHEMA_REGISTRY_URL>:8081/subjects/<KAFKA_TOPIC>-value/versions/<VERSION_NO> \                                                                                                         
   --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer \                                                                                                                        
   --hoodie-conf hoodie.deltastreamer.source.kafka.topic=<KAFKA_TOPIC> \                                                                                                                                                                                         
   --hoodie-conf auto.offset.reset=earliest \                                                                                                                                                                                                                    
   --hoodie-conf hoodie.datasource.write.recordkey.field=<UPSTREAM_DB_PKEY_FIELD> \                                                                                                                                                                              
   --hoodie-conf hoodie.datasource.write.partitionpath.field=<PARTITION_FIELD_NAME> \                                                                                                                                                                            
   --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \                                                                                                                                                                                          
   --hoodie-conf hoodie.datasource.write.precombine.field=_event_lsn \                                                                                                                                                                                           
   --hoodie-conf hoodie.metadata.enable=true \                                                                                                                                                                                                                   
   --hoodie-conf hoodie.metadata.index.column.stats.enable=true \                                                                                                                                                                                                
   --hoodie-conf hoodie.parquet.small.file.limit=134217728
   ```
   
   ## What About Vanilla Kafka Consumption with DeltaStreamer
   
   This is most likely a [**Debezium <> DeltaStreamer**](https://hudi.apache.org/blog/2022/01/14/change-data-capture-with-debezium-and-apache-hudi/) specific issue: when I ran a DeltaStreamer job that simply consumes the **same Kafka topic**, the run got further along, and it only failed for a sensible and expected reason. The `UPSTREAM_DB_PKEY_FIELD` is NULL because I'm no longer using `PostgresDebeziumSource`, so my Kafka events don't go through [this processing](https://github.com/apache/hudi/blob/release-0.11.1/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/PostgresDebeziumSource.java#L50-L85) and my provided `recordkey.field` no longer exists under that particular post-processed name. Here's the command I used to run a vanilla Kafka ingestion DeltaStreamer:
   
   ```
   spark-submit \
   --jars "opt/spark/jars/hudi-utilities-bundle.jar,<other_jars>" \
   --master spark://<SPARK_MASTER_URL>:<PORT> \
   --total-executor-cores 1 \
   --executor-memory 4g \
   --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
   --conf spark.hadoop.fs.s3a.connection.maximum=10000 \
   --conf spark.scheduler.mode=FAIR \
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer opt/spark/jars/hudi-utilities-bundle.jar \
   --table-type COPY_ON_WRITE \
   --target-base-path s3a://path/to/directory \
   --target-table <TABLE_NAME> \
   --min-sync-interval-seconds 30 \
   --source-ordering-field _event_lsn \
   --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
   --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
   --op BULK_INSERT \
   --continuous \
   --source-limit 5000 \
   --hoodie-conf bootstrap.servers=<KAFKA_BOOTSTRAP_SERVER>:9092 \
   --hoodie-conf schema.registry.url=http://<SCHEMA_REGISTRY_URL>:8081 \
   --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://<SCHEMA_REGISTRY_URL>:8081/subjects/<KAFKA_TOPIC>-value/versions/<VERSION_NO> \
   --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer \
   --hoodie-conf hoodie.deltastreamer.source.kafka.topic=<KAFKA_TOPIC> \
   --hoodie-conf auto.offset.reset=earliest \
   --hoodie-conf hoodie.datasource.write.recordkey.field=<UPSTREAM_DB_PKEY_FIELD> \
   --hoodie-conf hoodie.datasource.write.partitionpath.field=<PARTITION_FIELD_NAME> \
   --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
   --hoodie-conf hoodie.datasource.write.precombine.field=_event_lsn \
   --hoodie-conf hoodie.metadata.enable=true \
   --hoodie-conf hoodie.metadata.index.column.stats.enable=true \
   --hoodie-conf hoodie.parquet.small.file.limit=134217728
   ```
   
   The null-key error I mentioned is:
   
   ```
   Caused by: org.apache.hudi.exception.HoodieKeyException: recordKey value: "null" for field: "<UPSTREAM_DB_PKEY_FIELD>" cannot be null or empty.
           at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:141)
           at org.apache.hudi.keygen.SimpleAvroKeyGenerator.getRecordKey(SimpleAvroKeyGenerator.java:50)
           at org.apache.hudi.keygen.SimpleKeyGenerator.getRecordKey(SimpleKeyGenerator.java:58)
           at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:65)
           at org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$fetchFromSource$b741bfe4$1(DeltaSync.java:495)
           at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
           at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
           at scala.collection.Iterator$SliceIterator.next(Iterator.scala:271)
           at scala.collection.Iterator.foreach(Iterator.scala:941)
           at scala.collection.Iterator.foreach$(Iterator.scala:941)
           at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
           at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
           at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
           at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
           at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
           at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
           at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
           at scala.collection.AbstractIterator.to(Iterator.scala:1429)
           at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
           at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
           at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
           at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
           at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
           at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
           at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449)
           at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2236)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
           at org.apache.spark.scheduler.Task.run(Task.scala:131)
           at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
           ... 3 more
   ```
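
   To illustrate why the key lookup comes back null, here is a minimal Python sketch of the difference between a raw Debezium envelope and a flattened row. It assumes the Debezium source lifts the row image to top-level columns and adds metadata columns such as `_event_lsn`. The `flatten` and `record_key` helpers, the sample event, and the `id` key field are hypothetical; they only illustrate the shape of the data, not Hudi's actual implementation.

```python
from typing import Any, Optional

# Hypothetical Debezium change event as it sits in Kafka (already decoded).
event = {
    "before": None,
    "after": {"id": 10, "name": "Bob"},
    "source": {"lsn": 123456},
    "op": "c",
}


def flatten(envelope: dict) -> dict:
    """Lift the row image to top-level columns (assumed behaviour)."""
    # Deletes carry the row in `before`; creates and updates carry it in `after`.
    image = envelope["before"] if envelope["op"] == "d" else envelope["after"]
    row = dict(image or {})
    row["_event_lsn"] = envelope["source"]["lsn"]
    return row


def record_key(row: dict, field: str) -> Optional[Any]:
    """Top-level field lookup, similar in spirit to a simple key generator."""
    return row.get(field)


# Vanilla Kafka source: the envelope is passed through untouched, so there is
# no top-level `id` and the lookup finds nothing.
print(record_key(event, "id"))

# Debezium source: the envelope is flattened first, so `id` is found.
print(record_key(flatten(event), "id"))
```

   Under these assumptions the first lookup returns nothing, which matches the `HoodieKeyException` above, while the second finds the key.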
   
   Any thoughts/help would be highly appreciated :smile: 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] Sam-Serpoosh commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "Sam-Serpoosh (via GitHub)" <gi...@apache.org>.
Sam-Serpoosh commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1545048150

   I can reproduce this with a much simpler schema and corresponding Kafka key-value messages as well. Let's say we have this schema in our Confluent Schema Registry (SR):
   
   ```json
   {
     "type": "record",
     "name": "Envelope",
     "fields": [
       {
         "name": "before",
         "default": null,
         "type": [
           "null",
           {
             "name": "Value",
             "type": "record",
             "fields": [
               {
                 "name": "id",
                 "type": "int"
               },
               {
                 "name": "fst_name",
                 "type": "string"
               }
             ]
           }
         ]
       },
       {
         "name": "after",
         "default": null,
         "type": [
           "null",
           "Value"
         ]
       },
       {
         "name": "op",
         "type": "string"
       }
     ]
   }
   ```
   
   Then we try to publish a message in the following format:
   
   ```json
   {
     "after": {
       "id": 10,
       "fst_name": "Bob"
     },  
     "before": null,
     "op": "c" 
   }
   ```
   
   The `kafka-avro-console-producer` fails with this exception:
   
   ```
   Caused by: org.apache.avro.AvroTypeException: Unknown union branch id                                                                                                                                                                                         
           at org.apache.avro.io.JsonDecoder.readIndex(JsonDecoder.java:434)                                                                                                                                                                                     
           at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:282)                                                                                                                                                                           
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)                                                                                                                                                      
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)                                                                                                                                                                       
           at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)                                                                                                                                                                  
           at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)                                                                                                                                                                 
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)                                                                                                                                                      
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)                                                                                                                                                                       
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)                                                                                                                                                                       
           at io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils.toObject(AvroSchemaUtils.java:214)                                                                                                                                                          
           at io.confluent.kafka.formatter.AvroMessageReader.readFrom(AvroMessageReader.java:124)                                                                                                                                                                
           ... 3 more
   ```
   
   Changing the input message to the following format (simply wrapping `id` and `fst_name` inside a `Value` object) serializes and publishes to Kafka successfully:
   
   ```json
   {
     "after": {
       "Value": {                                                                                                                                                                                                                                                
         "id": 10,
         "fst_name": "Bob"
       }
     },
     "before": null,
     "op": "c"
   }
   ```
   
   This is pretty much what `Debezium` is currently doing, IIUC. Downstream, however, Hudi expects something like this for the `before` and `after` fields:
   
   ```json
   {
     "after": {
       "id": 10,
       "fst_name": "Bob"
     },
     "before": null,
     "op": "c"
   }
   ```
   
   The questions are:
   
   1. How should one define an Avro schema which would allow for nullable named record types, so the non-Value based format mentioned above would work just fine?
   2. How should I get Debezium to do that instead of what it's currently doing which is what I've reproduced above?
   
   Regarding question 2, I know others have managed to get the Debezium-Avro-serialization working without that extra `Value` object :disappointed:
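
   For context on the `Value` wrapper: in the Avro specification's JSON encoding, a non-null union value is written as a single-key object whose key names the branch type, and that is the format the JSON decoder in the stack trace above expects. The binary encoding has no such wrapper object, so the wrapper may be specific to the console tool's JSON input rather than something present in the Kafka payload. The Python sketch below applies that rule to the schema above; `to_avro_json` and `NULLABLE_RECORD_FIELDS` are hypothetical names written for this example, not part of any Avro library.

```python
import json

# Fields of the Envelope schema whose type is the union ["null", "Value"].
NULLABLE_RECORD_FIELDS = {"before": "Value", "after": "Value"}


def to_avro_json(plain: dict) -> dict:
    """Rewrite a plain JSON event into Avro's JSON encoding for unions."""
    encoded = {}
    for name, value in plain.items():
        branch = NULLABLE_RECORD_FIELDS.get(name)
        if branch is not None and value is not None:
            # Non-null union branch: wrap the value under the branch type name.
            encoded[name] = {branch: value}
        else:
            # The null branch and non-union fields are written as-is.
            encoded[name] = value
    return encoded


plain_event = {"after": {"id": 10, "fst_name": "Bob"}, "before": None, "op": "c"}
print(json.dumps(to_avro_json(plain_event), indent=2))
```

   The sketch turns the plain message into the wrapped one that the console producer accepted, which suggests the two JSON shapes describe the same record.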




[GitHub] [hudi] samserpoosh commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "samserpoosh (via GitHub)" <gi...@apache.org>.
samserpoosh commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1530412949

   I'm facing a **very similar** error/stack-trace when trying to leverage DeltaStreamer with `PostgresDebeziumSource` as well. In my case, I'm sure it's not due to DELETE/tombstone records because I'm testing the E2E Data-Flow via some dummy tables and I've only done INSERT into those dummy tables.
   
   Here's the command I'm executing to submit the `DeltaStreamer` job:
   
   ```shell
   spark-submit \
   --jars "opt/spark/jars/hudi-utilities-bundle.jar,..." \
   --master <spark_master_url> \
   --total-executor-cores <executor_cnt> \
   --executor-memory <mem> \
   --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
   --conf spark.hadoop.fs.s3a.connection.maximum=10000 \
   --conf spark.scheduler.mode=FAIR \
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer opt/spark/jars/hudi-utilities-bundle.jar \
   --table-type COPY_ON_WRITE \
   --target-base-path s3a://***/data_lake/cdc/<table_name> \
   --target-table <table_name> \
   --min-sync-interval-seconds 60 \
   --source-ordering-field _event_lsn \
   --source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \
   --payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload \
   --op UPSERT \
   --continuous \
   --source-limit 5000 \
   --hoodie-conf bootstrap.servers=<kafka_bootstrap_server> \
   --hoodie-conf schema.registry.url=<schema_registry> \
   --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=<schema_registry>/api/artifacts/<kafka_topic>-value/versions/<version_no> \
   --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer \
   --hoodie-conf hoodie.deltastreamer.source.kafka.topic=<kafka_topic> \
   --hoodie-conf auto.offset.reset=earliest \
   --hoodie-conf hoodie.datasource.write.recordkey.field=id \
   --hoodie-conf hoodie.datasource.write.partitionpath.field=name \
   --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator \
   --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
   --hoodie-conf hoodie.datasource.write.precombine.field=_event_lsn \
   --hoodie-conf hoodie.metadata.enable=true \
   --hoodie-conf hoodie.metadata.index.bloom.filter.column.list=id \
   --hoodie-conf hoodie.metadata.index.column.stats.enable=true \
   --hoodie-conf hoodie.parquet.small.file.limit=134217728
   ```
   
   Am I missing any configuration property or the like which leads to this issue?
   
   ### One Potential Issue?
   
   One thing I'm suspicious of is related to **Schema Registry**, **Serializer** and **Deserializer** in this Data-Flow:
   
   - I leverage [Apicurio](https://debezium.io/documentation/reference/stable/configuration/avro.html#apicurio-registry) as the Schema Registry, and its `AvroConverter` serializer in the Debezium Connector settings
   - Then on the DeltaStreamer job, I'm using `org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer` as you see above
   
   Could the combination above lead to issues? Or, given that the exception we're seeing comes from `DeltaSync`, is it unrelated?
   
   Thank you very much in advance, appreciate your help.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] sydneyhoran commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "sydneyhoran (via GitHub)" <gi...@apache.org>.
sydneyhoran commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1529939285

   When we either turn off tombstones in Debezium or filter them out in DebeziumSource.java, no null/tombstone records come in (which is good). But we still get a "commit failed" error, and on further inspection of the log I found that just before this error it says `Delta Sync found errors when writing. Errors/Total=14/9282`
   
   It seems that DeltaSync/WriteStatus.java is treating the deletes as "Errors". When I set `--commit-on-errors` to true, it allows the job to run but what happens to those "error" records? Shouldn't they be telling Deltastreamer to "delete" those records?
   
   ```
   23/05/01 13:49:26 ERROR org.apache.hudi.utilities.deltastreamer.DeltaSync: Delta Sync found errors when writing. Errors/Total=14/9282
   23/05/01 13:49:26 ERROR org.apache.hudi.utilities.deltastreamer.DeltaSync: Printing out the top 100 errors
   23/05/01 13:49:26 ERROR org.apache.hudi.utilities.deltastreamer.DeltaSync: Global error :
   23/05/01 13:49:26 ERROR org.apache.hudi.utilities.deltastreamer.DeltaSync: Global error :
   23/05/01 13:49:26 ERROR org.apache.hudi.utilities.deltastreamer.DeltaSync: Global error :
   ```
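
For reference, a Debezium delete normally produces two Kafka messages: a delete event (`op` = `d`, `after` null) followed by a tombstone whose value is null. The sketch below, in Python purely for illustration, separates the two cases. `classify` and the dictionary shapes are hypothetical stand-ins for already-deserialized messages, not Hudi or Debezium APIs.

```python
def classify(value):
    """Classify a decoded Kafka message value from a Debezium topic.

    `value` is None for a tombstone, otherwise a dict shaped like the
    Debezium envelope (hypothetical, already-deserialized form).
    """
    if value is None:
        return "tombstone"  # null payload: there is nothing to deserialize
    op = value.get("op")
    if op == "d":
        return "delete"     # real delete event: `after` is null
    if op in ("c", "u", "r"):
        return "upsert"     # create, update or snapshot read
    return "other"          # any other operation code


messages = [
    {"op": "c", "before": None, "after": {"id": 1}},
    {"op": "d", "before": {"id": 1}, "after": None},
    None,
]

# Drop tombstones before any field access; keep deletes so they can be applied.
kept = [m for m in messages if classify(m) != "tombstone"]
```

Only the null-valued message is skipped here; the `op` = `d` event still reaches downstream processing, which is what a delete needs.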




[GitHub] [hudi] the-other-tim-brown commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "the-other-tim-brown (via GitHub)" <gi...@apache.org>.
the-other-tim-brown commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1550022043

   @Sam-Serpoosh We need to look at the source of the data. How are you running Debezium and what types of configurations do you have there?




[GitHub] [hudi] Sam-Serpoosh commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "Sam-Serpoosh (via GitHub)" <gi...@apache.org>.
Sam-Serpoosh commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1542619896

   Certainly Apicurio's Avro serialization and, more importantly, its **inferred schema** were inaccurate, as detailed in [this comment](https://github.com/apache/hudi/issues/8519#issuecomment-1540961546). So I ended up leveraging **Confluent's Schema Registry (SR)** instead, especially since Hudi's source code indicates in multiple places that it is closely tied to Confluent-style SR APIs/responses.
   
   Their SR did the right thing and I verified that the `source` field is properly serialized and its schema/type is inferred accurately now. However, when trying to run the `DeltaStreamer` job, I'm still getting the **same** error:
   
   ```
   Exception in thread "main" org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:191)
           at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:186)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:553)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
           at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
           at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
           at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
           at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
           at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
           at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
           at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException
           at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
           at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
           at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:189)
           ... 15 more
   Caused by: org.apache.hudi.exception.HoodieException
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:713)
           at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.NullPointerException
           at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:301)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:679)
           ... 4 more
   ```
   
   The command I'm running is:
   
   ```
   spark-submit \
   --jars "opt/spark/jars/hudi-utilities-bundle.jar,<other_jars>" \
   --master spark://<SPARK_MASTER_URL>:<PORT> \
   --total-executor-cores 1 \
   --executor-memory 4g \
   --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
   --conf spark.hadoop.fs.s3a.connection.maximum=10000 \
   --conf spark.scheduler.mode=FAIR \
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer opt/spark/jars/hudi-utilities-bundle.jar \
   --table-type COPY_ON_WRITE \
   --target-base-path s3a://path/to/directory \
   --target-table <TABLE_NAME> \
   --min-sync-interval-seconds 30 \
   --source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \
   --payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload \
   --source-ordering-field _event_lsn \
   --op UPSERT \
   --continuous \
   --source-limit 5000 \
   --hoodie-conf bootstrap.servers=<KAFKA_BOOTSTRAP_SERVER>:9092 \
   --hoodie-conf schema.registry.url=http://<SCHEMA_REGISTRY_URL>:8081 \
   --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://<SCHEMA_REGISTRY_URL>:8081/subjects/<KAFKA_TOPIC>-value/versions/<VERSION_NO> \
   --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer \
   --hoodie-conf hoodie.deltastreamer.source.kafka.topic=<KAFKA_TOPIC> \
   --hoodie-conf auto.offset.reset=earliest \
   --hoodie-conf hoodie.datasource.write.recordkey.field=<UPSTREAM_DB_PKEY_FIELD> \
   --hoodie-conf hoodie.datasource.write.partitionpath.field=<PARTITION_FIELD_NAME> \
   --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
   --hoodie-conf hoodie.datasource.write.precombine.field=_event_lsn \
   --hoodie-conf hoodie.metadata.enable=true \
   --hoodie-conf hoodie.metadata.index.column.stats.enable=true \
   --hoodie-conf hoodie.parquet.small.file.limit=134217728
   ```
   
   ## What About Vanilla Kafka Consumption with DeltaStreamer?
   
   This is most likely a [**Debezium <> DeltaStreamer**](https://hudi.apache.org/blog/2022/01/14/change-data-capture-with-debezium-and-apache-hudi/) specific issue, because when I ran a DeltaStreamer job that simply consumes the **same Kafka topic**, the run got further along. It only fails for a sensible and expected reason: the `UPSTREAM_DB_PKEY_FIELD` is NULL since I'm no longer using `PostgresDebeziumSource`, so my Kafka events don't go through [this processing](https://github.com/apache/hudi/blob/release-0.11.1/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/PostgresDebeziumSource.java#L50-L85) and my provided `recordkey.field` no longer exists under that particular post-processed name. Here's the command I used to run a vanilla Kafka ingestion DeltaStreamer:
   
   ```
   spark-submit \
   --jars "opt/spark/jars/hudi-utilities-bundle.jar,<other_jars>" \
   --master spark://<SPARK_MASTER_URL>:<PORT> \
   --total-executor-cores 1 \
   --executor-memory 4g \
   --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
   --conf spark.hadoop.fs.s3a.connection.maximum=10000 \
   --conf spark.scheduler.mode=FAIR \
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer opt/spark/jars/hudi-utilities-bundle.jar \
   --table-type COPY_ON_WRITE \
   --target-base-path s3a://path/to/directory \
   --target-table <TABLE_NAME> \
   --min-sync-interval-seconds 30 \
   --source-ordering-field _event_lsn \
   --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
   --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
   --op BULK_INSERT \
   --continuous \
   --source-limit 5000 \
   --hoodie-conf bootstrap.servers=<KAFKA_BOOTSTRAP_SERVER>:9092 \
   --hoodie-conf schema.registry.url=http://<SCHEMA_REGISTRY_URL>:8081 \
   --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://<SCHEMA_REGISTRY_URL>:8081/subjects/<KAFKA_TOPIC>-value/versions/<VERSION_NO> \
   --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer \
   --hoodie-conf hoodie.deltastreamer.source.kafka.topic=<KAFKA_TOPIC> \
   --hoodie-conf auto.offset.reset=earliest \
   --hoodie-conf hoodie.datasource.write.recordkey.field=<UPSTREAM_DB_PKEY_FIELD> \
   --hoodie-conf hoodie.datasource.write.partitionpath.field=<PARTITION_FIELD_NAME> \
   --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
   --hoodie-conf hoodie.datasource.write.precombine.field=_event_lsn \
   --hoodie-conf hoodie.metadata.enable=true \
   --hoodie-conf hoodie.metadata.index.column.stats.enable=true \
   --hoodie-conf hoodie.parquet.small.file.limit=134217728
   ```
   
   And the null-key error I mentioned is:
   
   ```
   Caused by: org.apache.hudi.exception.HoodieKeyException: recordKey value: "null" for field: "<UPSTREAM_DB_PKEY_FIELD>" cannot be null or empty.
           at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:141)
           at org.apache.hudi.keygen.SimpleAvroKeyGenerator.getRecordKey(SimpleAvroKeyGenerator.java:50)
           at org.apache.hudi.keygen.SimpleKeyGenerator.getRecordKey(SimpleKeyGenerator.java:58)
           at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:65)
           at org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$fetchFromSource$b741bfe4$1(DeltaSync.java:495)
           at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1070)
           at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
           at scala.collection.Iterator$SliceIterator.next(Iterator.scala:271)
           at scala.collection.Iterator.foreach(Iterator.scala:941)
           at scala.collection.Iterator.foreach$(Iterator.scala:941)
           at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
           at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
           at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
           at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
           at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
           at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
           at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
           at scala.collection.AbstractIterator.to(Iterator.scala:1429)
           at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
           at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
           at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
           at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
           at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
           at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
           at org.apache.spark.rdd.RDD.$anonfun$take$2(RDD.scala:1449)
           at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2236)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
           at org.apache.spark.scheduler.Task.run(Task.scala:131)
           at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
           ... 3 more
   ```
   
   Any thoughts/help would be highly appreciated :smile:




[GitHub] [hudi] samserpoosh commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "samserpoosh (via GitHub)" <gi...@apache.org>.
samserpoosh commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1542967885

   @the-other-tim-brown There's a good chance that this is caused by the **input Kafka topic's events** and how they're serialized/deserialized (i.e. the way Debezium Connector is currently shaping and publishing the change-log messages to Kafka).
   
   I leveraged the `kafka-avro-console-consumer` that comes with Confluent's Schema Registry, and here's what my dummy/test table's change-log events look like:
   
   ```json
   {
     "before": null,
     "after": {
       "<topic_prefix>.<schema_name>.samser_customers.Value": {
         "id": 1,
         "name": "Bob",
         "age": 40,
         "created_at": {
           "long": 1683661733071814
         },
         "event_ts": {
           "long": 1681984800000
         }
       }
     },
     "source": {
       "version": "2.1.2.Final",
       "connector": "postgresql",
       "name": "pg_dev8",
       "ts_ms": 1683734195621,
       "snapshot": {
         "string": "first_in_data_collection"
       },
       "db": "<db_name>",
       "sequence": {
         "string": "[null,\"1213462492184\"]"
       },
       "schema": "public",
       "table": "samser_customers",
       "txId": {
         "long": 806227
       },
       "lsn": {
         "long": 1213462492184
       },
       "xmin": null
     },
     "op": "r",
     "ts_ms": {
       "long": 1683734196050
     },
     "transaction": null
   }
   ```
   
   And here's the corresponding schema which was established in the Schema Registry:
   
   ```json
   {
     "type": "record",
     "name": "Envelope",
     "namespace": "<topic_prefix>.<schema_name>.samser_customers",
     "fields": [
       {
         "name": "before",
         "type": [
           "null",
           {
             "type": "record",
             "name": "Value",
             "fields": [
               {
                 "name": "id",
                 "type": {
                   "type": "int",
                   "connect.default": 0
                 },
                 "default": 0
               },
               {
                 "name": "name",
                 "type": "string"
               },
               {
                 "name": "age",
                 "type": "int"
               },
               {
                 "name": "created_at",
                 "type": [
                   "null",
                   {
                     "type": "long",
                     "connect.version": 1,
                     "connect.name": "io.debezium.time.MicroTimestamp"
                   }
                 ],
                 "default": null
               },
               {
                 "name": "event_ts",
                 "type": [
                   "null",
                   "long"
                 ],
                 "default": null
               }
             ],
             "connect.name": "<topic_prefix>.<schema_name>.samser_customers.Value"
           }
         ],
         "default": null
       },
       {
         "name": "after",
         "type": [
           "null",
           "Value"
         ],
         "default": null
       },
       {
         "name": "source",
         "type": {
           "type": "record",
           "name": "Source",
           "namespace": "io.debezium.connector.postgresql",
           "fields": [
             {
               "name": "version",
               "type": "string"
             },
             {
               "name": "connector",
               "type": "string"
             },
             {
               "name": "name",
               "type": "string"
             },
             {
               "name": "ts_ms",
               "type": "long"
             },
             {
               "name": "snapshot",
               "type": [
                 {
                   "type": "string",
                   "connect.version": 1,
                   "connect.parameters": {
                     "allowed": "true,last,false,incremental"
                   },
                   "connect.default": "false",
                   "connect.name": "io.debezium.data.Enum"
                 },
                 "null"
               ],
               "default": "false"
             },
             {
               "name": "db",
               "type": "string"
             },
             {
               "name": "sequence",
               "type": [
                 "null",
                 "string"
               ],
               "default": null
             },
             {
               "name": "schema",
               "type": "string"
             },
             {
               "name": "table",
               "type": "string"
             },
             {
               "name": "txId",
               "type": [
                 "null",
                 "long"
               ],
               "default": null
             },
             {
               "name": "lsn",
               "type": [
                 "null",
                 "long"
               ],
               "default": null
             },
             {
               "name": "xmin",
               "type": [
                 "null",
                 "long"
               ],
               "default": null
             }
           ],
           "connect.name": "io.debezium.connector.postgresql.Source"
         }
       },
       {
         "name": "op",
         "type": "string"
       },
       {
         "name": "ts_ms",
         "type": [
           "null",
           "long"
         ],
         "default": null
       },
       {
         "name": "transaction",
         "type": [
           "null",
           {
             "type": "record",
             "name": "block",
             "namespace": "event",
             "fields": [
               {
                 "name": "id",
                 "type": "string"
               },
               {
                 "name": "total_order",
                 "type": "long"
               },
               {
                 "name": "data_collection_order",
                 "type": "long"
               }
             ],
             "connect.version": 1,
             "connect.name": "event.block"
           }
         ],
         "default": null
       }
     ],
     "connect.version": 1,
     "connect.name": "<topic_prefix>.<schema_name>.samser_customers.Envelope"
   }
   ```
   
   I **think** the extra nested level that we see due to the `<topic_prefix>.<schema_name>.samser_customers.Value` field **might** be causing this issue?! It looks like this:
   
   ```json
   {
     "before": null,
     "after": {
       "<topic_prefix>.<schema_name>.samser_customers.Value": {
         "id": 1,
         "name": "Bob",
         "age": 40,
         "created_at": {
           "long": 1683661733071814
         },
         "event_ts": {
           "long": 1681984800000
         }
       }
     },
     "source": {
       ...
     }
   ```
   
   However, according to [this line](https://github.com/apache/hudi/blob/release-0.11.1/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/PostgresDebeziumSource.java#LL44C57-L44C156) in `PostgresDebeziumSource`, which references the [Debezium docs](https://debezium.io/documentation/reference/1.4/connectors/postgresql.html#postgresql-create-events), the `before` and `after` fields are expected not to be nested under anything else. IIUC, Hudi expects something like the following for the `before` and `after` fields:
   
   ```json
   {
     ...
     "after": {
       "id": 1,
       "name": "Bob",
       "age": 40,
       "created_at": {
         "long": 1683661733071814
       },
       "event_ts": {
         "long": 1681984800000
       }
     },
     ...
   }
   ```
   
   But we're giving it:
   
   ```json
   {
     ...
     "after": {
       "pg_dev8.public.samser_customers.Value": {
         "id": 1,
         "name": "Bob",
         "age": 40,
         "created_at": {
           "long": 1683661733071814
         },
         "event_ts": {
           "long": 1681984800000
         }
       }
     },
     ...
   }
   ```
   
   So when `after.*` is executed in the following snippet:
   
   https://github.com/apache/hudi/blob/622d27a099f5dec96f992fd423b666083da4b24a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/PostgresDebeziumSource.java#L55-L66
   
   Instead of getting all fields of `after` in a FLAT fashion (i.e. `id`, `name` and others), it gets **one OBJECT** named `<topic_prefix>.<schema_name>.samser_customers.Value` which has those fields nested under it.
   
   Do you think this is the issue, or am I mistaken? Also, if this is the case, I wonder what configuration I should apply to my **Debezium-Connector** so it won't create this extra nested layer under the `after` and/or `before` fields. cc @sydneyhoran
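
One thing that may be worth ruling out first: Avro's JSON encoding writes a non-null union value as a single-key object named after the branch type, which is the same shape as `{"long": 1683661733071814}` in the consumer output quoted in this comment. If the console consumer prints that encoding, the extra `...Value` level might be a display artifact rather than real nesting in the record. The sketch below (Python, illustrative only) strips such wrappers. `unwrap_unions` and its `named_types` argument are hypothetical helpers, and the heuristic would misfire on a record that has exactly one field named like a type.

```python
AVRO_PRIMITIVES = {"boolean", "int", "long", "float", "double", "bytes", "string"}


def unwrap_unions(node, named_types=()):
    """Remove {"<type name>": value} wrappers from Avro JSON-encoded data.

    `named_types` lists the full names of record types that may appear as
    union branches (an assumption supplied by the caller, not read from a
    schema).
    """
    wrappers = AVRO_PRIMITIVES | set(named_types)
    if isinstance(node, dict):
        if len(node) == 1:
            key, inner = next(iter(node.items()))
            if key in wrappers:
                return unwrap_unions(inner, named_types)
        return {k: unwrap_unions(v, named_types) for k, v in node.items()}
    if isinstance(node, list):
        return [unwrap_unions(v, named_types) for v in node]
    return node


event = {
    "before": None,
    "after": {
        "pg_dev8.public.samser_customers.Value": {
            "id": 1,
            "name": "Bob",
            "age": 40,
            "created_at": {"long": 1683661733071814},
            "event_ts": {"long": 1681984800000},
        }
    },
    "op": "r",
}

flat = unwrap_unions(event, named_types=["pg_dev8.public.samser_customers.Value"])
```

With the wrappers removed, `flat["after"]` holds `id`, `name`, `age`, `created_at` and `event_ts` directly, which matches the flat shape described earlier in this comment.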




[GitHub] [hudi] Sam-Serpoosh commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "Sam-Serpoosh (via GitHub)" <gi...@apache.org>.
Sam-Serpoosh commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1540961546

   @the-other-tim-brown I've been trying to get a vanilla DeltaStreamer <> Kafka job to run in order to consume the Kafka topic which is populated by my **Debezium <> Postgres (PG) Connector**.
   
   I upgraded my [Apicurio Schema Registry](https://www.apicur.io/registry/docs/apicurio-registry/2.3.x/getting-started/assembly-intro-to-the-registry.html) to a later version `2.4.2.Final` which supports **Confluent Compatible APIs** since that's what's expected by Hudi's `SchemaRegistryProvider` as can be seen [here](https://github.com/apache/hudi/blob/release-0.11.1/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java#L41-L45) and [here](https://github.com/apache/hudi/blob/release-0.11.1/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java#L100).
   
   I also verified that the Debezium PG Connector does the following:
   
   - Avro-serializes change-log events
   - Registers the inferred Avro schema in the Apicurio registry
   - Publishes said serialized events into Kafka
   
   However, when I try to ingest these Kafka events using `DeltaStreamer`, I noticed the following ERROR:
   
   ```
   Exception in thread "main" org.apache.avro.SchemaParseException: "io.debezium.connector.postgresql.Source" is not a defined name. The type of the "source" field must be a defined name or a {"type": ...} expression.
           at org.apache.avro.Schema.parse(Schema.java:1265)
           at org.apache.avro.Schema$Parser.parse(Schema.java:1032)
           at org.apache.avro.Schema$Parser.parse(Schema.java:1020)
           at org.apache.hudi.utilities.schema.SchemaRegistryProvider.getSchema(SchemaRegistryProvider.java:100)
           at org.apache.hudi.utilities.schema.SchemaRegistryProvider.getSourceSchema(SchemaRegistryProvider.java:107)
           at org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor.getSourceSchema(SchemaProviderWithPostProcessor.java:42)
           at org.apache.hudi.utilities.deltastreamer.DeltaSync.registerAvroSchemas(DeltaSync.java:868)
           at org.apache.hudi.utilities.deltastreamer.DeltaSync.<init>(DeltaSync.java:235)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.<init>(HoodieDeltaStreamer.java:650)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:142)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.<init>(HoodieDeltaStreamer.java:115)
           at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:553)
           at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
           at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
           at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
           at java.lang.reflect.Method.invoke(Method.java:498)
           at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
           at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
           at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
           at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
           at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
           at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
           at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
           at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   ```
   
   I believe this makes sense. When I inspected the **inferred Avro schema** stored in Apicurio registry, I noticed the following bit there:
   
   ```json
   {
     "type": "record",
     "name": "Envelope",
     "namespace": "<kafka_topic_name>",
     "fields": [
       ...,
       {
         "name": "source",
         "type": "io.debezium.connector.postgresql.Source"
       },
       {
         "name": "op",
         "type": "string"
       },
       ...
     ],
     ...
   }
   ```
   
   As you see, `source`'s type is **not** in the `{ "type": ... }` structure as expected by the Avro-Schema-Parser in [this line](https://github.com/apache/hudi/blob/release-0.11.1/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java#L100). Does this mean `Apicurio`'s **Avro-Serializer** is not able to properly and thoroughly serialize AND infer the Avro schema? For reference, the relevant bits of my Debezium Connector's configuration are:
   
   ```yaml
   ...
   key.converter.schemas.enable: true
   key.converter: io.apicurio.registry.utils.converter.AvroConverter
   key.converter.apicurio.registry.url: http://<registry_URL>:8080/apis/registry/v2
   key.converter.apicurio.registry.auto-register: true
   key.converter.apicurio.registry.find-latest: true
   value.converter.schemas.enable: true
   value.converter: io.apicurio.registry.utils.converter.AvroConverter
   value.converter.apicurio.registry.url: http://<registry_URL>:8080/apis/registry/v2
   value.converter.apicurio.registry.auto-register: true
   value.converter.apicurio.registry.find-latest: true
   ...
   ```
   
   I wonder if `io.apicurio.registry.utils.converter.AvroConverter` is **not** able to infer the proper type for the `source` field, so it ends up emitting `"type": "io.debezium.connector.postgresql.Source"` instead, which leads to the failure downstream. I'm curious whether someone who's using `io.confluent.connect.avro.AvroConverter` has also seen this issue, or whether in their case the `source` field's type is inferred accurately and in the `{ "type": ... }` structure. cc @sydneyhoran
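
To make the parse error concrete: Avro accepts a bare type name only after that name has been defined earlier in the same schema. The sketch below (Python, illustrative only, not the Avro parser) walks a schema in order and lists string references that are neither primitives nor previously defined names. `undefined_refs` is a hypothetical helper, and it ignores aliases and logical types.

```python
PRIMITIVES = {"null", "boolean", "int", "long", "float", "double", "bytes", "string"}


def undefined_refs(schema, namespace=None, defined=None):
    """Return full names that are referenced before (or without) a definition."""
    defined = set() if defined is None else defined
    missing = []
    if isinstance(schema, str):
        if schema not in PRIMITIVES:
            # Unqualified references resolve against the enclosing namespace.
            full = schema if "." in schema or not namespace else f"{namespace}.{schema}"
            if full not in defined:
                missing.append(full)
    elif isinstance(schema, list):  # union: check every branch
        for branch in schema:
            missing += undefined_refs(branch, namespace, defined)
    elif isinstance(schema, dict):
        kind = schema.get("type")
        if kind in ("record", "error", "enum", "fixed"):
            name = schema["name"]
            if "." in name:
                ns, full = name.rsplit(".", 1)[0], name
            else:
                ns = schema.get("namespace", namespace)
                full = f"{ns}.{name}" if ns else name
            defined.add(full)  # the name is usable from here on
            for field in schema.get("fields", []):
                missing += undefined_refs(field["type"], ns, defined)
        elif kind == "array":
            missing += undefined_refs(schema["items"], namespace, defined)
        elif kind == "map":
            missing += undefined_refs(schema["values"], namespace, defined)
        else:
            # e.g. {"type": "int", "connect.default": 0} or a nested definition
            missing += undefined_refs(kind, namespace, defined)
    return missing


# Shape of the schema fragment shown above, with a placeholder namespace.
apicurio_like = {
    "type": "record",
    "name": "Envelope",
    "namespace": "kafka_topic_name",
    "fields": [
        {"name": "source", "type": "io.debezium.connector.postgresql.Source"},
        {"name": "op", "type": "string"},
    ],
}

unresolved = undefined_refs(apicurio_like)
```

For this fragment, `unresolved` contains the `Source` name, mirroring the "is not a defined name" message in the stack trace.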




[GitHub] [hudi] samserpoosh commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "samserpoosh (via GitHub)" <gi...@apache.org>.
samserpoosh commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1550060806

   @the-other-tim-brown Below I've laid out some information that you might find helpful ...
   
   ### DB Table I'm Testing With in PostgreSQL
   
   ```
   \d samser_customers
                                           Table "public.samser_customers"
      Column   |            Type             | Collation | Nullable |                   Default                    
   ------------+-----------------------------+-----------+----------+----------------------------------------------
    id         | integer                     |           | not null | nextval('samser_customers_id_seq'::regclass)
    name       | character varying(50)       |           | not null | 
    age        | integer                     |           | not null | 
    created_at | timestamp without time zone |           |          | 
    event_ts   | bigint                      |           |          | 
   Indexes:
       "samser_customers_pkey" PRIMARY KEY, btree (id)
   Referenced by:
       TABLE "samser_orders" CONSTRAINT "fk_customer" FOREIGN KEY (customer_id) REFERENCES samser_customers(id)
   Publications:
       "dbz_publication"
   ```
   
   The dummy data currently in the table:
   
   ```
   SELECT * FROM samser_customers;
    id |   name   | age |         created_at         |   event_ts    
   ----+----------+-----+----------------------------+---------------
     1 | Bob      |  40 | 2023-05-12 00:05:27.204463 | 1681984800000
     2 | Alice    |  30 | 2023-05-12 00:05:27.204463 | 1681988400000
     3 | John     |  37 | 2023-05-12 00:05:27.204463 | 1681992000000
     4 | Jon      |  25 | 2023-05-12 00:05:27.204463 | 1681995600000
     5 | David    |  20 | 2023-05-12 00:05:27.204463 | 1681999200000
     6 | Jack     |  70 | 2023-05-12 00:05:27.204463 | 1682002800000
     8 | Ali      |  35 | 2023-05-15 18:03:11.675954 | 1683712800000
     9 | Jonathan |  30 | 2023-05-15 18:03:11.675954 | 1683716400000
    10 | Daniel   |  37 | 2023-05-15 18:03:11.675954 | 1683720000000
    11 | Taylor   |  25 | 2023-05-15 18:03:11.675954 | 1683723600000
    12 | Lex      |  32 | 2023-05-15 18:03:11.675954 | 1683727200000
    13 | Shane    |  45 | 2023-05-15 18:03:11.675954 | 1683730800000
   (12 rows)
   ```
   
   ### Debezium Setup
   
   I'm leveraging [**Strimzi K8S Kafka Operator**](https://strimzi.io/), which includes `KafkaConnect` in its suite. Then I create a Docker image as instructed in [this doc](https://strimzi.io/blog/2020/01/27/deploying-debezium-with-kafkaconnector-resource/) to create/set up the Debezium Connector within that KafkaConnect.
   
   I also include **Confluent's relevant JARs** in that image under the `libs` directory (more on Debezium <> Confluent [here](https://debezium.io/documentation/reference/2.1/configuration/avro.html#confluent-schema-registry)). Essentially this is my `Dockerfile`:
   
   ```Dockerfile
   FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
   USER root:root
   RUN mkdir -p /opt/kafka/plugins/debezium
   RUN mkdir -p /opt/kafka/libs
   COPY ./debezium-connector-postgres-2.2.0.Final/ /opt/kafka/plugins/debezium/
   COPY ./confluent-jars-7.3.3/*.jar /opt/kafka/libs/
   USER 1001
   ```
   
   ### Debezium Kafka Connector Configuration
   
   You can find some information in [that same doc](https://strimzi.io/blog/2020/01/27/deploying-debezium-with-kafkaconnector-resource/) under the section titled "Create The Connector". My Debezium configuration is laid out below; the **inferred/registered schema** and **sample messages consumed** are in [my older comment](https://github.com/apache/hudi/issues/8519#issuecomment-1542967885), as you know.
   
   ```
   plugin.name: pgoutput
   database.hostname: <DB_HOST>
   database.port: 5432
   database.user: <DB_USERNAME>
   database.password: <DB_PWD>
   database.dbname: <DB_NAME>
   topic.prefix: <KAFKA_TOPIC_PREFIX>
   schema.include.list: public
   key.converter: io.confluent.connect.avro.AvroConverter
   key.converter.schema.registry.url: http://<REGISTRY_URL>:8081
   value.converter: io.confluent.connect.avro.AvroConverter
   value.converter.schema.registry.url: http://<REGISTRY_URL>:8081
   table.include.list: public.samser_customers
   topic.creation.enable: true
   topic.creation.default.replication.factor: 1
   topic.creation.default.partitions: 1
   topic.creation.default.cleanup.policy: compact
   topic.creation.default.compression.type: lz4
   decimal.handling.mode: double
   tombstones.on.delete: false
   ```
   
   Thanks again @the-other-tim-brown for the help, I really appreciate it and please let me know in case there is anything else you need.




[GitHub] [hudi] sydneyhoran commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "sydneyhoran (via GitHub)" <gi...@apache.org>.
sydneyhoran commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1529944441

   Thank you @the-other-tim-brown for the explanation/confirmation. That is what we have assumed as well, but it seems that with our configuration we are unable to parse the event with `before.*` and `op: "d"`; it seems Deltastreamer just sees it as an error.




[GitHub] [hudi] Sam-Serpoosh commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "Sam-Serpoosh (via GitHub)" <gi...@apache.org>.
Sam-Serpoosh commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1544320455

   @the-other-tim-brown This is done via:
   
   - Postgres (in AWS RDS) `11.16`
   - Debezium `2.1.2.Final`
   
   While we're at it, these `created_at` and `event_ts` fields are **maybe** worrisome as well, since they don't seem to be interpreted as simple/primitive `long` types?
   
   ```json
   {
     ...
     "after": {
       "<topic_prefix>.<schema_name>.samser_customers.Value": {
         "id": 1,
         "name": "Bob",
         "age": 40,
         "created_at": {
           "long": 1683661733071814
         },
         "event_ts": {
           "long": 1681984800000
         }
       }
     },
     ...
   }
   ```
   
   Their corresponding schema portion:
   
   ```json
   {
     ...
     {
       "name": "created_at",
       "type": [
         "null",
         {
           "type": "long",
           "connect.version": 1,
           "connect.name": "io.debezium.time.MicroTimestamp"
         }
       ],
       "default": null
     },
     {
       "name": "event_ts",
       "type": [
         "null",
         "long"
       ],
       "default": null
     }
     ...
   }
   ```
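   
   For context, the `{ "long": ... }` wrappers above are how Avro's JSON encoding renders a non-null value of a union such as `["null", "long"]`: a single-entry object keyed by the branch's type name (for a record branch, its full name). `io.debezium.time.MicroTimestamp` is a `long` counting microseconds since the epoch. The sketch below is only a minimal illustration using plain `java.util.Map` stand-ins rather than the Avro library; the class and helper names are hypothetical, and treating every single-entry map as a union wrapper is a simplifying assumption.
   
   ```java
   import java.time.Instant;
   import java.util.Collections;
   import java.util.Map;

   final class AvroJsonUnionSketch {
       // Avro's JSON encoding writes a non-null union value as a single-entry
       // object keyed by the branch type name; the null branch is plain null.
       // Assumption: any single-entry map is treated as such a wrapper here.
       static Object unwrapUnion(Object jsonValue) {
           if (jsonValue instanceof Map) {
               Map<?, ?> wrapper = (Map<?, ?>) jsonValue;
               if (wrapper.size() == 1) {
                   return wrapper.values().iterator().next();
               }
           }
           return jsonValue;
       }

       // Converts microseconds since the epoch into an Instant.
       static Instant fromMicroTimestamp(long microsSinceEpoch) {
           long seconds = Math.floorDiv(microsSinceEpoch, 1_000_000L);
           long micros = Math.floorMod(microsSinceEpoch, 1_000_000L);
           return Instant.ofEpochSecond(seconds, micros * 1_000L);
       }

       public static void main(String[] args) {
           // Values copied from the sample message above.
           Object createdAt = unwrapUnion(Collections.singletonMap("long", 1683661733071814L));
           Object eventTs = unwrapUnion(Collections.singletonMap("long", 1681984800000L));
           System.out.println(fromMicroTimestamp((Long) createdAt));
           System.out.println(eventTs);
       }
   }
   ```
   
   Read this way, both fields are ordinary nullable `long` values; the wrapper is an artifact of rendering the record as JSON.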




[GitHub] [hudi] Sam-Serpoosh commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "Sam-Serpoosh (via GitHub)" <gi...@apache.org>.
Sam-Serpoosh commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1546481863

   @the-other-tim-brown According to this [SO question/thread](https://stackoverflow.com/questions/76239689/non-nested-avro-schema-for-postgres-change-log-events-debezium-confluent-sch), it seems like **nesting** under the `Value` object in Avro (de)serialization is a **normal** behavior.
   
   Then I'm surprised that others haven't seen an issue with this when it comes to `Debezium <> Hudi`. Unless my hypothesis is inaccurate and Hudi is actually capable of gracefully handling incoming Kafka messages with a **nested Value** object in them?!
   
   In the meantime, I'm going to work on a simple patch which would change this line (same with its counterpart on BEFORE):
   
   https://github.com/apache/hudi/blob/622d27a099f5dec96f992fd423b666083da4b24a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/PostgresDebeziumSource.java#L64
   
   To this (same with its counterpart on BEFORE):
   
   ```java
   String.format("%s.Value.*", DebeziumConstants.INCOMING_AFTER_FIELD)
   ```
   
   And see IF that'll resolve the issue or not?!




[GitHub] [hudi] jpechane commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "jpechane (via GitHub)" <gi...@apache.org>.
jpechane commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1545173516

   @Sam-Serpoosh Hi, Debezium does not do any serialization. It just prepares a data structure described by a Kafka Connect schema. The serialization itself is done by the Avro converter provided by Confluent. Debezium is unable to influence the serialization in any way.




[GitHub] [hudi] Sam-Serpoosh commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "Sam-Serpoosh (via GitHub)" <gi...@apache.org>.
Sam-Serpoosh commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1546035984

   @jpechane Thanks for the response 😄 Sorry, I should have been more careful with my wording, you're right that Debezium does not directly do any serialization. That is done by `Confluent's AvroConverter` along with Avro schema inference IIUC.
   
   I mainly need to figure out **how to configure** `AvroConverter` within the context of Debezium Kafka-Connector so I won't end up with this **extra** `Value` object and unnecessary nested structure which is causing issues downstream.




[GitHub] [hudi] Sam-Serpoosh commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "Sam-Serpoosh (via GitHub)" <gi...@apache.org>.
Sam-Serpoosh commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1542839452

   > for the first issue regarding the schema, this is because we are fetching that schema as a string. If that class is not defined in the string, we won't know how it is defined. Maybe there is some arg to pass to the api to get the schemas that this schema relies on as well?
   
   @the-other-tim-brown That makes perfect sense, and I ended up resolving that issue by simply using **Confluent Schema Registry** instead of the `Apicurio` registry I was previously using, since Confluent's includes everything correctly in **one place**, so Hudi/DeltaStreamer can fetch it properly in one fell swoop.
   
   > For the second, it is hard to tell without looking at your data. If you pull the data locally and step through, you may have a better shot of understanding. The main thing I have seen trip people up is the requirements for the delete records in the topic. You can also try out the same patch Sydney posted above for filtering out the tombstones in kafka.
   
   I **highly** doubt that in my case it's caused by a tombstone record or the like, because I'm testing this data flow on a dummy/test Postgres table to which I've **only** applied `INSERT` operations so far.
   
   And BTW, I could **successfully** get a **vanilla Kafka ingestion** running end-to-end and populate a partitioned Hudi table as expected. So definitely the issue is specific to when I switch to `PostgresDebeziumSource` and `PostgresDebeziumAvroPayload`.
   
   Thank you very much for your input. I'll try to see what's the best way to debug this and how to figure out what's causing the exception I shared above when it comes to DeltaStreamer <> Debezium ...
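   
   For reference, the tombstone filtering mentioned in the quoted reply boils down to dropping Kafka records whose value is null before they reach the Avro deserializer. The following is only a rough sketch of that idea, not the patch referred to above: it uses `Map.Entry` as a stand-in for a Kafka record, and `TombstoneFilterSketch` and `dropTombstones` are hypothetical names.
   
   ```java
   import java.util.AbstractMap.SimpleEntry;
   import java.util.Arrays;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;

   final class TombstoneFilterSketch {
       // A tombstone is a record with a key and a null value.
       // Keep only the records that carry a value.
       static <K, V> List<Map.Entry<K, V>> dropTombstones(List<Map.Entry<K, V>> records) {
           return records.stream()
               .filter(record -> record.getValue() != null)
               .collect(Collectors.toList());
       }

       public static void main(String[] args) {
           // Stand-in batch: a create event, a delete event, then the tombstone.
           List<Map.Entry<String, String>> batch = Arrays.asList(
               new SimpleEntry<String, String>("id=1", "{\"op\":\"c\"}"),
               new SimpleEntry<String, String>("id=1", "{\"op\":\"d\"}"),
               new SimpleEntry<String, String>("id=1", null));
           System.out.println(dropTombstones(batch).size());
       }
   }
   ```
   
   Note that the delete event itself (`op: "d"`) is kept; only the value-less record that follows it is dropped.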




[GitHub] [hudi] sydneyhoran commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "sydneyhoran (via GitHub)" <gi...@apache.org>.
sydneyhoran commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1524065639

   Just as an update, we were able to set tombstones.on.delete to `false` in a lower environment and still got the following error after a delete op:




[GitHub] [hudi] the-other-tim-brown commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "the-other-tim-brown (via GitHub)" <gi...@apache.org>.
the-other-tim-brown commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1530680623

   > Thank you @the-other-tim-brown for the explanation/confirmation. That is what we have assumed as well, but it seems with our configuration we are unable to parse the event with `before.*` and `op: "d"`, seems Deltastreamer just sees it as an error.
   > 
   > We are using COW table, Upsert mode if that makes a difference.
   
   @sydneyhoran that shouldn't make a difference. Can you expand your initial stacktrace to show what the NPE was caused by, including the method that threw it? Can you also confirm that the `before` field in your Kafka topic is set for all records? Without it the Debezium logic in Hudi will break.
   
   Did you get any stacktrace for the errors? It could be due to a schema incompatibility issue.




[GitHub] [hudi] the-other-tim-brown commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "the-other-tim-brown (via GitHub)" <gi...@apache.org>.
the-other-tim-brown commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1530683969

   
   @samserpoosh you can confirm whether this is the case by deserializing the data with `org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer` without the extra postgres debezium logic. I am unfamiliar with the settings that come with that so it could be leading to issues parsing the records. Do you have a stacktrace?




[GitHub] [hudi] xushiyan closed issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan closed issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException
URL: https://github.com/apache/hudi/issues/8519




[GitHub] [hudi] Sam-Serpoosh commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "Sam-Serpoosh (via GitHub)" <gi...@apache.org>.
Sam-Serpoosh commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1550008181

   @the-other-tim-brown I was going to post some updates about this but you beat me to it :smile:
   
   > My question is simply why are you having this issue when tens of other users/companies including myself are able to get debezium topics without the extra level of nesting.
   
   Others not complaining about the **nested** issue also made me suspicious that something else must be going on here, and this whole nested `Value` object **might** be a **red herring**. So here are some updates ...
   
   ### Our Hudi Version/Patch & What I Realized So Far
   
   We're currently using a FORK based off Hudi `0.11.1` with **some patches/commits** applied to it such as the following (this setup predates me):
   
   - [commit-1](https://github.com/rohitmittapalli/hudi/commit/b118df63eedceb50d9fa7045fea6f6ee44289e81)
   - [commit-2](https://github.com/rohitmittapalli/hudi/commit/55f1f18033f23ae4fb93e9c1c2983dff2d57f3ad)
   - [commit-3](https://github.com/rohitmittapalli/hudi/commit/7ffa56bc3d549289507dcf70e3fc1f6487cd4b90)
     - Related to `userProvidedSchemaProvider`
   - Etc.
   
   I rebuilt that fork and used the JAR to run my job. I then noticed the **NPE** was thrown because there was **no schema-provider class**, since I was **not** passing the `--schemaprovider-class` CLI argument. So this line in `DeltaSync.java` was throwing the exception (understandably, since that provider was NULL):
   
   ```java
   userProvidedSchemaProvider.refresh();
   ```
   
   I was **not** passing that argument to my run/spark-submit command due to [this comment](https://github.com/apache/hudi/issues/6348#issuecomment-1223742672) I stumbled upon the other day. However, that comment is no longer valid (presumably due to changes to Hudi ever since). So I added back this line to my command:
   
   ```bash
   --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider
   ```
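   
   A fail-fast guard would have made this kind of NPE much easier to diagnose. The snippet below is a minimal sketch of the idea and is not how `DeltaSync` is written: `RefreshableSchemaProvider` is a hypothetical stand-in interface (not Hudi's `SchemaProvider` class), and the error message is made up for illustration.
   
   ```java
   final class SchemaProviderGuardSketch {
       /** Hypothetical stand-in for a schema provider; not Hudi's actual class. */
       interface RefreshableSchemaProvider {
           void refresh();
       }

       // Fails with a descriptive message instead of a bare NullPointerException
       // when no provider was configured.
       static void refreshOrFail(RefreshableSchemaProvider provider) {
           if (provider == null) {
               throw new IllegalStateException(
                   "No schema provider configured; pass --schemaprovider-class to the job");
           }
           provider.refresh();
       }

       public static void main(String[] args) {
           try {
               refreshOrFail(null);
           } catch (IllegalStateException e) {
               // The message points at the missing CLI argument.
               System.out.println(e.getMessage());
           }
       }
   }
   ```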
   
   ### IllegalArgumentException At This Point
   
   Then I hit the same issue that @sydneyhoran has posted about in #8521. So I manually applied the **same changes** that @ad1happy2go proposed in [this PR](https://github.com/apache/hudi/pull/7225/files) and that got me a bit further along.
   
   ### Not Even Reaching PostgresDebeziumSource
   
   As you see, the hypothesis on `PostgresDebeziumSource` and the **nested Value object** was a **red herring**, since we haven't even gotten that far in the execution path. With the aforementioned changes, I'm now seeing a new exception which is (apparently) thrown by [this line](https://github.com/apache/hudi/blob/release-0.11.1/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala#L123), and that logic is invoked by [this line](https://github.com/apache/hudi/blob/release-0.11.1/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java#L157). Here are the exception details:
   
   ```
   Driver stacktrace:
     at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
     at scala.Option.foreach(Option.scala:407)
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
     at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1449)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
     at org.apache.spark.rdd.RDD.take(RDD.scala:1422)
     at org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1557)
     at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
     at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1557)
     at org.apache.hudi.AvroConversionUtils$.createDataFrame(AvroConversionUtils.scala:123)
     at org.apache.hudi.AvroConversionUtils.createDataFrame(AvroConversionUtils.scala)
     at org.apache.hudi.utilities.sources.debezium.DebeziumSource.toDataset(DebeziumSource.java:169)
     at org.apache.hudi.utilities.sources.debezium.DebeziumSource.fetchNextBatch(DebeziumSource.java:134)
     at org.apache.hudi.utilities.sources.RowSource.fetchNewData(RowSource.java:43)
     at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:76)
     at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:69)
     at org.apache.hudi.utilities.deltastreamer.DeltaSync.fetchFromSource(DeltaSync.java:465)
     at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:398)
     at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:303)
     at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:679)
     ... 4 more
   Caused by: java.io.IOException: unexpected exception type
     at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1750)
     at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1280)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
     at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
     at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2093)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1655)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
     at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:498)
     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
     at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
     at org.apache.spark.scheduler.Task.run(Task.scala:131)
     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
     ... 3 more
   Caused by: java.lang.reflect.InvocationTargetException
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:498)
     at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:498)
     at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1274)
     ... 53 more
   Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
     at org.apache.hudi.utilities.sources.debezium.DebeziumSource.$deserializeLambda$(DebeziumSource.java:64)
     ... 63 more
   ```
   
   ## Is This One Really Due To Avro Schema Now?!
   
   As I mentioned, the current Hudi setup we have is a tad strange/out-dated. We utilize `0.11.1` with some patches/commits applied to it. I **think** the best move on my end would be:
   
   - Build a JAR with **latest stable Hudi** (0.13.0) (assuming that's doable with our setup/ecosystem)
   - Try running the **same DeltaStreamer job** with this new JAR and see what happens
   - Post updates here ...
   
   P.S. For reference, this is my spark-submit command BTW:
   
   ```shell
   $ spark-submit \
   --jars "opt/spark/jars/hudi-utilities-bundle.jar,<other_jars>" \
   --master spark://<SPARK_MASTER_URL>:<PORT> \
   --total-executor-cores 1 \
   --executor-memory 4g \
   --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
   --conf spark.hadoop.fs.s3a.connection.maximum=10000 \
   --conf spark.scheduler.mode=FAIR \
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer opt/spark/jars/hudi-utilities-bundle.jar \
   --table-type COPY_ON_WRITE \
   --target-base-path s3a://path/to/directory \
   --target-table <TABLE_NAME> \
   --min-sync-interval-seconds 30 \
   --source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \
   --payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload \
   --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
   --source-ordering-field _event_lsn \
   --op UPSERT \
   --continuous \
   --source-limit 5000 \
   --hoodie-conf bootstrap.servers=<KAFKA_BOOTSTRAP_SERVER>:9092 \
   --hoodie-conf group.id=FOO-deltastreamer \
   --hoodie-conf schema.registry.url=http://<SCHEMA_REGISTRY_URL>:8081 \
   --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://<SCHEMA_REGISTRY_URL>:8081/subjects/<KAFKA_TOPIC>-value/versions/<VERSION_NO> \
   --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer \
   --hoodie-conf hoodie.deltastreamer.source.kafka.topic=<KAFKA_TOPIC> \
   --hoodie-conf auto.offset.reset=earliest \
   --hoodie-conf hoodie.datasource.write.recordkey.field=<UPSTREAM_DB_PKEY_FIELD> \
   --hoodie-conf hoodie.datasource.write.partitionpath.field=<PARTITION_FIELD_NAME> \
   --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
   --hoodie-conf hoodie.datasource.write.precombine.field=_event_lsn \
   --hoodie-conf hoodie.metadata.enable=true \
   --hoodie-conf hoodie.metadata.index.column.stats.enable=true \
   --hoodie-conf hoodie.parquet.small.file.limit=134217728
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] Sam-Serpoosh commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "Sam-Serpoosh (via GitHub)" <gi...@apache.org>.
Sam-Serpoosh commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1544940333

   @jpechane This seems to be related to `Debezium` IIUC and how it's serializing the CDC events prior to publishing them to Kafka. As detailed in [this comment](https://github.com/apache/hudi/issues/8519#issuecomment-1542967885) and [this one](https://github.com/apache/hudi/issues/8519#issuecomment-1544320455), there is an **extra/nested** object/record field named `Value` under `after` or `before`, and I'm not sure why that's the case.
   
   The `before` and `after` fields' type is a **union type** looking like:
   
   ```json
   {
         "name": "before",
         "type": [
           "null",
           {
             "type": "record",
             "name": "Value",
             "fields": [
               {
                 "name": "id",
                 "type": {
                   "type": "int",
                   "connect.default": 0
                 },
                 "default": 0
               },
               {
                 "name": "name",
                 "type": "string"
               },
               {
                 "name": "age",
                 "type": "int"
               },
               {
                 "name": "created_at",
                 "type": [
                   "null",
                   {
                     "type": "long",
                     "connect.version": 1,
                     "connect.name": "io.debezium.time.MicroTimestamp"
                   }
                 ],
                 "default": null
               },
               {
                 "name": "event_ts",
                 "type": [
                   "null",
                   "long"
                 ],
                 "default": null
               }
             ],
             "connect.name": "<topic_prefix>.<schema_name>.samser_customers.Value"
           }
         ],
         "default": null
       },
       {
         "name": "after",
         "type": [
           "null",
           "Value"
         ],
         "default": null
       },
       ...
   }
   ```
   
   However, when I consume/deserialize events using Confluent's `kafka-avro-console-consumer`, I see that the `before` field has an **OBJECT/RECORD** field named `Value` under it, and the fields (e.g. `id` and `name`) are attached to that instead of directly to the `before` field. According to the aforementioned Avro schema, **Value** is just the TYPE of the `before` field. But for some reason it comes out as a **field**, so we end up with `before.Value.id` (or `after.Value.id`) instead of `before.id` (or `after.id`).
   
   Any thoughts on why this is happening? We don't see this behavior in the case of the `source` field (whose type is **also** a **record**); that field shows the correct behavior. In case it's needed, here's my Debezium Connector configuration:
   
   ```
   schema.include.list: public
   key.converter: io.confluent.connect.avro.AvroConverter
   key.converter.schema.registry.url: http://<REGISTRY_URL>:8081
   value.converter: io.confluent.connect.avro.AvroConverter
   value.converter.schema.registry.url: http://<REGISTRY_URL>:8081
   table.include.list: public.samser_customers
   topic.creation.enable: true
   topic.creation.default.replication.factor: 1
   topic.creation.default.partitions: 1
   topic.creation.default.cleanup.policy: compact
   topic.creation.default.compression.type: lz4
   decimal.handling.mode: double
   tombstones.on.delete: false
   ```
   
   Thank you very much in advance; I appreciate your help here.
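   
   One possible explanation, offered as an assumption rather than a confirmed diagnosis for this setup: Avro's JSON encoding renders a non-null union value as a single-entry object keyed by the branch's type name, and a console tool that prints Avro as JSON would show that wrapper. The sketch below imitates that rule with plain Java maps (no Avro library involved); `UnionJsonSketch` and its helper methods are hypothetical names used only for illustration.
   
   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.stream.Collectors;
   
   public class UnionJsonSketch {
   
     // Renders a flat record (field name -> value) as a JSON object.
     // No string escaping is done; this is an illustration only.
     static String recordToJson(Map<String, Object> fields) {
       return fields.entrySet().stream()
           .map(e -> "\"" + e.getKey() + "\":" + valueToJson(e.getValue()))
           .collect(Collectors.joining(",", "{", "}"));
     }
   
     static String valueToJson(Object v) {
       if (v == null) {
         return "null";
       }
       return (v instanceof String) ? "\"" + v + "\"" : String.valueOf(v);
     }
   
     // Imitates the union rule: the null branch prints as null, any other
     // branch prints as {"<branch type name>": <encoded value>}.
     static String unionToJson(String branchTypeName, Map<String, Object> record) {
       if (record == null) {
         return "null";
       }
       return "{\"" + branchTypeName + "\":" + recordToJson(record) + "}";
     }
   
     public static void main(String[] args) {
       Map<String, Object> row = new LinkedHashMap<>();
       row.put("id", 1);
       row.put("name", "Ada");
       // An insert event: "before" takes the null branch, "after" the "Value" branch.
       System.out.println("{\"before\":" + unionToJson("Value", null)
           + ",\"after\":" + unionToJson("Value", row) + "}");
       // prints {"before":null,"after":{"Value":{"id":1,"name":"Ada"}}}
     }
   }
   ```
   
   Under this reading, the `Value` level would exist only in the printed JSON, while the deserialized record would still expose `after.id` directly. It would also fit the `source` field looking normal, assuming `source` is declared as a plain record rather than a union with `"null"`.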




[GitHub] [hudi] the-other-tim-brown commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "the-other-tim-brown (via GitHub)" <gi...@apache.org>.
the-other-tim-brown commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1548675221

   @Sam-Serpoosh let's clarify a few things. The Avro schema is valid and yes, Hudi can handle it. My question is simply: why are you having this issue when tens of other users/companies, including myself, are able to get Debezium topics without the extra level of nesting? You can see in the link I posted previously that there is an expected schema coming from Debezium, but your data is not ending up in that format. How are you deploying Debezium and what are your configs?




[GitHub] [hudi] sydneyhoran commented on issue #8519: [SUPPORT] Deltastreamer AvroDeserializer failing with java.lang.NullPointerException

Posted by "sydneyhoran (via GitHub)" <gi...@apache.org>.
sydneyhoran commented on issue #8519:
URL: https://github.com/apache/hudi/issues/8519#issuecomment-1524068006

   My team is trying to develop a custom Transformer class that can skip over null (tombstone) records from the PostgresDebezium Kafka source to address this. We are attempting something along the lines of:
   
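   ```java
   import java.util.Objects;
   
   import org.apache.hudi.common.config.TypedProperties;
   import org.apache.hudi.utilities.transform.Transformer;
   import org.apache.spark.api.java.JavaSparkContext;
   import org.apache.spark.api.java.function.FilterFunction;
   import org.apache.spark.sql.Dataset;
   import org.apache.spark.sql.Row;
   import org.apache.spark.sql.SparkSession;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   public class TombstoneTransformer implements Transformer {
   
     private static final Logger LOG = LoggerFactory.getLogger(TombstoneTransformer.class);
   
     @Override
     public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
                               TypedProperties properties) {
       LOG.info("TombstoneTransformer " + rowDataset);
       // // NullPointerException happens on the following line:
       // List<Row> rowList = rowDataset.collectAsList();
       // // NullPointerException happens on the following line:
       // newRowSet.collectAsList().stream().limit(50).forEach(x -> LOG.info(x.toString()));
       // // Later in DeltaSync, tombstone records appear to still be present and results in NullPointerException later
       // Dataset<Row> newRowSet = rowDataset.filter("_change_operation_type is not null");
       // // Later in DeltaSync, tombstone records appear to still be present and results in NullPointerException later
       // The explicit cast selects the FilterFunction overload of Dataset.filter.
       Dataset<Row> newRowSet = rowDataset.filter((FilterFunction<Row>) Objects::nonNull);
       return newRowSet;
     }
   }
   ```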
   
   However, none of the attempts at filtering the rowDataset get rid of the NullPointerException later in the Deltastreamer ingestion. Moreover, many of the attempts to log/view the individual records in rowDataset also result in a NullPointerException. So we are wondering if something earlier in the code (maybe the PostgresDebeziumSource.java that flattens messages?) could be allowing malformed Row objects to get passed to the custom Transformer classes, which would explain why we cannot read/access the Rows and filter out the ones that are null (tombstone) records.
   Does anyone have an idea for how to make this class work? Side note: we also tried SqlQueryBasedTransformer with `SELECT * FROM <SRC> a WHERE a.id is not null` and it also did not filter out tombstones (we still had an NPE later during ingestion).
   
   Could someone explain what is happening with delete operations on `PostgresDebeziumSource` and `PostgresDebeziumAvroPayload` and why they potentially aren't being handled well? Thanks in advance!
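   
   To make the suspicion above concrete, here is a minimal sketch of dropping tombstones before any decoding happens, rather than afterwards in a Transformer. It uses only the Java standard library; `RawMessage`, `decode`, and `decodeSkippingTombstones` are hypothetical stand-ins and not Kafka or Hudi APIs. It rests on the assumption that the NullPointerException is raised while the null message value is being converted.
   
   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Collectors;
   
   public class TombstoneFilterSketch {
   
     // Hypothetical stand-in for a raw Kafka record (key plus payload bytes).
     static final class RawMessage {
       final String key;
       final byte[] value; // null for a tombstone
   
       RawMessage(String key, byte[] value) {
         this.key = key;
         this.value = value;
       }
     }
   
     // Stand-in for deserialization: throws NullPointerException on a null payload.
     static String decode(byte[] value) {
       return new String(value, StandardCharsets.UTF_8);
     }
   
     // Drops null-valued messages first, so decode never sees a tombstone.
     static List<String> decodeSkippingTombstones(List<RawMessage> batch) {
       return batch.stream()
           .filter(m -> m.value != null)
           .map(m -> decode(m.value))
           .collect(Collectors.toList());
     }
   
     public static void main(String[] args) {
       List<RawMessage> batch = Arrays.asList(
           new RawMessage("1", "insert".getBytes(StandardCharsets.UTF_8)),
           new RawMessage("1", null), // tombstone emitted after a delete
           new RawMessage("2", "update".getBytes(StandardCharsets.UTF_8)));
       System.out.println(decodeSkippingTombstones(batch)); // prints [insert, update]
     }
   }
   ```
   
   If that assumption holds, a filter applied to the already-built `Dataset<Row>` would run too late, which would match the behaviour described above.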

