Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/08/05 22:21:44 UTC

[GitHub] [hudi] vinothchandar commented on a change in pull request #3401: [HUDI-2170] Always choose the latest record for HoodieRecordPayload

vinothchandar commented on a change in pull request #3401:
URL: https://github.com/apache/hudi/pull/3401#discussion_r683820637



##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
##########
@@ -231,6 +229,83 @@ public void testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMap
     }
   }
 
+  @ParameterizedTest
+  @MethodSource("testArguments")
+  public void testHoodieMergeHandlePreCombine(ExternalSpillableMap.DiskMapType diskMapType,
+                                              boolean isCompressionEnabled) throws Exception {
+    // Create records in a single partition
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+    dataGen = new HoodieTestDataGenerator(new String[] {partitionPath});
+
+    // Build common properties with the parameterized disk-map configs
+    Properties properties = new Properties();
+    properties.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), diskMapType.name());
+    properties.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), String.valueOf(isCompressionEnabled));
+    metaClient.getTableConfig().setValue(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP, "timestamp");
+
+    // Build a write config with insert parallelism set
+    HoodieWriteConfig cfg = getConfigBuilder()
+        .withProperties(properties)
+        .build();
+
+    // test1: combine before insert; records with the smaller preCombineField value are merged away and replaced.
+    cfg.setValue(COMBINE_BEFORE_INSERT_PROP, "true");
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+      String newCommitTime = "001";
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateRecordsWithTimestampInsert(newCommitTime);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+      List<WriteStatus> statuses = client.insert(writeRecords, newCommitTime).collect();
+      assertNoWriteErrors(statuses);
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+      Dataset<Row> dataSet = HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime);
+      Row[] rows = (Row[]) dataSet.collect();
+      for (Row row : rows) {
+        if (row.getAs("_hoodie_record_key").equals("01")) {
+          assertEquals((Long) row.getAs("timestamp"), 4, "Record with greater preCombine field is chosen");
+        } else if (row.getAs("_hoodie_record_key").equals("02")) {
+          assertEquals((Long) row.getAs("timestamp"), 5, "Record with equal preCombine field: the newly arrived record is chosen");
+        } else if (row.getAs("_hoodie_record_key").equals("03")) {
+          assertEquals((Long) row.getAs("timestamp"), 6, "Record with greater preCombine field is chosen");
+        }
+      }
+      assertEquals(5, dataSet.count(),
+          "Must contain 5 records, because three records be combined");
+    }
+
+    // test2: no combine before insert; combining happens in HoodieMergeHandle instead.
+    cfg.setValue(COMBINE_BEFORE_INSERT_PROP, "false");
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+      // With combine-before-insert set to false, combining happens in HoodieMergeHandle.
+      String newCommitTime = "002";
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateRecordsWithTimestampInsert(newCommitTime);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+      List<WriteStatus> statuses = client.insert(writeRecords, newCommitTime).collect();
+      assertNoWriteErrors(statuses);
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+      Dataset<Row> dataSet = HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime);
+      Row[] rows = (Row[]) dataSet.collect();
+      for (Row row : rows) {

Review comment:
       can we reuse the asserts from here?
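
A minimal sketch of the reuse being suggested (hypothetical, not part of the PR: the helper name assertMergedTimestamps and the expected-value map are made up here): pull the per-row assertions into a private method of TestHoodieMergeHandle so both the combine-before-insert path and the HoodieMergeHandle path check the same expectations.

    // Hypothetical helper inside TestHoodieMergeHandle; assumes java.util.Map and
    // java.util.HashMap are imported alongside the existing Dataset/Row/assertEquals imports.
    private void assertMergedTimestamps(Dataset<Row> dataSet) {
      Map<String, Long> expected = new HashMap<>();
      expected.put("01", 4L); // greater preCombine field wins
      expected.put("02", 5L); // equal preCombine field, the newly arrived record wins
      expected.put("03", 6L); // greater preCombine field wins
      for (Row row : (Row[]) dataSet.collect()) {
        String key = row.getAs("_hoodie_record_key");
        if (expected.containsKey(key)) {
          assertEquals(expected.get(key), row.<Long>getAs("timestamp"),
              "Unexpected preCombine result for key " + key);
        }
      }
      assertEquals(5, dataSet.count(), "Must contain 5 records, because three records were combined");
    }

Both try-blocks could then call assertMergedTimestamps(dataSet) instead of repeating the loop.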

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
##########
@@ -231,6 +229,83 @@ public void testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMap
     }
   }
 
+  @ParameterizedTest
+  @MethodSource("testArguments")
+  public void testHoodieMergeHandlePreCombine(ExternalSpillableMap.DiskMapType diskMapType,
+                                              boolean isCompressionEnabled) throws Exception {
+    // Create records in a single partition
+    String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
+    dataGen = new HoodieTestDataGenerator(new String[] {partitionPath});
+
+    // Build common properties with the parameterized disk-map configs
+    Properties properties = new Properties();
+    properties.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), diskMapType.name());
+    properties.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), String.valueOf(isCompressionEnabled));
+    metaClient.getTableConfig().setValue(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD_PROP, "timestamp");
+
+    // Build a write config with insert parallelism set
+    HoodieWriteConfig cfg = getConfigBuilder()
+        .withProperties(properties)
+        .build();
+
+    // test1: combine before insert; records with the smaller preCombineField value are merged away and replaced.
+    cfg.setValue(COMBINE_BEFORE_INSERT_PROP, "true");
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+      String newCommitTime = "001";
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateRecordsWithTimestampInsert(newCommitTime);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+      List<WriteStatus> statuses = client.insert(writeRecords, newCommitTime).collect();
+      assertNoWriteErrors(statuses);
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+      Dataset<Row> dataSet = HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime);
+      Row[] rows = (Row[]) dataSet.collect();
+      for (Row row : rows) {
+        if (row.getAs("_hoodie_record_key").equals("01")) {

Review comment:
       use the `HoodieRecord.` members?
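
One way to read this suggestion (sketch only, assuming HoodieRecord.RECORD_KEY_METADATA_FIELD resolves to the "_hoodie_record_key" metadata column): reference the column through the HoodieRecord constant instead of the hardcoded string, so the test cannot drift from the metadata column naming.

    // Same loop body, using the HoodieRecord constant rather than the string literal.
    if (row.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD).equals("01")) {
      assertEquals(Long.valueOf(4L), row.<Long>getAs("timestamp"),
          "Record with greater preCombine field is chosen");
    }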

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -70,6 +70,10 @@
   private int type = 0;
   private Map<String, HoodieMetadataFileInfo> filesystemMetadata = null;
 
+  public HoodieMetadataPayload(GenericRecord record, Comparable<?> orderingVal) {

Review comment:
       would all existing payloads be forced to implement this new constructor?
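
For reference on the compatibility question (a hedged sketch, not the PR's change): payloads that already expose a (GenericRecord, Comparable) constructor, such as OverwriteWithLatestAvroPayload, would presumably keep working, and a custom payload built on top of one could simply delegate; the open question is what happens to payloads that lack such a constructor. The class below is hypothetical and only illustrates that constructor shape.

    import org.apache.avro.generic.GenericRecord;
    import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;

    // Hypothetical payload: the two-argument constructor delegates the ordering
    // value to the parent class. The (GenericRecord, Comparable) shape is assumed
    // here to be what the write path uses when instantiating payloads reflectively.
    public class ExamplePayload extends OverwriteWithLatestAvroPayload {
      public ExamplePayload(GenericRecord record, Comparable orderingVal) {
        super(record, orderingVal);
      }
    }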




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org