You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2023/03/02 15:16:05 UTC

[hudi] branch master updated: [HUDI-4442] [HUDI-5001] Sanitize JsonConversion and RowSource (#8010)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b178ce978c [HUDI-4442] [HUDI-5001] Sanitize JsonConversion and RowSource (#8010)
6b178ce978c is described below

commit 6b178ce978ce324361cf708ce711cd0c46452dd6
Author: Jon Vexler <jb...@gmail.com>
AuthorDate: Thu Mar 2 10:15:48 2023 -0500

    [HUDI-4442] [HUDI-5001] Sanitize JsonConversion and RowSource (#8010)
    
    Updates the MercifulJsonConverter to have more mercy by:
    
    - Replacing characters that are illegal in avro names with __ in field names
    - Leveraging aliases in the avro schema, if present
    This issue is within the scope of row sources. The actual issue is that the names of the columns in the row sources may contain invalid avro characters (ref [here|https://avro.apache.org/docs/1.10.2/spec.html#names]); when the corresponding configuration is set, we can sanitize the column names both in the schema and in the actual data so that data ingestion to hudi does not fail. The schema provider support is scoped to filebasedschemaregistry, as other schema registries might not allow registering an invalid schema in [...]
    
    ---------
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  23 ++-
 .../apache/hudi/avro/MercifulJsonConverter.java    | 120 +++++++++---
 .../org/apache/hudi/avro/TestHoodieAvroUtils.java  |  11 ++
 .../hudi/avro/TestMercifulJsonConverter.java       | 102 +++++++++++
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   2 +-
 .../deltastreamer/SourceFormatAdapter.java         |  94 ++++++++--
 .../utilities/schema/FilebasedSchemaProvider.java  |  26 ++-
 .../utilities/sources/helpers/AvroConvertor.java   |  19 +-
 .../sources/helpers/SanitizationUtils.java         | 202 +++++++++++++++++++++
 .../deltastreamer/TestSourceFormatAdapter.java     | 189 +++++++++++++++++++
 .../schema/TestFilebasedSchemaProvider.java        |  99 ++++++++++
 .../utilities/sources/TestJsonKafkaSource.java     |   4 +-
 .../sources/helpers/TestSanitizationUtils.java     | 141 ++++++++++++++
 .../utilities/testutils/SanitizationTestUtils.java | 173 ++++++++++++++++++
 .../src/test/resources/data/avro_sanitization.json |   2 +
 .../data/avro_sanitization_bad_naming_in.json      |   2 +
 ...ro_sanitization_bad_naming_nested_array_in.json |   2 +
 ...o_sanitization_bad_naming_nested_array_out.json |   2 +
 ...avro_sanitization_bad_naming_nested_map_in.json |   2 +
 ...vro_sanitization_bad_naming_nested_map_out.json |   2 +
 .../data/avro_sanitization_bad_naming_out.json     |   2 +
 .../file_schema_provider_invalid.avsc              |  50 +++++
 .../file_schema_provider_valid.avsc                |  49 +++++
 23 files changed, 1255 insertions(+), 63 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index aa19fac10b2..8955b477254 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -82,6 +82,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.Properties;
 import java.util.TimeZone;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static org.apache.avro.Schema.Type.UNION;
@@ -106,8 +107,8 @@ public class HoodieAvroUtils {
   public static final Conversions.DecimalConversion DECIMAL_CONVERSION = new Conversions.DecimalConversion();
 
   // As per https://avro.apache.org/docs/current/spec.html#names
-  private static final String INVALID_AVRO_CHARS_IN_NAMES = "[^A-Za-z0-9_]";
-  private static final String INVALID_AVRO_FIRST_CHAR_IN_NAMES = "[^A-Za-z_]";
+  private static final Pattern INVALID_AVRO_CHARS_IN_NAMES_PATTERN = Pattern.compile("[^A-Za-z0-9_]");
+  private static final Pattern INVALID_AVRO_FIRST_CHAR_IN_NAMES_PATTERN = Pattern.compile("[^A-Za-z_]");
   private static final String MASK_FOR_INVALID_CHARS_IN_NAMES = "__";
 
   // All metadata fields are optional strings.
@@ -721,10 +722,22 @@ public class HoodieAvroUtils {
    * @return sanitized name
    */
   public static String sanitizeName(String name) {
-    if (name.substring(0, 1).matches(INVALID_AVRO_FIRST_CHAR_IN_NAMES)) {
-      name = name.replaceFirst(INVALID_AVRO_FIRST_CHAR_IN_NAMES, MASK_FOR_INVALID_CHARS_IN_NAMES);
+    return sanitizeName(name, MASK_FOR_INVALID_CHARS_IN_NAMES);
+  }
+
+  /**
+   * Sanitizes Name according to Avro rule for names.
+   * Replaces characters other than the ones mentioned in https://avro.apache.org/docs/current/spec.html#names with the given mask.
+   *
+   * @param name input name
+   * @param invalidCharMask replacement for invalid characters.
+   * @return sanitized name
+   */
+  public static String sanitizeName(String name, String invalidCharMask) {
+    if (INVALID_AVRO_FIRST_CHAR_IN_NAMES_PATTERN.matcher(name.substring(0, 1)).matches()) {
+      name = INVALID_AVRO_FIRST_CHAR_IN_NAMES_PATTERN.matcher(name).replaceFirst(invalidCharMask);
     }
-    return name.replaceAll(INVALID_AVRO_CHARS_IN_NAMES, MASK_FOR_INVALID_CHARS_IN_NAMES);
+    return INVALID_AVRO_CHARS_IN_NAMES_PATTERN.matcher(name).replaceAll(invalidCharMask);
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java b/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
index 2207961c0ba..cdf0f15d80d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
@@ -36,6 +36,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Converts Json record to Avro Generic Record.
@@ -44,8 +45,14 @@ public class MercifulJsonConverter {
 
   private static final Map<Schema.Type, JsonToAvroFieldProcessor> FIELD_TYPE_PROCESSORS = getFieldTypeProcessors();
 
+  // For each schema (keyed by full name), stores a mapping of schema field name to json field name to account for sanitization of fields
+  private static final Map<String, Map<String, String>> SANITIZED_FIELD_MAPPINGS = new ConcurrentHashMap<>();
+
   private final ObjectMapper mapper;
 
+  private final String invalidCharMask;
+  private final boolean shouldSanitize;
+  
   /**
    * Build type processor map for each avro type.
    */
@@ -72,18 +79,30 @@ public class MercifulJsonConverter {
    * Uses a default objectMapper to deserialize a json string.
    */
   public MercifulJsonConverter() {
-    this(new ObjectMapper());
+    this(false, "__");
+  }
+
+
+  /**
+   * Allows enabling sanitization and allows choice of invalidCharMask for sanitization
+   */
+  public MercifulJsonConverter(boolean shouldSanitize, String invalidCharMask) {
+    this(new ObjectMapper(), shouldSanitize, invalidCharMask);
   }
 
   /**
    * Allows a configured ObjectMapper to be passed for converting json records to avro record.
    */
-  public MercifulJsonConverter(ObjectMapper mapper) {
+  public MercifulJsonConverter(ObjectMapper mapper, boolean shouldSanitize, String invalidCharMask) {
     this.mapper = mapper;
+    this.shouldSanitize = shouldSanitize;
+    this.invalidCharMask = invalidCharMask;
   }
 
   /**
    * Converts json to Avro generic record.
+   * NOTE: if sanitization is needed for avro conversion, the schema input to this method is already sanitized.
+   *       During the conversion here, we sanitize the fields in the data
    *
    * @param json Json record
    * @param schema Schema
@@ -91,23 +110,56 @@ public class MercifulJsonConverter {
   public GenericRecord convert(String json, Schema schema) {
     try {
       Map<String, Object> jsonObjectMap = mapper.readValue(json, Map.class);
-      return convertJsonToAvro(jsonObjectMap, schema);
+      return convertJsonToAvro(jsonObjectMap, schema, shouldSanitize, invalidCharMask);
     } catch (IOException e) {
       throw new HoodieIOException(e.getMessage(), e);
     }
   }
 
-  private static GenericRecord convertJsonToAvro(Map<String, Object> inputJson, Schema schema) {
+  /**
+   * Clear between fetches. If the schema changes, or if two tables have the same schemaFullName, then
+   * there can be issues with the cached field mappings.
+   */
+  public static void clearCache(String schemaFullName) {
+    SANITIZED_FIELD_MAPPINGS.remove(schemaFullName);
+  }
+
+  private static GenericRecord convertJsonToAvro(Map<String, Object> inputJson, Schema schema, boolean shouldSanitize, String invalidCharMask) {
     GenericRecord avroRecord = new GenericData.Record(schema);
     for (Schema.Field f : schema.getFields()) {
-      Object val = inputJson.get(f.name());
+      Object val = shouldSanitize ? getFieldFromJson(f, inputJson, schema.getFullName(), invalidCharMask) : inputJson.get(f.name());
       if (val != null) {
-        avroRecord.put(f.pos(), convertJsonToAvroField(val, f.name(), f.schema()));
+        avroRecord.put(f.pos(), convertJsonToAvroField(val, f.name(), f.schema(), shouldSanitize, invalidCharMask));
       }
     }
     return avroRecord;
   }
 
+  private static Object getFieldFromJson(final Schema.Field fieldSchema, final Map<String, Object> inputJson, final String schemaFullName, final String invalidCharMask) {
+    Map<String, String> schemaToJsonFieldNames = SANITIZED_FIELD_MAPPINGS.computeIfAbsent(schemaFullName, unused -> new ConcurrentHashMap<>());
+    if (!schemaToJsonFieldNames.containsKey(fieldSchema.name())) {
+      // if we don't have field mapping, proactively populate as many as possible based on input json
+      for (String inputFieldName : inputJson.keySet()) {
+        // we expect many fields won't need sanitization so check if un-sanitized field name is already present
+        if (!schemaToJsonFieldNames.containsKey(inputFieldName)) {
+          String sanitizedJsonFieldName = HoodieAvroUtils.sanitizeName(inputFieldName, invalidCharMask);
+          schemaToJsonFieldNames.putIfAbsent(sanitizedJsonFieldName, inputFieldName);
+        }
+      }
+    }
+    Object match = inputJson.get(schemaToJsonFieldNames.getOrDefault(fieldSchema.name(), fieldSchema.name()));
+    if (match != null) {
+      return match;
+    }
+    // Check if there is an alias match
+    for (String alias : fieldSchema.aliases()) {
+      if (inputJson.containsKey(alias)) {
+        return inputJson.get(alias);
+      }
+    }
+    return null;
+  }
+
   private static Schema getNonNull(Schema schema) {
     List<Schema> types = schema.getTypes();
     Schema.Type firstType = types.get(0).getType();
@@ -120,7 +172,7 @@ public class MercifulJsonConverter {
             || schema.getTypes().get(1).getType().equals(Schema.Type.NULL));
   }
 
-  private static Object convertJsonToAvroField(Object value, String name, Schema schema) {
+  private static Object convertJsonToAvroField(Object value, String name, Schema schema, boolean shouldSanitize, String invalidCharMask) {
 
     if (isOptional(schema)) {
       if (value == null) {
@@ -130,12 +182,12 @@ public class MercifulJsonConverter {
       }
     } else if (value == null) {
       // Always fail on null for non-nullable schemas
-      throw new HoodieJsonToAvroConversionException(null, name, schema);
+      throw new HoodieJsonToAvroConversionException(null, name, schema, shouldSanitize, invalidCharMask);
     }
 
     JsonToAvroFieldProcessor processor = FIELD_TYPE_PROCESSORS.get(schema.getType());
     if (null != processor) {
-      return processor.convertToAvro(value, name, schema);
+      return processor.convertToAvro(value, name, schema, shouldSanitize, invalidCharMask);
     }
     throw new IllegalArgumentException("JsonConverter cannot handle type: " + schema.getType());
   }
@@ -145,21 +197,21 @@ public class MercifulJsonConverter {
    */
   private abstract static class JsonToAvroFieldProcessor implements Serializable {
 
-    public Object convertToAvro(Object value, String name, Schema schema) {
-      Pair<Boolean, Object> res = convert(value, name, schema);
+    public Object convertToAvro(Object value, String name, Schema schema, boolean shouldSanitize, String invalidCharMask) {
+      Pair<Boolean, Object> res = convert(value, name, schema, shouldSanitize, invalidCharMask);
       if (!res.getLeft()) {
-        throw new HoodieJsonToAvroConversionException(value, name, schema);
+        throw new HoodieJsonToAvroConversionException(value, name, schema, shouldSanitize, invalidCharMask);
       }
       return res.getRight();
     }
 
-    protected abstract Pair<Boolean, Object> convert(Object value, String name, Schema schema);
+    protected abstract Pair<Boolean, Object> convert(Object value, String name, Schema schema, boolean shouldSanitize, String invalidCharMask);
   }
 
   private static JsonToAvroFieldProcessor generateBooleanTypeHandler() {
     return new JsonToAvroFieldProcessor() {
       @Override
-      public Pair<Boolean, Object> convert(Object value, String name, Schema schema) {
+      public Pair<Boolean, Object> convert(Object value, String name, Schema schema, boolean shouldSanitize, String invalidCharMask) {
         if (value instanceof Boolean) {
           return Pair.of(true, value);
         }
@@ -171,7 +223,7 @@ public class MercifulJsonConverter {
   private static JsonToAvroFieldProcessor generateIntTypeHandler() {
     return new JsonToAvroFieldProcessor() {
       @Override
-      public Pair<Boolean, Object> convert(Object value, String name, Schema schema) {
+      public Pair<Boolean, Object> convert(Object value, String name, Schema schema, boolean shouldSanitize, String invalidCharMask) {
         if (value instanceof Number) {
           return Pair.of(true, ((Number) value).intValue());
         } else if (value instanceof String) {
@@ -185,7 +237,7 @@ public class MercifulJsonConverter {
   private static JsonToAvroFieldProcessor generateDoubleTypeHandler() {
     return new JsonToAvroFieldProcessor() {
       @Override
-      public Pair<Boolean, Object> convert(Object value, String name, Schema schema) {
+      public Pair<Boolean, Object> convert(Object value, String name, Schema schema, boolean shouldSanitize, String invalidCharMask) {
         if (value instanceof Number) {
           return Pair.of(true, ((Number) value).doubleValue());
         } else if (value instanceof String) {
@@ -199,7 +251,7 @@ public class MercifulJsonConverter {
   private static JsonToAvroFieldProcessor generateFloatTypeHandler() {
     return new JsonToAvroFieldProcessor() {
       @Override
-      public Pair<Boolean, Object> convert(Object value, String name, Schema schema) {
+      public Pair<Boolean, Object> convert(Object value, String name, Schema schema, boolean shouldSanitize, String invalidCharMask) {
         if (value instanceof Number) {
           return Pair.of(true, ((Number) value).floatValue());
         } else if (value instanceof String) {
@@ -213,7 +265,7 @@ public class MercifulJsonConverter {
   private static JsonToAvroFieldProcessor generateLongTypeHandler() {
     return new JsonToAvroFieldProcessor() {
       @Override
-      public Pair<Boolean, Object> convert(Object value, String name, Schema schema) {
+      public Pair<Boolean, Object> convert(Object value, String name, Schema schema, boolean shouldSanitize, String invalidCharMask) {
         if (value instanceof Number) {
           return Pair.of(true, ((Number) value).longValue());
         } else if (value instanceof String) {
@@ -227,7 +279,7 @@ public class MercifulJsonConverter {
   private static JsonToAvroFieldProcessor generateStringTypeHandler() {
     return new JsonToAvroFieldProcessor() {
       @Override
-      public Pair<Boolean, Object> convert(Object value, String name, Schema schema) {
+      public Pair<Boolean, Object> convert(Object value, String name, Schema schema, boolean shouldSanitize, String invalidCharMask) {
         return Pair.of(true, value.toString());
       }
     };
@@ -236,7 +288,7 @@ public class MercifulJsonConverter {
   private static JsonToAvroFieldProcessor generateBytesTypeHandler() {
     return new JsonToAvroFieldProcessor() {
       @Override
-      public Pair<Boolean, Object> convert(Object value, String name, Schema schema) {
+      public Pair<Boolean, Object> convert(Object value, String name, Schema schema, boolean shouldSanitize, String invalidCharMask) {
         // Should return ByteBuffer (see GenericData.isBytes())
         return Pair.of(true, ByteBuffer.wrap(value.toString().getBytes()));
       }
@@ -246,7 +298,7 @@ public class MercifulJsonConverter {
   private static JsonToAvroFieldProcessor generateFixedTypeHandler() {
     return new JsonToAvroFieldProcessor() {
       @Override
-      public Pair<Boolean, Object> convert(Object value, String name, Schema schema) {
+      public Pair<Boolean, Object> convert(Object value, String name, Schema schema, boolean shouldSanitize, String invalidCharMask) {
         // The ObjectMapper use List to represent FixedType
         // eg: "decimal_val": [0, 0, 14, -63, -52] will convert to ArrayList<Integer>
         List<Integer> converval = (List<Integer>) value;
@@ -264,12 +316,12 @@ public class MercifulJsonConverter {
   private static JsonToAvroFieldProcessor generateEnumTypeHandler() {
     return new JsonToAvroFieldProcessor() {
       @Override
-      public Pair<Boolean, Object> convert(Object value, String name, Schema schema) {
+      public Pair<Boolean, Object> convert(Object value, String name, Schema schema, boolean shouldSanitize, String invalidCharMask) {
         if (schema.getEnumSymbols().contains(value.toString())) {
           return Pair.of(true, new GenericData.EnumSymbol(schema, value.toString()));
         }
         throw new HoodieJsonToAvroConversionException(String.format("Symbol %s not in enum", value.toString()),
-            schema.getFullName(), schema);
+            schema.getFullName(), schema, shouldSanitize, invalidCharMask);
       }
     };
   }
@@ -277,9 +329,9 @@ public class MercifulJsonConverter {
   private static JsonToAvroFieldProcessor generateRecordTypeHandler() {
     return new JsonToAvroFieldProcessor() {
       @Override
-      public Pair<Boolean, Object> convert(Object value, String name, Schema schema) {
+      public Pair<Boolean, Object> convert(Object value, String name, Schema schema, boolean shouldSanitize, String invalidCharMask) {
         GenericRecord result = new GenericData.Record(schema);
-        return Pair.of(true, convertJsonToAvro((Map<String, Object>) value, schema));
+        return Pair.of(true, convertJsonToAvro((Map<String, Object>) value, schema, shouldSanitize, invalidCharMask));
       }
     };
   }
@@ -287,11 +339,11 @@ public class MercifulJsonConverter {
   private static JsonToAvroFieldProcessor generateArrayTypeHandler() {
     return new JsonToAvroFieldProcessor() {
       @Override
-      public Pair<Boolean, Object> convert(Object value, String name, Schema schema) {
+      public Pair<Boolean, Object> convert(Object value, String name, Schema schema, boolean shouldSanitize, String invalidCharMask) {
         Schema elementSchema = schema.getElementType();
         List<Object> listRes = new ArrayList<>();
         for (Object v : (List) value) {
-          listRes.add(convertJsonToAvroField(v, name, elementSchema));
+          listRes.add(convertJsonToAvroField(v, name, elementSchema, shouldSanitize, invalidCharMask));
         }
         return Pair.of(true, new GenericData.Array<>(schema, listRes));
       }
@@ -301,11 +353,11 @@ public class MercifulJsonConverter {
   private static JsonToAvroFieldProcessor generateMapTypeHandler() {
     return new JsonToAvroFieldProcessor() {
       @Override
-      public Pair<Boolean, Object> convert(Object value, String name, Schema schema) {
+      public Pair<Boolean, Object> convert(Object value, String name, Schema schema, boolean shouldSanitize, String invalidCharMask) {
         Schema valueSchema = schema.getValueType();
         Map<String, Object> mapRes = new HashMap<>();
         for (Map.Entry<String, Object> v : ((Map<String, Object>) value).entrySet()) {
-          mapRes.put(v.getKey(), convertJsonToAvroField(v.getValue(), name, valueSchema));
+          mapRes.put(v.getKey(), convertJsonToAvroField(v.getValue(), name, valueSchema, shouldSanitize, invalidCharMask));
         }
         return Pair.of(true, mapRes);
       }
@@ -321,14 +373,22 @@ public class MercifulJsonConverter {
     private String fieldName;
     private Schema schema;
 
-    public HoodieJsonToAvroConversionException(Object value, String fieldName, Schema schema) {
+    private boolean shouldSanitize;
+    private String invalidCharMask;
+
+    public HoodieJsonToAvroConversionException(Object value, String fieldName, Schema schema, boolean shouldSanitize, String invalidCharMask) {
       this.value = value;
       this.fieldName = fieldName;
       this.schema = schema;
+      this.shouldSanitize = shouldSanitize;
+      this.invalidCharMask = invalidCharMask;
     }
 
     @Override
     public String toString() {
+      if (shouldSanitize) {
+        return String.format("Json to Avro Type conversion error for field %s, %s for %s. Field sanitization is enabled with a mask of %s.", fieldName, value, schema, invalidCharMask);
+      }
       return String.format("Json to Avro Type conversion error for field %s, %s for %s", fieldName, value, schema);
     }
   }
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index 896843b58f2..4b3c6a34985 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -42,6 +42,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
+import static org.apache.hudi.avro.HoodieAvroUtils.sanitizeName;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -426,4 +427,14 @@ public class TestHoodieAvroUtils {
     int days = HoodieAvroUtils.fromJavaDate(now);
     assertEquals(now.toLocalDate(), HoodieAvroUtils.toJavaDate(days).toLocalDate());
   }
+
+  @Test
+  public void testSanitizeName() {
+    assertEquals("__23456", sanitizeName("123456"));
+    assertEquals("abcdef", sanitizeName("abcdef"));
+    assertEquals("_1", sanitizeName("_1"));
+    assertEquals("a*bc", sanitizeName("a.bc", "*"));
+    assertEquals("abcdef___", sanitizeName("abcdef_."));
+    assertEquals("__ab__cd__", sanitizeName("1ab*cd?"));
+  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java
new file mode 100644
index 00000000000..660890009fd
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestMercifulJsonConverter {
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+  private static final MercifulJsonConverter CONVERTER = new MercifulJsonConverter(true,"__");
+
+  @Test
+  public void basicConversion() throws IOException {
+    Schema simpleSchema = SchemaTestUtil.getSimpleSchema();
+    String name = "John Smith";
+    int number = 1337;
+    String color = "Blue. No yellow!";
+    Map<String, Object> data = new HashMap<>();
+    data.put("name", name);
+    data.put("favorite_number", number);
+    data.put("favorite_color", color);
+    String json = MAPPER.writeValueAsString(data);
+
+    GenericRecord rec = new GenericData.Record(simpleSchema);
+    rec.put("name", name);
+    rec.put("favorite_number", number);
+    rec.put("favorite_color", color);
+
+    Assertions.assertEquals(rec, CONVERTER.convert(json, simpleSchema));
+  }
+
+  @Test
+  public void conversionWithFieldNameSanitization() throws IOException {
+    String sanitizedSchemaString = "{\"namespace\": \"example.avro\", \"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"__name\", \"type\": \"string\"}, "
+        + "{\"name\": \"favorite__number\", \"type\": \"int\"}, {\"name\": \"favorite__color__\", \"type\": \"string\"}]}";
+    Schema sanitizedSchema = Schema.parse(sanitizedSchemaString);
+    String name = "John Smith";
+    int number = 1337;
+    String color = "Blue. No yellow!";
+    Map<String, Object> data = new HashMap<>();
+    data.put("$name", name);
+    data.put("favorite-number", number);
+    data.put("favorite.color!", color);
+    String json = MAPPER.writeValueAsString(data);
+
+    GenericRecord rec = new GenericData.Record(sanitizedSchema);
+    rec.put("__name", name);
+    rec.put("favorite__number", number);
+    rec.put("favorite__color__", color);
+
+    Assertions.assertEquals(rec, CONVERTER.convert(json, sanitizedSchema));
+  }
+
+  @Test
+  public void conversionWithFieldNameAliases() throws IOException {
+    String schemaStringWithAliases = "{\"namespace\": \"example.avro\", \"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\", \"aliases\": [\"$name\"]}, "
+        + "{\"name\": \"favorite_number\",  \"type\": \"int\", \"aliases\": [\"unused\", \"favorite-number\"]}, {\"name\": \"favorite_color\", \"type\": \"string\", \"aliases\": "
+        + "[\"favorite.color!\"]}, {\"name\": \"unmatched\", \"type\": \"string\", \"default\": \"default_value\"}]}";
+    Schema sanitizedSchema = Schema.parse(schemaStringWithAliases);
+    String name = "John Smith";
+    int number = 1337;
+    String color = "Blue. No yellow!";
+    Map<String, Object> data = new HashMap<>();
+    data.put("$name", name);
+    data.put("favorite-number", number);
+    data.put("favorite.color!", color);
+    String json = MAPPER.writeValueAsString(data);
+
+    GenericRecord rec = new GenericData.Record(sanitizedSchema);
+    rec.put("name", name);
+    rec.put("favorite_number", number);
+    rec.put("favorite_color", color);
+
+    Assertions.assertEquals(rec, CONVERTER.convert(json, sanitizedSchema));
+  }
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index dc780ee9a5d..2c9c30ac465 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -298,7 +298,7 @@ public class DeltaSync implements Serializable, Closeable {
     }
     this.formatAdapter = new SourceFormatAdapter(
         UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider, metrics),
-        this.errorTableWriter);
+        this.errorTableWriter, Option.of(props));
   }
 
   /**
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
index fdc2af66d9f..afc9c3416fc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
@@ -20,6 +20,8 @@ package org.apache.hudi.utilities.deltastreamer;
 
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.avro.MercifulJsonConverter;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.utilities.UtilHelpers;
@@ -29,6 +31,7 @@ import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
 import org.apache.hudi.utilities.sources.InputBatch;
 import org.apache.hudi.utilities.sources.Source;
 import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
+import org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
 
 import com.google.protobuf.Message;
 import org.apache.avro.Schema;
@@ -54,6 +57,8 @@ import scala.util.Either;
 import static org.apache.hudi.utilities.deltastreamer.BaseErrorTableWriter.ERROR_TABLE_CURRUPT_RECORD_COL_NAME;
 import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
 import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
+import static org.apache.hudi.utilities.sources.helpers.SanitizationUtils.Config.SANITIZE_SCHEMA_FIELD_NAMES;
+import static org.apache.hudi.utilities.sources.helpers.SanitizationUtils.Config.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK;
 
 /**
  * Adapts data-format provided by the source to the data-format required by the client (DeltaStreamer).
@@ -61,16 +66,55 @@ import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_REC
 public final class SourceFormatAdapter implements Closeable {
 
   private final Source source;
+  private boolean shouldSanitize = SANITIZE_SCHEMA_FIELD_NAMES.defaultValue();
+  private String invalidCharMask = SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.defaultValue();
 
   private Option<BaseErrorTableWriter> errorTableWriter = Option.empty();
 
   public SourceFormatAdapter(Source source) {
-    this(source, Option.empty());
+    this(source, Option.empty(), Option.empty());
   }
 
-  public SourceFormatAdapter(Source source, Option<BaseErrorTableWriter> errorTableWriter) {
-    this.errorTableWriter = errorTableWriter;
+  public SourceFormatAdapter(Source source, Option<BaseErrorTableWriter> errorTableWriter, Option<TypedProperties> props) {
     this.source = source;
+    this.errorTableWriter = errorTableWriter;
+    if (props.isPresent()) {
+      this.shouldSanitize = SanitizationUtils.getShouldSanitize(props.get());
+      this.invalidCharMask = SanitizationUtils.getInvalidCharMask(props.get());
+    }
+    if (this.shouldSanitize && source.getSourceType() == Source.SourceType.PROTO) {
+      throw new IllegalArgumentException("PROTO cannot be sanitized");
+    }
+  }
+
+  /**
+   * Config that automatically sanitizes the field names as per avro naming rules.
+   * @return enabled status.
+   */
+  private boolean isFieldNameSanitizingEnabled() {
+    return shouldSanitize;
+  }
+
+  /**
+   * Replacement mask for invalid characters encountered in avro names.
+   * @return the replacement mask.
+   */
+  private String getInvalidCharMask() {
+    return invalidCharMask;
+  }
+
+  /**
+   * Sanitize all columns including nested ones as per Avro conventions.
+   * @param srcBatch
+   * @return sanitized batch.
+   */
+  private InputBatch<Dataset<Row>> maybeSanitizeFieldNames(InputBatch<Dataset<Row>> srcBatch) {
+    if (!isFieldNameSanitizingEnabled() || !srcBatch.getBatch().isPresent()) {
+      return srcBatch;
+    }
+    Dataset<Row> srcDs = srcBatch.getBatch().get();
+    Dataset<Row> targetDs = SanitizationUtils.sanitizeColumnNamesForAvro(srcDs, getInvalidCharMask());
+    return new InputBatch<>(Option.ofNullable(targetDs), srcBatch.getCheckpointForNextBatch(), srcBatch.getSchemaProvider());
   }
 
   /**
@@ -79,7 +123,8 @@ public final class SourceFormatAdapter implements Closeable {
    * @return
    */
   private JavaRDD<GenericRecord> transformJsonToGenericRdd(InputBatch<JavaRDD<String>> inputBatch) {
-    AvroConvertor convertor = new AvroConvertor(inputBatch.getSchemaProvider().getSourceSchema());
+    MercifulJsonConverter.clearCache(inputBatch.getSchemaProvider().getSourceSchema().getFullName());
+    AvroConvertor convertor = new AvroConvertor(inputBatch.getSchemaProvider().getSourceSchema(), isFieldNameSanitizingEnabled(), getInvalidCharMask());
     return inputBatch.getBatch().map(rdd -> {
       if (errorTableWriter.isPresent()) {
         JavaRDD<Either<GenericRecord,String>> javaRDD = rdd.map(convertor::fromJsonWithError);
@@ -118,14 +163,17 @@ public final class SourceFormatAdapter implements Closeable {
   public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String> lastCkptStr, long sourceLimit) {
     switch (source.getSourceType()) {
       case AVRO:
+        //don't need to sanitize because it's already avro
         return ((Source<JavaRDD<GenericRecord>>) source).fetchNext(lastCkptStr, sourceLimit);
       case JSON: {
+        //sanitizing is done inside the convertor in transformJsonToGenericRdd if enabled
         InputBatch<JavaRDD<String>> r = ((Source<JavaRDD<String>>) source).fetchNext(lastCkptStr, sourceLimit);
         JavaRDD<GenericRecord> eventsRdd = transformJsonToGenericRdd(r);
         return new InputBatch<>(Option.ofNullable(eventsRdd),r.getCheckpointForNextBatch(), r.getSchemaProvider());
       }
       case ROW: {
-        InputBatch<Dataset<Row>> r = ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit);
+        //we do the sanitizing here if enabled
+        InputBatch<Dataset<Row>> r = maybeSanitizeFieldNames(((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit));
         return new InputBatch<>(Option.ofNullable(r.getBatch().map(
             rdd -> {
                 SchemaProvider originalProvider = UtilHelpers.getOriginalSchemaProvider(r.getSchemaProvider());
@@ -141,6 +189,7 @@ public final class SourceFormatAdapter implements Closeable {
             .orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
       }
       case PROTO: {
+        //TODO([HUDI-5830]) implement field name sanitization
         InputBatch<JavaRDD<Message>> r = ((Source<JavaRDD<Message>>) source).fetchNext(lastCkptStr, sourceLimit);
         AvroConvertor convertor = new AvroConvertor(r.getSchemaProvider().getSourceSchema());
         return new InputBatch<>(Option.ofNullable(r.getBatch().map(rdd -> rdd.map(convertor::fromProtoMessage)).orElse(null)),
@@ -151,30 +200,42 @@ public final class SourceFormatAdapter implements Closeable {
     }
   }
 
+  private InputBatch<Dataset<Row>> avroDataInRowFormat(InputBatch<JavaRDD<GenericRecord>> r) {
+    Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
+    return new InputBatch<>(
+        Option
+            .ofNullable(
+                r.getBatch()
+                    .map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
+                        source.getSparkSession())
+                    )
+                    .orElse(null)),
+        r.getCheckpointForNextBatch(), r.getSchemaProvider());
+  }
+
   /**
    * Fetch new data in row format. If the source provides data in different format, they are translated to Row format
    */
   public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptStr, long sourceLimit) {
     switch (source.getSourceType()) {
       case ROW:
-        InputBatch<Dataset<Row>> datasetInputBatch = ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit);
+        //we do the sanitizing here if enabled
+        InputBatch<Dataset<Row>> datasetInputBatch = maybeSanitizeFieldNames(((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit));
         return new InputBatch<>(processErrorEvents(datasetInputBatch.getBatch(),
             ErrorEvent.ErrorReason.JSON_ROW_DESERIALIZATION_FAILURE),
             datasetInputBatch.getCheckpointForNextBatch(), datasetInputBatch.getSchemaProvider());
       case AVRO: {
+        //don't need to sanitize because it's already avro
         InputBatch<JavaRDD<GenericRecord>> r = ((Source<JavaRDD<GenericRecord>>) source).fetchNext(lastCkptStr, sourceLimit);
-        Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
-        return new InputBatch<>(
-            Option
-                .ofNullable(
-                    r.getBatch()
-                        .map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
-                            source.getSparkSession())
-                        )
-                        .orElse(null)),
-            r.getCheckpointForNextBatch(), r.getSchemaProvider());
+        return avroDataInRowFormat(r);
       }
       case JSON: {
+        if (isFieldNameSanitizingEnabled()) {
+          //leverage the json -> avro sanitizing. TODO([HUDI-5829]) Optimize by sanitizing during direct conversion
+          InputBatch<JavaRDD<GenericRecord>> r = fetchNewDataInAvroFormat(lastCkptStr, sourceLimit);
+          return avroDataInRowFormat(r);
+
+        }
         InputBatch<JavaRDD<String>> r = ((Source<JavaRDD<String>>) source).fetchNext(lastCkptStr, sourceLimit);
         Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
         if (errorTableWriter.isPresent()) {
@@ -200,6 +261,7 @@ public final class SourceFormatAdapter implements Closeable {
         }
       }
       case PROTO: {
+        //TODO([HUDI-5830]) implement field name sanitization
         InputBatch<JavaRDD<Message>> r = ((Source<JavaRDD<Message>>) source).fetchNext(lastCkptStr, sourceLimit);
         Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
         AvroConvertor convertor = new AvroConvertor(r.getSchemaProvider().getSourceSchema());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
index 7542755b249..fe8ed3432d7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
@@ -21,9 +21,12 @@ package org.apache.hudi.utilities.schema;
 import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
 
 import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -54,15 +57,12 @@ public class FilebasedSchemaProvider extends SchemaProvider {
     super(props, jssc);
     DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SOURCE_SCHEMA_FILE_PROP));
     String sourceFile = props.getString(Config.SOURCE_SCHEMA_FILE_PROP);
+    boolean shouldSanitize = SanitizationUtils.getShouldSanitize(props);
+    String invalidCharMask = SanitizationUtils.getInvalidCharMask(props);
     this.fs = FSUtils.getFs(sourceFile, jssc.hadoopConfiguration(), true);
-    try {
-      this.sourceSchema = new Schema.Parser().parse(this.fs.open(new Path(sourceFile)));
-      if (props.containsKey(Config.TARGET_SCHEMA_FILE_PROP)) {
-        this.targetSchema =
-            new Schema.Parser().parse(fs.open(new Path(props.getString(Config.TARGET_SCHEMA_FILE_PROP))));
-      }
-    } catch (IOException ioe) {
-      throw new HoodieIOException("Error reading schema", ioe);
+    this.sourceSchema = readAvroSchemaFromFile(sourceFile, this.fs, shouldSanitize, invalidCharMask);
+    if (props.containsKey(Config.TARGET_SCHEMA_FILE_PROP)) {
+      this.targetSchema = readAvroSchemaFromFile(props.getString(Config.TARGET_SCHEMA_FILE_PROP), this.fs, shouldSanitize, invalidCharMask);
     }
   }
 
@@ -79,4 +79,14 @@ public class FilebasedSchemaProvider extends SchemaProvider {
       return super.getTargetSchema();
     }
   }
+
+  private static Schema readAvroSchemaFromFile(String schemaPath, FileSystem fs, boolean sanitizeSchema, String invalidCharMask) {
+    String schemaStr;
+    try (FSDataInputStream in = fs.open(new Path(schemaPath))) {
+      schemaStr = FileIOUtils.readAsUTFString(in);
+    } catch (IOException ioe) {
+      throw new HoodieIOException(String.format("Error reading schema from file %s", schemaPath), ioe);
+    }
+    return SanitizationUtils.parseAvroSchema(schemaStr, sanitizeSchema, invalidCharMask);
+  }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
index 3906d0ce7cc..eeb08a7fd75 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
@@ -38,6 +38,9 @@ import scala.util.Either;
 import scala.util.Left;
 import scala.util.Right;
 
+import static org.apache.hudi.utilities.sources.helpers.SanitizationUtils.Config.SANITIZE_SCHEMA_FIELD_NAMES;
+import static org.apache.hudi.utilities.sources.helpers.SanitizationUtils.Config.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK;
+
 /**
  * Convert a variety of datum into Avro GenericRecords. Has a bunch of lazy fields to circumvent issues around
  * serializing these objects from driver to executors
@@ -51,6 +54,8 @@ public class AvroConvertor implements Serializable {
   private transient Schema schema;
 
   private final String schemaStr;
+  private final String invalidCharMask;
+  private final boolean shouldSanitize;
 
   /**
    * To be lazily inited on executors.
@@ -64,12 +69,24 @@ public class AvroConvertor implements Serializable {
   private transient Injection<GenericRecord, byte[]> recordInjection;
 
   public AvroConvertor(String schemaStr) {
+    this(schemaStr, SANITIZE_SCHEMA_FIELD_NAMES.defaultValue(), SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.defaultValue());
+  }
+
+  public AvroConvertor(String schemaStr, boolean shouldSanitize, String invalidCharMask) {
     this.schemaStr = schemaStr;
+    this.shouldSanitize = shouldSanitize;
+    this.invalidCharMask = invalidCharMask;
   }
 
   public AvroConvertor(Schema schema) {
+    this(schema, SANITIZE_SCHEMA_FIELD_NAMES.defaultValue(), SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.defaultValue());
+  }
+
+  public AvroConvertor(Schema schema, boolean shouldSanitize, String invalidCharMask) {
     this.schemaStr = schema.toString();
     this.schema = schema;
+    this.shouldSanitize = shouldSanitize;
+    this.invalidCharMask = invalidCharMask;
   }
 
   private void initSchema() {
@@ -87,7 +104,7 @@ public class AvroConvertor implements Serializable {
 
   private void initJsonConvertor() {
     if (jsonConverter == null) {
-      jsonConverter = new MercifulJsonConverter();
+      jsonConverter = new MercifulJsonConverter(this.shouldSanitize, this.invalidCharMask);
     }
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/SanitizationUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/SanitizationUtils.java
new file mode 100644
index 00000000000..9eeee8f5c92
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/SanitizationUtils.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.MapType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Avro schemas have restrictions (https://avro.apache.org/docs/current/spec.html#names) on field names
+ * that other schema formats do not have. This class provides utilities to help with sanitizing
+ */
+public class SanitizationUtils {
+
+  private static final ObjectMapper OM = new ObjectMapper();
+
+  public static class Config {
+    // Toggle for field-name sanitization; disabled by default.
+    public static final ConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
+        .key("hoodie.deltastreamer.source.sanitize.invalid.schema.field.names")
+        .defaultValue(false)
+        .withDocumentation("Sanitizes names of invalid schema fields both in the data read from source and also in the schema. "
+            + "Replaces invalid characters with hoodie.deltastreamer.source.sanitize.invalid.char.mask. Invalid characters are "
+            + "determined by the avro naming convention (https://avro.apache.org/docs/current/spec.html#names).");
+    // Character sequence substituted for invalid characters when the option above is enabled.
+    public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+        .key("hoodie.deltastreamer.source.sanitize.invalid.char.mask")
+        .defaultValue("__")
+        .withDocumentation("Defines the character sequence that replaces invalid characters in schema field names if "
+          + "hoodie.deltastreamer.source.sanitize.invalid.schema.field.names is enabled.");
+  }
+
+  private static final String AVRO_FIELD_NAME_KEY = "name";
+
+  public static boolean getShouldSanitize(TypedProperties props) {
+    return props.getBoolean(Config.SANITIZE_SCHEMA_FIELD_NAMES.key(),Config.SANITIZE_SCHEMA_FIELD_NAMES.defaultValue());
+  }
+
+  public static String getInvalidCharMask(TypedProperties props) {
+    return props.getString(Config.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(),Config.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.defaultValue());
+  }
+
+  private static DataType sanitizeDataTypeForAvro(DataType dataType, String invalidCharMask) {
+    if (dataType instanceof ArrayType) {
+      ArrayType arrayType = (ArrayType) dataType;
+      DataType sanitizedDataType = sanitizeDataTypeForAvro(arrayType.elementType(), invalidCharMask);
+      return new ArrayType(sanitizedDataType, arrayType.containsNull());
+    } else if (dataType instanceof MapType) {
+      MapType mapType = (MapType) dataType;
+      DataType sanitizedKeyDataType = sanitizeDataTypeForAvro(mapType.keyType(), invalidCharMask);
+      DataType sanitizedValueDataType = sanitizeDataTypeForAvro(mapType.valueType(), invalidCharMask);
+      return new MapType(sanitizedKeyDataType, sanitizedValueDataType, mapType.valueContainsNull());
+    } else if (dataType instanceof StructType) {
+      return sanitizeStructTypeForAvro((StructType) dataType, invalidCharMask);
+    }
+    return dataType;
+  }
+
+  // TODO(HUDI-5256): Refactor this to use InternalSchema when it is ready.
+  private static StructType sanitizeStructTypeForAvro(StructType structType, String invalidCharMask) {
+    StructType sanitizedStructType = new StructType();
+    StructField[] structFields = structType.fields();
+    for (StructField s : structFields) {
+      DataType currFieldDataTypeSanitized = sanitizeDataTypeForAvro(s.dataType(), invalidCharMask);
+      StructField structFieldCopy = new StructField(HoodieAvroUtils.sanitizeName(s.name(), invalidCharMask),
+          currFieldDataTypeSanitized, s.nullable(), s.metadata());
+      sanitizedStructType = sanitizedStructType.add(structFieldCopy);
+    }
+    return sanitizedStructType;
+  }
+
+  public static Dataset<Row> sanitizeColumnNamesForAvro(Dataset<Row> inputDataset, String invalidCharMask) {
+    StructField[] inputFields = inputDataset.schema().fields();
+    Dataset<Row> targetDataset = inputDataset;
+    for (StructField sf : inputFields) {
+      DataType sanitizedFieldDataType = sanitizeDataTypeForAvro(sf.dataType(), invalidCharMask);
+      if (!sanitizedFieldDataType.equals(sf.dataType())) {
+        // Sanitizing column names for nested types can be thought of as going from one schema to another
+        // which are structurally similar except for actual column names itself. So casting is safe and sufficient.
+        targetDataset = targetDataset.withColumn(sf.name(), targetDataset.col(sf.name()).cast(sanitizedFieldDataType));
+      }
+      String possibleRename = HoodieAvroUtils.sanitizeName(sf.name(), invalidCharMask);
+      if (!sf.name().equals(possibleRename)) {
+        targetDataset = targetDataset.withColumnRenamed(sf.name(), possibleRename);
+      }
+    }
+    return targetDataset;
+  }
+
+  /*
+   * We first rely on Avro to parse and then try to rename only for those failed.
+   * This way we can improve our parsing capabilities without breaking existing functionality.
+   * For example, we don't yet support multiple named schemas defined in a file.
+   */
+  public static Schema parseAvroSchema(String schemaStr, boolean shouldSanitize, String invalidCharMask) {
+    try {
+      return new Schema.Parser().parse(schemaStr);
+    } catch (SchemaParseException spe) {
+      // if sanitizing is not enabled rethrow the exception.
+      if (!shouldSanitize) {
+        throw spe;
+      }
+      // Rename avro fields and try parsing once again.
+      Option<Schema> parseResult = parseSanitizedAvroSchemaNoThrow(schemaStr, invalidCharMask);
+      if (!parseResult.isPresent()) {
+        // throw original exception.
+        throw spe;
+      }
+      return parseResult.get();
+    }
+  }
+
+  /**
+   * Parse list for sanitizing
+   * @param src - deserialized schema
+   * @param invalidCharMask - mask to replace invalid characters with
+   */
+  private static List<Object> transformList(List<Object> src, String invalidCharMask) {
+    return src.stream().map(obj -> {
+      if (obj instanceof List) {
+        return transformList((List<Object>) obj, invalidCharMask);
+      } else if (obj instanceof Map) {
+        return transformMap((Map<String, Object>) obj, invalidCharMask);
+      } else {
+        return obj;
+      }
+    }).collect(Collectors.toList());
+  }
+
+  /**
+   * Recursively copies a deserialized schema map, sanitizing every string stored under the avro "name" key.
+   * Nested maps and lists are transformed recursively; every other value, including a JSON null such as
+   * {@code "default": null}, is copied through unchanged.
+   * @param src - deserialized schema
+   * @param invalidCharMask - mask to replace invalid characters with
+   */
+  private static Map<String, Object> transformMap(Map<String, Object> src, String invalidCharMask) {
+    // Collectors.toMap throws NullPointerException on null values, which any schema containing
+    // "default": null produces, so accumulate with put(), which accepts nulls.
+    Map<String, Object> result = new java.util.LinkedHashMap<>();
+    for (Map.Entry<String, Object> kv : src.entrySet()) {
+      Object value = kv.getValue();
+      if (value instanceof List) {
+        value = transformList((List<Object>) value, invalidCharMask);
+      } else if (value instanceof Map) {
+        value = transformMap((Map<String, Object>) value, invalidCharMask);
+      } else if (value instanceof String && kv.getKey().equals(AVRO_FIELD_NAME_KEY)) {
+        value = HoodieAvroUtils.sanitizeName((String) value, invalidCharMask);
+      }
+      result.put(kv.getKey(), value);
+    }
+    return result;
+  }
+
+  /**
+   * Sanitizes illegal field names via transformMap and transformList, then re-parses; yields an empty Option on any failure.
+   */
+  private static Option<Schema> parseSanitizedAvroSchemaNoThrow(String schemaStr, String invalidCharMask) {
+    try {
+      OM.enable(JsonParser.Feature.ALLOW_COMMENTS); // NOTE(review): reconfigures the shared static mapper on each call
+      Map<String, Object> objMap = OM.readValue(schemaStr, Map.class);
+      Map<String, Object> modifiedMap = transformMap(objMap, invalidCharMask);
+      return Option.of(new Schema.Parser().parse(OM.writeValueAsString(modifiedMap)));
+    } catch (Exception ex) {
+      return Option.empty(); // swallowed on purpose; parseAvroSchema then rethrows its original SchemaParseException
+    }
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
new file mode 100644
index 00000000000..caa54c01ebf
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.sources.Source;
+import org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
+import org.apache.hudi.utilities.testutils.SanitizationTestUtils;
+
+import org.apache.avro.Schema;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.avro.SchemaConverters;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestSourceFormatAdapter {
+
+  protected static SparkSession spark;
+  protected static JavaSparkContext jsc;
+  private static final String DUMMY_CHECKPOINT = "dummy_checkpoint";
+  private TestRowDataSource testRowDataSource;
+  private TestJsonDataSource testJsonDataSource;
+
+  @BeforeAll
+  public static void start() {
+    spark = SparkSession
+        .builder()
+        .master("local[*]")
+        .appName(TestSourceFormatAdapter.class.getName())
+        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+        .getOrCreate();
+    jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+  }
+
+  @AfterAll
+  public static void shutdown() {
+    jsc.close();
+    spark.close();
+  }
+
+  // Forces to initialize object before every test.
+  @AfterEach
+  public void teardown() {
+    testRowDataSource = null;
+    testJsonDataSource = null;
+  }
+
+  private void setupRowSource(Dataset<Row> ds) {
+    SchemaProvider nullSchemaProvider = new InputBatch.NullSchemaProvider();
+    InputBatch<Dataset<Row>> batch = new InputBatch<>(Option.of(ds), DUMMY_CHECKPOINT, nullSchemaProvider);
+    testRowDataSource = new TestRowDataSource(new TypedProperties(), jsc, spark, nullSchemaProvider, batch);
+  }
+
+  private void setupJsonSource(JavaRDD<String> ds, Schema schema) {
+    SchemaProvider basicSchemaProvider = new BasicSchemaProvider(schema);
+    InputBatch<JavaRDD<String>> batch = new InputBatch<>(Option.of(ds), DUMMY_CHECKPOINT, basicSchemaProvider);
+    testJsonDataSource = new TestJsonDataSource(new TypedProperties(), jsc, spark, basicSchemaProvider, batch);
+  }
+
+  private InputBatch<Dataset<Row>> fetchRowData(JavaRDD<String> rdd, StructType unsanitizedSchema) {
+    TypedProperties typedProperties = new TypedProperties();
+    typedProperties.put(SanitizationUtils.Config.SANITIZE_SCHEMA_FIELD_NAMES.key(), true);
+    typedProperties.put(SanitizationUtils.Config.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(), "__");
+    setupRowSource(spark.read().schema(unsanitizedSchema).json(rdd));
+    SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(testRowDataSource, Option.empty(), Option.of(typedProperties));
+    return sourceFormatAdapter.fetchNewDataInRowFormat(Option.of(DUMMY_CHECKPOINT), 10L);
+  }
+
+  private InputBatch<Dataset<Row>> fetchJsonData(JavaRDD<String> rdd, StructType sanitizedSchema) {
+    TypedProperties typedProperties = new TypedProperties();
+    typedProperties.put(SanitizationUtils.Config.SANITIZE_SCHEMA_FIELD_NAMES.key(), true);
+    typedProperties.put(SanitizationUtils.Config.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(), "__");
+    setupJsonSource(rdd, SchemaConverters.toAvroType(sanitizedSchema, false, "record", ""));
+    SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(testJsonDataSource, Option.empty(), Option.of(typedProperties));
+    return sourceFormatAdapter.fetchNewDataInRowFormat(Option.of(DUMMY_CHECKPOINT), 10L);
+  }
+
+  private void verifySanitization(InputBatch<Dataset<Row>> inputBatch, String sanitizedDataFile, StructType sanitizedSchema) {
+    JavaRDD<String> expectedRDD = jsc.textFile(sanitizedDataFile);
+    assertTrue(inputBatch.getBatch().isPresent());
+    Dataset<Row> ds = inputBatch.getBatch().get();
+    assertEquals(2, ds.collectAsList().size());
+    assertEquals(sanitizedSchema, ds.schema());
+    assertEquals(expectedRDD.collect(), ds.toJSON().collectAsList());
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideDataFiles")
+  public void testRowSanitization(String unsanitizedDataFile, String sanitizedDataFile, StructType unsanitizedSchema, StructType sanitizedSchema) {
+    JavaRDD<String> unsanitizedRDD = jsc.textFile(unsanitizedDataFile);
+    verifySanitization(fetchRowData(unsanitizedRDD, unsanitizedSchema), sanitizedDataFile, sanitizedSchema);
+
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideDataFiles")
+  public void testJsonSanitization(String unsanitizedDataFile, String sanitizedDataFile, StructType unsanitizedSchema, StructType sanitizedSchema) {
+    JavaRDD<String> unsanitizedRDD = jsc.textFile(unsanitizedDataFile);
+    verifySanitization(fetchJsonData(unsanitizedRDD, sanitizedSchema), sanitizedDataFile, sanitizedSchema);
+  }
+
+  public static class TestRowDataSource extends Source<Dataset<Row>> {
+    private final InputBatch<Dataset<Row>> batch;
+
+    public TestRowDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
+                             SchemaProvider schemaProvider, InputBatch<Dataset<Row>> batch) {
+      super(props, sparkContext, sparkSession, schemaProvider, SourceType.ROW);
+      this.batch = batch;
+    }
+
+    @Override
+    protected InputBatch<Dataset<Row>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
+      return batch;
+    }
+  }
+
+  public static class TestJsonDataSource extends Source<JavaRDD<String>> {
+    private final InputBatch<JavaRDD<String>> batch;
+
+    public TestJsonDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
+                             SchemaProvider schemaProvider, InputBatch<JavaRDD<String>> batch) {
+      super(props, sparkContext, sparkSession, schemaProvider, SourceType.JSON);
+      this.batch = batch;
+    }
+
+    @Override
+    protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
+      return batch;
+    }
+  }
+
+  public static class BasicSchemaProvider extends SchemaProvider {
+
+    private final Schema schema;
+
+    public BasicSchemaProvider(Schema schema) {
+      this(null, null, schema);
+    }
+
+    public BasicSchemaProvider(TypedProperties props, JavaSparkContext jssc, Schema schema) {
+      super(props, jssc);
+      this.schema = schema;
+    }
+
+    @Override
+    public Schema getSourceSchema() {
+      return schema;
+    }
+  }
+
+  private static Stream<Arguments> provideDataFiles() {
+    return SanitizationTestUtils.provideDataFiles();
+  }
+
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java
new file mode 100644
index 00000000000..fe6f0ff9f2f
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.schema;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+
+import org.apache.avro.SchemaParseException;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.apache.hudi.utilities.sources.helpers.SanitizationUtils.Config.SANITIZE_SCHEMA_FIELD_NAMES;
+import static org.apache.hudi.utilities.sources.helpers.SanitizationUtils.Config.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK;
+import static org.apache.hudi.utilities.testutils.SanitizationTestUtils.generateProperFormattedSchema;
+import static org.apache.hudi.utilities.testutils.SanitizationTestUtils.generateRenamedSchemaWithConfiguredReplacement;
+import static org.apache.hudi.utilities.testutils.SanitizationTestUtils.generateRenamedSchemaWithDefaultReplacement;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Unit tests for {@link FilebasedSchemaProvider}, covering valid schemas and field-name sanitization of invalid ones.
+ */
+public class TestFilebasedSchemaProvider extends UtilitiesTestBase {
+
+  private FilebasedSchemaProvider schemaProvider;
+
+  @BeforeAll
+  public static void initClass() throws Exception {
+    UtilitiesTestBase.initTestServices(false, false, false);
+  }
+
+  @AfterAll
+  public static void cleanUpUtilitiesTestServices() {
+    UtilitiesTestBase.cleanUpUtilitiesTestServices();
+  }
+
+  @BeforeEach
+  public void setup() throws Exception {
+    super.setup();
+  }
+
+  @AfterEach
+  public void teardown() throws Exception {
+    super.teardown();
+  }
+
+  @Test
+  public void properlyFormattedNestedSchemaTest() throws IOException {
+    this.schemaProvider = new FilebasedSchemaProvider(
+        Helpers.setupSchemaOnDFS("delta-streamer-config", "file_schema_provider_valid.avsc"), jsc);
+    assertEquals(generateProperFormattedSchema(), this.schemaProvider.getSourceSchema()); // JUnit order: expected, then actual
+  }
+
+  @Test
+  public void renameBadlyFormattedSchemaTest() throws IOException {
+    TypedProperties props = Helpers.setupSchemaOnDFS("delta-streamer-config", "file_schema_provider_invalid.avsc");
+    props.put(SANITIZE_SCHEMA_FIELD_NAMES.key(), "true"); // turn sanitization on; the replacement mask is left unset
+    this.schemaProvider = new FilebasedSchemaProvider(props, jsc);
+    assertEquals(generateRenamedSchemaWithDefaultReplacement(), this.schemaProvider.getSourceSchema());
+  }
+
+  @Test
+  public void renameBadlyFormattedSchemaWithProperyDisabledTest() {
+    assertThrows(SchemaParseException.class, () -> { // sanitization key is not set, so the invalid names must fail schema parsing
+      new FilebasedSchemaProvider(
+          Helpers.setupSchemaOnDFS("delta-streamer-config", "file_schema_provider_invalid.avsc"), jsc);
+    });
+  }
+
+  @Test
+  public void renameBadlyFormattedSchemaWithAltCharMaskConfiguredTest() throws IOException {
+    TypedProperties props = Helpers.setupSchemaOnDFS("delta-streamer-config", "file_schema_provider_invalid.avsc");
+    props.put(SANITIZE_SCHEMA_FIELD_NAMES.key(), "true");
+    props.put(SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(), "_"); // override the replacement mask with a single underscore
+    this.schemaProvider = new FilebasedSchemaProvider(props, jsc);
+    assertEquals(generateRenamedSchemaWithConfiguredReplacement(), this.schemaProvider.getSourceSchema());
+  }
+}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index 6e2f4d6e73b..f05107f2c25 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -245,7 +245,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
     props.put("hoodie.base.path","/tmp/json_kafka_row_events");
     Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
     Option<BaseErrorTableWriter> errorTableWriter = Option.of(getAnonymousErrorTableWriter(props));
-    SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource, errorTableWriter);
+    SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource, errorTableWriter, Option.of(props));
     assertEquals(1000, kafkaSource.fetchNewDataInRowFormat(Option.empty(),Long.MAX_VALUE).getBatch().get().count());
     assertEquals(2,((JavaRDD)errorTableWriter.get().getErrorEvents(
         HoodieActiveTimeline.createNewInstantTime(), Option.empty()).get()).count());
@@ -276,7 +276,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
 
     Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
     Option<BaseErrorTableWriter> errorTableWriter = Option.of(getAnonymousErrorTableWriter(props));
-    SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource, errorTableWriter);
+    SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource, errorTableWriter, Option.of(props));
     InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(),Long.MAX_VALUE);
     assertEquals(1000,fetch1.getBatch().get().count());
     assertEquals(2, ((JavaRDD)errorTableWriter.get().getErrorEvents(
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestSanitizationUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestSanitizationUtils.java
new file mode 100644
index 00000000000..8ebfc21c7e8
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestSanitizationUtils.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.utilities.deltastreamer.TestSourceFormatAdapter;
+import org.apache.hudi.utilities.testutils.SanitizationTestUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.utilities.testutils.SanitizationTestUtils.generateProperFormattedSchema;
+import static org.apache.hudi.utilities.testutils.SanitizationTestUtils.generateRenamedSchemaWithConfiguredReplacement;
+import static org.apache.hudi.utilities.testutils.SanitizationTestUtils.generateRenamedSchemaWithDefaultReplacement;
+import static org.apache.hudi.utilities.testutils.SanitizationTestUtils.invalidCharMask;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class TestSanitizationUtils { // tests for SanitizationUtils: dataset column renaming and Avro schema parsing
+
+  protected static SparkSession spark;
+  protected static JavaSparkContext jsc;
+
+  @BeforeAll
+  public static void start() {
+    spark = SparkSession
+        .builder()
+        .master("local[*]")
+        .appName(TestSourceFormatAdapter.class.getName()) // NOTE(review): app name borrows another test's class name — likely copy/paste, confirm
+        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+        .getOrCreate();
+    jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+  }
+
+  @AfterAll
+  public static void shutdown() {
+    jsc.close();
+    spark.close();
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideDataFiles")
+  public void testSanitizeDataset(String unsanitizedDataFile, String sanitizedDataFile, StructType unsanitizedSchema, StructType sanitizedSchema) {
+    Dataset<Row> expectedSanitizedDataset = spark.read().schema(sanitizedSchema).format("json").load(sanitizedDataFile);
+    Dataset<Row> unsanitizedDataset = spark.read().schema(unsanitizedSchema).format("json").load(unsanitizedDataFile);
+    Dataset<Row> sanitizedDataset = SanitizationUtils.sanitizeColumnNamesForAvro(unsanitizedDataset, invalidCharMask);
+    assertEquals(unsanitizedDataset.count(), sanitizedDataset.count()); // renaming columns must not add or drop rows
+    assertEquals(expectedSanitizedDataset.schema(), sanitizedDataset.schema());
+    assertEquals(expectedSanitizedDataset.collectAsList(), sanitizedDataset.collectAsList());
+  }
+
+  private void testSanitizeSchema(String unsanitizedSchema, Schema expectedSanitizedSchema) { // sanitization on, default mask
+    testSanitizeSchema(unsanitizedSchema, expectedSanitizedSchema, true);
+  }
+
+  private void testSanitizeSchema(String unsanitizedSchema, Schema expectedSanitizedSchema, boolean shouldSanitize) { // default mask
+    testSanitizeSchema(unsanitizedSchema, expectedSanitizedSchema, shouldSanitize, invalidCharMask);
+  }
+
+  private void testSanitizeSchema(String unsanitizedSchema, Schema expectedSanitizedSchema, boolean shouldSanitize, String charMask) {
+    Schema sanitizedSchema = SanitizationUtils.parseAvroSchema(unsanitizedSchema, shouldSanitize, charMask);
+    assertEquals(expectedSanitizedSchema, sanitizedSchema); // JUnit order: expected, then actual
+  }
+
+  @Test
+  public void testGoodAvroSchema() {
+    String goodJson = getJson("src/test/resources/delta-streamer-config/file_schema_provider_valid.avsc");
+    testSanitizeSchema(goodJson, generateProperFormattedSchema());
+  }
+
+  @Test
+  public void testBadAvroSchema() {
+    String badJson = getJson("src/test/resources/delta-streamer-config/file_schema_provider_invalid.avsc");
+    testSanitizeSchema(badJson, generateRenamedSchemaWithDefaultReplacement());
+  }
+
+  @Test
+  public void testBadAvroSchemaAltCharMask() {
+    String badJson = getJson("src/test/resources/delta-streamer-config/file_schema_provider_invalid.avsc");
+    testSanitizeSchema(badJson, generateRenamedSchemaWithConfiguredReplacement(), true, "_");
+  }
+
+  @Test
+  public void testBadAvroSchemaDisabledTest() {
+    String badJson = getJson("src/test/resources/delta-streamer-config/file_schema_provider_invalid.avsc");
+    assertThrows(SchemaParseException.class, () -> testSanitizeSchema(badJson, generateRenamedSchemaWithDefaultReplacement(), false));
+  }
+
+  // Helper, not a test (private, takes a parameter, returns a value): reads the file at path via FileIOUtils.readAsUTFString.
+  private String getJson(String path) {
+    FileSystem fs = FSUtils.getFs(path, jsc.hadoopConfiguration(), true);
+    String schemaStr;
+    try (FSDataInputStream in = fs.open(new Path(path))) {
+      schemaStr = FileIOUtils.readAsUTFString(in);
+    } catch (IOException e) {
+      throw new HoodieIOException("can't read schema file", e); // surface read failures as an unchecked exception with the cause kept
+    }
+    return schemaStr;
+  }
+
+  private static Stream<Arguments> provideDataFiles() { // @MethodSource target for testSanitizeDataset
+    return SanitizationTestUtils.provideDataFiles();
+  }
+
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SanitizationTestUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SanitizationTestUtils.java
new file mode 100644
index 00000000000..d54b0b54e67
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SanitizationTestUtils.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.testutils;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.MapType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.params.provider.Arguments;
+
+import java.util.stream.Stream;
+
+import static org.apache.hudi.utilities.sources.helpers.SanitizationUtils.Config.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK;
+
+public class SanitizationTestUtils { // shared expected schemas and fixture listings for the sanitization tests
+  public static String invalidCharMask = SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.defaultValue(); // default value of the mask config
+
+  private static String sanitizeIfNeeded(String src, boolean shouldSanitize) { // returns src untouched unless shouldSanitize is true
+    return shouldSanitize ? HoodieAvroUtils.sanitizeName(src, invalidCharMask) : src;
+  }
+
+  protected static StructType getSchemaWithProperNaming() { // person struct with a nested address struct; used for both input and expected output
+    StructType addressStruct = new StructType(new StructField[] {
+        new StructField("state", DataTypes.StringType, true, Metadata.empty()),
+        new StructField("street", DataTypes.StringType, true, Metadata.empty()),
+        new StructField("zip", DataTypes.LongType, true, Metadata.empty()),
+    });
+
+    StructType personStruct = new StructType(new StructField[] {
+        new StructField("address", addressStruct, true, Metadata.empty()),
+        new StructField("name", DataTypes.StringType, true, Metadata.empty()),
+        new StructField("occupation", DataTypes.StringType, true, Metadata.empty()),
+        new StructField("place", DataTypes.StringType, true, Metadata.empty())
+    });
+    return personStruct;
+  }
+
+  protected static StructType getSchemaWithBadAvroNamingForStructType(boolean shouldSanitize) { // raw names when false, sanitized names when true
+    StructType addressStruct = new StructType(new StructField[] {
+        new StructField(sanitizeIfNeeded("@state.", shouldSanitize),
+            DataTypes.StringType, true, Metadata.empty()),
+        new StructField(sanitizeIfNeeded("@@stree@t@", shouldSanitize),
+            DataTypes.StringType, true, Metadata.empty()),
+        new StructField(sanitizeIfNeeded("8@_zip", shouldSanitize),
+            DataTypes.LongType, true, Metadata.empty())
+    });
+
+    StructType personStruct = new StructType(new StructField[] { // the address struct is nested directly as a field
+        new StructField(sanitizeIfNeeded("@_addr*$ess", shouldSanitize),
+            addressStruct, true, Metadata.empty()),
+        new StructField(sanitizeIfNeeded("9name", shouldSanitize),
+            DataTypes.StringType, true, Metadata.empty()),
+        new StructField(sanitizeIfNeeded("_occu9pation", shouldSanitize),
+            DataTypes.StringType, true, Metadata.empty()),
+        new StructField(sanitizeIfNeeded("@plac.e.", shouldSanitize),
+            DataTypes.StringType, true, Metadata.empty())
+    });
+    return personStruct;
+  }
+
+  protected static StructType getSchemaWithBadAvroNamingForArrayType(boolean shouldSanitize) { // raw names when false, sanitized names when true
+    StructType addressStruct = new StructType(new StructField[] {
+        new StructField(sanitizeIfNeeded("@state.", shouldSanitize),
+            DataTypes.StringType, true, Metadata.empty()),
+        new StructField(sanitizeIfNeeded("@@stree@t@", shouldSanitize),
+            DataTypes.StringType, true, Metadata.empty()),
+        new StructField(sanitizeIfNeeded("8@_zip", shouldSanitize),
+            DataTypes.LongType, true, Metadata.empty())
+    });
+
+    StructType personStruct = new StructType(new StructField[] { // the address struct appears as the element type of an array field
+        new StructField(sanitizeIfNeeded("@name", shouldSanitize),
+            DataTypes.StringType, true, Metadata.empty()),
+        new StructField(sanitizeIfNeeded("@arr@", shouldSanitize),
+            new ArrayType(addressStruct, true), true, Metadata.empty())
+    });
+    return personStruct;
+  }
+
+  protected static StructType getSchemaWithBadAvroNamingForMapType(boolean shouldSanitize) { // raw names when false, sanitized names when true
+    StructType addressStruct = new StructType(new StructField[] {
+        new StructField(sanitizeIfNeeded("@state.", shouldSanitize),
+            DataTypes.StringType, true, Metadata.empty()),
+        new StructField(sanitizeIfNeeded("@@stree@t@", shouldSanitize),
+            DataTypes.StringType, true, Metadata.empty()),
+        new StructField(sanitizeIfNeeded("8@_zip", shouldSanitize),
+            DataTypes.LongType, true, Metadata.empty())
+    });
+
+    StructType personStruct = new StructType(new StructField[] { // the address struct appears as the value type of a string-keyed map field
+        new StructField(sanitizeIfNeeded("@name", shouldSanitize),
+            DataTypes.StringType, true, Metadata.empty()),
+        new StructField(sanitizeIfNeeded("@map9", shouldSanitize),
+            new MapType(DataTypes.StringType, addressStruct, true), true, Metadata.empty()),
+    });
+    return personStruct;
+  }
+
+  public static Schema generateProperFormattedSchema() { // Avro schema the tests expect from file_schema_provider_valid.avsc
+    Schema addressSchema = SchemaBuilder.record("Address").fields()
+        .requiredString("streetaddress")
+        .requiredString("city")
+        .endRecord();
+    Schema personSchema = SchemaBuilder.record("Person").fields()
+        .requiredString("firstname")
+        .requiredString("lastname")
+        .name("address").type(addressSchema).noDefault()
+        .endRecord();
+    return personSchema;
+  }
+
+  public static Schema generateRenamedSchemaWithDefaultReplacement() { // expected for file_schema_provider_invalid.avsc sanitized with the default mask
+    Schema addressSchema = SchemaBuilder.record("__Address").fields()
+        .nullableString("__stree9add__ress", "@@@any_address")
+        .requiredString("cit__y__")
+        .endRecord();
+    Schema personSchema = SchemaBuilder.record("Person").fields()
+        .requiredString("__firstname")
+        .requiredString("__lastname")
+        .name("address").type(addressSchema).noDefault()
+        .endRecord();
+    return personSchema;
+  }
+
+  public static Schema generateRenamedSchemaWithConfiguredReplacement() { // expected for file_schema_provider_invalid.avsc sanitized with mask "_"
+    Schema addressSchema = SchemaBuilder.record("_Address").fields()
+        .nullableString("_stree9add_ress", "@@@any_address")
+        .requiredString("cit_y_")
+        .endRecord();
+    Schema personSchema = SchemaBuilder.record("Person").fields()
+        .requiredString("_firstname")
+        .requiredString("_lastname")
+        .name("address").type(addressSchema).noDefault()
+        .endRecord();
+    return personSchema;
+  }
+
+  public static Stream<Arguments> provideDataFiles() { // each entry: input data file, expected output data file, input schema, expected schema
+    return Stream.of(
+        Arguments.of("src/test/resources/data/avro_sanitization.json", "src/test/resources/data/avro_sanitization.json",
+            getSchemaWithProperNaming(), getSchemaWithProperNaming()),
+        Arguments.of("src/test/resources/data/avro_sanitization_bad_naming_in.json", "src/test/resources/data/avro_sanitization_bad_naming_out.json",
+            getSchemaWithBadAvroNamingForStructType(false), getSchemaWithBadAvroNamingForStructType(true)),
+        Arguments.of("src/test/resources/data/avro_sanitization_bad_naming_nested_array_in.json", "src/test/resources/data/avro_sanitization_bad_naming_nested_array_out.json",
+            getSchemaWithBadAvroNamingForArrayType(false), getSchemaWithBadAvroNamingForArrayType(true)),
+        Arguments.of("src/test/resources/data/avro_sanitization_bad_naming_nested_map_in.json", "src/test/resources/data/avro_sanitization_bad_naming_nested_map_out.json",
+            getSchemaWithBadAvroNamingForMapType(false), getSchemaWithBadAvroNamingForMapType(true))
+    );
+  }
+}
diff --git a/hudi-utilities/src/test/resources/data/avro_sanitization.json b/hudi-utilities/src/test/resources/data/avro_sanitization.json
new file mode 100644
index 00000000000..9b56f3acc6d
--- /dev/null
+++ b/hudi-utilities/src/test/resources/data/avro_sanitization.json
@@ -0,0 +1,2 @@
+{"address":{"state":"some state","street":"some street","zip":123},"name":"random name","occupation":"some occupation","place":"some place"}
+{"address":{"state":"some state","street":"some street","zip":123},"name":"random name","occupation":"some occupation","place":"some place"}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/data/avro_sanitization_bad_naming_in.json b/hudi-utilities/src/test/resources/data/avro_sanitization_bad_naming_in.json
new file mode 100644
index 00000000000..344de2d2371
--- /dev/null
+++ b/hudi-utilities/src/test/resources/data/avro_sanitization_bad_naming_in.json
@@ -0,0 +1,2 @@
+{"@_addr*$ess":{"@state.":"some state","@@stree@t@":"some street","8@_zip":123},"9name":"random name","_occu9pation":"some occupation","@plac.e.":"some place"}
+{"@_addr*$ess":{"@state.":"some state","@@stree@t@":"some street","8@_zip":123},"9name":"random name","_occu9pation":"some occupation","@plac.e.":"some place"}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/data/avro_sanitization_bad_naming_nested_array_in.json b/hudi-utilities/src/test/resources/data/avro_sanitization_bad_naming_nested_array_in.json
new file mode 100644
index 00000000000..141eb48a255
--- /dev/null
+++ b/hudi-utilities/src/test/resources/data/avro_sanitization_bad_naming_nested_array_in.json
@@ -0,0 +1,2 @@
+{"@name":"somename","@arr@":[{"@state.":"some state","@@stree@t@":"some street","8@_zip":123},{"@state.":"some state","@@stree@t@":"some street","8@_zip":123},{"@state.":"some state","@@stree@t@":"some street","8@_zip":123}]}
+{"@name":"somename","@arr@":[{"@state.":"some state","@@stree@t@":"some street","8@_zip":123},{"@state.":"some state","@@stree@t@":"some street","8@_zip":123},{"@state.":"some state","@@stree@t@":"some street","8@_zip":123}]}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/data/avro_sanitization_bad_naming_nested_array_out.json b/hudi-utilities/src/test/resources/data/avro_sanitization_bad_naming_nested_array_out.json
new file mode 100644
index 00000000000..ea9f08beb4c
--- /dev/null
+++ b/hudi-utilities/src/test/resources/data/avro_sanitization_bad_naming_nested_array_out.json
@@ -0,0 +1,2 @@
+{"__name":"somename","__arr__":[{"__state__":"some state","____stree__t__":"some street","_____zip":123},{"__state__":"some state","____stree__t__":"some street","_____zip":123},{"__state__":"some state","____stree__t__":"some street","_____zip":123}]}
+{"__name":"somename","__arr__":[{"__state__":"some state","____stree__t__":"some street","_____zip":123},{"__state__":"some state","____stree__t__":"some street","_____zip":123},{"__state__":"some state","____stree__t__":"some street","_____zip":123}]}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/data/avro_sanitization_bad_naming_nested_map_in.json b/hudi-utilities/src/test/resources/data/avro_sanitization_bad_naming_nested_map_in.json
new file mode 100644
index 00000000000..75702ea8694
--- /dev/null
+++ b/hudi-utilities/src/test/resources/data/avro_sanitization_bad_naming_nested_map_in.json
@@ -0,0 +1,2 @@
+{"@name":"somename","@map9":{"address1":{"@state.":"some state","@@stree@t@":"some street","8@_zip":123}}}
+{"@name":"somename","@map9":{"address2":{"@state.":"some state","@@stree@t@":"some street","8@_zip":123}}}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/data/avro_sanitization_bad_naming_nested_map_out.json b/hudi-utilities/src/test/resources/data/avro_sanitization_bad_naming_nested_map_out.json
new file mode 100644
index 00000000000..8c8d11d0e26
--- /dev/null
+++ b/hudi-utilities/src/test/resources/data/avro_sanitization_bad_naming_nested_map_out.json
@@ -0,0 +1,2 @@
+{"__name":"somename","__map9":{"address1":{"__state__":"some state","____stree__t__":"some street","_____zip":123}}}
+{"__name":"somename","__map9":{"address2":{"__state__":"some state","____stree__t__":"some street","_____zip":123}}}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/data/avro_sanitization_bad_naming_out.json b/hudi-utilities/src/test/resources/data/avro_sanitization_bad_naming_out.json
new file mode 100644
index 00000000000..fb25aca9236
--- /dev/null
+++ b/hudi-utilities/src/test/resources/data/avro_sanitization_bad_naming_out.json
@@ -0,0 +1,2 @@
+{"___addr____ess":{"__state__":"some state","____stree__t__":"some street","_____zip":123},"__name":"random name","_occu9pation":"some occupation","__plac__e__":"some place"}
+{"___addr____ess":{"__state__":"some state","____stree__t__":"some street","_____zip":123},"__name":"random name","_occu9pation":"some occupation","__plac__e__":"some place"}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/file_schema_provider_invalid.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/file_schema_provider_invalid.avsc
new file mode 100644
index 00000000000..19fe1c19193
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/file_schema_provider_invalid.avsc
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+  "type": "record",
+  "name": "Person",
+  "fields": [
+    {
+      "name": "@firstname",
+      "type": "string"
+    },
+    {
+      "name": "9lastname",
+      "type": "string"
+    },
+    {
+      "name": "address",
+      "type": {
+        "type": "record",
+        "name": "@Address",
+        "fields": [
+          {
+            "name": "@stree9add*ress",
+            "type": ["string", "null"],
+            "default": "@@@any_address"
+          },
+          {
+            "name": "cit.y.",
+            "type": "string"
+          }
+        ]
+      }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/file_schema_provider_valid.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/file_schema_provider_valid.avsc
new file mode 100644
index 00000000000..23ce5036f9f
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/file_schema_provider_valid.avsc
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+  "type": "record",
+  "name": "Person",
+  "fields": [
+    {
+      "name": "firstname",
+      "type": "string"
+    },
+    {
+      "name": "lastname",
+      "type": "string"
+    },
+    {
+      "name": "address",
+      "type": {
+        "type": "record",
+        "name": "Address",
+        "fields": [
+          {
+            "name": "streetaddress",
+            "type": "string"
+          },
+          {
+            "name": "city",
+            "type": "string"
+          }
+        ]
+      }
+    }
+  ]
+}
\ No newline at end of file