Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/10/02 16:53:59 UTC

[GitHub] [hudi] bvaradar opened a new pull request #2141: [HUDI-898] Add new backwards compatible API to expose schema in preCombine

bvaradar opened a new pull request #2141:
URL: https://github.com/apache/hudi/pull/2141


   
   This is to let users implement custom deduping-merge logic.
    
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] bvaradar commented on a change in pull request #2141: [HUDI-898] Add new backwards compatible API to expose schema in preCombine

Posted by GitBox <gi...@apache.org>.
bvaradar commented on a change in pull request #2141:
URL: https://github.com/apache/hudi/pull/2141#discussion_r498938639



##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkWriteHelper.java
##########
@@ -49,21 +52,48 @@ public static SparkWriteHelper newInstance() {
   @Override
   public JavaRDD<HoodieRecord<T>> deduplicateRecords(JavaRDD<HoodieRecord<T>> records,
                                                      HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> index,
-                                                     int parallelism) {
-    boolean isIndexingGlobal = index.isGlobal();
-    return records.mapToPair(record -> {
-      HoodieKey hoodieKey = record.getKey();
-      // If index used is global, then records are expected to differ in their partitionPath
-      Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
-      return new Tuple2<>(key, record);
-    }).reduceByKey((rec1, rec2) -> {
-      @SuppressWarnings("unchecked")
-      T reducedData = (T) rec1.getData().preCombine(rec2.getData());
-      // we cannot allow the user to change the key or partitionPath, since that will affect
-      // everything
-      // so pick it from one of the records.
-      return new HoodieRecord<T>(rec1.getKey(), reducedData);
-    }, parallelism).map(Tuple2::_2);
+                                                     HoodieWriteConfig writeConfig, int parallelism) {
+    return new RecordDeduper<T>(index.isGlobal(), writeConfig.getSchema(), parallelism).deduplicateRecords(records);
   }
 
+  /**
+   * Helper class to dedupe records.
+   * @param <T>
+   */
+  private static class RecordDeduper<T extends HoodieRecordPayload> implements Serializable {

Review comment:
       Made this a class to take advantage of lazy creation of the Schema on the executors.
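
The lazy-creation idea in the comment above can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: `ParsedSchema` and `LazySchemaHolder` are hypothetical names, `ParsedSchema` is only a stand-in for a parsed schema object such as Avro's `Schema`, and `roundTrip` merely simulates shipping the helper from the driver to an executor with Java serialization. The assumption is that only the schema string travels with the helper and that the parsed form is rebuilt on first use.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a parsed schema. It is deliberately NOT
// Serializable, to show that it never travels with the holder.
final class ParsedSchema {
  final String json;

  ParsedSchema(String json) {
    this.json = json;
  }
}

// Hypothetical helper: ships only the schema string and parses it lazily.
class LazySchemaHolder implements Serializable {
  private static final long serialVersionUID = 1L;

  private final String schemaStr;
  // transient: skipped by Java serialization, so it is null after deserialization.
  private transient ParsedSchema schema;

  LazySchemaHolder(String schemaStr) {
    this.schemaStr = schemaStr;
  }

  boolean isParsed() {
    return schema != null;
  }

  // Builds the parsed schema on first access in the current JVM.
  ParsedSchema getSchema() {
    if (schema == null) {
      schema = new ParsedSchema(schemaStr);
    }
    return schema;
  }

  // Simulates sending the holder to an executor: serialize, then deserialize.
  static LazySchemaHolder roundTrip(LazySchemaHolder holder) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(holder);
      }
      try (ObjectInputStream in =
               new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (LazySchemaHolder) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

With this shape, a lambda running on an executor can call `getSchema()` freely: the first call parses the string, and later calls in the same deserialized copy reuse the result.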







[GitHub] [hudi] vinothchandar closed pull request #2141: [HUDI-898] Add new backwards compatible API to expose schema in preCombine

Posted by GitBox <gi...@apache.org>.
vinothchandar closed pull request #2141:
URL: https://github.com/apache/hudi/pull/2141


   





[GitHub] [hudi] nsivabalan commented on pull request #2141: [HUDI-898] Add new backwards compatible API to expose schema in preCombine

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #2141:
URL: https://github.com/apache/hudi/pull/2141#issuecomment-777649890


   @bvaradar : can you check my previous comment? We can close this out if it is not required. If it is still needed, let me know.





[GitHub] [hudi] vinothchandar commented on a change in pull request #2141: [HUDI-898] Add new backwards compatible API to expose schema in preCombine

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #2141:
URL: https://github.com/apache/hudi/pull/2141#discussion_r499274425



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java
##########
@@ -41,9 +41,23 @@
    * When more than one HoodieRecord have the same HoodieKey, this function combines them before attempting to
    * insert/upsert (if combining turned on in HoodieClientConfig).
    */
-  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  @Deprecated
   T preCombine(T another);
 
+  /**
+   * When more than one HoodieRecord have the same HoodieKey, this function combines them before attempting to
+   * insert/upsert (if combining turned on in HoodieClientConfig).
+   * Any custom Record payload implementation that requires schema must override this method.
+   *
+   * @param another Record payload to combine with.
+   * @param schema  Schema used for writing to dataset.
+   * @return Combined Record.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
+  default T preCombine(T another, Schema schema) {

Review comment:
       Could we just introduce a config that simply uses `combineAndGetUpdateValue()` to perform the preCombine as well? We have debated doing this for a while. Do you see a reason not to do something like that?
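
For reference, the backwards-compatible overload in the diff above follows the usual default-method pattern. The sketch below is only an illustration under stated assumptions: the default is assumed to delegate to the one-argument `preCombine` (its body is not shown in this thread), `MergePayload`, `LatestWins`, and `SchemaAware` are hypothetical names rather than Hudi classes, and a plain `String` stands in for the Avro `Schema` parameter.

```java
// Hypothetical payload interface, not Hudi's HoodieRecordPayload.
interface MergePayload<T extends MergePayload<T>> {
  // Original single-argument method that existing implementations provide.
  T preCombine(T another);

  // New overload. The default delegates to the old method (an assumption),
  // so implementations written before the overload existed still compile.
  default T preCombine(T another, String schema) {
    return preCombine(another);
  }
}

// Legacy implementation: only knows the one-argument method.
final class LatestWins implements MergePayload<LatestWins> {
  final long ts;

  LatestWins(long ts) {
    this.ts = ts;
  }

  @Override
  public LatestWins preCombine(LatestWins another) {
    // Keep whichever record carries the larger timestamp.
    return another.ts > ts ? another : this;
  }
}

// Schema-aware implementation: overrides the new overload.
final class SchemaAware implements MergePayload<SchemaAware> {
  final String value;

  SchemaAware(String value) {
    this.value = value;
  }

  @Override
  public SchemaAware preCombine(SchemaAware another) {
    return this;
  }

  @Override
  public SchemaAware preCombine(SchemaAware another, String schema) {
    // Custom merge that makes use of the schema argument.
    return new SchemaAware(value + "|" + another.value + "@" + schema);
  }
}
```

A caller can then always invoke the two-argument form: `LatestWins` falls back to its existing logic, while `SchemaAware` gets the schema it needs.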







[GitHub] [hudi] nsivabalan commented on pull request #2141: [HUDI-898] Add new backwards compatible API to expose schema in preCombine

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #2141:
URL: https://github.com/apache/hudi/pull/2141#issuecomment-751317929


   @bvaradar : we have already introduced properties as an arg to all APIs as per https://github.com/apache/hudi/pull/2311. Do you think we can redo this patch based on this new API?





[GitHub] [hudi] vinothchandar commented on pull request #2141: [HUDI-898] Add new backwards compatible API to expose schema in preCombine

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #2141:
URL: https://github.com/apache/hudi/pull/2141#issuecomment-716076733


   cc @Karl-WangSK 
   
   @bvaradar please see the other PR #2106 for a similar discussion.  

