Posted to commits@hudi.apache.org by "sydneyhoran (via GitHub)" <gi...@apache.org> on 2023/04/20 18:44:48 UTC

[GitHub] [hudi] sydneyhoran opened a new issue, #8521: [SUPPORT] Deltastreamer not recognizing config `hoodie.deltastreamer.source.kafka.value.deserializer.class` with PostgresDebeziumSource

sydneyhoran opened a new issue, #8521:
URL: https://github.com/apache/hudi/issues/8521

   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
   
   - Join the mailing list to engage in conversations and get faster support at dev-subscribe@hudi.apache.org.
   
   - If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   
   
   **Describe the problem you faced**
   
   Hi there, I am curious about the compatibility of `KafkaAvroSchemaDeserializer` with `PostgresDebeziumSource`. Do these two options work together?
   
   I am trying to follow [this article](https://hudi.apache.org/blog/2021/08/16/kafka-custom-deserializer/) to use Deltastreamer with Kafka topics whose schemas evolve. For some reason, I am unable to pass the config:
   
   ```
   hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer
   ```
   
   I am also providing the schema provider class:
   ```
   --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider
   ```
   
   The job fails to start due to:
   ```
   Caused by: org.apache.hudi.exception.HoodieException: java.lang.IllegalArgumentException: Property hoodie.deltastreamer.source.kafka.value.deserializer.schema not found
   ```
   
   It appears that this line of code in [AvroKafkaSource.java](https://github.com/apache/hudi/blob/6082e9c9c46ee8da6aa779c2f009fc50f83467b7/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java#L53) refers to a config key ending in `.schema` rather than `.class`. The job is then unable to find that key in the configs. The stack trace below shows `KafkaAvroSchemaDeserializer.configure` failing on `hoodie.deltastreamer.source.kafka.value.deserializer.schema`.
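   
   The following minimal Java sketch shows the failure. It mimics the strict property lookup seen in the stack trace, where `TypedProperties.getString` calls `checkKey`. It is a hypothetical stand-in, not Hudi code: `StrictProps` and `getRequired` are invented names. With only the `.class` key set, as in `tablename.properties`, looking up the `.schema` key fails with the same message as the trace.
   
   ```java
   import java.util.Properties;
   
   // Hypothetical stand-in for the strict lookup in the stack trace
   // (TypedProperties.getString -> checkKey); not Hudi's actual implementation.
   class StrictProps {
       static final String DESER_CLASS_KEY =
           "hoodie.deltastreamer.source.kafka.value.deserializer.class";
       static final String DESER_SCHEMA_KEY =
           "hoodie.deltastreamer.source.kafka.value.deserializer.schema";
   
       // Returns the value, or throws if the key is absent (same message format as the trace).
       static String getRequired(Properties props, String key) {
           if (!props.containsKey(key)) {
               throw new IllegalArgumentException("Property " + key + " not found");
           }
           return props.getProperty(key);
       }
   
       public static void main(String[] args) {
           Properties props = new Properties();
           // Only the ".class" key is configured, as in tablename.properties.
           props.setProperty(DESER_CLASS_KEY,
               "org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer");
           try {
               getRequired(props, DESER_SCHEMA_KEY);
           } catch (IllegalArgumentException e) {
               // Prints: Property hoodie.deltastreamer.source.kafka.value.deserializer.schema not found
               System.out.println(e.getMessage());
           }
       }
   }
   ```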
   
   The job at least starts up with the default value (`KafkaAvroDeserializer`). On certain topics, however, it eventually fails with an `ArrayIndexOutOfBoundsException` (`Caused by: java.lang.ArrayIndexOutOfBoundsException: 10`), which we believe is caused by the schema evolving mid-batch. That is why I'm trying to use the custom `KafkaAvroSchemaDeserializer` instead.
   
   We are using Deltastreamer with `PostgresDebeziumSource` to consume data from Confluent Kafka, with Confluent Schema Registry as the schema provider. The Kafka consumer fails to construct when we pass the config `hoodie.deltastreamer.source.kafka.value.deserializer.class`. Without this config, the job runs fine for a while and then suddenly fails with the `ArrayIndexOutOfBoundsException`.
   
   Thanks for any input!
   
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Run a Deltastreamer job with `PostgresDebeziumSource` using the following params:
   ```
   --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider
   --source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource
   --payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload
   <...>
   --hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer
   ```
   2. The Kafka consumer fails to construct.
   
   
   **Expected behavior**
   
   The job should accept this config so that it can gracefully handle schema changes flowing from Debezium into Kafka.
   
   
   **Environment Description**
   
   * Hudi version : 0.13.0
   
   * Spark version : 3.1
   
   * Hive version : N/A
   
   * Hadoop version : N/A
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : both
   
   
   **Additional context**
   
   **hoodie configs:**
   ```
   --target-base-path s3a://{{ bucket }}/{{ table_path }}
   --target-table {{ table_name }}
   --continuous
   --props gs://path/to/tablename.properties
   --min-sync-interval-seconds 15
   --source-ordering-field updated_at
   --source-limit 5000
   --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider
   --table-type COPY_ON_WRITE
   --op UPSERT
   --source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource
   --payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload
   ```
   
   **tablename.properties**
   ```
   hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer
   hoodie.deltastreamer.schemaprovider.registry.url={{ schema_url }}.confluent.cloud/subjects/{{ topic }}-value/versions/latest
   hoodie.deltastreamer.source.kafka.topic=some.topic
   hoodie.datasource.write.recordkey.field=id
   hoodie.datasource.write.partitionpath.field=inserted_at
   hoodie.datasource.write.precombine.field=updated_at
   schema.registry.url={{ schema_url }}
   schema.registry.basic.auth.user.info={{ schema_user }}:{{ schema_key }}
   sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ kafka_user }}' password='{{ kafka_key }}';
   bootstrap.servers={{ bootstrap_server }}
   hoodie.embed.timeline.server=false
   hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
   hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
   hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
   group.id=hudi-deltastreamer
   security.protocol=SASL_SSL
   sasl.mechanism=PLAIN
   basic.auth.credentials.source=USER_INFO
   heartbeat.interval.ms=5000
   session.timeout.ms=120000
   request.timeout.ms=900000
   retry.backoff.ms=500
   hoodie.datasource.write.schema.allow.auto.evolution.column.drop=true
   max.rounds.without.new.data.to.shutdown=5
   hoodie.write.concurrency.mode=optimistic_concurrency_control
   hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider
   hoodie.cleaner.policy.failed.writes=LAZY
   hoodie.client.heartbeat.interval_in_ms=120000
   hoodie.client.heartbeat.tolerable.misses=10
   hoodie.write.lock.client.wait_time_ms_between_retry=1000
   hoodie.write.lock.max_wait_time_ms_between_retry=1000
   hoodie.write.lock.wait_time_ms_between_retry=500
   hoodie.write.lock.wait_time_ms=5000
   hoodie.write.lock.client.num_retries=10
   hoodie.metadata.enable=false
   ```
   
   
   **Stacktrace**
   
   ```
   Exception in thread "main" org.apache.hudi.utilities.ingestion.HoodieIngestionException: Ingestion service was shut down with exception.
   	at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:66)
   	at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:212)
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:575)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
   	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
   	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
   	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
   	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
   	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
   	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
   	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: Failed to construct kafka consumer
   	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
   	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
   	at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
   	at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:64)
   	... 15 more
   Caused by: org.apache.hudi.exception.HoodieException: Failed to construct kafka consumer
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$1(HoodieDeltaStreamer.java:757)
   	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:750)
   Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
   	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
   	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
   	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
   	at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:193)
   	at org.apache.hudi.utilities.sources.debezium.DebeziumSource.fetchNextBatch(DebeziumSource.java:113)
   	at org.apache.hudi.utilities.sources.RowSource.fetchNewData(RowSource.java:43)
   	at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:76)
   	at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:176)
   	at org.apache.hudi.utilities.deltastreamer.DeltaSync.fetchFromSource(DeltaSync.java:585)
   	at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:493)
   	at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:401)
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$1(HoodieDeltaStreamer.java:717)
   	... 4 more
   Caused by: org.apache.hudi.exception.HoodieException: java.lang.IllegalArgumentException: Property hoodie.deltastreamer.source.kafka.value.deserializer.schema not found
   	at org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer.configure(KafkaAvroSchemaDeserializer.java:53)
   	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:721)
   	... 15 more
   Caused by: java.lang.IllegalArgumentException: Property hoodie.deltastreamer.source.kafka.value.deserializer.schema not found
   	at org.apache.hudi.common.config.TypedProperties.checkKey(TypedProperties.java:67)
   	at org.apache.hudi.common.config.TypedProperties.getString(TypedProperties.java:72)
   	at org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer.configure(KafkaAvroSchemaDeserializer.java:51)
   	... 16 more
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] sydneyhoran commented on issue #8521: [SUPPORT] Deltastreamer not recognizing config `hoodie.deltastreamer.source.kafka.value.deserializer.class` with PostgresDebeziumSource

Posted by "sydneyhoran (via GitHub)" <gi...@apache.org>.
sydneyhoran commented on issue #8521:
URL: https://github.com/apache/hudi/issues/8521#issuecomment-1539033823

   @ad1happy2go I can try the patch. First, can you expand on the comment linked above by @samserpoosh, in which @rmahindra123 mentioned that we should not set `--schemaprovider-class` with a Debezium source? I also see the message `SchemaProvider has to be set to use KafkaAvroSchemaDeserializer`, so I'm wondering whether it's safe to leave it out.
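   
   For context, the quoted message implies that `KafkaAvroSchemaDeserializer` relies on a schema provider to supply the `.schema` property that the stack trace reports as missing. The rough sketch below shows how a source could copy the provider's schema into that key before building the consumer. It is hypothetical and rests on our assumption about the mechanism, based only on that message and the trace. `DeserializerSetup` and `prepare` are invented names, a `Supplier<String>` stands in for a real `SchemaProvider`, and `IllegalStateException` is used in place of whatever exception Hudi actually throws.
   
   ```java
   import java.util.Properties;
   import java.util.function.Supplier;
   
   // Hypothetical sketch, not Hudi source: derives the ".schema" property from a
   // schema provider (modelled as a Supplier<String>) before the consumer is built.
   class DeserializerSetup {
       static final String DESER_CLASS_KEY =
           "hoodie.deltastreamer.source.kafka.value.deserializer.class";
       static final String DESER_SCHEMA_KEY =
           "hoodie.deltastreamer.source.kafka.value.deserializer.schema";
       static final String SCHEMA_DESERIALIZER =
           "org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer";
   
       static void prepare(Properties props, Supplier<String> schemaProvider) {
           String deserializer = props.getProperty(DESER_CLASS_KEY, "");
           if (!SCHEMA_DESERIALIZER.equals(deserializer)) {
               return; // other deserializers do not need the extra key
           }
           if (schemaProvider == null) {
               // Same wording as the message quoted in this thread.
               throw new IllegalStateException(
                   "SchemaProvider has to be set to use KafkaAvroSchemaDeserializer");
           }
           // Store the source schema string under the key the deserializer reads.
           props.setProperty(DESER_SCHEMA_KEY, schemaProvider.get());
       }
   }
   ```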


[GitHub] [hudi] ad1happy2go commented on issue #8521: [SUPPORT] Deltastreamer not recognizing config `hoodie.deltastreamer.source.kafka.value.deserializer.class` with PostgresDebeziumSource

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8521:
URL: https://github.com/apache/hudi/issues/8521#issuecomment-1523233107

   JIRA to track - https://issues.apache.org/jira/browse/HUDI-6141


[GitHub] [hudi] ad1happy2go commented on issue #8521: [SUPPORT] Deltastreamer not recognizing config `hoodie.deltastreamer.source.kafka.value.deserializer.class` with PostgresDebeziumSource

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8521:
URL: https://github.com/apache/hudi/issues/8521#issuecomment-1568028064

   @sydneyhoran 
   Yes, `KafkaAvroSchemaDeserializer` is not currently supported with the Debezium sources, so it should not be used with them.
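   
   Since this combination is unsupported, a job launcher could reject it up front. The sketch below is hypothetical and simply encodes the guidance in this comment. The `DebeziumConfigCheck` class is invented. It assumes Debezium sources can be recognized by the `org.apache.hudi.utilities.sources.debezium.` package prefix that `PostgresDebeziumSource` uses.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Hypothetical pre-flight check (not part of Hudi): flags the unsupported
   // Debezium source + KafkaAvroSchemaDeserializer combination before submission.
   class DebeziumConfigCheck {
       // Assumption: Debezium sources live under this package prefix.
       static final String DEBEZIUM_PACKAGE = "org.apache.hudi.utilities.sources.debezium.";
       static final String SCHEMA_DESERIALIZER =
           "org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer";
   
       // Returns human-readable problems; an empty list means no known conflict.
       static List<String> problems(String sourceClass, String valueDeserializerClass) {
           List<String> out = new ArrayList<>();
           boolean debezium = sourceClass != null && sourceClass.startsWith(DEBEZIUM_PACKAGE);
           if (debezium && SCHEMA_DESERIALIZER.equals(valueDeserializerClass)) {
               out.add("KafkaAvroSchemaDeserializer is not supported with Debezium sources; "
                   + "leave hoodie.deltastreamer.source.kafka.value.deserializer.class at its default");
           }
           return out;
       }
   }
   ```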


[GitHub] [hudi] ad1happy2go commented on issue #8521: [SUPPORT] Deltastreamer not recognizing config `hoodie.deltastreamer.source.kafka.value.deserializer.class` with PostgresDebeziumSource

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8521:
URL: https://github.com/apache/hudi/issues/8521#issuecomment-1568033156

   JIRA to track adding this support: https://issues.apache.org/jira/browse/HUDI-6285
   
   Feel free to close this issue as we have a tracking jira for improvement.


[GitHub] [hudi] danielfordfc commented on issue #8521: [SUPPORT] Deltastreamer not recognizing config `hoodie.deltastreamer.source.kafka.value.deserializer.class` with PostgresDebeziumSource

Posted by "danielfordfc (via GitHub)" <gi...@apache.org>.
danielfordfc commented on issue #8521:
URL: https://github.com/apache/hudi/issues/8521#issuecomment-1517444809

   I have been able to essentially replicate this in our own setup and would love to see a fix!


[GitHub] [hudi] samserpoosh commented on issue #8521: [SUPPORT] Deltastreamer not recognizing config `hoodie.deltastreamer.source.kafka.value.deserializer.class` with PostgresDebeziumSource

Posted by "samserpoosh (via GitHub)" <gi...@apache.org>.
samserpoosh commented on issue #8521:
URL: https://github.com/apache/hudi/issues/8521#issuecomment-1529090886

   @sydneyhoran You might have seen this already, but just in case: I stumbled upon [this comment](https://github.com/apache/hudi/issues/6348#issuecomment-1223742672), which says you should **not** provide `--schemaprovider-class` when using an `XXXDebeziumSource`.
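   
   If you follow that advice, a launcher script could drop the flag automatically for Debezium jobs. This is a hypothetical sketch: `DebeziumArgs` and `withoutSchemaProvider` are invented names, and it assumes Debezium sources are identified by the `org.apache.hudi.utilities.sources.debezium.` package prefix. It does not settle whether omitting the schema provider is right for every setup.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Hypothetical helper (not part of Hudi): removes "--schemaprovider-class <value>"
   // from a Deltastreamer argument list when "--source-class" is a Debezium source.
   class DebeziumArgs {
       static List<String> withoutSchemaProvider(List<String> args) {
           int src = args.indexOf("--source-class");
           boolean debezium = src >= 0 && src + 1 < args.size()
               && args.get(src + 1).startsWith("org.apache.hudi.utilities.sources.debezium.");
           if (!debezium) {
               return new ArrayList<>(args); // leave non-Debezium jobs unchanged
           }
           List<String> out = new ArrayList<>();
           for (int i = 0; i < args.size(); i++) {
               if (args.get(i).equals("--schemaprovider-class")) {
                   i++; // skip the flag and the value that follows it
                   continue;
               }
               out.add(args.get(i));
           }
           return out;
       }
   }
   ```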


[GitHub] [hudi] xushiyan closed issue #8521: [SUPPORT] Deltastreamer not recognizing config `hoodie.deltastreamer.source.kafka.value.deserializer.class` with PostgresDebeziumSource

Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan closed issue #8521: [SUPPORT] Deltastreamer not recognizing config `hoodie.deltastreamer.source.kafka.value.deserializer.class` with PostgresDebeziumSource
URL: https://github.com/apache/hudi/issues/8521


[GitHub] [hudi] ad1happy2go commented on issue #8521: [SUPPORT] Deltastreamer not recognizing config `hoodie.deltastreamer.source.kafka.value.deserializer.class` with PostgresDebeziumSource

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8521:
URL: https://github.com/apache/hudi/issues/8521#issuecomment-1535940781

   @sydneyhoran Can you try this patch? It should fix the issue: https://github.com/apache/hudi/pull/7225/files

