Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/04 14:45:50 UTC

[GitHub] [flink] becketqin opened a new pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

becketqin opened a new pull request #11554:
URL: https://github.com/apache/flink/pull/11554


   ## What is the purpose of the change
   This patch is part of [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface). It adds the implementation of `SourceCoordinator`, which implements `OperatorCoordinator`.
   
   ## Brief change log
   The following major classes are added:
   * SourceCoordinator
   * SourceCoordinatorContext
   * SourceCoordinatorProvider
   * SplitAssignmentTracker
   
   ## Verifying this change
   This change adds related unit tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
     - The serializers: (yes)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (JavaDocs)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11554:
URL: https://github.com/apache/flink/pull/11554#issuecomment-605459909


   ## CI report:
   
   * 14e9fe3bfdeeae480047848801243f9fbed03cb4 UNKNOWN
   * 6a0a147d96dc517574efb6a97786b8b7e8e9c10c Travis: [CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/160993263) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7752) 
   * a11dbc2d6b25ff16ef3cff4ecc751538d6867e68 UNKNOWN
   * 360f3e0be00e518c2f268f26aab06d6f27764acf Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/163393154) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=539) 
   * 5b241a0386f6ae029759e076afef6b155b10b328 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11554:
URL: https://github.com/apache/flink/pull/11554#issuecomment-605459909


   ## CI report:
   
   * 14e9fe3bfdeeae480047848801243f9fbed03cb4 UNKNOWN
   * a11dbc2d6b25ff16ef3cff4ecc751538d6867e68 UNKNOWN
   * 5b241a0386f6ae029759e076afef6b155b10b328 UNKNOWN
   * 475dd8aa6478eaf51b758d3d201f0d4a38c27dd8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=565) 
   * 2ffa6112f5f8fd094f68aeb4013ebf5d026156d8 UNKNOWN
   * a460858a2f8ff4da137ac5d7533cf4cdab7c84f1 UNKNOWN
   * 8720b50c34104ea0fe90d38bfebd8c0bd7132769 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=611) 
   





[GitHub] [flink] flinkbot edited a comment on issue #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11554:
URL: https://github.com/apache/flink/pull/11554#issuecomment-605459909


   ## CI report:
   
   * bd27b1b4dd97173d85bb0867f7c377ac4c638af5 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/156295604) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6793) 
   * 14e9fe3bfdeeae480047848801243f9fbed03cb4 UNKNOWN
   * 6a0a147d96dc517574efb6a97786b8b7e8e9c10c UNKNOWN
   





[GitHub] [flink] becketqin commented on pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

Posted by GitBox <gi...@apache.org>.
becketqin commented on pull request #11554:
URL: https://github.com/apache/flink/pull/11554#issuecomment-623112338


   @flinkbot run azure





[GitHub] [flink] becketqin commented on pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

Posted by GitBox <gi...@apache.org>.
becketqin commented on pull request #11554:
URL: https://github.com/apache/flink/pull/11554#issuecomment-624119457


   merged to master:
   ed400497e56ea272722ac71697edf830b2d682ae





[GitHub] [flink] becketqin commented on pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

Posted by GitBox <gi...@apache.org>.
becketqin commented on pull request #11554:
URL: https://github.com/apache/flink/pull/11554#issuecomment-623516373


   @rmetzger Thanks for the help. I just tried a rebase as well as an empty commit, but it seems the CI test was still not triggered.





[GitHub] [flink] flinkbot edited a comment on pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11554:
URL: https://github.com/apache/flink/pull/11554#issuecomment-605459909


   ## CI report:
   
   * 14e9fe3bfdeeae480047848801243f9fbed03cb4 UNKNOWN
   * a11dbc2d6b25ff16ef3cff4ecc751538d6867e68 UNKNOWN
   * 360f3e0be00e518c2f268f26aab06d6f27764acf Travis: [CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/163393154) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=539) 
   * 5b241a0386f6ae029759e076afef6b155b10b328 UNKNOWN
   





[GitHub] [flink] StephanEwen commented on pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

Posted by GitBox <gi...@apache.org>.
StephanEwen commented on pull request #11554:
URL: https://github.com/apache/flink/pull/11554#issuecomment-621785104


   Thank you for addressing the changes. Looks good to me with two remarks:
   
   (1) Can we wrap the `ByteArrayOutputStream` into a `DataOutputStream` instead of using the `SerdeUtils`? It looks both simpler and more efficient and is also what other parts of the Flink code typically do.
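
A minimal sketch of what suggestion (1) might look like. This is an illustration and not code from the pull request: the class name `SplitIdSerdeSketch`, the version field, and the choice of plain string split IDs are all assumptions made for the example. It only shows the general pattern of wrapping a `ByteArrayOutputStream` in a `DataOutputStream` and reading the bytes back with a `DataInputStream`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper for illustration only; not part of the pull request. */
final class SplitIdSerdeSketch {

    /** Writes a version, the number of IDs, then each ID in modified UTF-8. */
    static byte[] serialize(int version, List<String> splitIds) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(version);
            out.writeInt(splitIds.size());
            for (String id : splitIds) {
                out.writeUTF(id);
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the layout produced by serialize() and rejects an unexpected version. */
    static List<String> deserialize(int expectedVersion, byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version != expectedVersion) {
                throw new IllegalStateException("Unexpected version " + version);
            }
            int count = in.readInt();
            List<String> ids = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                ids.add(in.readUTF());
            }
            return ids;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("split-0", "split-1");
        byte[] data = serialize(1, ids);
        System.out.println(deserialize(1, data).equals(ids));
    }
}
```

The stream wrappers handle the primitive encoding, so no hand-written length or byte-order logic is needed; note that `writeUTF` is limited to strings whose encoded form fits in 65535 bytes.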
   
   (2) About the "Fatal Error Handler": Virtually all of the really critical issues in Flink have fallen into the following category: An exception happens that we try to handle "fine grained" but it actually leaves something in an inconsistent state that is not recognized. Had we simply handled it "coarse grained" (fail process, rely on recovery) it would have been fine.
   
   That is why I think the default should be to escalate. If we feel that we are completely sure we can handle this more fine-grained, then it is fine.
   This issue is somewhat connected with the "closing" of the coordinator.
   
   How about the following approach:
     - We add a "process killing" uncaught exception handler, as the safety net.
     - We catch the exceptions in the runnables ourselves and cause a global job failure upon exception, unless the `SourceCoordinator` is closed already
     - The global job failure will close() the current coordinator and re-create the coordinator from the latest checkpoint (we still need to implement this)
     - closing the source coordinator gets the following changes
       - we call `shutdownNow()` on the executor to make sure all queued tasks do not get executed
       - we make sure the `SourceCoordinator` does not forward events from the Enumerator any more, so that a long-running task that is still being executed in the enumerator cannot affect anything any more
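
The closing behaviour proposed in the list above could be sketched roughly as follows. This is a hedged illustration under stated assumptions, not the pull request's implementation: `GuardedCoordinatorSketch`, the `failJob` callback (standing in for a global job failure), and the `eventSink` callback (standing in for event forwarding to the readers) are hypothetical names, and none of them are Flink API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical stand-in for the coordinator; none of these names are Flink API. */
final class GuardedCoordinatorSketch implements AutoCloseable {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Consumer<Throwable> failJob;  // assumed hook: trigger a global job failure
    private final Consumer<String> eventSink;   // assumed hook: forward an enumerator event

    GuardedCoordinatorSketch(Consumer<Throwable> failJob, Consumer<String> eventSink) {
        this.failJob = failJob;
        this.eventSink = eventSink;
    }

    /** Runs the action on the coordinator thread; failures escalate unless already closed. */
    void runInCoordinatorThread(Runnable action) {
        try {
            executor.execute(() -> {
                try {
                    action.run();
                } catch (Throwable t) {
                    if (!closed.get()) {
                        failJob.accept(t);
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            // The executor only rejects tasks after close() has shut it down; drop the task.
        }
    }

    /** Forwards an event only while the coordinator is still open. */
    void forwardEvent(String event) {
        if (!closed.get()) {
            eventSink.accept(event);
        }
    }

    /** Marks the coordinator closed and discards all queued tasks. */
    @Override
    public void close() {
        if (closed.compareAndSet(false, true)) {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        CountDownLatch reported = new CountDownLatch(1);
        GuardedCoordinatorSketch coordinator = new GuardedCoordinatorSketch(
                error -> {
                    System.out.println("failing job: " + error.getMessage());
                    reported.countDown();
                },
                event -> System.out.println("forwarded: " + event));
        coordinator.runInCoordinatorThread(() -> { throw new IllegalStateException("boom"); });
        reported.await();
        coordinator.forwardEvent("before close");
        coordinator.close();
        coordinator.forwardEvent("after close"); // dropped because the coordinator is closed
    }
}
```

The `closed` flag is checked both when a task fails and when an event is forwarded, so a long-running task that is still executing during `close()` can neither fail the job again nor emit further events.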
   





[GitHub] [flink] flinkbot edited a comment on pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11554:
URL: https://github.com/apache/flink/pull/11554#issuecomment-605459909


   ## CI report:
   
   * 14e9fe3bfdeeae480047848801243f9fbed03cb4 UNKNOWN
   * a11dbc2d6b25ff16ef3cff4ecc751538d6867e68 UNKNOWN
   * 360f3e0be00e518c2f268f26aab06d6f27764acf Travis: [CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/163393154) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=539) 
   * 5b241a0386f6ae029759e076afef6b155b10b328 UNKNOWN
   * 475dd8aa6478eaf51b758d3d201f0d4a38c27dd8 UNKNOWN
   





[GitHub] [flink] StephanEwen commented on pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

Posted by GitBox <gi...@apache.org>.
StephanEwen commented on pull request #11554:
URL: https://github.com/apache/flink/pull/11554#issuecomment-621793826


   From my side, (1) looks like a nice-to-have (a consistent way of doing things in the code base), and for (2) we could for now just do the FatalErrorHandler (to be on the safe side) and make error handling more fine-grained in a follow-up.
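
As a rough illustration of the coarse-grained "safety net" option, the sketch below installs an uncaught exception handler on the coordinator's worker thread using only JDK classes. It is an assumption-laden example rather than the code that was merged: `FatalHandlerSketch` and the thread name are hypothetical, and the handler here only logs, whereas a real process-killing handler would terminate the JVM.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/** Hypothetical helper for illustration only; not Flink API. */
final class FatalHandlerSketch {

    /** Builds a single-thread executor whose worker reports uncaught exceptions to the handler. */
    static ExecutorService newCoordinatorExecutor(Thread.UncaughtExceptionHandler fatalHandler) {
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, "source-coordinator-sketch");
            thread.setDaemon(true);
            thread.setUncaughtExceptionHandler(fatalHandler);
            return thread;
        };
        return Executors.newSingleThreadExecutor(factory);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = newCoordinatorExecutor((thread, error) -> {
            // A real safety net would terminate the process here instead of only logging.
            System.err.println("Fatal error in " + thread.getName() + ": " + error);
        });
        // execute() lets the exception reach the handler; submit() would capture it in a Future.
        executor.execute(() -> { throw new IllegalStateException("boom"); });
        executor.shutdown();
        Thread.sleep(200); // give the daemon worker time to report before the JVM exits
    }
}
```

One detail worth keeping in mind with this approach: tasks handed over via `submit()` store their exception in the returned `Future`, so the handler only sees failures from tasks started with `execute()`.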





[GitHub] [flink] flinkbot edited a comment on issue #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11554:
URL: https://github.com/apache/flink/pull/11554#issuecomment-605459909


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6785",
       "triggerID" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/156136835",
       "triggerID" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6793",
       "triggerID" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/156295604",
       "triggerID" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "14e9fe3bfdeeae480047848801243f9fbed03cb4",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "14e9fe3bfdeeae480047848801243f9fbed03cb4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/156295604",
       "triggerID" : "605641471",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6793",
       "triggerID" : "605641471",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "status" : "CANCELED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/160993263",
       "triggerID" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7752",
       "triggerID" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a11dbc2d6b25ff16ef3cff4ecc751538d6867e68",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a11dbc2d6b25ff16ef3cff4ecc751538d6867e68",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 14e9fe3bfdeeae480047848801243f9fbed03cb4 UNKNOWN
   * 6a0a147d96dc517574efb6a97786b8b7e8e9c10c Travis: [CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/160993263) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7752) 
   * a11dbc2d6b25ff16ef3cff4ecc751538d6867e68 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11554:
URL: https://github.com/apache/flink/pull/11554#issuecomment-605459909


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6785",
       "triggerID" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/156136835",
       "triggerID" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6793",
       "triggerID" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/156295604",
       "triggerID" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "14e9fe3bfdeeae480047848801243f9fbed03cb4",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "14e9fe3bfdeeae480047848801243f9fbed03cb4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/156295604",
       "triggerID" : "605641471",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6793",
       "triggerID" : "605641471",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "status" : "CANCELED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/160993263",
       "triggerID" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7752",
       "triggerID" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a11dbc2d6b25ff16ef3cff4ecc751538d6867e68",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a11dbc2d6b25ff16ef3cff4ecc751538d6867e68",
       "triggerType" : "PUSH"
     }, {
       "hash" : "360f3e0be00e518c2f268f26aab06d6f27764acf",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "360f3e0be00e518c2f268f26aab06d6f27764acf",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 14e9fe3bfdeeae480047848801243f9fbed03cb4 UNKNOWN
   * 6a0a147d96dc517574efb6a97786b8b7e8e9c10c Travis: [CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/160993263) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7752) 
   * a11dbc2d6b25ff16ef3cff4ecc751538d6867e68 UNKNOWN
   * 360f3e0be00e518c2f268f26aab06d6f27764acf UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] becketqin commented on pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

Posted by GitBox <gi...@apache.org>.
becketqin commented on pull request #11554:
URL: https://github.com/apache/flink/pull/11554#issuecomment-622981830


   Thanks for the review @StephanEwen. I just updated the patch. 
   
   Re: (1) Yes, keeping the code style consistent makes sense. I did not find `DataOutputStream`, though. Do you mean `DataOutputViewStreamWrapper`?
   
   Re: (2) Sounds good to me.





[GitHub] [flink] flinkbot edited a comment on issue #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11554:
URL: https://github.com/apache/flink/pull/11554#issuecomment-605459909


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6785",
       "triggerID" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/156136835",
       "triggerID" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6793",
       "triggerID" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/156295604",
       "triggerID" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "14e9fe3bfdeeae480047848801243f9fbed03cb4",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "14e9fe3bfdeeae480047848801243f9fbed03cb4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/156295604",
       "triggerID" : "605641471",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6793",
       "triggerID" : "605641471",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "status" : "CANCELED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/160993263",
       "triggerID" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7752",
       "triggerID" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a11dbc2d6b25ff16ef3cff4ecc751538d6867e68",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a11dbc2d6b25ff16ef3cff4ecc751538d6867e68",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 14e9fe3bfdeeae480047848801243f9fbed03cb4 UNKNOWN
   * 6a0a147d96dc517574efb6a97786b8b7e8e9c10c Travis: [CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/160993263) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7752) 
   * a11dbc2d6b25ff16ef3cff4ecc751538d6867e68 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] rmetzger commented on pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

Posted by GitBox <gi...@apache.org>.
rmetzger commented on pull request #11554:
URL: https://github.com/apache/flink/pull/11554#issuecomment-623508089


   Sadly, Flinkbot does not re-run the CI if the last status is "UNKNOWN". You need to do another push to the branch (empty commit, rebase).





[GitHub] [flink] rmetzger commented on pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

Posted by GitBox <gi...@apache.org>.
rmetzger commented on pull request #11554:
URL: https://github.com/apache/flink/pull/11554#issuecomment-623522364


   The bot needs quite some time to update. I notified Chesnay already to look into it.





[GitHub] [flink] becketqin commented on pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

Posted by GitBox <gi...@apache.org>.
becketqin commented on pull request #11554:
URL: https://github.com/apache/flink/pull/11554#issuecomment-623213157


   My updated patch did exactly what you said. I will merge the patch once the CI passes.





[GitHub] [flink] flinkbot edited a comment on pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11554:
URL: https://github.com/apache/flink/pull/11554#issuecomment-605459909


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6785",
       "triggerID" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/156136835",
       "triggerID" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6793",
       "triggerID" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/156295604",
       "triggerID" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "14e9fe3bfdeeae480047848801243f9fbed03cb4",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "14e9fe3bfdeeae480047848801243f9fbed03cb4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/156295604",
       "triggerID" : "605641471",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6793",
       "triggerID" : "605641471",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/160993263",
       "triggerID" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7752",
       "triggerID" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a11dbc2d6b25ff16ef3cff4ecc751538d6867e68",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a11dbc2d6b25ff16ef3cff4ecc751538d6867e68",
       "triggerType" : "PUSH"
     }, {
       "hash" : "360f3e0be00e518c2f268f26aab06d6f27764acf",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/163393154",
       "triggerID" : "360f3e0be00e518c2f268f26aab06d6f27764acf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "360f3e0be00e518c2f268f26aab06d6f27764acf",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=539",
       "triggerID" : "360f3e0be00e518c2f268f26aab06d6f27764acf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5b241a0386f6ae029759e076afef6b155b10b328",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5b241a0386f6ae029759e076afef6b155b10b328",
       "triggerType" : "PUSH"
     }, {
       "hash" : "475dd8aa6478eaf51b758d3d201f0d4a38c27dd8",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=565",
       "triggerID" : "475dd8aa6478eaf51b758d3d201f0d4a38c27dd8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2ffa6112f5f8fd094f68aeb4013ebf5d026156d8",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "2ffa6112f5f8fd094f68aeb4013ebf5d026156d8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a460858a2f8ff4da137ac5d7533cf4cdab7c84f1",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a460858a2f8ff4da137ac5d7533cf4cdab7c84f1",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 14e9fe3bfdeeae480047848801243f9fbed03cb4 UNKNOWN
   * a11dbc2d6b25ff16ef3cff4ecc751538d6867e68 UNKNOWN
   * 5b241a0386f6ae029759e076afef6b155b10b328 UNKNOWN
   * 475dd8aa6478eaf51b758d3d201f0d4a38c27dd8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=565) 
   * 2ffa6112f5f8fd094f68aeb4013ebf5d026156d8 UNKNOWN
   * a460858a2f8ff4da137ac5d7533cf4cdab7c84f1 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11554:
URL: https://github.com/apache/flink/pull/11554#issuecomment-605459909


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6785",
       "triggerID" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/156136835",
       "triggerID" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6793",
       "triggerID" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/156295604",
       "triggerID" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "14e9fe3bfdeeae480047848801243f9fbed03cb4",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "14e9fe3bfdeeae480047848801243f9fbed03cb4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/156295604",
       "triggerID" : "605641471",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6793",
       "triggerID" : "605641471",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/160993263",
       "triggerID" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7752",
       "triggerID" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a11dbc2d6b25ff16ef3cff4ecc751538d6867e68",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a11dbc2d6b25ff16ef3cff4ecc751538d6867e68",
       "triggerType" : "PUSH"
     }, {
       "hash" : "360f3e0be00e518c2f268f26aab06d6f27764acf",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/163393154",
       "triggerID" : "360f3e0be00e518c2f268f26aab06d6f27764acf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "360f3e0be00e518c2f268f26aab06d6f27764acf",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=539",
       "triggerID" : "360f3e0be00e518c2f268f26aab06d6f27764acf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5b241a0386f6ae029759e076afef6b155b10b328",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5b241a0386f6ae029759e076afef6b155b10b328",
       "triggerType" : "PUSH"
     }, {
       "hash" : "475dd8aa6478eaf51b758d3d201f0d4a38c27dd8",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=565",
       "triggerID" : "475dd8aa6478eaf51b758d3d201f0d4a38c27dd8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2ffa6112f5f8fd094f68aeb4013ebf5d026156d8",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "2ffa6112f5f8fd094f68aeb4013ebf5d026156d8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 14e9fe3bfdeeae480047848801243f9fbed03cb4 UNKNOWN
   * a11dbc2d6b25ff16ef3cff4ecc751538d6867e68 UNKNOWN
   * 5b241a0386f6ae029759e076afef6b155b10b328 UNKNOWN
   * 475dd8aa6478eaf51b758d3d201f0d4a38c27dd8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=565) 
   * 2ffa6112f5f8fd094f68aeb4013ebf5d026156d8 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] becketqin commented on pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

Posted by GitBox <gi...@apache.org>.
becketqin commented on pull request #11554:
URL: https://github.com/apache/flink/pull/11554#issuecomment-623476493


   @flinkbot run azure





[GitHub] [flink] flinkbot edited a comment on pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11554:
URL: https://github.com/apache/flink/pull/11554#issuecomment-605459909


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6785",
       "triggerID" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/156136835",
       "triggerID" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6793",
       "triggerID" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/156295604",
       "triggerID" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "14e9fe3bfdeeae480047848801243f9fbed03cb4",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "14e9fe3bfdeeae480047848801243f9fbed03cb4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/156295604",
       "triggerID" : "605641471",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6793",
       "triggerID" : "605641471",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/160993263",
       "triggerID" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7752",
       "triggerID" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a11dbc2d6b25ff16ef3cff4ecc751538d6867e68",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a11dbc2d6b25ff16ef3cff4ecc751538d6867e68",
       "triggerType" : "PUSH"
     }, {
       "hash" : "360f3e0be00e518c2f268f26aab06d6f27764acf",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/163393154",
       "triggerID" : "360f3e0be00e518c2f268f26aab06d6f27764acf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "360f3e0be00e518c2f268f26aab06d6f27764acf",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=539",
       "triggerID" : "360f3e0be00e518c2f268f26aab06d6f27764acf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5b241a0386f6ae029759e076afef6b155b10b328",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5b241a0386f6ae029759e076afef6b155b10b328",
       "triggerType" : "PUSH"
     }, {
       "hash" : "475dd8aa6478eaf51b758d3d201f0d4a38c27dd8",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=565",
       "triggerID" : "475dd8aa6478eaf51b758d3d201f0d4a38c27dd8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2ffa6112f5f8fd094f68aeb4013ebf5d026156d8",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "2ffa6112f5f8fd094f68aeb4013ebf5d026156d8",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a460858a2f8ff4da137ac5d7533cf4cdab7c84f1",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a460858a2f8ff4da137ac5d7533cf4cdab7c84f1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8720b50c34104ea0fe90d38bfebd8c0bd7132769",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=611",
       "triggerID" : "8720b50c34104ea0fe90d38bfebd8c0bd7132769",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 14e9fe3bfdeeae480047848801243f9fbed03cb4 UNKNOWN
   * a11dbc2d6b25ff16ef3cff4ecc751538d6867e68 UNKNOWN
   * 5b241a0386f6ae029759e076afef6b155b10b328 UNKNOWN
   * 2ffa6112f5f8fd094f68aeb4013ebf5d026156d8 UNKNOWN
   * a460858a2f8ff4da137ac5d7533cf4cdab7c84f1 UNKNOWN
   * 8720b50c34104ea0fe90d38bfebd8c0bd7132769 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=611) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on issue #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11554:
URL: https://github.com/apache/flink/pull/11554#issuecomment-605459909


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6785",
       "triggerID" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "status" : "DELETED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/156136835",
       "triggerID" : "7f2e408128e5d7929dd4301362a20d8d90c69597",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6793",
       "triggerID" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "CANCELED",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/156295604",
       "triggerID" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "14e9fe3bfdeeae480047848801243f9fbed03cb4",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "14e9fe3bfdeeae480047848801243f9fbed03cb4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "SUCCESS",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/156295604",
       "triggerID" : "605641471",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "bd27b1b4dd97173d85bb0867f7c377ac4c638af5",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6793",
       "triggerID" : "605641471",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "status" : "PENDING",
       "url" : "https://travis-ci.com/github/flink-ci/flink/builds/160993263",
       "triggerID" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7752",
       "triggerID" : "6a0a147d96dc517574efb6a97786b8b7e8e9c10c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "a11dbc2d6b25ff16ef3cff4ecc751538d6867e68",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a11dbc2d6b25ff16ef3cff4ecc751538d6867e68",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * bd27b1b4dd97173d85bb0867f7c377ac4c638af5 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/156295604) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6793) 
   * 14e9fe3bfdeeae480047848801243f9fbed03cb4 UNKNOWN
   * 6a0a147d96dc517574efb6a97786b8b7e8e9c10c Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160993263) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7752) 
   * a11dbc2d6b25ff16ef3cff4ecc751538d6867e68 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] becketqin commented on a change in pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

Posted by GitBox <gi...@apache.org>.
becketqin commented on a change in pull request #11554:
URL: https://github.com/apache/flink/pull/11554#discussion_r410904702



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
##########
@@ -0,0 +1,230 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The default implementation of the {@link OperatorCoordinator} for the {@link Source}.
+ *
+ * <p>The <code>SourceCoordinator</code> provides an event loop style thread model to interact with
+ * the Flink runtime. The coordinator ensures that all the state manipulations are made by its event loop
+ * thread. It also helps keep track of the necessary split assignments history per subtask to simplify the
+ * {@link SplitEnumerator} implementation.
+ *
+ * <p>The coordinator maintains a {@link org.apache.flink.api.connector.source.SplitEnumeratorContext
+ * SplitEnumeratorContxt} and shares it with the enumerator. When the coordinator receives an action
+ * request from the Flink runtime, it sets up the context, and calls corresponding method of the
+ * SplitEnumerator to take actions.
+ */
+@Internal
+public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements OperatorCoordinator {
+	private static final Logger LOG = LoggerFactory.getLogger(OperatorCoordinator.class);
+	/** A single-thread executor to handle all the changes to the coordinator. */
+	private final ExecutorService coordinatorExecutor;
+	/** The Source that is associated with this SourceCoordinator. */
+	private final Source<?, SplitT, EnumChkT> source;
+	/** The serializer that handles the serde of the SplitEnumerator checkpoints. */
+	private final SimpleVersionedSerializer<EnumChkT> enumCheckpointSerializer;
+	/** The serializer for the SourceSplit of the associated Source. */
+	private final SimpleVersionedSerializer<SplitT> splitSerializer;
+	/** The context containing the states of the coordinator. */
+	private final SourceCoordinatorContext<SplitT> context;
+	/** The split enumerator created from the associated Source. */
+	private SplitEnumerator<SplitT, EnumChkT> enumerator;
+	/** A flag marking whether the coordinator has started. */
+	private boolean started;
+
+	public SourceCoordinator(
+			ExecutorService coordinatorExecutor,
+			Source<?, SplitT, EnumChkT> source,
+			SourceCoordinatorContext<SplitT> context) {
+		this.coordinatorExecutor = coordinatorExecutor;
+		this.source = source;
+		this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
+		this.splitSerializer = source.getSplitSerializer();
+		this.context = context;
+		this.enumerator = source.createEnumerator(context);
+		this.started = false;
+	}
+
+	@Override
+	public void start() throws Exception {
+		LOG.info("Starting split enumerator.");

Review comment:
       I like the idea. Makes a lot of sense.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
##########
@@ -0,0 +1,230 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The default implementation of the {@link OperatorCoordinator} for the {@link Source}.
+ *
+ * <p>The <code>SourceCoordinator</code> provides an event loop style thread model to interact with
+ * the Flink runtime. The coordinator ensures that all the state manipulations are made by its event loop
+ * thread. It also helps keep track of the necessary split assignments history per subtask to simplify the
+ * {@link SplitEnumerator} implementation.
+ *
+ * <p>The coordinator maintains a {@link org.apache.flink.api.connector.source.SplitEnumeratorContext
+ * SplitEnumeratorContxt} and shares it with the enumerator. When the coordinator receives an action
+ * request from the Flink runtime, it sets up the context, and calls corresponding method of the
+ * SplitEnumerator to take actions.
+ */
+@Internal
+public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements OperatorCoordinator {
+	private static final Logger LOG = LoggerFactory.getLogger(OperatorCoordinator.class);
+	/** A single-thread executor to handle all the changes to the coordinator. */
+	private final ExecutorService coordinatorExecutor;
+	/** The Source that is associated with this SourceCoordinator. */
+	private final Source<?, SplitT, EnumChkT> source;
+	/** The serializer that handles the serde of the SplitEnumerator checkpoints. */
+	private final SimpleVersionedSerializer<EnumChkT> enumCheckpointSerializer;
+	/** The serializer for the SourceSplit of the associated Source. */
+	private final SimpleVersionedSerializer<SplitT> splitSerializer;
+	/** The context containing the states of the coordinator. */
+	private final SourceCoordinatorContext<SplitT> context;
+	/** The split enumerator created from the associated Source. */
+	private SplitEnumerator<SplitT, EnumChkT> enumerator;
+	/** A flag marking whether the coordinator has started. */
+	private boolean started;
+
+	public SourceCoordinator(
+			ExecutorService coordinatorExecutor,
+			Source<?, SplitT, EnumChkT> source,
+			SourceCoordinatorContext<SplitT> context) {
+		this.coordinatorExecutor = coordinatorExecutor;
+		this.source = source;
+		this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
+		this.splitSerializer = source.getSplitSerializer();
+		this.context = context;
+		this.enumerator = source.createEnumerator(context);
+		this.started = false;
+	}
+
+	@Override
+	public void start() throws Exception {
+		LOG.info("Starting split enumerator.");
+		enumerator.start();
+		started = true;
+	}
+
+	@Override
+	public void close() throws Exception {
+		ensureStarted();

Review comment:
       Good point. We probably don't have to enforce this.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
##########
@@ -0,0 +1,230 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The default implementation of the {@link OperatorCoordinator} for the {@link Source}.
+ *
+ * <p>The <code>SourceCoordinator</code> provides an event loop style thread model to interact with
+ * the Flink runtime. The coordinator ensures that all the state manipulations are made by its event loop
+ * thread. It also helps keep track of the necessary split assignments history per subtask to simplify the
+ * {@link SplitEnumerator} implementation.
+ *
+ * <p>The coordinator maintains a {@link org.apache.flink.api.connector.source.SplitEnumeratorContext
+ * SplitEnumeratorContxt} and shares it with the enumerator. When the coordinator receives an action
+ * request from the Flink runtime, it sets up the context, and calls corresponding method of the
+ * SplitEnumerator to take actions.
+ */
+@Internal
+public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements OperatorCoordinator {
+	private static final Logger LOG = LoggerFactory.getLogger(OperatorCoordinator.class);
+	/** A single-thread executor to handle all the changes to the coordinator. */
+	private final ExecutorService coordinatorExecutor;
+	/** The Source that is associated with this SourceCoordinator. */
+	private final Source<?, SplitT, EnumChkT> source;
+	/** The serializer that handles the serde of the SplitEnumerator checkpoints. */
+	private final SimpleVersionedSerializer<EnumChkT> enumCheckpointSerializer;
+	/** The serializer for the SourceSplit of the associated Source. */
+	private final SimpleVersionedSerializer<SplitT> splitSerializer;
+	/** The context containing the states of the coordinator. */
+	private final SourceCoordinatorContext<SplitT> context;
+	/** The split enumerator created from the associated Source. */
+	private SplitEnumerator<SplitT, EnumChkT> enumerator;
+	/** A flag marking whether the coordinator has started. */
+	private boolean started;
+
+	public SourceCoordinator(
+			ExecutorService coordinatorExecutor,
+			Source<?, SplitT, EnumChkT> source,
+			SourceCoordinatorContext<SplitT> context) {
+		this.coordinatorExecutor = coordinatorExecutor;
+		this.source = source;
+		this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
+		this.splitSerializer = source.getSplitSerializer();
+		this.context = context;
+		this.enumerator = source.createEnumerator(context);
+		this.started = false;
+	}
+
+	@Override
+	public void start() throws Exception {
+		LOG.info("Starting split enumerator.");
+		enumerator.start();
+		started = true;
+	}
+
+	@Override
+	public void close() throws Exception {
+		ensureStarted();
+		LOG.info("Closing SourceCoordinator.");
+		enumerator.close();
+		coordinatorExecutor.shutdown();
+		LOG.info("Source coordinator closed.");
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
+		ensureStarted();
+		coordinatorExecutor.execute(() -> {
+			LOG.debug("Handling event from operator: {}", event);
+			if (event instanceof SourceEventWrapper) {
+				enumerator.handleSourceEvent(subtask, ((SourceEventWrapper) event).getSourceEvent());
+			} else if (event instanceof ReaderRegistrationEvent) {
+				handleReaderRegistrationEvent((ReaderRegistrationEvent) event);
+			}
+		});
+	}
+
+	@Override
+	public void subtaskFailed(int subtaskId) {
+		ensureStarted();
+		coordinatorExecutor.execute(() -> {
+			LOG.info("Handling subtask {} failure.", subtaskId);
+			List<SplitT> splitsToAddBack = context.getAndRemoveUncheckpointedAssignment(subtaskId);
+			context.unregisterSourceReader(subtaskId);
+			LOG.debug("Adding {} back to the split enumerator.", splitsToAddBack);
+			enumerator.addSplitsBack(splitsToAddBack, subtaskId);
+		});
+	}
+
+	@Override
+	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) throws Exception {
+		ensureStarted();
+		return CompletableFuture.supplyAsync(() -> {
+			try {
+				LOG.debug("Taking a state snapshot for checkpoint {}", checkpointId);
+				return toBytes(checkpointId);
+			} catch (Exception e) {
+				throw new FlinkRuntimeException("Failed to checkpoint coordinator due to ", e);
+			}
+		}, coordinatorExecutor);
+	}
+
+	@Override
+	public void checkpointComplete(long checkpointId) {
+		ensureStarted();
+		coordinatorExecutor.execute(() -> {
+			LOG.info("Marking checkpoint {} as completed.", checkpointId);
+			context.onCheckpointComplete(checkpointId);
+		});
+	}
+
+	@Override
+	public void resetToCheckpoint(byte[] checkpointData) throws Exception {
+		if (started) {

Review comment:
       I was not quite sure whether we should create a new instance of the `SourceCoordinator` or reuse the current one. The main concern is dangling tasks in the coordinator executor that were scheduled by the `SplitEnumerator`. Resetting the coordinator state on a running split enumerator makes it hard to guarantee deterministic behavior. To avoid potential impact from the leftover instances, there might be two ways:
   
   1. Clean up and close the `SourceCoordinator`, create a new instance, and reset the state. This approach is simple to implement and unlikely to have bugs.
   2. Introduce an epoch and bump it on state restore. Any request or state change from an old epoch will be ignored. This avoids some object creation, but is more complicated to get right. I am not sure the gain in recovery time is enough to justify the complexity.
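
   To make option 2 concrete, here is a minimal sketch of an epoch guard. It is only an illustration: `EpochGuard`, `bump()` and `guard()` are hypothetical names that do not exist in this PR, and the sketch assumes every task handed to the coordinator executor would be wrapped before submission.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper, not part of the PR: drops tasks created before a state restore. */
final class EpochGuard {
    private final AtomicLong epoch = new AtomicLong();

    /** To be called on state restore; every task wrapped before this call becomes stale. */
    long bump() {
        return epoch.incrementAndGet();
    }

    /** Wraps a task so that it turns into a no-op if the epoch changed after wrapping. */
    Runnable guard(Runnable task) {
        final long createdAt = epoch.get();
        return () -> {
            if (epoch.get() == createdAt) {
                task.run();
            }
            // Otherwise the task belongs to an old epoch and is silently ignored.
        };
    }
}
```

   Even in this small form, every scheduling path has to remember to go through `guard()`, which is the kind of complexity that makes option 1 look more attractive.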

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
##########
@@ -0,0 +1,230 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The default implementation of the {@link OperatorCoordinator} for the {@link Source}.
+ *
+ * <p>The <code>SourceCoordinator</code> provides an event loop style thread model to interact with
+ * the Flink runtime. The coordinator ensures that all the state manipulations are made by its event loop
+ * thread. It also helps keep track of the necessary split assignments history per subtask to simplify the
+ * {@link SplitEnumerator} implementation.
+ *
+ * <p>The coordinator maintains a {@link org.apache.flink.api.connector.source.SplitEnumeratorContext
+ * SplitEnumeratorContxt} and shares it with the enumerator. When the coordinator receives an action
+ * request from the Flink runtime, it sets up the context, and calls corresponding method of the
+ * SplitEnumerator to take actions.
+ */
+@Internal
+public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements OperatorCoordinator {
+	private static final Logger LOG = LoggerFactory.getLogger(OperatorCoordinator.class);
+	/** A single-thread executor to handle all the changes to the coordinator. */
+	private final ExecutorService coordinatorExecutor;
+	/** The Source that is associated with this SourceCoordinator. */
+	private final Source<?, SplitT, EnumChkT> source;
+	/** The serializer that handles the serde of the SplitEnumerator checkpoints. */
+	private final SimpleVersionedSerializer<EnumChkT> enumCheckpointSerializer;
+	/** The serializer for the SourceSplit of the associated Source. */
+	private final SimpleVersionedSerializer<SplitT> splitSerializer;
+	/** The context containing the states of the coordinator. */
+	private final SourceCoordinatorContext<SplitT> context;
+	/** The split enumerator created from the associated Source. */
+	private SplitEnumerator<SplitT, EnumChkT> enumerator;
+	/** A flag marking whether the coordinator has started. */
+	private boolean started;
+
+	public SourceCoordinator(
+			ExecutorService coordinatorExecutor,
+			Source<?, SplitT, EnumChkT> source,
+			SourceCoordinatorContext<SplitT> context) {
+		this.coordinatorExecutor = coordinatorExecutor;
+		this.source = source;
+		this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
+		this.splitSerializer = source.getSplitSerializer();
+		this.context = context;
+		this.enumerator = source.createEnumerator(context);
+		this.started = false;
+	}
+
+	@Override
+	public void start() throws Exception {
+		LOG.info("Starting split enumerator.");
+		enumerator.start();
+		started = true;
+	}
+
+	@Override
+	public void close() throws Exception {
+		ensureStarted();
+		LOG.info("Closing SourceCoordinator.");
+		enumerator.close();
+		coordinatorExecutor.shutdown();

Review comment:
       I was trying to avoid blocking the JM main thread. The executor only takes tasks from either the `JobMaster` or the `SplitEnumerator`. Given that the `SplitEnumerator` is already closed and the JM should already have stopped invoking anything on this coordinator at this point, I would expect the shutdown to be pretty much a no-op. It probably does not hurt to just wait for the coordinator executor to shut down completely.
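
   Waiting for the executor could look roughly like the sketch below. This is an assumption about how `close()` might be changed, not the code in this PR; `ExecutorShutdown` and `shutdownAndAwait` are hypothetical names, and the timeout is a placeholder so that the JM main thread is never blocked indefinitely.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not part of the PR. */
final class ExecutorShutdown {

    /** Shuts the executor down and waits up to the timeout; returns true if it terminated. */
    static boolean shutdownAndAwait(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown(); // stop accepting new tasks, let queued ones finish
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and fall through to the forced shutdown.
            Thread.currentThread().interrupt();
        }
        executor.shutdownNow(); // give up waiting and interrupt whatever is still running
        return false;
    }
}
```

   If the shutdown really is close to a no-op, as expected above, the wait returns almost immediately.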

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
##########
@@ -0,0 +1,259 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.function.BiConsumer;
+
+/**
+ * A context class for the {@link OperatorCoordinator}. Compared with {@link SplitEnumeratorContext} this class
+ * allows interaction with state and sending {@link OperatorEvent} to the SourceOperator while
+ * {@link SplitEnumeratorContext} only allows sending {@link SourceEvent}.
+ *
+ * <p>The context serves a few purposes:
+ * <ul>
+ *     <li>
+ *         Information provider - The context provides necessary information to the enumerator for it to
+ *         know what is the status of the source readers and their split assignments. These information
+ *         allows the split enumerator to do the coordination.
+ *     </li>
+ *     <li>
+ *         Action taker - The context also provides a few actions that the enumerator can take to carry
+ *         out the coordination. So far there are two actions: 1) assign splits to the source readers.
+ *         and 2) sens a custom {@link SourceEvent SourceEvents} to the source readers.
+ *     </li>
+ *     <li>
+ *         Thread model enforcement - The context ensures that all the manipulations to the coordinator state
+ *         are handled by the same thread.
+ *     </li>
+ * </ul>
+ * @param <SplitT> the type of the splits.
+ */
+@Internal
+public class SourceCoordinatorContext<SplitT extends SourceSplit> implements SplitEnumeratorContext<SplitT> {
+	private final ExecutorService coordinatorExecutor;
+	private final ExecutorNotifier notifier;
+	private final OperatorCoordinator.Context operatorCoordinatorContext;
+	private final ConcurrentMap<Integer, ReaderInfo> registeredReaders;
+	private final SplitAssignmentTracker<SplitT> assignmentTracker;
+	private final String coordinatorThreadName;
+
+	public SourceCoordinatorContext(
+			ExecutorService coordinatorExecutor,
+			String coordinatorThreadName,
+			int numWorkerThreads,
+			OperatorCoordinator.Context operatorCoordinatorContext) {
+		this(coordinatorExecutor, coordinatorThreadName, numWorkerThreads, operatorCoordinatorContext,
+				new SplitAssignmentTracker<>());
+	}
+
+	// Package private method for unit test.
+	SourceCoordinatorContext(
+			ExecutorService coordinatorExecutor,
+			String coordinatorThreadName,
+			int numWorkerThreads,
+			OperatorCoordinator.Context operatorCoordinatorContext,
+			SplitAssignmentTracker<SplitT> splitAssignmentTracker) {
+		this.coordinatorExecutor = coordinatorExecutor;
+		this.notifier = new ExecutorNotifier(
+				Executors.newScheduledThreadPool(numWorkerThreads, new ThreadFactory() {
+					private int index = 0;
+					@Override
+					public Thread newThread(Runnable r) {
+						return new Thread(r, coordinatorThreadName + "-worker-" + index++);
+					}
+				}),
+				coordinatorExecutor);
+		this.operatorCoordinatorContext = operatorCoordinatorContext;
+		this.registeredReaders = new ConcurrentHashMap<>();
+		this.assignmentTracker = splitAssignmentTracker;
+		this.coordinatorThreadName = coordinatorThreadName;
+	}
+
+	@Override
+	public MetricGroup metricGroup() {
+		return null;
+	}
+
+	@Override
+	public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
+		try {
+			operatorCoordinatorContext.sendEvent(new SourceEventWrapper(event), subtaskId);
+		} catch (TaskNotRunningException e) {
+			throw new FlinkRuntimeException(String.format("Failed to send event %s to subtask %d",
+					event,
+					subtaskId), e);
+		}
+	}
+
+	@Override
+	public int currentParallelism() {
+		return operatorCoordinatorContext.currentParallelism();
+	}
+
+	@Override
+	public Map<Integer, ReaderInfo> registeredReaders() {
+		return Collections.unmodifiableMap(registeredReaders);
+	}
+
+	@Override
+	public void assignSplits(SplitsAssignment<SplitT> assignment) {
+		// Ensure the split assignment is done by the coordinator executor.
+		if (!Thread.currentThread().getName().equals(coordinatorThreadName)) {

Review comment:
       The thread was created by a thread factory lazily, therefore I took a shortcut... You caught me here :)
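
       For illustration only, a minimal sketch of how the name comparison could be replaced by a thread identity check even though the thread is created lazily. `CoordinatorThreadFactory` and `isCurrentThreadCoordinatorThread()` are hypothetical names for this sketch and are not taken from the PR.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/** Hypothetical helper: a thread factory that remembers the thread it created. */
class CoordinatorThreadFactory implements ThreadFactory {
    private final String threadName;
    // Set lazily, when the executor first asks for a thread.
    private volatile Thread thread;

    CoordinatorThreadFactory(String threadName) {
        this.threadName = threadName;
    }

    @Override
    public synchronized Thread newThread(Runnable r) {
        // A single-thread executor may replace its worker after a failure,
        // so the most recently created thread is the one that is kept.
        thread = new Thread(r, threadName);
        return thread;
    }

    /** Compares thread identity instead of thread names. */
    boolean isCurrentThreadCoordinatorThread() {
        return Thread.currentThread() == thread;
    }
}

class ThreadCheckDemo {
    public static void main(String[] args) throws Exception {
        CoordinatorThreadFactory factory = new CoordinatorThreadFactory("SourceCoordinator-demo");
        ExecutorService executor = Executors.newSingleThreadExecutor(factory);
        Callable<Boolean> check = factory::isCurrentThreadCoordinatorThread;
        // Evaluated once on the coordinator thread and once on the main thread.
        boolean inside = executor.submit(check).get();
        boolean outside = factory.isCurrentThreadCoordinatorThread();
        executor.shutdown();
        System.out.println("inside=" + inside + ", outside=" + outside);
    }
}
```

       Comparing `Thread` references avoids false positives from unrelated threads that happen to share the coordinator thread's name.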

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
##########
@@ -0,0 +1,73 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.BiConsumer;
+
+/**
+ * The provider of {@link SourceCoordinator}.
+ */
+public class SourceCoordinatorProvider<SplitT extends SourceSplit>
+		implements OperatorCoordinator.Provider {
+	private final OperatorID operatorID;
+	private final Source<?, SplitT, ?> source;
+	private final int numWorkerThreads;
+
+	/**
+	 * Construct the {@link SourceCoordinatorProvider}.
+	 *
+	 * @param operatorID the ID of the operator this coordinator corresponds to.
+	 * @param source the Source that will be used for this coordinator.
+	 * @param numWorkerThreads the number of threads that should be provided to the SplitEnumerator
+	 *                         for doing async calls. See
+	 *                         {@link org.apache.flink.api.connector.source.SplitEnumeratorContext#callAsync(Callable, BiConsumer)
+	 *                         SplitEnumeratorContext.callAsync()}.
+	 */
+	public SourceCoordinatorProvider(
+			OperatorID operatorID,
+			Source<?, SplitT, ?> source,
+			int numWorkerThreads) {
+		this.operatorID = operatorID;
+		this.source = source;
+		this.numWorkerThreads = numWorkerThreads;
+	}
+
+	@Override
+	public OperatorID getOperatorId() {
+		return operatorID;
+	}
+
+	@Override
+	public OperatorCoordinator create(OperatorCoordinator.Context context) {
+		final String coordinatorThreadName = "SourceCoordinator-" + operatorID;
+		ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor(

Review comment:
       `DispatcherThreadFactory` seems like overkill. It calls `System.exit()`, which causes the entire JM to exit. In our case, I am wondering if it is sufficient to just fail the job?
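
       To make the alternative concrete, here is a rough sketch of a thread factory whose uncaught exception handler reports the error to a callback instead of exiting the process. `JobFailingThreadFactory` and the `failJob` callback are assumptions made for this example. In the coordinator the callback would presumably delegate to whatever job-failing hook the `OperatorCoordinator.Context` offers, which is not shown here.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical factory: uncaught errors are handed to a callback, not System.exit(). */
class JobFailingThreadFactory implements ThreadFactory {
    private final String threadName;
    // Assumed callback; stands in for "fail the job" in this sketch.
    private final Consumer<Throwable> failJob;

    JobFailingThreadFactory(String threadName, Consumer<Throwable> failJob) {
        this.threadName = threadName;
        this.failJob = failJob;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, threadName);
        // Invoked on the dying thread when the runnable throws.
        t.setUncaughtExceptionHandler((thread, error) -> failJob.accept(error));
        return t;
    }
}

class JobFailingDemo {
    public static void main(String[] args) throws InterruptedException {
        AtomicReference<Throwable> reported = new AtomicReference<>();
        JobFailingThreadFactory factory =
                new JobFailingThreadFactory("SourceCoordinator-demo", reported::set);
        Thread t = factory.newThread(() -> {
            throw new IllegalStateException("enumerator failure");
        });
        t.start();
        t.join();
        System.out.println("reported: " + reported.get());
    }
}
```

       Note that the handler only fires for tasks run via `execute()`. Exceptions thrown by tasks passed to `submit()` are captured in the returned `Future` instead.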







[GitHub] [flink] flinkbot edited a comment on pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #11554:
URL: https://github.com/apache/flink/pull/11554#issuecomment-605459909


   ## CI report:
   
   * 14e9fe3bfdeeae480047848801243f9fbed03cb4 UNKNOWN
   * a11dbc2d6b25ff16ef3cff4ecc751538d6867e68 UNKNOWN
   * 5b241a0386f6ae029759e076afef6b155b10b328 UNKNOWN
   * 475dd8aa6478eaf51b758d3d201f0d4a38c27dd8 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=565) 
   * 2ffa6112f5f8fd094f68aeb4013ebf5d026156d8 UNKNOWN
   * a460858a2f8ff4da137ac5d7533cf4cdab7c84f1 UNKNOWN
   * 8720b50c34104ea0fe90d38bfebd8c0bd7132769 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] StephanEwen commented on pull request #11554: [FLINK-15101][connector/common] Add SourceCoordinator implementation.

Posted by GitBox <gi...@apache.org>.
StephanEwen commented on pull request #11554:
URL: https://github.com/apache/flink/pull/11554#issuecomment-623172729


   To expedite the merge a bit, how about just (1) switching to `DataOutputStream` (that is a JDK class, not a Flink class, I assume you searched for it in the Flink codebase?) and (2) doing the fatal error handler?
   
   I would be up for taking a patch to make failure handling more fine grained as a next step (it is not a strict must-have, just a nice-to-have).
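
   As a point of reference for (1), a small sketch of writing and reading a length-prefixed structure using only the JDK's `DataOutputStream` and `DataInputStream`. The `AssignmentSerde` class and its byte layout are invented for this example and are not the format used in the PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical serde: subtask id mapped to an already-serialized split. */
class AssignmentSerde {

    static byte[] serialize(Map<Integer, byte[]> splitsBySubtask) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(splitsBySubtask.size());
            for (Map.Entry<Integer, byte[]> entry : splitsBySubtask.entrySet()) {
                out.writeInt(entry.getKey());          // subtask id
                out.writeInt(entry.getValue().length); // length prefix
                out.write(entry.getValue());           // split bytes
            }
        }
        return bytes.toByteArray();
    }

    static Map<Integer, byte[]> deserialize(byte[] data) throws IOException {
        Map<Integer, byte[]> result = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int numEntries = in.readInt();
            for (int i = 0; i < numEntries; i++) {
                int subtaskId = in.readInt();
                byte[] split = new byte[in.readInt()];
                in.readFully(split); // reads exactly split.length bytes or throws
                result.put(subtaskId, split);
            }
        }
        return result;
    }
}
```

   Both stream classes live in `java.io`, so no Flink-specific wrapper is needed for this kind of framing.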

