Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/10/24 18:35:25 UTC

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #7050: [MINOR] update rfc46 doc

alexeykudinkin commented on code in PR #7050:
URL: https://github.com/apache/hudi/pull/7050#discussion_r1003628096


##########
rfc/rfc-46/rfc-46.md:
##########
@@ -74,49 +74,115 @@ Following (high-level) steps are proposed:
    2. Split into interface and engine-specific implementations (holding internal engine-specific representation of the payload) 
    3. Implementing new standardized record-level APIs (like `getPartitionKey` , `getRecordKey`, etc)
    4. Staying **internal** component, that will **NOT** contain any user-defined semantic (like merging)
-2. Extract Record Combining (Merge) API from `HoodieRecordPayload` into a standalone, stateless component (engine). Such component will be
+2. Extract Record Merge API from `HoodieRecordPayload` into a standalone, stateless component. Such a component will be
    1. Abstracted as stateless object providing API to combine records (according to predefined semantics) for engines (Spark, Flink) of interest
    2. Plug-in point for user-defined combination semantics
 3. Gradually deprecate, phase-out and eventually remove `HoodieRecordPayload` abstraction
 
 Phasing out usage of `HoodieRecordPayload` will also bring the benefit of avoiding to use Java reflection in the hot-path, which
 is known to have poor performance (compared to non-reflection based instantiation).
 
-#### Combine API Engine
+#### Record Merge API
 
-Stateless component interface providing for API Combining Records will look like following:
+`combineAndGetUpdateValue` and `preCombine` will converge into a single API. The stateless component interface providing the Record Merging API will look like the following:
 
 ```java
-interface HoodieRecordCombiningEngine {
-  
-  default HoodieRecord precombine(HoodieRecord older, HoodieRecord newer) {
-    if (spark) {
-      precombineSpark((SparkHoodieRecord) older, (SparkHoodieRecord) newer);
-    } else if (flink) {
-      // precombine for Flink
-    }
-  }
+interface HoodieRecordMerger {
 
    /**
-    * Spark-specific implementation 
+    * The kind of merging strategy this RecordMerger implements. A merging strategy is identified by a UUID.
     */
-  SparkHoodieRecord precombineSpark(SparkHoodieRecord older, SparkHoodieRecord newer);
+   String getMergingStrategy();
   
-  // ...
+   // This method converges combineAndGetUpdateValue and preCombine from HoodieRecordPayload.
+   // It must be an associative operation: f(a, f(b, c)) = f(f(a, b), c). That is, given three versions A, B, C of a single record, both orders of application have to yield the same result.
+   Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException;
+   
+   // The record type handled by the current merger
+   // SPARK, AVRO, FLINK
+   HoodieRecordType getRecordType();
+}
+
+/**
+ * Spark-specific implementation 
+ */
+class HoodieSparkRecordMerger implements HoodieRecordMerger {
+
+   @Override
+   public String getMergingStrategy() {
+      return UUID_MERGER_STRATEGY;
+   }
+
+   @Override
+   public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+      // Unifies preCombine and combineAndGetUpdateValue for HoodieSparkRecord; must be an associative operation.
+   }
+
+   @Override
+   public HoodieRecordType getRecordType() {
+      return HoodieRecordType.SPARK;
+   }
+}
+   
+/**
+ * Flink-specific implementation 
+ */
+class HoodieFlinkRecordMerger implements HoodieRecordMerger {
+
+   @Override
+   public String getMergingStrategy() {
+      return UUID_MERGER_STRATEGY;

Review Comment:
   nit: Let's name this `LATEST_RECORD_MERGING_STRATEGY` to hint at what the merging strategy really is
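   
   For illustration, the suggested rename might look like the sketch below; the UUID value is a placeholder, not the real strategy id:
   
   ```java
   // Hypothetical sketch: the suggested rename of UUID_MERGER_STRATEGY, with a placeholder UUID value.
   public static final String LATEST_RECORD_MERGING_STRATEGY =
       "00000000-0000-0000-0000-000000000000"; // identifies the "latest record wins" strategy
   ```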



##########
rfc/rfc-46/rfc-46.md:
##########
@@ -74,49 +74,115 @@ Following (high-level) steps are proposed:
    2. Split into interface and engine-specific implementations (holding internal engine-specific representation of the payload) 
    3. Implementing new standardized record-level APIs (like `getPartitionKey` , `getRecordKey`, etc)
    4. Staying **internal** component, that will **NOT** contain any user-defined semantic (like merging)
-2. Extract Record Combining (Merge) API from `HoodieRecordPayload` into a standalone, stateless component (engine). Such component will be
+2. Extract Record Merge API from `HoodieRecordPayload` into a standalone, stateless component. Such a component will be
    1. Abstracted as stateless object providing API to combine records (according to predefined semantics) for engines (Spark, Flink) of interest
    2. Plug-in point for user-defined combination semantics
 3. Gradually deprecate, phase-out and eventually remove `HoodieRecordPayload` abstraction
 
 Phasing out usage of `HoodieRecordPayload` will also bring the benefit of avoiding to use Java reflection in the hot-path, which
 is known to have poor performance (compared to non-reflection based instantiation).
 
-#### Combine API Engine
+#### Record Merge API
 
-Stateless component interface providing for API Combining Records will look like following:
+`combineAndGetUpdateValue` and `preCombine` will converge into a single API. The stateless component interface providing the Record Merging API will look like the following:
 
 ```java
-interface HoodieRecordCombiningEngine {
-  
-  default HoodieRecord precombine(HoodieRecord older, HoodieRecord newer) {
-    if (spark) {
-      precombineSpark((SparkHoodieRecord) older, (SparkHoodieRecord) newer);
-    } else if (flink) {
-      // precombine for Flink
-    }
-  }
+interface HoodieRecordMerger {
 
    /**
-    * Spark-specific implementation 
+    * The kind of merging strategy this RecordMerger implements. A merging strategy is identified by a UUID.
     */
-  SparkHoodieRecord precombineSpark(SparkHoodieRecord older, SparkHoodieRecord newer);
+   String getMergingStrategy();

Review Comment:
   nit: let's name this `getMergingStrategyId`
   
   Let's also update the docs elaborating on what this id is (see the sketch below): 
    - Every RecordMerger implementation is engine-specific (referred to as an "implementation") and implements a particular merging semantic (referred to as a "merging strategy")
    - Such tiering allows us to be flexible in providing implementations of a merging strategy only for the engines you might be interested in
    - The merging strategy is a table property that is set once during table init
    - Merging implementations can be configured for each write individually
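   
   A minimal sketch of that tiering (names and the placeholder UUID are illustrative, not the final API):
   
   ```java
   // One merging strategy id, shared by several engine-specific implementations.
   interface HoodieRecordMerger {
     // Shared strategy id (placeholder value); a table property fixed at init time.
     String LATEST_RECORD_MERGING_STRATEGY = "00000000-0000-0000-0000-000000000000";
   
     // Identifies the merging semantic ("merging strategy"), not the implementation.
     String getMergingStrategyId();
   }
   
   // Spark implementation of the "latest record wins" strategy.
   class HoodieSparkRecordMerger implements HoodieRecordMerger {
     @Override
     public String getMergingStrategyId() {
       return LATEST_RECORD_MERGING_STRATEGY;
     }
   }
   
   // Flink implementation of the same strategy: which implementation runs can be
   // chosen per write, while the strategy stays fixed for the table.
   class HoodieFlinkRecordMerger implements HoodieRecordMerger {
     @Override
     public String getMergingStrategyId() {
       return LATEST_RECORD_MERGING_STRATEGY;
     }
   }
   ```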



##########
rfc/rfc-46/rfc-46.md:
##########
@@ -128,21 +194,73 @@ Following major components will be refactored:
 
 1. `HoodieWriteHandle`s will be  
    1. Accepting `HoodieRecord` instead of raw Avro payload (avoiding Avro conversion)
-   2. Using Combining API engine to merge records (when necessary) 
+   2. Using Record Merge API to merge records (when necessary) 
    3. Passes `HoodieRecord` as is to `FileWriter`
 2. `HoodieFileWriter`s will be 
    1. Accepting `HoodieRecord`
    2. Will be engine-specific (so that they're able to handle internal record representation)
 3. `HoodieRealtimeRecordReader`s 
    1. API will be returning opaque `HoodieRecord` instead of raw Avro payload
 
+### Config for RecordMerger
+The `RecordMerger` is engine-aware. We provide a config called `HoodieWriteConfig.MERGER_IMPLS`, which takes a list of `RecordMerger` class names, and `HoodieWriteConfig.MERGER_STRATEGY`, which is the UUID of the merging strategy. At runtime, Hudi picks the `RecordMerger` from `MERGER_IMPLS` whose strategy matches `MERGER_STRATEGY` and whose record type matches the engine type.
+
+### Public Api in HoodieRecord
+Because we implement different types of records, we need to implement functionality similar to AvroUtils in HoodieRecord for different data (avro, InternalRow, RowData).
+Its public API will look like the following:
+
+```java
+import java.util.Properties;
+
+class HoodieRecord {
+
+   /**
+    * Get column values from the record to support RDDCustomColumnsSortPartitioner
+    */
+   ComparableList getComparableColumnValues(Schema recordSchema, String[] columns,

Review Comment:
   We made this `Object[] getColumnValues`, didn't we?
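   
   For reference, a sketch of that accessor as renamed; the quoted signature is truncated in the diff, so the trailing parameters are intentionally left elided:
   
   ```java
   // Hypothetical sketch of the renamed accessor (remaining parameters elided).
   Object[] getColumnValues(Schema recordSchema, String[] columns /* , ... */);
   ```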



##########
rfc/rfc-46/rfc-46.md:
##########
@@ -128,21 +194,73 @@ Following major components will be refactored:
 
 1. `HoodieWriteHandle`s will be  
    1. Accepting `HoodieRecord` instead of raw Avro payload (avoiding Avro conversion)
-   2. Using Combining API engine to merge records (when necessary) 
+   2. Using Record Merge API to merge records (when necessary) 
    3. Passes `HoodieRecord` as is to `FileWriter`
 2. `HoodieFileWriter`s will be 
    1. Accepting `HoodieRecord`
    2. Will be engine-specific (so that they're able to handle internal record representation)
 3. `HoodieRealtimeRecordReader`s 
    1. API will be returning opaque `HoodieRecord` instead of raw Avro payload
 
+### Config for RecordMerger
+The `RecordMerger` is engine-aware. We provide a config called `HoodieWriteConfig.MERGER_IMPLS`, which takes a list of `RecordMerger` class names, and `HoodieWriteConfig.MERGER_STRATEGY`, which is the UUID of the merging strategy. At runtime, Hudi picks the `RecordMerger` from `MERGER_IMPLS` whose strategy matches `MERGER_STRATEGY` and whose record type matches the engine type.

Review Comment:
   Let's elaborate on the relation between RecordMerger impls and the merging strategy. I hinted at the points we should call out above.
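   
   For illustration, the runtime selection could look like the sketch below (`RecordMergerSelector` is a hypothetical helper, not actual Hudi code; it relies on the `HoodieRecordMerger` interface quoted above):
   
   ```java
   import java.util.List;
   
   // Hypothetical helper: among the configured merger impls (MERGER_IMPLS),
   // pick the one matching both the configured strategy id (MERGER_STRATEGY)
   // and the record type of the engine running the write.
   final class RecordMergerSelector {
     static HoodieRecordMerger pick(List<HoodieRecordMerger> candidates,
                                    String mergerStrategyId,
                                    HoodieRecordType engineType) {
       return candidates.stream()
           .filter(m -> m.getMergingStrategy().equals(mergerStrategyId))
           .filter(m -> m.getRecordType() == engineType)
           .findFirst()
           .orElseThrow(() -> new IllegalStateException(
               "No RecordMerger for strategy " + mergerStrategyId
                   + " and engine " + engineType));
     }
   }
   ```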



##########
rfc/rfc-46/rfc-46.md:
##########
@@ -74,49 +74,115 @@ Following (high-level) steps are proposed:
    2. Split into interface and engine-specific implementations (holding internal engine-specific representation of the payload) 
    3. Implementing new standardized record-level APIs (like `getPartitionKey` , `getRecordKey`, etc)
    4. Staying **internal** component, that will **NOT** contain any user-defined semantic (like merging)
-2. Extract Record Combining (Merge) API from `HoodieRecordPayload` into a standalone, stateless component (engine). Such component will be
+2. Extract Record Merge API from `HoodieRecordPayload` into a standalone, stateless component. Such a component will be
    1. Abstracted as stateless object providing API to combine records (according to predefined semantics) for engines (Spark, Flink) of interest
    2. Plug-in point for user-defined combination semantics
 3. Gradually deprecate, phase-out and eventually remove `HoodieRecordPayload` abstraction
 
 Phasing out usage of `HoodieRecordPayload` will also bring the benefit of avoiding to use Java reflection in the hot-path, which
 is known to have poor performance (compared to non-reflection based instantiation).
 
-#### Combine API Engine
+#### Record Merge API
 
-Stateless component interface providing for API Combining Records will look like following:
+`combineAndGetUpdateValue` and `preCombine` will converge into a single API. The stateless component interface providing the Record Merging API will look like the following:
 
 ```java
-interface HoodieRecordCombiningEngine {
-  
-  default HoodieRecord precombine(HoodieRecord older, HoodieRecord newer) {
-    if (spark) {
-      precombineSpark((SparkHoodieRecord) older, (SparkHoodieRecord) newer);
-    } else if (flink) {
-      // precombine for Flink
-    }
-  }
+interface HoodieRecordMerger {
 
    /**
-    * Spark-specific implementation 
+    * The kind of merging strategy this RecordMerger implements. A merging strategy is identified by a UUID.
     */
-  SparkHoodieRecord precombineSpark(SparkHoodieRecord older, SparkHoodieRecord newer);
+   String getMergingStrategy();
   
-  // ...
+   // This method converges combineAndGetUpdateValue and preCombine from HoodieRecordPayload.
+   // It must be an associative operation: f(a, f(b, c)) = f(f(a, b), c). That is, given three versions A, B, C of a single record, both orders of application have to yield the same result.
+   Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException;
+   
+   // The record type handled by the current merger
+   // SPARK, AVRO, FLINK
+   HoodieRecordType getRecordType();
+}
+
+/**
+ * Spark-specific implementation 
+ */
+class HoodieSparkRecordMerger implements HoodieRecordMerger {
+
+   @Override
+   public String getMergingStrategy() {
+      return UUID_MERGER_STRATEGY;
+   }
+
+   @Override
+   public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+      // Unifies preCombine and combineAndGetUpdateValue for HoodieSparkRecord; must be an associative operation.
+   }
+
+   @Override
+   public HoodieRecordType getRecordType() {
+      return HoodieRecordType.SPARK;
+   }
+}
+   
+/**
+ * Flink-specific implementation 
+ */
+class HoodieFlinkRecordMerger implements HoodieRecordMerger {
+
+   @Override
+   public String getMergingStrategy() {
+      return UUID_MERGER_STRATEGY;
+   }
+  
+   @Override
+   public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
+      // Unifies preCombine and combineAndGetUpdateValue for HoodieFlinkRecord; must be an associative operation.
+   }
+
+   @Override
+   public HoodieRecordType getRecordType() {
+      return HoodieRecordType.FLINK;
+   }
 }
 ```
 Where user can provide their own subclass implementing such interface for the engines of interest.
 
-#### Migration from `HoodieRecordPayload` to `HoodieRecordCombiningEngine`
+#### Migration from `HoodieRecordPayload` to `HoodieRecordMerger`
 
 To warrant backward-compatibility (BWC) on the code-level with already created subclasses of `HoodieRecordPayload` currently
-already used in production by Hudi users, we will provide a BWC-bridge in the form of instance of `HoodieRecordCombiningEngine`, that will 
+already used in production by Hudi users, we will provide a BWC-bridge in the form of an instance of `HoodieRecordMerger` called `HoodieAvroRecordMerger`, which will
 be using user-defined subclass of `HoodieRecordPayload` to combine the records.
 
-Leveraging such bridge will make provide for seamless BWC migration to the 0.11 release, however will be removing the performance 
+Leveraging such a bridge will provide for a seamless BWC migration to the 0.11 release; however, it will remove the performance
 benefit of this refactoring, since it would unavoidably have to perform conversion to intermediate representation (Avro). To realize
 full-suite of benefits of this refactoring, users will have to migrate their merging logic out of `HoodieRecordPayload` subclass and into
-new `HoodieRecordCombiningEngine` implementation.
+new `HoodieRecordMerger` implementation.
+
+`preCombine` is used to merge records from log files or incoming records; `combineAndGetUpdateValue` is used to merge a record from a log file with a record from the base file.
+These two merge logics are unified in HoodieAvroRecordMerger as the merge function. `HoodieAvroRecordMerger`'s API will look like the following:

Review Comment:
   "unified in HoodieRecordMerger implementation"



##########
rfc/rfc-46/rfc-46.md:
##########
@@ -128,21 +194,73 @@ Following major components will be refactored:
 
 1. `HoodieWriteHandle`s will be  
    1. Accepting `HoodieRecord` instead of raw Avro payload (avoiding Avro conversion)
-   2. Using Combining API engine to merge records (when necessary) 
+   2. Using Record Merge API to merge records (when necessary) 
    3. Passes `HoodieRecord` as is to `FileWriter`
 2. `HoodieFileWriter`s will be 
    1. Accepting `HoodieRecord`
    2. Will be engine-specific (so that they're able to handle internal record representation)
 3. `HoodieRealtimeRecordReader`s 
    1. API will be returning opaque `HoodieRecord` instead of raw Avro payload
 
+### Config for RecordMerger
+The `RecordMerger` is engine-aware. We provide a config called `HoodieWriteConfig.MERGER_IMPLS`, which takes a list of `RecordMerger` class names, and `HoodieWriteConfig.MERGER_STRATEGY`, which is the UUID of the merging strategy. At runtime, Hudi picks the `RecordMerger` from `MERGER_IMPLS` whose strategy matches `MERGER_STRATEGY` and whose record type matches the engine type.
+
+### Public Api in HoodieRecord
+Because we implement different types of records, we need to implement functionality similar to AvroUtils in HoodieRecord for different data (avro, InternalRow, RowData).

Review Comment:
   "for different engine-specific payload representations (GenericRecord, InternalRow, RowData)"


