Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/10/10 08:24:33 UTC

[GitHub] [hudi] vamshigv opened a new pull request, #6905: column name sanitization for row source

vamshigv opened a new pull request, #6905:
URL: https://github.com/apache/hudi/pull/6905

   ### Change Logs
   
   _Describe context and summary for this change. Highlight if any code was copied._
   
   ### Impact
   
   _Describe any public API or user-facing feature change or any performance impact._
   
   **Risk level: none | low | medium | high**
   
   _Choose one. If medium or high, explain what verification was done to mitigate the risks._
   
   ### Documentation Update
   
   _Describe any necessary documentation update if there is any new feature, config, or user-facing change_
   
   - _The config description must be updated if new configs are added or the default values of the configs are changed_
   - _Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
     ticket number here and follow the [instruction](https://hudi.apache.org/contribute/developer-setup#website) to make
     changes to the website._
   
   ### Contributor's checklist
   
   - [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Change Logs and Impact were stated clearly
   - [ ] Adequate tests were added if applicable
   - [ ] CI passed
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #6905: [HUDI-5001] column name sanitization for row source

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6905:
URL: https://github.com/apache/hudi/pull/6905#issuecomment-1322629345

   ## CI report:
   
   * 18d53fe47c2da60844d6c1982c47fc5ba591f8da Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=13161) 
   * 9861c569fbd0e75c261b15716772a91206dce997 UNKNOWN
   * 54d091ed449d76037e719a19d49cbc3ae0858aa8 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] vamshigv commented on pull request #6905: [HUDI-5001] column name sanitization for row source

Posted by GitBox <gi...@apache.org>.
vamshigv commented on PR #6905:
URL: https://github.com/apache/hudi/pull/6905#issuecomment-1316069077

   @alexeykudinkin has reviewed it already




[GitHub] [hudi] hudi-bot commented on pull request #6905: [HUDI-5001] column name sanitization for row source

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6905:
URL: https://github.com/apache/hudi/pull/6905#issuecomment-1317935519

   ## CI report:
   
   * ef153fa665c0cdf75d4716dce2292c3ff536a015 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=13063) 
   


[GitHub] [hudi] hudi-bot commented on pull request #6905: column name sanitization for row source

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6905:
URL: https://github.com/apache/hudi/pull/6905#issuecomment-1272998511

   ## CI report:
   
   * fbedff4ed9dcc262ca69651448b294328b9d8627 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=12102) 
   


[GitHub] [hudi] vamshigv commented on a diff in pull request #6905: [HUDI-5001] column name sanitization for row source

Posted by GitBox <gi...@apache.org>.
vamshigv commented on code in PR #6905:
URL: https://github.com/apache/hudi/pull/6905#discussion_r1028453016


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -68,7 +186,8 @@ public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String
             r.getCheckpointForNextBatch(), r.getSchemaProvider());
       }
       case ROW: {
-        InputBatch<Dataset<Row>> r = ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit);
+        InputBatch<Dataset<Row>> r = trySanitizeFieldNames(((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit),

Review Comment:
   Any source would benefit from this. We have hit these situations with RowSource and tackled them there.
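To illustrate why the gate itself is format-agnostic, here is a minimal Java sketch of a "sanitize only if enabled" wrapper. `SanitizeGate` and its methods are hypothetical (this is not Hudi's `trySanitizeFieldNames`), and the example "batch" is simply a list of column names; Row, Avro, or JSON batches would each need their own format-specific sanitizer plugged into the same gate.

```java
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.regex.Matcher;
import java.util.stream.Collectors;

/**
 * Hypothetical sketch: a generic "sanitize only if enabled" gate.
 * The batch type T is left open so the same gate could wrap any source format.
 */
final class SanitizeGate {
  private SanitizeGate() {}

  /** Returns the batch unchanged when disabled; otherwise applies the sanitizer. */
  static <T> T trySanitize(T batch, boolean enabled, UnaryOperator<T> sanitizer) {
    return enabled ? sanitizer.apply(batch) : batch;
  }

  /** Avro naming rule: first char in [A-Za-z_], remaining chars in [A-Za-z0-9_]. */
  static String sanitizeName(String name, String mask) {
    if (name.isEmpty()) {
      return mask; // an empty name is invalid in Avro; this sketch replaces it with the mask
    }
    String first = name.substring(0, 1).matches("[^A-Za-z_]") ? mask : name.substring(0, 1);
    // quoteReplacement stops '$' or '\' in the mask from being read as regex group references.
    return first + name.substring(1).replaceAll("[^A-Za-z0-9_]", Matcher.quoteReplacement(mask));
  }

  /** Example sanitizer for a "batch" that is just a list of column names. */
  static List<String> sanitizeColumnNames(List<String> columns, String mask) {
    return columns.stream()
        .map(c -> sanitizeName(c, mask))
        .collect(Collectors.toList());
  }
}
```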





[GitHub] [hudi] vamshigv commented on pull request #6905: column name sanitization for row source

Posted by GitBox <gi...@apache.org>.
vamshigv commented on PR #6905:
URL: https://github.com/apache/hudi/pull/6905#issuecomment-1284964742

   JIRA is already created, and I updated the description both in the PR and in JIRA (https://issues.apache.org/jira/browse/HUDI-5001). Some finishing touches remain, so I marked the PR as draft.




[GitHub] [hudi] vamshigv commented on a diff in pull request #6905: [HUDI-5001] column name sanitization for row source

Posted by GitBox <gi...@apache.org>.
vamshigv commented on code in PR #6905:
URL: https://github.com/apache/hudi/pull/6905#discussion_r1028451987


##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -704,10 +705,25 @@ public static Schema getNullSchema() {
    * @return sanitized name
    */
   public static String sanitizeName(String name) {
-    if (name.substring(0, 1).matches(INVALID_AVRO_FIRST_CHAR_IN_NAMES)) {
-      name = name.replaceFirst(INVALID_AVRO_FIRST_CHAR_IN_NAMES, MASK_FOR_INVALID_CHARS_IN_NAMES);
+    return sanitizeName(name, MASK_FOR_INVALID_CHARS_IN_NAMES);

Review Comment:
   This is used in `HoodieSparkBootstrapSchemaProvider` and `AvroConversionUtils.scala`, so it can't be removed. `AVRO_FIELD_NAME_INVALID_CHAR_MASK` is scoped to the DeltaStreamer source because we only tackle this sanitization in limited cases. I believe the two can be unified once sanitization is implemented for all cases.
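A minimal sketch of the delegation pattern being discussed: the one-argument `sanitizeName(name)` keeps working for existing callers and forwards to an overload that takes the mask. `AvroNameSanitizer` and `DEFAULT_MASK` are hypothetical names; the regexes encode the Avro naming rule (first character in `[A-Za-z_]`, the rest in `[A-Za-z0-9_]`) and are not Hudi's actual constants.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical sketch of an Avro-name sanitizer with a configurable mask.
 * Patterns follow the Avro spec; they are not Hudi's INVALID_AVRO_* constants.
 */
final class AvroNameSanitizer {
  // Avro names must start with [A-Za-z_] ...
  private static final Pattern INVALID_FIRST_CHAR = Pattern.compile("[^A-Za-z_]");
  // ... and may then contain only [A-Za-z0-9_].
  private static final Pattern INVALID_CHARS = Pattern.compile("[^A-Za-z0-9_]");
  // Assumed default, matching the "__" default proposed for the DeltaStreamer config.
  static final String DEFAULT_MASK = "__";

  private AvroNameSanitizer() {}

  /** Single-argument overload kept for existing callers; uses the default mask. */
  static String sanitizeName(String name) {
    return sanitizeName(name, DEFAULT_MASK);
  }

  /** Replaces each character that is illegal in an Avro name with {@code mask}. */
  static String sanitizeName(String name, String mask) {
    if (name == null || name.isEmpty()) {
      throw new IllegalArgumentException("Avro names must be non-empty");
    }
    String first = name.substring(0, 1);
    if (INVALID_FIRST_CHAR.matcher(first).matches()) {
      first = mask;
    }
    // quoteReplacement keeps '$' or '\' in the mask from being read as group references.
    String rest = INVALID_CHARS.matcher(name.substring(1)).replaceAll(Matcher.quoteReplacement(mask));
    return first + rest;
  }
}
```

Under this design, callers that never read the DeltaStreamer config keep the old behavior, while config-driven code passes its own mask.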
   





[GitHub] [hudi] vamshigv commented on a diff in pull request #6905: [HUDI-5001] column name sanitization for row source

Posted by GitBox <gi...@apache.org>.
vamshigv commented on code in PR #6905:
URL: https://github.com/apache/hudi/pull/6905#discussion_r1028472918


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -48,10 +56,120 @@
  */
 public final class SourceFormatAdapter implements Closeable {
 
+  public static class SourceFormatAdapterConfig extends HoodieConfig {
+    // sanitizes invalid columns both in the data read from source and also in the schema.
+    // invalid definition here goes by avro naming convention (https://avro.apache.org/docs/current/spec.html#names).
+    public static final ConfigProperty<Boolean> SANITIZE_AVRO_FIELD_NAMES = ConfigProperty
+        .key("hoodie.deltastreamer.source.sanitize.invalid.column.names")
+        .defaultValue(false)
+        .withDocumentation("Sanitizes invalid column names both in the data and also in the schema");
+
+    public static final ConfigProperty<String> AVRO_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+        .key("hoodie.deltastreamer.source.sanitize.invalid.char.mask")
+        .defaultValue("__")
+        .withDocumentation("Character mask to be used as replacement for invalid field names");
+
+    public SourceFormatAdapterConfig() {
+      super();
+    }
+
+    public SourceFormatAdapterConfig(TypedProperties props) {
+      super(props);
+    }
+  }
+
   private final Source source;
+  private final SourceFormatAdapterConfig config;
 
   public SourceFormatAdapter(Source source) {
+    this(source, Option.empty());
+  }
+
+  public SourceFormatAdapter(Source source,
+                             Option<TypedProperties> props) {
     this.source = source;
+    this.config = props.isPresent() ? new SourceFormatAdapterConfig(props.get()) : new SourceFormatAdapterConfig();
+  }
+
+  /**
+   * Config that automatically sanitizes the field names as per avro naming rules.
+   * @return enabled status.
+   */
+  private boolean isNameSanitizingEnabled() {
+    return config.getBooleanOrDefault(SourceFormatAdapterConfig.SANITIZE_AVRO_FIELD_NAMES);
+  }
+
+  /**
+   * Replacement mask for invalid characters encountered in avro names.
+   * @return sanitized value.
+   */
+  private String getInvalidCharMask() {
+    return config.getStringOrDefault(SourceFormatAdapterConfig.AVRO_FIELD_NAME_INVALID_CHAR_MASK);
+  }
+
+  private static DataType sanitizeDataTypeForAvro(DataType dataType, String invalidCharMask) {
+    if (dataType instanceof ArrayType) {
+      ArrayType arrayType = (ArrayType) dataType;
+      DataType sanitizedDataType = sanitizeDataTypeForAvro(arrayType.elementType(), invalidCharMask);
+      return new ArrayType(sanitizedDataType, arrayType.containsNull());
+    } else if (dataType instanceof MapType) {
+      MapType mapType = (MapType) dataType;
+      DataType sanitizedKeyDataType = sanitizeDataTypeForAvro(mapType.keyType(), invalidCharMask);
+      DataType sanitizedValueDataType = sanitizeDataTypeForAvro(mapType.valueType(), invalidCharMask);
+      return new MapType(sanitizedKeyDataType, sanitizedValueDataType, mapType.valueContainsNull());
+    } else if (dataType instanceof StructType) {
+      return sanitizeStructTypeForAvro((StructType) dataType, invalidCharMask);
+    }
+    return dataType;
+  }
+
+  // TODO: Rebase this to use InternalSchema when it is ready.

Review Comment:
   Done, and added a reference in the TODO.
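The recursion in `sanitizeDataTypeForAvro` can be sketched without Spark on the classpath. The types below (`DType`, `Atomic`, `ArrayT`, `MapT`, `FieldT`, `StructT`) are hypothetical stand-ins for Spark's `DataType` hierarchy, and `SchemaSanitizer` is illustrative only: arrays and maps are rebuilt around sanitized element, key, and value types, struct fields are renamed, and atomic types pass through unchanged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;

/* Hypothetical stand-ins for Spark's ArrayType / MapType / StructType. */
interface DType {}

final class Atomic implements DType {
  final String typeName;
  Atomic(String typeName) { this.typeName = typeName; }
}

final class ArrayT implements DType {
  final DType element;
  final boolean containsNull;
  ArrayT(DType element, boolean containsNull) { this.element = element; this.containsNull = containsNull; }
}

final class MapT implements DType {
  final DType key;
  final DType value;
  final boolean valueContainsNull;
  MapT(DType key, DType value, boolean valueContainsNull) {
    this.key = key; this.value = value; this.valueContainsNull = valueContainsNull;
  }
}

final class FieldT {
  final String name;
  final DType type;
  final boolean nullable;
  FieldT(String name, DType type, boolean nullable) { this.name = name; this.type = type; this.nullable = nullable; }
}

final class StructT implements DType {
  final List<FieldT> fields;
  StructT(List<FieldT> fields) { this.fields = fields; }
}

final class SchemaSanitizer {
  private SchemaSanitizer() {}

  /** Avro naming rule: first char in [A-Za-z_], remaining chars in [A-Za-z0-9_]. */
  static String sanitizeName(String name, String mask) {
    if (name.isEmpty()) {
      return mask; // empty names are invalid in Avro; this sketch replaces them with the mask
    }
    String first = name.substring(0, 1).matches("[^A-Za-z_]") ? mask : name.substring(0, 1);
    return first + name.substring(1).replaceAll("[^A-Za-z0-9_]", Matcher.quoteReplacement(mask));
  }

  /** Recurses into arrays, maps, and structs; atomic types are returned unchanged. */
  static DType sanitizeType(DType type, String mask) {
    if (type instanceof ArrayT) {
      ArrayT a = (ArrayT) type;
      return new ArrayT(sanitizeType(a.element, mask), a.containsNull);
    } else if (type instanceof MapT) {
      MapT m = (MapT) type;
      return new MapT(sanitizeType(m.key, mask), sanitizeType(m.value, mask), m.valueContainsNull);
    } else if (type instanceof StructT) {
      return sanitizeStruct((StructT) type, mask);
    }
    return type;
  }

  /** Renames every field of the struct and sanitizes its (possibly nested) type. */
  static StructT sanitizeStruct(StructT struct, String mask) {
    List<FieldT> out = new ArrayList<>();
    for (FieldT f : struct.fields) {
      out.add(new FieldT(sanitizeName(f.name, mask), sanitizeType(f.type, mask), f.nullable));
    }
    return new StructT(out);
  }
}
```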





[GitHub] [hudi] hudi-bot commented on pull request #6905: [HUDI-5001] column name sanitization for row source

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6905:
URL: https://github.com/apache/hudi/pull/6905#issuecomment-1322623775

   ## CI report:
   
   * ef153fa665c0cdf75d4716dce2292c3ff536a015 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=13063) 
   * 18d53fe47c2da60844d6c1982c47fc5ba591f8da Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=13161) 
   * 9861c569fbd0e75c261b15716772a91206dce997 UNKNOWN
   


[GitHub] [hudi] vamshigv commented on pull request #6905: [HUDI-5001] column name sanitization for row source

Posted by GitBox <gi...@apache.org>.
vamshigv commented on PR #6905:
URL: https://github.com/apache/hudi/pull/6905#issuecomment-1322586754

   @nsivabalan I think the spark-writer flows will need this kind of sanitization too (if they go through an Avro transformation at some stage, which I believe they do). But tackling that is not in scope on our side. Support for JSON sources is in scope, though.
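For JSON sources, the same rule has to be applied to keys at every nesting level. A minimal sketch, assuming a record has already been parsed into nested `Map`/`List` values; `JsonKeySanitizer` is hypothetical and not part of Hudi.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;

/**
 * Hypothetical sketch: sanitizing the keys of a parsed JSON record, modeled as
 * nested Map/List values. Scalars (numbers, strings, booleans, null) are kept as-is.
 */
final class JsonKeySanitizer {
  private JsonKeySanitizer() {}

  /** Avro naming rule: first char in [A-Za-z_], remaining chars in [A-Za-z0-9_]. */
  static String sanitizeName(String name, String mask) {
    if (name.isEmpty()) {
      return mask; // empty keys are invalid Avro names; this sketch replaces them with the mask
    }
    String first = name.substring(0, 1).matches("[^A-Za-z_]") ? mask : name.substring(0, 1);
    return first + name.substring(1).replaceAll("[^A-Za-z0-9_]", Matcher.quoteReplacement(mask));
  }

  /** Rebuilds maps with sanitized keys, walks lists, and returns other values unchanged. */
  static Object sanitize(Object value, String mask) {
    if (value instanceof Map) {
      Map<String, Object> out = new LinkedHashMap<>();
      for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
        out.put(sanitizeName(String.valueOf(e.getKey()), mask), sanitize(e.getValue(), mask));
      }
      return out;
    } else if (value instanceof List) {
      List<Object> out = new ArrayList<>();
      for (Object item : (List<?>) value) {
        out.add(sanitize(item, mask));
      }
      return out;
    }
    return value;
  }
}
```

One design caveat: distinct keys can collapse to the same sanitized name (for example `a-b` and `a.b` both become `a__b`). In this sketch the later key's value silently overwrites the earlier one, so a real implementation would likely want to detect and report such collisions.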




[GitHub] [hudi] hudi-bot commented on pull request #6905: [HUDI-5001] column name sanitization for row source

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6905:
URL: https://github.com/apache/hudi/pull/6905#issuecomment-1322635489

   ## CI report:
   
   * 18d53fe47c2da60844d6c1982c47fc5ba591f8da Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=13161) 
   * 9861c569fbd0e75c261b15716772a91206dce997 UNKNOWN
   * 54d091ed449d76037e719a19d49cbc3ae0858aa8 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=13162) 
   


[GitHub] [hudi] hudi-bot commented on pull request #6905: [HUDI-5001] column name sanitization for row source

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6905:
URL: https://github.com/apache/hudi/pull/6905#issuecomment-1322559352

   ## CI report:
   
   * ef153fa665c0cdf75d4716dce2292c3ff536a015 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=13063) 
   * 18d53fe47c2da60844d6c1982c47fc5ba591f8da UNKNOWN
   


[GitHub] [hudi] hudi-bot commented on pull request #6905: [HUDI-5001] column name sanitization for row source

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6905:
URL: https://github.com/apache/hudi/pull/6905#issuecomment-1317802830

   ## CI report:
   
   * fbedff4ed9dcc262ca69651448b294328b9d8627 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=12102) 
   * ef153fa665c0cdf75d4716dce2292c3ff536a015 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=13063) 
   


[GitHub] [hudi] hudi-bot commented on pull request #6905: column name sanitization for row source

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6905:
URL: https://github.com/apache/hudi/pull/6905#issuecomment-1272992008

   ## CI report:
   
   * fbedff4ed9dcc262ca69651448b294328b9d8627 UNKNOWN
   


[GitHub] [hudi] hudi-bot commented on pull request #6905: column name sanitization for row source

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6905:
URL: https://github.com/apache/hudi/pull/6905#issuecomment-1273163957

   ## CI report:
   
   * fbedff4ed9dcc262ca69651448b294328b9d8627 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=12102) 
   


[GitHub] [hudi] hudi-bot commented on pull request #6905: [HUDI-5001] column name sanitization for row source

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6905:
URL: https://github.com/apache/hudi/pull/6905#issuecomment-1317797845

   ## CI report:
   
   * fbedff4ed9dcc262ca69651448b294328b9d8627 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=12102) 
   * ef153fa665c0cdf75d4716dce2292c3ff536a015 UNKNOWN
   


[GitHub] [hudi] hudi-bot commented on pull request #6905: [HUDI-5001] column name sanitization for row source

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6905:
URL: https://github.com/apache/hudi/pull/6905#issuecomment-1322791162

   ## CI report:
   
   * 9861c569fbd0e75c261b15716772a91206dce997 UNKNOWN
   * 54d091ed449d76037e719a19d49cbc3ae0858aa8 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=13162) 
   


[GitHub] [hudi] nsivabalan commented on a diff in pull request #6905: [HUDI-5001] column name sanitization for row source

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on code in PR #6905:
URL: https://github.com/apache/hudi/pull/6905#discussion_r1025891341


##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -704,10 +705,25 @@ public static Schema getNullSchema() {
    * @return sanitized name
    */
   public static String sanitizeName(String name) {
-    if (name.substring(0, 1).matches(INVALID_AVRO_FIRST_CHAR_IN_NAMES)) {
-      name = name.replaceFirst(INVALID_AVRO_FIRST_CHAR_IN_NAMES, MASK_FOR_INVALID_CHARS_IN_NAMES);
+    return sanitizeName(name, MASK_FOR_INVALID_CHARS_IN_NAMES);

Review Comment:
   Where is this method used? Should we remove it, or make it use the default value of the AVRO_FIELD_NAME_INVALID_CHAR_MASK config?
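
To illustrate the second option, here is a minimal sketch of a two-argument `sanitizeName` whose one-argument overload falls back to the config's default mask `__` instead of a separate constant. It assumes Avro's naming rule (first character in `[A-Za-z_]`, the rest in `[A-Za-z0-9_]`) and assumes every invalid character is masked, not only the first one. The body of the PR's new overload is not shown in this hunk, and `AvroNameSanitizer` is a hypothetical name, not Hudi code.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not Hudi's HoodieAvroUtils): masks characters that Avro
// does not allow in names. Avro names must start with [A-Za-z_] and may then
// contain only [A-Za-z0-9_].
final class AvroNameSanitizer {
  // Assumed default, taken from AVRO_FIELD_NAME_INVALID_CHAR_MASK in the diff.
  static final String DEFAULT_MASK = "__";

  private static final Pattern INVALID_FIRST_CHAR = Pattern.compile("[^A-Za-z_]");
  private static final Pattern INVALID_CHAR = Pattern.compile("[^A-Za-z0-9_]");

  private AvroNameSanitizer() {
  }

  // One-argument overload delegating to the config default rather than a separate constant.
  static String sanitizeName(String name) {
    return sanitizeName(name, DEFAULT_MASK);
  }

  static String sanitizeName(String name, String mask) {
    if (name == null || name.isEmpty()) {
      return name;
    }
    // quoteReplacement keeps '$' or '\' in a user-supplied mask from being read as group references.
    String replacement = Matcher.quoteReplacement(mask);
    // An invalid leading character (e.g. a digit) is masked on its own ...
    String head = INVALID_FIRST_CHAR.matcher(name.substring(0, 1)).replaceFirst(replacement);
    // ... and every invalid character in the remainder is masked as well.
    String tail = INVALID_CHAR.matcher(name.substring(1)).replaceAll(replacement);
    return head + tail;
  }
}
```

With this shape, `sanitizeName("1abc")` yields `__abc` and `sanitizeName("a-b.c", "_")` yields `a_b_c`, so callers that do not read the config still get the same default as those that do.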



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -68,7 +186,8 @@ public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String
             r.getCheckpointForNextBatch(), r.getSchemaProvider());
       }
       case ROW: {
-        InputBatch<Dataset<Row>> r = ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit);
+        InputBatch<Dataset<Row>> r = trySanitizeFieldNames(((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit),

Review Comment:
   Why is this applicable only to the Row source? What if someone wants to sanitize field names for other sources as well?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -48,10 +56,120 @@
  */
 public final class SourceFormatAdapter implements Closeable {
 
+  public static class SourceFormatAdapterConfig extends HoodieConfig {
+    // sanitizes invalid columns both in the data read from source and also in the schema.
+    // invalid definition here goes by avro naming convention (https://avro.apache.org/docs/current/spec.html#names).
+    public static final ConfigProperty<Boolean> SANITIZE_AVRO_FIELD_NAMES = ConfigProperty

Review Comment:
   Minor: consider renaming this to SANITIZE_SCHEMA_FIELD_NAMES.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -48,10 +56,120 @@
  */
 public final class SourceFormatAdapter implements Closeable {
 
+  public static class SourceFormatAdapterConfig extends HoodieConfig {
+    // sanitizes invalid columns both in the data read from source and also in the schema.
+    // invalid definition here goes by avro naming convention (https://avro.apache.org/docs/current/spec.html#names).
+    public static final ConfigProperty<Boolean> SANITIZE_AVRO_FIELD_NAMES = ConfigProperty
+        .key("hoodie.deltastreamer.source.sanitize.invalid.column.names")
+        .defaultValue(false)
+        .withDocumentation("Sanitizes invalid column names both in the data and also in the schema");
+
+    public static final ConfigProperty<String> AVRO_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+        .key("hoodie.deltastreamer.source.sanitize.invalid.char.mask")
+        .defaultValue("__")
+        .withDocumentation("Character mask to be used as replacement for invalid field names");
+
+    public SourceFormatAdapterConfig() {
+      super();
+    }
+
+    public SourceFormatAdapterConfig(TypedProperties props) {
+      super(props);
+    }
+  }
+
   private final Source source;
+  private final SourceFormatAdapterConfig config;
 
   public SourceFormatAdapter(Source source) {
+    this(source, Option.empty());
+  }
+
+  public SourceFormatAdapter(Source source,
+                             Option<TypedProperties> props) {
     this.source = source;
+    this.config = props.isPresent() ? new SourceFormatAdapterConfig(props.get()) : new SourceFormatAdapterConfig();
+  }
+
+  /**
+   * Config that automatically sanitizes the field names as per avro naming rules.
+   * @return enabled status.
+   */
+  private boolean isNameSanitizingEnabled() {
+    return config.getBooleanOrDefault(SourceFormatAdapterConfig.SANITIZE_AVRO_FIELD_NAMES);
+  }
+
+  /**
+   * Replacement mask for invalid characters encountered in avro names.
+   * @return sanitized value.
+   */
+  private String getInvalidCharMask() {
+    return config.getStringOrDefault(SourceFormatAdapterConfig.AVRO_FIELD_NAME_INVALID_CHAR_MASK);
+  }
+
+  private static DataType sanitizeDataTypeForAvro(DataType dataType, String invalidCharMask) {
+    if (dataType instanceof ArrayType) {
+      ArrayType arrayType = (ArrayType) dataType;
+      DataType sanitizedDataType = sanitizeDataTypeForAvro(arrayType.elementType(), invalidCharMask);
+      return new ArrayType(sanitizedDataType, arrayType.containsNull());
+    } else if (dataType instanceof MapType) {
+      MapType mapType = (MapType) dataType;
+      DataType sanitizedKeyDataType = sanitizeDataTypeForAvro(mapType.keyType(), invalidCharMask);
+      DataType sanitizedValueDataType = sanitizeDataTypeForAvro(mapType.valueType(), invalidCharMask);
+      return new MapType(sanitizedKeyDataType, sanitizedValueDataType, mapType.valueContainsNull());
+    } else if (dataType instanceof StructType) {
+      return sanitizeStructTypeForAvro((StructType) dataType, invalidCharMask);
+    }
+    return dataType;
+  }
+
+  // TODO: Rebase this to use InternalSchema when it is ready.

Review Comment:
   Can you file a follow-up JIRA for this?
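
The recursion in `sanitizeDataTypeForAvro` does not depend on Spark specifically. Arrays recurse into their element type, maps into their key and value types, and structs are handed to `sanitizeStructTypeForAvro` (whose body is not shown here, presumably renaming each field). Because of that, the same shape could be ported to another schema representation such as the `InternalSchema` named in the TODO. The sketch below shows that shape on a small hypothetical type model. `TypeSanitizer`, `ArrayT`, `MapT` and `StructT` are illustrative names, not Hudi or Spark classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical mini type model, loosely mirroring Spark's ArrayType / MapType /
// StructType, used only to show the shape of the recursion.
final class TypeSanitizer {
  interface DType {
  }

  static final class Atomic implements DType {
    final String typeName;

    Atomic(String typeName) {
      this.typeName = typeName;
    }
  }

  static final class ArrayT implements DType {
    final DType element;
    final boolean containsNull;

    ArrayT(DType element, boolean containsNull) {
      this.element = element;
      this.containsNull = containsNull;
    }
  }

  static final class MapT implements DType {
    final DType key;
    final DType value;
    final boolean valueContainsNull;

    MapT(DType key, DType value, boolean valueContainsNull) {
      this.key = key;
      this.value = value;
      this.valueContainsNull = valueContainsNull;
    }
  }

  static final class Field {
    final String name;
    final DType type;
    final boolean nullable;

    Field(String name, DType type, boolean nullable) {
      this.name = name;
      this.type = type;
      this.nullable = nullable;
    }
  }

  static final class StructT implements DType {
    final List<Field> fields;

    StructT(List<Field> fields) {
      this.fields = fields;
    }
  }

  private TypeSanitizer() {
  }

  // Rebuilds the type with every struct field renamed at every nesting level,
  // preserving the nullability flags of arrays, maps and fields.
  static DType sanitize(DType type, UnaryOperator<String> renamer) {
    if (type instanceof ArrayT) {
      ArrayT array = (ArrayT) type;
      return new ArrayT(sanitize(array.element, renamer), array.containsNull);
    } else if (type instanceof MapT) {
      MapT map = (MapT) type;
      return new MapT(sanitize(map.key, renamer), sanitize(map.value, renamer), map.valueContainsNull);
    } else if (type instanceof StructT) {
      List<Field> renamed = new ArrayList<>();
      for (Field field : ((StructT) type).fields) {
        renamed.add(new Field(renamer.apply(field.name), sanitize(field.type, renamer), field.nullable));
      }
      return new StructT(renamed);
    }
    // Atomic types carry no field names and are returned unchanged.
    return type;
  }
}
```

Passing the renamer as a function keeps the traversal separate from the masking rule, so the same walk can be reused with whatever mask the config supplies.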



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -48,10 +56,120 @@
  */
 public final class SourceFormatAdapter implements Closeable {
 
+  public static class SourceFormatAdapterConfig extends HoodieConfig {
+    // sanitizes invalid columns both in the data read from source and also in the schema.
+    // invalid definition here goes by avro naming convention (https://avro.apache.org/docs/current/spec.html#names).
+    public static final ConfigProperty<Boolean> SANITIZE_AVRO_FIELD_NAMES = ConfigProperty

Review Comment:
   You can change the config key accordingly as well.
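
For context, the two properties in this hunk are ordinary key/value settings that fall back to their declared defaults (`false` and `__`). The sketch below uses plain `java.util.Properties` as an assumed stand-in for Hudi's `TypedProperties`/`HoodieConfig`. It uses the keys as they appear in this revision of the diff, before any rename suggested in the review. `SanitizeConfigSketch` is a hypothetical name.

```java
import java.util.Properties;

// Hypothetical stand-in for SourceFormatAdapterConfig: plain java.util.Properties
// instead of Hudi's TypedProperties/HoodieConfig. Keys and defaults are copied
// from this revision of the diff.
final class SanitizeConfigSketch {
  static final String SANITIZE_KEY = "hoodie.deltastreamer.source.sanitize.invalid.column.names";
  static final String MASK_KEY = "hoodie.deltastreamer.source.sanitize.invalid.char.mask";

  private SanitizeConfigSketch() {
  }

  // Mirrors getBooleanOrDefault(SANITIZE_AVRO_FIELD_NAMES): disabled unless explicitly set.
  static boolean isNameSanitizingEnabled(Properties props) {
    return Boolean.parseBoolean(props.getProperty(SANITIZE_KEY, "false"));
  }

  // Mirrors getStringOrDefault(AVRO_FIELD_NAME_INVALID_CHAR_MASK): "__" unless explicitly set.
  static String invalidCharMask(Properties props) {
    return props.getProperty(MASK_KEY, "__");
  }
}
```

Because sanitization is off by default, existing pipelines are unaffected until a user sets the first key to `true`.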



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java:
##########
@@ -50,19 +67,98 @@ public static class Config {
 
   protected Schema targetSchema;
 
+  private static List<Object> transformList(List<Object> src, String invalidCharMask) {
+    return src.stream().map(obj -> {
+      if (obj instanceof List) {
+        return transformList((List<Object>) obj, invalidCharMask);
+      } else if (obj instanceof Map) {
+        return transformMap((Map<String, Object>) obj, invalidCharMask);
+      } else {
+        return obj;
+      }
+    }).collect(Collectors.toList());
+  }
+
+  private static Map<String, Object> transformMap(Map<String, Object> src, String invalidCharMask) {
+    return src.entrySet().stream()
+        .map(kv -> {
+          if (kv.getValue() instanceof List) {
+            return Pair.of(kv.getKey(), transformList((List<Object>) kv.getValue(), invalidCharMask));
+          } else if (kv.getValue() instanceof Map) {
+            return Pair.of(kv.getKey(), transformMap((Map<String, Object>) kv.getValue(), invalidCharMask));
+          } else if (kv.getValue() instanceof String) {
+            String currentStrValue = (String) kv.getValue();
+            if (kv.getKey().equals(AVRO_FIELD_NAME_KEY)) {
+              return Pair.of(kv.getKey(), HoodieAvroUtils.sanitizeName(currentStrValue, invalidCharMask));
+            }
+            return Pair.of(kv.getKey(), currentStrValue);
+          } else {
+            return Pair.of(kv.getKey(), kv.getValue());
+          }
+        }).collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
+  }
+
+  private static Option<Schema> parseSanitizedAvroSchemaNoThrow(String schemaStr, String invalidCharMask) {
+    try {
+      ObjectMapper objectMapper = new ObjectMapper();
+      objectMapper.enable(JsonParser.Feature.ALLOW_COMMENTS);
+      Map<String, Object> objMap = objectMapper.readValue(schemaStr, Map.class);
+      Map<String, Object> modifiedMap = transformMap(objMap, invalidCharMask);
+      return Option.of(new Schema.Parser().parse(objectMapper.writeValueAsString(modifiedMap)));
+    } catch (Exception ex) {
+      return Option.empty();
+    }
+  }
+
+  /*
+   * We first rely on Avro to parse and then try to rename only for those failed.
+   * This way we can improve our parsing capabilities without breaking existing functionality.
+   * For example we don't yet support multiple named schemas defined in a file.
+   */
+  private static Schema parseAvroSchema(String schemaStr, boolean sanitizeSchema, String invalidCharMask) {
+    try {
+      return new Schema.Parser().parse(schemaStr);
+    } catch (SchemaParseException spe) {
+      // if sanitizing is not enabled rethrow the exception.
+      if (!sanitizeSchema) {
+        throw spe;
+      }
+      // Rename avro fields and try parsing once again.
+      Option<Schema> parseResult = parseSanitizedAvroSchemaNoThrow(schemaStr, invalidCharMask);
+      if (!parseResult.isPresent()) {
+        // throw original exception.
+        throw spe;
+      }
+      return parseResult.get();
+    }
+  }
+
+  private static Schema readAvroSchemaFromFile(String schemaPath, FileSystem fs, boolean sanitizeSchema, String invalidCharMask) {
+    String schemaStr;
+    FSDataInputStream in = null;
+    try {

Review Comment:
   Using try-with-resources might reduce the number of lines.
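
The hunk declares `FSDataInputStream in = null` ahead of the `try`, which suggests the stream is closed manually later on; that part is not shown. With try-with-resources the stream is closed automatically, even when reading fails. The sketch below uses `java.nio.file.Files.newInputStream` as a stand-in for Hadoop's `FileSystem`. With Hadoop, the resource would be `fs.open(new Path(schemaPath))`, which returns an `FSDataInputStream`. `SchemaFileReader` is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical reader: java.nio stands in for Hadoop's FileSystem here.
final class SchemaFileReader {
  private SchemaFileReader() {
  }

  // Reads the whole schema file as UTF-8; the stream is closed automatically,
  // whether the read succeeds or throws.
  static String readSchemaString(Path schemaPath) {
    try (InputStream in = Files.newInputStream(schemaPath)) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] buf = new byte[8192];
      int n;
      while ((n = in.read(buf)) != -1) {
        out.write(buf, 0, n);
      }
      return new String(out.toByteArray(), StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new UncheckedIOException("Error reading schema from " + schemaPath, e);
    }
  }
}
```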





[GitHub] [hudi] hudi-bot commented on pull request #6905: [HUDI-5001] column name sanitization for row source

Posted by GitBox <gi...@apache.org>.
hudi-bot commented on PR #6905:
URL: https://github.com/apache/hudi/pull/6905#issuecomment-1322618043

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "fbedff4ed9dcc262ca69651448b294328b9d8627",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=12102",
       "triggerID" : "fbedff4ed9dcc262ca69651448b294328b9d8627",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef153fa665c0cdf75d4716dce2292c3ff536a015",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=13063",
       "triggerID" : "ef153fa665c0cdf75d4716dce2292c3ff536a015",
       "triggerType" : "PUSH"
     }, {
       "hash" : "18d53fe47c2da60844d6c1982c47fc5ba591f8da",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=13161",
       "triggerID" : "18d53fe47c2da60844d6c1982c47fc5ba591f8da",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ef153fa665c0cdf75d4716dce2292c3ff536a015 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=13063) 
   * 18d53fe47c2da60844d6c1982c47fc5ba591f8da Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=13161) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] vamshigv commented on a diff in pull request #6905: [HUDI-5001] column name sanitization for row source

Posted by GitBox <gi...@apache.org>.
vamshigv commented on code in PR #6905:
URL: https://github.com/apache/hudi/pull/6905#discussion_r1028470927


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java:
##########
@@ -50,19 +67,98 @@ public static class Config {
 
   protected Schema targetSchema;
 
+  private static List<Object> transformList(List<Object> src, String invalidCharMask) {
+    return src.stream().map(obj -> {
+      if (obj instanceof List) {
+        return transformList((List<Object>) obj, invalidCharMask);
+      } else if (obj instanceof Map) {
+        return transformMap((Map<String, Object>) obj, invalidCharMask);
+      } else {
+        return obj;
+      }
+    }).collect(Collectors.toList());
+  }
+
+  private static Map<String, Object> transformMap(Map<String, Object> src, String invalidCharMask) {
+    return src.entrySet().stream()
+        .map(kv -> {
+          if (kv.getValue() instanceof List) {
+            return Pair.of(kv.getKey(), transformList((List<Object>) kv.getValue(), invalidCharMask));
+          } else if (kv.getValue() instanceof Map) {
+            return Pair.of(kv.getKey(), transformMap((Map<String, Object>) kv.getValue(), invalidCharMask));
+          } else if (kv.getValue() instanceof String) {
+            String currentStrValue = (String) kv.getValue();
+            if (kv.getKey().equals(AVRO_FIELD_NAME_KEY)) {
+              return Pair.of(kv.getKey(), HoodieAvroUtils.sanitizeName(currentStrValue, invalidCharMask));
+            }
+            return Pair.of(kv.getKey(), currentStrValue);
+          } else {
+            return Pair.of(kv.getKey(), kv.getValue());
+          }
+        }).collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
+  }
+
+  private static Option<Schema> parseSanitizedAvroSchemaNoThrow(String schemaStr, String invalidCharMask) {
+    try {
+      ObjectMapper objectMapper = new ObjectMapper();
+      objectMapper.enable(JsonParser.Feature.ALLOW_COMMENTS);
+      Map<String, Object> objMap = objectMapper.readValue(schemaStr, Map.class);
+      Map<String, Object> modifiedMap = transformMap(objMap, invalidCharMask);
+      return Option.of(new Schema.Parser().parse(objectMapper.writeValueAsString(modifiedMap)));
+    } catch (Exception ex) {
+      return Option.empty();
+    }
+  }
+
+  /*
+   * We first rely on Avro to parse and then try to rename only for those failed.
+   * This way we can improve our parsing capabilities without breaking existing functionality.
+   * For example we don't yet support multiple named schemas defined in a file.
+   */
+  private static Schema parseAvroSchema(String schemaStr, boolean sanitizeSchema, String invalidCharMask) {
+    try {
+      return new Schema.Parser().parse(schemaStr);
+    } catch (SchemaParseException spe) {
+      // if sanitizing is not enabled rethrow the exception.
+      if (!sanitizeSchema) {
+        throw spe;
+      }
+      // Rename avro fields and try parsing once again.
+      Option<Schema> parseResult = parseSanitizedAvroSchemaNoThrow(schemaStr, invalidCharMask);
+      if (!parseResult.isPresent()) {
+        // throw original exception.
+        throw spe;
+      }
+      return parseResult.get();
+    }
+  }
+
+  private static Schema readAvroSchemaFromFile(String schemaPath, FileSystem fs, boolean sanitizeSchema, String invalidCharMask) {
+    String schemaStr;
+    FSDataInputStream in = null;
+    try {

Review Comment:
   Updated.
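
The recursive renaming performed by `transformMap`/`transformList` in the hunk above can also be written as a single walk over the parsed JSON tree. This minimal sketch assumes the schema has already been parsed into maps and lists, for example with Jackson's `ObjectMapper.readValue` as in the hunk. It also assumes `AVRO_FIELD_NAME_KEY` is `"name"`, since its value is not shown in the diff. `SchemaJsonRenamer` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical walker over a parsed JSON schema (maps, lists, scalars).
final class SchemaJsonRenamer {
  private SchemaJsonRenamer() {
  }

  // Returns a copy of the tree in which every String stored under the key "name"
  // has been passed through the renamer; all other values are copied recursively.
  @SuppressWarnings("unchecked")
  static Object rename(Object node, UnaryOperator<String> renamer) {
    if (node instanceof Map) {
      // LinkedHashMap keeps key order and, unlike Collectors.toMap, accepts null values.
      Map<String, Object> out = new LinkedHashMap<>();
      for (Map.Entry<String, Object> e : ((Map<String, Object>) node).entrySet()) {
        Object value = e.getValue();
        if ("name".equals(e.getKey()) && value instanceof String) {
          out.put(e.getKey(), renamer.apply((String) value));
        } else {
          out.put(e.getKey(), rename(value, renamer));
        }
      }
      return out;
    } else if (node instanceof List) {
      List<Object> out = new ArrayList<>();
      for (Object item : (List<Object>) node) {
        out.add(rename(item, renamer));
      }
      return out;
    }
    // Strings, numbers, booleans and null are returned unchanged.
    return node;
  }
}
```

Why a plain loop: `Collectors.toMap` throws a `NullPointerException` when a value is null. Avro schemas commonly contain null values, as in `"default": null` on optional fields. A stream-based version would therefore fail on such schemas. Inside `parseSanitizedAvroSchemaNoThrow`, that failure would be swallowed and the original parse exception rethrown.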





[GitHub] [hudi] vamshigv commented on a diff in pull request #6905: [HUDI-5001] column name sanitization for row source

Posted by GitBox <gi...@apache.org>.
vamshigv commented on code in PR #6905:
URL: https://github.com/apache/hudi/pull/6905#discussion_r1028472659


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -48,10 +56,120 @@
  */
 public final class SourceFormatAdapter implements Closeable {
 
+  public static class SourceFormatAdapterConfig extends HoodieConfig {
+    // sanitizes invalid columns both in the data read from source and also in the schema.
+    // invalid definition here goes by avro naming convention (https://avro.apache.org/docs/current/spec.html#names).
+    public static final ConfigProperty<Boolean> SANITIZE_AVRO_FIELD_NAMES = ConfigProperty

Review Comment:
   Yes, updated.





Re: [PR] [HUDI-5001] column name sanitization for row source [hudi]

Posted by "yihua (via GitHub)" <gi...@apache.org>.
yihua commented on PR #6905:
URL: https://github.com/apache/hudi/pull/6905#issuecomment-1986736518

   @jonvex Do you think there is still value in adding this feature?

