Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2020/04/23 21:11:06 UTC

[GitHub] [incubator-gobblin] autumnust opened a new pull request #2966: [GOBBLIN-1126] Make ORC compaction shuffle key configurable

autumnust opened a new pull request #2966:
URL: https://github.com/apache/incubator-gobblin/pull/2966


   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - https://issues.apache.org/jira/browse/GOBBLIN-1126
   
   ### Description
   The contribution of this PR contains: 
   - Refactored `upConvertOrcStruct`, which handles schema evolution. The downside of the original implementation is that it creates a new `OrcStruct` for each map call, which is usually considered an anti-pattern in MR execution. The fix makes it possible to reuse the same `OrcStruct` object by making the method side-effecting only and returning void. 
   - The refactored `upConvertOrcStruct` can be reused for column projection of `OrcStruct`, so that an arbitrary subset of columns can form the shuffle key (we used to use the whole record as the shuffle key, which is unnecessarily heavy and hurts performance). 
   - To support the tests, I added several tools in `OrcUtils`: 
   -- `orcStructFillerWithFixedValue`: Given a schema, fills an `OrcStruct` with fixed values. This is useful for generating records in a row-oriented manner so we can easily write them into an ORC file. 
   -- `writableComparableTypeWidening`: The original `upConvertOrcStruct` could not handle type widening. Added for correctness. 
   -- `createValueRecursively`: The ORC API for creating an `OrcStruct`, `OrcStruct.createValue`, has a problem: if there is a nested type inside a container type like List/Map, the `createValue` call doesn't create any instance inside the container, so the nested-type information of the container's element type is lost. It might be worthwhile to contribute this back to ORC upstream. 
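
   The object-reuse rationale behind the `upConvertOrcStruct` refactoring can be illustrated with a minimal, self-contained sketch. The `Container` class below is a hypothetical stand-in for `OrcStruct` (not actual Gobblin code); it only shows why a void, side-effecting conversion lets a mapper reuse one instance across `map()` calls instead of allocating per record.

   ```java
   import java.util.Arrays;

   // Simplified model of the "reuse one mutable container" pattern for MR mappers.
   // Container is a hypothetical stand-in for OrcStruct.
   public class ReusableContainerDemo {

     public static class Container {
       public final Object[] fields;
       public Container(int n) { this.fields = new Object[n]; }
     }

     // Anti-pattern: allocate a fresh container on every map() call.
     public static Container convertAllocating(Object[] record) {
       Container c = new Container(record.length);
       System.arraycopy(record, 0, c.fields, 0, record.length);
       return c;
     }

     // Refactored pattern: mutate a caller-owned container in place and return void,
     // so one instance is reused across all map() calls.
     public static void convertInPlace(Object[] record, Container reused) {
       System.arraycopy(record, 0, reused.fields, 0, record.length);
     }

     public static void main(String[] args) {
       Container reused = new Container(2);
       convertInPlace(new Object[]{1, "a"}, reused);
       System.out.println(Arrays.toString(reused.fields)); // [1, a]
       convertInPlace(new Object[]{2, "b"}, reused);
       System.out.println(Arrays.toString(reused.fields)); // [2, b]
     }
   }
   ```

   In a long-running MR task, the in-place variant avoids per-record garbage and GC pressure, which is the motivation stated above.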
   
   The tests might be hard to read, but given the tools mentioned above, each test mostly follows this sequence: 
   
   1. Create an ORC schema: `schema = TypeDescription.createFromString(schemaString);` 
   2. Create an `OrcStruct`: `orcStruct = OrcUtils.createValueRecursively(schema)`. 
   3. Fill in values: `OrcUtils.orcStructFillerWithFixedValue(orcStruct, schema, valueForDifferentTypes ...)` 
   4. Do this for two different schemas: a writer schema and a reader schema with a certain evolution. 
   5. Call `upConvertOrcStruct(recordInWriterSchema, recordInReaderSchema, readerSchema);` 
   6. Check that `recordInReaderSchema` contains the expected result. 
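
   As a simplified model of steps 4-6 (plain maps standing in for the real ORC types, not the actual Gobblin implementation), up-conversion copies fields shared by the writer and reader schemas and leaves fields added by evolution null:

   ```java
   import java.util.Arrays;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   // Simplified model of up-converting a record from a writer schema to a reader schema:
   // fields present in both schemas are copied; fields added by evolution are left null.
   public class UpConvertDemo {

     public static Map<String, Object> upConvert(Map<String, Object> writerRecord,
                                                 List<String> readerSchema) {
       Map<String, Object> readerRecord = new LinkedHashMap<>();
       for (String field : readerSchema) {
         readerRecord.put(field, writerRecord.getOrDefault(field, null));
       }
       return readerRecord;
     }

     public static void main(String[] args) {
       Map<String, Object> writer = new LinkedHashMap<>();
       writer.put("id", 1);
       writer.put("name", "foo");
       // Reader schema evolved: "name" was dropped and "ts" was added.
       System.out.println(upConvert(writer, Arrays.asList("id", "ts"))); // {id=1, ts=null}
     }
   }
   ```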
   
   
   
   
   ### Tests
   - [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   
   
   ### Commits
   - [ ] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] codecov-io commented on pull request #2966: [GOBBLIN-1126] Make ORC compaction shuffle key configurable

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #2966:
URL: https://github.com/apache/incubator-gobblin/pull/2966#issuecomment-619497656


   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2966?src=pr&el=h1) Report
   > Merging [#2966](https://codecov.io/gh/apache/incubator-gobblin/pull/2966?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/a52e1a8fe0958a813e4c1967879cc17a12b444ce&el=desc) will **increase** coverage by `0.02%`.
   > The diff coverage is `59.37%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/2966/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc)](https://codecov.io/gh/apache/incubator-gobblin/pull/2966?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2966      +/-   ##
   ============================================
   + Coverage     45.64%   45.66%   +0.02%     
   - Complexity     9196     9226      +30     
   ============================================
     Files          1940     1940              
     Lines         73564    73739     +175     
     Branches       8128     8155      +27     
   ============================================
   + Hits          33578    33676      +98     
   - Misses        36860    36930      +70     
   - Partials       3126     3133       +7     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2966?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...n/compaction/mapreduce/orc/OrcKeyDedupReducer.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2966/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vbWFwcmVkdWNlL29yYy9PcmNLZXlEZWR1cFJlZHVjZXIuamF2YQ==) | `48.14% <22.22%> (-51.86%)` | `6.00 <1.00> (+1.00)` | :arrow_down: |
   | [...che/gobblin/compaction/mapreduce/orc/OrcUtils.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2966/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vbWFwcmVkdWNlL29yYy9PcmNVdGlscy5qYXZh) | `63.20% <60.00%> (-9.72%)` | `47.00 <35.00> (+35.00)` | :arrow_down: |
   | [...action/mapreduce/CompactionOrcJobConfigurator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2966/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vbWFwcmVkdWNlL0NvbXBhY3Rpb25PcmNKb2JDb25maWd1cmF0b3IuamF2YQ==) | `88.46% <66.66%> (-3.21%)` | `5.00 <0.00> (ø)` | |
   | [...bblin/compaction/mapreduce/orc/OrcValueMapper.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2966/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vbWFwcmVkdWNlL29yYy9PcmNWYWx1ZU1hcHBlci5qYXZh) | `93.75% <88.88%> (+12.32%)` | `8.00 <5.00> (-8.00)` | :arrow_up: |
   | [...he/gobblin/writer/FineGrainedWatermarkTracker.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2966/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlLWJhc2Uvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vd3JpdGVyL0ZpbmVHcmFpbmVkV2F0ZXJtYXJrVHJhY2tlci5qYXZh) | `82.25% <0.00%> (-2.42%)` | `28.00% <0.00%> (-1.00%)` | |
   | [...e/gobblin/runtime/app/ServiceBasedAppLauncher.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2966/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvYXBwL1NlcnZpY2VCYXNlZEFwcExhdW5jaGVyLmphdmE=) | `47.57% <0.00%> (-1.95%)` | `12.00% <0.00%> (ø%)` | |
   | [...gobblin/service/monitoring/JobStatusRetriever.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2966/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9uaXRvcmluZy9Kb2JTdGF0dXNSZXRyaWV2ZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | `2.00% <0.00%> (ø%)` | |
   | [...blin/service/monitoring/KafkaJobStatusMonitor.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2966/diff?src=pr&el=tree#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9uaXRvcmluZy9LYWZrYUpvYlN0YXR1c01vbml0b3IuamF2YQ==) | `47.94% <0.00%> (+0.05%)` | `10.00% <0.00%> (ø%)` | |
   | [.../org/apache/gobblin/cluster/GobblinTaskRunner.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2966/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpblRhc2tSdW5uZXIuamF2YQ==) | `67.91% <0.00%> (+0.37%)` | `33.00% <0.00%> (ø%)` | |
   | [.../apache/gobblin/runtime/api/JobExecutionState.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2966/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvYXBpL0pvYkV4ZWN1dGlvblN0YXRlLmphdmE=) | `80.37% <0.00%> (+0.93%)` | `24.00% <0.00%> (ø%)` | |
   | ... and [4 more](https://codecov.io/gh/apache/incubator-gobblin/pull/2966/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2966?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2966?src=pr&el=footer). Last update [a52e1a8...a8e9c37](https://codecov.io/gh/apache/incubator-gobblin/pull/2966?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [incubator-gobblin] codecov-io edited a comment on pull request #2966: [GOBBLIN-1126] Make ORC compaction shuffle key configurable

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2966:
URL: https://github.com/apache/incubator-gobblin/pull/2966#issuecomment-619497656


   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2966?src=pr&el=h1) Report
   > Merging [#2966](https://codecov.io/gh/apache/incubator-gobblin/pull/2966?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/cc50c491cc40789300ad3019999cfa23f0027724&el=desc) will **decrease** coverage by `0.00%`.
   > The diff coverage is `57.93%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/2966/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc)](https://codecov.io/gh/apache/incubator-gobblin/pull/2966?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2966      +/-   ##
   ============================================
   - Coverage     45.63%   45.62%   -0.01%     
   - Complexity     9196     9240      +44     
   ============================================
     Files          1940     1943       +3     
     Lines         73574    73938     +364     
     Branches       8128     8179      +51     
   ============================================
   + Hits          33574    33734     +160     
   - Misses        36873    37061     +188     
   - Partials       3127     3143      +16     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2966?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...che/gobblin/compaction/mapreduce/orc/OrcUtils.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2966/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vbWFwcmVkdWNlL29yYy9PcmNVdGlscy5qYXZh) | `56.22% <52.05%> (-16.70%)` | `49.00 <37.00> (+37.00)` | :arrow_down: |
   | [...bblin/compaction/mapreduce/orc/OrcValueMapper.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2966/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vbWFwcmVkdWNlL29yYy9PcmNWYWx1ZU1hcHBlci5qYXZh) | `75.00% <64.00%> (-6.43%)` | `8.00 <5.00> (-8.00)` | |
   | [...action/mapreduce/CompactionOrcJobConfigurator.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2966/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vbWFwcmVkdWNlL0NvbXBhY3Rpb25PcmNKb2JDb25maWd1cmF0b3IuamF2YQ==) | `92.30% <100.00%> (+0.64%)` | `6.00 <0.00> (+1.00)` | |
   | [...ompaction/mapreduce/RecordKeyDedupReducerBase.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2966/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vbWFwcmVkdWNlL1JlY29yZEtleURlZHVwUmVkdWNlckJhc2UuamF2YQ==) | `90.32% <100.00%> (+0.66%)` | `11.00 <2.00> (+2.00)` | |
   | [...n/compaction/mapreduce/orc/OrcKeyDedupReducer.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2966/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vbWFwcmVkdWNlL29yYy9PcmNLZXlEZWR1cFJlZHVjZXIuamF2YQ==) | `100.00% <100.00%> (ø)` | `10.00 <5.00> (+5.00)` | |
   | [...n/java/org/apache/gobblin/util/logs/LogCopier.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2966/diff?src=pr&el=tree#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvbG9ncy9Mb2dDb3BpZXIuamF2YQ==) | `61.53% <0.00%> (-8.03%)` | `18.00% <0.00%> (ø%)` | |
   | [...a/org/apache/gobblin/service/RequesterService.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2966/diff?src=pr&el=tree#diff-Z29iYmxpbi1yZXN0bGkvZ29iYmxpbi1mbG93LWNvbmZpZy1zZXJ2aWNlL2dvYmJsaW4tZmxvdy1jb25maWctc2VydmljZS1zZXJ2ZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vc2VydmljZS9SZXF1ZXN0ZXJTZXJ2aWNlLmphdmE=) | `92.30% <0.00%> (-7.70%)` | `4.00% <0.00%> (ø%)` | |
   | [...che/gobblin/yarn/YarnContainerSecurityManager.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2966/diff?src=pr&el=tree#diff-Z29iYmxpbi15YXJuL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3lhcm4vWWFybkNvbnRhaW5lclNlY3VyaXR5TWFuYWdlci5qYXZh) | `58.62% <0.00%> (-5.02%)` | `6.00% <0.00%> (+1.00%)` | :arrow_down: |
   | [.../org/apache/gobblin/yarn/GobblinYarnLogSource.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2966/diff?src=pr&el=tree#diff-Z29iYmxpbi15YXJuL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3lhcm4vR29iYmxpbllhcm5Mb2dTb3VyY2UuamF2YQ==) | `19.35% <0.00%> (-3.73%)` | `3.00% <0.00%> (ø%)` | |
   | [.../apache/gobblin/yarn/GobblinApplicationMaster.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2966/diff?src=pr&el=tree#diff-Z29iYmxpbi15YXJuL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3lhcm4vR29iYmxpbkFwcGxpY2F0aW9uTWFzdGVyLmphdmE=) | `17.56% <0.00%> (-0.25%)` | `3.00% <0.00%> (ø%)` | |
   | ... and [19 more](https://codecov.io/gh/apache/incubator-gobblin/pull/2966/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2966?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2966?src=pr&el=footer). Last update [cc50c49...a4469bc](https://codecov.io/gh/apache/incubator-gobblin/pull/2966?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #2966: [GOBBLIN-1126] Make ORC compaction shuffle key configurable

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #2966:
URL: https://github.com/apache/incubator-gobblin/pull/2966#discussion_r419745811



##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/RecordKeyDedupReducerBase.java
##########
@@ -88,21 +87,33 @@ protected void reduce(KI key, Iterable<VI> values, Context context)
       numVals++;
     }
 
+    writeRetainValue(valueToRetain, context);

Review comment:
       Doesn't this method only write a single value? 







[GitHub] [incubator-gobblin] autumnust commented on pull request #2966: [GOBBLIN-1126] Make ORC compaction shuffle key configurable

Posted by GitBox <gi...@apache.org>.
autumnust commented on pull request #2966:
URL: https://github.com/apache/incubator-gobblin/pull/2966#issuecomment-618673425


   @sv2000 Please take a look, thanks! 





[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2966: [GOBBLIN-1126] Make ORC compaction shuffle key configurable

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2966:
URL: https://github.com/apache/incubator-gobblin/pull/2966#discussion_r420897902



##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyDedupReducer.java
##########
@@ -37,6 +53,30 @@ protected void setOutKey(OrcValue valueToRetain) {
     // do nothing since initReusableObject has assigned value for outKey.
   }
 
+  @Override
+  protected void reduce(OrcKey key, Iterable<OrcValue> values, Context context)
+      throws IOException, InterruptedException {
+
+    /* Map from hash of value(Typed in OrcStruct) object to its times of duplication*/
+    Map<Integer, Integer> valuesToRetain = new HashMap<>();
+    int valueHash = 0;
+
+    for (OrcValue value : values) {

Review comment:
       Thanks for the explanation. Makes sense. 

##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/RecordKeyDedupReducerBase.java
##########
@@ -88,21 +87,33 @@ protected void reduce(KI key, Iterable<VI> values, Context context)
       numVals++;
     }
 
+    writeRetainValue(valueToRetain, context);

Review comment:
       Ah ok. Maybe name it writeRetainedValue then?







[GitHub] [incubator-gobblin] sv2000 commented on pull request #2966: [GOBBLIN-1126] Make ORC compaction shuffle key configurable

Posted by GitBox <gi...@apache.org>.
sv2000 commented on pull request #2966:
URL: https://github.com/apache/incubator-gobblin/pull/2966#issuecomment-624732169


   @autumnust You may want to check on the Travis failure. 





[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #2966: [GOBBLIN-1126] Make ORC compaction shuffle key configurable

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #2966:
URL: https://github.com/apache/incubator-gobblin/pull/2966#discussion_r419748724



##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyDedupReducer.java
##########
@@ -37,6 +53,30 @@ protected void setOutKey(OrcValue valueToRetain) {
     // do nothing since initReusableObject has assigned value for outKey.
   }
 
+  @Override
+  protected void reduce(OrcKey key, Iterable<OrcValue> values, Context context)
+      throws IOException, InterruptedException {
+
+    /* Map from hash of value(Typed in OrcStruct) object to its times of duplication*/
+    Map<Integer, Integer> valuesToRetain = new HashMap<>();
+    int valueHash = 0;
+
+    for (OrcValue value : values) {
+      valueHash = ((OrcStruct) value.value).hashCode();

Review comment:
       Mentioned above: As long as the shuffle key is chosen properly, the cost should not be high.
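
   The hash-based dedup the reducer performs when the shuffle key is only a column projection can be sketched as follows; strings stand in for `OrcStruct` values, so this is a simplification rather than the actual Gobblin reducer.

   ```java
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Sketch of reducer-side dedup with a projected shuffle key: records sharing a key
   // may still differ, so the reducer tallies distinct value hashes and can emit one
   // record per distinct hash.
   public class HashDedupDemo {

     public static Map<Integer, Integer> countByHash(List<String> valuesForOneKey) {
       Map<Integer, Integer> counts = new HashMap<>();
       for (String v : valuesForOneKey) {
         counts.merge(v.hashCode(), 1, Integer::sum); // increment duplication count
       }
       return counts;
     }

     public static void main(String[] args) {
       Map<Integer, Integer> counts = countByHash(Arrays.asList("a", "a", "b"));
       System.out.println(counts.size());              // 2 distinct values
       System.out.println(counts.get("a".hashCode())); // "a" appeared twice: 2
     }
   }
   ```

   The comment above holds here too: the cost of hashing each value stays low as long as the shuffle key groups few truly distinct records together.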







[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #2966: [GOBBLIN-1126] Make ORC compaction shuffle key configurable

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #2966:
URL: https://github.com/apache/incubator-gobblin/pull/2966#discussion_r419787249



##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
##########
@@ -156,4 +184,386 @@ static boolean isEvolutionValid(TypeDescription fileType, TypeDescription reader
       return ConvertTreeReaderFactory.canConvert(fileType, readerType);
     }
   }
+
+  /**
+   * Fill in values in an OrcStruct with the given schema, assuming {@param w} has the same schema as {@param schema}.
+   * {@param schema} is still necessary even though {@param w} does contain schema information itself, because the
+   * actual value type is only available in {@link TypeDescription} but not {@link org.apache.orc.mapred.OrcValue}.
+   *
+   * For simplicity, here are some assumptions:
+   * - We only take 3 primitive values and use them to construct compound values. To make it work across types that
+   * can be widened or narrowed to each other, please use values within a small range.
+   * - For List, Map or Union, make sure there's at least one entry in the container;
+   * you may want to try {@link #createValueRecursively(TypeDescription)} instead of {@link OrcStruct#createValue(TypeDescription)}.
+   */
+  public static void orcStructFillerWithFixedValue(WritableComparable w, TypeDescription schema, int unionTag,
+      int intValue, String stringValue, boolean booleanValue) {
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        ((BooleanWritable) w).set(booleanValue);
+        break;
+      case BYTE:
+        ((ByteWritable) w).set((byte) intValue);
+        break;
+      case SHORT:
+        ((ShortWritable) w).set((short) intValue);
+        break;
+      case INT:
+        ((IntWritable) w).set(intValue);
+        break;
+      case LONG:
+        ((LongWritable) w).set(intValue);
+        break;
+      case FLOAT:
+        ((FloatWritable) w).set(intValue * 1.0f);
+        break;
+      case DOUBLE:
+        ((DoubleWritable) w).set(intValue * 1.0);
+        break;
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+        ((Text) w).set(stringValue);
+        break;
+      case BINARY:
+        throw new UnsupportedOperationException("Binary type is not supported in random orc data filler");
+      case DECIMAL:
+        throw new UnsupportedOperationException("Decimal type is not supported in random orc data filler");
+      case DATE:
+      case TIMESTAMP:
+      case TIMESTAMP_INSTANT:
+        throw new UnsupportedOperationException(
+            "Timestamp and its derived types is not supported in random orc data filler");
+      case LIST:
+        OrcList castedList = (OrcList) w;
+        // It is not trivial to create a typed object of the element type here, so this method expects the value
+        // container to contain at least one element; otherwise the traversal of the list is skipped.
+        for (Object i : castedList) {
+          orcStructFillerWithFixedValue((WritableComparable) i, schema.getChildren().get(0), unionTag, intValue,
+              stringValue, booleanValue);
+        }
+        break;
+      case MAP:
+        OrcMap castedMap = (OrcMap) w;
+        for (Object entry : castedMap.entrySet()) {
+          Map.Entry<WritableComparable, WritableComparable> castedEntry =
+              (Map.Entry<WritableComparable, WritableComparable>) entry;
+          orcStructFillerWithFixedValue(castedEntry.getKey(), schema.getChildren().get(0), unionTag, intValue,
+              stringValue, booleanValue);
+          orcStructFillerWithFixedValue(castedEntry.getValue(), schema.getChildren().get(1), unionTag, intValue,
+              stringValue, booleanValue);
+        }
+        break;
+      case STRUCT:
+        OrcStruct castedStruct = (OrcStruct) w;
+        int fieldIdx = 0;
+        for (TypeDescription child : schema.getChildren()) {
+          orcStructFillerWithFixedValue(castedStruct.getFieldValue(fieldIdx), child, unionTag, intValue, stringValue,
+              booleanValue);
+          fieldIdx += 1;
+        }
+        break;
+      case UNION:
+        OrcUnion castedUnion = (OrcUnion) w;
+        TypeDescription targetMemberSchema = schema.getChildren().get(unionTag);
+        castedUnion.set(unionTag, createValueRecursively(targetMemberSchema));
+        orcStructFillerWithFixedValue((WritableComparable) castedUnion.getObject(), targetMemberSchema, unionTag,
+            intValue, stringValue, booleanValue);
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown type " + schema.toString());
+    }
+  }
+
+  /**
+   * The simple API: Union tag by default set to 0.
+   */
+  public static void orcStructFillerWithFixedValue(WritableComparable w, TypeDescription schema, int intValue,
+      String stringValue, boolean booleanValue) {
+    orcStructFillerWithFixedValue(w, schema, 0, intValue, stringValue, booleanValue);
+  }
+
+  /**
+   * Suppress the type-checking warning: all casts are valid since they are all (sub)elements of ORC types.
+   * A failed check will throw a ClassCastException and abort the process.
+   */
+  @SuppressWarnings("unchecked")
+  private static WritableComparable structConversionHelper(WritableComparable w, WritableComparable v,
+      TypeDescription targetSchema) {
+
+    if (w instanceof OrcStruct) {
+      upConvertOrcStruct((OrcStruct) w, (OrcStruct) v, targetSchema);
+    } else if (w instanceof OrcList) {
+      OrcList castedList = (OrcList) w;
+      OrcList targetList = (OrcList) v;
+      TypeDescription elementType = targetSchema.getChildren().get(0);
+      WritableComparable targetListRecordContainer =
+          targetList.size() > 0 ? (WritableComparable) targetList.get(0) : createValueRecursively(elementType, 0);
+      targetList.clear();
+
+      for (int i = 0; i < castedList.size(); i++) {
+        targetList.add(i,
+            structConversionHelper((WritableComparable) castedList.get(i), targetListRecordContainer, elementType));
+      }
+    } else if (w instanceof OrcMap) {
+      OrcMap castedMap = (OrcMap) w;
+      OrcMap targetMap = (OrcMap) v;
+      TypeDescription valueSchema = targetSchema.getChildren().get(1);
+
+      // Create recordContainer with the schema of value.
+      Iterator targetMapEntries = targetMap.values().iterator();
+      WritableComparable targetMapRecordContainer =
+          targetMapEntries.hasNext() ? (WritableComparable) targetMapEntries.next()
+              : createValueRecursively(valueSchema);
+
+      targetMap.clear();
+
+      for (Object entry : castedMap.entrySet()) {
+        Map.Entry<WritableComparable, WritableComparable> castedEntry =
+            (Map.Entry<WritableComparable, WritableComparable>) entry;
+
+        targetMapRecordContainer =
+            structConversionHelper(castedEntry.getValue(), targetMapRecordContainer, valueSchema);
+        targetMap.put(castedEntry.getKey(), targetMapRecordContainer);
+      }
+    } else if (w instanceof OrcUnion) {
+      OrcUnion castedUnion = (OrcUnion) w;
+      OrcUnion targetUnion = (OrcUnion) v;
+      byte tag = castedUnion.getTag();
+
+      // ORC doesn't support Union type widening
+      // Avro doesn't allow it either, reference: https://avro.apache.org/docs/current/spec.html#Schema+Resolution
+      // As a result, member schema within source and target should be identical.
+      TypeDescription targetMemberSchema = targetSchema.getChildren().get(tag);
+      targetUnion.set(tag, structConversionHelper((WritableComparable) castedUnion.getObject(),
+          (WritableComparable) OrcUtils.createValueRecursively(targetMemberSchema), targetMemberSchema));
+    } else {
+      // Regardless of whether type-widening happens, this copies the value of w into v.
+      handlePrimitiveWritableComparable(w, v);
+    }
+
+    // If non-primitive or type-widening is required, v should already be populated by w's value recursively.
+    return v;
+  }
+
+  /**
+   * Recursively convert the {@param oldStruct} into {@param newStruct} whose schema is {@param targetSchema}.
+   * This serves a purpose similar to Avro's GenericDatumReader, which accepts a reader schema and a writer schema
+   * to let users convert bytes into the reader's schema in a compatible way.
+   * Calling this method SHALL NOT cause any side effect on {@param oldStruct}; it copies the value of each field
+   * in {@param oldStruct} into {@param newStruct} recursively. Please avoid unnecessary calls, as it can be
+   * pretty expensive if the struct schema is complicated or contains container objects like array/map.
+   *
+   * Note that if {@param newStruct} contains container types like List/Map, the up-conversion does two things:
+   * 1. Clears all elements in the original containers.
+   * 2. Populates the container element values from {@param oldStruct} into {@param newStruct} with the element type
+   * of {@param newStruct} if compatible.
+   *
+   * Limitations:
+   * 1. Does not support up-conversion of key types in Maps, because the primary upstream format is Avro,
+   * which enforces keys to be strings.
+   * 2. Conversion from field A to field B only happens if
+   * org.apache.gobblin.compaction.mapreduce.orc.OrcValueMapper#isEvolutionValid(A,B) returns true.
+   */
+  @VisibleForTesting
+  public static void upConvertOrcStruct(OrcStruct oldStruct, OrcStruct newStruct, TypeDescription targetSchema) {
+
+    // If the target schema is not equal to newStruct's schema, it is an illegal state and doesn't make sense to proceed.
+    Preconditions.checkArgument(newStruct.getSchema().equals(targetSchema));
+
+    int indexInNewSchema = 0;
+    List<String> oldSchemaFieldNames = oldStruct.getSchema().getFieldNames();
+    List<TypeDescription> oldSchemaTypes = oldStruct.getSchema().getChildren();
+    List<TypeDescription> newSchemaTypes = targetSchema.getChildren();
+
+    for (String fieldName : targetSchema.getFieldNames()) {
+      if (oldSchemaFieldNames.contains(fieldName) && oldStruct.getFieldValue(fieldName) != null) {
+        int fieldIndex = oldSchemaFieldNames.indexOf(fieldName);

Review comment:
       Yes, will address. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2966: [GOBBLIN-1126] Make ORC compaction shuffle key configurable

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2966:
URL: https://github.com/apache/incubator-gobblin/pull/2966#discussion_r419130838



##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/RecordKeyDedupReducerBase.java
##########
@@ -88,21 +87,33 @@ protected void reduce(KI key, Iterable<VI> values, Context context)
       numVals++;
     }
 
+    writeRetainValue(valueToRetain, context);

Review comment:
       writeRetainValue -> writeValuesToRetain

##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionOrcJobConfigurator.java
##########
@@ -36,6 +38,10 @@
 
 
 public class CompactionOrcJobConfigurator extends CompactionJobConfigurator {
+
+  public static final String ORC_MAPPER_SHUFFLE_SCHEMA_KEY = "orcMapperShuffleSchema";

Review comment:
       Did you mean orcMapperShuffleKeySchema? i.e. is it the schema of the map output key? If so, might be useful to add a javadoc comment to help orient the reader.
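To make the suggestion concrete, here is a minimal sketch of a schema lookup with a full-record default. This is only an illustration: the real configurator reads Hadoop/State configuration, not `java.util.Properties`, and the key name follows the constant proposed in this hunk.

```java
import java.util.Properties;

public class ShuffleKeyConfigSketch {
    // Hypothetical lookup of the shuffle-key schema config; when unset, fall
    // back to shuffling on the whole record schema (the previous behavior).
    static String shuffleKeySchema(Properties props, String fullRecordSchema) {
        return props.getProperty("orcMapperShuffleSchema", fullRecordSchema);
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        // Unset: the whole record is the shuffle key.
        System.out.println(shuffleKeySchema(p, "struct<id:int,name:string>"));
        // Set: shuffle on a projected subset of columns only.
        p.setProperty("orcMapperShuffleSchema", "struct<id:int>");
        System.out.println(shuffleKeySchema(p, "struct<id:int,name:string>"));
    }
}
```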

##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/RecordKeyDedupReducerBase.java
##########
@@ -88,21 +87,33 @@ protected void reduce(KI key, Iterable<VI> values, Context context)
       numVals++;
     }
 
+    writeRetainValue(valueToRetain, context);
+    updateCounter(numVals, context);

Review comment:
       updateCounter -> updateCounters.

##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyDedupReducer.java
##########
@@ -37,6 +53,30 @@ protected void setOutKey(OrcValue valueToRetain) {
     // do nothing since initReusableObject has assigned value for outKey.
   }
 
+  @Override
+  protected void reduce(OrcKey key, Iterable<OrcValue> values, Context context)
+      throws IOException, InterruptedException {
+
+    /* Map from hash of value(Typed in OrcStruct) object to its times of duplication*/
+    Map<Integer, Integer> valuesToRetain = new HashMap<>();
+    int valueHash = 0;
+
+    for (OrcValue value : values) {

Review comment:
       I am not fully clear on the objective here. Is it that we want to avoid using the entire record as the shuffle key and hence have to de-dupe based on the values? If so, what is the benefit of doing that? It seems like we would end up paying the cost of computing the hash code twice: once on the map output key to shuffle the map output, and once for detecting value duplicates.
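The value-level dedup being questioned can be sketched in plain Java. This is a toy analogue using `String` instead of `OrcStruct` (not the actual Gobblin code); it also illustrates a caveat worth raising: distinct records can collide on `hashCode`, so a strictly correct reducer would need an equality check per hash bucket.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashDedupSketch {
    // Count occurrences keyed by hashCode, as the reducer in this hunk does
    // for OrcStruct values sharing one shuffle key.
    static Map<Integer, Integer> countByHash(Iterable<String> values) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (String v : values) {
            counts.merge(v.hashCode(), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> counts = countByHash(List.of("a", "b", "a", "a"));
        System.out.println(counts.get("a".hashCode())); // "a" seen 3 times
        System.out.println(counts.size());              // 2 distinct hashes
    }
}
```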

##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
##########
@@ -156,4 +184,386 @@ static boolean isEvolutionValid(TypeDescription fileType, TypeDescription reader
       return ConvertTreeReaderFactory.canConvert(fileType, readerType);
     }
   }
+
+  /**
+   * Fill in values in an OrcStruct with the given schema, assuming {@param w} has the same schema as {@param schema}.
+   * {@param schema} is still necessary even though {@param w} carries schema information itself, because the
+   * actual value type is only available in {@link TypeDescription} and not in {@link org.apache.orc.mapred.OrcValue}.
+   *
+   * For simplicity, here are some assumptions:
+   * - We only take 3 primitive values and use them to construct compound values. To make this work for types that
+   * can be widened or shrunk to each other, please use values within a small range.
+   * - For List, Map or Union, make sure there's at least one entry within the record-container;
+   * you may want to try {@link #createValueRecursively(TypeDescription)} instead of {@link OrcStruct#createValue(TypeDescription)}
+   */
+  public static void orcStructFillerWithFixedValue(WritableComparable w, TypeDescription schema, int unionTag,

Review comment:
       orcStructFillerWithFixedValue -> fillOrcStructWithFixedValue. Better to use verbs for method names. 

##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyDedupReducer.java
##########
@@ -37,6 +53,30 @@ protected void setOutKey(OrcValue valueToRetain) {
     // do nothing since initReusableObject has assigned value for outKey.
   }
 
+  @Override
+  protected void reduce(OrcKey key, Iterable<OrcValue> values, Context context)
+      throws IOException, InterruptedException {
+
+    /* Map from hash of value(Typed in OrcStruct) object to its times of duplication*/
+    Map<Integer, Integer> valuesToRetain = new HashMap<>();
+    int valueHash = 0;
+
+    for (OrcValue value : values) {
+      valueHash = ((OrcStruct) value.value).hashCode();

Review comment:
       Have we measured the cost of the hashCode computation?

##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
##########
@@ -156,4 +184,386 @@ static boolean isEvolutionValid(TypeDescription fileType, TypeDescription reader
       return ConvertTreeReaderFactory.canConvert(fileType, readerType);
     }
   }
+
+  /**
+   * Fill in values in an OrcStruct with the given schema, assuming {@param w} has the same schema as {@param schema}.
+   * {@param schema} is still necessary even though {@param w} carries schema information itself, because the
+   * actual value type is only available in {@link TypeDescription} and not in {@link org.apache.orc.mapred.OrcValue}.
+   *
+   * For simplicity, here are some assumptions:
+   * - We only take 3 primitive values and use them to construct compound values. To make this work for types that
+   * can be widened or shrunk to each other, please use values within a small range.
+   * - For List, Map or Union, make sure there's at least one entry within the record-container;
+   * you may want to try {@link #createValueRecursively(TypeDescription)} instead of {@link OrcStruct#createValue(TypeDescription)}
+   */
+  public static void orcStructFillerWithFixedValue(WritableComparable w, TypeDescription schema, int unionTag,
+      int intValue, String stringValue, boolean booleanValue) {
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        ((BooleanWritable) w).set(booleanValue);
+        break;
+      case BYTE:
+        ((ByteWritable) w).set((byte) intValue);
+        break;
+      case SHORT:
+        ((ShortWritable) w).set((short) intValue);
+        break;
+      case INT:
+        ((IntWritable) w).set(intValue);
+        break;
+      case LONG:
+        ((LongWritable) w).set(intValue);
+        break;
+      case FLOAT:
+        ((FloatWritable) w).set(intValue * 1.0f);
+        break;
+      case DOUBLE:
+        ((DoubleWritable) w).set(intValue * 1.0);
+        break;
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+        ((Text) w).set(stringValue);
+        break;
+      case BINARY:
+        throw new UnsupportedOperationException("Binary type is not supported in random orc data filler");
+      case DECIMAL:
+        throw new UnsupportedOperationException("Decimal type is not supported in random orc data filler");
+      case DATE:
+      case TIMESTAMP:
+      case TIMESTAMP_INSTANT:
+        throw new UnsupportedOperationException(
+            "Timestamp and its derived types are not supported in the random ORC data filler");
+      case LIST:
+        OrcList castedList = (OrcList) w;
+        // Here it is not trivial to create a typed object of the element type, so this method expects the value
+        // container to hold at least one element; otherwise the traversal within the list is skipped.
+        for (Object i : castedList) {
+          orcStructFillerWithFixedValue((WritableComparable) i, schema.getChildren().get(0), unionTag, intValue,
+              stringValue, booleanValue);
+        }
+        break;
+      case MAP:
+        OrcMap castedMap = (OrcMap) w;
+        for (Object entry : castedMap.entrySet()) {
+          Map.Entry<WritableComparable, WritableComparable> castedEntry =
+              (Map.Entry<WritableComparable, WritableComparable>) entry;
+          orcStructFillerWithFixedValue(castedEntry.getKey(), schema.getChildren().get(0), unionTag, intValue,
+              stringValue, booleanValue);
+          orcStructFillerWithFixedValue(castedEntry.getValue(), schema.getChildren().get(1), unionTag, intValue,
+              stringValue, booleanValue);
+        }
+        break;
+      case STRUCT:
+        OrcStruct castedStruct = (OrcStruct) w;
+        int fieldIdx = 0;
+        for (TypeDescription child : schema.getChildren()) {
+          orcStructFillerWithFixedValue(castedStruct.getFieldValue(fieldIdx), child, unionTag, intValue, stringValue,
+              booleanValue);
+          fieldIdx += 1;
+        }
+        break;
+      case UNION:
+        OrcUnion castedUnion = (OrcUnion) w;
+        TypeDescription targetMemberSchema = schema.getChildren().get(unionTag);
+        castedUnion.set(unionTag, createValueRecursively(targetMemberSchema));
+        orcStructFillerWithFixedValue((WritableComparable) castedUnion.getObject(), targetMemberSchema, unionTag,
+            intValue, stringValue, booleanValue);
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown type " + schema.toString());
+    }
+  }
+
+  /**
+   * The simple API: the union tag defaults to 0.
+   */
+  public static void orcStructFillerWithFixedValue(WritableComparable w, TypeDescription schema, int intValue,
+      String stringValue, boolean booleanValue) {
+    orcStructFillerWithFixedValue(w, schema, 0, intValue, stringValue, booleanValue);
+  }
+
+  /**
+   * Suppresses unchecked-cast warnings: all casts are clearly valid, as they all operate on (sub)elements of ORC types.
+   * A failed cast will throw a ClassCastException and abort the process.
+   */
+  @SuppressWarnings("unchecked")
+  private static WritableComparable structConversionHelper(WritableComparable w, WritableComparable v,
+      TypeDescription targetSchema) {
+
+    if (w instanceof OrcStruct) {
+      upConvertOrcStruct((OrcStruct) w, (OrcStruct) v, targetSchema);
+    } else if (w instanceof OrcList) {
+      OrcList castedList = (OrcList) w;
+      OrcList targetList = (OrcList) v;
+      TypeDescription elementType = targetSchema.getChildren().get(0);
+      WritableComparable targetListRecordContainer =
+          targetList.size() > 0 ? (WritableComparable) targetList.get(0) : createValueRecursively(elementType, 0);
+      targetList.clear();
+
+      for (int i = 0; i < castedList.size(); i++) {
+        targetList.add(i,
+            structConversionHelper((WritableComparable) castedList.get(i), targetListRecordContainer, elementType));
+      }
+    } else if (w instanceof OrcMap) {
+      OrcMap castedMap = (OrcMap) w;
+      OrcMap targetMap = (OrcMap) v;
+      TypeDescription valueSchema = targetSchema.getChildren().get(1);
+
+      // Create recordContainer with the schema of value.
+      Iterator targetMapEntries = targetMap.values().iterator();
+      WritableComparable targetMapRecordContainer =
+          targetMapEntries.hasNext() ? (WritableComparable) targetMapEntries.next()
+              : createValueRecursively(valueSchema);
+
+      targetMap.clear();
+
+      for (Object entry : castedMap.entrySet()) {
+        Map.Entry<WritableComparable, WritableComparable> castedEntry =
+            (Map.Entry<WritableComparable, WritableComparable>) entry;
+
+        targetMapRecordContainer =
+            structConversionHelper(castedEntry.getValue(), targetMapRecordContainer, valueSchema);
+        targetMap.put(castedEntry.getKey(), targetMapRecordContainer);
+      }
+    } else if (w instanceof OrcUnion) {
+      OrcUnion castedUnion = (OrcUnion) w;
+      OrcUnion targetUnion = (OrcUnion) v;
+      byte tag = castedUnion.getTag();
+
+      // ORC doesn't support Union type widening
+      // Avro doesn't allow it either, reference: https://avro.apache.org/docs/current/spec.html#Schema+Resolution
+      // As a result, member schema within source and target should be identical.
+      TypeDescription targetMemberSchema = targetSchema.getChildren().get(tag);
+      targetUnion.set(tag, structConversionHelper((WritableComparable) castedUnion.getObject(),
+          (WritableComparable) OrcUtils.createValueRecursively(targetMemberSchema), targetMemberSchema));
+    } else {
+      // Regardless of whether type-widening is happening, this method copies the value of w into v.
+      handlePrimitiveWritableComparable(w, v);
+    }
+
+    // If non-primitive or type-widening is required, v should already be populated by w's value recursively.
+    return v;
+  }
+
+  /**
+   * Recursively convert the {@param oldStruct} into {@param newStruct} whose schema is {@param targetSchema}.
+   * This serves a purpose similar to GenericDatumReader for Avro, which accepts a reader schema and a writer schema
+   * and lets users convert bytes into the reader's schema in a compatible way.
+   * Calling this method SHALL NOT cause any side effect on {@param oldStruct}; it copies the value of each field
+   * in {@param oldStruct} into {@param newStruct} recursively. Please avoid unnecessary calls, as this can
+   * be fairly expensive if the struct schema is complicated or contains container types like array/map.
+   *
+   * Note that if newStruct contains container types like List/Map, the up-conversion does two things:
+   * 1. Clears all elements in the original containers.
+   * 2. Populates the values of container elements in {@param oldStruct} into {@param newStruct}, using the element
+   * type of {@param newStruct} if compatible.
+   *
+   * Limitations:
+   * 1. Does not support up-conversion of key types in Maps, since the primary upstream format
+   * is Avro, which enforces keys to be strings.
+   * 2. Conversion from a field A to a field B only happens if
+   * org.apache.gobblin.compaction.mapreduce.orc.OrcValueMapper#isEvolutionValid(A,B) returns true.
+   */
+  @VisibleForTesting
+  public static void upConvertOrcStruct(OrcStruct oldStruct, OrcStruct newStruct, TypeDescription targetSchema) {
+
+    // If the target schema is not equal to newStruct's schema, it is an illegal state and does not make sense to proceed.
+    Preconditions.checkArgument(newStruct.getSchema().equals(targetSchema));
+
+    int indexInNewSchema = 0;
+    List<String> oldSchemaFieldNames = oldStruct.getSchema().getFieldNames();
+    List<TypeDescription> oldSchemaTypes = oldStruct.getSchema().getChildren();
+    List<TypeDescription> newSchemaTypes = targetSchema.getChildren();
+
+    for (String fieldName : targetSchema.getFieldNames()) {
+      if (oldSchemaFieldNames.contains(fieldName) && oldStruct.getFieldValue(fieldName) != null) {
+        int fieldIndex = oldSchemaFieldNames.indexOf(fieldName);

Review comment:
       Can we avoid the indexOf call? It is an O(N) call where N = number of fields. Can't we just store a map of fieldNames to indexes?
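The fix the reviewer asks for can be sketched in plain Java (names here are illustrative, not the actual Gobblin code): build the name-to-index map once per schema, so each per-record, per-field lookup becomes O(1) instead of an O(N) `List.indexOf` scan.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FieldIndexSketch {
    public static void main(String[] args) {
        // Hypothetical field names from an old ORC schema.
        List<String> oldSchemaFieldNames = List.of("id", "name", "timestamp");

        // Build the name -> index map once; reuse it for every record and field.
        Map<String, Integer> fieldIndex = new HashMap<>();
        for (int i = 0; i < oldSchemaFieldNames.size(); i++) {
            fieldIndex.put(oldSchemaFieldNames.get(i), i);
        }

        System.out.println(fieldIndex.get("timestamp"));       // index 2
        System.out.println(fieldIndex.containsKey("missing")); // field absent
    }
}
```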

##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
##########
@@ -156,4 +184,386 @@ static boolean isEvolutionValid(TypeDescription fileType, TypeDescription reader
       return ConvertTreeReaderFactory.canConvert(fileType, readerType);
     }
   }
+
+  /**
+   * Fill in values in an OrcStruct with the given schema, assuming {@param w} has the same schema as {@param schema}.

Review comment:
       Is the method primarily to be used for Orc struct generation in test classes? If so, can we move this inside a TestUtils class? 

##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
##########
@@ -156,4 +184,386 @@ static boolean isEvolutionValid(TypeDescription fileType, TypeDescription reader
       return ConvertTreeReaderFactory.canConvert(fileType, readerType);
     }
   }
+
+  /**
+   * Fill in values in an OrcStruct with the given schema, assuming {@param w} has the same schema as {@param schema}.
+   * {@param schema} is still necessary even though {@param w} carries schema information itself, because the
+   * actual value type is only available in {@link TypeDescription} and not in {@link org.apache.orc.mapred.OrcValue}.
+   *
+   * For simplicity, here are some assumptions:
+   * - We only take 3 primitive values and use them to construct compound values. To make this work for types that
+   * can be widened or shrunk to each other, please use values within a small range.
+   * - For List, Map or Union, make sure there's at least one entry within the record-container;
+   * you may want to try {@link #createValueRecursively(TypeDescription)} instead of {@link OrcStruct#createValue(TypeDescription)}
+   */
+  public static void orcStructFillerWithFixedValue(WritableComparable w, TypeDescription schema, int unionTag,
+      int intValue, String stringValue, boolean booleanValue) {
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        ((BooleanWritable) w).set(booleanValue);
+        break;
+      case BYTE:
+        ((ByteWritable) w).set((byte) intValue);
+        break;
+      case SHORT:
+        ((ShortWritable) w).set((short) intValue);
+        break;
+      case INT:
+        ((IntWritable) w).set(intValue);
+        break;
+      case LONG:
+        ((LongWritable) w).set(intValue);
+        break;
+      case FLOAT:
+        ((FloatWritable) w).set(intValue * 1.0f);
+        break;
+      case DOUBLE:
+        ((DoubleWritable) w).set(intValue * 1.0);
+        break;
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+        ((Text) w).set(stringValue);
+        break;
+      case BINARY:
+        throw new UnsupportedOperationException("Binary type is not supported in random orc data filler");
+      case DECIMAL:
+        throw new UnsupportedOperationException("Decimal type is not supported in random orc data filler");
+      case DATE:
+      case TIMESTAMP:
+      case TIMESTAMP_INSTANT:
+        throw new UnsupportedOperationException(
+            "Timestamp and its derived types are not supported in the random ORC data filler");
+      case LIST:
+        OrcList castedList = (OrcList) w;
+        // Here it is not trivial to create a typed object of the element type, so this method expects the value
+        // container to hold at least one element; otherwise the traversal within the list is skipped.
+        for (Object i : castedList) {
+          orcStructFillerWithFixedValue((WritableComparable) i, schema.getChildren().get(0), unionTag, intValue,
+              stringValue, booleanValue);
+        }
+        break;
+      case MAP:
+        OrcMap castedMap = (OrcMap) w;
+        for (Object entry : castedMap.entrySet()) {
+          Map.Entry<WritableComparable, WritableComparable> castedEntry =
+              (Map.Entry<WritableComparable, WritableComparable>) entry;
+          orcStructFillerWithFixedValue(castedEntry.getKey(), schema.getChildren().get(0), unionTag, intValue,
+              stringValue, booleanValue);
+          orcStructFillerWithFixedValue(castedEntry.getValue(), schema.getChildren().get(1), unionTag, intValue,
+              stringValue, booleanValue);
+        }
+        break;
+      case STRUCT:
+        OrcStruct castedStruct = (OrcStruct) w;
+        int fieldIdx = 0;
+        for (TypeDescription child : schema.getChildren()) {
+          orcStructFillerWithFixedValue(castedStruct.getFieldValue(fieldIdx), child, unionTag, intValue, stringValue,
+              booleanValue);
+          fieldIdx += 1;
+        }
+        break;
+      case UNION:
+        OrcUnion castedUnion = (OrcUnion) w;
+        TypeDescription targetMemberSchema = schema.getChildren().get(unionTag);
+        castedUnion.set(unionTag, createValueRecursively(targetMemberSchema));
+        orcStructFillerWithFixedValue((WritableComparable) castedUnion.getObject(), targetMemberSchema, unionTag,
+            intValue, stringValue, booleanValue);
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown type " + schema.toString());
+    }
+  }
+
+  /**
+   * The simple API: the union tag defaults to 0.
+   */
+  public static void orcStructFillerWithFixedValue(WritableComparable w, TypeDescription schema, int intValue,
+      String stringValue, boolean booleanValue) {
+    orcStructFillerWithFixedValue(w, schema, 0, intValue, stringValue, booleanValue);
+  }
+
+  /**
+   * Suppresses unchecked-cast warnings: all casts are clearly valid, as they all operate on (sub)elements of ORC types.
+   * A failed cast will throw a ClassCastException and abort the process.
+   */
+  @SuppressWarnings("unchecked")
+  private static WritableComparable structConversionHelper(WritableComparable w, WritableComparable v,
+      TypeDescription targetSchema) {
+
+    if (w instanceof OrcStruct) {
+      upConvertOrcStruct((OrcStruct) w, (OrcStruct) v, targetSchema);
+    } else if (w instanceof OrcList) {
+      OrcList castedList = (OrcList) w;
+      OrcList targetList = (OrcList) v;
+      TypeDescription elementType = targetSchema.getChildren().get(0);
+      WritableComparable targetListRecordContainer =
+          targetList.size() > 0 ? (WritableComparable) targetList.get(0) : createValueRecursively(elementType, 0);
+      targetList.clear();
+
+      for (int i = 0; i < castedList.size(); i++) {
+        targetList.add(i,
+            structConversionHelper((WritableComparable) castedList.get(i), targetListRecordContainer, elementType));
+      }
+    } else if (w instanceof OrcMap) {
+      OrcMap castedMap = (OrcMap) w;
+      OrcMap targetMap = (OrcMap) v;
+      TypeDescription valueSchema = targetSchema.getChildren().get(1);
+
+      // Create recordContainer with the schema of value.
+      Iterator targetMapEntries = targetMap.values().iterator();
+      WritableComparable targetMapRecordContainer =
+          targetMapEntries.hasNext() ? (WritableComparable) targetMapEntries.next()
+              : createValueRecursively(valueSchema);
+
+      targetMap.clear();
+
+      for (Object entry : castedMap.entrySet()) {
+        Map.Entry<WritableComparable, WritableComparable> castedEntry =
+            (Map.Entry<WritableComparable, WritableComparable>) entry;
+
+        targetMapRecordContainer =
+            structConversionHelper(castedEntry.getValue(), targetMapRecordContainer, valueSchema);
+        targetMap.put(castedEntry.getKey(), targetMapRecordContainer);
+      }
+    } else if (w instanceof OrcUnion) {
+      OrcUnion castedUnion = (OrcUnion) w;
+      OrcUnion targetUnion = (OrcUnion) v;
+      byte tag = castedUnion.getTag();
+
+      // ORC doesn't support Union type widening
+      // Avro doesn't allow it either, reference: https://avro.apache.org/docs/current/spec.html#Schema+Resolution
+      // As a result, member schema within source and target should be identical.
+      TypeDescription targetMemberSchema = targetSchema.getChildren().get(tag);
+      targetUnion.set(tag, structConversionHelper((WritableComparable) castedUnion.getObject(),
+          (WritableComparable) OrcUtils.createValueRecursively(targetMemberSchema), targetMemberSchema));
+    } else {
+      // Regardless of whether type-widening is happening, this method copies the value of w into v.
+      handlePrimitiveWritableComparable(w, v);
+    }
+
+    // If non-primitive or type-widening is required, v should already be populated by w's value recursively.
+    return v;
+  }
+
+  /**
+   * Recursively convert the {@param oldStruct} into {@param newStruct} whose schema is {@param targetSchema}.
+   * This serves a purpose similar to GenericDatumReader for Avro, which accepts a reader schema and a writer schema
+   * and lets users convert bytes into the reader's schema in a compatible way.
+   * Calling this method SHALL NOT cause any side effect on {@param oldStruct}; it copies the value of each field
+   * in {@param oldStruct} into {@param newStruct} recursively. Please avoid unnecessary calls, as this can
+   * be fairly expensive if the struct schema is complicated or contains container types like array/map.
+   *
+   * Note that if newStruct contains container types like List/Map, the up-conversion does two things:
+   * 1. Clears all elements in the original containers.
+   * 2. Populates the values of container elements in {@param oldStruct} into {@param newStruct}, using the element
+   * type of {@param newStruct} if compatible.
+   *
+   * Limitations:
+   * 1. Does not support up-conversion of key types in Maps, since the primary upstream format
+   * is Avro, which enforces keys to be strings.
+   * 2. Conversion from a field A to a field B only happens if
+   * org.apache.gobblin.compaction.mapreduce.orc.OrcValueMapper#isEvolutionValid(A,B) returns true.
+   */
+  @VisibleForTesting
+  public static void upConvertOrcStruct(OrcStruct oldStruct, OrcStruct newStruct, TypeDescription targetSchema) {
+
+    // If the target schema is not equal to newStruct's schema, it is an illegal state and does not make sense to proceed.
+    Preconditions.checkArgument(newStruct.getSchema().equals(targetSchema));
+
+    int indexInNewSchema = 0;
+    List<String> oldSchemaFieldNames = oldStruct.getSchema().getFieldNames();
+    List<TypeDescription> oldSchemaTypes = oldStruct.getSchema().getChildren();
+    List<TypeDescription> newSchemaTypes = targetSchema.getChildren();
+
+    for (String fieldName : targetSchema.getFieldNames()) {
+      if (oldSchemaFieldNames.contains(fieldName) && oldStruct.getFieldValue(fieldName) != null) {
+        int fieldIndex = oldSchemaFieldNames.indexOf(fieldName);
+
+        TypeDescription oldFieldSchema = oldSchemaTypes.get(fieldIndex);
+        TypeDescription newFieldSchema = newSchemaTypes.get(indexInNewSchema);
+
+        if (isEvolutionValid(oldFieldSchema, newFieldSchema)) {
+          WritableComparable oldField = oldStruct.getFieldValue(fieldName);
+          WritableComparable newField = newStruct.getFieldValue(fieldName);
+          newField = (newField == null) ? OrcUtils.createValueRecursively(newFieldSchema) : newField;
+          newStruct.setFieldValue(fieldName, structConversionHelper(oldField, newField, newFieldSchema));
+        } else {
+          throw new SchemaEvolution.IllegalEvolutionException(String
+              .format("ORC does not support type conversion from file" + " type %s to reader type %s ",
+                  oldFieldSchema.toString(), newFieldSchema.toString()));
+        }
+      } else {
+        newStruct.setFieldValue(fieldName, null);
+      }
+
+      indexInNewSchema++;
+    }
+  }
+
+  /**
+   * Copies the value of a primitive {@link WritableComparable} {@param from} into {@param to}, supporting ORC-allowed type widening.
+   */
+  public static void handlePrimitiveWritableComparable(WritableComparable from, WritableComparable to) {

Review comment:
       Can we explain the expected behavior of this method in the JavaDoc? 
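The expected behavior being asked about can be illustrated with a toy analogue in plain Java. This sketch uses boxed Java primitives instead of Hadoop Writables (an assumption for self-containment); it only mirrors the idea of copying a source value into the target's possibly wider type.

```java
public class WideningSketch {
    // Toy analogue of a primitive copy with type widening: the source int may
    // land in an int, long, or double target, mirroring ORC-allowed widenings.
    static Object widen(Object from, Class<?> target) {
        if (from instanceof Integer) {
            int i = (Integer) from;
            if (target == Integer.class) return i;
            if (target == Long.class) return (long) i;
            if (target == Double.class) return (double) i;
        }
        throw new IllegalArgumentException(
            "Unsupported widening: " + from.getClass().getSimpleName() + " -> " + target.getSimpleName());
    }

    public static void main(String[] args) {
        System.out.println(widen(42, Long.class));   // widened to long
        System.out.println(widen(42, Double.class)); // widened to double
    }
}
```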

##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
##########
@@ -156,4 +184,386 @@ static boolean isEvolutionValid(TypeDescription fileType, TypeDescription reader
       return ConvertTreeReaderFactory.canConvert(fileType, readerType);
     }
   }
+
+  /**
+   * Fill in values in an OrcStruct with the given schema, assuming {@param w} has the same schema as {@param schema}.
+   * {@param schema} is still necessary even though {@param w} carries schema information itself, because the
+   * actual value type is only available in {@link TypeDescription} and not in {@link org.apache.orc.mapred.OrcValue}.
+   *
+   * For simplicity, here are some assumptions:
+   * - We only take 3 primitive values and use them to construct compound values. To make this work for types that
+   * can be widened or shrunk to each other, please use values within a small range.
+   * - For List, Map or Union, make sure there's at least one entry within the record-container;
+   * you may want to try {@link #createValueRecursively(TypeDescription)} instead of {@link OrcStruct#createValue(TypeDescription)}
+   */
+  public static void orcStructFillerWithFixedValue(WritableComparable w, TypeDescription schema, int unionTag,
+      int intValue, String stringValue, boolean booleanValue) {
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        ((BooleanWritable) w).set(booleanValue);
+        break;
+      case BYTE:
+        ((ByteWritable) w).set((byte) intValue);
+        break;
+      case SHORT:
+        ((ShortWritable) w).set((short) intValue);
+        break;
+      case INT:
+        ((IntWritable) w).set(intValue);
+        break;
+      case LONG:
+        ((LongWritable) w).set(intValue);
+        break;
+      case FLOAT:
+        ((FloatWritable) w).set(intValue * 1.0f);
+        break;
+      case DOUBLE:
+        ((DoubleWritable) w).set(intValue * 1.0);
+        break;
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+        ((Text) w).set(stringValue);
+        break;
+      case BINARY:
+        throw new UnsupportedOperationException("Binary type is not supported in random orc data filler");
+      case DECIMAL:
+        throw new UnsupportedOperationException("Decimal type is not supported in random orc data filler");
+      case DATE:
+      case TIMESTAMP:
+      case TIMESTAMP_INSTANT:
+        throw new UnsupportedOperationException(
+            "Timestamp and its derived types are not supported in the random ORC data filler");
+      case LIST:
+        OrcList castedList = (OrcList) w;
+        // Here it is not trivial to create a typed object of the element type, so this method expects the value
+        // container to hold at least one element; otherwise the traversal within the list is skipped.
+        for (Object i : castedList) {
+          orcStructFillerWithFixedValue((WritableComparable) i, schema.getChildren().get(0), unionTag, intValue,
+              stringValue, booleanValue);
+        }
+        break;
+      case MAP:
+        OrcMap castedMap = (OrcMap) w;
+        for (Object entry : castedMap.entrySet()) {
+          Map.Entry<WritableComparable, WritableComparable> castedEntry =
+              (Map.Entry<WritableComparable, WritableComparable>) entry;
+          orcStructFillerWithFixedValue(castedEntry.getKey(), schema.getChildren().get(0), unionTag, intValue,
+              stringValue, booleanValue);
+          orcStructFillerWithFixedValue(castedEntry.getValue(), schema.getChildren().get(1), unionTag, intValue,
+              stringValue, booleanValue);
+        }
+        break;
+      case STRUCT:
+        OrcStruct castedStruct = (OrcStruct) w;
+        int fieldIdx = 0;
+        for (TypeDescription child : schema.getChildren()) {
+          orcStructFillerWithFixedValue(castedStruct.getFieldValue(fieldIdx), child, unionTag, intValue, stringValue,
+              booleanValue);
+          fieldIdx += 1;
+        }
+        break;
+      case UNION:
+        OrcUnion castedUnion = (OrcUnion) w;
+        TypeDescription targetMemberSchema = schema.getChildren().get(unionTag);
+        castedUnion.set(unionTag, createValueRecursively(targetMemberSchema));
+        orcStructFillerWithFixedValue((WritableComparable) castedUnion.getObject(), targetMemberSchema, unionTag,
+            intValue, stringValue, booleanValue);
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown type " + schema.toString());
+    }
+  }
+
+  /**
+   * The simple API: the union tag defaults to 0.
+   */
+  public static void orcStructFillerWithFixedValue(WritableComparable w, TypeDescription schema, int intValue,
+      String stringValue, boolean booleanValue) {
+    orcStructFillerWithFixedValue(w, schema, 0, intValue, stringValue, booleanValue);
+  }
+
+  /**
+   * Suppress the type-checking warning: all casts are clearly valid, as they are all (sub)elements of ORC types.
+   * A failed cast will trigger a ClassCastException and abort the process.
+   */
+  @SuppressWarnings("unchecked")
+  private static WritableComparable structConversionHelper(WritableComparable w, WritableComparable v,

Review comment:
       Can we explain what this method is doing? 

##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
##########
@@ -156,4 +184,386 @@ static boolean isEvolutionValid(TypeDescription fileType, TypeDescription reader
       return ConvertTreeReaderFactory.canConvert(fileType, readerType);
     }
   }
+
+  /**
+   * Fill in values in an OrcStruct with the given schema, assuming {@param w} conforms to {@param schema}.
+   * {@param schema} is still necessary even if {@param w} contains schema information itself, because the
+   * actual value type is only available in {@link TypeDescription} but not in {@link org.apache.orc.mapred.OrcValue}.
+   *
+   * For simplicity, here are some assumptions:
+   * - We only take 3 primitive values and use them to construct compound values. To make this work for types that
+   * can be widened or narrowed to each other, please use values within a small range.
+   * - For List, Map or Union, make sure there is at least one entry in the record container;
+   * you may want to use {@link #createValueRecursively(TypeDescription)} instead of {@link OrcStruct#createValue(TypeDescription)}
+   */
+  public static void orcStructFillerWithFixedValue(WritableComparable w, TypeDescription schema, int unionTag,
+      int intValue, String stringValue, boolean booleanValue) {
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        ((BooleanWritable) w).set(booleanValue);
+        break;
+      case BYTE:
+        ((ByteWritable) w).set((byte) intValue);
+        break;
+      case SHORT:
+        ((ShortWritable) w).set((short) intValue);
+        break;
+      case INT:
+        ((IntWritable) w).set(intValue);
+        break;
+      case LONG:
+        ((LongWritable) w).set(intValue);
+        break;
+      case FLOAT:
+        ((FloatWritable) w).set(intValue * 1.0f);
+        break;
+      case DOUBLE:
+        ((DoubleWritable) w).set(intValue * 1.0);
+        break;
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+        ((Text) w).set(stringValue);
+        break;
+      case BINARY:
+        throw new UnsupportedOperationException("Binary type is not supported in random orc data filler");
+      case DECIMAL:
+        throw new UnsupportedOperationException("Decimal type is not supported in random orc data filler");
+      case DATE:
+      case TIMESTAMP:
+      case TIMESTAMP_INSTANT:
+        throw new UnsupportedOperationException(
+            "Timestamp and its derived types are not supported in random orc data filler");
+      case LIST:
+        OrcList castedList = (OrcList) w;
+        // It is not trivial to create a typed object of the element type here, so this method expects the value
+        // container to contain at least one element; otherwise traversal of the list is skipped.
+        for (Object i : castedList) {
+          orcStructFillerWithFixedValue((WritableComparable) i, schema.getChildren().get(0), unionTag, intValue,
+              stringValue, booleanValue);
+        }
+        break;
+      case MAP:
+        OrcMap castedMap = (OrcMap) w;
+        for (Object entry : castedMap.entrySet()) {
+          Map.Entry<WritableComparable, WritableComparable> castedEntry =
+              (Map.Entry<WritableComparable, WritableComparable>) entry;
+          orcStructFillerWithFixedValue(castedEntry.getKey(), schema.getChildren().get(0), unionTag, intValue,
+              stringValue, booleanValue);
+          orcStructFillerWithFixedValue(castedEntry.getValue(), schema.getChildren().get(1), unionTag, intValue,
+              stringValue, booleanValue);
+        }
+        break;
+      case STRUCT:
+        OrcStruct castedStruct = (OrcStruct) w;
+        int fieldIdx = 0;
+        for (TypeDescription child : schema.getChildren()) {
+          orcStructFillerWithFixedValue(castedStruct.getFieldValue(fieldIdx), child, unionTag, intValue, stringValue,
+              booleanValue);
+          fieldIdx += 1;
+        }
+        break;
+      case UNION:
+        OrcUnion castedUnion = (OrcUnion) w;
+        TypeDescription targetMemberSchema = schema.getChildren().get(unionTag);
+        castedUnion.set(unionTag, createValueRecursively(targetMemberSchema));
+        orcStructFillerWithFixedValue((WritableComparable) castedUnion.getObject(), targetMemberSchema, unionTag,
+            intValue, stringValue, booleanValue);
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown type " + schema.toString());
+    }
+  }
+
+  /**
+   * The simple API: the union tag defaults to 0.
+   */
+  public static void orcStructFillerWithFixedValue(WritableComparable w, TypeDescription schema, int intValue,
+      String stringValue, boolean booleanValue) {
+    orcStructFillerWithFixedValue(w, schema, 0, intValue, stringValue, booleanValue);
+  }
+
+  /**
+   * Suppress the type-checking warning: all casts are clearly valid, as they are all (sub)elements of ORC types.
+   * A failed cast will trigger a ClassCastException and abort the process.
+   */
+  @SuppressWarnings("unchecked")
+  private static WritableComparable structConversionHelper(WritableComparable w, WritableComparable v,
+      TypeDescription targetSchema) {
+
+    if (w instanceof OrcStruct) {
+      upConvertOrcStruct((OrcStruct) w, (OrcStruct) v, targetSchema);
+    } else if (w instanceof OrcList) {
+      OrcList castedList = (OrcList) w;
+      OrcList targetList = (OrcList) v;
+      TypeDescription elementType = targetSchema.getChildren().get(0);
+      WritableComparable targetListRecordContainer =
+          targetList.size() > 0 ? (WritableComparable) targetList.get(0) : createValueRecursively(elementType, 0);
+      targetList.clear();
+
+      for (int i = 0; i < castedList.size(); i++) {
+        targetList.add(i,
+            structConversionHelper((WritableComparable) castedList.get(i), targetListRecordContainer, elementType));
+      }
+    } else if (w instanceof OrcMap) {
+      OrcMap castedMap = (OrcMap) w;
+      OrcMap targetMap = (OrcMap) v;
+      TypeDescription valueSchema = targetSchema.getChildren().get(1);
+
+      // Create recordContainer with the schema of value.
+      Iterator targetMapEntries = targetMap.values().iterator();
+      WritableComparable targetMapRecordContainer =
+          targetMapEntries.hasNext() ? (WritableComparable) targetMapEntries.next()
+              : createValueRecursively(valueSchema);
+
+      targetMap.clear();
+
+      for (Object entry : castedMap.entrySet()) {
+        Map.Entry<WritableComparable, WritableComparable> castedEntry =
+            (Map.Entry<WritableComparable, WritableComparable>) entry;
+
+        targetMapRecordContainer =
+            structConversionHelper(castedEntry.getValue(), targetMapRecordContainer, valueSchema);
+        targetMap.put(castedEntry.getKey(), targetMapRecordContainer);
+      }
+    } else if (w instanceof OrcUnion) {
+      OrcUnion castedUnion = (OrcUnion) w;
+      OrcUnion targetUnion = (OrcUnion) v;
+      byte tag = castedUnion.getTag();
+
+      // ORC doesn't support Union type widening
+      // Avro doesn't allow it either, reference: https://avro.apache.org/docs/current/spec.html#Schema+Resolution
+      // As a result, member schema within source and target should be identical.
+      TypeDescription targetMemberSchema = targetSchema.getChildren().get(tag);
+      targetUnion.set(tag, structConversionHelper((WritableComparable) castedUnion.getObject(),
+          (WritableComparable) OrcUtils.createValueRecursively(targetMemberSchema), targetMemberSchema));
+    } else {
+      // Regardless of whether type-widening happens, this method copies the value of w into v.
+      handlePrimitiveWritableComparable(w, v);
+    }
+
+    // If non-primitive or type-widening is required, v should already be populated by w's value recursively.
+    return v;
+  }
+
+  /**
+   * Recursively convert {@param oldStruct} into {@param newStruct}, whose schema is {@param targetSchema}.
+   * This serves a similar purpose to GenericDatumReader for Avro, which accepts a reader schema and a writer schema
+   * so that users can convert bytes into the reader's schema in a compatible way.
+   * Calling this method SHALL NOT cause any side-effect on {@param oldStruct}; it copies the value of each field
+   * in {@param oldStruct} into {@param newStruct} recursively. Please avoid unnecessary calls, as this can be
+   * pretty expensive if the struct schema is complicated or contains container objects like array/map.
+   *
+   * Note that if newStruct contains container types like List/Map, the up-conversion does two things:
+   * 1. Clears all elements in the original containers.
+   * 2. Populates the value of container elements in {@param oldStruct} into {@param newStruct} with the element type
+   * of {@param newStruct}, if compatible.
+   *
+   * Limitations:
+   * 1. Does not support up-conversion of key types in Maps, because the primary upstream format is Avro,
+   * which enforces keys to be strings only.
+   * 2. Conversion from a field A to a field B only happens if
+   * org.apache.gobblin.compaction.mapreduce.orc.OrcValueMapper#isEvolutionValid(A,B) returns true.
+   */
+  @VisibleForTesting
+  public static void upConvertOrcStruct(OrcStruct oldStruct, OrcStruct newStruct, TypeDescription targetSchema) {
+
+    // If the target schema is not equal to newStruct's schema, it is an illegal state and doesn't make sense to proceed.
+    Preconditions.checkArgument(newStruct.getSchema().equals(targetSchema));
+
+    int indexInNewSchema = 0;
+    List<String> oldSchemaFieldNames = oldStruct.getSchema().getFieldNames();
+    List<TypeDescription> oldSchemaTypes = oldStruct.getSchema().getChildren();
+    List<TypeDescription> newSchemaTypes = targetSchema.getChildren();
+
+    for (String fieldName : targetSchema.getFieldNames()) {
+      if (oldSchemaFieldNames.contains(fieldName) && oldStruct.getFieldValue(fieldName) != null) {
+        int fieldIndex = oldSchemaFieldNames.indexOf(fieldName);
+
+        TypeDescription oldFieldSchema = oldSchemaTypes.get(fieldIndex);
+        TypeDescription newFieldSchema = newSchemaTypes.get(indexInNewSchema);
+
+        if (isEvolutionValid(oldFieldSchema, newFieldSchema)) {
+          WritableComparable oldField = oldStruct.getFieldValue(fieldName);
+          WritableComparable newField = newStruct.getFieldValue(fieldName);
+          newField = (newField == null) ? OrcUtils.createValueRecursively(newFieldSchema) : newField;
+          newStruct.setFieldValue(fieldName, structConversionHelper(oldField, newField, newFieldSchema));
+        } else {
+          throw new SchemaEvolution.IllegalEvolutionException(String
+              .format("ORC does not support type conversion from file" + " type %s to reader type %s ",
+                  oldFieldSchema.toString(), newFieldSchema.toString()));
+        }
+      } else {
+        newStruct.setFieldValue(fieldName, null);
+      }
+
+      indexInNewSchema++;
+    }
+  }
+
+  /**
+   * For primitive types of {@link WritableComparable}, supporting ORC-allowed type-widening.
+   */
+  public static void handlePrimitiveWritableComparable(WritableComparable from, WritableComparable to) {
+    if (from instanceof ByteWritable) {
+      if (to instanceof ByteWritable) {
+        ((ByteWritable) to).set(((ByteWritable) from).get());
+        return;
+      } else if (to instanceof ShortWritable) {
+        ((ShortWritable) to).set(((ByteWritable) from).get());
+        return;
+      } else if (to instanceof IntWritable) {
+        ((IntWritable) to).set(((ByteWritable) from).get());
+        return;
+      } else if (to instanceof LongWritable) {
+        ((LongWritable) to).set(((ByteWritable) from).get());
+        return;
+      } else if (to instanceof DoubleWritable) {
+        ((DoubleWritable) to).set(((ByteWritable) from).get());
+        return;
+      }
+    } else if (from instanceof ShortWritable) {
+      if (to instanceof ShortWritable) {
+        ((ShortWritable) to).set(((ShortWritable) from).get());
+        return;
+      } else if (to instanceof IntWritable) {
+        ((IntWritable) to).set(((ShortWritable) from).get());
+        return;
+      } else if (to instanceof LongWritable) {
+        ((LongWritable) to).set(((ShortWritable) from).get());
+        return;
+      } else if (to instanceof DoubleWritable) {
+        ((DoubleWritable) to).set(((ShortWritable) from).get());
+        return;
+      }
+    } else if (from instanceof IntWritable) {
+      if (to instanceof IntWritable) {
+        ((IntWritable) to).set(((IntWritable) from).get());
+        return;
+      } else if (to instanceof LongWritable) {
+        ((LongWritable) to).set(((IntWritable) from).get());
+        return;
+      } else if (to instanceof DoubleWritable) {
+        ((DoubleWritable) to).set(((IntWritable) from).get());
+        return;
+      }
+    } else if (from instanceof LongWritable) {
+      if (to instanceof LongWritable) {
+        ((LongWritable) to).set(((LongWritable) from).get());
+        return;
+      } else if (to instanceof DoubleWritable) {
+        ((DoubleWritable) to).set(((LongWritable) from).get());
+        return;
+      }
+      // Following from this branch, type-widening is not allowed and only value-copy will happen.
+    } else if (from instanceof DoubleWritable) {
+      if (to instanceof DoubleWritable) {
+        ((DoubleWritable) to).set(((DoubleWritable) from).get());
+        return;
+      }
+    } else if (from instanceof BytesWritable) {
+      if (to instanceof BytesWritable) {
+        ((BytesWritable) to).set((BytesWritable) from);
+        return;
+      }
+    } else if (from instanceof FloatWritable) {
+      if (to instanceof FloatWritable) {
+        ((FloatWritable) to).set(((FloatWritable) from).get());
+        return;
+      }
+    } else if (from instanceof Text) {
+      if (to instanceof Text) {
+        ((Text) to).set((Text) from);
+        return;
+      }
+    } else if (from instanceof DateWritable) {
+      if (to instanceof DateWritable) {
+        ((DateWritable) to).set(((DateWritable) from).get());
+        return;
+      }
+    } else if (from instanceof OrcTimestamp && to instanceof OrcTimestamp) {
+      ((OrcTimestamp) to).set(((OrcTimestamp) from).toString());
+      return;
+    } else if (from instanceof HiveDecimalWritable && to instanceof HiveDecimalWritable) {
+      ((HiveDecimalWritable) to).set(((HiveDecimalWritable) from).getHiveDecimal());
+      return;
+    }
+    throw new UnsupportedOperationException(String
+        .format("The conversion of primitive-type WritableComparable object from %s to %s is not supported",
+            from.getClass(), to.getClass()));
+  }
+
+  /**
+   * For a nested structure like struct<a:array<struct<int,string>>>, calling OrcStruct.createValue doesn't create an
+   * entry for the inner list, which would be required for assigning a value when the element type itself is nested;
+   * without it, the element's nested structure is simply not visible.
+   *
+   * This function should be fed back to open-source ORC.

Review comment:
       "TODO: This function should be fed back to open-source ORC."?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org
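[Editor's note] For readers outside the ORC codebase, the name-based field matching that `upConvertOrcStruct` performs can be sketched with plain maps standing in for `OrcStruct`. The class and method names below are hypothetical simplifications, not part of the PR:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UpConvertSketch {
  /**
   * Copy fields from oldRecord into a new record shaped by targetFields.
   * Fields present in both are copied; fields only present in the target become null,
   * mirroring how upConvertOrcStruct nulls out columns missing from the old schema.
   */
  public static Map<String, Object> upConvert(Map<String, ?> oldRecord, List<String> targetFields) {
    Map<String, Object> newRecord = new LinkedHashMap<>();
    for (String field : targetFields) {
      // Name-based lookup, analogous to oldSchemaFieldNames.indexOf(fieldName) in the patch;
      // a missing field yields null, as upConvertOrcStruct does for absent columns.
      newRecord.put(field, oldRecord.get(field));
    }
    return newRecord;
  }
}
```

The real implementation additionally validates each field pair with `isEvolutionValid` and reuses pre-created value containers instead of allocating fresh ones per record.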



[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #2966: [GOBBLIN-1126] Make ORC compaction shuffle key configurable

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #2966:
URL: https://github.com/apache/incubator-gobblin/pull/2966#discussion_r419787662



##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
##########
@@ -156,4 +184,386 @@ static boolean isEvolutionValid(TypeDescription fileType, TypeDescription reader
       return ConvertTreeReaderFactory.canConvert(fileType, readerType);
     }
   }
+
+  /**
+   * Fill in values in an OrcStruct with the given schema, assuming {@param w} conforms to {@param schema}.
+   * {@param schema} is still necessary even if {@param w} contains schema information itself, because the
+   * actual value type is only available in {@link TypeDescription} but not in {@link org.apache.orc.mapred.OrcValue}.
+   *
+   * For simplicity, here are some assumptions:
+   * - We only take 3 primitive values and use them to construct compound values. To make this work for types that
+   * can be widened or narrowed to each other, please use values within a small range.
+   * - For List, Map or Union, make sure there is at least one entry in the record container;
+   * you may want to use {@link #createValueRecursively(TypeDescription)} instead of {@link OrcStruct#createValue(TypeDescription)}
+   */
+  public static void orcStructFillerWithFixedValue(WritableComparable w, TypeDescription schema, int unionTag,
+      int intValue, String stringValue, boolean booleanValue) {
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        ((BooleanWritable) w).set(booleanValue);
+        break;
+      case BYTE:
+        ((ByteWritable) w).set((byte) intValue);
+        break;
+      case SHORT:
+        ((ShortWritable) w).set((short) intValue);
+        break;
+      case INT:
+        ((IntWritable) w).set(intValue);
+        break;
+      case LONG:
+        ((LongWritable) w).set(intValue);
+        break;
+      case FLOAT:
+        ((FloatWritable) w).set(intValue * 1.0f);
+        break;
+      case DOUBLE:
+        ((DoubleWritable) w).set(intValue * 1.0);
+        break;
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+        ((Text) w).set(stringValue);
+        break;
+      case BINARY:
+        throw new UnsupportedOperationException("Binary type is not supported in random orc data filler");
+      case DECIMAL:
+        throw new UnsupportedOperationException("Decimal type is not supported in random orc data filler");
+      case DATE:
+      case TIMESTAMP:
+      case TIMESTAMP_INSTANT:
+        throw new UnsupportedOperationException(
+            "Timestamp and its derived types are not supported in random orc data filler");
+      case LIST:
+        OrcList castedList = (OrcList) w;
+        // It is not trivial to create a typed object of the element type here, so this method expects the value
+        // container to contain at least one element; otherwise traversal of the list is skipped.
+        for (Object i : castedList) {
+          orcStructFillerWithFixedValue((WritableComparable) i, schema.getChildren().get(0), unionTag, intValue,
+              stringValue, booleanValue);
+        }
+        break;
+      case MAP:
+        OrcMap castedMap = (OrcMap) w;
+        for (Object entry : castedMap.entrySet()) {
+          Map.Entry<WritableComparable, WritableComparable> castedEntry =
+              (Map.Entry<WritableComparable, WritableComparable>) entry;
+          orcStructFillerWithFixedValue(castedEntry.getKey(), schema.getChildren().get(0), unionTag, intValue,
+              stringValue, booleanValue);
+          orcStructFillerWithFixedValue(castedEntry.getValue(), schema.getChildren().get(1), unionTag, intValue,
+              stringValue, booleanValue);
+        }
+        break;
+      case STRUCT:
+        OrcStruct castedStruct = (OrcStruct) w;
+        int fieldIdx = 0;
+        for (TypeDescription child : schema.getChildren()) {
+          orcStructFillerWithFixedValue(castedStruct.getFieldValue(fieldIdx), child, unionTag, intValue, stringValue,
+              booleanValue);
+          fieldIdx += 1;
+        }
+        break;
+      case UNION:
+        OrcUnion castedUnion = (OrcUnion) w;
+        TypeDescription targetMemberSchema = schema.getChildren().get(unionTag);
+        castedUnion.set(unionTag, createValueRecursively(targetMemberSchema));
+        orcStructFillerWithFixedValue((WritableComparable) castedUnion.getObject(), targetMemberSchema, unionTag,
+            intValue, stringValue, booleanValue);
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown type " + schema.toString());
+    }
+  }
+
+  /**
+   * The simple API: the union tag defaults to 0.
+   */
+  public static void orcStructFillerWithFixedValue(WritableComparable w, TypeDescription schema, int intValue,
+      String stringValue, boolean booleanValue) {
+    orcStructFillerWithFixedValue(w, schema, 0, intValue, stringValue, booleanValue);
+  }
+
+  /**
+   * Suppress the type-checking warning: all casts are clearly valid, as they are all (sub)elements of ORC types.
+   * A failed cast will trigger a ClassCastException and abort the process.
+   */
+  @SuppressWarnings("unchecked")
+  private static WritableComparable structConversionHelper(WritableComparable w, WritableComparable v,
+      TypeDescription targetSchema) {
+
+    if (w instanceof OrcStruct) {
+      upConvertOrcStruct((OrcStruct) w, (OrcStruct) v, targetSchema);
+    } else if (w instanceof OrcList) {
+      OrcList castedList = (OrcList) w;
+      OrcList targetList = (OrcList) v;
+      TypeDescription elementType = targetSchema.getChildren().get(0);
+      WritableComparable targetListRecordContainer =
+          targetList.size() > 0 ? (WritableComparable) targetList.get(0) : createValueRecursively(elementType, 0);
+      targetList.clear();
+
+      for (int i = 0; i < castedList.size(); i++) {
+        targetList.add(i,
+            structConversionHelper((WritableComparable) castedList.get(i), targetListRecordContainer, elementType));
+      }
+    } else if (w instanceof OrcMap) {
+      OrcMap castedMap = (OrcMap) w;
+      OrcMap targetMap = (OrcMap) v;
+      TypeDescription valueSchema = targetSchema.getChildren().get(1);
+
+      // Create recordContainer with the schema of value.
+      Iterator targetMapEntries = targetMap.values().iterator();
+      WritableComparable targetMapRecordContainer =
+          targetMapEntries.hasNext() ? (WritableComparable) targetMapEntries.next()
+              : createValueRecursively(valueSchema);
+
+      targetMap.clear();
+
+      for (Object entry : castedMap.entrySet()) {
+        Map.Entry<WritableComparable, WritableComparable> castedEntry =
+            (Map.Entry<WritableComparable, WritableComparable>) entry;
+
+        targetMapRecordContainer =
+            structConversionHelper(castedEntry.getValue(), targetMapRecordContainer, valueSchema);
+        targetMap.put(castedEntry.getKey(), targetMapRecordContainer);
+      }
+    } else if (w instanceof OrcUnion) {
+      OrcUnion castedUnion = (OrcUnion) w;
+      OrcUnion targetUnion = (OrcUnion) v;
+      byte tag = castedUnion.getTag();
+
+      // ORC doesn't support Union type widening
+      // Avro doesn't allow it either, reference: https://avro.apache.org/docs/current/spec.html#Schema+Resolution
+      // As a result, member schema within source and target should be identical.
+      TypeDescription targetMemberSchema = targetSchema.getChildren().get(tag);
+      targetUnion.set(tag, structConversionHelper((WritableComparable) castedUnion.getObject(),
+          (WritableComparable) OrcUtils.createValueRecursively(targetMemberSchema), targetMemberSchema));
+    } else {
+      // Regardless of whether type-widening happens, this method copies the value of w into v.
+      handlePrimitiveWritableComparable(w, v);
+    }
+
+    // If non-primitive or type-widening is required, v should already be populated by w's value recursively.
+    return v;
+  }
+
+  /**
+   * Recursively convert {@param oldStruct} into {@param newStruct}, whose schema is {@param targetSchema}.
+   * This serves a similar purpose to GenericDatumReader for Avro, which accepts a reader schema and a writer schema
+   * so that users can convert bytes into the reader's schema in a compatible way.
+   * Calling this method SHALL NOT cause any side-effect on {@param oldStruct}; it copies the value of each field
+   * in {@param oldStruct} into {@param newStruct} recursively. Please avoid unnecessary calls, as this can be
+   * pretty expensive if the struct schema is complicated or contains container objects like array/map.
+   *
+   * Note that if newStruct contains container types like List/Map, the up-conversion does two things:
+   * 1. Clears all elements in the original containers.
+   * 2. Populates the value of container elements in {@param oldStruct} into {@param newStruct} with the element type
+   * of {@param newStruct}, if compatible.
+   *
+   * Limitations:
+   * 1. Does not support up-conversion of key types in Maps, because the primary upstream format is Avro,
+   * which enforces keys to be strings only.
+   * 2. Conversion from a field A to a field B only happens if
+   * org.apache.gobblin.compaction.mapreduce.orc.OrcValueMapper#isEvolutionValid(A,B) returns true.
+   */
+  @VisibleForTesting
+  public static void upConvertOrcStruct(OrcStruct oldStruct, OrcStruct newStruct, TypeDescription targetSchema) {
+
+    // If the target schema is not equal to newStruct's schema, it is an illegal state and doesn't make sense to proceed.
+    Preconditions.checkArgument(newStruct.getSchema().equals(targetSchema));
+
+    int indexInNewSchema = 0;
+    List<String> oldSchemaFieldNames = oldStruct.getSchema().getFieldNames();
+    List<TypeDescription> oldSchemaTypes = oldStruct.getSchema().getChildren();
+    List<TypeDescription> newSchemaTypes = targetSchema.getChildren();
+
+    for (String fieldName : targetSchema.getFieldNames()) {
+      if (oldSchemaFieldNames.contains(fieldName) && oldStruct.getFieldValue(fieldName) != null) {
+        int fieldIndex = oldSchemaFieldNames.indexOf(fieldName);
+
+        TypeDescription oldFieldSchema = oldSchemaTypes.get(fieldIndex);
+        TypeDescription newFieldSchema = newSchemaTypes.get(indexInNewSchema);
+
+        if (isEvolutionValid(oldFieldSchema, newFieldSchema)) {
+          WritableComparable oldField = oldStruct.getFieldValue(fieldName);
+          WritableComparable newField = newStruct.getFieldValue(fieldName);
+          newField = (newField == null) ? OrcUtils.createValueRecursively(newFieldSchema) : newField;
+          newStruct.setFieldValue(fieldName, structConversionHelper(oldField, newField, newFieldSchema));
+        } else {
+          throw new SchemaEvolution.IllegalEvolutionException(String
+              .format("ORC does not support type conversion from file" + " type %s to reader type %s ",
+                  oldFieldSchema.toString(), newFieldSchema.toString()));
+        }
+      } else {
+        newStruct.setFieldValue(fieldName, null);
+      }
+
+      indexInNewSchema++;
+    }
+  }
+
+  /**
+   * For primitive types of {@link WritableComparable}, supporting ORC-allowed type-widening.
+   */
+  public static void handlePrimitiveWritableComparable(WritableComparable from, WritableComparable to) {
+    if (from instanceof ByteWritable) {
+      if (to instanceof ByteWritable) {
+        ((ByteWritable) to).set(((ByteWritable) from).get());
+        return;
+      } else if (to instanceof ShortWritable) {
+        ((ShortWritable) to).set(((ByteWritable) from).get());
+        return;
+      } else if (to instanceof IntWritable) {
+        ((IntWritable) to).set(((ByteWritable) from).get());
+        return;
+      } else if (to instanceof LongWritable) {
+        ((LongWritable) to).set(((ByteWritable) from).get());
+        return;
+      } else if (to instanceof DoubleWritable) {
+        ((DoubleWritable) to).set(((ByteWritable) from).get());
+        return;
+      }
+    } else if (from instanceof ShortWritable) {
+      if (to instanceof ShortWritable) {
+        ((ShortWritable) to).set(((ShortWritable) from).get());
+        return;
+      } else if (to instanceof IntWritable) {
+        ((IntWritable) to).set(((ShortWritable) from).get());
+        return;
+      } else if (to instanceof LongWritable) {
+        ((LongWritable) to).set(((ShortWritable) from).get());
+        return;
+      } else if (to instanceof DoubleWritable) {
+        ((DoubleWritable) to).set(((ShortWritable) from).get());
+        return;
+      }
+    } else if (from instanceof IntWritable) {
+      if (to instanceof IntWritable) {
+        ((IntWritable) to).set(((IntWritable) from).get());
+        return;
+      } else if (to instanceof LongWritable) {
+        ((LongWritable) to).set(((IntWritable) from).get());
+        return;
+      } else if (to instanceof DoubleWritable) {
+        ((DoubleWritable) to).set(((IntWritable) from).get());
+        return;
+      }
+    } else if (from instanceof LongWritable) {
+      if (to instanceof LongWritable) {
+        ((LongWritable) to).set(((LongWritable) from).get());
+        return;
+      } else if (to instanceof DoubleWritable) {
+        ((DoubleWritable) to).set(((LongWritable) from).get());
+        return;
+      }
+      // Following from this branch, type-widening is not allowed and only value-copy will happen.
+    } else if (from instanceof DoubleWritable) {
+      if (to instanceof DoubleWritable) {
+        ((DoubleWritable) to).set(((DoubleWritable) from).get());
+        return;
+      }
+    } else if (from instanceof BytesWritable) {
+      if (to instanceof BytesWritable) {
+        ((BytesWritable) to).set((BytesWritable) from);
+        return;
+      }
+    } else if (from instanceof FloatWritable) {
+      if (to instanceof FloatWritable) {
+        ((FloatWritable) to).set(((FloatWritable) from).get());
+        return;
+      }
+    } else if (from instanceof Text) {
+      if (to instanceof Text) {
+        ((Text) to).set((Text) from);
+        return;
+      }
+    } else if (from instanceof DateWritable) {
+      if (to instanceof DateWritable) {
+        ((DateWritable) to).set(((DateWritable) from).get());
+        return;
+      }
+    } else if (from instanceof OrcTimestamp && to instanceof OrcTimestamp) {
+      ((OrcTimestamp) to).set(((OrcTimestamp) from).toString());
+      return;
+    } else if (from instanceof HiveDecimalWritable && to instanceof HiveDecimalWritable) {
+      ((HiveDecimalWritable) to).set(((HiveDecimalWritable) from).getHiveDecimal());
+      return;
+    }
+    throw new UnsupportedOperationException(String
+        .format("The conversion of primitive-type WritableComparable object from %s to %s is not supported",
+            from.getClass(), to.getClass()));
+  }
+
+  /**
+   * For a nested structure like struct<a:array<struct<int,string>>>, calling OrcStruct.createValue doesn't create an
+   * entry for the inner list, which would be required for assigning a value when the element type itself is nested;
+   * without it, the element's nested structure is simply not visible.
+   *
+   * This function should be fed back to open-source ORC.

Review comment:
       A side note: I think this could actually be the reason why we are seeing a schema evolution failure with complex nested schemas on the ORC reader.
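[Editor's note] The nested-creation problem this javadoc describes can be illustrated with a toy schema tree: creating a value only at the top level leaves container element types unpopulated, whereas a recursive creator seeds one element per container. All types below are hypothetical stand-ins for `TypeDescription` and the ORC value classes, not the actual API:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CreateValueSketch {
  // A toy schema node: a primitive INT, a LIST with one child schema, or a STRUCT of children.
  public enum Kind { INT, LIST, STRUCT }

  public static final class Schema {
    final Kind kind;
    final List<Schema> children;
    public Schema(Kind kind, List<Schema> children) { this.kind = kind; this.children = children; }
  }

  /**
   * Recursively create a default value for a schema. Unlike a shallow createValue,
   * a LIST is seeded with one recursively created element, so the element's nested
   * structure is reachable when filling in values later.
   */
  public static Object createValueRecursively(Schema schema) {
    switch (schema.kind) {
      case INT:
        return 0;
      case LIST:
        List<Object> list = new ArrayList<>();
        list.add(createValueRecursively(schema.children.get(0)));
        return list;
      case STRUCT:
        Map<Integer, Object> struct = new LinkedHashMap<>();
        for (int i = 0; i < schema.children.size(); i++) {
          struct.put(i, createValueRecursively(schema.children.get(i)));
        }
        return struct;
      default:
        throw new IllegalArgumentException("Unknown kind");
    }
  }
}
```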




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #2966: [GOBBLIN-1126] Make ORC compaction shuffle key configurable

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #2966:
URL: https://github.com/apache/incubator-gobblin/pull/2966#discussion_r419748400



##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcKeyDedupReducer.java
##########
@@ -37,6 +53,30 @@ protected void setOutKey(OrcValue valueToRetain) {
     // do nothing since initReusableObject has assigned value for outKey.
   }
 
+  @Override
+  protected void reduce(OrcKey key, Iterable<OrcValue> values, Context context)
+      throws IOException, InterruptedException {
+
+    /* Map from the hash of the value (typed as OrcStruct) to its number of occurrences. */
+    Map<Integer, Integer> valuesToRetain = new HashMap<>();
+    int valueHash = 0;
+
+    for (OrcValue value : values) {

Review comment:
       The benefit shows up during the sort phase, when MR compares keys: the comparison is over a much smaller object instead of the whole record (and we get rid of complex columns like arrays and maps). This improves map throughput quite a bit, and the reducer side isn't doing much extra work if the shuffle key is selected properly: we chose uuid + header.time, and the chance of two different records colliding on that pair is very low.
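
       A minimal sketch of the idea in plain Java (hypothetical `Record`/field names; the real code operates on `OrcKey`/`OrcValue`): sorting compares only the projected key fields, and the reducer keeps one record per distinct value hash within a key group, mirroring the map-of-hashes bookkeeping in the diff above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ShuffleKeySketch {
  // Hypothetical record: a payload column plus the two columns we project
  // into the shuffle key (uuid + header time).
  static final class Record {
    final String uuid;
    final long headerTime;
    final String payload;
    Record(String uuid, long headerTime, String payload) {
      this.uuid = uuid;
      this.headerTime = headerTime;
      this.payload = payload;
    }
  }

  // The projected shuffle key: sorting touches only these two scalar fields,
  // never the full record (no arrays or maps to walk).
  static final Comparator<Record> SHUFFLE_KEY_ORDER =
      Comparator.comparing((Record r) -> r.uuid).thenComparingLong(r -> r.headerTime);

  // Reducer-side dedup within one key group: retain one record per distinct
  // value hash, since records sharing the projected key may still differ.
  static List<Record> dedup(List<Record> sameKeyGroup) {
    Map<Integer, Record> retained = new LinkedHashMap<>();
    for (Record r : sameKeyGroup) {
      retained.putIfAbsent(r.payload.hashCode(), r);
    }
    return new ArrayList<>(retained.values());
  }
}
```

       The trade-off this illustrates: a narrower key makes map-side sorting cheaper, while the reducer absorbs the (rare, given a well-chosen key) case of distinct records landing in the same group.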



