Posted to commits@hudi.apache.org by "alexeykudinkin (via GitHub)" <gi...@apache.org> on 2023/01/28 03:07:14 UTC

[GitHub] [hudi] alexeykudinkin opened a new pull request, #7769: [MINOR] Fixing performance regression in `HoodieSparkRecord`

alexeykudinkin opened a new pull request, #7769:
URL: https://github.com/apache/hudi/pull/7769

   ### Change Logs
   
   This change addresses a few performance regressions in `HoodieSparkRecord` identified during our recent benchmarking.
   
   ### Impact
   
   TBA
   
   ### Risk level (write none, low, medium or high below)
   
   Low
   
   ### Documentation Update
   
   N/A
   
   - _The config description must be updated if new configs are added or the default values of the configs are changed_
   - _Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
     ticket number here and follow the [instruction](https://hudi.apache.org/contribute/developer-setup#website) to make
     changes to the website._
   
   ### Contributor's checklist
   
   - [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Change Logs and Impact were stated clearly
   - [ ] Adequate tests were added if applicable
   - [ ] CI passed
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1407328863

   ## CI report:
   
   * 80d38554649038cb9e668be4edc3a3c0a2c4373f Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14693) 
   * 36a0d9aeab63f713dff106ed9a76411aceb900b2 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14704) 
   
   Bot commands: @hudi-bot supports the following commands:

    - `@hudi-bot run azure` re-run the last Azure build




[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1410032054

   ## CI report:
   
   * 9bfa20f45fcc675b79053bb8b4f379b09c6cd6c5 UNKNOWN
   * 24020a964671b35fb9aa7b86748771fd71512495 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14800) 
   
   Bot commands: @hudi-bot supports the following commands:

    - `@hudi-bot run azure` re-run the last Azure build




[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1407314821

   ## CI report:
   
   * 0ece5561859923b8773d6ff9fa633f014c104300 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14688) 
   * 80d38554649038cb9e668be4edc3a3c0a2c4373f Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14693) 
   * 36a0d9aeab63f713dff106ed9a76411aceb900b2 UNKNOWN
   
   Bot commands: @hudi-bot supports the following commands:

    - `@hudi-bot run azure` re-run the last Azure build




[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1409581411

   ## CI report:
   
   * ca3b5fa7a3a5252dbd9acdf5c3e92934e477d10c Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14774) 
   * 9bfa20f45fcc675b79053bb8b4f379b09c6cd6c5 UNKNOWN
   * 62f1095775d8c3effa93175cdcc6423fca9a3968 UNKNOWN
   
   Bot commands: @hudi-bot supports the following commands:

    - `@hudi-bot run azure` re-run the last Azure build




[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1409641872

   ## CI report:
   
   * 9bfa20f45fcc675b79053bb8b4f379b09c6cd6c5 UNKNOWN
   * 62f1095775d8c3effa93175cdcc6423fca9a3968 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14784) 
   * 325244765016f67034fd8f364942028fe217ecb5 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14788) 
   
   Bot commands: @hudi-bot supports the following commands:

    - `@hudi-bot run azure` re-run the last Azure build




[GitHub] [hudi] wzx140 commented on a diff in pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "wzx140 (via GitHub)" <gi...@apache.org>.
wzx140 commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1089872194


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -190,9 +194,10 @@ public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema
     StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
 
-    // TODO HUDI-5281 Rewrite HoodieSparkRecord with UnsafeRowWriter
-    InternalRow rewriteRecord = HoodieInternalRowUtils.rewriteRecord(this.data, structType, targetStructType);
-    UnsafeRow unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType, targetStructType).apply(rewriteRecord);
+    Function1<InternalRow, UnsafeRow> unsafeRowWriter =
+        HoodieInternalRowUtils.getCachedUnsafeRowWriter(structType, targetStructType, Collections.emptyMap());

Review Comment:
   `rewriteRecord` is used for compatibility with Avro schema evolution, while `rewriteRecordWithNewSchema` is used for Hudi schema evolution. They have different type-change logic: for example, we cannot change IntegerType -> DecimalType in `rewriteRecord`, but we can in `rewriteRecordWithNewSchema`.
   
   Should we keep this?
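   For illustration, a hypothetical sketch (not Hudi's actual API) of the two type-promotion policies being contrasted here; the exact set of allowed conversions is an assumption based on the old `rewriteRecord` code quoted further down in this thread:

```scala
import org.apache.spark.sql.types._

object TypePromotionSketch {
  // Promotions the Avro-compatible rewrite path is assumed to allow
  def avroCompatiblePromotion(from: DataType, to: DataType): Boolean = (from, to) match {
    case (ByteType, ShortType | IntegerType | LongType | FloatType | DoubleType) => true
    case (ShortType, IntegerType | LongType | FloatType | DoubleType)            => true
    case (IntegerType, LongType | FloatType | DoubleType)                        => true
    case (LongType, FloatType | DoubleType)                                      => true
    case (FloatType, DoubleType)                                                 => true
    case (StringType, BinaryType)                                                => true
    case (_: DecimalType, _: DecimalType)                                        => true
    case _                                                                       => from == to
  }

  // The schema-evolution rewrite path additionally handles e.g. int -> decimal
  def schemaEvolutionPromotion(from: DataType, to: DataType): Boolean = (from, to) match {
    case (IntegerType | LongType, _: DecimalType) => true
    case _                                        => avroCompatiblePromotion(from, to)
  }

  def main(args: Array[String]): Unit = {
    assert(!avroCompatiblePromotion(IntegerType, DecimalType(18, 9))) // not allowed in rewriteRecord
    assert(schemaEvolutionPromotion(IntegerType, DecimalType(18, 9))) // allowed in rewriteRecordWithNewSchema
  }
}
```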





[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1090044032


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -94,8 +92,8 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
 
     // In case Advanced Schema Evolution is enabled we might need to rewrite currently
     // persisted records to adhere to an evolved schema
-    Option<Pair<Function<Schema, Function<HoodieRecord, HoodieRecord>>, Schema>> schemaEvolutionTransformerOpt =
-        composeSchemaEvolutionTransformer(writerSchema, baseFile, writeConfig, table.getMetaClient());
+    Option<Function<HoodieRecord, HoodieRecord>> schemaEvolutionTransformerOpt =

Review Comment:
   Simplifying the implementation by supplying the reader schema into the method



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java:
##########
@@ -38,19 +38,7 @@
 
 public class SparkLazyInsertIterable<T> extends HoodieLazyInsertIterable<T> {
 
-  private boolean useWriterSchema;
-
-  public SparkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,

Review Comment:
   Dead code



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -150,8 +151,9 @@ public String getRecordKey(Schema recordSchema, Option<BaseKeyGenerator> keyGene
       return getRecordKey();
     }
     StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
-    return keyGeneratorOpt.isPresent() ? ((SparkKeyGeneratorInterface) keyGeneratorOpt.get())
-        .getRecordKey(data, structType).toString() : data.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal());
+    return keyGeneratorOpt.isPresent()

Review Comment:
   This code is unchanged (there was a change but it got reverted)



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala:
##########
@@ -18,304 +18,413 @@
 
 package org.apache.spark.sql
 
-import java.nio.charset.StandardCharsets
-import java.util.HashMap
-import java.util.concurrent.ConcurrentHashMap
 import org.apache.avro.Schema
 import org.apache.hbase.thirdparty.com.google.common.base.Supplier
-import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType
 import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, toJavaDate}
 import org.apache.hudi.exception.HoodieException
-import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
+import org.apache.spark.sql.HoodieUnsafeRowUtils.{NestedFieldPath, composeNestedFieldPath}
+import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.types.Decimal.ROUND_HALF_EVEN
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
+
+import java.util.concurrent.ConcurrentHashMap
+import java.util.{ArrayDeque => JArrayDeque, Collections => JCollections, Deque => JDeque, Map => JMap}
+import java.util.function.{Function => JFunction}
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
 
 object HoodieInternalRowUtils {
 
-  // Projection are all thread local. Projection is not thread-safe
-  val unsafeProjectionThreadLocal: ThreadLocal[HashMap[(StructType, StructType), UnsafeProjection]] =
-    ThreadLocal.withInitial(new Supplier[HashMap[(StructType, StructType), UnsafeProjection]] {
-      override def get(): HashMap[(StructType, StructType), UnsafeProjection] = new HashMap[(StructType, StructType), UnsafeProjection]
+  private type RenamedColumnMap = JMap[String, String]
+  private type UnsafeRowWriter = InternalRow => UnsafeRow

Review Comment:
   Utilities in this class had to be essentially reimplemented to decouple the generation of the `RowWriter` (which transforms rows from one schema into another) from its per-row application.
   
   Decoupling is implemented in the following way (mirroring the approach in Spark's Avro Serializer/Deserializer); a simplified sketch follows below:
   
     - When the `RowWriter` is created we traverse both the new and the old schema and determine how the fields are mapped
     - For every new field we create a field-writer (returned by `newWriterRenaming`) that accepts a `CatalystDataUpdater` and the current value of the field, and transforms it into the new schema by either renaming, converting or keeping it intact
     - For fields held w/in a struct, all field-writers are assembled into a single struct-level `RowWriter` (returned by `genUnsafeStructWriter`)
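   A minimal, self-contained sketch of this pattern (simplified: flat schema, no renames or nesting; the real implementation lives in `HoodieInternalRowUtils`): the schema traversal and per-field conversion decisions happen once, producing a row-writer that is then applied per record with no further schema work.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.types._

object RowWriterSketch {
  // Stand-in for the per-field writers returned by `newWriterRenaming`
  type FieldWriter = (GenericInternalRow, Int, Any) => Unit

  def genStructWriter(from: StructType, to: StructType): InternalRow => InternalRow = {
    // One-time schema traversal: resolve source position and conversion for every target field
    val plan: Array[(Int, FieldWriter)] = to.fields.map { field =>
      val fromPos = from.fieldIndex(field.name)
      val writer: FieldWriter = (from.fields(fromPos).dataType, field.dataType) match {
        case (IntegerType, LongType) =>
          (row, ord, v) =>
            if (v == null) row.setNullAt(ord) else row.update(ord, v.asInstanceOf[Int].toLong)
        case (a, b) if a == b =>
          (row, ord, v) => row.update(ord, v)
        case (a, b) =>
          throw new IllegalArgumentException(s"unsupported conversion $a -> $b")
      }
      (fromPos, writer)
    }
    // Per-row application: only field reads, conversions and writes, no schema work
    row => {
      val out = new GenericInternalRow(to.length)
      var i = 0
      while (i < plan.length) {
        val (fromPos, write) = plan(i)
        write(out, i, row.get(fromPos, from.fields(fromPos).dataType))
        i += 1
      }
      out
    }
  }
}
```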



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java:
##########
@@ -127,7 +127,9 @@ public void testInterruptExecutor() {
           @Override
           public void consume(HoodieRecord record) {
             try {
-              Thread.currentThread().wait();
+              synchronized (this) {

Review Comment:
   That's fixing the flaky test: `wait()` must be called while holding the monitor of the object it is invoked on, otherwise it throws `IllegalMonitorStateException` (see the sketch below)
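   For context, a minimal sketch of the monitor rule behind the fix (general JVM semantics, not the test code itself):

```scala
object WaitSketch {
  def main(args: Array[String]): Unit = {
    val lock = new Object
    // Incorrect: the monitor of `lock` is not held -> IllegalMonitorStateException at runtime
    // lock.wait()

    // Correct: acquire the monitor first, then wait (with a timeout so the sketch terminates)
    lock.synchronized {
      lock.wait(10) // releases the monitor and waits up to 10 ms for a notify/interrupt
    }
  }
}
```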



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala:
##########
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi
-
-import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.testutils.HoodieClientTestUtils
-
-import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.{HoodieInternalRowUtils, Row, SparkSession}
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-
-class TestHoodieInternalRowUtils extends FunSuite with Matchers with BeforeAndAfterAll {

Review Comment:
   These are merged into another test suite



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -186,48 +192,39 @@ public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) {
   }
 
   @Override
-  public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException {
+  public HoodieRecord prependMetaFields(Schema recordSchema, Schema targetSchema, MetadataValues metadataValues, Properties props) {
     StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
 
-    // TODO HUDI-5281 Rewrite HoodieSparkRecord with UnsafeRowWriter
-    InternalRow rewriteRecord = HoodieInternalRowUtils.rewriteRecord(this.data, structType, targetStructType);
-    UnsafeRow unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType, targetStructType).apply(rewriteRecord);
-
-    boolean containMetaFields = hasMetaFields(targetStructType);
-    UTF8String[] metaFields = tryExtractMetaFields(unsafeRow, targetStructType);
-    HoodieInternalRow internalRow = new HoodieInternalRow(metaFields, unsafeRow, containMetaFields);
+    HoodieInternalRow updatableRow = wrapIntoUpdatableOverlay(this.data, structType);
+    updateMetadataValuesInternal(updatableRow, metadataValues);
 
-    return new HoodieSparkRecord(getKey(), internalRow, targetStructType, getOperation(), this.currentLocation, this.newLocation, false);
+    return new HoodieSparkRecord(getKey(), updatableRow, targetStructType, getOperation(), this.currentLocation, this.newLocation, false);
   }
 
   @Override
-  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) throws IOException {
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) {
     StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
 
-    // TODO HUDI-5281 Rewrite HoodieSparkRecord with UnsafeRowWriter
-    InternalRow rewriteRecord = HoodieInternalRowUtils.rewriteRecordWithNewSchema(this.data, structType, newStructType, renameCols);
-    UnsafeRow unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(newStructType, newStructType).apply(rewriteRecord);
+    Function1<InternalRow, UnsafeRow> unsafeRowWriter =
+        HoodieInternalRowUtils.getCachedUnsafeRowWriter(structType, newStructType, renameCols);
 
-    boolean containMetaFields = hasMetaFields(newStructType);

Review Comment:
   Wrapping into `HoodieInternalRow` has been removed here and abstracted away so it only occurs in the `prependMetaFields` API (considerably simplifying the impl here)



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala:
##########
@@ -18,304 +18,413 @@
 
 package org.apache.spark.sql
 
-import java.nio.charset.StandardCharsets
-import java.util.HashMap
-import java.util.concurrent.ConcurrentHashMap
 import org.apache.avro.Schema
 import org.apache.hbase.thirdparty.com.google.common.base.Supplier
-import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType
 import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, toJavaDate}
 import org.apache.hudi.exception.HoodieException
-import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
+import org.apache.spark.sql.HoodieUnsafeRowUtils.{NestedFieldPath, composeNestedFieldPath}
+import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.types.Decimal.ROUND_HALF_EVEN
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
+
+import java.util.concurrent.ConcurrentHashMap
+import java.util.{ArrayDeque => JArrayDeque, Collections => JCollections, Deque => JDeque, Map => JMap}
+import java.util.function.{Function => JFunction}
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
 
 object HoodieInternalRowUtils {
 
-  // Projection are all thread local. Projection is not thread-safe
-  val unsafeProjectionThreadLocal: ThreadLocal[HashMap[(StructType, StructType), UnsafeProjection]] =
-    ThreadLocal.withInitial(new Supplier[HashMap[(StructType, StructType), UnsafeProjection]] {
-      override def get(): HashMap[(StructType, StructType), UnsafeProjection] = new HashMap[(StructType, StructType), UnsafeProjection]
+  private type RenamedColumnMap = JMap[String, String]
+  private type UnsafeRowWriter = InternalRow => UnsafeRow
+
+  // NOTE: [[UnsafeProjection]] objects cache have to stay [[ThreadLocal]] since these are not thread-safe
+  private val unsafeWriterThreadLocal: ThreadLocal[mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter]] =
+    ThreadLocal.withInitial(new Supplier[mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter]] {
+      override def get(): mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter] =
+        new mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter]
     })
-  val schemaMap = new ConcurrentHashMap[Schema, StructType]
-  val orderPosListMap = new ConcurrentHashMap[(StructType, String), NestedFieldPath]
 
-  /**
-   * @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecord(org.apache.avro.generic.GenericRecord, org.apache.avro.Schema)
-   */
-  def rewriteRecord(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType): InternalRow = {
-    val newRow = new GenericInternalRow(Array.fill(newSchema.fields.length)(null).asInstanceOf[Array[Any]])
-
-    for ((field, pos) <- newSchema.fields.zipWithIndex) {
-      var oldValue: AnyRef = null
-      var oldType: DataType = null
-      if (existField(oldSchema, field.name)) {
-        val oldField = oldSchema(field.name)
-        val oldPos = oldSchema.fieldIndex(field.name)
-        oldType = oldField.dataType
-        oldValue = oldRecord.get(oldPos, oldType)
-      }
-      if (oldValue != null) {
-        field.dataType match {
-          case structType: StructType =>
-            val oldType = oldSchema(field.name).dataType.asInstanceOf[StructType]
-            val newValue = rewriteRecord(oldValue.asInstanceOf[InternalRow], oldType, structType)
-            newRow.update(pos, newValue)
-          case decimalType: DecimalType =>
-            val oldFieldSchema = oldSchema(field.name).dataType.asInstanceOf[DecimalType]
-            if (decimalType.scale != oldFieldSchema.scale || decimalType.precision != oldFieldSchema.precision) {
-              newRow.update(pos, Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale))
-              )
-            } else {
-              newRow.update(pos, oldValue)
-            }
-          case t if t == oldType => newRow.update(pos, oldValue)
-          // Type promotion
-          case _: ShortType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toShort)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: IntegerType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toInt)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toInt)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: LongType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toLong)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toLong)
-              case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toLong)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: FloatType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toFloat)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toFloat)
-              case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toFloat)
-              case _: LongType => newRow.update(pos, oldValue.asInstanceOf[Long].toFloat)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: DoubleType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toDouble)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toDouble)
-              case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toDouble)
-              case _: LongType => newRow.update(pos, oldValue.asInstanceOf[Long].toDouble)
-              case _: FloatType => newRow.update(pos, oldValue.asInstanceOf[Float].toDouble)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: BinaryType if oldType.isInstanceOf[StringType] => newRow.update(pos, oldValue.asInstanceOf[String].getBytes)
-          case _ => newRow.update(pos, oldValue)
-        }
-      } else {
-        // TODO default value in newSchema
-      }
-    }
+  // NOTE: [[UnsafeRowWriter]] objects cache have to stay [[ThreadLocal]] since these are not thread-safe
+  private val unsafeProjectionThreadLocal: ThreadLocal[mutable.HashMap[(StructType, StructType), UnsafeProjection]] =
+    ThreadLocal.withInitial(new Supplier[mutable.HashMap[(StructType, StructType), UnsafeProjection]] {
+      override def get(): mutable.HashMap[(StructType, StructType), UnsafeProjection] =
+        new mutable.HashMap[(StructType, StructType), UnsafeProjection]
+    })
 
-    newRow
-  }
+  private val schemaMap = new ConcurrentHashMap[Schema, StructType]
+  private val orderPosListMap = new ConcurrentHashMap[(StructType, String), Option[NestedFieldPath]]
 
   /**
-   * @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecordWithNewSchema(org.apache.avro.generic.IndexedRecord, org.apache.avro.Schema, java.util.Map)
+   * Provides cached instance of [[UnsafeProjection]] transforming provided [[InternalRow]]s from
+   * one [[StructType]] and into another [[StructType]]
+   *
+   * For more details regarding its semantic, please check corresponding scala-doc for
+   * [[HoodieCatalystExpressionUtils.generateUnsafeProjection]]
    */
-  def rewriteRecordWithNewSchema(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType, renameCols: java.util.Map[String, String]): InternalRow = {
-    rewriteRecordWithNewSchema(oldRecord, oldSchema, newSchema, renameCols, new java.util.LinkedList[String]).asInstanceOf[InternalRow]
+  def getCachedUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = {
+    unsafeProjectionThreadLocal.get()
+      .getOrElseUpdate((from, to), generateUnsafeProjection(from, to))
   }
 
   /**
-   * @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecordWithNewSchema(java.lang.Object, org.apache.avro.Schema, org.apache.avro.Schema, java.util.Map, java.util.Deque)
+   * Provides cached instance of [[UnsafeRowWriter]] transforming provided [[InternalRow]]s from
+   * one [[StructType]] and into another [[StructType]]
+   *
+   * Unlike [[UnsafeProjection]] requiring that [[from]] has to be a proper subset of [[to]] schema,
+   * [[UnsafeRowWriter]] is able to perform whole spectrum of schema-evolution transformations including:
+   *
+   * <ul>
+   *   <li>Transforming nested structs/maps/arrays</li>
+   *   <li>Handling type promotions (int -> long, etc)</li>
+   *   <li>Handling (field) renames</li>
+   * </ul>
    */
-  private def rewriteRecordWithNewSchema(oldRecord: Any, oldSchema: DataType, newSchema: DataType, renameCols: java.util.Map[String, String], fieldNames: java.util.Deque[String]): Any = {
-    if (oldRecord == null) {
-      null
-    } else {
-      newSchema match {
-        case targetSchema: StructType =>
-          if (!oldRecord.isInstanceOf[InternalRow]) {
-            throw new IllegalArgumentException("cannot rewrite record with different type")
-          }
-          val oldRow = oldRecord.asInstanceOf[InternalRow]
-          val helper = mutable.Map[Integer, Any]()
-
-          val oldStrucType = oldSchema.asInstanceOf[StructType]
-          targetSchema.fields.zipWithIndex.foreach { case (field, i) =>
-            fieldNames.push(field.name)
-            if (existField(oldStrucType, field.name)) {
-              val oldField = oldStrucType(field.name)
-              val oldPos = oldStrucType.fieldIndex(field.name)
-              helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames)
-            } else {
-              val fieldFullName = createFullName(fieldNames)
-              val colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.")
-              val lastColNameFromOldSchema = colNamePartsFromOldSchema(colNamePartsFromOldSchema.length - 1)
-              // deal with rename
-              if (!existField(oldStrucType, field.name) && existField(oldStrucType, lastColNameFromOldSchema)) {
-                // find rename
-                val oldField = oldStrucType(lastColNameFromOldSchema)
-                val oldPos = oldStrucType.fieldIndex(lastColNameFromOldSchema)
-                helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames)
-              }
-            }
-            fieldNames.pop()
-          }
-          val newRow = new GenericInternalRow(Array.fill(targetSchema.length)(null).asInstanceOf[Array[Any]])
-          targetSchema.fields.zipWithIndex.foreach { case (_, i) =>
-            if (helper.contains(i)) {
-              newRow.update(i, helper(i))
-            } else {
-              // TODO add default val
-              newRow.update(i, null)
-            }
-          }
+  def getCachedUnsafeRowWriter(from: StructType, to: StructType, renamedColumnsMap: JMap[String, String] = JCollections.emptyMap()): UnsafeRowWriter = {
+    unsafeWriterThreadLocal.get()
+      .getOrElseUpdate((from, to, renamedColumnsMap), genUnsafeRowWriter(from, to, renamedColumnsMap))
+  }
 
-          newRow
-        case targetSchema: ArrayType =>
-          if (!oldRecord.isInstanceOf[ArrayData]) {
-            throw new IllegalArgumentException("cannot rewrite record with different type")
-          }
-          val oldElementType = oldSchema.asInstanceOf[ArrayType].elementType
-          val oldArray = oldRecord.asInstanceOf[ArrayData]
-          val newElementType = targetSchema.elementType
-          val newArray = new GenericArrayData(Array.fill(oldArray.numElements())(null).asInstanceOf[Array[Any]])
-          fieldNames.push("element")
-          oldArray.toSeq[Any](oldElementType).zipWithIndex.foreach { case (value, i) => newArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldElementType, newElementType, renameCols, fieldNames)) }
-          fieldNames.pop()
-
-          newArray
-        case targetSchema: MapType =>
-          if (!oldRecord.isInstanceOf[MapData]) {
-            throw new IllegalArgumentException("cannot rewrite record with different type")
-          }
-          val oldValueType = oldSchema.asInstanceOf[MapType].valueType
-          val oldKeyType = oldSchema.asInstanceOf[MapType].keyType
-          val oldMap = oldRecord.asInstanceOf[MapData]
-          val newValueType = targetSchema.valueType
-          val newKeyArray = new GenericArrayData(Array.fill(oldMap.keyArray().numElements())(null).asInstanceOf[Array[Any]])
-          val newValueArray = new GenericArrayData(Array.fill(oldMap.valueArray().numElements())(null).asInstanceOf[Array[Any]])
-          val newMap = new ArrayBasedMapData(newKeyArray, newValueArray)
-          fieldNames.push("value")
-          oldMap.keyArray().toSeq[Any](oldKeyType).zipWithIndex.foreach { case (value, i) => newKeyArray.update(i, rewritePrimaryType(value, oldKeyType, oldKeyType)) }
-          oldMap.valueArray().toSeq[Any](oldValueType).zipWithIndex.foreach { case (value, i) => newValueArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldValueType, newValueType, renameCols, fieldNames)) }
-          fieldNames.pop()
-
-          newMap
-        case _ => rewritePrimaryType(oldRecord, oldSchema, newSchema)
-      }
+  def getCachedPosList(structType: StructType, field: String): Option[NestedFieldPath] = {
+    val nestedFieldPathOpt = orderPosListMap.get((structType, field))
+    // NOTE: This specifically designed to do 2 lookups (in case of cache-miss) to avoid
+    //       allocating the closure when using [[computeIfAbsent]] on more frequent cache-hit path
+    if (nestedFieldPathOpt != null) {
+      nestedFieldPathOpt
+    } else {
+      orderPosListMap.computeIfAbsent((structType, field), new JFunction[(StructType, String), Option[NestedFieldPath]] {
+        override def apply(t: (StructType, String)): Option[NestedFieldPath] =
+          composeNestedFieldPath(structType, field)
+      })
     }
   }
 
-  def getCachedPosList(structType: StructType, field: String): NestedFieldPath = {
-    val schemaPair = (structType, field)
-    if (!orderPosListMap.containsKey(schemaPair)) {
-      val posList = HoodieUnsafeRowUtils.composeNestedFieldPath(structType, field)
-      orderPosListMap.put(schemaPair, posList)
+  def getCachedSchema(schema: Schema): StructType = {
+    val structType = schemaMap.get(schema)
+    // NOTE: This specifically designed to do 2 lookups (in case of cache-miss) to avoid
+    //       allocating the closure when using [[computeIfAbsent]] on more frequent cache-hit path
+    if (structType != null) {
+      structType
+    } else {
+      schemaMap.computeIfAbsent(schema, new JFunction[Schema, StructType] {
+        override def apply(t: Schema): StructType =
+          convertAvroSchemaToStructType(schema)
+      })
     }
-    orderPosListMap.get(schemaPair)
   }
 
-  def getCachedUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = {
-    val schemaPair = (from, to)
-    val map = unsafeProjectionThreadLocal.get()
-    if (!map.containsKey(schemaPair)) {
-      val projection = HoodieCatalystExpressionUtils.generateUnsafeProjection(from, to)
-      map.put(schemaPair, projection)
+  private[sql] def genUnsafeRowWriter(prevSchema: StructType,
+                                      newSchema: StructType,
+                                      renamedColumnsMap: JMap[String, String]): UnsafeRowWriter = {
+    val writer = newWriterRenaming(prevSchema, newSchema, renamedColumnsMap, new JArrayDeque[String]())
+    val unsafeProjection = generateUnsafeProjection(newSchema, newSchema)
+    val phonyUpdater = new CatalystDataUpdater {
+      var value: InternalRow = _
+
+      override def set(ordinal: Int, value: Any): Unit =
+        this.value = value.asInstanceOf[InternalRow]
     }
-    map.get(schemaPair)
-  }
 
-  def getCachedSchema(schema: Schema): StructType = {
-    if (!schemaMap.containsKey(schema)) {
-      val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
-      schemaMap.put(schema, structType)
+    oldRow => {
+      writer(phonyUpdater, 0, oldRow)
+      unsafeProjection(phonyUpdater.value)
     }
-    schemaMap.get(schema)
   }
 
-  def existField(structType: StructType, name: String): Boolean = {
-    try {
-      HoodieUnsafeRowUtils.composeNestedFieldPath(structType, name)
-      true
-    } catch {
-      case _: IllegalArgumentException => false
+  private type RowFieldUpdater = (CatalystDataUpdater, Int, Any) => Unit
+
+  private def genUnsafeStructWriter(prevStructType: StructType,
+                                    newStructType: StructType,
+                                    renamedColumnsMap: JMap[String, String],
+                                    fieldNamesStack: JDeque[String]): (CatalystDataUpdater, Any) => Unit = {
+    // TODO need to canonicalize schemas (casing)
+    val fieldWriters = ArrayBuffer.empty[RowFieldUpdater]
+    val positionMap = ArrayBuffer.empty[Int]
+
+    for (newField <- newStructType.fields) {
+      fieldNamesStack.push(newField.name)
+
+      val (fieldWriter, prevFieldPos): (RowFieldUpdater, Int) =
+        prevStructType.getFieldIndex(newField.name) match {
+          case Some(prevFieldPos) =>
+            val prevField = prevStructType(prevFieldPos)
+            (newWriterRenaming(prevField.dataType, newField.dataType, renamedColumnsMap, fieldNamesStack), prevFieldPos)
+
+          case None =>
+            val newFieldQualifiedName = createFullName(fieldNamesStack)
+            val prevFieldName: String = lookupRenamedField(newFieldQualifiedName, renamedColumnsMap)
+
+            // Handle rename
+            prevStructType.getFieldIndex(prevFieldName) match {
+              case Some(prevFieldPos) =>
+                val prevField = prevStructType.fields(prevFieldPos)
+                (newWriterRenaming(prevField.dataType, newField.dataType, renamedColumnsMap, fieldNamesStack), prevFieldPos)
+
+              case None =>
+                val updater: RowFieldUpdater = (fieldUpdater, ordinal, _) => fieldUpdater.setNullAt(ordinal)
+                (updater, -1)
+            }
+        }
+
+      fieldWriters += fieldWriter
+      positionMap += prevFieldPos
+
+      fieldNamesStack.pop()
     }
-  }
 
-  private def rewritePrimaryType(oldValue: Any, oldSchema: DataType, newSchema: DataType) = {
-    if (oldSchema.equals(newSchema) || (oldSchema.isInstanceOf[DecimalType] && newSchema.isInstanceOf[DecimalType])) {
-      oldSchema match {
-        case NullType | BooleanType | IntegerType | LongType | FloatType | DoubleType | DateType | TimestampType | BinaryType =>
-          oldValue
-        // Copy UTF8String before putting into GenericInternalRow
-        case StringType => UTF8String.fromString(oldValue.toString)
-        case DecimalType() =>
-          Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale))
-        case _ =>
-          throw new HoodieException("Unknown schema type: " + newSchema)
+    (fieldUpdater, row) => {
+      var pos = 0
+      while (pos < fieldWriters.length) {
+        val prevPos = positionMap(pos)
+        val prevValue = if (prevPos >= 0) {
+          row.asInstanceOf[InternalRow].get(prevPos, prevStructType.fields(prevPos).dataType)
+        } else {
+          null
+        }
+
+        fieldWriters(pos)(fieldUpdater, pos, prevValue)
+        pos += 1
       }
-    } else {
-      rewritePrimaryTypeWithDiffSchemaType(oldValue, oldSchema, newSchema)
     }
   }
 
-  private def rewritePrimaryTypeWithDiffSchemaType(oldValue: Any, oldSchema: DataType, newSchema: DataType): Any = {
-    val value = newSchema match {
-      case NullType | BooleanType =>
-      case DateType if oldSchema.equals(StringType) =>
-        CatalystTypeConverters.convertToCatalyst(java.sql.Date.valueOf(oldValue.toString))
-      case LongType =>
-        oldSchema match {
-          case IntegerType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].longValue())
+  private def newWriterRenaming(prevDataType: DataType,
+                                newDataType: DataType,
+                                renamedColumnsMap: JMap[String, String],
+                                fieldNameStack: JDeque[String]): RowFieldUpdater = {
+    (newDataType, prevDataType) match {

Review Comment:
   All conversions here are the same as they were before; they are just rewritten in the form of field-writers (callbacks), as sketched below
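   A hedged, self-contained sketch of how a single conversion looks once expressed as a field-writer callback; `FieldUpdater`/`FieldWriter` below are stand-ins for the `CatalystDataUpdater`/`RowFieldUpdater` types used in the actual code:

```scala
import org.apache.spark.sql.types._

object FieldWriterSketch {
  trait FieldUpdater {
    def set(ordinal: Int, value: Any): Unit
    def setNullAt(ordinal: Int): Unit
  }
  type FieldWriter = (FieldUpdater, Int, Any) => Unit

  def conversionWriter(prevDataType: DataType, newDataType: DataType): FieldWriter =
    (newDataType, prevDataType) match {
      // A type promotion (e.g. int -> long) becomes a tiny callback resolved once per schema pair
      case (LongType, IntegerType) =>
        (updater, ordinal, value) =>
          if (value == null) updater.setNullAt(ordinal)
          else updater.set(ordinal, value.asInstanceOf[Int].toLong)
      // Decimal re-scaling, mirroring the previous rewrite logic
      case (toType: DecimalType, _: DecimalType) =>
        (updater, ordinal, value) =>
          if (value == null) updater.setNullAt(ordinal)
          else updater.set(ordinal, Decimal.fromDecimal(value.asInstanceOf[Decimal].toBigDecimal.setScale(toType.scale)))
      // Identity when the types already match
      case (to, from) if to == from =>
        (updater, ordinal, value) => updater.set(ordinal, value)
      case (to, from) =>
        throw new IllegalArgumentException(s"Unsupported conversion $from -> $to")
    }
}
```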



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java:
##########
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.util;
-
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-
-import org.apache.spark.sql.HoodieInternalRowUtils;
-import org.apache.spark.sql.HoodieUnsafeRowUtils;
-import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.StructType;
-
-public class HoodieSparkRecordUtils {
-
-  public static Object getValue(StructType structType, String fieldName, InternalRow row) {

Review Comment:
   Dead code



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1077,29 +1047,27 @@ object HoodieSparkSqlWriter {
             hoodieRecord
           }
         }).toJavaRDD()
+
       case HoodieRecord.HoodieRecordType.SPARK =>
-        // ut will use AvroKeyGenerator, so we need to cast it in spark record
         val sparkKeyGenerator = keyGenerator.asInstanceOf[SparkKeyGeneratorInterface]
         val dataFileSchema = new Schema.Parser().parse(dataFileSchemaStr)
         val dataFileStructType = HoodieInternalRowUtils.getCachedSchema(dataFileSchema)
         val writerStructType = HoodieInternalRowUtils.getCachedSchema(writerSchema)
         val sourceStructType = df.schema
-        df.queryExecution.toRdd.mapPartitions { iter =>
 
-          iter.map { internalRow =>
-            val recordKey = sparkKeyGenerator.getRecordKey(internalRow, sourceStructType)
-            val partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, sourceStructType)
+        df.queryExecution.toRdd.mapPartitions { it =>
+          val targetStructType = if (shouldDropPartitionColumns) dataFileStructType else writerStructType
+          // NOTE: To make sure we properly transform records
+          val targetStructTypeRowWriter = getCachedUnsafeRowWriter(sourceStructType, targetStructType)

Review Comment:
   This replaces the old approach of calling `rewriteRecord` and then applying an `unsafeProjection` with a single application of the new `UnsafeRowWriter` (see the usage sketch below)
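   A usage sketch of the new single-pass path (hypothetical schemas and record; `getCachedUnsafeRowWriter` is the API added in this PR, as shown in the diff above):

```scala
import org.apache.spark.sql.HoodieInternalRowUtils
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

object RowWriterUsageSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical source/target schemas: the target promotes `id` from int to long
    val sourceStructType = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
    val targetStructType = StructType(Seq(StructField("id", LongType), StructField("name", StringType)))

    // One cached writer per (source, target) schema pair replaces the old
    // rewriteRecord + getCachedUnsafeProjection two-step on the per-row path
    val rowWriter = HoodieInternalRowUtils.getCachedUnsafeRowWriter(sourceStructType, targetStructType)

    val row: InternalRow = new GenericInternalRow(Array[Any](1, UTF8String.fromString("a")))
    val unsafeRow: UnsafeRow = rowWriter(row)
    assert(unsafeRow.getLong(0) == 1L)
  }
}
```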



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieInternalRowUtils.scala:
##########
@@ -114,7 +164,22 @@ class TestStructTypeSchemaEvolutionUtils extends FunSuite with Matchers with Bef
     val internalSchema = AvroInternalSchemaConverter.convert(avroSchema)
     // do change type operation
     val updateChange = TableChanges.ColumnUpdateChange.get(internalSchema)
-    updateChange.updateColumnType("id", Types.LongType.get).updateColumnType("comb", Types.FloatType.get).updateColumnType("com1", Types.DoubleType.get).updateColumnType("col0", Types.StringType.get).updateColumnType("col1", Types.FloatType.get).updateColumnType("col11", Types.DoubleType.get).updateColumnType("col12", Types.StringType.get).updateColumnType("col2", Types.DoubleType.get).updateColumnType("col21", Types.StringType.get).updateColumnType("col3", Types.StringType.get).updateColumnType("col31", Types.DecimalType.get(18, 9)).updateColumnType("col4", Types.DecimalType.get(18, 9)).updateColumnType("col41", Types.StringType.get).updateColumnType("col5", Types.DateType.get).updateColumnType("col51", Types.DecimalType.get(18, 9)).updateColumnType("col6", Types.StringType.get)
+    updateChange.updateColumnType("id", Types.LongType.get)

Review Comment:
   No changes, just breaking an unreadably long line into multiple lines



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala:
##########
@@ -18,304 +18,413 @@
 
 package org.apache.spark.sql
 
-import java.nio.charset.StandardCharsets
-import java.util.HashMap
-import java.util.concurrent.ConcurrentHashMap
 import org.apache.avro.Schema
 import org.apache.hbase.thirdparty.com.google.common.base.Supplier
-import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType
 import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, toJavaDate}
 import org.apache.hudi.exception.HoodieException
-import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
+import org.apache.spark.sql.HoodieUnsafeRowUtils.{NestedFieldPath, composeNestedFieldPath}
+import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.types.Decimal.ROUND_HALF_EVEN
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
+
+import java.util.concurrent.ConcurrentHashMap
+import java.util.{ArrayDeque => JArrayDeque, Collections => JCollections, Deque => JDeque, Map => JMap}
+import java.util.function.{Function => JFunction}
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
 
 object HoodieInternalRowUtils {
 
-  // Projection are all thread local. Projection is not thread-safe
-  val unsafeProjectionThreadLocal: ThreadLocal[HashMap[(StructType, StructType), UnsafeProjection]] =
-    ThreadLocal.withInitial(new Supplier[HashMap[(StructType, StructType), UnsafeProjection]] {
-      override def get(): HashMap[(StructType, StructType), UnsafeProjection] = new HashMap[(StructType, StructType), UnsafeProjection]
+  private type RenamedColumnMap = JMap[String, String]
+  private type UnsafeRowWriter = InternalRow => UnsafeRow
+
+  // NOTE: [[UnsafeRowWriter]] objects cache has to stay [[ThreadLocal]] since these are not thread-safe
+  private val unsafeWriterThreadLocal: ThreadLocal[mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter]] =
+    ThreadLocal.withInitial(new Supplier[mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter]] {
+      override def get(): mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter] =
+        new mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter]
     })
-  val schemaMap = new ConcurrentHashMap[Schema, StructType]
-  val orderPosListMap = new ConcurrentHashMap[(StructType, String), NestedFieldPath]
 
-  /**
-   * @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecord(org.apache.avro.generic.GenericRecord, org.apache.avro.Schema)
-   */
-  def rewriteRecord(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType): InternalRow = {
-    val newRow = new GenericInternalRow(Array.fill(newSchema.fields.length)(null).asInstanceOf[Array[Any]])
-
-    for ((field, pos) <- newSchema.fields.zipWithIndex) {
-      var oldValue: AnyRef = null
-      var oldType: DataType = null
-      if (existField(oldSchema, field.name)) {
-        val oldField = oldSchema(field.name)
-        val oldPos = oldSchema.fieldIndex(field.name)
-        oldType = oldField.dataType
-        oldValue = oldRecord.get(oldPos, oldType)
-      }
-      if (oldValue != null) {
-        field.dataType match {
-          case structType: StructType =>
-            val oldType = oldSchema(field.name).dataType.asInstanceOf[StructType]
-            val newValue = rewriteRecord(oldValue.asInstanceOf[InternalRow], oldType, structType)
-            newRow.update(pos, newValue)
-          case decimalType: DecimalType =>
-            val oldFieldSchema = oldSchema(field.name).dataType.asInstanceOf[DecimalType]
-            if (decimalType.scale != oldFieldSchema.scale || decimalType.precision != oldFieldSchema.precision) {
-              newRow.update(pos, Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale))
-              )
-            } else {
-              newRow.update(pos, oldValue)
-            }
-          case t if t == oldType => newRow.update(pos, oldValue)
-          // Type promotion
-          case _: ShortType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toShort)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: IntegerType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toInt)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toInt)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: LongType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toLong)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toLong)
-              case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toLong)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: FloatType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toFloat)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toFloat)
-              case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toFloat)
-              case _: LongType => newRow.update(pos, oldValue.asInstanceOf[Long].toFloat)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: DoubleType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toDouble)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toDouble)
-              case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toDouble)
-              case _: LongType => newRow.update(pos, oldValue.asInstanceOf[Long].toDouble)
-              case _: FloatType => newRow.update(pos, oldValue.asInstanceOf[Float].toDouble)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: BinaryType if oldType.isInstanceOf[StringType] => newRow.update(pos, oldValue.asInstanceOf[String].getBytes)
-          case _ => newRow.update(pos, oldValue)
-        }
-      } else {
-        // TODO default value in newSchema
-      }
-    }
+  // NOTE: [[UnsafeProjection]] objects cache has to stay [[ThreadLocal]] since these are not thread-safe
+  private val unsafeProjectionThreadLocal: ThreadLocal[mutable.HashMap[(StructType, StructType), UnsafeProjection]] =
+    ThreadLocal.withInitial(new Supplier[mutable.HashMap[(StructType, StructType), UnsafeProjection]] {
+      override def get(): mutable.HashMap[(StructType, StructType), UnsafeProjection] =
+        new mutable.HashMap[(StructType, StructType), UnsafeProjection]
+    })
 
-    newRow
-  }
+  private val schemaMap = new ConcurrentHashMap[Schema, StructType]
+  private val orderPosListMap = new ConcurrentHashMap[(StructType, String), Option[NestedFieldPath]]
 
   /**
-   * @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecordWithNewSchema(org.apache.avro.generic.IndexedRecord, org.apache.avro.Schema, java.util.Map)
+   * Provides cached instance of [[UnsafeProjection]] transforming provided [[InternalRow]]s from
+   * one [[StructType]] and into another [[StructType]]
+   *
+   * For more details regarding its semantic, please check corresponding scala-doc for
+   * [[HoodieCatalystExpressionUtils.generateUnsafeProjection]]
    */
-  def rewriteRecordWithNewSchema(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType, renameCols: java.util.Map[String, String]): InternalRow = {
-    rewriteRecordWithNewSchema(oldRecord, oldSchema, newSchema, renameCols, new java.util.LinkedList[String]).asInstanceOf[InternalRow]
+  def getCachedUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = {
+    unsafeProjectionThreadLocal.get()
+      .getOrElseUpdate((from, to), generateUnsafeProjection(from, to))
   }
 
   /**
-   * @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecordWithNewSchema(java.lang.Object, org.apache.avro.Schema, org.apache.avro.Schema, java.util.Map, java.util.Deque)
+   * Provides cached instance of [[UnsafeRowWriter]] transforming provided [[InternalRow]]s from
+   * one [[StructType]] and into another [[StructType]]
+   *
+   * Unlike [[UnsafeProjection]] requiring that [[from]] has to be a proper subset of [[to]] schema,
+   * [[UnsafeRowWriter]] is able to perform whole spectrum of schema-evolution transformations including:
+   *
+   * <ul>
+   *   <li>Transforming nested structs/maps/arrays</li>
+   *   <li>Handling type promotions (int -> long, etc)</li>
+   *   <li>Handling (field) renames</li>
+   * </ul>
    */
-  private def rewriteRecordWithNewSchema(oldRecord: Any, oldSchema: DataType, newSchema: DataType, renameCols: java.util.Map[String, String], fieldNames: java.util.Deque[String]): Any = {
-    if (oldRecord == null) {
-      null
-    } else {
-      newSchema match {
-        case targetSchema: StructType =>
-          if (!oldRecord.isInstanceOf[InternalRow]) {
-            throw new IllegalArgumentException("cannot rewrite record with different type")
-          }
-          val oldRow = oldRecord.asInstanceOf[InternalRow]
-          val helper = mutable.Map[Integer, Any]()
-
-          val oldStrucType = oldSchema.asInstanceOf[StructType]
-          targetSchema.fields.zipWithIndex.foreach { case (field, i) =>
-            fieldNames.push(field.name)
-            if (existField(oldStrucType, field.name)) {
-              val oldField = oldStrucType(field.name)
-              val oldPos = oldStrucType.fieldIndex(field.name)
-              helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames)
-            } else {
-              val fieldFullName = createFullName(fieldNames)
-              val colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.")
-              val lastColNameFromOldSchema = colNamePartsFromOldSchema(colNamePartsFromOldSchema.length - 1)
-              // deal with rename
-              if (!existField(oldStrucType, field.name) && existField(oldStrucType, lastColNameFromOldSchema)) {
-                // find rename
-                val oldField = oldStrucType(lastColNameFromOldSchema)
-                val oldPos = oldStrucType.fieldIndex(lastColNameFromOldSchema)
-                helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames)
-              }
-            }
-            fieldNames.pop()
-          }
-          val newRow = new GenericInternalRow(Array.fill(targetSchema.length)(null).asInstanceOf[Array[Any]])
-          targetSchema.fields.zipWithIndex.foreach { case (_, i) =>
-            if (helper.contains(i)) {
-              newRow.update(i, helper(i))
-            } else {
-              // TODO add default val
-              newRow.update(i, null)
-            }
-          }
+  def getCachedUnsafeRowWriter(from: StructType, to: StructType, renamedColumnsMap: JMap[String, String] = JCollections.emptyMap()): UnsafeRowWriter = {
+    unsafeWriterThreadLocal.get()
+      .getOrElseUpdate((from, to, renamedColumnsMap), genUnsafeRowWriter(from, to, renamedColumnsMap))
+  }
 
-          newRow
-        case targetSchema: ArrayType =>
-          if (!oldRecord.isInstanceOf[ArrayData]) {
-            throw new IllegalArgumentException("cannot rewrite record with different type")
-          }
-          val oldElementType = oldSchema.asInstanceOf[ArrayType].elementType
-          val oldArray = oldRecord.asInstanceOf[ArrayData]
-          val newElementType = targetSchema.elementType
-          val newArray = new GenericArrayData(Array.fill(oldArray.numElements())(null).asInstanceOf[Array[Any]])
-          fieldNames.push("element")
-          oldArray.toSeq[Any](oldElementType).zipWithIndex.foreach { case (value, i) => newArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldElementType, newElementType, renameCols, fieldNames)) }
-          fieldNames.pop()
-
-          newArray
-        case targetSchema: MapType =>
-          if (!oldRecord.isInstanceOf[MapData]) {
-            throw new IllegalArgumentException("cannot rewrite record with different type")
-          }
-          val oldValueType = oldSchema.asInstanceOf[MapType].valueType
-          val oldKeyType = oldSchema.asInstanceOf[MapType].keyType
-          val oldMap = oldRecord.asInstanceOf[MapData]
-          val newValueType = targetSchema.valueType
-          val newKeyArray = new GenericArrayData(Array.fill(oldMap.keyArray().numElements())(null).asInstanceOf[Array[Any]])
-          val newValueArray = new GenericArrayData(Array.fill(oldMap.valueArray().numElements())(null).asInstanceOf[Array[Any]])
-          val newMap = new ArrayBasedMapData(newKeyArray, newValueArray)
-          fieldNames.push("value")
-          oldMap.keyArray().toSeq[Any](oldKeyType).zipWithIndex.foreach { case (value, i) => newKeyArray.update(i, rewritePrimaryType(value, oldKeyType, oldKeyType)) }
-          oldMap.valueArray().toSeq[Any](oldValueType).zipWithIndex.foreach { case (value, i) => newValueArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldValueType, newValueType, renameCols, fieldNames)) }
-          fieldNames.pop()
-
-          newMap
-        case _ => rewritePrimaryType(oldRecord, oldSchema, newSchema)
-      }
+  def getCachedPosList(structType: StructType, field: String): Option[NestedFieldPath] = {
+    val nestedFieldPathOpt = orderPosListMap.get((structType, field))
+    // NOTE: This specifically designed to do 2 lookups (in case of cache-miss) to avoid
+    //       allocating the closure when using [[computeIfAbsent]] on more frequent cache-hit path
+    if (nestedFieldPathOpt != null) {
+      nestedFieldPathOpt
+    } else {
+      orderPosListMap.computeIfAbsent((structType, field), new JFunction[(StructType, String), Option[NestedFieldPath]] {
+        override def apply(t: (StructType, String)): Option[NestedFieldPath] =
+          composeNestedFieldPath(structType, field)
+      })
     }
   }
 
-  def getCachedPosList(structType: StructType, field: String): NestedFieldPath = {
-    val schemaPair = (structType, field)
-    if (!orderPosListMap.containsKey(schemaPair)) {
-      val posList = HoodieUnsafeRowUtils.composeNestedFieldPath(structType, field)
-      orderPosListMap.put(schemaPair, posList)
+  def getCachedSchema(schema: Schema): StructType = {
+    val structType = schemaMap.get(schema)
+    // NOTE: This specifically designed to do 2 lookups (in case of cache-miss) to avoid
+    //       allocating the closure when using [[computeIfAbsent]] on more frequent cache-hit path
+    if (structType != null) {
+      structType
+    } else {
+      schemaMap.computeIfAbsent(schema, new JFunction[Schema, StructType] {
+        override def apply(t: Schema): StructType =
+          convertAvroSchemaToStructType(schema)
+      })
     }
-    orderPosListMap.get(schemaPair)
   }
 
-  def getCachedUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = {
-    val schemaPair = (from, to)
-    val map = unsafeProjectionThreadLocal.get()
-    if (!map.containsKey(schemaPair)) {
-      val projection = HoodieCatalystExpressionUtils.generateUnsafeProjection(from, to)
-      map.put(schemaPair, projection)
+  private[sql] def genUnsafeRowWriter(prevSchema: StructType,
+                                      newSchema: StructType,
+                                      renamedColumnsMap: JMap[String, String]): UnsafeRowWriter = {
+    val writer = newWriterRenaming(prevSchema, newSchema, renamedColumnsMap, new JArrayDeque[String]())
+    val unsafeProjection = generateUnsafeProjection(newSchema, newSchema)
+    val phonyUpdater = new CatalystDataUpdater {
+      var value: InternalRow = _
+
+      override def set(ordinal: Int, value: Any): Unit =
+        this.value = value.asInstanceOf[InternalRow]
     }
-    map.get(schemaPair)
-  }
 
-  def getCachedSchema(schema: Schema): StructType = {
-    if (!schemaMap.containsKey(schema)) {
-      val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
-      schemaMap.put(schema, structType)
+    oldRow => {
+      writer(phonyUpdater, 0, oldRow)
+      unsafeProjection(phonyUpdater.value)
     }
-    schemaMap.get(schema)
   }
 
-  def existField(structType: StructType, name: String): Boolean = {
-    try {
-      HoodieUnsafeRowUtils.composeNestedFieldPath(structType, name)
-      true
-    } catch {
-      case _: IllegalArgumentException => false
+  private type RowFieldUpdater = (CatalystDataUpdater, Int, Any) => Unit
+
+  private def genUnsafeStructWriter(prevStructType: StructType,
+                                    newStructType: StructType,
+                                    renamedColumnsMap: JMap[String, String],
+                                    fieldNamesStack: JDeque[String]): (CatalystDataUpdater, Any) => Unit = {
+    // TODO need to canonicalize schemas (casing)
+    val fieldWriters = ArrayBuffer.empty[RowFieldUpdater]
+    val positionMap = ArrayBuffer.empty[Int]
+
+    for (newField <- newStructType.fields) {
+      fieldNamesStack.push(newField.name)
+
+      val (fieldWriter, prevFieldPos): (RowFieldUpdater, Int) =
+        prevStructType.getFieldIndex(newField.name) match {
+          case Some(prevFieldPos) =>
+            val prevField = prevStructType(prevFieldPos)
+            (newWriterRenaming(prevField.dataType, newField.dataType, renamedColumnsMap, fieldNamesStack), prevFieldPos)
+
+          case None =>
+            val newFieldQualifiedName = createFullName(fieldNamesStack)
+            val prevFieldName: String = lookupRenamedField(newFieldQualifiedName, renamedColumnsMap)
+
+            // Handle rename
+            prevStructType.getFieldIndex(prevFieldName) match {
+              case Some(prevFieldPos) =>
+                val prevField = prevStructType.fields(prevFieldPos)
+                (newWriterRenaming(prevField.dataType, newField.dataType, renamedColumnsMap, fieldNamesStack), prevFieldPos)
+
+              case None =>
+                val updater: RowFieldUpdater = (fieldUpdater, ordinal, _) => fieldUpdater.setNullAt(ordinal)
+                (updater, -1)
+            }
+        }
+
+      fieldWriters += fieldWriter
+      positionMap += prevFieldPos
+
+      fieldNamesStack.pop()
     }
-  }
 
-  private def rewritePrimaryType(oldValue: Any, oldSchema: DataType, newSchema: DataType) = {
-    if (oldSchema.equals(newSchema) || (oldSchema.isInstanceOf[DecimalType] && newSchema.isInstanceOf[DecimalType])) {
-      oldSchema match {
-        case NullType | BooleanType | IntegerType | LongType | FloatType | DoubleType | DateType | TimestampType | BinaryType =>
-          oldValue
-        // Copy UTF8String before putting into GenericInternalRow
-        case StringType => UTF8String.fromString(oldValue.toString)
-        case DecimalType() =>
-          Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale))
-        case _ =>
-          throw new HoodieException("Unknown schema type: " + newSchema)
+    (fieldUpdater, row) => {
+      var pos = 0
+      while (pos < fieldWriters.length) {
+        val prevPos = positionMap(pos)
+        val prevValue = if (prevPos >= 0) {
+          row.asInstanceOf[InternalRow].get(prevPos, prevStructType.fields(prevPos).dataType)
+        } else {
+          null
+        }
+
+        fieldWriters(pos)(fieldUpdater, pos, prevValue)
+        pos += 1
       }
-    } else {
-      rewritePrimaryTypeWithDiffSchemaType(oldValue, oldSchema, newSchema)
     }
   }
 
-  private def rewritePrimaryTypeWithDiffSchemaType(oldValue: Any, oldSchema: DataType, newSchema: DataType): Any = {
-    val value = newSchema match {
-      case NullType | BooleanType =>
-      case DateType if oldSchema.equals(StringType) =>
-        CatalystTypeConverters.convertToCatalyst(java.sql.Date.valueOf(oldValue.toString))
-      case LongType =>
-        oldSchema match {
-          case IntegerType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].longValue())
+  private def newWriterRenaming(prevDataType: DataType,
+                                newDataType: DataType,
+                                renamedColumnsMap: JMap[String, String],
+                                fieldNameStack: JDeque[String]): RowFieldUpdater = {
+    (newDataType, prevDataType) match {
+      case (newType, prevType) if prevType == newType =>
+        (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, value)
+
+      case (newStructType: StructType, prevStructType: StructType) =>
+        val writer = genUnsafeStructWriter(prevStructType, newStructType, renamedColumnsMap, fieldNameStack)
+
+        val newRow = new SpecificInternalRow(newStructType.fields.map(_.dataType))
+        val rowUpdater = new RowUpdater(newRow)
+
+        (fieldUpdater, ordinal, value) => {
+          // Here new row is built in 2 stages:
+          //    - First, we pass mutable row (used as buffer/scratchpad) created above wrapped into [[RowUpdater]]
+          //      into generated row-writer
+          //    - Upon returning from row-writer, we call back into parent row's [[fieldUpdater]] to set returned
+          //      row as a value in it
+          writer(rowUpdater, value)
+          fieldUpdater.set(ordinal, newRow)
+        }
+
+      case (ArrayType(newElementType, _), ArrayType(prevElementType, containsNull)) =>
+        fieldNameStack.push("element")
+        val elementWriter = newWriterRenaming(prevElementType, newElementType, renamedColumnsMap, fieldNameStack)
+        fieldNameStack.pop()
+
+        (fieldUpdater, ordinal, value) => {
+          val prevArrayData = value.asInstanceOf[ArrayData]
+          val prevArray = prevArrayData.toObjectArray(prevElementType)
+
+          val newArrayData = createArrayData(newElementType, prevArrayData.numElements())
+          val elementUpdater = new ArrayDataUpdater(newArrayData)
+
+          var i = 0
+          while (i < prevArray.length) {
+            val element = prevArray(i)
+            if (element == null) {
+              if (!containsNull) {
+                throw new HoodieException(
+                  s"Array value at path '${fieldNameStack.asScala.mkString(".")}' is not allowed to be null")
+              } else {
+                elementUpdater.setNullAt(i)
+              }
+            } else {
+              elementWriter(elementUpdater, i, element)
+            }
+            i += 1
+          }
+
+          fieldUpdater.set(ordinal, newArrayData)
+        }
+
+      case (MapType(_, newValueType, _), MapType(_, prevValueType, valueContainsNull)) =>
+        fieldNameStack.push("value")
+        val valueWriter = newWriterRenaming(prevValueType, newValueType, renamedColumnsMap, fieldNameStack)
+        fieldNameStack.pop()
+
+        (updater, ordinal, value) =>
+          val mapData = value.asInstanceOf[MapData]
+          val prevKeyArrayData = mapData.keyArray
+          val prevValueArrayData = mapData.valueArray
+          val prevValueArray = prevValueArrayData.toObjectArray(prevValueType)
+
+          val newValueArray = createArrayData(newValueType, mapData.numElements())
+          val valueUpdater = new ArrayDataUpdater(newValueArray)
+          var i = 0
+          while (i < prevValueArray.length) {
+            val value = prevValueArray(i)
+            if (value == null) {
+              if (!valueContainsNull) {
+                throw new HoodieException(s"Map value at path ${fieldNameStack.asScala.mkString(".")} is not allowed to be null")
+              } else {
+                valueUpdater.setNullAt(i)
+              }
+            } else {
+              valueWriter(valueUpdater, i, value)
+            }
+            i += 1
+          }
+
+          // NOTE: Key's couldn't be transformed and have to always be of [[StringType]]
+          updater.set(ordinal, new ArrayBasedMapData(prevKeyArrayData, newValueArray))
+
+      case (newDecimal: DecimalType, _) =>
+        prevDataType match {
+          case IntegerType | LongType | FloatType | DoubleType | StringType =>
+            (fieldUpdater, ordinal, value) =>
+              val scale = newDecimal.scale
+              // TODO this has to be revisited to avoid loss of precision (for fps)
+              fieldUpdater.setDecimal(ordinal, Decimal.fromDecimal(BigDecimal(value.toString).setScale(scale, ROUND_HALF_EVEN)))
+
+          case _: DecimalType =>
+            (fieldUpdater, ordinal, value) =>
+              fieldUpdater.setDecimal(ordinal, Decimal.fromDecimal(value.asInstanceOf[Decimal].toBigDecimal.setScale(newDecimal.scale)))
+
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
         }
-      case FloatType =>
-        oldSchema match {
-          case IntegerType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].floatValue())
-          case LongType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Long].floatValue())
+
+      case (_: ShortType, _) =>
+        prevDataType match {
+          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setShort(ordinal, value.asInstanceOf[Byte].toShort)
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
         }
-      case DoubleType =>
-        oldSchema match {
-          case IntegerType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].doubleValue())
-          case LongType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Long].doubleValue())
-          case FloatType => CatalystTypeConverters.convertToCatalyst(java.lang.Double.valueOf(oldValue.asInstanceOf[Float] + ""))
+
+      case (_: IntegerType, _) =>
+        prevDataType match {
+          case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setInt(ordinal, value.asInstanceOf[Short].toInt)
+          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setInt(ordinal, value.asInstanceOf[Byte].toInt)
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
         }
-      case BinaryType =>
-        oldSchema match {
-          case StringType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[String].getBytes(StandardCharsets.UTF_8))
+
+      case (_: LongType, _) =>
+        prevDataType match {
+          case _: IntegerType => (fieldUpdater, ordinal, value) => fieldUpdater.setLong(ordinal, value.asInstanceOf[Int].toLong)
+          case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setLong(ordinal, value.asInstanceOf[Short].toLong)
+          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setLong(ordinal, value.asInstanceOf[Byte].toLong)
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
         }
-      case StringType =>
-        oldSchema match {
-          case BinaryType => CatalystTypeConverters.convertToCatalyst(new String(oldValue.asInstanceOf[Array[Byte]]))
-          case DateType => CatalystTypeConverters.convertToCatalyst(toJavaDate(oldValue.asInstanceOf[Integer]).toString)
-          case IntegerType | LongType | FloatType | DoubleType | DecimalType() => CatalystTypeConverters.convertToCatalyst(oldValue.toString)
+
+      case (_: FloatType, _) =>
+        prevDataType match {
+          case _: LongType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Long].toFloat)
+          case _: IntegerType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Int].toFloat)
+          case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Short].toFloat)
+          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Byte].toFloat)
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
         }
-      case DecimalType() =>
-        oldSchema match {
-          case IntegerType | LongType | FloatType | DoubleType | StringType =>
-            val scale = newSchema.asInstanceOf[DecimalType].scale
 
-            Decimal.fromDecimal(BigDecimal(oldValue.toString).setScale(scale))
+      case (_: DoubleType, _) =>
+        prevDataType match {
+          case _: FloatType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Float].toDouble)
+          case _: LongType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Long].toDouble)
+          case _: IntegerType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Int].toDouble)
+          case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Short].toDouble)
+          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Byte].toDouble)
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
         }
-      case _ =>
-    }
-    if (value == None) {
-      throw new HoodieException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema))
-    } else {
-      CatalystTypeConverters.convertToCatalyst(value)
+
+      case (_: BinaryType, _: StringType) =>
+        (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, value.asInstanceOf[UTF8String].getBytes)
+
+      // TODO revisit this (we need to align permitted casting w/ Spark)
+      // NOTE: This is supported to stay compatible w/ [[HoodieAvroUtils.rewriteRecordWithNewSchema]]
+      case (_: StringType, _) =>
+        prevDataType match {
+          case BinaryType => (fieldUpdater, ordinal, value) =>
+            fieldUpdater.set(ordinal, UTF8String.fromBytes(value.asInstanceOf[Array[Byte]]))
+          case DateType => (fieldUpdater, ordinal, value) =>
+            fieldUpdater.set(ordinal, UTF8String.fromString(toJavaDate(value.asInstanceOf[Integer]).toString))
+          case IntegerType | LongType | FloatType | DoubleType | _: DecimalType =>
+            (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, UTF8String.fromString(value.toString))
+
+          case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
+        }
+
+      case (DateType, StringType) =>
+        (fieldUpdater, ordinal, value) =>
+          fieldUpdater.set(ordinal, CatalystTypeConverters.convertToCatalyst(java.sql.Date.valueOf(value.toString)))
+
+      case (_, _) =>
+        throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
     }
   }
 
-  def removeFields(schema: StructType, fieldsToRemove: java.util.List[String]): StructType = {
-    StructType(schema.fields.filter(field => !fieldsToRemove.contains(field.name)))
+  private def lookupRenamedField(newFieldQualifiedName: String, renamedColumnsMap: JMap[String, String]) = {
+    val prevFieldQualifiedName = renamedColumnsMap.getOrDefault(newFieldQualifiedName, "")
+    val prevFieldQualifiedNameParts = prevFieldQualifiedName.split("\\.")
+    val prevFieldName = prevFieldQualifiedNameParts(prevFieldQualifiedNameParts.length - 1)
+
+    prevFieldName
   }
+
+  private def createArrayData(elementType: DataType, length: Int): ArrayData = elementType match {

Review Comment:
   These utility classes are borrowed from Spark's `AvroSerializer`
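
   For reference, the borrowed pieces look roughly like this (a simplified sketch modeled on Spark's built-in Avro support, with names prefixed `Sketch` to make clear it is illustrative rather than the exact code in this PR):

   ```scala
   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
   import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
   import org.apache.spark.sql.types._

   // The "updater" callback: generated writers never build containers themselves,
   // they set values into whatever row/array buffer the caller passes in.
   trait SketchCatalystDataUpdater {
     def set(ordinal: Int, value: Any): Unit
     def setNullAt(ordinal: Int): Unit = set(ordinal, null)
   }

   final class SketchRowUpdater(row: InternalRow) extends SketchCatalystDataUpdater {
     override def set(ordinal: Int, value: Any): Unit = row.update(ordinal, value)
     override def setNullAt(ordinal: Int): Unit = row.setNullAt(ordinal)
   }

   final class SketchArrayDataUpdater(array: GenericArrayData) extends SketchCatalystDataUpdater {
     override def set(ordinal: Int, value: Any): Unit = array.update(ordinal, value)
     override def setNullAt(ordinal: Int): Unit = array.setNullAt(ordinal)
   }

   // Pre-sizes an array container, using the unsafe variant for common primitive types.
   def sketchCreateArrayData(elementType: DataType, length: Int): ArrayData = elementType match {
     case IntegerType => UnsafeArrayData.fromPrimitiveArray(new Array[Int](length))
     case LongType    => UnsafeArrayData.fromPrimitiveArray(new Array[Long](length))
     case DoubleType  => UnsafeArrayData.fromPrimitiveArray(new Array[Double](length))
     case _           => new GenericArrayData(new Array[Any](length))
   }
   ```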



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -416,7 +423,8 @@ private static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType s
         getValue(structType, recordKeyPartitionPathFieldPair.getRight(), record.data).toString());
 
     HoodieOperation operation = withOperationField
-        ? HoodieOperation.fromName(getNullableValAsString(structType, record.data, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
+        ? HoodieOperation.fromName(record.data.getString(structType.fieldIndex(HoodieRecord.OPERATION_METADATA_FIELD)))

Review Comment:
   Actually, I looked at it again: in that case `withOperationField` is true, so this field has to be present in the schema
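
   A minimal sketch of that reasoning (the `Option`-based null handling here is illustrative, not necessarily what the PR does):

   ```scala
   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.sql.types.StructType

   // When withOperationField is true the operation meta column is guaranteed to be in the
   // schema, so structType.fieldIndex (which throws on a missing field) is safe and cheaper
   // than a null-tolerant lookup helper.
   def readOperation(row: InternalRow, structType: StructType, withOperationField: Boolean): Option[String] =
     if (withOperationField) {
       val idx = structType.fieldIndex("_hoodie_operation") // assumed value of HoodieRecord.OPERATION_METADATA_FIELD
       if (row.isNullAt(idx)) None else Some(row.getString(idx))
     } else {
       None
     }
   ```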



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1090072656


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java:
##########
@@ -33,31 +33,47 @@
 
 public class ExecutorFactory {
 
-  public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig hoodieConfig,
+  public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig config,
                                                    Iterator<I> inputItr,
                                                    HoodieConsumer<O, E> consumer,
                                                    Function<I, O> transformFunction) {
-    return create(hoodieConfig, inputItr, consumer, transformFunction, Functions.noop());
+    return create(config, inputItr, consumer, transformFunction, Functions.noop());
   }
 
-  public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig hoodieConfig,
+  public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig config,
                                                    Iterator<I> inputItr,
                                                    HoodieConsumer<O, E> consumer,
                                                    Function<I, O> transformFunction,
                                                    Runnable preExecuteRunnable) {
-    ExecutorType executorType = hoodieConfig.getExecutorType();
-
+    ExecutorType executorType = config.getExecutorType();
     switch (executorType) {
       case BOUNDED_IN_MEMORY:
-        return new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, consumer,
+        return new BoundedInMemoryExecutor<>(config.getWriteBufferLimitBytes(), inputItr, consumer,
             transformFunction, preExecuteRunnable);
       case DISRUPTOR:
-        return new DisruptorExecutor<>(hoodieConfig.getWriteExecutorDisruptorWriteBufferSize(), inputItr, consumer,
-            transformFunction, hoodieConfig.getWriteExecutorDisruptorWaitStrategy(), preExecuteRunnable);
+        return new DisruptorExecutor<>(config.getWriteExecutorDisruptorWriteBufferSize(), inputItr, consumer,
+            transformFunction, config.getWriteExecutorDisruptorWaitStrategy(), preExecuteRunnable);
       case SIMPLE:
         return new SimpleExecutor<>(inputItr, consumer, transformFunction);
       default:
         throw new HoodieException("Unsupported Executor Type " + executorType);
     }
   }
+
+  /**
+   * Checks whether configured {@link HoodieExecutor} buffer records (for ex, by holding them
+   * in the queue)
+   */
+  public static boolean isBufferingRecords(HoodieWriteConfig config) {

Review Comment:
   Why not make this a property of `ExecutorType`, so we don't need this extra helper?
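
   Something along these lines (a hypothetical Scala sketch of the suggestion, not existing Hudi code; the real enum lives in Java):

   ```scala
   // Hypothetical sketch: let each executor type declare whether it buffers records,
   // instead of keeping a separate ExecutorFactory.isBufferingRecords helper.
   sealed abstract class ExecutorTypeSketch(val buffersRecords: Boolean)

   object ExecutorTypeSketch {
     case object BoundedInMemory extends ExecutorTypeSketch(buffersRecords = true)
     case object Disruptor       extends ExecutorTypeSketch(buffersRecords = true)
     case object Simple          extends ExecutorTypeSketch(buffersRecords = false)
   }

   // Call sites would then ask the type itself, e.g.:
   //   if (executorType.buffersRecords) { /* copy records before enqueueing */ }
   ```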



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java:
##########
@@ -234,4 +233,18 @@ protected final IndexedRecord readRecordPayload(Kryo kryo, Input input) {
 
     return kryo.readObjectOrNull(input, GenericRecord.class, avroSerializer);
   }
+
+  static void updateMetadataValuesInternal(GenericRecord avroRecord, MetadataValues metadataValues) {

Review Comment:
   This looks like a helper method that would fit better in some Avro utils class
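
   For example (a hypothetical sketch of the suggested placement; the object name is made up, the `MetadataValues` import path is assumed, and the body is elided):

   ```scala
   import org.apache.avro.generic.GenericRecord
   import org.apache.hudi.common.model.MetadataValues // import path assumed

   // Hypothetical placement: the method only touches the GenericRecord and the
   // MetadataValues passed in, so it need not live on HoodieAvroIndexedRecord.
   object SketchAvroUtils {
     def updateMetadataValues(avroRecord: GenericRecord, metadataValues: MetadataValues): Unit = {
       // same body as HoodieAvroIndexedRecord.updateMetadataValuesInternal in this PR
       ???
     }
   }
   ```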



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala:
##########
@@ -18,304 +18,413 @@
 
 package org.apache.spark.sql
 
-import java.nio.charset.StandardCharsets
-import java.util.HashMap
-import java.util.concurrent.ConcurrentHashMap
 import org.apache.avro.Schema
 import org.apache.hbase.thirdparty.com.google.common.base.Supplier
-import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType
 import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, toJavaDate}
 import org.apache.hudi.exception.HoodieException
-import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
+import org.apache.spark.sql.HoodieUnsafeRowUtils.{NestedFieldPath, composeNestedFieldPath}
+import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.types.Decimal.ROUND_HALF_EVEN
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
+
+import java.util.concurrent.ConcurrentHashMap
+import java.util.{ArrayDeque => JArrayDeque, Collections => JCollections, Deque => JDeque, Map => JMap}
+import java.util.function.{Function => JFunction}
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
 
 object HoodieInternalRowUtils {
 
-  // Projection are all thread local. Projection is not thread-safe
-  val unsafeProjectionThreadLocal: ThreadLocal[HashMap[(StructType, StructType), UnsafeProjection]] =
-    ThreadLocal.withInitial(new Supplier[HashMap[(StructType, StructType), UnsafeProjection]] {
-      override def get(): HashMap[(StructType, StructType), UnsafeProjection] = new HashMap[(StructType, StructType), UnsafeProjection]
+  private type RenamedColumnMap = JMap[String, String]
+  private type UnsafeRowWriter = InternalRow => UnsafeRow
+
+  // NOTE: [[UnsafeRowWriter]] objects cache has to stay [[ThreadLocal]] since these are not thread-safe
+  private val unsafeWriterThreadLocal: ThreadLocal[mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter]] =
+    ThreadLocal.withInitial(new Supplier[mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter]] {
+      override def get(): mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter] =
+        new mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter]
     })
-  val schemaMap = new ConcurrentHashMap[Schema, StructType]
-  val orderPosListMap = new ConcurrentHashMap[(StructType, String), NestedFieldPath]
 
-  /**
-   * @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecord(org.apache.avro.generic.GenericRecord, org.apache.avro.Schema)
-   */
-  def rewriteRecord(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType): InternalRow = {
-    val newRow = new GenericInternalRow(Array.fill(newSchema.fields.length)(null).asInstanceOf[Array[Any]])
-
-    for ((field, pos) <- newSchema.fields.zipWithIndex) {
-      var oldValue: AnyRef = null
-      var oldType: DataType = null
-      if (existField(oldSchema, field.name)) {
-        val oldField = oldSchema(field.name)
-        val oldPos = oldSchema.fieldIndex(field.name)
-        oldType = oldField.dataType
-        oldValue = oldRecord.get(oldPos, oldType)
-      }
-      if (oldValue != null) {
-        field.dataType match {
-          case structType: StructType =>
-            val oldType = oldSchema(field.name).dataType.asInstanceOf[StructType]
-            val newValue = rewriteRecord(oldValue.asInstanceOf[InternalRow], oldType, structType)
-            newRow.update(pos, newValue)
-          case decimalType: DecimalType =>
-            val oldFieldSchema = oldSchema(field.name).dataType.asInstanceOf[DecimalType]
-            if (decimalType.scale != oldFieldSchema.scale || decimalType.precision != oldFieldSchema.precision) {
-              newRow.update(pos, Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale))
-              )
-            } else {
-              newRow.update(pos, oldValue)
-            }
-          case t if t == oldType => newRow.update(pos, oldValue)
-          // Type promotion
-          case _: ShortType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toShort)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: IntegerType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toInt)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toInt)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: LongType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toLong)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toLong)
-              case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toLong)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: FloatType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toFloat)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toFloat)
-              case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toFloat)
-              case _: LongType => newRow.update(pos, oldValue.asInstanceOf[Long].toFloat)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: DoubleType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toDouble)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toDouble)
-              case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toDouble)
-              case _: LongType => newRow.update(pos, oldValue.asInstanceOf[Long].toDouble)
-              case _: FloatType => newRow.update(pos, oldValue.asInstanceOf[Float].toDouble)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: BinaryType if oldType.isInstanceOf[StringType] => newRow.update(pos, oldValue.asInstanceOf[String].getBytes)
-          case _ => newRow.update(pos, oldValue)
-        }
-      } else {
-        // TODO default value in newSchema
-      }
-    }
+  // NOTE: [[UnsafeProjection]] objects cache has to stay [[ThreadLocal]] since these are not thread-safe
+  private val unsafeProjectionThreadLocal: ThreadLocal[mutable.HashMap[(StructType, StructType), UnsafeProjection]] =
+    ThreadLocal.withInitial(new Supplier[mutable.HashMap[(StructType, StructType), UnsafeProjection]] {
+      override def get(): mutable.HashMap[(StructType, StructType), UnsafeProjection] =
+        new mutable.HashMap[(StructType, StructType), UnsafeProjection]
+    })
 
-    newRow
-  }
+  private val schemaMap = new ConcurrentHashMap[Schema, StructType]
+  private val orderPosListMap = new ConcurrentHashMap[(StructType, String), Option[NestedFieldPath]]
 
   /**
-   * @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecordWithNewSchema(org.apache.avro.generic.IndexedRecord, org.apache.avro.Schema, java.util.Map)
+   * Provides cached instance of [[UnsafeProjection]] transforming provided [[InternalRow]]s from
+   * one [[StructType]] and into another [[StructType]]
+   *
+   * For more details regarding its semantic, please check corresponding scala-doc for
+   * [[HoodieCatalystExpressionUtils.generateUnsafeProjection]]
    */
-  def rewriteRecordWithNewSchema(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType, renameCols: java.util.Map[String, String]): InternalRow = {
-    rewriteRecordWithNewSchema(oldRecord, oldSchema, newSchema, renameCols, new java.util.LinkedList[String]).asInstanceOf[InternalRow]
+  def getCachedUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = {
+    unsafeProjectionThreadLocal.get()
+      .getOrElseUpdate((from, to), generateUnsafeProjection(from, to))
   }
 
   /**
-   * @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecordWithNewSchema(java.lang.Object, org.apache.avro.Schema, org.apache.avro.Schema, java.util.Map, java.util.Deque)
+   * Provides cached instance of [[UnsafeRowWriter]] transforming provided [[InternalRow]]s from
+   * one [[StructType]] and into another [[StructType]]
+   *
+   * Unlike [[UnsafeProjection]] requiring that [[from]] has to be a proper subset of [[to]] schema,
+   * [[UnsafeRowWriter]] is able to perform whole spectrum of schema-evolution transformations including:
+   *
+   * <ul>
+   *   <li>Transforming nested structs/maps/arrays</li>
+   *   <li>Handling type promotions (int -> long, etc)</li>
+   *   <li>Handling (field) renames</li>
+   * </ul>
    */
-  private def rewriteRecordWithNewSchema(oldRecord: Any, oldSchema: DataType, newSchema: DataType, renameCols: java.util.Map[String, String], fieldNames: java.util.Deque[String]): Any = {
-    if (oldRecord == null) {
-      null
-    } else {
-      newSchema match {
-        case targetSchema: StructType =>
-          if (!oldRecord.isInstanceOf[InternalRow]) {
-            throw new IllegalArgumentException("cannot rewrite record with different type")
-          }
-          val oldRow = oldRecord.asInstanceOf[InternalRow]
-          val helper = mutable.Map[Integer, Any]()
-
-          val oldStrucType = oldSchema.asInstanceOf[StructType]
-          targetSchema.fields.zipWithIndex.foreach { case (field, i) =>
-            fieldNames.push(field.name)
-            if (existField(oldStrucType, field.name)) {
-              val oldField = oldStrucType(field.name)
-              val oldPos = oldStrucType.fieldIndex(field.name)
-              helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames)
-            } else {
-              val fieldFullName = createFullName(fieldNames)
-              val colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.")
-              val lastColNameFromOldSchema = colNamePartsFromOldSchema(colNamePartsFromOldSchema.length - 1)
-              // deal with rename
-              if (!existField(oldStrucType, field.name) && existField(oldStrucType, lastColNameFromOldSchema)) {
-                // find rename
-                val oldField = oldStrucType(lastColNameFromOldSchema)
-                val oldPos = oldStrucType.fieldIndex(lastColNameFromOldSchema)
-                helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames)
-              }
-            }
-            fieldNames.pop()
-          }
-          val newRow = new GenericInternalRow(Array.fill(targetSchema.length)(null).asInstanceOf[Array[Any]])
-          targetSchema.fields.zipWithIndex.foreach { case (_, i) =>
-            if (helper.contains(i)) {
-              newRow.update(i, helper(i))
-            } else {
-              // TODO add default val
-              newRow.update(i, null)
-            }
-          }
+  def getCachedUnsafeRowWriter(from: StructType, to: StructType, renamedColumnsMap: JMap[String, String] = JCollections.emptyMap()): UnsafeRowWriter = {
+    unsafeWriterThreadLocal.get()
+      .getOrElseUpdate((from, to, renamedColumnsMap), genUnsafeRowWriter(from, to, renamedColumnsMap))
+  }
 
-          newRow
-        case targetSchema: ArrayType =>
-          if (!oldRecord.isInstanceOf[ArrayData]) {
-            throw new IllegalArgumentException("cannot rewrite record with different type")
-          }
-          val oldElementType = oldSchema.asInstanceOf[ArrayType].elementType
-          val oldArray = oldRecord.asInstanceOf[ArrayData]
-          val newElementType = targetSchema.elementType
-          val newArray = new GenericArrayData(Array.fill(oldArray.numElements())(null).asInstanceOf[Array[Any]])
-          fieldNames.push("element")
-          oldArray.toSeq[Any](oldElementType).zipWithIndex.foreach { case (value, i) => newArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldElementType, newElementType, renameCols, fieldNames)) }
-          fieldNames.pop()
-
-          newArray
-        case targetSchema: MapType =>
-          if (!oldRecord.isInstanceOf[MapData]) {
-            throw new IllegalArgumentException("cannot rewrite record with different type")
-          }
-          val oldValueType = oldSchema.asInstanceOf[MapType].valueType
-          val oldKeyType = oldSchema.asInstanceOf[MapType].keyType
-          val oldMap = oldRecord.asInstanceOf[MapData]
-          val newValueType = targetSchema.valueType
-          val newKeyArray = new GenericArrayData(Array.fill(oldMap.keyArray().numElements())(null).asInstanceOf[Array[Any]])
-          val newValueArray = new GenericArrayData(Array.fill(oldMap.valueArray().numElements())(null).asInstanceOf[Array[Any]])
-          val newMap = new ArrayBasedMapData(newKeyArray, newValueArray)
-          fieldNames.push("value")
-          oldMap.keyArray().toSeq[Any](oldKeyType).zipWithIndex.foreach { case (value, i) => newKeyArray.update(i, rewritePrimaryType(value, oldKeyType, oldKeyType)) }
-          oldMap.valueArray().toSeq[Any](oldValueType).zipWithIndex.foreach { case (value, i) => newValueArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldValueType, newValueType, renameCols, fieldNames)) }
-          fieldNames.pop()
-
-          newMap
-        case _ => rewritePrimaryType(oldRecord, oldSchema, newSchema)
-      }
+  def getCachedPosList(structType: StructType, field: String): Option[NestedFieldPath] = {
+    val nestedFieldPathOpt = orderPosListMap.get((structType, field))
+    // NOTE: This specifically designed to do 2 lookups (in case of cache-miss) to avoid
+    //       allocating the closure when using [[computeIfAbsent]] on more frequent cache-hit path
+    if (nestedFieldPathOpt != null) {
+      nestedFieldPathOpt
+    } else {
+      orderPosListMap.computeIfAbsent((structType, field), new JFunction[(StructType, String), Option[NestedFieldPath]] {
+        override def apply(t: (StructType, String)): Option[NestedFieldPath] =
+          composeNestedFieldPath(structType, field)
+      })
     }
   }
 
-  def getCachedPosList(structType: StructType, field: String): NestedFieldPath = {
-    val schemaPair = (structType, field)
-    if (!orderPosListMap.containsKey(schemaPair)) {
-      val posList = HoodieUnsafeRowUtils.composeNestedFieldPath(structType, field)
-      orderPosListMap.put(schemaPair, posList)
+  def getCachedSchema(schema: Schema): StructType = {
+    val structType = schemaMap.get(schema)
+    // NOTE: This specifically designed to do 2 lookups (in case of cache-miss) to avoid
+    //       allocating the closure when using [[computeIfAbsent]] on more frequent cache-hit path
+    if (structType != null) {
+      structType
+    } else {
+      schemaMap.computeIfAbsent(schema, new JFunction[Schema, StructType] {
+        override def apply(t: Schema): StructType =
+          convertAvroSchemaToStructType(schema)
+      })
     }
-    orderPosListMap.get(schemaPair)
   }
 
-  def getCachedUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = {
-    val schemaPair = (from, to)
-    val map = unsafeProjectionThreadLocal.get()
-    if (!map.containsKey(schemaPair)) {
-      val projection = HoodieCatalystExpressionUtils.generateUnsafeProjection(from, to)
-      map.put(schemaPair, projection)
+  private[sql] def genUnsafeRowWriter(prevSchema: StructType,
+                                      newSchema: StructType,
+                                      renamedColumnsMap: JMap[String, String]): UnsafeRowWriter = {
+    val writer = newWriterRenaming(prevSchema, newSchema, renamedColumnsMap, new JArrayDeque[String]())
+    val unsafeProjection = generateUnsafeProjection(newSchema, newSchema)
+    val phonyUpdater = new CatalystDataUpdater {
+      var value: InternalRow = _
+
+      override def set(ordinal: Int, value: Any): Unit =
+        this.value = value.asInstanceOf[InternalRow]
     }
-    map.get(schemaPair)
-  }
 
-  def getCachedSchema(schema: Schema): StructType = {
-    if (!schemaMap.containsKey(schema)) {
-      val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
-      schemaMap.put(schema, structType)
+    oldRow => {
+      writer(phonyUpdater, 0, oldRow)
+      unsafeProjection(phonyUpdater.value)
     }
-    schemaMap.get(schema)
   }
 
-  def existField(structType: StructType, name: String): Boolean = {
-    try {
-      HoodieUnsafeRowUtils.composeNestedFieldPath(structType, name)
-      true
-    } catch {
-      case _: IllegalArgumentException => false
+  private type RowFieldUpdater = (CatalystDataUpdater, Int, Any) => Unit
+
+  private def genUnsafeStructWriter(prevStructType: StructType,
+                                    newStructType: StructType,
+                                    renamedColumnsMap: JMap[String, String],
+                                    fieldNamesStack: JDeque[String]): (CatalystDataUpdater, Any) => Unit = {
+    // TODO need to canonicalize schemas (casing)
+    val fieldWriters = ArrayBuffer.empty[RowFieldUpdater]
+    val positionMap = ArrayBuffer.empty[Int]
+
+    for (newField <- newStructType.fields) {
+      fieldNamesStack.push(newField.name)
+
+      val (fieldWriter, prevFieldPos): (RowFieldUpdater, Int) =
+        prevStructType.getFieldIndex(newField.name) match {
+          case Some(prevFieldPos) =>
+            val prevField = prevStructType(prevFieldPos)
+            (newWriterRenaming(prevField.dataType, newField.dataType, renamedColumnsMap, fieldNamesStack), prevFieldPos)
+
+          case None =>
+            val newFieldQualifiedName = createFullName(fieldNamesStack)
+            val prevFieldName: String = lookupRenamedField(newFieldQualifiedName, renamedColumnsMap)
+
+            // Handle rename
+            prevStructType.getFieldIndex(prevFieldName) match {
+              case Some(prevFieldPos) =>
+                val prevField = prevStructType.fields(prevFieldPos)
+                (newWriterRenaming(prevField.dataType, newField.dataType, renamedColumnsMap, fieldNamesStack), prevFieldPos)
+
+              case None =>
+                val updater: RowFieldUpdater = (fieldUpdater, ordinal, _) => fieldUpdater.setNullAt(ordinal)
+                (updater, -1)
+            }
+        }
+
+      fieldWriters += fieldWriter
+      positionMap += prevFieldPos
+
+      fieldNamesStack.pop()
     }
-  }
 
-  private def rewritePrimaryType(oldValue: Any, oldSchema: DataType, newSchema: DataType) = {
-    if (oldSchema.equals(newSchema) || (oldSchema.isInstanceOf[DecimalType] && newSchema.isInstanceOf[DecimalType])) {
-      oldSchema match {
-        case NullType | BooleanType | IntegerType | LongType | FloatType | DoubleType | DateType | TimestampType | BinaryType =>
-          oldValue
-        // Copy UTF8String before putting into GenericInternalRow
-        case StringType => UTF8String.fromString(oldValue.toString)
-        case DecimalType() =>
-          Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale))
-        case _ =>
-          throw new HoodieException("Unknown schema type: " + newSchema)
+    (fieldUpdater, row) => {
+      var pos = 0
+      while (pos < fieldWriters.length) {
+        val prevPos = positionMap(pos)
+        val prevValue = if (prevPos >= 0) {
+          row.asInstanceOf[InternalRow].get(prevPos, prevStructType.fields(prevPos).dataType)
+        } else {
+          null
+        }
+
+        fieldWriters(pos)(fieldUpdater, pos, prevValue)
+        pos += 1
       }
-    } else {
-      rewritePrimaryTypeWithDiffSchemaType(oldValue, oldSchema, newSchema)
     }
   }
 
-  private def rewritePrimaryTypeWithDiffSchemaType(oldValue: Any, oldSchema: DataType, newSchema: DataType): Any = {
-    val value = newSchema match {
-      case NullType | BooleanType =>
-      case DateType if oldSchema.equals(StringType) =>
-        CatalystTypeConverters.convertToCatalyst(java.sql.Date.valueOf(oldValue.toString))
-      case LongType =>
-        oldSchema match {
-          case IntegerType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].longValue())
+  private def newWriterRenaming(prevDataType: DataType,
+                                newDataType: DataType,
+                                renamedColumnsMap: JMap[String, String],
+                                fieldNameStack: JDeque[String]): RowFieldUpdater = {
+    (newDataType, prevDataType) match {
+      case (newType, prevType) if prevType == newType =>
+        (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, value)
+
+      case (newStructType: StructType, prevStructType: StructType) =>
+        val writer = genUnsafeStructWriter(prevStructType, newStructType, renamedColumnsMap, fieldNameStack)
+
+        val newRow = new SpecificInternalRow(newStructType.fields.map(_.dataType))
+        val rowUpdater = new RowUpdater(newRow)
+
+        (fieldUpdater, ordinal, value) => {
+          // Here the new row is built in 2 stages:
+          //    - First, the mutable row created above (used as a buffer/scratchpad) is wrapped into the
+          //      [[RowUpdater]] and passed into the generated row-writer
+          //    - Upon returning from the row-writer, we call back into the parent row's [[fieldUpdater]]
+          //      to set the returned row as its value at the given ordinal
+          writer(rowUpdater, value)
+          fieldUpdater.set(ordinal, newRow)
+        }
+
+      case (ArrayType(newElementType, _), ArrayType(prevElementType, containsNull)) =>
+        fieldNameStack.push("element")
+        val elementWriter = newWriterRenaming(prevElementType, newElementType, renamedColumnsMap, fieldNameStack)
+        fieldNameStack.pop()
+
+        (fieldUpdater, ordinal, value) => {
+          val prevArrayData = value.asInstanceOf[ArrayData]
+          val prevArray = prevArrayData.toObjectArray(prevElementType)
+
+          val newArrayData = createArrayData(newElementType, prevArrayData.numElements())
+          val elementUpdater = new ArrayDataUpdater(newArrayData)
+
+          var i = 0
+          while (i < prevArray.length) {
+            val element = prevArray(i)
+            if (element == null) {
+              if (!containsNull) {
+                throw new HoodieException(
+                  s"Array value at path '${fieldNameStack.asScala.mkString(".")}' is not allowed to be null")
+              } else {
+                elementUpdater.setNullAt(i)
+              }
+            } else {
+              elementWriter(elementUpdater, i, element)
+            }
+            i += 1
+          }
+
+          fieldUpdater.set(ordinal, newArrayData)
+        }
+
+      case (MapType(_, newValueType, _), MapType(_, prevValueType, valueContainsNull)) =>
+        fieldNameStack.push("value")
+        val valueWriter = newWriterRenaming(prevValueType, newValueType, renamedColumnsMap, fieldNameStack)
+        fieldNameStack.pop()
+
+        (updater, ordinal, value) =>
+          val mapData = value.asInstanceOf[MapData]
+          val prevKeyArrayData = mapData.keyArray
+          val prevValueArrayData = mapData.valueArray
+          val prevValueArray = prevValueArrayData.toObjectArray(prevValueType)
+
+          val newValueArray = createArrayData(newValueType, mapData.numElements())
+          val valueUpdater = new ArrayDataUpdater(newValueArray)
+          var i = 0
+          while (i < prevValueArray.length) {
+            val value = prevValueArray(i)
+            if (value == null) {
+              if (!valueContainsNull) {
+                throw new HoodieException(s"Map value at path ${fieldNameStack.asScala.mkString(".")} is not allowed to be null")
+              } else {
+                valueUpdater.setNullAt(i)
+              }
+            } else {
+              valueWriter(valueUpdater, i, value)
+            }
+            i += 1
+          }
+
+          // NOTE: Keys can't be transformed and always have to be of [[StringType]]
+          updater.set(ordinal, new ArrayBasedMapData(prevKeyArrayData, newValueArray))
+
+      case (newDecimal: DecimalType, _) =>
+        prevDataType match {
+          case IntegerType | LongType | FloatType | DoubleType | StringType =>
+            (fieldUpdater, ordinal, value) =>
+              val scale = newDecimal.scale
+              // TODO this has to be revisited to avoid loss of precision (for floating-point values)
+              fieldUpdater.setDecimal(ordinal, Decimal.fromDecimal(BigDecimal(value.toString).setScale(scale, ROUND_HALF_EVEN)))
+
+          case _: DecimalType =>
+            (fieldUpdater, ordinal, value) =>
+              fieldUpdater.setDecimal(ordinal, Decimal.fromDecimal(value.asInstanceOf[Decimal].toBigDecimal.setScale(newDecimal.scale)))
+
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
         }
-      case FloatType =>
-        oldSchema match {
-          case IntegerType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].floatValue())
-          case LongType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Long].floatValue())
+
+      case (_: ShortType, _) =>
+        prevDataType match {
+          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setShort(ordinal, value.asInstanceOf[Byte].toShort)
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
         }
-      case DoubleType =>
-        oldSchema match {
-          case IntegerType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].doubleValue())
-          case LongType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Long].doubleValue())
-          case FloatType => CatalystTypeConverters.convertToCatalyst(java.lang.Double.valueOf(oldValue.asInstanceOf[Float] + ""))
+
+      case (_: IntegerType, _) =>
+        prevDataType match {
+          case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setInt(ordinal, value.asInstanceOf[Short].toInt)
+          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setInt(ordinal, value.asInstanceOf[Byte].toInt)
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
         }
-      case BinaryType =>
-        oldSchema match {
-          case StringType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[String].getBytes(StandardCharsets.UTF_8))
+
+      case (_: LongType, _) =>
+        prevDataType match {
+          case _: IntegerType => (fieldUpdater, ordinal, value) => fieldUpdater.setLong(ordinal, value.asInstanceOf[Int].toLong)
+          case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setLong(ordinal, value.asInstanceOf[Short].toLong)
+          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setLong(ordinal, value.asInstanceOf[Byte].toLong)
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
         }
-      case StringType =>
-        oldSchema match {
-          case BinaryType => CatalystTypeConverters.convertToCatalyst(new String(oldValue.asInstanceOf[Array[Byte]]))
-          case DateType => CatalystTypeConverters.convertToCatalyst(toJavaDate(oldValue.asInstanceOf[Integer]).toString)
-          case IntegerType | LongType | FloatType | DoubleType | DecimalType() => CatalystTypeConverters.convertToCatalyst(oldValue.toString)
+
+      case (_: FloatType, _) =>
+        prevDataType match {
+          case _: LongType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Long].toFloat)
+          case _: IntegerType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Int].toFloat)
+          case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Short].toFloat)
+          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Byte].toFloat)
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
         }
-      case DecimalType() =>
-        oldSchema match {
-          case IntegerType | LongType | FloatType | DoubleType | StringType =>
-            val scale = newSchema.asInstanceOf[DecimalType].scale
 
-            Decimal.fromDecimal(BigDecimal(oldValue.toString).setScale(scale))
+      case (_: DoubleType, _) =>
+        prevDataType match {
+          case _: FloatType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Float].toDouble)
+          case _: LongType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Long].toDouble)
+          case _: IntegerType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Int].toDouble)
+          case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Short].toDouble)
+          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Byte].toDouble)
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
         }
-      case _ =>
-    }
-    if (value == None) {
-      throw new HoodieException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema))
-    } else {
-      CatalystTypeConverters.convertToCatalyst(value)
+
+      case (_: BinaryType, _: StringType) =>
+        (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, value.asInstanceOf[UTF8String].getBytes)
+
+      // TODO revisit this (we need to align permitted casting w/ Spark)
+      // NOTE: This is supported to stay compatible w/ [[HoodieAvroUtils.rewriteRecordWithNewSchema]]
+      case (_: StringType, _) =>
+        prevDataType match {
+          case BinaryType => (fieldUpdater, ordinal, value) =>
+            fieldUpdater.set(ordinal, UTF8String.fromBytes(value.asInstanceOf[Array[Byte]]))
+          case DateType => (fieldUpdater, ordinal, value) =>
+            fieldUpdater.set(ordinal, UTF8String.fromString(toJavaDate(value.asInstanceOf[Integer]).toString))
+          case IntegerType | LongType | FloatType | DoubleType | _: DecimalType =>
+            (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, UTF8String.fromString(value.toString))
+
+          case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
+        }
+
+      case (DateType, StringType) =>
+        (fieldUpdater, ordinal, value) =>
+          fieldUpdater.set(ordinal, CatalystTypeConverters.convertToCatalyst(java.sql.Date.valueOf(value.toString)))
+
+      case (_, _) =>
+        throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
     }
   }
 
-  def removeFields(schema: StructType, fieldsToRemove: java.util.List[String]): StructType = {
-    StructType(schema.fields.filter(field => !fieldsToRemove.contains(field.name)))
+  private def lookupRenamedField(newFieldQualifiedName: String, renamedColumnsMap: JMap[String, String]) = {
+    val prevFieldQualifiedName = renamedColumnsMap.getOrDefault(newFieldQualifiedName, "")
+    val prevFieldQualifiedNameParts = prevFieldQualifiedName.split("\\.")
+    val prevFieldName = prevFieldQualifiedNameParts(prevFieldQualifiedNameParts.length - 1)
+
+    prevFieldName
   }
+
+  private def createArrayData(elementType: DataType, length: Int): ArrayData = elementType match {

Review Comment:
   Are these the same across different Spark versions?
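
   For context, the helper above is cut off in this hunk. A minimal sketch of what such a helper
   conventionally looks like is below; it mirrors the array pre-sizing pattern used by Spark's Avro
   deserializer, so the exact set of cases, and the corresponding Spark catalyst imports, are
   assumptions rather than the code in this PR:

       private def createArrayData(elementType: DataType, length: Int): ArrayData = elementType match {
         // Primitive element types can be backed by a compact, pre-sized UnsafeArrayData
         case BooleanType => UnsafeArrayData.fromPrimitiveArray(new Array[Boolean](length))
         case IntegerType => UnsafeArrayData.fromPrimitiveArray(new Array[Int](length))
         case LongType => UnsafeArrayData.fromPrimitiveArray(new Array[Long](length))
         case FloatType => UnsafeArrayData.fromPrimitiveArray(new Array[Float](length))
         case DoubleType => UnsafeArrayData.fromPrimitiveArray(new Array[Double](length))
         // Everything else (strings, decimals, nested types) falls back to a generic container
         case _ => new GenericArrayData(new Array[Any](length))
       }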



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1407596287

   ## CI report:
   
   * 36a0d9aeab63f713dff106ed9a76411aceb900b2 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14704) 
   * 84962c128b8dd4f52cc7179d60a4af0efee3de3c Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14741) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1407280738

   ## CI report:
   
   * 0ece5561859923b8773d6ff9fa633f014c104300 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14688) 
   * 80d38554649038cb9e668be4edc3a3c0a2c4373f UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1409586306

   ## CI report:
   
   * ca3b5fa7a3a5252dbd9acdf5c3e92934e477d10c Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14774) 
   * 9bfa20f45fcc675b79053bb8b4f379b09c6cd6c5 UNKNOWN
   * 62f1095775d8c3effa93175cdcc6423fca9a3968 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14784) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1409843852

   ## CI report:
   
   * 9bfa20f45fcc675b79053bb8b4f379b09c6cd6c5 UNKNOWN
   * 325244765016f67034fd8f364942028fe217ecb5 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14788) 
   * 24020a964671b35fb9aa7b86748771fd71512495 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14800) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1408694766

   ## CI report:
   
   * 1aa1f0cc4f8c643e6b5423b9b5de54ef334ba8c1 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14750) 
   * ca3b5fa7a3a5252dbd9acdf5c3e92934e477d10c UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1409773643

   ## CI report:
   
   * 9bfa20f45fcc675b79053bb8b4f379b09c6cd6c5 UNKNOWN
   * 325244765016f67034fd8f364942028fe217ecb5 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14788) 
   * 24020a964671b35fb9aa7b86748771fd71512495 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1090888180


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java:
##########
@@ -33,31 +33,47 @@
 
 public class ExecutorFactory {
 
-  public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig hoodieConfig,
+  public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig config,
                                                    Iterator<I> inputItr,
                                                    HoodieConsumer<O, E> consumer,
                                                    Function<I, O> transformFunction) {
-    return create(hoodieConfig, inputItr, consumer, transformFunction, Functions.noop());
+    return create(config, inputItr, consumer, transformFunction, Functions.noop());
   }
 
-  public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig hoodieConfig,
+  public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig config,
                                                    Iterator<I> inputItr,
                                                    HoodieConsumer<O, E> consumer,
                                                    Function<I, O> transformFunction,
                                                    Runnable preExecuteRunnable) {
-    ExecutorType executorType = hoodieConfig.getExecutorType();
-
+    ExecutorType executorType = config.getExecutorType();
     switch (executorType) {
       case BOUNDED_IN_MEMORY:
-        return new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, consumer,
+        return new BoundedInMemoryExecutor<>(config.getWriteBufferLimitBytes(), inputItr, consumer,
             transformFunction, preExecuteRunnable);
       case DISRUPTOR:
-        return new DisruptorExecutor<>(hoodieConfig.getWriteExecutorDisruptorWriteBufferSize(), inputItr, consumer,
-            transformFunction, hoodieConfig.getWriteExecutorDisruptorWaitStrategy(), preExecuteRunnable);
+        return new DisruptorExecutor<>(config.getWriteExecutorDisruptorWriteBufferSize(), inputItr, consumer,
+            transformFunction, config.getWriteExecutorDisruptorWaitStrategy(), preExecuteRunnable);
       case SIMPLE:
         return new SimpleExecutor<>(inputItr, consumer, transformFunction);
       default:
         throw new HoodieException("Unsupported Executor Type " + executorType);
     }
   }
+
+  /**
+   * Checks whether the configured {@link HoodieExecutor} buffers records (for example, by holding
+   * them in a queue)
+   */
+  public static boolean isBufferingRecords(HoodieWriteConfig config) {

Review Comment:
   Good call. Will address in a follow-up (to avoid re-running CI)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] wzx140 commented on a diff in pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "wzx140 (via GitHub)" <gi...@apache.org>.
wzx140 commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1089861698


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala:
##########
@@ -308,14 +175,351 @@ object HoodieInternalRowUtils {
         }
       case _ =>
     }
+
     if (value == None) {
       throw new HoodieException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema))
     } else {
       CatalystTypeConverters.convertToCatalyst(value)
     }
   }
+   */
+  
+  private type RowFieldUpdater = (CatalystDataUpdater, Int, Any) => Unit
+
+  private def genUnsafeRowWriterRenaming(prevStructType: StructType, newStructType: StructType, renamedColumnsMap: JMap[String, String], fieldNames: JDeque[String]): (CatalystDataUpdater, Any) => Unit = {
+    // TODO need to canonicalize schemas (casing)
+    val fieldWriters = ArrayBuffer.empty[RowFieldUpdater]
+    val positionMap = ArrayBuffer.empty[Int]
+
+    for (newField <- newStructType.fields) {
+      fieldNames.push(newField.name)
+
+      val (fieldWriter, prevFieldPos): (RowFieldUpdater, Int) =
+        prevStructType.getFieldIndex(newField.name) match {
+          case Some(prevFieldPos) =>
+            val prevField = prevStructType(prevFieldPos)
+            (newWriterRenaming(prevField.dataType, newField.dataType, renamedColumnsMap, fieldNames), prevFieldPos)
+
+          case None =>
+            val newFieldQualifiedName = createFullName(fieldNames)
+            val prevFieldName: String = lookupRenamedField(newFieldQualifiedName, renamedColumnsMap)
+
+            // Handle rename
+            prevStructType.getFieldIndex(prevFieldName) match {
+              case Some(prevFieldPos) =>
+                val prevField = prevStructType.fields(prevFieldPos)
+                (newWriterRenaming(prevField.dataType, newField.dataType, renamedColumnsMap, fieldNames), prevFieldPos)
+
+              case None =>
+                // TODO handle defaults
+                val updater: RowFieldUpdater = (fieldUpdater, ordinal, _) => fieldUpdater.setNullAt(ordinal)
+                (updater, -1)
+            }
+        }
+
+      fieldWriters += fieldWriter
+      positionMap += prevFieldPos
+
+      fieldNames.pop()
+    }
+
+    (fieldUpdater, row) => {
+      var pos = 0
+      while (pos < fieldWriters.length) {
+        val prevPos = positionMap(pos)
+        val prevValue = if (prevPos >= 0) {
+          row.asInstanceOf[InternalRow].get(prevPos, prevStructType.fields(prevPos).dataType)
+        } else {
+          null
+        }
+
+        fieldWriters(pos)(fieldUpdater, pos, prevValue)
+        pos += 1
+      }
+    }
+  }
+
+  private def newWriterRenaming(prevDataType: DataType,
+                                newDataType: DataType,
+                                renamedColumnsMap: JMap[String, String],
+                                fieldNames: JDeque[String]): RowFieldUpdater = {
+    (prevDataType, newDataType) match {
+      case (prevType, newType) if prevType == newType =>
+        (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, value)
+
+      case (prevStructType: StructType, newStructType: StructType) =>
+        val writer = genUnsafeRowWriterRenaming(prevStructType, newStructType, renamedColumnsMap, fieldNames)
+        val newRow = new SpecificInternalRow(newStructType.fields.map(_.dataType))
+        val rowUpdater = new RowUpdater(newRow)
+
+        (fieldUpdater, ordinal, value) => {
+          // TODO elaborate

Review Comment:
   What does 'elaborate' mean here? Are more comments needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1409576601

   ## CI report:
   
   * ca3b5fa7a3a5252dbd9acdf5c3e92934e477d10c Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14774) 
   * 9bfa20f45fcc675b79053bb8b4f379b09c6cd6c5 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1407779151

   ## CI report:
   
   * 6940a386344827d47ff9c9da1d81306e10021025 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14747) 
   * 1aa1f0cc4f8c643e6b5423b9b5de54ef334ba8c1 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14750) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] wzx140 commented on a diff in pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "wzx140 (via GitHub)" <gi...@apache.org>.
wzx140 commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1089871282


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -206,9 +211,10 @@ public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties p
     StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
 
-    // TODO HUDI-5281 Rewrite HoodieSparkRecord with UnsafeRowWriter
-    InternalRow rewriteRecord = HoodieInternalRowUtils.rewriteRecordWithNewSchema(this.data, structType, newStructType, renameCols);
-    UnsafeRow unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(newStructType, newStructType).apply(rewriteRecord);
+    Function1<InternalRow, UnsafeRow> unsafeRowWriter =
+        HoodieInternalRowUtils.getCachedUnsafeRowWriter(structType, newStructType, Collections.emptyMap());

Review Comment:
   This misses `renameCols`: an empty map is passed here instead of the rename mapping.
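
   For illustration, a hedged sketch of the same call with the rename mapping threaded through rather
   than an empty map (written in Scala, since getCachedUnsafeRowWriter lives in the Scala object
   HoodieInternalRowUtils; the variable names simply mirror the hunk above):

       // Pass the actual rename mapping so renamed columns can be resolved against the previous schema
       val unsafeRowWriter = HoodieInternalRowUtils.getCachedUnsafeRowWriter(structType, newStructType, renameCols)
       // The returned Function1[InternalRow, UnsafeRow] is then applied to the record's row (this.data in the hunk)
       val unsafeRow: UnsafeRow = unsafeRowWriter(data)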



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1407824547

   ## CI report:
   
   * 1aa1f0cc4f8c643e6b5423b9b5de54ef334ba8c1 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14750) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1407595039

   ## CI report:
   
   * 36a0d9aeab63f713dff106ed9a76411aceb900b2 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14704) 
   * 84962c128b8dd4f52cc7179d60a4af0efee3de3c UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1407719936

   ## CI report:
   
   * 84962c128b8dd4f52cc7179d60a4af0efee3de3c Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14741) 
   * 6940a386344827d47ff9c9da1d81306e10021025 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] nsivabalan commented on a diff in pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "nsivabalan (via GitHub)" <gi...@apache.org>.
nsivabalan commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1089682826


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1078,28 +1049,28 @@ object HoodieSparkSqlWriter {
           }
         }).toJavaRDD()
       case HoodieRecord.HoodieRecordType.SPARK =>
-        // ut will use AvroKeyGenerator, so we need to cast it in spark record
         val sparkKeyGenerator = keyGenerator.asInstanceOf[SparkKeyGeneratorInterface]
         val dataFileSchema = new Schema.Parser().parse(dataFileSchemaStr)
         val dataFileStructType = HoodieInternalRowUtils.getCachedSchema(dataFileSchema)
         val writerStructType = HoodieInternalRowUtils.getCachedSchema(writerSchema)
         val sourceStructType = df.schema
-        df.queryExecution.toRdd.mapPartitions { iter =>
 
-          iter.map { internalRow =>
-            val recordKey = sparkKeyGenerator.getRecordKey(internalRow, sourceStructType)
-            val partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, sourceStructType)
+        df.queryExecution.toRdd.mapPartitions { it =>
+          // TODO elaborate

Review Comment:
   please elaborate. 



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -1078,28 +1049,28 @@ object HoodieSparkSqlWriter {
           }
         }).toJavaRDD()
       case HoodieRecord.HoodieRecordType.SPARK =>
-        // ut will use AvroKeyGenerator, so we need to cast it in spark record
         val sparkKeyGenerator = keyGenerator.asInstanceOf[SparkKeyGeneratorInterface]
         val dataFileSchema = new Schema.Parser().parse(dataFileSchemaStr)
         val dataFileStructType = HoodieInternalRowUtils.getCachedSchema(dataFileSchema)
         val writerStructType = HoodieInternalRowUtils.getCachedSchema(writerSchema)
         val sourceStructType = df.schema
-        df.queryExecution.toRdd.mapPartitions { iter =>
 
-          iter.map { internalRow =>
-            val recordKey = sparkKeyGenerator.getRecordKey(internalRow, sourceStructType)
-            val partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, sourceStructType)
+        df.queryExecution.toRdd.mapPartitions { it =>
+          // TODO elaborate
+          val (unsafeProjection, transformer) = if (shouldDropPartitionColumns) {
+            (generateUnsafeProjection(dataFileStructType, dataFileStructType), genUnsafeRowWriter(sourceStructType, dataFileStructType))

Review Comment:
   nice optimization
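
   As an editor's note on the optimization being praised: the diff hoists the `UnsafeProjection` and the row transformer out of the per-record closure, so the expensive setup runs once per partition rather than once per row. A minimal, self-contained sketch of that pattern (the names `processPartition`/`buildWriter` are illustrative, not from the PR):

       // Sketch of the per-partition hoisting pattern on a plain Iterator, standing in
       // for mapPartitions: the costly writer is built once and reused for every element.
       def processPartition[A, B](rows: Iterator[A], buildWriter: () => A => B): Iterator[B] = {
         val writer = buildWriter() // built once per partition, not once per row
         rows.map(writer)
       }

       // Example: the "expensive" setup here is just building a formatting function.
       val out = processPartition(Iterator(1, 2, 3), () => (i: Int) => s"row-$i").toList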



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieInternalRowUtils.scala:
##########
@@ -65,25 +61,32 @@ class TestHoodieInternalRowUtils extends FunSuite with Matchers with BeforeAndAf
   }
 
   test("test rewrite") {
-    val data = sparkSession.sparkContext.parallelize(Seq(Row("like", 18, "like1", 181)))
+    val rows = Seq(
+      Row("Andrew", 18, Row("Mission st", "SF"), "John", 19)
+    )
+    val data = sparkSession.sparkContext.parallelize(rows)
     val oldRow = sparkSession.createDataFrame(data, schemaMerge).queryExecution.toRdd.first()
-    val newRow1 = HoodieInternalRowUtils.rewriteRecord(oldRow, schemaMerge, schema1)
-    val newRow2 = HoodieInternalRowUtils.rewriteRecord(oldRow, schemaMerge, schema2)
-    assert(newRow1.get(0, StringType).toString.equals("like"))
-    assert(newRow1.get(1, IntegerType) == 18)
-    assert(newRow2.get(0, StringType).toString.equals("like1"))
-    assert(newRow2.get(1, IntegerType) == 181)
+
+    val rowWriter1 = HoodieInternalRowUtils.genUnsafeRowWriter(schemaMerge, schema1)
+    val newRow1 = rowWriter1(oldRow)
+
+    val serDe1 = sparkAdapter.createSparkRowSerDe(schema1)
+    assertEquals(serDe1.deserializeRow(newRow1), Row("Andrew", 18, Row("Mission st", "SF")));
+
+    val rowWriter2 = HoodieInternalRowUtils.genUnsafeRowWriter(schemaMerge, schema2)
+    val newRow2 = rowWriter2(oldRow)
+
+    val serDe2 = sparkAdapter.createSparkRowSerDe(schema2)
+    assertEquals(serDe2.deserializeRow(newRow2), Row("John", 19));
   }
 
   test("test rewrite with nullable value") {
-    val data = sparkSession.sparkContext.parallelize(Seq(Row("like", 18)))
+    val data = sparkSession.sparkContext.parallelize(Seq(Row("Rob", 18, null.asInstanceOf[StructType])))
     val oldRow = sparkSession.createDataFrame(data, schema1).queryExecution.toRdd.first()
-    val newRow = HoodieInternalRowUtils.rewriteRecord(oldRow, schema1, schemaMerge)
-    assert(newRow.get(0, StringType).toString.equals("like"))
-    assert(newRow.get(1, IntegerType) == 18)
-    assert(newRow.get(2, StringType) == null)
-    assert(newRow.get(3, IntegerType) == null)
-  }
-
+    val rowWriter = HoodieInternalRowUtils.genUnsafeRowWriter(schema1, schemaMerge)
+    val newRow = rowWriter(oldRow)
 
+    val serDe = sparkAdapter.createSparkRowSerDe(schemaMerge)

Review Comment:
   let's test all data types as much as possible (all primitives, arrays, maps, etc.).
   Also, let's test some null values for some of the fields.
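
   For illustration only, a hedged sketch of schemas/rows with broader type coverage that could be pushed through `genUnsafeRowWriter` the same way as in the test above; the field names and values here are made up for the example, not the ones the PR adds:

       import org.apache.spark.sql.Row
       import org.apache.spark.sql.types._

       // Schema mixing primitives, an array, a map and a nullable nested struct.
       val wideSchema = StructType(Seq(
         StructField("id", LongType, nullable = false),
         StructField("ratio", DoubleType),
         StructField("tags", ArrayType(StringType)),
         StructField("attrs", MapType(StringType, IntegerType)),
         StructField("address", StructType(Seq(
           StructField("street", StringType),
           StructField("city", StringType))), nullable = true)
       ))

       // Rows exercising both populated values and nulls for some of the fields.
       val wideRows = Seq(
         Row(1L, 0.5, Seq("a", "b"), Map("x" -> 1), Row("Mission st", "SF")),
         Row(2L, null, null, Map.empty[String, Int], null)
       )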
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1090883188


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -190,9 +194,10 @@ public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema
     StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
 
-    // TODO HUDI-5281 Rewrite HoodieSparkRecord with UnsafeRowWriter
-    InternalRow rewriteRecord = HoodieInternalRowUtils.rewriteRecord(this.data, structType, targetStructType);
-    UnsafeRow unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType, targetStructType).apply(rewriteRecord);
+    Function1<InternalRow, UnsafeRow> unsafeRowWriter =
+        HoodieInternalRowUtils.getCachedUnsafeRowWriter(structType, targetStructType, Collections.emptyMap());

Review Comment:
   Not sure I follow your train of thought: there is no `rewriteRecord` method anymore; it's being replaced with `prependMetaFields`
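
   For context, a hedged usage sketch of the cached writer that the new code path relies on; the wrapper function and its arguments are illustrative, and the two-argument call uses the default empty renamed-columns map shown in the diff:

       import org.apache.spark.sql.HoodieInternalRowUtils
       import org.apache.spark.sql.catalyst.InternalRow
       import org.apache.spark.sql.catalyst.expressions.UnsafeRow
       import org.apache.spark.sql.types.StructType

       // Rewrites a row from `from` into `to` using the thread-locally cached writer;
       // the writer already produces an UnsafeRow, so no extra projection pass is needed.
       def rewrite(row: InternalRow, from: StructType, to: StructType): UnsafeRow =
         HoodieInternalRowUtils.getCachedUnsafeRowWriter(from, to)(row)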



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1091314159


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java:
##########
@@ -33,31 +33,47 @@
 
 public class ExecutorFactory {
 
-  public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig hoodieConfig,
+  public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig config,
                                                    Iterator<I> inputItr,
                                                    HoodieConsumer<O, E> consumer,
                                                    Function<I, O> transformFunction) {
-    return create(hoodieConfig, inputItr, consumer, transformFunction, Functions.noop());
+    return create(config, inputItr, consumer, transformFunction, Functions.noop());
   }
 
-  public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig hoodieConfig,
+  public static <I, O, E> HoodieExecutor<E> create(HoodieWriteConfig config,
                                                    Iterator<I> inputItr,
                                                    HoodieConsumer<O, E> consumer,
                                                    Function<I, O> transformFunction,
                                                    Runnable preExecuteRunnable) {
-    ExecutorType executorType = hoodieConfig.getExecutorType();
-
+    ExecutorType executorType = config.getExecutorType();
     switch (executorType) {
       case BOUNDED_IN_MEMORY:
-        return new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, consumer,
+        return new BoundedInMemoryExecutor<>(config.getWriteBufferLimitBytes(), inputItr, consumer,
             transformFunction, preExecuteRunnable);
       case DISRUPTOR:
-        return new DisruptorExecutor<>(hoodieConfig.getWriteExecutorDisruptorWriteBufferSize(), inputItr, consumer,
-            transformFunction, hoodieConfig.getWriteExecutorDisruptorWaitStrategy(), preExecuteRunnable);
+        return new DisruptorExecutor<>(config.getWriteExecutorDisruptorWriteBufferSize(), inputItr, consumer,
+            transformFunction, config.getWriteExecutorDisruptorWaitStrategy(), preExecuteRunnable);
       case SIMPLE:
         return new SimpleExecutor<>(inputItr, consumer, transformFunction);
       default:
         throw new HoodieException("Unsupported Executor Type " + executorType);
     }
   }
+
+  /**
+   * Checks whether configured {@link HoodieExecutor} buffer records (for ex, by holding them
+   * in the queue)
+   */
+  public static boolean isBufferingRecords(HoodieWriteConfig config) {

Review Comment:
   I actually realized that this is unfortunately not possible: the copying is done inside the transformers that we pass as constructor args to the respective Executor, so we can't just call a method on it
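
   For readers of the archive, a hedged Scala sketch of the idea behind the static helper: whether records get buffered is derived from the configured executor type alone, since the executor instance itself cannot be asked (the copy is baked into its constructor-supplied transformer). Which executor types actually buffer is an assumption of this sketch, not taken verbatim from the PR:

       // Queue-based executors hold records between producer and consumer, so callers must
       // hand them defensive copies; the simple executor processes records inline.
       def isBufferingRecords(executorType: String): Boolean = executorType match {
         case "BOUNDED_IN_MEMORY" | "DISRUPTOR" => true
         case "SIMPLE"                          => false
         case other => throw new IllegalArgumentException(s"Unsupported executor type: $other")
       }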



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1407377934

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0ece5561859923b8773d6ff9fa633f014c104300",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14688",
       "triggerID" : "0ece5561859923b8773d6ff9fa633f014c104300",
       "triggerType" : "PUSH"
     }, {
       "hash" : "80d38554649038cb9e668be4edc3a3c0a2c4373f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14693",
       "triggerID" : "80d38554649038cb9e668be4edc3a3c0a2c4373f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "36a0d9aeab63f713dff106ed9a76411aceb900b2",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14704",
       "triggerID" : "36a0d9aeab63f713dff106ed9a76411aceb900b2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 36a0d9aeab63f713dff106ed9a76411aceb900b2 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14704) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] minihippo commented on a diff in pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "minihippo (via GitHub)" <gi...@apache.org>.
minihippo commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1089748637


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -416,7 +423,8 @@ private static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType s
         getValue(structType, recordKeyPartitionPathFieldPair.getRight(), record.data).toString());
 
     HoodieOperation operation = withOperationField
-        ? HoodieOperation.fromName(getNullableValAsString(structType, record.data, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
+        ? HoodieOperation.fromName(record.data.getString(structType.fieldIndex(HoodieRecord.OPERATION_METADATA_FIELD)))

Review Comment:
   `getNullableValAsString` handles the case where the field does not exist, but `structType.fieldIndex` does not
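
   To make the distinction concrete, a hedged Scala sketch of a null-safe lookup (the helper name is hypothetical; `StructType.getFieldIndex` is Spark's Option-returning counterpart of `fieldIndex`, which throws on a missing field):

       import org.apache.spark.sql.catalyst.InternalRow
       import org.apache.spark.sql.types.StructType

       // Returns None when the field is absent from the schema or its value is null,
       // instead of throwing the way structType.fieldIndex(...) would.
       def getNullableString(structType: StructType, row: InternalRow, field: String): Option[String] =
         structType.getFieldIndex(field) match {
           case Some(idx) if !row.isNullAt(idx) => Some(row.getString(idx))
           case _ => None
         }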



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1408988030

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0ece5561859923b8773d6ff9fa633f014c104300",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14688",
       "triggerID" : "0ece5561859923b8773d6ff9fa633f014c104300",
       "triggerType" : "PUSH"
     }, {
       "hash" : "80d38554649038cb9e668be4edc3a3c0a2c4373f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14693",
       "triggerID" : "80d38554649038cb9e668be4edc3a3c0a2c4373f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "36a0d9aeab63f713dff106ed9a76411aceb900b2",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14704",
       "triggerID" : "36a0d9aeab63f713dff106ed9a76411aceb900b2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "84962c128b8dd4f52cc7179d60a4af0efee3de3c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14741",
       "triggerID" : "84962c128b8dd4f52cc7179d60a4af0efee3de3c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6940a386344827d47ff9c9da1d81306e10021025",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14747",
       "triggerID" : "6940a386344827d47ff9c9da1d81306e10021025",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1aa1f0cc4f8c643e6b5423b9b5de54ef334ba8c1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14750",
       "triggerID" : "1aa1f0cc4f8c643e6b5423b9b5de54ef334ba8c1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ca3b5fa7a3a5252dbd9acdf5c3e92934e477d10c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14774",
       "triggerID" : "ca3b5fa7a3a5252dbd9acdf5c3e92934e477d10c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ca3b5fa7a3a5252dbd9acdf5c3e92934e477d10c Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14774) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1409762705

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0ece5561859923b8773d6ff9fa633f014c104300",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14688",
       "triggerID" : "0ece5561859923b8773d6ff9fa633f014c104300",
       "triggerType" : "PUSH"
     }, {
       "hash" : "80d38554649038cb9e668be4edc3a3c0a2c4373f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14693",
       "triggerID" : "80d38554649038cb9e668be4edc3a3c0a2c4373f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "36a0d9aeab63f713dff106ed9a76411aceb900b2",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14704",
       "triggerID" : "36a0d9aeab63f713dff106ed9a76411aceb900b2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "84962c128b8dd4f52cc7179d60a4af0efee3de3c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14741",
       "triggerID" : "84962c128b8dd4f52cc7179d60a4af0efee3de3c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6940a386344827d47ff9c9da1d81306e10021025",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14747",
       "triggerID" : "6940a386344827d47ff9c9da1d81306e10021025",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1aa1f0cc4f8c643e6b5423b9b5de54ef334ba8c1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14750",
       "triggerID" : "1aa1f0cc4f8c643e6b5423b9b5de54ef334ba8c1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ca3b5fa7a3a5252dbd9acdf5c3e92934e477d10c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14774",
       "triggerID" : "ca3b5fa7a3a5252dbd9acdf5c3e92934e477d10c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9bfa20f45fcc675b79053bb8b4f379b09c6cd6c5",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "9bfa20f45fcc675b79053bb8b4f379b09c6cd6c5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "62f1095775d8c3effa93175cdcc6423fca9a3968",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14784",
       "triggerID" : "62f1095775d8c3effa93175cdcc6423fca9a3968",
       "triggerType" : "PUSH"
     }, {
       "hash" : "325244765016f67034fd8f364942028fe217ecb5",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14788",
       "triggerID" : "325244765016f67034fd8f364942028fe217ecb5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9bfa20f45fcc675b79053bb8b4f379b09c6cd6c5 UNKNOWN
   * 325244765016f67034fd8f364942028fe217ecb5 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14788) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1408706660

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0ece5561859923b8773d6ff9fa633f014c104300",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14688",
       "triggerID" : "0ece5561859923b8773d6ff9fa633f014c104300",
       "triggerType" : "PUSH"
     }, {
       "hash" : "80d38554649038cb9e668be4edc3a3c0a2c4373f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14693",
       "triggerID" : "80d38554649038cb9e668be4edc3a3c0a2c4373f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "36a0d9aeab63f713dff106ed9a76411aceb900b2",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14704",
       "triggerID" : "36a0d9aeab63f713dff106ed9a76411aceb900b2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "84962c128b8dd4f52cc7179d60a4af0efee3de3c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14741",
       "triggerID" : "84962c128b8dd4f52cc7179d60a4af0efee3de3c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6940a386344827d47ff9c9da1d81306e10021025",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14747",
       "triggerID" : "6940a386344827d47ff9c9da1d81306e10021025",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1aa1f0cc4f8c643e6b5423b9b5de54ef334ba8c1",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14750",
       "triggerID" : "1aa1f0cc4f8c643e6b5423b9b5de54ef334ba8c1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ca3b5fa7a3a5252dbd9acdf5c3e92934e477d10c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14774",
       "triggerID" : "ca3b5fa7a3a5252dbd9acdf5c3e92934e477d10c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1aa1f0cc4f8c643e6b5423b9b5de54ef334ba8c1 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14750) 
   * ca3b5fa7a3a5252dbd9acdf5c3e92934e477d10c Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14774) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1090889041


##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java:
##########
@@ -234,4 +233,18 @@ protected final IndexedRecord readRecordPayload(Kryo kryo, Input input) {
 
     return kryo.readObjectOrNull(input, GenericRecord.class, avroSerializer);
   }
+
+  static void updateMetadataValuesInternal(GenericRecord avroRecord, MetadataValues metadataValues) {

Review Comment:
   It's very specific in its purpose though: it overwrites meta-fields, which shouldn't be done outside of `HoodieRecord`
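
   As an illustration of the overwrite-only semantics described here, a hedged Scala sketch with the value holder simplified to a plain Map (the real method works against `MetadataValues`, whose API isn't shown in this thread); it assumes the record's schema already contains Hudi's standard meta-fields:

       import org.apache.avro.generic.GenericRecord

       // Overwrites only the meta-fields that were explicitly set; data fields are never touched.
       // record.put(...) requires that these field names exist in the record's schema.
       def overwriteMetaFields(record: GenericRecord, metaValues: Map[String, String]): Unit =
         Seq("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
             "_hoodie_partition_path", "_hoodie_file_name").foreach { field =>
           metaValues.get(field).foreach(value => record.put(field, value))
         }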



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1409635192

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0ece5561859923b8773d6ff9fa633f014c104300",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14688",
       "triggerID" : "0ece5561859923b8773d6ff9fa633f014c104300",
       "triggerType" : "PUSH"
     }, {
       "hash" : "80d38554649038cb9e668be4edc3a3c0a2c4373f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14693",
       "triggerID" : "80d38554649038cb9e668be4edc3a3c0a2c4373f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "36a0d9aeab63f713dff106ed9a76411aceb900b2",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14704",
       "triggerID" : "36a0d9aeab63f713dff106ed9a76411aceb900b2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "84962c128b8dd4f52cc7179d60a4af0efee3de3c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14741",
       "triggerID" : "84962c128b8dd4f52cc7179d60a4af0efee3de3c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6940a386344827d47ff9c9da1d81306e10021025",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14747",
       "triggerID" : "6940a386344827d47ff9c9da1d81306e10021025",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1aa1f0cc4f8c643e6b5423b9b5de54ef334ba8c1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14750",
       "triggerID" : "1aa1f0cc4f8c643e6b5423b9b5de54ef334ba8c1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ca3b5fa7a3a5252dbd9acdf5c3e92934e477d10c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14774",
       "triggerID" : "ca3b5fa7a3a5252dbd9acdf5c3e92934e477d10c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9bfa20f45fcc675b79053bb8b4f379b09c6cd6c5",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "9bfa20f45fcc675b79053bb8b4f379b09c6cd6c5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "62f1095775d8c3effa93175cdcc6423fca9a3968",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14784",
       "triggerID" : "62f1095775d8c3effa93175cdcc6423fca9a3968",
       "triggerType" : "PUSH"
     }, {
       "hash" : "325244765016f67034fd8f364942028fe217ecb5",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "325244765016f67034fd8f364942028fe217ecb5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ca3b5fa7a3a5252dbd9acdf5c3e92934e477d10c Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14774) 
   * 9bfa20f45fcc675b79053bb8b4f379b09c6cd6c5 UNKNOWN
   * 62f1095775d8c3effa93175cdcc6423fca9a3968 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14784) 
   * 325244765016f67034fd8f364942028fe217ecb5 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1407721531

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0ece5561859923b8773d6ff9fa633f014c104300",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14688",
       "triggerID" : "0ece5561859923b8773d6ff9fa633f014c104300",
       "triggerType" : "PUSH"
     }, {
       "hash" : "80d38554649038cb9e668be4edc3a3c0a2c4373f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14693",
       "triggerID" : "80d38554649038cb9e668be4edc3a3c0a2c4373f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "36a0d9aeab63f713dff106ed9a76411aceb900b2",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14704",
       "triggerID" : "36a0d9aeab63f713dff106ed9a76411aceb900b2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "84962c128b8dd4f52cc7179d60a4af0efee3de3c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14741",
       "triggerID" : "84962c128b8dd4f52cc7179d60a4af0efee3de3c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6940a386344827d47ff9c9da1d81306e10021025",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14747",
       "triggerID" : "6940a386344827d47ff9c9da1d81306e10021025",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 84962c128b8dd4f52cc7179d60a4af0efee3de3c Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14741) 
   * 6940a386344827d47ff9c9da1d81306e10021025 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14747) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1091305177


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java:
##########
@@ -62,31 +71,29 @@ Schema getAvroSchema(Path sourceFilePath) throws IOException {
   }
 
   @Override
-  void executeBootstrap(HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle,
-                        Path sourceFilePath, KeyGeneratorInterface keyGenerator, String partitionPath, Schema avroSchema) throws Exception {
+  protected void executeBootstrap(HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle,
+                                  Path sourceFilePath,
+                                  KeyGeneratorInterface keyGenerator,
+                                  String partitionPath,
+                                  Schema schema) throws Exception {
     BoundedInMemoryExecutor<HoodieRecord, HoodieRecord, Void> wrapper = null;
-    HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(table.getConfig().getRecordMerger().getRecordType())
+    HoodieRecordMerger recordMerger = table.getConfig().getRecordMerger();
+
+    HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(recordMerger.getRecordType())
             .getFileReader(table.getHadoopConf(), sourceFilePath);
     try {
+      Function<HoodieRecord, HoodieRecord> transformer = record -> {
+        String recordKey = record.getRecordKey(schema, Option.of(keyGenerator));
+        return createNewMetadataBootstrapRecord(recordKey, partitionPath, recordMerger.getRecordType())

Review Comment:
   Introducing `createNewMetadataBootstrapRecord` is the crux of the change here:
    - The metadata bootstrap record is now properly initialized with a schema that includes all of the meta-fields, rather than one truncated to just the record key (`HoodieSparkRecord` is not able to handle such a truncated meta-fields schema)
    - The Avro path is restored to what it was before RFC-46



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/BaseBootstrapMetadataHandler.java:
##########
@@ -62,10 +61,10 @@ public BootstrapWriteStatus runMetadataBootstrap(String srcPartitionPath, String
           .map(HoodieAvroUtils::getRootLevelFieldName)
           .collect(Collectors.toList());
       Schema recordKeySchema = HoodieAvroUtils.generateProjectionSchema(avroSchema, recordKeyColumns);
-      LOG.info("Schema to be used for reading record Keys :" + recordKeySchema);

Review Comment:
   These are now properly set by the actual FileReaders



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] alexeykudinkin merged pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin merged PR #7769:
URL: https://github.com/apache/hudi/pull/7769


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1407278862

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0ece5561859923b8773d6ff9fa633f014c104300",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0ece5561859923b8773d6ff9fa633f014c104300",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0ece5561859923b8773d6ff9fa633f014c104300 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1407282031

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0ece5561859923b8773d6ff9fa633f014c104300",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14688",
       "triggerID" : "0ece5561859923b8773d6ff9fa633f014c104300",
       "triggerType" : "PUSH"
     }, {
       "hash" : "80d38554649038cb9e668be4edc3a3c0a2c4373f",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "80d38554649038cb9e668be4edc3a3c0a2c4373f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0ece5561859923b8773d6ff9fa633f014c104300 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14688) 
   * 80d38554649038cb9e668be4edc3a3c0a2c4373f UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] wzx140 commented on a diff in pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "wzx140 (via GitHub)" <gi...@apache.org>.
wzx140 commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1089861400


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -139,20 +139,25 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
         recordSchema = isPureProjection ? writerSchema : readerSchema;
       }
 
+      boolean isBufferingRecords = ExecutorFactory.isBufferingRecords(writeConfig);
+
       wrapper = ExecutorFactory.create(writeConfig, recordIterator, new UpdateHandler(mergeHandle), record -> {
-        // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
-        //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
-        //       it since these records will be put into queue of QueueBasedExecutorFactory.
+        HoodieRecord newRecord;
         if (shouldRewriteInWriterSchema) {
           try {
-            return record.rewriteRecordWithNewSchema(recordSchema, writeConfig.getProps(), writerSchema).copy();
+            newRecord = record.rewriteRecordWithNewSchema(recordSchema, writeConfig.getProps(), writerSchema);
           } catch (IOException e) {
             LOG.error("Error rewrite record with new schema", e);
             throw new HoodieException(e);
           }
         } else {
-          return record.copy();
+          newRecord = record;
         }
+
+        // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
+        //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
+        //       it since these records will be put into queue of QueueBasedExecutorFactory.
+        return isBufferingRecords ? newRecord.copy() : newRecord;

Review Comment:
   :thumbsup:



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1090883889


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala:
##########
@@ -18,304 +18,413 @@
 
 package org.apache.spark.sql
 
-import java.nio.charset.StandardCharsets
-import java.util.HashMap
-import java.util.concurrent.ConcurrentHashMap
 import org.apache.avro.Schema
 import org.apache.hbase.thirdparty.com.google.common.base.Supplier
-import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType
 import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, toJavaDate}
 import org.apache.hudi.exception.HoodieException
-import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
+import org.apache.spark.sql.HoodieUnsafeRowUtils.{NestedFieldPath, composeNestedFieldPath}
+import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.types.Decimal.ROUND_HALF_EVEN
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
+
+import java.util.concurrent.ConcurrentHashMap
+import java.util.{ArrayDeque => JArrayDeque, Collections => JCollections, Deque => JDeque, Map => JMap}
+import java.util.function.{Function => JFunction}
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
 
 object HoodieInternalRowUtils {
 
-  // Projection are all thread local. Projection is not thread-safe
-  val unsafeProjectionThreadLocal: ThreadLocal[HashMap[(StructType, StructType), UnsafeProjection]] =
-    ThreadLocal.withInitial(new Supplier[HashMap[(StructType, StructType), UnsafeProjection]] {
-      override def get(): HashMap[(StructType, StructType), UnsafeProjection] = new HashMap[(StructType, StructType), UnsafeProjection]
+  private type RenamedColumnMap = JMap[String, String]
+  private type UnsafeRowWriter = InternalRow => UnsafeRow
+
+  // NOTE: [[UnsafeProjection]] objects cache have to stay [[ThreadLocal]] since these are not thread-safe
+  private val unsafeWriterThreadLocal: ThreadLocal[mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter]] =
+    ThreadLocal.withInitial(new Supplier[mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter]] {
+      override def get(): mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter] =
+        new mutable.HashMap[(StructType, StructType, RenamedColumnMap), UnsafeRowWriter]
     })
-  val schemaMap = new ConcurrentHashMap[Schema, StructType]
-  val orderPosListMap = new ConcurrentHashMap[(StructType, String), NestedFieldPath]
 
-  /**
-   * @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecord(org.apache.avro.generic.GenericRecord, org.apache.avro.Schema)
-   */
-  def rewriteRecord(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType): InternalRow = {
-    val newRow = new GenericInternalRow(Array.fill(newSchema.fields.length)(null).asInstanceOf[Array[Any]])
-
-    for ((field, pos) <- newSchema.fields.zipWithIndex) {
-      var oldValue: AnyRef = null
-      var oldType: DataType = null
-      if (existField(oldSchema, field.name)) {
-        val oldField = oldSchema(field.name)
-        val oldPos = oldSchema.fieldIndex(field.name)
-        oldType = oldField.dataType
-        oldValue = oldRecord.get(oldPos, oldType)
-      }
-      if (oldValue != null) {
-        field.dataType match {
-          case structType: StructType =>
-            val oldType = oldSchema(field.name).dataType.asInstanceOf[StructType]
-            val newValue = rewriteRecord(oldValue.asInstanceOf[InternalRow], oldType, structType)
-            newRow.update(pos, newValue)
-          case decimalType: DecimalType =>
-            val oldFieldSchema = oldSchema(field.name).dataType.asInstanceOf[DecimalType]
-            if (decimalType.scale != oldFieldSchema.scale || decimalType.precision != oldFieldSchema.precision) {
-              newRow.update(pos, Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale))
-              )
-            } else {
-              newRow.update(pos, oldValue)
-            }
-          case t if t == oldType => newRow.update(pos, oldValue)
-          // Type promotion
-          case _: ShortType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toShort)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: IntegerType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toInt)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toInt)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: LongType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toLong)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toLong)
-              case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toLong)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: FloatType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toFloat)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toFloat)
-              case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toFloat)
-              case _: LongType => newRow.update(pos, oldValue.asInstanceOf[Long].toFloat)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: DoubleType =>
-            oldType match {
-              case _: ByteType => newRow.update(pos, oldValue.asInstanceOf[Byte].toDouble)
-              case _: ShortType => newRow.update(pos, oldValue.asInstanceOf[Short].toDouble)
-              case _: IntegerType => newRow.update(pos, oldValue.asInstanceOf[Int].toDouble)
-              case _: LongType => newRow.update(pos, oldValue.asInstanceOf[Long].toDouble)
-              case _: FloatType => newRow.update(pos, oldValue.asInstanceOf[Float].toDouble)
-              case _ => throw new IllegalArgumentException(s"$oldSchema and $newSchema are incompatible")
-            }
-          case _: BinaryType if oldType.isInstanceOf[StringType] => newRow.update(pos, oldValue.asInstanceOf[String].getBytes)
-          case _ => newRow.update(pos, oldValue)
-        }
-      } else {
-        // TODO default value in newSchema
-      }
-    }
+  // NOTE: [[UnsafeRowWriter]] objects cache have to stay [[ThreadLocal]] since these are not thread-safe
+  private val unsafeProjectionThreadLocal: ThreadLocal[mutable.HashMap[(StructType, StructType), UnsafeProjection]] =
+    ThreadLocal.withInitial(new Supplier[mutable.HashMap[(StructType, StructType), UnsafeProjection]] {
+      override def get(): mutable.HashMap[(StructType, StructType), UnsafeProjection] =
+        new mutable.HashMap[(StructType, StructType), UnsafeProjection]
+    })
 
-    newRow
-  }
+  private val schemaMap = new ConcurrentHashMap[Schema, StructType]
+  private val orderPosListMap = new ConcurrentHashMap[(StructType, String), Option[NestedFieldPath]]
 
   /**
-   * @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecordWithNewSchema(org.apache.avro.generic.IndexedRecord, org.apache.avro.Schema, java.util.Map)
+   * Provides cached instance of [[UnsafeProjection]] transforming provided [[InternalRow]]s from
+   * one [[StructType]] and into another [[StructType]]
+   *
+   * For more details regarding its semantic, please check corresponding scala-doc for
+   * [[HoodieCatalystExpressionUtils.generateUnsafeProjection]]
    */
-  def rewriteRecordWithNewSchema(oldRecord: InternalRow, oldSchema: StructType, newSchema: StructType, renameCols: java.util.Map[String, String]): InternalRow = {
-    rewriteRecordWithNewSchema(oldRecord, oldSchema, newSchema, renameCols, new java.util.LinkedList[String]).asInstanceOf[InternalRow]
+  def getCachedUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = {
+    unsafeProjectionThreadLocal.get()
+      .getOrElseUpdate((from, to), generateUnsafeProjection(from, to))
   }
 
   /**
-   * @see org.apache.hudi.avro.HoodieAvroUtils#rewriteRecordWithNewSchema(java.lang.Object, org.apache.avro.Schema, org.apache.avro.Schema, java.util.Map, java.util.Deque)
+   * Provides cached instance of [[UnsafeRowWriter]] transforming provided [[InternalRow]]s from
+   * one [[StructType]] and into another [[StructType]]
+   *
+   * Unlike [[UnsafeProjection]] requiring that [[from]] has to be a proper subset of [[to]] schema,
+   * [[UnsafeRowWriter]] is able to perform whole spectrum of schema-evolution transformations including:
+   *
+   * <ul>
+   *   <li>Transforming nested structs/maps/arrays</li>
+   *   <li>Handling type promotions (int -> long, etc)</li>
+   *   <li>Handling (field) renames</li>
+   * </ul>
    */
-  private def rewriteRecordWithNewSchema(oldRecord: Any, oldSchema: DataType, newSchema: DataType, renameCols: java.util.Map[String, String], fieldNames: java.util.Deque[String]): Any = {
-    if (oldRecord == null) {
-      null
-    } else {
-      newSchema match {
-        case targetSchema: StructType =>
-          if (!oldRecord.isInstanceOf[InternalRow]) {
-            throw new IllegalArgumentException("cannot rewrite record with different type")
-          }
-          val oldRow = oldRecord.asInstanceOf[InternalRow]
-          val helper = mutable.Map[Integer, Any]()
-
-          val oldStrucType = oldSchema.asInstanceOf[StructType]
-          targetSchema.fields.zipWithIndex.foreach { case (field, i) =>
-            fieldNames.push(field.name)
-            if (existField(oldStrucType, field.name)) {
-              val oldField = oldStrucType(field.name)
-              val oldPos = oldStrucType.fieldIndex(field.name)
-              helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames)
-            } else {
-              val fieldFullName = createFullName(fieldNames)
-              val colNamePartsFromOldSchema = renameCols.getOrDefault(fieldFullName, "").split("\\.")
-              val lastColNameFromOldSchema = colNamePartsFromOldSchema(colNamePartsFromOldSchema.length - 1)
-              // deal with rename
-              if (!existField(oldStrucType, field.name) && existField(oldStrucType, lastColNameFromOldSchema)) {
-                // find rename
-                val oldField = oldStrucType(lastColNameFromOldSchema)
-                val oldPos = oldStrucType.fieldIndex(lastColNameFromOldSchema)
-                helper(i) = rewriteRecordWithNewSchema(oldRow.get(oldPos, oldField.dataType), oldField.dataType, field.dataType, renameCols, fieldNames)
-              }
-            }
-            fieldNames.pop()
-          }
-          val newRow = new GenericInternalRow(Array.fill(targetSchema.length)(null).asInstanceOf[Array[Any]])
-          targetSchema.fields.zipWithIndex.foreach { case (_, i) =>
-            if (helper.contains(i)) {
-              newRow.update(i, helper(i))
-            } else {
-              // TODO add default val
-              newRow.update(i, null)
-            }
-          }
+  def getCachedUnsafeRowWriter(from: StructType, to: StructType, renamedColumnsMap: JMap[String, String] = JCollections.emptyMap()): UnsafeRowWriter = {
+    unsafeWriterThreadLocal.get()
+      .getOrElseUpdate((from, to, renamedColumnsMap), genUnsafeRowWriter(from, to, renamedColumnsMap))
+  }
 
-          newRow
-        case targetSchema: ArrayType =>
-          if (!oldRecord.isInstanceOf[ArrayData]) {
-            throw new IllegalArgumentException("cannot rewrite record with different type")
-          }
-          val oldElementType = oldSchema.asInstanceOf[ArrayType].elementType
-          val oldArray = oldRecord.asInstanceOf[ArrayData]
-          val newElementType = targetSchema.elementType
-          val newArray = new GenericArrayData(Array.fill(oldArray.numElements())(null).asInstanceOf[Array[Any]])
-          fieldNames.push("element")
-          oldArray.toSeq[Any](oldElementType).zipWithIndex.foreach { case (value, i) => newArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldElementType, newElementType, renameCols, fieldNames)) }
-          fieldNames.pop()
-
-          newArray
-        case targetSchema: MapType =>
-          if (!oldRecord.isInstanceOf[MapData]) {
-            throw new IllegalArgumentException("cannot rewrite record with different type")
-          }
-          val oldValueType = oldSchema.asInstanceOf[MapType].valueType
-          val oldKeyType = oldSchema.asInstanceOf[MapType].keyType
-          val oldMap = oldRecord.asInstanceOf[MapData]
-          val newValueType = targetSchema.valueType
-          val newKeyArray = new GenericArrayData(Array.fill(oldMap.keyArray().numElements())(null).asInstanceOf[Array[Any]])
-          val newValueArray = new GenericArrayData(Array.fill(oldMap.valueArray().numElements())(null).asInstanceOf[Array[Any]])
-          val newMap = new ArrayBasedMapData(newKeyArray, newValueArray)
-          fieldNames.push("value")
-          oldMap.keyArray().toSeq[Any](oldKeyType).zipWithIndex.foreach { case (value, i) => newKeyArray.update(i, rewritePrimaryType(value, oldKeyType, oldKeyType)) }
-          oldMap.valueArray().toSeq[Any](oldValueType).zipWithIndex.foreach { case (value, i) => newValueArray.update(i, rewriteRecordWithNewSchema(value.asInstanceOf[AnyRef], oldValueType, newValueType, renameCols, fieldNames)) }
-          fieldNames.pop()
-
-          newMap
-        case _ => rewritePrimaryType(oldRecord, oldSchema, newSchema)
-      }
+  def getCachedPosList(structType: StructType, field: String): Option[NestedFieldPath] = {
+    val nestedFieldPathOpt = orderPosListMap.get((structType, field))
+    // NOTE: This is specifically designed to do 2 lookups (in case of a cache-miss) to avoid
+    //       allocating the closure when using [[computeIfAbsent]] on the more frequent cache-hit path
+    if (nestedFieldPathOpt != null) {
+      nestedFieldPathOpt
+    } else {
+      orderPosListMap.computeIfAbsent((structType, field), new JFunction[(StructType, String), Option[NestedFieldPath]] {
+        override def apply(t: (StructType, String)): Option[NestedFieldPath] =
+          composeNestedFieldPath(structType, field)
+      })
     }
   }
 
-  def getCachedPosList(structType: StructType, field: String): NestedFieldPath = {
-    val schemaPair = (structType, field)
-    if (!orderPosListMap.containsKey(schemaPair)) {
-      val posList = HoodieUnsafeRowUtils.composeNestedFieldPath(structType, field)
-      orderPosListMap.put(schemaPair, posList)
+  def getCachedSchema(schema: Schema): StructType = {
+    val structType = schemaMap.get(schema)
+    // NOTE: This is specifically designed to do 2 lookups (in case of a cache-miss) to avoid
+    //       allocating the closure when using [[computeIfAbsent]] on the more frequent cache-hit path
+    if (structType != null) {
+      structType
+    } else {
+      schemaMap.computeIfAbsent(schema, new JFunction[Schema, StructType] {
+        override def apply(t: Schema): StructType =
+          convertAvroSchemaToStructType(schema)
+      })
     }
-    orderPosListMap.get(schemaPair)
   }
 
-  def getCachedUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = {
-    val schemaPair = (from, to)
-    val map = unsafeProjectionThreadLocal.get()
-    if (!map.containsKey(schemaPair)) {
-      val projection = HoodieCatalystExpressionUtils.generateUnsafeProjection(from, to)
-      map.put(schemaPair, projection)
+  private[sql] def genUnsafeRowWriter(prevSchema: StructType,
+                                      newSchema: StructType,
+                                      renamedColumnsMap: JMap[String, String]): UnsafeRowWriter = {
+    val writer = newWriterRenaming(prevSchema, newSchema, renamedColumnsMap, new JArrayDeque[String]())
+    val unsafeProjection = generateUnsafeProjection(newSchema, newSchema)
+    val phonyUpdater = new CatalystDataUpdater {
+      var value: InternalRow = _
+
+      override def set(ordinal: Int, value: Any): Unit =
+        this.value = value.asInstanceOf[InternalRow]
     }
-    map.get(schemaPair)
-  }
 
-  def getCachedSchema(schema: Schema): StructType = {
-    if (!schemaMap.containsKey(schema)) {
-      val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
-      schemaMap.put(schema, structType)
+    oldRow => {
+      writer(phonyUpdater, 0, oldRow)
+      unsafeProjection(phonyUpdater.value)
     }
-    schemaMap.get(schema)
   }
 
-  def existField(structType: StructType, name: String): Boolean = {
-    try {
-      HoodieUnsafeRowUtils.composeNestedFieldPath(structType, name)
-      true
-    } catch {
-      case _: IllegalArgumentException => false
+  private type RowFieldUpdater = (CatalystDataUpdater, Int, Any) => Unit
+
+  private def genUnsafeStructWriter(prevStructType: StructType,
+                                    newStructType: StructType,
+                                    renamedColumnsMap: JMap[String, String],
+                                    fieldNamesStack: JDeque[String]): (CatalystDataUpdater, Any) => Unit = {
+    // TODO need to canonicalize schemas (casing)
+    val fieldWriters = ArrayBuffer.empty[RowFieldUpdater]
+    val positionMap = ArrayBuffer.empty[Int]
+
+    for (newField <- newStructType.fields) {
+      fieldNamesStack.push(newField.name)
+
+      val (fieldWriter, prevFieldPos): (RowFieldUpdater, Int) =
+        prevStructType.getFieldIndex(newField.name) match {
+          case Some(prevFieldPos) =>
+            val prevField = prevStructType(prevFieldPos)
+            (newWriterRenaming(prevField.dataType, newField.dataType, renamedColumnsMap, fieldNamesStack), prevFieldPos)
+
+          case None =>
+            val newFieldQualifiedName = createFullName(fieldNamesStack)
+            val prevFieldName: String = lookupRenamedField(newFieldQualifiedName, renamedColumnsMap)
+
+            // Handle rename
+            prevStructType.getFieldIndex(prevFieldName) match {
+              case Some(prevFieldPos) =>
+                val prevField = prevStructType.fields(prevFieldPos)
+                (newWriterRenaming(prevField.dataType, newField.dataType, renamedColumnsMap, fieldNamesStack), prevFieldPos)
+
+              case None =>
+                val updater: RowFieldUpdater = (fieldUpdater, ordinal, _) => fieldUpdater.setNullAt(ordinal)
+                (updater, -1)
+            }
+        }
+
+      fieldWriters += fieldWriter
+      positionMap += prevFieldPos
+
+      fieldNamesStack.pop()
     }
-  }
 
-  private def rewritePrimaryType(oldValue: Any, oldSchema: DataType, newSchema: DataType) = {
-    if (oldSchema.equals(newSchema) || (oldSchema.isInstanceOf[DecimalType] && newSchema.isInstanceOf[DecimalType])) {
-      oldSchema match {
-        case NullType | BooleanType | IntegerType | LongType | FloatType | DoubleType | DateType | TimestampType | BinaryType =>
-          oldValue
-        // Copy UTF8String before putting into GenericInternalRow
-        case StringType => UTF8String.fromString(oldValue.toString)
-        case DecimalType() =>
-          Decimal.fromDecimal(oldValue.asInstanceOf[Decimal].toBigDecimal.setScale(newSchema.asInstanceOf[DecimalType].scale))
-        case _ =>
-          throw new HoodieException("Unknown schema type: " + newSchema)
+    (fieldUpdater, row) => {
+      var pos = 0
+      while (pos < fieldWriters.length) {
+        val prevPos = positionMap(pos)
+        val prevValue = if (prevPos >= 0) {
+          row.asInstanceOf[InternalRow].get(prevPos, prevStructType.fields(prevPos).dataType)
+        } else {
+          null
+        }
+
+        fieldWriters(pos)(fieldUpdater, pos, prevValue)
+        pos += 1
       }
-    } else {
-      rewritePrimaryTypeWithDiffSchemaType(oldValue, oldSchema, newSchema)
     }
   }
 
-  private def rewritePrimaryTypeWithDiffSchemaType(oldValue: Any, oldSchema: DataType, newSchema: DataType): Any = {
-    val value = newSchema match {
-      case NullType | BooleanType =>
-      case DateType if oldSchema.equals(StringType) =>
-        CatalystTypeConverters.convertToCatalyst(java.sql.Date.valueOf(oldValue.toString))
-      case LongType =>
-        oldSchema match {
-          case IntegerType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].longValue())
+  private def newWriterRenaming(prevDataType: DataType,
+                                newDataType: DataType,
+                                renamedColumnsMap: JMap[String, String],
+                                fieldNameStack: JDeque[String]): RowFieldUpdater = {
+    (newDataType, prevDataType) match {
+      case (newType, prevType) if prevType == newType =>
+        (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, value)
+
+      case (newStructType: StructType, prevStructType: StructType) =>
+        val writer = genUnsafeStructWriter(prevStructType, newStructType, renamedColumnsMap, fieldNameStack)
+
+        val newRow = new SpecificInternalRow(newStructType.fields.map(_.dataType))
+        val rowUpdater = new RowUpdater(newRow)
+
+        (fieldUpdater, ordinal, value) => {
+          // Here the new row is built in 2 stages:
+          //    - First, the mutable row created above (used as a buffer/scratchpad) is wrapped into [[RowUpdater]]
+          //      and passed into the generated row-writer
+          //    - Upon returning from the row-writer, we call back into the parent row's [[fieldUpdater]] to set
+          //      the returned row as a value in it
+          writer(rowUpdater, value)
+          fieldUpdater.set(ordinal, newRow)
+        }
+
+      case (ArrayType(newElementType, _), ArrayType(prevElementType, containsNull)) =>
+        fieldNameStack.push("element")
+        val elementWriter = newWriterRenaming(prevElementType, newElementType, renamedColumnsMap, fieldNameStack)
+        fieldNameStack.pop()
+
+        (fieldUpdater, ordinal, value) => {
+          val prevArrayData = value.asInstanceOf[ArrayData]
+          val prevArray = prevArrayData.toObjectArray(prevElementType)
+
+          val newArrayData = createArrayData(newElementType, prevArrayData.numElements())
+          val elementUpdater = new ArrayDataUpdater(newArrayData)
+
+          var i = 0
+          while (i < prevArray.length) {
+            val element = prevArray(i)
+            if (element == null) {
+              if (!containsNull) {
+                throw new HoodieException(
+                  s"Array value at path '${fieldNameStack.asScala.mkString(".")}' is not allowed to be null")
+              } else {
+                elementUpdater.setNullAt(i)
+              }
+            } else {
+              elementWriter(elementUpdater, i, element)
+            }
+            i += 1
+          }
+
+          fieldUpdater.set(ordinal, newArrayData)
+        }
+
+      case (MapType(_, newValueType, _), MapType(_, prevValueType, valueContainsNull)) =>
+        fieldNameStack.push("value")
+        val valueWriter = newWriterRenaming(prevValueType, newValueType, renamedColumnsMap, fieldNameStack)
+        fieldNameStack.pop()
+
+        (updater, ordinal, value) =>
+          val mapData = value.asInstanceOf[MapData]
+          val prevKeyArrayData = mapData.keyArray
+          val prevValueArrayData = mapData.valueArray
+          val prevValueArray = prevValueArrayData.toObjectArray(prevValueType)
+
+          val newValueArray = createArrayData(newValueType, mapData.numElements())
+          val valueUpdater = new ArrayDataUpdater(newValueArray)
+          var i = 0
+          while (i < prevValueArray.length) {
+            val value = prevValueArray(i)
+            if (value == null) {
+              if (!valueContainsNull) {
+                throw new HoodieException(s"Map value at path ${fieldNameStack.asScala.mkString(".")} is not allowed to be null")
+              } else {
+                valueUpdater.setNullAt(i)
+              }
+            } else {
+              valueWriter(valueUpdater, i, value)
+            }
+            i += 1
+          }
+
+          // NOTE: Keys can't be transformed and always have to be of [[StringType]]
+          updater.set(ordinal, new ArrayBasedMapData(prevKeyArrayData, newValueArray))
+
+      case (newDecimal: DecimalType, _) =>
+        prevDataType match {
+          case IntegerType | LongType | FloatType | DoubleType | StringType =>
+            (fieldUpdater, ordinal, value) =>
+              val scale = newDecimal.scale
+              // TODO this has to be revisited to avoid loss of precision (for fps)
+              fieldUpdater.setDecimal(ordinal, Decimal.fromDecimal(BigDecimal(value.toString).setScale(scale, ROUND_HALF_EVEN)))
+
+          case _: DecimalType =>
+            (fieldUpdater, ordinal, value) =>
+              fieldUpdater.setDecimal(ordinal, Decimal.fromDecimal(value.asInstanceOf[Decimal].toBigDecimal.setScale(newDecimal.scale)))
+
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
         }
-      case FloatType =>
-        oldSchema match {
-          case IntegerType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].floatValue())
-          case LongType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Long].floatValue())
+
+      case (_: ShortType, _) =>
+        prevDataType match {
+          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setShort(ordinal, value.asInstanceOf[Byte].toShort)
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
         }
-      case DoubleType =>
-        oldSchema match {
-          case IntegerType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Int].doubleValue())
-          case LongType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[Long].doubleValue())
-          case FloatType => CatalystTypeConverters.convertToCatalyst(java.lang.Double.valueOf(oldValue.asInstanceOf[Float] + ""))
+
+      case (_: IntegerType, _) =>
+        prevDataType match {
+          case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setInt(ordinal, value.asInstanceOf[Short].toInt)
+          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setInt(ordinal, value.asInstanceOf[Byte].toInt)
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
         }
-      case BinaryType =>
-        oldSchema match {
-          case StringType => CatalystTypeConverters.convertToCatalyst(oldValue.asInstanceOf[String].getBytes(StandardCharsets.UTF_8))
+
+      case (_: LongType, _) =>
+        prevDataType match {
+          case _: IntegerType => (fieldUpdater, ordinal, value) => fieldUpdater.setLong(ordinal, value.asInstanceOf[Int].toLong)
+          case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setLong(ordinal, value.asInstanceOf[Short].toLong)
+          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setLong(ordinal, value.asInstanceOf[Byte].toLong)
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
         }
-      case StringType =>
-        oldSchema match {
-          case BinaryType => CatalystTypeConverters.convertToCatalyst(new String(oldValue.asInstanceOf[Array[Byte]]))
-          case DateType => CatalystTypeConverters.convertToCatalyst(toJavaDate(oldValue.asInstanceOf[Integer]).toString)
-          case IntegerType | LongType | FloatType | DoubleType | DecimalType() => CatalystTypeConverters.convertToCatalyst(oldValue.toString)
+
+      case (_: FloatType, _) =>
+        prevDataType match {
+          case _: LongType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Long].toFloat)
+          case _: IntegerType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Int].toFloat)
+          case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Short].toFloat)
+          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Byte].toFloat)
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
         }
-      case DecimalType() =>
-        oldSchema match {
-          case IntegerType | LongType | FloatType | DoubleType | StringType =>
-            val scale = newSchema.asInstanceOf[DecimalType].scale
 
-            Decimal.fromDecimal(BigDecimal(oldValue.toString).setScale(scale))
+      case (_: DoubleType, _) =>
+        prevDataType match {
+          case _: FloatType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Float].toDouble)
+          case _: LongType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Long].toDouble)
+          case _: IntegerType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Int].toDouble)
+          case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Short].toDouble)
+          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Byte].toDouble)
           case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
         }
-      case _ =>
-    }
-    if (value == None) {
-      throw new HoodieException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema))
-    } else {
-      CatalystTypeConverters.convertToCatalyst(value)
+
+      case (_: BinaryType, _: StringType) =>
+        (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, value.asInstanceOf[UTF8String].getBytes)
+
+      // TODO revisit this (we need to align permitted casting w/ Spark)
+      // NOTE: This is supported to stay compatible w/ [[HoodieAvroUtils.rewriteRecordWithNewSchema]]
+      case (_: StringType, _) =>
+        prevDataType match {
+          case BinaryType => (fieldUpdater, ordinal, value) =>
+            fieldUpdater.set(ordinal, UTF8String.fromBytes(value.asInstanceOf[Array[Byte]]))
+          case DateType => (fieldUpdater, ordinal, value) =>
+            fieldUpdater.set(ordinal, UTF8String.fromString(toJavaDate(value.asInstanceOf[Integer]).toString))
+          case IntegerType | LongType | FloatType | DoubleType | _: DecimalType =>
+            (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, UTF8String.fromString(value.toString))
+
+          case _ =>
+            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
+        }
+
+      case (DateType, StringType) =>
+        (fieldUpdater, ordinal, value) =>
+          fieldUpdater.set(ordinal, CatalystTypeConverters.convertToCatalyst(java.sql.Date.valueOf(value.toString)))
+
+      case (_, _) =>
+        throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
     }
   }
 
-  def removeFields(schema: StructType, fieldsToRemove: java.util.List[String]): StructType = {
-    StructType(schema.fields.filter(field => !fieldsToRemove.contains(field.name)))
+  private def lookupRenamedField(newFieldQualifiedName: String, renamedColumnsMap: JMap[String, String]) = {
+    val prevFieldQualifiedName = renamedColumnsMap.getOrDefault(newFieldQualifiedName, "")
+    val prevFieldQualifiedNameParts = prevFieldQualifiedName.split("\\.")
+    val prevFieldName = prevFieldQualifiedNameParts(prevFieldQualifiedNameParts.length - 1)
+
+    prevFieldName
   }
+
+  private def createArrayData(elementType: DataType, length: Int): ArrayData = elementType match {

Review Comment:
   Yes, they are 1:1 w/ the Spark type system
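
   Since the hunk above is cut off right at `createArrayData`, here's a minimal sketch of the per-type dispatch, modeled on Spark's own `AvroDeserializer.createArrayData` (an assumption -- the exact body in the PR may differ):

   import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
   import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
   import org.apache.spark.sql.types._

   // Pre-allocates array storage specialized per Spark element type (1:1 w/ the type system, as noted above):
   // primitive element types get an UnsafeArrayData, everything else falls back to GenericArrayData
   private def createArrayData(elementType: DataType, length: Int): ArrayData = elementType match {
     case BooleanType => UnsafeArrayData.fromPrimitiveArray(new Array[Boolean](length))
     case ByteType    => UnsafeArrayData.fromPrimitiveArray(new Array[Byte](length))
     case ShortType   => UnsafeArrayData.fromPrimitiveArray(new Array[Short](length))
     case IntegerType => UnsafeArrayData.fromPrimitiveArray(new Array[Int](length))
     case LongType    => UnsafeArrayData.fromPrimitiveArray(new Array[Long](length))
     case FloatType   => UnsafeArrayData.fromPrimitiveArray(new Array[Float](length))
     case DoubleType  => UnsafeArrayData.fromPrimitiveArray(new Array[Double](length))
     case _           => new GenericArrayData(new Array[Any](length))
   }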



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1091455819


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java:
##########
@@ -159,35 +153,8 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
               queryTypeAndInstantEndpts.getRight().getRight()));
     }
 
-    /*
-     * log.info("Partition Fields are : (" + partitionFields + "). Initial Source Schema :" + source.schema());
-     *
-     * StructType newSchema = new StructType(source.schema().fields()); for (String field : partitionFields) { newSchema
-     * = newSchema.add(field, DataTypes.StringType, true); }
-     *
-     * /** Validates if the commit time is sane and also generates Partition fields from _hoodie_partition_path if
-     * configured
-     *
-     * Dataset<Row> validated = source.map((MapFunction<Row, Row>) (Row row) -> { // _hoodie_instant_time String
-     * instantTime = row.getString(0); IncrSourceHelper.validateInstantTime(row, instantTime, instantEndpts.getKey(),
-     * instantEndpts.getValue()); if (!partitionFields.isEmpty()) { // _hoodie_partition_path String hoodiePartitionPath
-     * = row.getString(3); List<Object> partitionVals =
-     * extractor.extractPartitionValuesInPath(hoodiePartitionPath).stream() .map(o -> (Object)
-     * o).collect(Collectors.toList()); ValidationUtils.checkArgument(partitionVals.size() == partitionFields.size(),
-     * "#partition-fields != #partition-values-extracted"); List<Object> rowObjs = new
-     * ArrayList<>(scala.collection.JavaConversions.seqAsJavaList(row.toSeq())); rowObjs.addAll(partitionVals); return
-     * RowFactory.create(rowObjs.toArray()); } return row; }, RowEncoder.apply(newSchema));
-     *
-     * log.info("Validated Source Schema :" + validated.schema());
-     */
-    boolean dropAllMetaFields = props.getBoolean(Config.HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE,
-        Config.DEFAULT_HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE);
-
-    // Remove Hoodie meta columns except partition path from input source
-    String[] colsToDrop = dropAllMetaFields ? HoodieRecord.HOODIE_META_COLUMNS.stream().toArray(String[]::new) :
-        HoodieRecord.HOODIE_META_COLUMNS.stream().filter(x -> !x.equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toArray(String[]::new);
-    final Dataset<Row> src = source.drop(colsToDrop);
-    // log.info("Final Schema from Source is :" + src.schema());
+    // Remove Hoodie meta columns
+    final Dataset<Row> src = source.drop(HoodieRecord.HOODIE_META_COLUMNS.stream().toArray(String[]::new));

Review Comment:
   The change here is to avoid keeping the partition-path column, as keeping it would make `HoodieSparkSqlWriter` treat it as a data column, which is not compatible w/ `SparkRecordMerger`
   



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java:
##########
@@ -159,35 +153,8 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
               queryTypeAndInstantEndpts.getRight().getRight()));
     }
 
-    /*
-     * log.info("Partition Fields are : (" + partitionFields + "). Initial Source Schema :" + source.schema());

Review Comment:
   Cleaning up dead commented code (not updated since 2018)



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java:
##########
@@ -159,35 +153,8 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
               queryTypeAndInstantEndpts.getRight().getRight()));
     }
 
-    /*
-     * log.info("Partition Fields are : (" + partitionFields + "). Initial Source Schema :" + source.schema());
-     *
-     * StructType newSchema = new StructType(source.schema().fields()); for (String field : partitionFields) { newSchema
-     * = newSchema.add(field, DataTypes.StringType, true); }
-     *
-     * /** Validates if the commit time is sane and also generates Partition fields from _hoodie_partition_path if
-     * configured
-     *
-     * Dataset<Row> validated = source.map((MapFunction<Row, Row>) (Row row) -> { // _hoodie_instant_time String
-     * instantTime = row.getString(0); IncrSourceHelper.validateInstantTime(row, instantTime, instantEndpts.getKey(),
-     * instantEndpts.getValue()); if (!partitionFields.isEmpty()) { // _hoodie_partition_path String hoodiePartitionPath
-     * = row.getString(3); List<Object> partitionVals =
-     * extractor.extractPartitionValuesInPath(hoodiePartitionPath).stream() .map(o -> (Object)
-     * o).collect(Collectors.toList()); ValidationUtils.checkArgument(partitionVals.size() == partitionFields.size(),
-     * "#partition-fields != #partition-values-extracted"); List<Object> rowObjs = new
-     * ArrayList<>(scala.collection.JavaConversions.seqAsJavaList(row.toSeq())); rowObjs.addAll(partitionVals); return
-     * RowFactory.create(rowObjs.toArray()); } return row; }, RowEncoder.apply(newSchema));
-     *
-     * log.info("Validated Source Schema :" + validated.schema());
-     */
-    boolean dropAllMetaFields = props.getBoolean(Config.HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE,
-        Config.DEFAULT_HOODIE_DROP_ALL_META_FIELDS_FROM_SOURCE);
-
-    // Remove Hoodie meta columns except partition path from input source
-    String[] colsToDrop = dropAllMetaFields ? HoodieRecord.HOODIE_META_COLUMNS.stream().toArray(String[]::new) :
-        HoodieRecord.HOODIE_META_COLUMNS.stream().filter(x -> !x.equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toArray(String[]::new);
-    final Dataset<Row> src = source.drop(colsToDrop);
-    // log.info("Final Schema from Source is :" + src.schema());
+    // Remove Hoodie meta columns
+    final Dataset<Row> src = source.drop(HoodieRecord.HOODIE_META_COLUMNS.stream().toArray(String[]::new));

Review Comment:
   `_hoodie_partition_path` isn't used in either the source or DS; according to the commented-out code it was used previously, but isn't anymore.

   #7132 recently added a config that forces all of the meta-fields to be cleaned up, but it's false by default



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1407297918

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0ece5561859923b8773d6ff9fa633f014c104300",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14688",
       "triggerID" : "0ece5561859923b8773d6ff9fa633f014c104300",
       "triggerType" : "PUSH"
     }, {
       "hash" : "80d38554649038cb9e668be4edc3a3c0a2c4373f",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14693",
       "triggerID" : "80d38554649038cb9e668be4edc3a3c0a2c4373f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0ece5561859923b8773d6ff9fa633f014c104300 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14688) 
   * 80d38554649038cb9e668be4edc3a3c0a2c4373f Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14693) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1089880703


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -236,29 +236,33 @@ private Option<HoodieRecord> prepareRecord(HoodieRecord<T> hoodieRecord) {
       // If the format can not record the operation field, nullify the DELETE payload manually.
       boolean nullifyPayload = HoodieOperation.isDelete(hoodieRecord.getOperation()) && !config.allowOperationMetadataField();
       recordProperties.put(HoodiePayloadProps.PAYLOAD_IS_UPDATE_RECORD_FOR_MOR, String.valueOf(isUpdateRecord));
-      Option<HoodieRecord> finalRecord = nullifyPayload ? Option.empty() : Option.of(hoodieRecord);
+
+      Option<HoodieRecord> finalRecordOpt = nullifyPayload ? Option.empty() : Option.of(hoodieRecord);
       // Check for delete
-      if (finalRecord.isPresent() && !finalRecord.get().isDelete(schema, recordProperties)) {
-        // Check for ignore ExpressionPayload
-        if (finalRecord.get().shouldIgnore(schema, recordProperties)) {
-          return finalRecord;
+      if (finalRecordOpt.isPresent() && !finalRecordOpt.get().isDelete(schema, recordProperties)) {
+        HoodieRecord finalRecord = finalRecordOpt.get();
+        // Check if the record should be ignored (special case for [[ExpressionPayload]])
+        if (finalRecord.shouldIgnore(schema, recordProperties)) {
+          return finalRecordOpt;
         }
-        // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
-        HoodieRecord rewrittenRecord = schemaOnReadEnabled ? finalRecord.get().rewriteRecordWithNewSchema(schema, recordProperties, writeSchemaWithMetaFields)
-            : finalRecord.get().rewriteRecord(schema, recordProperties, writeSchemaWithMetaFields);
+
+        // Prepend meta-fields into the record

Review Comment:
   This is the primary change in this file: instead of the sequence:
     - `rewriteRecord`/`rewriteRecordWithNewSchema` (rewriting record into schema bearing meta-fields)
     - `updateMetadataValues`
   
   we directly call the `prependMetaFields` API (expanding the record's schema w/ meta-fields and setting their values at the same time)
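
   A hedged sketch of what this boils down to (Scala for brevity; the field names mirror the handle's, and the `prependMetaFields` parameter order and package locations are assumed for illustration):

   import java.util.Properties
   import org.apache.avro.Schema
   import org.apache.hudi.common.model.{HoodieRecord, MetadataValues}

   def populateMetaFields(record: HoodieRecord[_],
                          schema: Schema,
                          writeSchemaWithMetaFields: Schema,
                          fileName: String,
                          props: Properties): HoodieRecord[_] = {
     val metadataValues = new MetadataValues().setFileName(fileName)
     // Previously: rewriteRecord(WithNewSchema) followed by updateMetadataValues -- two passes over the record
     // Now: a single call that expands the record's schema w/ meta-fields and sets their values at once
     record.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, props)
   }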



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] wzx140 commented on a diff in pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "wzx140 (via GitHub)" <gi...@apache.org>.
wzx140 commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1089897145


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -190,9 +194,10 @@ public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema
     StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
 
-    // TODO HUDI-5281 Rewrite HoodieSparkRecord with UnsafeRowWriter
-    InternalRow rewriteRecord = HoodieInternalRowUtils.rewriteRecord(this.data, structType, targetStructType);
-    UnsafeRow unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType, targetStructType).apply(rewriteRecord);
+    Function1<InternalRow, UnsafeRow> unsafeRowWriter =
+        HoodieInternalRowUtils.getCachedUnsafeRowWriter(structType, targetStructType, Collections.emptyMap());

Review Comment:
   We should support Avro type promotion in this function in `HoodieSparkRecord`. We have discussed it before in https://github.com/apache/hudi/pull/7003.
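
   For reference, the numeric promotions are handled by the `newWriterRenaming` cases in the hunk above; a hypothetical usage sketch of the cached writer (the schemas here are made up for illustration):

   import org.apache.spark.sql.HoodieInternalRowUtils
   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.sql.types._

   // Source schema uses narrower types than the target schema
   val from = StructType(Seq(StructField("id", IntegerType), StructField("score", FloatType)))
   val to   = StructType(Seq(StructField("id", LongType), StructField("score", DoubleType)))

   // The cached writer promotes int -> long and float -> double while producing an UnsafeRow
   val writer = HoodieInternalRowUtils.getCachedUnsafeRowWriter(from, to)
   val promoted = writer(InternalRow(42, 3.14f))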



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1407631370

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0ece5561859923b8773d6ff9fa633f014c104300",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14688",
       "triggerID" : "0ece5561859923b8773d6ff9fa633f014c104300",
       "triggerType" : "PUSH"
     }, {
       "hash" : "80d38554649038cb9e668be4edc3a3c0a2c4373f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14693",
       "triggerID" : "80d38554649038cb9e668be4edc3a3c0a2c4373f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "36a0d9aeab63f713dff106ed9a76411aceb900b2",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14704",
       "triggerID" : "36a0d9aeab63f713dff106ed9a76411aceb900b2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "84962c128b8dd4f52cc7179d60a4af0efee3de3c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14741",
       "triggerID" : "84962c128b8dd4f52cc7179d60a4af0efee3de3c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 84962c128b8dd4f52cc7179d60a4af0efee3de3c Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14741) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7769:
URL: https://github.com/apache/hudi/pull/7769#issuecomment-1407777763

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0ece5561859923b8773d6ff9fa633f014c104300",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14688",
       "triggerID" : "0ece5561859923b8773d6ff9fa633f014c104300",
       "triggerType" : "PUSH"
     }, {
       "hash" : "80d38554649038cb9e668be4edc3a3c0a2c4373f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14693",
       "triggerID" : "80d38554649038cb9e668be4edc3a3c0a2c4373f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "36a0d9aeab63f713dff106ed9a76411aceb900b2",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14704",
       "triggerID" : "36a0d9aeab63f713dff106ed9a76411aceb900b2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "84962c128b8dd4f52cc7179d60a4af0efee3de3c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14741",
       "triggerID" : "84962c128b8dd4f52cc7179d60a4af0efee3de3c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6940a386344827d47ff9c9da1d81306e10021025",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14747",
       "triggerID" : "6940a386344827d47ff9c9da1d81306e10021025",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1aa1f0cc4f8c643e6b5423b9b5de54ef334ba8c1",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "1aa1f0cc4f8c643e6b5423b9b5de54ef334ba8c1",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 84962c128b8dd4f52cc7179d60a4af0efee3de3c Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14741) 
   * 6940a386344827d47ff9c9da1d81306e10021025 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=14747) 
   * 1aa1f0cc4f8c643e6b5423b9b5de54ef334ba8c1 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1089793370


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -505,31 +495,6 @@ object HoodieSparkSqlWriter {
     HoodieAvroUtils.removeFields(schema, partitionColumns.toSet.asJava)
   }
 
-  def generateSparkSchemaWithoutPartitionColumns(partitionParam: String, schema: StructType): StructType = {

Review Comment:
   Dead code



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -82,12 +84,11 @@ object HoodieSparkSqlWriter {
             optParams: Map[String, String],
             df: DataFrame,
             hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
-            hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
-            asyncCompactionTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty,
-            asyncClusteringTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty,
-            extraPreCommitFn: Option[BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata]] = Option.empty)
-  : (Boolean, common.util.Option[String], common.util.Option[String], common.util.Option[String],
-    SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {
+            hoodieWriteClient: Option[SparkRDDWriteClient[_]] = Option.empty,

Review Comment:
   This was just a search-and-replace removing invalid type references



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala:
##########
@@ -308,14 +175,351 @@ object HoodieInternalRowUtils {
         }
       case _ =>
     }
+
     if (value == None) {
       throw new HoodieException(String.format("cannot support rewrite value for schema type: %s since the old schema type is: %s", newSchema, oldSchema))
     } else {
       CatalystTypeConverters.convertToCatalyst(value)
     }
   }
+   */
+  
+  private type RowFieldUpdater = (CatalystDataUpdater, Int, Any) => Unit
+
+  private def genUnsafeRowWriterRenaming(prevStructType: StructType, newStructType: StructType, renamedColumnsMap: JMap[String, String], fieldNames: JDeque[String]): (CatalystDataUpdater, Any) => Unit = {
+    // TODO need to canonicalize schemas (casing)
+    val fieldWriters = ArrayBuffer.empty[RowFieldUpdater]
+    val positionMap = ArrayBuffer.empty[Int]
+
+    for (newField <- newStructType.fields) {
+      fieldNames.push(newField.name)
+
+      val (fieldWriter, prevFieldPos): (RowFieldUpdater, Int) =
+        prevStructType.getFieldIndex(newField.name) match {
+          case Some(prevFieldPos) =>
+            val prevField = prevStructType(prevFieldPos)
+            (newWriterRenaming(prevField.dataType, newField.dataType, renamedColumnsMap, fieldNames), prevFieldPos)
+
+          case None =>
+            val newFieldQualifiedName = createFullName(fieldNames)
+            val prevFieldName: String = lookupRenamedField(newFieldQualifiedName, renamedColumnsMap)
+
+            // Handle rename
+            prevStructType.getFieldIndex(prevFieldName) match {
+              case Some(prevFieldPos) =>
+                val prevField = prevStructType.fields(prevFieldPos)
+                (newWriterRenaming(prevField.dataType, newField.dataType, renamedColumnsMap, fieldNames), prevFieldPos)
+
+              case None =>
+                // TODO handle defaults
+                val updater: RowFieldUpdater = (fieldUpdater, ordinal, _) => fieldUpdater.setNullAt(ordinal)
+                (updater, -1)
+            }
+        }
+
+      fieldWriters += fieldWriter
+      positionMap += prevFieldPos
+
+      fieldNames.pop()
+    }
+
+    (fieldUpdater, row) => {
+      var pos = 0
+      while (pos < fieldWriters.length) {
+        val prevPos = positionMap(pos)
+        val prevValue = if (prevPos >= 0) {
+          row.asInstanceOf[InternalRow].get(prevPos, prevStructType.fields(prevPos).dataType)
+        } else {
+          null
+        }
+
+        fieldWriters(pos)(fieldUpdater, pos, prevValue)
+        pos += 1
+      }
+    }
+  }
+
+  private def newWriterRenaming(prevDataType: DataType,
+                                newDataType: DataType,
+                                renamedColumnsMap: JMap[String, String],
+                                fieldNames: JDeque[String]): RowFieldUpdater = {
+    (prevDataType, newDataType) match {
+      case (prevType, newType) if prevType == newType =>
+        (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, value)
+
+      case (prevStructType: StructType, newStructType: StructType) =>
+        val writer = genUnsafeRowWriterRenaming(prevStructType, newStructType, renamedColumnsMap, fieldNames)
+        val newRow = new SpecificInternalRow(newStructType.fields.map(_.dataType))
+        val rowUpdater = new RowUpdater(newRow)
+
+        (fieldUpdater, ordinal, value) => {
+          // TODO elaborate

Review Comment:
   Note to self to explain particular piece



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7769: [HUDI-5633] Fixing performance regression in `HoodieSparkRecord`

Posted by "alexeykudinkin (via GitHub)" <gi...@apache.org>.
alexeykudinkin commented on code in PR #7769:
URL: https://github.com/apache/hudi/pull/7769#discussion_r1089880847


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java:
##########
@@ -136,24 +136,22 @@ protected void doWrite(HoodieRecord record, Schema schema, TypedProperties props
         if (record.shouldIgnore(schema, config.getProps())) {
           return;
         }
-        // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
-        HoodieRecord rewriteRecord;
-        if (schemaOnReadEnabled) {
-          rewriteRecord = record.rewriteRecordWithNewSchema(schema, config.getProps(), writeSchemaWithMetaFields);
-        } else {
-          rewriteRecord = record.rewriteRecord(schema, config.getProps(), writeSchemaWithMetaFields);
-        }
+
         MetadataValues metadataValues = new MetadataValues().setFileName(path.getName());
-        rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, config.getProps(), metadataValues);
+        HoodieRecord populatedRecord =

Review Comment:
   Change similar to https://github.com/apache/hudi/pull/7769#discussion_r1089880703



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -374,20 +374,16 @@ public void write(HoodieRecord<T> oldRecord) {
   }
 
   protected void writeToFile(HoodieKey key, HoodieRecord<T> record, Schema schema, Properties prop, boolean shouldPreserveRecordMetadata) throws IOException {
-    HoodieRecord rewriteRecord;
-    if (schemaOnReadEnabled) {
-      rewriteRecord = record.rewriteRecordWithNewSchema(schema, prop, writeSchemaWithMetaFields);
-    } else {
-      rewriteRecord = record.rewriteRecord(schema, prop, writeSchemaWithMetaFields);
-    }
     // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
     //       file holding this record even in cases when overall metadata is preserved
     MetadataValues metadataValues = new MetadataValues().setFileName(newFilePath.getName());
-    rewriteRecord = rewriteRecord.updateMetadataValues(writeSchemaWithMetaFields, prop, metadataValues);
+    HoodieRecord populatedRecord =

Review Comment:
   Change similar to https://github.com/apache/hudi/pull/7769#discussion_r1089880703



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -130,29 +128,27 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
             (left, right) ->
                 left.joinWith(right, mergeHandle.getWriterSchemaWithMetaFields()));
         recordSchema = mergeHandle.getWriterSchemaWithMetaFields();
-      } else if (schemaEvolutionTransformerOpt.isPresent()) {
-        recordIterator = new MappingIterator<>(baseFileRecordIterator,
-            schemaEvolutionTransformerOpt.get().getLeft().apply(isPureProjection ? writerSchema : readerSchema));
-        recordSchema = schemaEvolutionTransformerOpt.get().getRight();
       } else {
         recordIterator = baseFileRecordIterator;
         recordSchema = isPureProjection ? writerSchema : readerSchema;
       }
 
+      boolean isBufferingRecords = ExecutorFactory.isBufferingRecords(writeConfig);
+
       wrapper = ExecutorFactory.create(writeConfig, recordIterator, new UpdateHandler(mergeHandle), record -> {
+        HoodieRecord newRecord;
+        if (schemaEvolutionTransformerOpt.isPresent()) {

Review Comment:
   The schema evolution transformer is now applied inside the record-transforming function passed to the executor, as opposed to via a MappingIterator as previously



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -416,7 +423,8 @@ private static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType s
         getValue(structType, recordKeyPartitionPathFieldPair.getRight(), record.data).toString());
 
     HoodieOperation operation = withOperationField
-        ? HoodieOperation.fromName(getNullableValAsString(structType, record.data, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
+        ? HoodieOperation.fromName(record.data.getString(structType.fieldIndex(HoodieRecord.OPERATION_METADATA_FIELD)))

Review Comment:
   Good point! Let me revisit



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -190,9 +194,10 @@ public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema
     StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
 
-    // TODO HUDI-5281 Rewrite HoodieSparkRecord with UnsafeRowWriter
-    InternalRow rewriteRecord = HoodieInternalRowUtils.rewriteRecord(this.data, structType, targetStructType);
-    UnsafeRow unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType, targetStructType).apply(rewriteRecord);
+    Function1<InternalRow, UnsafeRow> unsafeRowWriter =
+        HoodieInternalRowUtils.getCachedUnsafeRowWriter(structType, targetStructType, Collections.emptyMap());

Review Comment:
   The key point here is that we don't actually need the `rewriteRecord` operation as such: historically it has been used to expand the (Avro) schema of the record to accommodate the meta-fields, which is now handled differently in `HoodieSparkRecord`



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -206,9 +211,10 @@ public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties p
     StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
     StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
 
-    // TODO HUDI-5281 Rewrite HoodieSparkRecord with UnsafeRowWriter
-    InternalRow rewriteRecord = HoodieInternalRowUtils.rewriteRecordWithNewSchema(this.data, structType, newStructType, renameCols);
-    UnsafeRow unsafeRow = HoodieInternalRowUtils.getCachedUnsafeProjection(newStructType, newStructType).apply(rewriteRecord);
+    Function1<InternalRow, UnsafeRow> unsafeRowWriter =
+        HoodieInternalRowUtils.getCachedUnsafeRowWriter(structType, newStructType, Collections.emptyMap());

Review Comment:
   Good catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org