Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2021/03/24 05:49:54 UTC

[GitHub] [gobblin] hanghangliu opened a new pull request #3252: [GOBBLIN-1413]-Emit-GMCE-as-long-as-watermark-moved

hanghangliu opened a new pull request #3252:
URL: https://github.com/apache/gobblin/pull/3252


   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - [ ] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-1413] My Gobblin PR"
       - https://issues.apache.org/jira/browse/GOBBLIN-1413
   
   
   ### Description
   - [ ] Here are some details about my PR, including screenshots (if applicable):
   Emit a GMCE (Gobblin Metadata Change Event) whenever the watermark moves in a streaming pipeline.
   
   Currently, a GMCE is not triggered in a streaming pipeline if no new file is generated. This causes a problem when the watermark moves but no file is generated (for example, when all data has been filtered out by the quality checker): the GMCE is missed.
   
   Change the GMCE publisher to produce a GMCE even when no file is generated. Change IcebergMetadataWriter to update the watermark correctly.
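
A minimal sketch of the decision described above, not the PR's actual code. `EmitDecisionSketch`, `SketchOperation`, and the `"low-high"` offset range format are hypothetical names and assumptions used only for illustration. The explicit check that offsets advanced is also an assumption about what "watermark moved" means here.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical operation types; only change_property is named in this PR. */
enum SketchOperation { ADD_FILES, CHANGE_PROPERTY }

final class EmitDecisionSketch {
  /**
   * Decide which event, if any, to emit for one publish cycle.
   * newFileCount: number of files written in this cycle.
   * offsetRange: per-partition offset range, e.g. "topic-0" -> "100-250" (format assumed).
   */
  static Optional<SketchOperation> decide(int newFileCount, Map<String, String> offsetRange) {
    if (newFileCount > 0) {
      return Optional.of(SketchOperation.ADD_FILES);
    }
    // No files were written: still emit if any partition's offsets advanced.
    boolean moved = offsetRange.values().stream().anyMatch(EmitDecisionSketch::hasMoved);
    return moved ? Optional.of(SketchOperation.CHANGE_PROPERTY) : Optional.empty();
  }

  /** True when the high offset is strictly greater than the low offset. */
  static boolean hasMoved(String range) {
    String[] parts = range.split("-");
    return parts.length == 2
        && Long.parseLong(parts[1].trim()) > Long.parseLong(parts[0].trim());
  }
}
```

With these assumptions, a cycle that writes no files but advances offsets from 100 to 250 yields a property-only event, while a cycle with an unchanged range yields nothing.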
   
   ### Tests
   - [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   
   
   ### Commits
   - [ ] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [gobblin] autumnust commented on a change in pull request #3252: [GOBBLIN-1413]-Emit-GMCE-as-long-as-watermark-moved

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #3252:
URL: https://github.com/apache/gobblin/pull/3252#discussion_r611866991



##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
##########
@@ -18,11 +18,13 @@
 package org.apache.gobblin.iceberg.publisher;
 
 import com.google.common.io.Closer;
+import java.util.PriorityQueue;

Review comment:
       It looks like the import order is not right here. Please consider using the style file from the gobblin-oss webpage to auto-format the imports.







[GitHub] [gobblin] hanghangliu closed pull request #3252: [GOBBLIN-1413]-Emit-GMCE-as-long-as-watermark-moved

Posted by GitBox <gi...@apache.org>.
hanghangliu closed pull request #3252:
URL: https://github.com/apache/gobblin/pull/3252


   





[GitHub] [gobblin] codecov-io commented on pull request #3252: [GOBBLIN-1413]-Emit-GMCE-as-long-as-watermark-moved

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #3252:
URL: https://github.com/apache/gobblin/pull/3252#issuecomment-809872859


   # [Codecov](https://codecov.io/gh/apache/gobblin/pull/3252?src=pr&el=h1) Report
   > Merging [#3252](https://codecov.io/gh/apache/gobblin/pull/3252?src=pr&el=desc) (5c037a5) into [master](https://codecov.io/gh/apache/gobblin/commit/d9ae5353c74fdcd385835fca9b586b3fdb90971b?el=desc) (d9ae535) will **decrease** coverage by `37.41%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/gobblin/pull/3252/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc)](https://codecov.io/gh/apache/gobblin/pull/3252?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #3252       +/-   ##
   ============================================
   - Coverage     46.42%   9.00%   -37.42%     
   + Complexity     9974    1738     -8236     
   ============================================
     Files          2034    2034               
     Lines         79042   79078       +36     
     Branches       8809    8815        +6     
   ============================================
   - Hits          36692    7121    -29571     
   - Misses        38938   71259    +32321     
   + Partials       3412     698     -2714     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/gobblin/pull/3252?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...org/apache/gobblin/iceberg/GobblinMCEProducer.java](https://codecov.io/gh/apache/gobblin/pull/3252/diff?src=pr&el=tree#diff-Z29iYmxpbi1pY2ViZXJnL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2ljZWJlcmcvR29iYmxpbk1DRVByb2R1Y2VyLmphdmE=) | `0.00% <0.00%> (-48.49%)` | `0.00 <0.00> (-12.00)` | |
   | [...gobblin/iceberg/publisher/GobblinMCEPublisher.java](https://codecov.io/gh/apache/gobblin/pull/3252/diff?src=pr&el=tree#diff-Z29iYmxpbi1pY2ViZXJnL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2ljZWJlcmcvcHVibGlzaGVyL0dvYmJsaW5NQ0VQdWJsaXNoZXIuamF2YQ==) | `0.00% <0.00%> (-63.34%)` | `0.00 <0.00> (-12.00)` | |
   | [.../gobblin/iceberg/writer/IcebergMetadataWriter.java](https://codecov.io/gh/apache/gobblin/pull/3252/diff?src=pr&el=tree#diff-Z29iYmxpbi1pY2ViZXJnL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2ljZWJlcmcvd3JpdGVyL0ljZWJlcmdNZXRhZGF0YVdyaXRlci5qYXZh) | `0.00% <0.00%> (-67.59%)` | `0.00 <0.00> (-51.00)` | |
   | [...c/main/java/org/apache/gobblin/util/FileUtils.java](https://codecov.io/gh/apache/gobblin/pull/3252/diff?src=pr&el=tree#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvRmlsZVV0aWxzLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [...n/java/org/apache/gobblin/fork/CopyableSchema.java](https://codecov.io/gh/apache/gobblin/pull/3252/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2ZvcmsvQ29weWFibGVTY2hlbWEuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...java/org/apache/gobblin/stream/ControlMessage.java](https://codecov.io/gh/apache/gobblin/pull/3252/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vc3RyZWFtL0NvbnRyb2xNZXNzYWdlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...va/org/apache/gobblin/dataset/DatasetResolver.java](https://codecov.io/gh/apache/gobblin/pull/3252/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YXNldC9EYXRhc2V0UmVzb2x2ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...va/org/apache/gobblin/converter/EmptyIterable.java](https://codecov.io/gh/apache/gobblin/pull/3252/diff?src=pr&el=tree#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbnZlcnRlci9FbXB0eUl0ZXJhYmxlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [...org/apache/gobblin/ack/BasicAckableForTesting.java](https://codecov.io/gh/apache/gobblin/pull/3252/diff?src=pr&el=tree#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vYWNrL0Jhc2ljQWNrYWJsZUZvclRlc3RpbmcuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...n/java/org/apache/gobblin/salesforce/SfConfig.java](https://codecov.io/gh/apache/gobblin/pull/3252/diff?src=pr&el=tree#diff-Z29iYmxpbi1zYWxlc2ZvcmNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NhbGVzZm9yY2UvU2ZDb25maWcuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | ... and [1078 more](https://codecov.io/gh/apache/gobblin/pull/3252/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/gobblin/pull/3252?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/gobblin/pull/3252?src=pr&el=footer). Last update [d9ae535...5c037a5](https://codecov.io/gh/apache/gobblin/pull/3252?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [gobblin] asfgit closed pull request #3252: [GOBBLIN-1413]-Emit-GMCE-as-long-as-watermark-moved

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #3252:
URL: https://github.com/apache/gobblin/pull/3252


   





[GitHub] [gobblin] ZihanLi58 commented on a change in pull request #3252: [GOBBLIN-1413]-Emit-GMCE-as-long-as-watermark-moved

Posted by GitBox <gi...@apache.org>.
ZihanLi58 commented on a change in pull request #3252:
URL: https://github.com/apache/gobblin/pull/3252#discussion_r602639737



##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
##########
@@ -132,6 +136,36 @@ public void publishData(Collection<? extends WorkUnitState> states) throws IOExc
     return newFiles;
   }
 
+  /**
+   * Choose one file from the work unit state. There will be no modification to the file.
+   * It's used in GMCE writer {@link GobblinMCEWriter} merely for getting the DB and table name.
+   * @throws IOException
+   */
+  private Map<Path, Metrics> computeDummyFile (State state) throws IOException {

Review comment:
       This method ends up listing all files under the dataset and picking only the oldest one, and the oldest file might soon be removed by the retention pipeline. Can you modify the algorithm to get the latest available data file?

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
##########
@@ -198,6 +198,15 @@ private boolean verifyInput(Map<Path, Metrics> newFiles, List<String> oldFiles,
         }
         break;
       }
+      case change_property: {
+        if(newFiles != null || oldFiles != null) {
+          log.warn("{} new files and {} old files detected while no file alteration is performed",

Review comment:
       Since we always set new files to a dummy file, do you think we need to modify the condition for the warn log?

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -229,7 +229,7 @@ private Long getCurrentWaterMark(TableIdentifier tid, String topicPartition) {
    * The logic of this function will be:
    * 1. Check whether a table exists, if not then create the iceberg table
    * 2. Compute schema from the gmce and update the cache for candidate schemas
-   * 3. Do the required operation of the gmce, i.e. addFile, rewriteFile or dropFile
+   * 3. Do the required operation of the gmce, i.e. addFile, rewriteFile or dropFile. change_property means no operation needed

Review comment:
       Does change_property mean that we only update table-level properties and do not modify data?

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
##########
@@ -132,6 +136,36 @@ public void publishData(Collection<? extends WorkUnitState> states) throws IOExc
     return newFiles;
   }
 
+  /**
+   * Choose one file from the work unit state. There will be no modification to the file.
+   * It's used in GMCE writer {@link GobblinMCEWriter} merely for getting the DB and table name.
+   * @throws IOException
+   */
+  private Map<Path, Metrics> computeDummyFile (State state) throws IOException {
+    Map<Path, Metrics> newFiles = new HashMap<>();
+    NameMapping mapping = getNameMapping();

Review comment:
       In this case, we don't need file metrics, as it's just a dummy file. Also, getNameMapping requires the latest schema to be set, but we may not have this property when no data has been written out.

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -275,6 +275,14 @@ public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSp
         dropFiles(gmce, oldSpecsMap, table, tableMetadata, tid);
         break;
       }
+      case change_property: {

Review comment:
       On line 244, we may also want to skip processing the event if the operation type is change_property and the table does not exist.
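
The guard suggested in this comment could look roughly like the sketch below. It is only an illustration: `TableGuardSketch`, its `Op` enum, and the set of existing table names are hypothetical stand-ins, not the writer's real types or catalog lookup.

```java
import java.util.Set;

final class TableGuardSketch {
  /** Hypothetical operation types mirroring the ones mentioned in the Javadoc above. */
  enum Op { ADD_FILES, REWRITE_FILES, DROP_FILES, CHANGE_PROPERTY }

  /** Returns true when the event should be skipped rather than processed. */
  static boolean shouldSkip(Op op, String table, Set<String> existingTables) {
    // A property-only event carries no data, so creating a table just to
    // record it is assumed to be unnecessary.
    return op == Op.CHANGE_PROPERTY && !existingTables.contains(table);
  }
}
```

Other operation types fall through to the existing path, which creates the table when it is missing.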

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
##########
@@ -92,9 +93,12 @@ public void publishData(Collection<? extends WorkUnitState> states) throws IOExc
       Map<Path, Metrics> newFiles = computeFileMetrics(state);
       Map<String, String> offsetRange = getPartitionOffsetRange(OFFSET_RANGE_KEY);
       if (newFiles.isEmpty()) {
-        return;
+        // There'll be only one dummy file here. This file is parsed for DB and table name calculation.
+        newFiles = computeDummyFile(state);
+        this.producer.sendGMCE(newFiles, null, null, offsetRange, OperationType.change_property, SchemaSource.SCHEMAREGISTRY);

Review comment:
       You may want to set the schema source to none in this case, since we don't want to update the schema.







[GitHub] [gobblin] ZihanLi58 commented on a change in pull request #3252: [GOBBLIN-1413]-Emit-GMCE-as-long-as-watermark-moved

Posted by GitBox <gi...@apache.org>.
ZihanLi58 commented on a change in pull request #3252:
URL: https://github.com/apache/gobblin/pull/3252#discussion_r604510346



##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
##########
@@ -132,6 +137,36 @@ public void publishData(Collection<? extends WorkUnitState> states) throws IOExc
     return newFiles;
   }
 
+  /**
+   * Choose the latest file from the work unit state. There will be no modification to the file.
+   * It's used in GMCE writer {@link GobblinMCEWriter} merely for getting the DB and table name.
+   * @throws IOException
+   */
+  private Map<Path, Metrics> computeDummyFile (State state) throws IOException {
+    Map<Path, Metrics> newFiles = new HashMap<>();
+    FileSystem fs = FileSystem.get(conf);
+    for (final String pathString : state.getPropAsList(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR, "")) {

Review comment:
       One more thing I want to point out: if writer.partition.prefix is set, you may also want to include that value in the initial path, so that you don't end up picking the daily data, which may cause the following operation to fail.







[GitHub] [gobblin] hanghangliu commented on a change in pull request #3252: [GOBBLIN-1413]-Emit-GMCE-as-long-as-watermark-moved

Posted by GitBox <gi...@apache.org>.
hanghangliu commented on a change in pull request #3252:
URL: https://github.com/apache/gobblin/pull/3252#discussion_r611928389



##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
##########
@@ -18,11 +18,13 @@
 package org.apache.gobblin.iceberg.publisher;
 
 import com.google.common.io.Closer;
+import java.util.PriorityQueue;

Review comment:
       It turned out that the old imports were in the wrong order. I reformatted the imports for the whole file.







[GitHub] [gobblin] hanghangliu commented on a change in pull request #3252: [GOBBLIN-1413]-Emit-GMCE-as-long-as-watermark-moved

Posted by GitBox <gi...@apache.org>.
hanghangliu commented on a change in pull request #3252:
URL: https://github.com/apache/gobblin/pull/3252#discussion_r603718830



##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
##########
@@ -132,6 +136,36 @@ public void publishData(Collection<? extends WorkUnitState> states) throws IOExc
     return newFiles;
   }
 
+  /**
+   * Choose one file from the work unit state. There will be no modification to the file.
+   * It's used in GMCE writer {@link GobblinMCEWriter} merely for getting the DB and table name.
+   * @throws IOException
+   */
+  private Map<Path, Metrics> computeDummyFile (State state) throws IOException {

Review comment:
       Used a priority queue to choose the file with the max modification_time, to ensure the dummy file is the latest available file.
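
As a rough illustration of this approach (a sketch, not the code in the PR), the selection can be written against a plain list of entries. `LatestFileSketch` and `FileEntry` are hypothetical names; the real method works on file statuses from the file system instead.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.PriorityQueue;

final class LatestFileSketch {
  /** Minimal stand-in for a file listing entry: path plus modification time in ms. */
  static final class FileEntry {
    final String path;
    final long modificationTime;

    FileEntry(String path, long modificationTime) {
      this.path = path;
      this.modificationTime = modificationTime;
    }
  }

  /** Returns the path of the most recently modified file, or empty if there are none. */
  static Optional<String> latest(List<FileEntry> files) {
    // Max-heap on modification time: the head is the most recently modified file.
    PriorityQueue<FileEntry> queue = new PriorityQueue<>(
        Comparator.comparingLong((FileEntry f) -> f.modificationTime).reversed());
    queue.addAll(files);
    FileEntry head = queue.peek();
    return head == null ? Optional.empty() : Optional.of(head.path);
  }
}
```

Since only the single newest entry is needed, tracking a running maximum in one pass would give the same result without the heap; the queue is used here only to mirror the description.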







[GitHub] [gobblin] hanghangliu commented on a change in pull request #3252: [GOBBLIN-1413]-Emit-GMCE-as-long-as-watermark-moved

Posted by GitBox <gi...@apache.org>.
hanghangliu commented on a change in pull request #3252:
URL: https://github.com/apache/gobblin/pull/3252#discussion_r605873438



##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
##########
@@ -132,6 +137,36 @@ public void publishData(Collection<? extends WorkUnitState> states) throws IOExc
     return newFiles;
   }
 
+  /**
+   * Choose the latest file from the work unit state. There will be no modification to the file.
+   * It's used in GMCE writer {@link GobblinMCEWriter} merely for getting the DB and table name.
+   * @throws IOException
+   */
+  private Map<Path, Metrics> computeDummyFile (State state) throws IOException {
+    Map<Path, Metrics> newFiles = new HashMap<>();
+    FileSystem fs = FileSystem.get(conf);
+    for (final String pathString : state.getPropAsList(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR, "")) {

Review comment:
       As we discussed and tested, this won't be an issue



