Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/17 03:52:33 UTC

[GitHub] [flink] xinbinhuang opened a new pull request, #20289: Draft: send events from reader to enumerator

xinbinhuang opened a new pull request, #20289:
URL: https://github.com/apache/flink/pull/20289

   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
     - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact*
     - *Deployments RPC transmits only the blob storage reference*
     - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (100MB)*
     - *Extended integration test for recovery after master (JobManager) failure*
     - *Added test that validates that TaskInfo is transferred only once across recoveries*
     - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
     - The serializers: (yes / no / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / no / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
     - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] xinbinhuang commented on pull request #20289: [FLINK-23633][FLIP-150][connector/common] HybridSource: Support dynamic stop position in FileSource

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on PR #20289:
URL: https://github.com/apache/flink/pull/20289#issuecomment-1196291153

   
   @tweise Thank you so much for reviewing the PR! I just realized that I might have misread the JIRA issue as `HybridSource: Support dynamic stop position in HybridSource` instead of `HybridSource: Support dynamic stop position in FileSource`. So this PR actually aims to design a _**generic interface that allows any source to participate in a dynamic source switch**_. With that in mind, let me explain how I came up with the current design and implementation.
   
   After reviewing the current logic of the hybrid source (amazing work 🎉 🎉 !!), I understand that the current implementation supports transferring the end position to the next enumerator. However, it lacks a mechanism to find out what the end position is (e.g. the offset for a Kafka partition). These "end positions" are usually unknown beforehand; otherwise this would be the same as the [fixed start position](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/#fixed-start-position-at-graph-construction-time) case, right?
   Therefore, I think the key is to transfer the "end position" from the source reader to the enumerator during the source switch. 
   There are a few points to consider:
   
   1. What would be the "end position" to transfer to the next source? 
   
   I believe this varies by use case. Some may find `file.path` sufficient, while others may need to derive the timestamp or offset from the content of the file (e.g. a Kafka archive, whose implementation can vary by company). Since we can't anticipate all use cases, passing all finished splits seems to be a reasonable solution here, letting the developer decide how to derive the position from them.
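To make point 1 concrete, here is a minimal, self-contained Java sketch of letting the developer choose how to derive the end position from the finished splits. `ArchivedFileSplit`, `EndPositionExtractor` and `EndPositionExtractors` are hypothetical names used only for illustration; they are not part of this PR or of Flink.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical finished split of an archived file: its path plus the last
// offset that was found while reading the file content.
record ArchivedFileSplit(String path, long lastOffset) {}

// Hypothetical strategy: turns the finished splits of the previous source into
// whatever "end position" the next source understands.
interface EndPositionExtractor<SplitT, PosT> extends Function<List<SplitT>, Optional<PosT>> {}

final class EndPositionExtractors {
    private EndPositionExtractors() {}

    // Use case 1: the file path is enough, e.g. when paths sort by time.
    static EndPositionExtractor<ArchivedFileSplit, String> latestPath() {
        return splits -> splits.stream()
                .map(ArchivedFileSplit::path)
                .max(Comparator.naturalOrder());
    }

    // Use case 2: the position comes from the file content, e.g. a Kafka archive.
    static EndPositionExtractor<ArchivedFileSplit, Long> maxOffset() {
        return splits -> splits.stream()
                .map(ArchivedFileSplit::lastOffset)
                .max(Comparator.naturalOrder());
    }
}
```

Passing the splits themselves, rather than a fixed position type, is what keeps such an interface generic; the trade-off is that every use case has to supply its own extractor.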
   
   2. Where to make the changes?
   
   I aimed to implement this so that most existing sources can benefit from it out of the box, with minimal changes and no breaking changes.
   
   
   3. How to store the "finished splits" before source switch?
   
   Per FLIP-27, the enumerator only knows which splits to consume, not the actual progress; only the source reader knows about that. So we need to store the finished splits and transfer them to the enumerator during the source switch. However, most existing sources extend `SourceReaderBase`, which [purges them from state](https://github.com/apache/flink/blob/3e2620bc785b3b4d82f1f188eb7b1e0e129b14d3/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java#L193-L194) once they reach the end position. One naive solution would be to adjust `SourceReaderBase` to also store finished splits in its state. However, this would affect all sources running in production and would probably be a big backward-incompatible change. Therefore, I decided to store them only in the `HybridSourceReader`; existing sources only need to implement one method (`DynamicHybridSourceReader::getFinishedSplits`) that allows the finished splits to be extracted during checkpoints and source switches. This process is transparent to all existing sources and only happens when they are used with the `HybridSource`.
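The reader-side bookkeeping described above can be sketched as a toy model, assuming a split ID is all that needs to be kept. `DynamicHybridSourceReader` reuses the interface name proposed in this PR (in simplified form), but `WrappedSplit`, `ToyReader` and `ToyHybridReader` are hypothetical stand-ins, not Flink's `SourceReaderBase` or `HybridSourceReader`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Contract proposed in this PR (simplified): expose splits the reader has finished.
interface DynamicHybridSourceReader<SplitT> {
    List<SplitT> getFinishedSplits();
}

// Hypothetical checkpointed entry: the split plus an isFinished flag,
// mirroring the HybridSourceSplit.isFinished flag described above.
record WrappedSplit<SplitT>(SplitT split, int sourceIndex, boolean isFinished) {}

// Toy reader: like SourceReaderBase, it drops a split from its active state once
// the split is finished, but keeps a copy so that it can still be extracted.
final class ToyReader implements DynamicHybridSourceReader<String> {
    private final Map<String, Long> activeSplits = new LinkedHashMap<>(); // split ID -> position
    private final List<String> finishedSplits = new ArrayList<>();

    void addSplit(String splitId) {
        activeSplits.put(splitId, 0L);
    }

    void finishSplit(String splitId) {
        if (activeSplits.remove(splitId) != null) {
            finishedSplits.add(splitId);
        }
    }

    List<String> activeSplitIds() {
        return new ArrayList<>(activeSplits.keySet());
    }

    @Override
    public List<String> getFinishedSplits() {
        return Collections.unmodifiableList(finishedSplits);
    }
}

// Toy hybrid wrapper: checkpoints unfinished and finished splits side by side.
final class ToyHybridReader {
    private final ToyReader delegate;
    private final int sourceIndex;

    ToyHybridReader(ToyReader delegate, int sourceIndex) {
        this.delegate = delegate;
        this.sourceIndex = sourceIndex;
    }

    List<WrappedSplit<String>> snapshotState() {
        List<WrappedSplit<String>> state = new ArrayList<>();
        for (String id : delegate.activeSplitIds()) {
            state.add(new WrappedSplit<>(id, sourceIndex, false));
        }
        for (String id : delegate.getFinishedSplits()) {
            state.add(new WrappedSplit<>(id, sourceIndex, true));
        }
        return state;
    }
}
```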
   
   With the above points, the current implementation works as follows:
   - On each checkpoint, `HybridSourceReader` retrieves the finished splits (marked with `HybridSourceSplit.isFinished = true`) from the underlying reader and checkpoints them for persistence along with the unfinished splits.
   - Upon source switch, `HybridSourceReader` sends all the finished splits to the enumerator in a `SourceReaderFinishedEvent`.
   - The enumerator passes those finished splits in the `SourceSwitchContext` to the next source, which can then use them to derive its start position.
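A rough sketch of the enumerator side of the switch follows, modeled on the filter in the `switchEnumerator()` diff of this PR. The records are simplified, hypothetical versions of `SourceReaderFinishedEvent`, the split wrapper and the switch context; the real classes carry more state.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified models of the classes named in the PR description.
record FinishedSplit(String splitId, int sourceIndex, boolean isFinished) {}

record SourceReaderFinishedEvent(int sourceIndex, List<FinishedSplit> finishedSplits) {}

record SwitchContext(int nextSourceIndex, List<String> previousSplits) {}

final class SwitchContextBuilder {
    private SwitchContextBuilder() {}

    // Collects the finished splits reported by all readers for the source that
    // just completed and hands them to the next source.
    static SwitchContext build(List<SourceReaderFinishedEvent> events, int previousSourceIndex) {
        List<String> previousSplits = events.stream()
                .filter(event -> event.sourceIndex() == previousSourceIndex)
                .flatMap(event -> event.finishedSplits().stream())
                // Defensive check: keep only finished splits of the previous source.
                .filter(split -> split.isFinished() && split.sourceIndex() == previousSourceIndex)
                .map(FinishedSplit::splitId)
                .collect(Collectors.toList());
        return new SwitchContext(previousSourceIndex + 1, previousSplits);
    }
}
```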
   
   #### Changes required on existing non-hybrid sources:
   - Implement `DynamicHybridSourceReader::getFinishedSplits` on `SourceReaderBase`.
   
   #### API changes on `HybridSource`
   - Added `SourceSwitchContext::getPreviousSplits` which returns finished splits from the previous source.
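The proposed `SourceSwitchContext::getPreviousSplits` could be consumed by the next source's factory roughly as follows. This is a hedged sketch: in Flink, `HybridSource.SourceSwitchContext` exposes `getPreviousEnumerator()`, while `getPreviousSplits()` is only what this PR proposes. `KafkaArchiveSplit`, `KafkaLikeSource`, `NextSourceFactory` and `Factories` are hypothetical.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical shape of the API addition proposed in this PR.
interface SourceSwitchContext<SplitT> {
    List<SplitT> getPreviousSplits();
}

// Hypothetical finished split from a Kafka archive, with the last offset it contained.
record KafkaArchiveSplit(String path, long lastOffset) {}

// Hypothetical next source, configured with the offset to start reading from.
record KafkaLikeSource(long startingOffset) {}

// Hypothetical factory: builds the next source from the switch context.
interface NextSourceFactory<SplitT, SourceT> extends Function<SourceSwitchContext<SplitT>, SourceT> {}

final class Factories {
    private Factories() {}

    // Resume right after the highest archived offset; start at 0 if nothing was archived.
    static NextSourceFactory<KafkaArchiveSplit, KafkaLikeSource> resumeAfterArchive() {
        return context -> new KafkaLikeSource(
                context.getPreviousSplits().stream()
                        .mapToLong(KafkaArchiveSplit::lastOffset)
                        .max()
                        .orElse(-1L) + 1);
    }
}
```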
   
   I know it's a lot of words, so I really appreciate your patience in reading this. Let me know if anything is unclear; I'm happy to chat more about this!




[GitHub] [flink] xinbinhuang commented on pull request #20289: [FLINK-23633][FLIP-150][connector/common] HybridSource: Support dynamic stop position in FileSource

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on PR #20289:
URL: https://github.com/apache/flink/pull/20289#issuecomment-1272048838

   @tweise I was distracted by other work. Let me get back to this.
   > I think we need to zoom in on why or why not the enumerator knows the actual stop position without involvement of the reader.
   
   Our use case is to expose the end offset or timestamp based on the content of the file. We're archiving out-of-retention messages into S3 using a long-running job. Normally there are multiple messages inside each file, and the timestamp of the last message may not align with the file metadata. So we need to actually parse the file content to find either the last timestamp or the last offset. That's why I think sending the splits back makes sense, since the reader has already processed them. 
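As a sketch of why the content has to be parsed, the scanner below reads every record of an archived file and keeps the largest offset and timestamp it sees, without assuming records are ordered by timestamp. The line format `offset,timestampMillis,payload` and the names `ArchiveScanner` and `EndPosition` are assumptions for illustration only.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Optional;

// Hypothetical end position recovered from an archived file.
record EndPosition(long lastOffset, long lastTimestampMillis) {}

final class ArchiveScanner {
    private ArchiveScanner() {}

    // Reads every record because the newest message timestamp may differ from the
    // file's metadata. Assumed line format: offset,timestampMillis,payload
    static Optional<EndPosition> scan(Reader content) {
        long maxOffset = Long.MIN_VALUE;
        long maxTimestamp = Long.MIN_VALUE;
        boolean sawRecord = false;
        try (BufferedReader in = new BufferedReader(content)) {
            String line;
            while ((line = in.readLine()) != null) {
                if (line.isBlank()) {
                    continue;
                }
                String[] fields = line.split(",", 3);
                maxOffset = Math.max(maxOffset, Long.parseLong(fields[0].trim()));
                maxTimestamp = Math.max(maxTimestamp, Long.parseLong(fields[1].trim()));
                sawRecord = true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sawRecord ? Optional.of(new EndPosition(maxOffset, maxTimestamp)) : Optional.empty();
    }
}
```

Taking the maximum rather than the last line's value matters when a late record carries an older timestamp than an earlier one.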
   
   Do you have any recommendations around this? Or do you think this is too complex to implement?




[GitHub] [flink] xinbinhuang commented on pull request #20289: [FLINK-23633][FLIP-150][connector/common] HybridSource: Support dynamic stop position in FileSource

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on PR #20289:
URL: https://github.com/apache/flink/pull/20289#issuecomment-1188474251

   @flinkbot run azure




[GitHub] [flink] tweise commented on pull request #20289: [FLINK-23633][FLIP-150][connector/common] HybridSource: Support dynamic stop position in FileSource

Posted by GitBox <gi...@apache.org>.
tweise commented on PR #20289:
URL: https://github.com/apache/flink/pull/20289#issuecomment-1196887340

   @xinbinhuang thanks for describing the thought process.
   
   As you already mentioned, the goal of the JIRA was to add passing of the end position to the file source, and when we implemented FLIP-150 we presumably already added everything the HybridSource needs to achieve that goal. 
   
   I think we need to zoom in on why or why not the enumerator knows the actual stop position without involvement of the reader. 
   
   It is correct that we do not know the stop position at graph construction time; otherwise we would not need any runtime behavior. However, the enumerator already knows which splits have been processed, because it has passed those to the readers and the readers have finished the splits that they got assigned. Remember that we are dealing with **bounded** sources. So there really should be no need to pass splits back to the enumerator. How the enumerator communicates the end position to the next enumerator will depend on the specific type of source. In most typical cases, it will simply come from the partition metadata (Iceberg, files).
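Here is a minimal sketch of the enumerator-driven alternative described above, assuming each split carries its end timestamp in metadata (for example, partition metadata). `PartitionFileSplit` and `BoundedFileEnumerator` are hypothetical; they only illustrate that the enumerator of a bounded source can compute the end position from the splits it assigned, without involving the readers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;

// Hypothetical file split whose end timestamp is known from metadata.
record PartitionFileSplit(String path, long partitionEndMillis) {}

// Toy enumerator for a bounded source: it hands out splits and remembers them,
// so it can compute the end position without asking the readers.
final class BoundedFileEnumerator {
    private final Queue<PartitionFileSplit> pending;
    private final List<PartitionFileSplit> assigned = new ArrayList<>();

    BoundedFileEnumerator(List<PartitionFileSplit> splits) {
        this.pending = new ArrayDeque<>(splits);
    }

    Optional<PartitionFileSplit> assignNext() {
        PartitionFileSplit next = pending.poll();
        if (next != null) {
            assigned.add(next);
        }
        return Optional.ofNullable(next);
    }

    // Empty until every split has been assigned. Reader completion is assumed
    // to be tracked elsewhere before this value is handed to the next source.
    Optional<Long> endPositionMillis() {
        if (!pending.isEmpty()) {
            return Optional.empty();
        }
        return assigned.stream()
                .map(PartitionFileSplit::partitionEndMillis)
                .max(Comparator.naturalOrder());
    }
}
```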




[GitHub] [flink] tweise commented on pull request #20289: [FLINK-23633][FLIP-150][connector/common] HybridSource: Support dynamic stop position in FileSource

Posted by GitBox <gi...@apache.org>.
tweise commented on PR #20289:
URL: https://github.com/apache/flink/pull/20289#issuecomment-1196163399

   @xinbinhuang looking at all the modifications to HybridSource itself I think it is necessary to take a step back here and discuss the design aspects first.
   
   The underlying sources are a sequence of bounded sources, optionally followed by an unbounded source. Therefore, there should be no need to have a "dynamic reader" that does special things. The enumerator knows upfront which splits need to be processed and when it is finished.
   
   The HybridSource already supports transferring the end position to the next enumerator. That was part of the FLIP; the details can be found at https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source, and you can find an example in the tests: `HybridSourceITCase.sourceWithDynamicSwitchPosition`.
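The existing mechanism can be modeled in a few lines: the next source's factory receives the previous enumerator through the switch context and reads the end position from it. The types below are simplified, hypothetical stand-ins for `HybridSource.SourceSwitchContext#getPreviousEnumerator()` and a source factory, not the real Flink classes; `HybridSourceITCase.sourceWithDynamicSwitchPosition` shows the actual usage.

```java
import java.util.function.Function;

// Simplified stand-in for a switch context that exposes the previous enumerator.
interface SwitchContextModel<EnumT> {
    EnumT getPreviousEnumerator();
}

// Hypothetical previous (bounded) enumerator that knows its end timestamp.
record FileEnumeratorModel(long endTimestampMillis) {}

// Hypothetical next source, configured with a start timestamp.
record StreamSourceModel(long startTimestampMillis) {}

final class DynamicSwitch {
    private DynamicSwitch() {}

    // Mirrors the shape of a source factory: previous enumerator in, next source out.
    static final Function<SwitchContextModel<FileEnumeratorModel>, StreamSourceModel> FACTORY =
            context -> new StreamSourceModel(context.getPreviousEnumerator().endTimestampMillis());
}
```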
   




[GitHub] [flink] xinbinhuang commented on pull request #20289: [FLINK-23633][FLIP-150][connector/common] HybridSource: Support dynamic stop position in FileSource

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on PR #20289:
URL: https://github.com/apache/flink/pull/20289#issuecomment-1355320070

   @MartijnVisser I would love to move it forward. 
   @tweise would you like to take a look again?




[GitHub] [flink] xinbinhuang commented on pull request #20289: [FLINK-23633][FLIP-150][connector/common] HybridSource: Support dynamic stop position in FileSource

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on PR #20289:
URL: https://github.com/apache/flink/pull/20289#issuecomment-1186666153

   cc: @tweise This is the first draft of the implementation. PTAL! 
   (I'm planning to add more tests gradually.)




[GitHub] [flink] xinbinhuang commented on pull request #20289: [FLINK-23633][FLIP-150][connector/common] HybridSource: Support dynamic stop position in FileSource

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on PR #20289:
URL: https://github.com/apache/flink/pull/20289#issuecomment-1186664448

   @flinkbot run azure




[GitHub] [flink] xinbinhuang commented on a diff in pull request #20289: Draft: send events from reader to enumerator

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on code in PR #20289:
URL: https://github.com/apache/flink/pull/20289#discussion_r922891812


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java:
##########
@@ -258,12 +263,25 @@ private void switchEnumerator() {
             currentSourceIndex++;
         }
 
-        HybridSource.SourceSwitchContext<?> switchContext =
-                new HybridSource.SourceSwitchContext<Object>() {
+        List<SourceSplit> previousSplits =
+                finishedSplits.stream()
+                        .filter(
+                                split ->
+                                        split.isFinished
+                                                && split.sourceIndex() == previousSourceIndex)

Review Comment:
   This is a paranoid check.





[GitHub] [flink] flinkbot commented on pull request #20289: Draft: send events from reader to enumerator

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20289:
URL: https://github.com/apache/flink/pull/20289#issuecomment-1186390388

   ## CI report:
   
   * fd75f65e6b3a9952aa30b0dc99b25e1900fb8998 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




Re: [PR] [FLINK-23633][FLIP-150][connector/common] HybridSource: Support dynamic stop position in FileSource [flink]

Posted by "waywtdcc (via GitHub)" <gi...@apache.org>.
waywtdcc commented on PR #20289:
URL: https://github.com/apache/flink/pull/20289#issuecomment-1869298738

   How is this going?




[GitHub] [flink] xinbinhuang commented on pull request #20289: [FLINK-23633][FLIP-150][connector/common] HybridSource: Support dynamic stop position in FileSource

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on PR #20289:
URL: https://github.com/apache/flink/pull/20289#issuecomment-1190689007

   @flinkbot run azure




[GitHub] [flink] xinbinhuang commented on a diff in pull request #20289: [FLINK-23633][FLIP-150][connector/common] HybridSource: Support dynamic stop position in FileSource

Posted by GitBox <gi...@apache.org>.
xinbinhuang commented on code in PR #20289:
URL: https://github.com/apache/flink/pull/20289#discussion_r922925804


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java:
##########
@@ -224,7 +226,9 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
             }
 
             // track readers that have finished processing for current enumerator
+            // TODO: should finishedReaders be reset after switching to a new enumerator?

Review Comment:
   @tweise It seems that `finishedReaders` will keep growing after the first switch unless the job is restarted. So if there are more than two sources in the chain, the third and later sources may never get triggered. Is my understanding correct?
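One possible answer to the TODO, sketched as a toy model rather than the actual `HybridSourceSplitEnumerator` code: clear the set of finished readers on every switch so that each source's readers are counted from zero. `SwitchTracker` and its methods are hypothetical, and the sketch assumes the last source in the chain never switches.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the "all readers finished" check in a chain of sources.
final class SwitchTracker {
    private final int parallelism;
    private final int numSources;
    private final Set<Integer> finishedReaders = new HashSet<>();
    private int currentSourceIndex = 0;

    SwitchTracker(int parallelism, int numSources) {
        this.parallelism = parallelism;
        this.numSources = numSources;
    }

    // Returns true if this report completed the current source and triggered a switch.
    boolean onReaderFinished(int subtaskId, int sourceIndex) {
        boolean isLastSource = currentSourceIndex >= numSources - 1;
        if (sourceIndex != currentSourceIndex || isLastSource) {
            return false; // stale report, or nothing left to switch to
        }
        finishedReaders.add(subtaskId);
        if (finishedReaders.size() < parallelism) {
            return false;
        }
        currentSourceIndex++;
        // Reset so the next source's readers are counted from zero.
        finishedReaders.clear();
        return true;
    }

    int currentSourceIndex() {
        return currentSourceIndex;
    }
}
```

In this toy model, without the `clear()` call the subtask IDs recorded for the first source would still count toward the second source's total.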





[GitHub] [flink] MartijnVisser commented on pull request #20289: [FLINK-23633][FLIP-150][connector/common] HybridSource: Support dynamic stop position in FileSource

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on PR #20289:
URL: https://github.com/apache/flink/pull/20289#issuecomment-1353265333

   @xinbinhuang @tweise What's the status of this PR? Can we somehow bring this forward and resolve it for 1.17?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org