Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/18 13:11:45 UTC

[GitHub] [flink] zhuzhurk opened a new pull request, #20299: [FLINK-28586] Enable speculative execution for new sources

zhuzhurk opened a new pull request, #20299:
URL: https://github.com/apache/flink/pull/20299

   ## What is the purpose of the change
   
   This PR enables speculative execution for new sources (FLIP-27).
   It mainly improves the SourceCoordinator and SourceCoordinatorContext so that they handle split requests and source events at the attempt level.
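
As a rough illustration of what attempt-level handling implies, the sketch below tracks which execution attempts of each subtask currently have a registered reader. It is not the code from this PR: `AttemptRegistry` and its methods are hypothetical names, and the real SourceCoordinatorContext may keep this state quite differently.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical helper (not a Flink class): remembers which attempts of each
 * subtask have registered a reader, so that requests and events can be
 * handled per attempt instead of per subtask.
 */
final class AttemptRegistry {
    // subtask index -> attempt numbers that are currently registered
    private final Map<Integer, Set<Integer>> attemptsBySubtask = new HashMap<>();

    /** Records that the given attempt of the given subtask registered a reader. */
    void register(int subtaskIndex, int attemptNumber) {
        attemptsBySubtask
                .computeIfAbsent(subtaskIndex, k -> new TreeSet<>())
                .add(attemptNumber);
    }

    /** Forgets an attempt, for example after it failed or was cancelled. */
    void unregister(int subtaskIndex, int attemptNumber) {
        Set<Integer> attempts = attemptsBySubtask.get(subtaskIndex);
        if (attempts != null) {
            attempts.remove(attemptNumber);
            if (attempts.isEmpty()) {
                attemptsBySubtask.remove(subtaskIndex);
            }
        }
    }

    /** Returns a read-only view of the registered attempts; empty if none. */
    Set<Integer> registeredAttempts(int subtaskIndex) {
        return Collections.unmodifiableSet(
                attemptsBySubtask.getOrDefault(subtaskIndex, Collections.emptySet()));
    }
}
```

With speculative execution, a subtask may have more than one entry in such a registry at the same time, which is why a per-subtask view alone is not enough.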
   
   
   ## Verifying this change
   
     - *Added unit tests SourceCoordinatorConcurrentAttemptsTest*
     - *Added test cases in SourceCoordinatorTest*
     - *Covered by existing tests of SourceCoordinator and SourceCoordinatorContext*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (**yes** / no / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] beyond1920 commented on a diff in pull request #20299: [FLINK-28586] Enable speculative execution for new sources

Posted by GitBox <gi...@apache.org>.
beyond1920 commented on code in PR #20299:
URL: https://github.com/apache/flink/pull/20299#discussion_r925348071


##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java:
##########
@@ -431,4 +489,125 @@ private <V> V callInCoordinatorThread(Callable<V> callable, String errorMessage)
             throw new FlinkRuntimeException(errorMessage, t);
         }
     }
+
+    private void assignSplitsToAttempts(SplitsAssignment<SplitT> assignment) {
+        assignment.assignment().forEach((index, splits) -> assignSplitsToAttempts(index, splits));
+    }
+
+    private void assignSplitsToAttempts(int subtaskIndex, List<SplitT> splits) {
+        getRegisteredAttempts(subtaskIndex)
+                .forEach(attempt -> assignSplitsToAttempt(subtaskIndex, attempt, splits));
+    }
+
+    private void assignSplitsToAttempt(int subtaskIndex, int attemptNumber, List<SplitT> splits) {
+        checkAttemptReaderReady(subtaskIndex, attemptNumber);
+
+        final AddSplitEvent<SplitT> addSplitEvent;
+        try {
+            addSplitEvent = new AddSplitEvent<>(splits, splitSerializer);

Review Comment:
   It would be better not to send this message if `splits` is empty or null.





[GitHub] [flink] flinkbot commented on pull request #20299: [FLINK-28586] Enable speculative execution for new sources

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20299:
URL: https://github.com/apache/flink/pull/20299#issuecomment-1187404775

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "f5db081e402e4787cd726eb2d722ea65d275abe5",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "f5db081e402e4787cd726eb2d722ea65d275abe5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f5db081e402e4787cd726eb2d722ea65d275abe5 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] zhuzhurk commented on pull request #20299: [FLINK-28586] Enable speculative execution for new sources

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on PR #20299:
URL: https://github.com/apache/flink/pull/20299#issuecomment-1192159653

   @flinkbot run azure




[GitHub] [flink] zhuzhurk closed pull request #20299: [FLINK-28586] Enable speculative execution for new sources

Posted by GitBox <gi...@apache.org>.
zhuzhurk closed pull request #20299: [FLINK-28586] Enable speculative execution for new sources
URL: https://github.com/apache/flink/pull/20299




[GitHub] [flink] zhuzhurk commented on a diff in pull request #20299: [FLINK-28586] Enable speculative execution for new sources

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20299:
URL: https://github.com/apache/flink/pull/20299#discussion_r925447573


##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java:
##########
@@ -431,4 +489,125 @@ private <V> V callInCoordinatorThread(Callable<V> callable, String errorMessage)
             throw new FlinkRuntimeException(errorMessage, t);
         }
     }
+
+    private void assignSplitsToAttempts(SplitsAssignment<SplitT> assignment) {
+        assignment.assignment().forEach((index, splits) -> assignSplitsToAttempts(index, splits));
+    }
+
+    private void assignSplitsToAttempts(int subtaskIndex, List<SplitT> splits) {
+        getRegisteredAttempts(subtaskIndex)
+                .forEach(attempt -> assignSplitsToAttempt(subtaskIndex, attempt, splits));
+    }
+
+    private void assignSplitsToAttempt(int subtaskIndex, int attemptNumber, List<SplitT> splits) {
+        checkAttemptReaderReady(subtaskIndex, attemptNumber);
+
+        final AddSplitEvent<SplitT> addSplitEvent;
+        try {
+            addSplitEvent = new AddSplitEvent<>(splits, splitSerializer);

Review Comment:
   Fixed. Note that `splits` cannot be null (it is checked earlier), so we only check whether it is empty.
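
The idea discussed in this thread can be sketched roughly as follows: return early when the split list is empty, so that no event is sent to any attempt. This is a simplified stand-in, not the actual change: `SplitFanOutSketch` is a hypothetical class, splits are plain strings, and "sending" only appends to a list. Where exactly the real check sits in SourceCoordinatorContext is not shown in this thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the fan-out of splits to all attempts of a subtask. */
final class SplitFanOutSketch {
    // Messages that would have been sent, recorded for inspection.
    final List<String> sent = new ArrayList<>();

    /**
     * Sends the same splits to every registered attempt of a subtask.
     * Assumes splits is non-null, as stated in the review reply.
     */
    void assignSplitsToAttempts(int subtaskIndex, List<Integer> attempts, List<String> splits) {
        if (splits.isEmpty()) {
            return; // nothing to assign, so do not build or send an event
        }
        for (int attempt : attempts) {
            sent.add(subtaskIndex + "/" + attempt + " -> " + splits);
        }
    }
}
```

Calling the method with an empty list leaves `sent` untouched, while a non-empty list produces one message per registered attempt.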





[GitHub] [flink] zhuzhurk commented on a diff in pull request #20299: [FLINK-28586] Enable speculative execution for new sources

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on code in PR #20299:
URL: https://github.com/apache/flink/pull/20299#discussion_r925447573


##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java:
##########
@@ -431,4 +489,125 @@ private <V> V callInCoordinatorThread(Callable<V> callable, String errorMessage)
             throw new FlinkRuntimeException(errorMessage, t);
         }
     }
+
+    private void assignSplitsToAttempts(SplitsAssignment<SplitT> assignment) {
+        assignment.assignment().forEach((index, splits) -> assignSplitsToAttempts(index, splits));
+    }
+
+    private void assignSplitsToAttempts(int subtaskIndex, List<SplitT> splits) {
+        getRegisteredAttempts(subtaskIndex)
+                .forEach(attempt -> assignSplitsToAttempt(subtaskIndex, attempt, splits));
+    }
+
+    private void assignSplitsToAttempt(int subtaskIndex, int attemptNumber, List<SplitT> splits) {
+        checkAttemptReaderReady(subtaskIndex, attemptNumber);
+
+        final AddSplitEvent<SplitT> addSplitEvent;
+        try {
+            addSplitEvent = new AddSplitEvent<>(splits, splitSerializer);

Review Comment:
   fixed.





[GitHub] [flink] zhuzhurk commented on pull request #20299: [FLINK-28586] Enable speculative execution for new sources

Posted by GitBox <gi...@apache.org>.
zhuzhurk commented on PR #20299:
URL: https://github.com/apache/flink/pull/20299#issuecomment-1192504215

   Also verified with TPC-DS benchmarks on a real cluster. Speculative execution works and no suspicious errors were observed.
   Merging.

