Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/08/05 21:45:15 UTC

[GitHub] [hudi] vinothchandar commented on a change in pull request #2927: [HUDI-1129] Improving schema evolution support in hudi

vinothchandar commented on a change in pull request #2927:
URL: https://github.com/apache/hudi/pull/2927#discussion_r683796125



##########
File path: hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -92,22 +92,45 @@ object HoodieSparkUtils extends SparkAdapterSupport {
     new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache)
   }
 
-  def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
+  def createRdd(df: DataFrame, structName: String, recordNamespace: String, upgradeToLatestSchemaIfNeeded: Boolean): RDD[GenericRecord] = {
+    createRdd(df, null, structName, recordNamespace, upgradeToLatestSchemaIfNeeded)
+  }
+
+  def createRdd(df: DataFrame, latestSchema: Schema, structName: String, recordNamespace: String, upgradeToLatestSchemaIfNeeded: Boolean): RDD[GenericRecord] = {

Review comment:
       You can just use default parameter values in Scala instead of creating method overloads.
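
A minimal sketch of what default parameter values could look like here. `Frame` and `AvroSchema` are hypothetical stand-ins for Spark's `DataFrame` and Avro's `Schema`, the parameter names and defaults are assumptions rather than the PR's final signature, and the body is only a placeholder that reports which arguments took effect:

```scala
// Hypothetical stand-ins so the sketch compiles on its own; the real method
// takes a Spark DataFrame and an Avro Schema.
final case class Frame(columns: Seq[String])
final case class AvroSchema(fields: Seq[String])

object DefaultArgsSketch {
  // A single definition with defaults covers both overloads from the diff.
  // The returned string only describes the effective arguments.
  def createRdd(df: Frame,
                structName: String,
                recordNamespace: String,
                reconcileToLatestSchema: Boolean = false,
                latestTableSchema: Option[AvroSchema] = None): String = {
    // Use the table schema's fields when one is supplied, else the frame's.
    val target = latestTableSchema.map(_.fields).getOrElse(df.columns)
    s"$recordNamespace.$structName(${target.mkString(", ")}) reconcile=$reconcileToLatestSchema"
  }
}
```

Callers that do not care about reconciliation can omit the trailing arguments, and the others can pass them by name. One trade-off to weigh: Java callers cannot use Scala default arguments and would have to pass every parameter explicitly.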

##########
File path: hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -92,22 +92,45 @@ object HoodieSparkUtils extends SparkAdapterSupport {
     new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache)
   }
 
-  def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
+  def createRdd(df: DataFrame, structName: String, recordNamespace: String, upgradeToLatestSchemaIfNeeded: Boolean): RDD[GenericRecord] = {
+    createRdd(df, null, structName, recordNamespace, upgradeToLatestSchemaIfNeeded)
+  }
+
+  def createRdd(df: DataFrame, latestSchema: Schema, structName: String, recordNamespace: String, upgradeToLatestSchemaIfNeeded: Boolean): RDD[GenericRecord] = {
     val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
-    createRdd(df, avroSchema, structName, recordNamespace)
+    // if upgradeToLatestSchemaIfNeeded is set to true and latestSchema is not null, then try to leverage latestSchema

Review comment:
       Can we just determine the latestSchema here, with something like the snippet below, and then make a single call to `createRdd`?
   
   ```
   val reconciledSchema /* or something like that */ = if (...) else ..  
   ```
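
A slightly fuller sketch of that idea, assuming the two branches in the diff differ only in which schema is used to deserialize rows and which one is used to convert them to Avro. `AvroSchema`, `innerCreateRdd`, and the parameter names are hypothetical stand-ins, not the actual Hudi code:

```scala
// Hypothetical stand-in for org.apache.avro.Schema.
final case class AvroSchema(name: String)

object SingleCallSketch {
  // Stand-in for the inner createRdd: it only reports which schema would be
  // used to deserialize rows and which one (if any) to convert them to.
  def innerCreateRdd(rowSchema: AvroSchema, targetSchema: Option[AvroSchema]): String =
    s"deserialize=${rowSchema.name}, convertTo=${targetSchema.map(_.name).getOrElse("-")}"

  def createRdd(writeSchema: AvroSchema,
                latestTableSchema: Option[AvroSchema],
                reconcileSchema: Boolean): String = {
    // Decide both schemas once, mirroring the two branches in the diff.
    val (rowSchema, targetSchema) =
      if (reconcileSchema && latestTableSchema.isDefined) (writeSchema, latestTableSchema)
      else (latestTableSchema.getOrElse(writeSchema), Option.empty[AvroSchema])
    // Then make a single call.
    innerCreateRdd(rowSchema, targetSchema)
  }
}
```

Keeping the decision in one expression leaves a single call site to read and to test.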

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##########
@@ -414,6 +414,13 @@ public static void initializeBootstrapDirsIfNotExists(Configuration hadoopConf,
     return fs.listStatus(metaPath, nameFilter);
   }
 
+  /**
+   * @return {@code true} if any commits are found, else {@code false}.
+   */
+  public boolean isTimelineNonEmpty() {
+    return getCommitsTimeline().getInstants().collect(Collectors.toList()).size() > 0;

Review comment:
       Double check that this returns only completed instants.
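
To illustrate the concern, here is a toy model of a timeline in Scala. The `State`, `Instant`, and `hasCompletedCommits` names are hypothetical and are not Hudi's timeline API; the point is only that an emptiness check should filter on the completed state if pending instants can appear in the timeline:

```scala
// Toy model of instant states; not Hudi's actual classes.
sealed trait State
case object Requested extends State
case object Inflight extends State
case object Completed extends State

final case class Instant(timestamp: String, state: State)

object TimelineSketch {
  // True only if at least one instant has completed; pending instants
  // (requested or inflight) do not count as commits.
  def hasCompletedCommits(instants: Seq[Instant]): Boolean =
    instants.exists(_.state == Completed)
}
```

`exists` also stops at the first match instead of collecting every instant into a list just to compare its size with zero; on a Java stream, `anyMatch` or `findAny().isPresent()` gives the same short-circuiting.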

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -286,6 +286,16 @@ object DataSourceWriteOptions {
     .defaultValue(classOf[HiveSyncTool].getName)
     .withDocumentation("Sync tool class name used to sync to metastore. Defaults to Hive.")
 
+  val HANDLE_SCHEMA_MISMATCH: ConfigProperty[Boolean] = ConfigProperty
+    .key("hoodie.datasource.write.handle.schema.mismatch")

Review comment:
       Let's make this consistent with every boolean flag used to denote this. So either this becomes `hoodie.datasource.write.reconcile.schema` or the variable is `handleSchemaMismatch`.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java
##########
@@ -40,8 +40,8 @@ public SparkAvroPostProcessor(TypedProperties props, JavaSparkContext jssc) {
 
   @Override
   public Schema processSchema(Schema schema) {
-    return AvroConversionUtils.convertStructTypeToAvroSchema(
+    return schema != null ? AvroConversionUtils.convertStructTypeToAvroSchema(

Review comment:
       null?

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -391,11 +395,14 @@ public void refreshTimeline() throws IOException {
             transformed
                 .map(r -> (SchemaProvider) new DelegatingSchemaProvider(props, jssc,
                     dataAndCheckpoint.getSchemaProvider(),
-                    UtilHelpers.createRowBasedSchemaProvider(r.schema(), props, jssc)))
+                    handleSchemaMismatch

Review comment:
       please pull the schema determination into a separate variable above
   

##########
File path: hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -92,22 +92,45 @@ object HoodieSparkUtils extends SparkAdapterSupport {
     new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache)
   }
 
-  def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
+  def createRdd(df: DataFrame, structName: String, recordNamespace: String, upgradeToLatestSchemaIfNeeded: Boolean): RDD[GenericRecord] = {
+    createRdd(df, null, structName, recordNamespace, upgradeToLatestSchemaIfNeeded)
+  }
+
+  def createRdd(df: DataFrame, latestSchema: Schema, structName: String, recordNamespace: String, upgradeToLatestSchemaIfNeeded: Boolean): RDD[GenericRecord] = {
     val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)

Review comment:
       You probably want to rename this to something like `writeSchema`, given that this is the schema of the incoming write. And `latestSchema` is probably just the `latestTableSchema`, right?

##########
File path: hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -92,22 +92,45 @@ object HoodieSparkUtils extends SparkAdapterSupport {
     new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache)
   }
 
-  def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
+  def createRdd(df: DataFrame, structName: String, recordNamespace: String, upgradeToLatestSchemaIfNeeded: Boolean): RDD[GenericRecord] = {
+    createRdd(df, null, structName, recordNamespace, upgradeToLatestSchemaIfNeeded)

Review comment:
       `null` as sentinel?
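
One possible alternative to a `null` sentinel, sketched with a hypothetical `AvroSchema` stand-in for Avro's `Schema` and a made-up `effectiveSchema` helper. Wrapping a possibly-null reference in `Option(...)` yields `None` for `null`, so the check happens once at the boundary:

```scala
// Hypothetical stand-in for org.apache.avro.Schema.
final case class AvroSchema(name: String)

object OptionSketch {
  // Option(x) is None when x is null, so a null passed in by a caller is
  // absorbed here instead of being re-checked further down.
  def effectiveSchema(writeSchema: AvroSchema, latestTableSchemaOrNull: AvroSchema): AvroSchema =
    Option(latestTableSchemaOrNull).getOrElse(writeSchema)
}
```

Inside Scala code the parameter itself could be an `Option[AvroSchema]`, which makes the "no table schema yet" case visible in the signature.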

##########
File path: hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -92,22 +92,45 @@ object HoodieSparkUtils extends SparkAdapterSupport {
     new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache)
   }
 
-  def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
+  def createRdd(df: DataFrame, structName: String, recordNamespace: String, upgradeToLatestSchemaIfNeeded: Boolean): RDD[GenericRecord] = {
+    createRdd(df, null, structName, recordNamespace, upgradeToLatestSchemaIfNeeded)
+  }
+
+  def createRdd(df: DataFrame, latestSchema: Schema, structName: String, recordNamespace: String, upgradeToLatestSchemaIfNeeded: Boolean): RDD[GenericRecord] = {
     val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
-    createRdd(df, avroSchema, structName, recordNamespace)
+    // if upgradeToLatestSchemaIfNeeded is set to true and latestSchema is not null, then try to leverage latestSchema
+    // this code path will handle situations where records are serialized in schema1, but callers wish to convert to
+    // Rdd[GenericRecord] using different schema(could be evolved schema or could be latest table schema)
+    if (upgradeToLatestSchemaIfNeeded && latestSchema != null) {
+      createRdd(df, avroSchema, latestSchema, structName, recordNamespace)
+    } else {
+      // there are paths where callers wish to use latestSchema to convert to Rdd[GenericRecords] and not use row's schema
+      // So use latestSchema is not null. if not available, fallback to using row's schema.
+      createRdd(df, if(latestSchema != null) latestSchema else avroSchema, null, structName, recordNamespace)
+    }
   }
 
-  def createRdd(df: DataFrame, avroSchema: Schema, structName: String, recordNamespace: String)
+  def createRdd(df: DataFrame, avroSchema: Schema, latestSchema: Schema, structName: String, recordNamespace: String)
   : RDD[GenericRecord] = {
     // Use the Avro schema to derive the StructType which has the correct nullability information
     val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
     val encoder = RowEncoder.apply(dataType).resolveAndBind()
     val deserializer = sparkAdapter.createSparkRowSerDe(encoder)
+    // if records were serialized with old schema, but an evolved schema was passed in with latestSchema, we need
+    // latestSchema equivalent datatype to be passed in to AvroConversionHelper.createConverterToAvro()
+    val latestDataType =
+      if (latestSchema != null) SchemaConverters.toSqlType(latestSchema).dataType.asInstanceOf[StructType] else null
+    // Note: deserializer.deserializeRow(row) is not capable of handling evolved schema. i.e. if Row was serialized in
+    // old schema, but deserializer was created with an encoder with evolved schema, deserialization fails.
+    // Hence we always need to deserialize in the same schema as serialized schema.
     df.queryExecution.toRdd.map(row => deserializer.deserializeRow(row))
       .mapPartitions { records =>
         if (records.isEmpty) Iterator.empty
         else {
-          val convertor = AvroConversionHelper.createConverterToAvro(dataType, structName, recordNamespace)
+          // if records were serialized with old schema, but an evolved schema was passed in with latestSchema, we need
+          // latestSchema equivalent datatype to be passed in to AvroConversionHelper.createConverterToAvro()
+          val convertor = AvroConversionHelper.createConverterToAvro(
+            if (latestDataType != null) latestDataType else dataType, structName, recordNamespace)

Review comment:
       Same naming terminology here? `writeStructType`, `latestTableStructType`.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
##########
@@ -425,11 +429,53 @@ public static SchemaProviderWithPostProcessor wrapSchemaProviderWithPostProcesso
   }
 
   public static SchemaProvider createRowBasedSchemaProvider(StructType structType,
-      TypedProperties cfg, JavaSparkContext jssc) {
+                                                            TypedProperties cfg, JavaSparkContext jssc) {
     SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType);
     return wrapSchemaProviderWithPostProcessor(rowSchemaProvider, cfg, jssc, null);
   }
 
+  /**
+   * Create latest schema provider for Target schema.
+   *
+   * @param structType spark data type of incoming batch.
+   * @param jssc       instance of {@link JavaSparkContext}.
+   * @param fs         instance of {@link FileSystem}.
+   * @param basePath   base path of the table.
+   * @return the schema provider where target schema refers to latest schema(either incoming schema or table schema).
+   */
+  public static SchemaProvider createLatestSchemaProvider(StructType structType,
+                                                          JavaSparkContext jssc, FileSystem fs, String basePath) {
+    SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType);
+    Schema incomingSchema = rowSchemaProvider.getTargetSchema();
+    Schema latestSchema = incomingSchema;
+
+    try {
+      if (FSUtils.isTableExists(basePath, fs)) {
+        HoodieTableMetaClient tableMetaClient = HoodieTableMetaClient.builder().setConf(jssc.sc().hadoopConfiguration()).setBasePath(basePath).build();
+        TableSchemaResolver
+            tableSchemaResolver = new TableSchemaResolver(tableMetaClient);
+        latestSchema = tableSchemaResolver.getLatestSchema(incomingSchema, true, (Function1<Schema, Schema>) v1 -> AvroConversionUtils.convertStructTypeToAvroSchema(
+            AvroConversionUtils.convertAvroSchemaToStructType(v1), RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME,
+            RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE));
+      }
+    } catch (IOException e) {
+      LOG.warn("Could not fetch table schema. Falling back to writer schema");
+    }
+
+    final Schema finalLatestSchema = latestSchema;
+    return new SchemaProvider(null) {

Review comment:
       null?

##########
File path: hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -92,22 +92,45 @@ object HoodieSparkUtils extends SparkAdapterSupport {
     new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache)
   }
 
-  def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
+  def createRdd(df: DataFrame, structName: String, recordNamespace: String, upgradeToLatestSchemaIfNeeded: Boolean): RDD[GenericRecord] = {
+    createRdd(df, null, structName, recordNamespace, upgradeToLatestSchemaIfNeeded)
+  }
+
+  def createRdd(df: DataFrame, latestSchema: Schema, structName: String, recordNamespace: String, upgradeToLatestSchemaIfNeeded: Boolean): RDD[GenericRecord] = {

Review comment:
       Rename `upgrade` to `reconcile` and drop `IfNeeded`. Just `reconcileToLatestSchema`.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
##########
@@ -425,11 +429,53 @@ public static SchemaProviderWithPostProcessor wrapSchemaProviderWithPostProcesso
   }
 
   public static SchemaProvider createRowBasedSchemaProvider(StructType structType,
-      TypedProperties cfg, JavaSparkContext jssc) {
+                                                            TypedProperties cfg, JavaSparkContext jssc) {
     SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType);
     return wrapSchemaProviderWithPostProcessor(rowSchemaProvider, cfg, jssc, null);
   }
 
+  /**
+   * Create latest schema provider for Target schema.
+   *
+   * @param structType spark data type of incoming batch.
+   * @param jssc       instance of {@link JavaSparkContext}.
+   * @param fs         instance of {@link FileSystem}.
+   * @param basePath   base path of the table.
+   * @return the schema provider where target schema refers to latest schema(either incoming schema or table schema).
+   */
+  public static SchemaProvider createLatestSchemaProvider(StructType structType,
+                                                          JavaSparkContext jssc, FileSystem fs, String basePath) {
+    SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType);
+    Schema incomingSchema = rowSchemaProvider.getTargetSchema();

Review comment:
       writeSchema/tableSchema?
   



