Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/10/27 08:46:53 UTC

[GitHub] [inlong] yunqingmoswu opened a new pull request, #6308: [INLONG-6307][Sort] Add whether to ignore single-table error policy processing for multiple sink of DorisLoadNode

yunqingmoswu opened a new pull request, #6308:
URL: https://github.com/apache/inlong/pull/6308

   ### Prepare a Pull Request
   *(Change the title refer to the following example)*
   
   Title: [INLONG-6307][Sort] Add whether to ignore single-table error policy processing for multiple sink of DorisLoadNode
   
   *(The following *XYZ* should be replaced by the actual [GitHub Issue](https://github.com/apache/inlong/issues) number)*
   
   Fixes #6307
   
   ### Motivation
   
   Add an option that controls whether single-table errors are ignored when DorisLoadNode writes to multiple sinks.
   In a multiple-sink write scenario, a single table may fail during the writing process. We can then either ignore the error or throw it to preserve data consistency, so I added the option 'sink.multiple.ignore-single-table-errors' to let users choose between the two strategies.
   
   ### Modifications
   
   1. Add an option 'sink.multiple.ignore-single-table-errors' to org.apache.inlong.sort.base.Constants
   2. Update DorisDynamicSchemaOutputFormat, DorisDynamicTableFactory, and DorisDynamicTableSink to support this feature
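
A minimal sketch of the per-table error policy described above, assuming failures are tracked in a map keyed by table identifier, as the `flushExceptionMap` lookup in the diff suggests. The class name `PerTableErrorPolicySketch`, the `recordFailure` helper, the exception message, and the `return true` branch are hypothetical and only illustrate the idea; this is not the actual DorisDynamicSchemaOutputFormat code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the sink's per-table error bookkeeping; not the InLong class. */
final class PerTableErrorPolicySketch {

    // Last flush failure recorded per table identifier (for example "db.table").
    private final Map<String, Exception> flushExceptionMap = new ConcurrentHashMap<>();
    // Mirrors the 'sink.multiple.ignore-single-table-errors' option.
    private final boolean ignoreSingleTableErrors;

    PerTableErrorPolicySketch(boolean ignoreSingleTableErrors) {
        this.ignoreSingleTableErrors = ignoreSingleTableErrors;
    }

    /** Records a failed flush for one table (hypothetical helper). */
    void recordFailure(String tableIdentifier, Exception cause) {
        flushExceptionMap.put(tableIdentifier, cause);
    }

    /**
     * Returns false when the table has no recorded failure. Otherwise either
     * throws (strict mode) or returns true so the caller can skip this table.
     */
    boolean checkFlushException(String tableIdentifier) {
        Exception flushException = flushExceptionMap.get(tableIdentifier);
        if (flushException == null) {
            return false;
        }
        if (!ignoreSingleTableErrors) {
            throw new RuntimeException(
                    "Writing records to streamload failed, tableIdentifier=" + tableIdentifier,
                    flushException);
        }
        // Assumption: in lenient mode the caller skips further writes for this table.
        return true;
    }

    public static void main(String[] args) {
        PerTableErrorPolicySketch lenient = new PerTableErrorPolicySketch(true);
        lenient.recordFailure("db.orders", new IllegalStateException("stream load rejected"));
        // The failed table is flagged; the healthy table is unaffected.
        System.out.println(lenient.checkFlushException("db.orders"));
        System.out.println(lenient.checkFlushException("db.users"));
    }
}
```

With the flag set to false, the same call for a failed table throws instead of returning, which matches the "throw this error to ensure data consistency" choice in the motivation.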
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [x] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
     *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
     *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] yunqingmoswu closed pull request #6308: [INLONG-6307][Sort] Add whether to ignore single-table error policy processing for multiple sink of DorisLoadNode

Posted by GitBox <gi...@apache.org>.
yunqingmoswu closed pull request #6308: [INLONG-6307][Sort] Add whether to ignore single-table error policy processing for multiple sink of DorisLoadNode
URL: https://github.com/apache/inlong/pull/6308




[GitHub] [inlong] thesumery commented on pull request #6308: [INLONG-6307][Sort] Add whether to ignore single-table error policy processing for multiple sink of DorisLoadNode

Posted by GitBox <gi...@apache.org>.
thesumery commented on PR #6308:
URL: https://github.com/apache/inlong/pull/6308#issuecomment-1293202521

   Is this scenario different from the schema update scenario?




[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6308: [INLONG-6307][Sort] Add whether to ignore single-table error policy processing for multiple sink of DorisLoadNode

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on code in PR #6308:
URL: https://github.com/apache/inlong/pull/6308#discussion_r1006604660


##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java:
##########
@@ -130,26 +133,28 @@ public void open(int taskNumber, int numTasks) throws IOException {
             this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
                 synchronized (DorisDynamicSchemaOutputFormat.this) {
                     if (!closed) {
-                        try {
-                            flush();
-                        } catch (Exception e) {
-                            flushException = e;
-                        }
+                        flush();
                     }
                 }
             }, executionOptions.getBatchIntervalMs(), executionOptions.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
         }
     }
 
-    private void checkFlushException() {
-        if (flushException != null) {
-            throw new RuntimeException("Writing records to streamload failed.", flushException);
+    private boolean checkFlushException(String tableIdentifier) {
+        Exception flushException = flushExceptionMap.get(tableIdentifier);
+        if (flushException == null) {
+            return false;
         }
+        if (!ignoreSingleTableErrors) {
+            throw new RuntimeException(

Review Comment:
   Since the error is not ignored, it should in principle be thrown. However, this part may be supported as an advanced feature in the future.





[GitHub] [inlong] healchow merged pull request #6308: [INLONG-6307][Sort] Add whether to ignore single-table error policy processing for multiple sink of DorisLoadNode

Posted by GitBox <gi...@apache.org>.
healchow merged PR #6308:
URL: https://github.com/apache/inlong/pull/6308




[GitHub] [inlong] thesumery commented on a diff in pull request #6308: [INLONG-6307][Sort] Add whether to ignore single-table error policy processing for multiple sink of DorisLoadNode

Posted by GitBox <gi...@apache.org>.
thesumery commented on code in PR #6308:
URL: https://github.com/apache/inlong/pull/6308#discussion_r1006592055


##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java:
##########
@@ -130,26 +133,28 @@ public void open(int taskNumber, int numTasks) throws IOException {
             this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
                 synchronized (DorisDynamicSchemaOutputFormat.this) {
                     if (!closed) {
-                        try {
-                            flush();
-                        } catch (Exception e) {
-                            flushException = e;
-                        }
+                        flush();
                     }
                 }
             }, executionOptions.getBatchIntervalMs(), executionOptions.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
         }
     }
 
-    private void checkFlushException() {
-        if (flushException != null) {
-            throw new RuntimeException("Writing records to streamload failed.", flushException);
+    private boolean checkFlushException(String tableIdentifier) {
+        Exception flushException = flushExceptionMap.get(tableIdentifier);
+        if (flushException == null) {
+            return false;
         }
+        if (!ignoreSingleTableErrors) {
+            throw new RuntimeException(

Review Comment:
   Does this need a blacklist mechanism?





[GitHub] [inlong] yunqingmoswu commented on pull request #6308: [INLONG-6307][Sort] Add whether to ignore single-table error policy processing for multiple sink of DorisLoadNode

Posted by GitBox <gi...@apache.org>.
yunqingmoswu commented on PR #6308:
URL: https://github.com/apache/inlong/pull/6308#issuecomment-1293221944

   > Is this scenario different from the schema update scenario?
   
   This change mainly addresses whether an error in one table affects the other tables during whole-database synchronization; it is not necessarily related to schema changes.

