Posted to commits@hudi.apache.org by "sydneyhoran (via GitHub)" <gi...@apache.org> on 2023/04/20 18:44:48 UTC
[GitHub] [hudi] sydneyhoran opened a new issue, #8521: [SUPPORT] Deltastreamer not recognizing config `hoodie.deltastreamer.source.kafka.value.deserializer.class` with PostgresDebeziumSource
sydneyhoran opened a new issue, #8521:
URL: https://github.com/apache/hudi/issues/8521
**Describe the problem you faced**
Hi there, I am curious about the compatibility of KafkaAvroSchemaDeserializer with PostgresDebeziumSource. Do these two options work together?
I am trying to follow [this article](https://hudi.apache.org/blog/2021/08/16/kafka-custom-deserializer/) to use Deltastreamer with Kafka topics where the schema is evolving. For some reason, the job fails when I pass the config:
```
hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer
```
I am also providing the schema provider class:
```
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider
```
The job fails to start due to:
```
Caused by: org.apache.hudi.exception.HoodieException: java.lang.IllegalArgumentException: Property hoodie.deltastreamer.source.kafka.value.deserializer.schema not found
```
It appears that this line of code in [AvroKafkaSource.java](https://github.com/apache/hudi/blob/6082e9c9c46ee8da6aa779c2f009fc50f83467b7/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java#L53) changes the config to `.schema` instead of `.class`, and then the job is subsequently unable to find it in the configs.
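The lookup failure can be modeled in a few lines. The following Python sketch is an illustration only, not Hudi code: `get_required` mimics the strict property check that the stack trace attributes to `TypedProperties`, while `configure_deserializer` and `with_source_schema` are hypothetical stand-ins. It assumes the deserializer reads the `.schema` property when it is configured and that some component is expected to set that property beforehand.

```python
SCHEMA_KEY = "hoodie.deltastreamer.source.kafka.value.deserializer.schema"
CLASS_KEY = "hoodie.deltastreamer.source.kafka.value.deserializer.class"


def get_required(props: dict, key: str) -> str:
    """Strict lookup: fail loudly when the key is absent."""
    if key not in props:
        raise ValueError(f"Property {key} not found")
    return props[key]


def configure_deserializer(props: dict) -> str:
    """Hypothetical stand-in for the deserializer's configure step."""
    return get_required(props, SCHEMA_KEY)


def with_source_schema(props: dict, source_schema_json: str) -> dict:
    """Hypothetical stand-in for a source that injects the schema first."""
    enriched = dict(props)
    enriched[SCHEMA_KEY] = source_schema_json
    return enriched


# Only the `.class` key is supplied by the user, as in this issue.
user_props = {
    CLASS_KEY: "org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer",
}

try:
    configure_deserializer(user_props)
    outcome = "configured"
except ValueError as err:
    outcome = str(err)

print(outcome)
```

Under these assumptions, the error would come from nothing having populated the `.schema` key on this code path rather than from the `.class` key being ignored.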
The job at least starts up with the default deserializer (KafkaAvroDeserializer), but on certain topics it eventually fails with an ArrayIndexOutOfBoundsException (`Caused by: java.lang.ArrayIndexOutOfBoundsException: 10`), which we believe is due to schema evolution mid-batch, so I'm trying to use the custom KafkaAvroSchemaDeserializer instead.
We are using Deltastreamer with a PostgresDebeziumSource, consuming data from Confluent Kafka with Confluent Schema Registry as the schema provider. The Kafka consumer fails to construct when we pass the config `hoodie.deltastreamer.source.kafka.value.deserializer.class`. Without this config, the job runs fine for a while and then suddenly fails with ArrayIndexOutOfBoundsException.
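To show how a schema change could plausibly surface as an out-of-bounds index, here is a toy Python sketch. It is not Avro and not Hudi: the one-byte encoding, the schemas `V1` and `V2`, and the field names are all invented for illustration. It only demonstrates the general hazard of decoding a positional binary payload with a schema other than the one used to write it.

```python
NULLABLE_STRING = ["null", "string"]  # toy union with two branches

# Hypothetical schemas: V2 adds an `amount` field before `note`.
V1 = [("id", "int"), ("note", NULLABLE_STRING)]
V2 = [("id", "int"), ("amount", "int"), ("note", NULLABLE_STRING)]


def encode(record: dict, schema: list) -> bytes:
    """Write fields in schema order using a toy one-byte encoding."""
    out = bytearray()
    for name, ftype in schema:
        value = record[name]
        if ftype == "int":
            out.append(value)  # toy: ints must fit in one byte
        elif value is None:
            out.append(0)  # union branch 0 = null
        else:
            data = value.encode("utf-8")
            out.append(1)  # union branch 1 = string
            out.append(len(data))
            out.extend(data)
    return bytes(out)


def decode(payload: bytes, schema: list) -> dict:
    """Read fields positionally; the schema alone decides how bytes are read."""
    pos = 0
    record = {}
    for name, ftype in schema:
        if ftype == "int":
            record[name] = payload[pos]
            pos += 1
            continue
        branch = ftype[payload[pos]]  # IndexError when the bytes are misaligned
        pos += 1
        if branch == "null":
            record[name] = None
        else:
            length = payload[pos]
            pos += 1
            record[name] = payload[pos:pos + length].decode("utf-8")
            pos += length
    return record


payload = encode({"id": 7, "amount": 10, "note": "hi"}, V2)

try:
    decode(payload, V1)  # old schema reads `amount` as a union branch index
    failed_with_old_schema = False
except IndexError:
    failed_with_old_schema = True

decoded_with_writer_schema = decode(payload, V2)
```

In this toy, the value `10` written for `amount` is read as a union branch index by the older schema, which loosely mirrors the `ArrayIndexOutOfBoundsException: 10` above. Whether the real failure follows the same path is not confirmed anywhere in this thread.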
Thanks for any input!
**To Reproduce**
Steps to reproduce the behavior:
1. Run Deltastreamer job with PostgresDebeziumSource using the following params:
```
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider
--source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource
--payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload
<...>
--hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer
```
2. Kafka consumer fails to construct
**Expected behavior**
The job should accept this config to be able to handle schema changes from Debezium -> Kafka gracefully.
**Environment Description**
* Hudi version : 0.13.0
* Spark version : 3.1
* Hive version : N/A
* Hadoop version : N/A
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : both
**Additional context**
**hoodie configs:**
```
--target-base-path s3a://{{ bucket }}/{{ table_path }}
--target-table {{ table_name }}
--continuous
--props gs://path/to/tablename.properties
--min-sync-interval-seconds 15
--source-ordering-field updated_at
--source-limit 5000
--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider
--table-type COPY_ON_WRITE
--op UPSERT
--source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource
--payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload
```
**tablename.properties**
```
hoodie.deltastreamer.source.kafka.value.deserializer.class=org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer
hoodie.deltastreamer.schemaprovider.registry.url={{ schema_url }}.confluent.cloud/subjects/{{ topic }}-value/versions/latest
hoodie.deltastreamer.source.kafka.topic=some.topic
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=inserted_at
hoodie.datasource.write.precombine.field=updated_at
schema.registry.url={{ schema_url }}
schema.registry.basic.auth.user.info={{ schema_user }}:{{ schema_key }}
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ kafka_user }}' password='{{ kafka_key }}';
bootstrap.servers={{ bootstrap_server }}
hoodie.embed.timeline.server=false
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
group.id=hudi-deltastreamer
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
basic.auth.credentials.source=USER_INFO
heartbeat.interval.ms=5000
session.timeout.ms=120000
request.timeout.ms=900000
retry.backoff.ms=500
hoodie.datasource.write.schema.allow.auto.evolution.column.drop=true
max.rounds.without.new.data.to.shutdown=5
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.client.heartbeat.interval_in_ms=120000
hoodie.client.heartbeat.tolerable.misses=10
hoodie.write.lock.client.wait_time_ms_between_retry=1000
hoodie.write.lock.max_wait_time_ms_between_retry=1000
hoodie.write.lock.wait_time_ms_between_retry=500
hoodie.write.lock.wait_time_ms=5000
hoodie.write.lock.client.num_retries=10
hoodie.metadata.enable=false
```
**Stacktrace**
```
Exception in thread "main" org.apache.hudi.utilities.ingestion.HoodieIngestionException: Ingestion service was shut down with exception.
at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:66)
at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:212)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:575)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: Failed to construct kafka consumer
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
at org.apache.hudi.utilities.ingestion.HoodieIngestionService.startIngestion(HoodieIngestionService.java:64)
... 15 more
Caused by: org.apache.hudi.exception.HoodieException: Failed to construct kafka consumer
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$1(HoodieDeltaStreamer.java:757)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:193)
at org.apache.hudi.utilities.sources.debezium.DebeziumSource.fetchNextBatch(DebeziumSource.java:113)
at org.apache.hudi.utilities.sources.RowSource.fetchNewData(RowSource.java:43)
at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:76)
at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInAvroFormat(SourceFormatAdapter.java:176)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.fetchFromSource(DeltaSync.java:585)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:493)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:401)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$1(HoodieDeltaStreamer.java:717)
... 4 more
Caused by: org.apache.hudi.exception.HoodieException: java.lang.IllegalArgumentException: Property hoodie.deltastreamer.source.kafka.value.deserializer.schema not found
at org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer.configure(KafkaAvroSchemaDeserializer.java:53)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:721)
... 15 more
Caused by: java.lang.IllegalArgumentException: Property hoodie.deltastreamer.source.kafka.value.deserializer.schema not found
at org.apache.hudi.common.config.TypedProperties.checkKey(TypedProperties.java:67)
at org.apache.hudi.common.config.TypedProperties.getString(TypedProperties.java:72)
at org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer.configure(KafkaAvroSchemaDeserializer.java:51)
... 16 more
```
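When reading a nested trace like the one above, the innermost `Caused by:` entry is usually the most informative. The following Python sketch, a generic helper and not part of Hudi, walks the `Caused by:` chain and returns the last entry. The `sample` text is an abbreviated copy of the trace above; the function names are made up for this example.

```python
import re

# Matches lines such as "Caused by: some.pkg.SomeException: message".
CAUSED_BY = re.compile(r"^\s*Caused by: (?P<exc>[\w.$]+)(?:: (?P<msg>.*))?$")


def cause_chain(trace: str) -> list:
    """Return (exception class, message) pairs in the order they appear."""
    chain = []
    for line in trace.splitlines():
        match = CAUSED_BY.match(line)
        if match:
            chain.append((match.group("exc"), match.group("msg") or ""))
    return chain


def root_cause(trace: str):
    """Return the innermost cause, or None if the trace has no cause chain."""
    chain = cause_chain(trace)
    return chain[-1] if chain else None


sample = """\
Exception in thread "main" org.apache.hudi.utilities.ingestion.HoodieIngestionException: Ingestion service was shut down with exception.
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
Caused by: java.lang.IllegalArgumentException: Property hoodie.deltastreamer.source.kafka.value.deserializer.schema not found
at org.apache.hudi.common.config.TypedProperties.checkKey(TypedProperties.java:67)
"""

print(root_cause(sample))
```

Applied to the full trace, this points at the missing `.schema` property rather than the generic "Failed to construct kafka consumer" message.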
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [hudi] sydneyhoran commented on issue #8521: [SUPPORT] Deltastreamer not recognizing config `hoodie.deltastreamer.source.kafka.value.deserializer.class` with PostgresDebeziumSource
Posted by "sydneyhoran (via GitHub)" <gi...@apache.org>.
sydneyhoran commented on issue #8521:
URL: https://github.com/apache/hudi/issues/8521#issuecomment-1539033823
@ad1happy2go I can try the patch, but can you expand on the comment linked above by @samserpoosh, in which @rmahindra123 mentioned that we should not set `--schemaprovider-class` with a Debezium source? I see the message `SchemaProvider has to be set to use KafkaAvroSchemaDeserializer`, so I am wondering whether it is safe to use.
[GitHub] [hudi] ad1happy2go commented on issue #8521: [SUPPORT] Deltastreamer not recognizing config `hoodie.deltastreamer.source.kafka.value.deserializer.class` with PostgresDebeziumSource
Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8521:
URL: https://github.com/apache/hudi/issues/8521#issuecomment-1523233107
JIRA to track - https://issues.apache.org/jira/browse/HUDI-6141
[GitHub] [hudi] ad1happy2go commented on issue #8521: [SUPPORT] Deltastreamer not recognizing config `hoodie.deltastreamer.source.kafka.value.deserializer.class` with PostgresDebeziumSource
Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8521:
URL: https://github.com/apache/hudi/issues/8521#issuecomment-1568028064
@sydneyhoran
Yes, KafkaAvroSchemaDeserializer is not supported with Debezium Source as of now. It should not be used.
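Until that support exists, a job launcher could reject the combination before submitting the job. The following Python sketch is one hypothetical way to do that. `preflight` is not a Hudi API, and representing the CLI flags and properties as plain dictionaries is an assumption made for this example. The schema provider warning reflects the advice quoted elsewhere in this thread, not a documented rule.

```python
DEBEZIUM_SOURCE_PREFIX = "org.apache.hudi.utilities.sources.debezium."
SCHEMA_DESERIALIZER = "org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer"
DESERIALIZER_KEY = "hoodie.deltastreamer.source.kafka.value.deserializer.class"


def preflight(cli_args: dict, props: dict) -> list:
    """Return human-readable findings for a Debezium source configuration."""
    findings = []
    source = cli_args.get("--source-class", "")
    if not source.startswith(DEBEZIUM_SOURCE_PREFIX):
        return findings  # the checks below only concern Debezium sources

    if props.get(DESERIALIZER_KEY) == SCHEMA_DESERIALIZER:
        findings.append(
            "error: KafkaAvroSchemaDeserializer is reported as unsupported "
            "with Debezium sources"
        )
    if "--schemaprovider-class" in cli_args:
        findings.append(
            "warning: this thread advises against --schemaprovider-class "
            "with Debezium sources"
        )
    return findings


# The configuration from this issue, reduced to the relevant entries.
issue_args = {
    "--source-class": "org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource",
    "--schemaprovider-class": "org.apache.hudi.utilities.schema.SchemaRegistryProvider",
}
issue_props = {DESERIALIZER_KEY: SCHEMA_DESERIALIZER}

for finding in preflight(issue_args, issue_props):
    print(finding)
```

Returning a list of findings instead of raising on the first problem lets a launcher report every issue in one pass.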
[GitHub] [hudi] ad1happy2go commented on issue #8521: [SUPPORT] Deltastreamer not recognizing config `hoodie.deltastreamer.source.kafka.value.deserializer.class` with PostgresDebeziumSource
Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8521:
URL: https://github.com/apache/hudi/issues/8521#issuecomment-1568033156
JIRA to track adding this support - https://issues.apache.org/jira/browse/HUDI-6285
Feel free to close this issue, as we now have a JIRA tracking the improvement.
[GitHub] [hudi] danielfordfc commented on issue #8521: [SUPPORT] Deltastreamer not recognizing config `hoodie.deltastreamer.source.kafka.value.deserializer.class` with PostgresDebeziumSource
Posted by "danielfordfc (via GitHub)" <gi...@apache.org>.
danielfordfc commented on issue #8521:
URL: https://github.com/apache/hudi/issues/8521#issuecomment-1517444809
I have been able to essentially replicate this in our own setup and would love to see a fix!
[GitHub] [hudi] samserpoosh commented on issue #8521: [SUPPORT] Deltastreamer not recognizing config `hoodie.deltastreamer.source.kafka.value.deserializer.class` with PostgresDebeziumSource
Posted by "samserpoosh (via GitHub)" <gi...@apache.org>.
samserpoosh commented on issue #8521:
URL: https://github.com/apache/hudi/issues/8521#issuecomment-1529090886
@sydneyhoran You might have seen this already, but just in case, I stumbled upon [this comment](https://github.com/apache/hudi/issues/6348#issuecomment-1223742672) which mentioned you should **not** provide `schemaprovider-class` when dealing with `XXXDebeziumSource`.
[GitHub] [hudi] xushiyan closed issue #8521: [SUPPORT] Deltastreamer not recognizing config `hoodie.deltastreamer.source.kafka.value.deserializer.class` with PostgresDebeziumSource
Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan closed issue #8521: [SUPPORT] Deltastreamer not recognizing config `hoodie.deltastreamer.source.kafka.value.deserializer.class` with PostgresDebeziumSource
URL: https://github.com/apache/hudi/issues/8521
[GitHub] [hudi] ad1happy2go commented on issue #8521: [SUPPORT] Deltastreamer not recognizing config `hoodie.deltastreamer.source.kafka.value.deserializer.class` with PostgresDebeziumSource
Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8521:
URL: https://github.com/apache/hudi/issues/8521#issuecomment-1535940781
@sydneyhoran Can you try this patch? It should fix the issue: https://github.com/apache/hudi/pull/7225/files