You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/09/14 03:43:07 UTC

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #6476: [HUDI-3478] Support CDC for Spark in Hudi

alexeykudinkin commented on code in PR #6476:
URL: https://github.com/apache/hudi/pull/6476#discussion_r970277676


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -273,6 +283,33 @@ protected HoodieFileWriter createNewFileWriter(String instantTime, Path path, Ho
     return HoodieFileWriterFactory.getFileWriter(instantTime, path, hoodieTable, config, schema, taskContextSupplier);
   }
 
+  protected HoodieLogFormat.Writer createLogWriter(
+      Option<FileSlice> fileSlice, String baseCommitTime) throws IOException {
+    int logVersion = HoodieLogFile.LOGFILE_BASE_VERSION;
+    long logFileSize = 0L;
+    String logWriteToken = writeToken;
+    if (fileSlice.isPresent()) {
+      Option<HoodieLogFile> latestLogFileOpt = fileSlice.get().getLatestLogFile();
+      if (latestLogFileOpt.isPresent()) {
+        HoodieLogFile latestLogFile = latestLogFileOpt.get();
+        logVersion = latestLogFile.getLogVersion();
+        logFileSize = latestLogFile.getFileSize();
+        logWriteToken = FSUtils.getWriteTokenFromLogPath(latestLogFile.getPath());
+      }
+    }
+    return HoodieLogFormat.newWriterBuilder()
+        .onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partitionPath))
+        .withFileId(fileId)
+        .overBaseCommit(baseCommitTime)
+        .withLogVersion(logVersion)
+        .withFileSize(logFileSize)
+        .withSizeThreshold(config.getLogFileMaxSize())
+        .withFs(fs)
+        .withRolloverLogWriteToken(writeToken)
+        .withLogWriteToken(logWriteToken)
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();

Review Comment:
   Are we mixing `CDCBlocks` with normal Delta Data Blocks?
   
   I don't think we can do that, as this will severely affect performance for pure non-CDC queries on MOR tables.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -399,9 +451,65 @@ protected void writeIncomingRecords() throws IOException {
     }
   }
 
+  protected SerializableRecord cdcRecord(HoodieCDCOperation operation, String recordKey, String partitionPath,
+                                         GenericRecord oldRecord, GenericRecord newRecord) {
+    GenericData.Record record;
+    if (cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE_AFTER)) {
+      record = CDCUtils.cdcRecord(operation.getValue(), instantTime,
+          oldRecord, addCommitMetadata(newRecord, recordKey, partitionPath));
+    } else if (cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE)) {
+      record = CDCUtils.cdcRecord(operation.getValue(), recordKey, oldRecord);
+    } else {
+      record = CDCUtils.cdcRecord(operation.getValue(), recordKey);
+    }
+    return new SerializableRecord(record);
+  }
+
+  protected GenericRecord addCommitMetadata(GenericRecord record, String recordKey, String partitionPath) {
+    if (record != null && config.populateMetaFields()) {
+      GenericRecord rewriteRecord = rewriteRecord(record);
+      String seqId = HoodieRecord.generateSequenceId(instantTime, getPartitionId(), writtenRecordCount.get());
+      HoodieAvroUtils.addCommitMetadataToRecord(rewriteRecord, instantTime, seqId);
+      HoodieAvroUtils.addHoodieKeyToRecord(rewriteRecord, recordKey, partitionPath, newFilePath.getName());
+      return rewriteRecord;
+    }
+    return record;
+  }
+
+  protected Option<AppendResult> writeCDCData() {
+    if (!cdcEnabled || cdcData.isEmpty() || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
+      // the following cases where we do not need to write out the cdc file:
+      // case 1: all the data from the previous file slice are deleted. and no new data is inserted;
+      // case 2: all the data are new-coming,
+      return Option.empty();
+    }

Review Comment:
   Seconding that: CDC is non-trivial logic that will proliferate across the write handles, so we should encapsulate its state, along with a well-defined API, within a single component.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -102,6 +118,15 @@
   protected Map<String, HoodieRecord<T>> keyToNewRecords;
   protected Set<String> writtenRecordKeys;
   protected HoodieFileWriter<IndexedRecord> fileWriter;
+  // a flag that indicate whether allow the change data to write out a cdc log file.
+  protected boolean cdcEnabled = false;
+  protected String cdcSupplementalLoggingMode;

Review Comment:
   Why is this a String?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -281,7 +318,19 @@ private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord ol
         return false;
       }
     }
-    return writeRecord(hoodieRecord, indexedRecord, isDelete);
+    boolean result = writeRecord(hoodieRecord, indexedRecord, isDelete);
+    if (cdcEnabled) {
+      String recordKey = StringUtils.join(

Review Comment:
   This might not match the actual record key. We should use the proper record key here.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -399,9 +453,65 @@ protected void writeIncomingRecords() throws IOException {
     }
   }
 
+  protected SerializableRecord createCDCRecord(HoodieCDCOperation operation, String recordKey, String partitionPath,
+                                               GenericRecord oldRecord, GenericRecord newRecord) {
+    GenericData.Record record;
+    if (cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE_AFTER)) {
+      record = HoodieCDCUtils.cdcRecord(operation.getValue(), instantTime,
+          oldRecord, addCommitMetadata(newRecord, recordKey, partitionPath));
+    } else if (cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE)) {
+      record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey, oldRecord);
+    } else {
+      record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey);
+    }
+    return new SerializableRecord(record);
+  }
+
+  protected GenericRecord addCommitMetadata(GenericRecord record, String recordKey, String partitionPath) {

Review Comment:
   Why are we adding the metadata to the CDC payload?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -399,9 +453,65 @@ protected void writeIncomingRecords() throws IOException {
     }
   }
 
+  protected SerializableRecord createCDCRecord(HoodieCDCOperation operation, String recordKey, String partitionPath,
+                                               GenericRecord oldRecord, GenericRecord newRecord) {
+    GenericData.Record record;
+    if (cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE_AFTER)) {
+      record = HoodieCDCUtils.cdcRecord(operation.getValue(), instantTime,
+          oldRecord, addCommitMetadata(newRecord, recordKey, partitionPath));
+    } else if (cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE)) {
+      record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey, oldRecord);
+    } else {
+      record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey);
+    }
+    return new SerializableRecord(record);
+  }
+
+  protected GenericRecord addCommitMetadata(GenericRecord record, String recordKey, String partitionPath) {
+    if (record != null && config.populateMetaFields()) {
+      GenericRecord rewriteRecord = rewriteRecord(record);
+      String seqId = HoodieRecord.generateSequenceId(instantTime, getPartitionId(), writtenRecordCount.get());
+      HoodieAvroUtils.addCommitMetadataToRecord(rewriteRecord, instantTime, seqId);
+      HoodieAvroUtils.addHoodieKeyToRecord(rewriteRecord, recordKey, partitionPath, newFilePath.getName());
+      return rewriteRecord;
+    }
+    return record;
+  }
+
+  protected Option<AppendResult> writeCDCData() {
+    if (!cdcEnabled || cdcData.isEmpty() || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
+      // the following cases where we do not need to write out the cdc file:
+      // case 1: all the data from the previous file slice are deleted. and no new data is inserted;
+      // case 2: all the data are new-coming,
+      return Option.empty();
+    }
+    try {
+      String keyField = config.populateMetaFields()
+          ? HoodieRecord.RECORD_KEY_METADATA_FIELD
+          : hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+      Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
+      if (cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE_AFTER)) {

Review Comment:
   Let's abstract this conditional into a method (I believe you already have one below).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org