Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2022/02/11 02:00:42 UTC

[GitHub] [gobblin] jack-moseley opened a new pull request #3466: [GOBBLIN-1608] Fix case where error GTE is incorrectly sent from MCE writer

jack-moseley opened a new pull request #3466:
URL: https://github.com/apache/gobblin/pull/3466


   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - [x] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
       - https://issues.apache.org/jira/browse/GOBBLIN-1608
   
   
   ### Description
   - [x] Here are some details about my PR, including screenshots (if applicable):
   
   When `max.error.dataset` is reached and the container is failed, Gobblin calls close() on the writer, which in turn calls flush(). This causes a couple of issues with events, because the code makes assumptions about where flush() is called from.
   
   - Move `tableOperationTypeMap.remove(tableString)` to before the container is failed; otherwise an extra flush() is called on the table. That flush() shouldn't actually do anything, since writer.reset has already been called for the dataset.
   - Don't send error GTEs for all tables if flush() is called from close(). The events are emitted in flush() because the GMCE watermark is advanced when flush() is called periodically; when flush() is called from close(), the watermark is not advanced, so the events should not be sent.
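
The second point can be sketched roughly as below. This is a simplified stand-in, not the real `GobblinMCEWriter`: the class name `FlushSketch`, the `pendingTables`, `flushedTables` and `emittedEvents` lists, and the string-valued `datasetErrorMap` are all assumptions made for illustration. It only shows the idea that the periodic flush emits error events while a flush triggered from close() does not.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified, hypothetical stand-in for the MCE writer (not the real GobblinMCEWriter). */
class FlushSketch {
  // dataset -> recorded error message (the real map holds exceptions; simplified here)
  final Map<String, String> datasetErrorMap = new LinkedHashMap<>();
  // tables that still have aggregated metadata waiting to be flushed
  final List<String> pendingTables = new ArrayList<>();
  // tables that have been flushed so far
  final List<String> flushedTables = new ArrayList<>();
  // stand-in for the error GTEs that would be submitted
  final List<String> emittedEvents = new ArrayList<>();

  /** Flushes all pending tables; emits one event per recorded error only if asked to. */
  void flush(boolean emitAllErrors) {
    flushedTables.addAll(pendingTables);
    pendingTables.clear();
    if (emitAllErrors) {
      // Only reached when a watermark is being advanced (periodic flush).
      emittedEvents.addAll(datasetErrorMap.values());
    }
  }

  /** Periodic flush: the watermark advances, so all current errors are reported. */
  void flush() {
    flush(true);
  }

  /** Flush on close: the watermark does not advance, so no error events are sent. */
  void close() {
    flush(false);
  }
}
```

With this shape, a container that fails and closes its writer still flushes pending tables but does not report errors that the next periodic flush would report again.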
   
   ### Tests
   - [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   
   Tested with one dataset in e2e pipeline.
   
   ### Commits
   - [x] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [gobblin] jack-moseley commented on pull request #3466: [GOBBLIN-1608] Fix case where error GTE is incorrectly sent from MCE writer

Posted by GitBox <gi...@apache.org>.
jack-moseley commented on pull request #3466:
URL: https://github.com/apache/gobblin/pull/3466#issuecomment-1035728545


   @ZihanLi58 please review





[GitHub] [gobblin] ZihanLi58 merged pull request #3466: [GOBBLIN-1608] Fix case where error GTE is incorrectly sent from MCE writer

Posted by GitBox <gi...@apache.org>.
ZihanLi58 merged pull request #3466:
URL: https://github.com/apache/gobblin/pull/3466


   





[GitHub] [gobblin] jack-moseley commented on a change in pull request #3466: [GOBBLIN-1608] Fix case where error GTE is incorrectly sent from MCE writer

Posted by GitBox <gi...@apache.org>.
jack-moseley commented on a change in pull request #3466:
URL: https://github.com/apache/gobblin/pull/3466#discussion_r804960876



##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
##########
@@ -368,29 +369,40 @@ private void flush(String dbName, String tableName) throws IOException {
    * Note that this is one of the place where the materialization of aggregated metadata happens.
    * When there's a change of {@link OperationType}, it also interrupts metadata aggregation,
    * and triggers materialization of metadata.
+   *
+   * @param emitAllErrors if true, go through all datasets in {@link #datasetErrorMap} and emit GTE for each. We only want
+   *                      to do this if a watermark is is being advanced.
+   *
    * @throws IOException
    */
-  @Override
-  public void flush() throws IOException {
+  public void flush(boolean emitAllErrors) throws IOException {
     log.info(String.format("start to flushing %s records", String.valueOf(recordCount.get())));
     for (String tableString : tableOperationTypeMap.keySet()) {
       List<String> tid = Splitter.on(TABLE_NAME_DELIMITER).splitToList(tableString);
       flush(tid.get(0), tid.get(1));
     }
     tableOperationTypeMap.clear();
     recordCount.lazySet(0L);
-    // Emit events for all current errors, since the GMCE watermark will be advanced
-    for (Map.Entry<String, Map<String, GobblinMetadataException>> entry : datasetErrorMap.entrySet()) {
-      for (GobblinMetadataException exception : entry.getValue().values()) {
-        submitFailureEvent(exception);
+    if (emitAllErrors) {

Review comment:
       I thought this way was better because, when we call flush(), iceberg watermarks may be advanced for the tables that actually do flush, and for those we do want to send failure GTEs (but not for tables that just failed and are not having a successful flush).
   
   >I think we only want flush when watermark is moving
   
   Did you mean that we should only send GTEs when the watermark is moving, or are you suggesting that we remove the flush from the close method?










[GitHub] [gobblin] ZihanLi58 commented on a change in pull request #3466: [GOBBLIN-1608] Fix case where error GTE is incorrectly sent from MCE writer

Posted by GitBox <gi...@apache.org>.
ZihanLi58 commented on a change in pull request #3466:
URL: https://github.com/apache/gobblin/pull/3466#discussion_r804952050



##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
##########
@@ -368,29 +369,40 @@ private void flush(String dbName, String tableName) throws IOException {
    * Note that this is one of the place where the materialization of aggregated metadata happens.
    * When there's a change of {@link OperationType}, it also interrupts metadata aggregation,
    * and triggers materialization of metadata.
+   *
+   * @param emitAllErrors if true, go through all datasets in {@link #datasetErrorMap} and emit GTE for each. We only want
+   *                      to do this if a watermark is is being advanced.
+   *
    * @throws IOException
    */
-  @Override
-  public void flush() throws IOException {
+  public void flush(boolean emitAllErrors) throws IOException {
     log.info(String.format("start to flushing %s records", String.valueOf(recordCount.get())));
     for (String tableString : tableOperationTypeMap.keySet()) {
       List<String> tid = Splitter.on(TABLE_NAME_DELIMITER).splitToList(tableString);
       flush(tid.get(0), tid.get(1));
     }
     tableOperationTypeMap.clear();
     recordCount.lazySet(0L);
-    // Emit events for all current errors, since the GMCE watermark will be advanced
-    for (Map.Entry<String, Map<String, GobblinMetadataException>> entry : datasetErrorMap.entrySet()) {
-      for (GobblinMetadataException exception : entry.getValue().values()) {
-        submitFailureEvent(exception);
+    if (emitAllErrors) {

Review comment:
       Should we just clear the datasetErrorMap when we throw the exception? Or do we actually need to flush() when the writer is closed? I think we only want flush when watermark is moving, and this does not happen when close is called?







[GitHub] [gobblin] jack-moseley commented on a change in pull request #3466: [GOBBLIN-1608] Fix case where error GTE is incorrectly sent from MCE writer

Posted by GitBox <gi...@apache.org>.
jack-moseley commented on a change in pull request #3466:
URL: https://github.com/apache/gobblin/pull/3466#discussion_r805090882



##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
##########
@@ -368,29 +369,40 @@ private void flush(String dbName, String tableName) throws IOException {
    * Note that this is one of the place where the materialization of aggregated metadata happens.
    * When there's a change of {@link OperationType}, it also interrupts metadata aggregation,
    * and triggers materialization of metadata.
+   *
+   * @param emitAllErrors if true, go through all datasets in {@link #datasetErrorMap} and emit GTE for each. We only want
+   *                      to do this if a watermark is is being advanced.
+   *
    * @throws IOException
    */
-  @Override
-  public void flush() throws IOException {
+  public void flush(boolean emitAllErrors) throws IOException {
     log.info(String.format("start to flushing %s records", String.valueOf(recordCount.get())));
     for (String tableString : tableOperationTypeMap.keySet()) {
       List<String> tid = Splitter.on(TABLE_NAME_DELIMITER).splitToList(tableString);
       flush(tid.get(0), tid.get(1));
     }
     tableOperationTypeMap.clear();
     recordCount.lazySet(0L);
-    // Emit events for all current errors, since the GMCE watermark will be advanced
-    for (Map.Entry<String, Map<String, GobblinMetadataException>> entry : datasetErrorMap.entrySet()) {
-      for (GobblinMetadataException exception : entry.getValue().values()) {
-        submitFailureEvent(exception);
+    if (emitAllErrors) {

Review comment:
       As discussed, removed the flush from close() instead.
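
A rough sketch of that alternative, assuming a much-reduced writer: `CloseWithoutFlushSketch`, its `errors` and `emittedEvents` lists, and the `closed` flag are hypothetical names for illustration and do not mirror the actual change in `GobblinMCEWriter`. The point is only that flush() keeps its no-argument form and close() stops calling it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, reduced writer showing close() without a flush() call. */
class CloseWithoutFlushSketch implements AutoCloseable {
  // errors recorded while processing (simplified to strings)
  final List<String> errors = new ArrayList<>();
  // stand-in for the error GTEs that would be submitted
  final List<String> emittedEvents = new ArrayList<>();
  boolean closed = false;

  /** Periodic flush: the watermark advances here, so error events are emitted. */
  void flush() {
    emittedEvents.addAll(errors);
  }

  /** Only releases resources; deliberately does not call flush(). */
  @Override
  public void close() {
    closed = true;
  }
}
```

Compared with a boolean parameter on flush(), this keeps a single flush path, at the cost of not flushing anything that is still pending when the writer closes.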



