Posted to commits@hudi.apache.org by "Vinoth Chandar (Jira)" <ji...@apache.org> on 2021/10/04 14:30:00 UTC

[jira] [Updated] (HUDI-2374) AvroDFSSource does not use the overridden schema to deserialize Avro binaries.

     [ https://issues.apache.org/jira/browse/HUDI-2374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vinoth Chandar updated HUDI-2374:
---------------------------------
    Labels: sev:critical  (was: patch)

> AvroDFSSource does not use the overridden schema to deserialize Avro binaries.
> ------------------------------------------------------------------------------
>
>                 Key: HUDI-2374
>                 URL: https://issues.apache.org/jira/browse/HUDI-2374
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: DeltaStreamer
>    Affects Versions: 0.9.0
>            Reporter: Xuan Huy Pham
>            Priority: Major
>              Labels: sev:critical
>             Fix For: 0.10.0
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Hi,
> I am not sure whether {{AvroDFSSource}} is intended to ignore the source schema from the designated schema provider class, but the current logic always uses the Avro writer schema as the reader schema.
> The logic as of release-0.9.0, in class {{org.apache.hudi.utilities.sources.AvroDFSSource}}:
> {code:java}
> public class AvroDFSSource extends AvroSource {
>   private final DFSPathSelector pathSelector;
>   public AvroDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
>       SchemaProvider schemaProvider) throws IOException {
>     super(props, sparkContext, sparkSession, schemaProvider);
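>     // NOTE: schemaProvider is forwarded to the base class only; nothing below
>     // ever uses it to set a reader schema.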
>     this.pathSelector = DFSPathSelector
>         .createSourceSelector(props, sparkContext.hadoopConfiguration());
>   }
>   @Override
>   protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
>     Pair<Option<String>, String> selectPathsWithMaxModificationTime =
>         pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext, lastCkptStr, sourceLimit);
>     return selectPathsWithMaxModificationTime.getLeft()
>         .map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)), selectPathsWithMaxModificationTime.getRight()))
>         .orElseGet(() -> new InputBatch<>(Option.empty(), selectPathsWithMaxModificationTime.getRight()));
>   }
>   private JavaRDD<GenericRecord> fromFiles(String pathStr) {
>     sparkContext.setJobGroup(this.getClass().getSimpleName(), "Fetch Avro data from files");
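>     // No reader schema is set on the configuration here, so AvroKeyInputFormat
>     // falls back to each file's writer schema.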
>     JavaPairRDD<AvroKey, NullWritable> avroRDD = sparkContext.newAPIHadoopFile(pathStr, AvroKeyInputFormat.class,
>         AvroKey.class, NullWritable.class, sparkContext.hadoopConfiguration());
>     return avroRDD.keys().map(r -> ((GenericRecord) r.datum()));
>   }
> }
> {code}
> The {{schemaProvider}} parameter is completely ignored in the constructor, so {{AvroKeyInputFormat}} always falls back to the writer schema when reading.
> As a result, we often see this in the DeltaStreamer logs:
> {code:java}
> 21/08/30 10:17:24 WARN AvroKeyInputFormat: Reader schema was not set. Use AvroJob.setInputKeySchema() if desired.
> 21/08/30 10:17:24 INFO AvroKeyInputFormat: Using a reader schema equal to the writer schema.
> {code}
> This blog post [https://hudi.apache.org/blog/2021/08/16/kafka-custom-deserializer] is a nice write-up on how AvroKafkaSource supports BACKWARD_TRANSITIVE schema evolution. For DFS data, I see this issue as the main blocker. If we pass the source schema from {{schemaProvider}}, we should be able to get the same BACKWARD_TRANSITIVE schema evolution for DFS Avro data.
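> For illustration, here is a plain-Avro sketch (independent of Hudi; the {{User}} record and its fields are made up for this example) of the writer/reader schema resolution that makes that possible:
> {code:java}
> import java.io.ByteArrayOutputStream;
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericDatumReader;
> import org.apache.avro.generic.GenericDatumWriter;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.io.BinaryEncoder;
> import org.apache.avro.io.Decoder;
> import org.apache.avro.io.DecoderFactory;
> import org.apache.avro.io.EncoderFactory;
>
> public class SchemaEvolutionSketch {
>   public static void main(String[] args) throws Exception {
>     // Writer schema: what the Avro files on DFS were produced with.
>     Schema writer = new Schema.Parser().parse(
>         "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
>             + "{\"name\":\"id\",\"type\":\"long\"}]}");
>     // Reader schema: a newer, backward-compatible version with a defaulted field.
>     Schema reader = new Schema.Parser().parse(
>         "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
>             + "{\"name\":\"id\",\"type\":\"long\"},"
>             + "{\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null}]}");
>     // Encode a record with the writer schema, as an old producer would.
>     GenericRecord oldRecord = new GenericData.Record(writer);
>     oldRecord.put("id", 42L);
>     ByteArrayOutputStream out = new ByteArrayOutputStream();
>     BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
>     new GenericDatumWriter<GenericRecord>(writer).write(oldRecord, encoder);
>     encoder.flush();
>     // Decode with BOTH schemas: Avro resolves the difference and fills the new
>     // field from its default. Setting the reader schema on AvroKeyInputFormat
>     // would let DeltaStreamer do exactly this.
>     Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
>     GenericRecord upgraded =
>         new GenericDatumReader<GenericRecord>(writer, reader).read(null, decoder);
>     System.out.println(upgraded); // {"id": 42, "email": null}
>   }
> }
> {code}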
>  
> Suggested Fix: pass the source schema from {{schemaProvider}} to the Hadoop configuration key {{avro.schema.input.key}}, as sketched below.
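> A minimal sketch of that change (this is only a suggestion, not a committed patch; {{avro.schema.input.key}} is the configuration key that {{AvroJob.setInputKeySchema()}} writes to):
> {code:java}
> public AvroDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
>     SchemaProvider schemaProvider) throws IOException {
>   super(props, sparkContext, sparkSession, schemaProvider);
>   // Propagate the overridden source schema as the Avro reader schema, so that
>   // AvroKeyInputFormat no longer falls back to each file's writer schema.
>   if (schemaProvider != null) {
>     sparkContext.hadoopConfiguration()
>         .set("avro.schema.input.key", schemaProvider.getSourceSchema().toString());
>   }
>   this.pathSelector = DFSPathSelector
>       .createSourceSelector(props, sparkContext.hadoopConfiguration());
> }
> {code}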



--
This message was sent by Atlassian Jira
(v8.3.4#803005)