Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2021/03/27 00:31:34 UTC

[GitHub] [gobblin] ZihanLi58 commented on a change in pull request #3252: [GOBBLIN-1413]-Emit-GMCE-as-long-as-watermark-moved

ZihanLi58 commented on a change in pull request #3252:
URL: https://github.com/apache/gobblin/pull/3252#discussion_r602639737



##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
##########
@@ -132,6 +136,36 @@ public void publishData(Collection<? extends WorkUnitState> states) throws IOExc
     return newFiles;
   }
 
+  /**
+   * Choose one file from the work unit state. There will be no modification to the file.
+   * It's used in GMCE writer {@link GobblinMCEWriter} merely for getting the DB and table name.
+   * @throws IOException
+   */
+  private Map<Path, Metrics> computeDummyFile (State state) throws IOException {

Review comment:
       This method ends up listing all files under the dataset and picking only the oldest one, but the oldest file might be removed by the retention pipeline shortly afterwards. Can you modify the algorithm to pick the latest available data file instead?
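Here is a minimal sketch of what I have in mind. It assumes a local directory walked with java.nio as a stand-in for the Hadoop FileSystem listing the publisher actually does. LatestFilePicker and latestFile are hypothetical names, not existing Gobblin code. The sketch keeps the most recently modified file instead of the oldest one:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

public class LatestFilePicker {

  /**
   * Returns the most recently modified regular file under root, or empty if there is none.
   * Hypothetical helper: java.nio stands in for the Hadoop FileSystem used by the publisher.
   */
  public static Optional<Path> latestFile(Path root) throws IOException {
    try (Stream<Path> paths = Files.walk(root)) {
      return paths
          .filter(Files::isRegularFile)
          .max(Comparator.comparingLong(LatestFilePicker::lastModifiedMillis));
    }
  }

  private static long lastModifiedMillis(Path path) {
    try {
      return Files.getLastModifiedTime(path).toMillis();
    } catch (IOException e) {
      // A file removed while we were listing (e.g. by retention) ranks as the oldest.
      return Long.MIN_VALUE;
    }
  }
}
```

Since the concern is retention deleting the oldest files, picking the newest one makes it less likely that the chosen file disappears before the GMCE writer reads it.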

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
##########
@@ -198,6 +198,15 @@ private boolean verifyInput(Map<Path, Metrics> newFiles, List<String> oldFiles,
         }
         break;
       }
+      case change_property: {
+        if(newFiles != null || oldFiles != null) {
+          log.warn("{} new files and {} old files detected while no file alteration is performed",

Review comment:
       Since newFiles now always contains a dummy file, do you think we need to modify the condition for the warn log?
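For example, a condition along these lines would warn only on a real file alteration. This is a hypothetical helper, and it uses String keys instead of Path and Metrics so that it stays self-contained:

```java
import java.util.List;
import java.util.Map;

public class ChangePropertyInputCheck {

  /**
   * Hypothetical check for a change_property GMCE. The publisher always attaches exactly one
   * dummy new file (used only to derive the DB and table name), so that alone should not warn.
   * Any old files, or more than one new file, indicate an unexpected file alteration.
   */
  public static boolean isUnexpectedFileChange(Map<String, ?> newFiles, List<String> oldFiles) {
    boolean hasOldFiles = oldFiles != null && !oldFiles.isEmpty();
    boolean hasExtraNewFiles = newFiles != null && newFiles.size() > 1;
    return hasOldFiles || hasExtraNewFiles;
  }
}
```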

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -229,7 +229,7 @@ private Long getCurrentWaterMark(TableIdentifier tid, String topicPartition) {
    * The logic of this function will be:
    * 1. Check whether a table exists, if not then create the iceberg table
    * 2. Compute schema from the gmce and update the cache for candidate schemas
-   * 3. Do the required operation of the gmce, i.e. addFile, rewriteFile or dropFile
+   * 3. Do the required operation of the gmce, i.e. addFile, rewriteFile or dropFile. change_property means no operation needed

Review comment:
       Does change_property mean that we only update table-level properties and do not modify any data?
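If so, a toy model of the intended semantics could look like this. PropertyOnlyUpdate, its fields, and the "offset.range" key are all hypothetical stand-ins for the Iceberg table and its properties. In this model, change_property moves the recorded offset range (the watermark) but leaves the data files untouched:

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory stand-in for a table: data files plus table-level properties. */
public class PropertyOnlyUpdate {

  public enum OperationType { add_files, change_property }

  public final Set<String> dataFiles = new LinkedHashSet<>();
  public final Map<String, String> properties = new HashMap<>();

  /**
   * add_files registers data files and records the offset range;
   * change_property records the offset range only and leaves the data files as they are.
   */
  public void apply(OperationType op, Set<String> files, String offsetRange) {
    switch (op) {
      case add_files:
        dataFiles.addAll(files);
        break;
      case change_property:
        // No data modification: the file passed along only identifies the DB and table.
        break;
    }
    properties.put("offset.range", offsetRange);
  }
}
```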

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
##########
@@ -132,6 +136,36 @@ public void publishData(Collection<? extends WorkUnitState> states) throws IOExc
     return newFiles;
   }
 
+  /**
+   * Choose one file from the work unit state. There will be no modification to the file.
+   * It's used in GMCE writer {@link GobblinMCEWriter} merely for getting the DB and table name.
+   * @throws IOException
+   */
+  private Map<Path, Metrics> computeDummyFile (State state) throws IOException {
+    Map<Path, Metrics> newFiles = new HashMap<>();
+    NameMapping mapping = getNameMapping();

Review comment:
       In this case we don't need file metrics, since it's just a dummy file. Also, getNameMapping requires the latest schema to be set, but that property may be missing when no data has been written out.
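Here is a sketch of building the dummy entry with no metrics and no name mapping lookup. It assumes (unverified) that the downstream writer tolerates a missing Metrics value. DummyFileEntry is hypothetical, and Object stands in for Iceberg's Metrics type:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

public class DummyFileEntry {

  /**
   * Builds the single-entry map for the dummy file. The value is null ("no metrics computed"),
   * so neither file metrics nor a name mapping, which needs the latest schema, is required.
   * Returns an empty map when no file could be chosen.
   */
  public static Map<String, Object> dummyEntry(Optional<String> chosenFile) {
    return chosenFile
        .map(f -> Collections.<String, Object>singletonMap(f, null))
        .orElse(Collections.emptyMap());
  }
}
```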

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -275,6 +275,14 @@ public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSp
         dropFiles(gmce, oldSpecsMap, table, tableMetadata, tid);
         break;
       }
+      case change_property: {

Review comment:
       On line 244, should we also skip processing the event if the operation type is change_property and the table does not exist?
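Something like the following hypothetical check would do it. The enum here is only a local mirror of the operation types referenced in this review, not the generated GMCE class:

```java
public class GmceSkipCheck {

  /** Local mirror of the operation types discussed in this PR (hypothetical). */
  public enum OperationType { add_files, rewrite_files, drop_files, change_property }

  /**
   * A change_property event carries no data files to register, so if the table does not exist
   * yet there is nothing to update, and the event can be skipped instead of creating a table.
   */
  public static boolean shouldSkip(OperationType op, boolean tableExists) {
    return op == OperationType.change_property && !tableExists;
  }
}
```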

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
##########
@@ -92,9 +93,12 @@ public void publishData(Collection<? extends WorkUnitState> states) throws IOExc
       Map<Path, Metrics> newFiles = computeFileMetrics(state);
       Map<String, String> offsetRange = getPartitionOffsetRange(OFFSET_RANGE_KEY);
       if (newFiles.isEmpty()) {
-        return;
+        // There'll be only one dummy file here. This file is parsed for DB and table name calculation.
+        newFiles = computeDummyFile(state);
+        this.producer.sendGMCE(newFiles, null, null, offsetRange, OperationType.change_property, SchemaSource.SCHEMAREGISTRY);

Review comment:
       Should the schema source be set to none in this case, since we don't want to update the schema?
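Here is a hypothetical sketch of the selection. The NONE constant is an assumption about the SchemaSource enum. Using SCHEMAREGISTRY for add_files is also assumed. Both enums are local mirrors, not the generated GMCE classes:

```java
public class SchemaSourceSelector {

  /** Local mirrors of the enums used in the publisher diff (hypothetical). */
  public enum OperationType { add_files, change_property }
  public enum SchemaSource { SCHEMAREGISTRY, NONE }

  public static SchemaSource schemaSourceFor(OperationType op) {
    // A change_property GMCE only moves the watermark, so no schema update is requested.
    return op == OperationType.change_property ? SchemaSource.NONE : SchemaSource.SCHEMAREGISTRY;
  }
}
```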




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org