Posted to commits@hudi.apache.org by "Vinoth Chandar (Jira)" <ji...@apache.org> on 2021/10/04 14:30:00 UTC
[jira] [Updated] (HUDI-2374) AvroDFSSource does not use the
overridden schema to deserialize Avro binaries.
[ https://issues.apache.org/jira/browse/HUDI-2374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vinoth Chandar updated HUDI-2374:
---------------------------------
Labels: sev:critical (was: patch)
> AvroDFSSource does not use the overridden schema to deserialize Avro binaries.
> ------------------------------------------------------------------------------
>
> Key: HUDI-2374
> URL: https://issues.apache.org/jira/browse/HUDI-2374
> Project: Apache Hudi
> Issue Type: Bug
> Components: DeltaStreamer
> Affects Versions: 0.9.0
> Reporter: Xuan Huy Pham
> Priority: Major
> Labels: sev:critical
> Fix For: 0.10.0
>
> Original Estimate: 48h
> Remaining Estimate: 48h
>
> Hi,
> I am not sure whether AvroDFSSource is intended to ignore the source schema from the designated schema provider class, but the current logic always uses the Avro writer schema as the reader schema.
> Logic as of release-0.9.0, Class: {{org.apache.hudi.utilities.sources.AvroDFSSource}}
> {code:java}
> public class AvroDFSSource extends AvroSource {
>
>   private final DFSPathSelector pathSelector;
>
>   public AvroDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
>       SchemaProvider schemaProvider) throws IOException {
>     super(props, sparkContext, sparkSession, schemaProvider);
>     this.pathSelector = DFSPathSelector
>         .createSourceSelector(props, sparkContext.hadoopConfiguration());
>   }
>
>   @Override
>   protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
>     Pair<Option<String>, String> selectPathsWithMaxModificationTime =
>         pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext, lastCkptStr, sourceLimit);
>     return selectPathsWithMaxModificationTime.getLeft()
>         .map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)), selectPathsWithMaxModificationTime.getRight()))
>         .orElseGet(() -> new InputBatch<>(Option.empty(), selectPathsWithMaxModificationTime.getRight()));
>   }
>
>   private JavaRDD<GenericRecord> fromFiles(String pathStr) {
>     sparkContext.setJobGroup(this.getClass().getSimpleName(), "Fetch Avro data from files");
>     JavaPairRDD<AvroKey, NullWritable> avroRDD = sparkContext.newAPIHadoopFile(pathStr, AvroKeyInputFormat.class,
>         AvroKey.class, NullWritable.class, sparkContext.hadoopConfiguration());
>     return avroRDD.keys().map(r -> ((GenericRecord) r.datum()));
>   }
> }
> {code}
> The {{schemaProvider}} parameter is ignored in the constructor, so {{AvroKeyInputFormat}} always falls back to using the writer schema as the reader schema.
> As a result, we often see this in the DeltaStreamer logs:
> {code:java}
> 21/08/30 10:17:24 WARN AvroKeyInputFormat: Reader schema was not set. Use AvroJob.setInputKeySchema() if desired.
> 21/08/30 10:17:24 INFO AvroKeyInputFormat: Using a reader schema equal to the writer schema.
> {code}
> This blog post, [https://hudi.apache.org/blog/2021/08/16/kafka-custom-deserializer], describes how {{AvroKafkaSource}} supports BACKWARD_TRANSITIVE schema evolution. For DFS data, this issue is the main blocker: if we pass the source schema from the {{schemaProvider}}, we should be able to get the same BACKWARD_TRANSITIVE schema evolution for DFS Avro data.
>
> Suggested Fix: pass the source schema from {{schemaProvider}} to the Hadoop configuration under the key {{avro.schema.input.key}} (the same key written by {{AvroJob.setInputKeySchema()}}).
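> A minimal sketch of that fix in the {{AvroDFSSource}} constructor (hypothetical, not a committed patch; it assumes {{schemaProvider.getSourceSchema()}} returns an {{org.apache.avro.Schema}}):
> {code:java}
> public AvroDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
>     SchemaProvider schemaProvider) throws IOException {
>   super(props, sparkContext, sparkSession, schemaProvider);
>   if (schemaProvider != null && schemaProvider.getSourceSchema() != null) {
>     // Publish the source schema under the key AvroJob.setInputKeySchema() uses,
>     // so AvroKeyInputFormat reads with it instead of defaulting to the writer schema.
>     sparkContext.hadoopConfiguration()
>         .set("avro.schema.input.key", schemaProvider.getSourceSchema().toString());
>   }
>   this.pathSelector = DFSPathSelector
>       .createSourceSelector(props, sparkContext.hadoopConfiguration());
> }
> {code}
> With this in place, the "Reader schema was not set" warning should disappear and Avro schema resolution (reader vs. writer schema) takes effect on read.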
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)