Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/08/05 23:20:24 UTC

[GitHub] [hudi] vinothchandar commented on a change in pull request #3285: [HUDI-1771] Propagate CDC format for hoodie

vinothchandar commented on a change in pull request #3285:
URL: https://github.com/apache/hudi/pull/3285#discussion_r683844723



##########
File path: hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
##########
@@ -620,23 +633,31 @@ public boolean reachedEnd() throws IOException {
       while (logKeysIterator.hasNext()) {
         final String curKey = logKeysIterator.next();
         if (!keyToSkip.contains(curKey)) {
-          Option<IndexedRecord> insertAvroRecord =
-              scanner.getRecords().get(curKey).getData().getInsertValue(tableSchema);
+          Option<IndexedRecord> insertAvroRecord = getInsetValue(curKey);
           if (insertAvroRecord.isPresent()) {
             // the record is a DELETE if insertAvroRecord not present, skipping
-            GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
+            GenericRecord avroRecord = buildAvroRecordBySchema(
                 insertAvroRecord.get(),
                 requiredSchema,
                 requiredPos,
                 recordBuilder);
-            this.currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
+            this.currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord);
+            this.currentRecord.setRowKind(FormatUtils.getRowKind(insertAvroRecord.get(), this.operationPos));
             return false;
           }
         }
       }
       return true;
     }
 
+    private Option<IndexedRecord> getInsetValue(String curKey) throws IOException {

Review comment:
       Typo: `getInsetValue` should be `getInsertValue`.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java
##########
@@ -120,8 +136,10 @@ public static long generateChecksum(byte[] data) {
   public static <R> R convertToHoodieRecordPayload(GenericRecord rec, String payloadClazz, Pair<String, String> recordKeyPartitionPathPair) {
     String recKey = rec.get(recordKeyPartitionPathPair.getLeft()).toString();
     String partitionPath = rec.get(recordKeyPartitionPathPair.getRight()).toString();
+    String operation = getNullableValAsString(rec, HoodieRecord.OPERATION_METADATA_FIELD);

Review comment:
       Use an `Option` here instead of a nullable `String`?
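
A minimal sketch of what the `Option` suggestion could look like. It is not the PR's code: `java.util.Optional` stands in for Hudi's own `Option` type, a plain `Map` stands in for the Avro `GenericRecord`, and `OperationFieldSketch` and `readOperation` are hypothetical names. The field name `_hoodie_operation` is taken from the Javadoc quoted later in this review.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, for illustration only.
final class OperationFieldSketch {
  // Assumed to match the '_hoodie_operation' metadata field named in the review.
  static final String OPERATION_FIELD = "_hoodie_operation";

  private OperationFieldSketch() {
  }

  // Returns the operation flag if the record carries one, otherwise an empty Optional,
  // so callers never have to null-check the result.
  static Optional<String> readOperation(Map<String, Object> rec) {
    return Optional.ofNullable(rec.get(OPERATION_FIELD)).map(Object::toString);
  }
}
```

Callers can then write `readOperation(rec).ifPresent(...)` or `readOperation(rec).orElse(...)` rather than branching on `null`.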

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
##########
@@ -141,6 +141,13 @@ private static Schema getWriteSchema(HoodieWriteConfig config) {
     return new Schema.Parser().parse(config.getWriteSchema());
   }
 
+  /**
+   * Whether to include the '_hoodie_operation' field in the metadata fields.
+   */
+  protected boolean withOperationField() {

Review comment:
       Why can't this just be a `WriteConfig` that we can set to different values for Spark and Flink?
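
A rough sketch of the config-driven alternative, under stated assumptions: `WriteConfigSketch` is a hypothetical stand-in for `HoodieWriteConfig`, and the property key below is invented for illustration and is not a real Hudi option. The point is only that the decision moves out of an overridable handle method and into a value each engine sets.

```java
import java.util.Properties;

// Hypothetical stand-in for the write config; not Hudi's actual class.
final class WriteConfigSketch {
  // Invented key for illustration only.
  static final String ALLOW_OPERATION_FIELD = "example.write.operation.field.enabled";

  private final Properties props;

  WriteConfigSketch(Properties props) {
    this.props = props;
  }

  // Whether to include the '_hoodie_operation' field; defaults to false when unset.
  boolean withOperationField() {
    return Boolean.parseBoolean(props.getProperty(ALLOW_OPERATION_FIELD, "false"));
  }
}
```

With something like this, a Flink writer would set the property to `true` and a Spark writer would leave the default, and the write handles would read the flag from the config instead of overriding a method.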

##########
File path: hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
##########
@@ -350,18 +352,18 @@ public HoodieRestoreMetadata restore(HoodieEngineContext context, String restore
   protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
                                               Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
     if (requireSortedRecords()) {
-      return new HoodieSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
+      return new FlinkCompactorSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,

Review comment:
       The advantage of doing this as a `WriteConfig` would be that these new classes need not exist?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -191,6 +192,17 @@ protected boolean isUpdateRecord(HoodieRecord<T> hoodieRecord) {
     return hoodieRecord.getCurrentLocation() != null;
   }
 
+  /**
+   * Add cdc operation to the record, default do nothing.
+   *
+   * @param record The record.
+   * @param flag   The change flag name.
+   * @see HoodieOperation
+   */
+  protected void addOperationToRecord(GenericRecord record, String flag) {

Review comment:
       Let's have an enum for the op flag?
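
One possible shape for such an enum, as a sketch only. `OpFlag`, `flagName` and `fromName` are hypothetical names, and the short flag strings are an assumption modelled on the short strings of Flink's `RowKind` (`+I`, `-U`, `+U`, `-D`); the PR's `HoodieOperation` may differ.

```java
// Hypothetical enum for the change flag; values assumed to mirror Flink's RowKind short strings.
enum OpFlag {
  INSERT("+I"),
  UPDATE_BEFORE("-U"),
  UPDATE_AFTER("+U"),
  DELETE("-D");

  private final String flagName;

  OpFlag(String flagName) {
    this.flagName = flagName;
  }

  // The string form that would be written into the operation field.
  String flagName() {
    return flagName;
  }

  // Parses a stored flag string; rejects anything that is not a known flag.
  static OpFlag fromName(String name) {
    for (OpFlag flag : values()) {
      if (flag.flagName.equals(name)) {
        return flag;
      }
    }
    throw new IllegalArgumentException("Unknown operation flag: " + name);
  }
}
```

A method such as `addOperationToRecord` could then accept an `OpFlag` instead of a raw `String`, so a misspelled flag fails at compile time rather than at read time.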



