Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/02 07:29:46 UTC

[GitHub] [flink] aljoscha opened a new pull request #13530: [FLINK-19123] Use shared MiniCluster for executeAsync() in TestStreamEnvironment

aljoscha opened a new pull request #13530:
URL: https://github.com/apache/flink/pull/13530


   ## What is the purpose of the change
   
   `TestStreamEnvironment` overrides `execute()` but not `executeAsync()`. Calls to the latter therefore don't use the shared `MiniCluster`; instead, a new cluster is spun up for every job.
   
   This changes `TestStreamEnvironment` to always use the shared `MiniCluster` by injecting a custom `PipelineExecutorServiceLoader` and not overriding any of the `execute*()` methods.
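
A minimal sketch of why injecting the executor lookup covers both execution paths. It uses hypothetical stand-ins (`FakeCluster`, `ExecutorLoader`, `Environment`) rather than Flink's real classes, and it assumes that `execute()` is layered on top of `executeAsync()` in this simplified model:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedClusterSketch {

    /** Hypothetical stand-in for a MiniCluster; counts how many instances get created. */
    static final class FakeCluster {
        static final AtomicInteger STARTED = new AtomicInteger();

        FakeCluster() {
            STARTED.incrementAndGet();
        }

        CompletableFuture<String> submit(String jobName) {
            return CompletableFuture.completedFuture(jobName + " finished");
        }
    }

    /** Hypothetical stand-in for the executor lookup that an environment consults. */
    interface ExecutorLoader {
        FakeCluster clusterForJob();
    }

    /** Simplified environment: execute() builds on executeAsync(), and both use the loader. */
    static class Environment {
        private final ExecutorLoader loader;

        Environment(ExecutorLoader loader) {
            this.loader = loader;
        }

        CompletableFuture<String> executeAsync(String jobName) {
            return loader.clusterForJob().submit(jobName);
        }

        String execute(String jobName) {
            // Blocking execution is just async submission plus waiting for the result.
            return executeAsync(jobName).join();
        }
    }

    public static void main(String[] args) {
        FakeCluster shared = new FakeCluster();
        // A loader that always hands out the shared cluster covers both code paths,
        // so neither execute() nor executeAsync() needs to be overridden.
        Environment env = new Environment(() -> shared);
        System.out.println(env.execute("job-1"));
        System.out.println(env.executeAsync("job-2").join());
        System.out.println("clusters created: " + FakeCluster.STARTED.get());
    }
}
```

In this model no cluster is created per job; the only instance is the one constructed up front and handed to the loader.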
   
   ## Brief change log
   
   As always, the commit messages describe what each commit does.
   
    - remodel `PerJobMiniClusterJobClient` (now `MiniClusterJobClient`) to be usable for all `MiniCluster` job purposes
    - change `TestStreamEnvironment` to use a custom `PipelineExecutorServiceLoader`
    - the custom `PipelineExecutorServiceLoader` works directly against a `MiniCluster` instead of the `JobExecutor` because we want to do async job submission
    - remove now-unused `JobExecutor` interface
   
   This quote describes the rationale behind removing `JobExecutor` and mentions an alternative solution:
   
   > Instead, we use a custom PipelineExecutorServiceLoader to inject a
   > MiniClusterExecutor. This requires that we directly use MiniCluster
   > instead of the JobExecutor interface in the test environments because we
   > need to use asynchronous job submission. The alternative would be to
   > extend the JobExecutor interface to allow async job submission.
   
   ## Verifying this change
   
   Covered by existing tests that use `TestStreamEnvironment`.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] aljoscha commented on pull request #13530: [FLINK-19123] Use shared MiniCluster for executeAsync() in TestStreamEnvironment

Posted by GitBox <gi...@apache.org>.
aljoscha commented on pull request #13530:
URL: https://github.com/apache/flink/pull/13530#issuecomment-703460832


   Thanks for the review! I changed the boolean to an enum but I'm not sure about the naming, so very open to any suggestion.
   
   About formatting: I use the Google Java Style as applied by the `google-java-format` tool, but with tabs instead of spaces. It consistently uses 2 levels of indentation for arguments, parameters, and continuation lines, and 1 level of indentation for blocks. I like how it visually makes arguments more distinct from code blocks, and I like its internal consistency. Plus, it's compatible with our Checkstyle rules. I would prefer a consistent code style that is enforceable and can be applied automatically, but while we don't have that I'll use a style I like in new code that is compatible with our rules. Sorry for the longer block of text. 😅 I can still undo the whitespace changes here.
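
The indentation convention described in this comment can be illustrated with a small sketch. The class and method names are made up for illustration; the layout is an interpretation of the description (tabs, two levels for wrapped parameters and arguments, one level for blocks), not output produced by `google-java-format`:

```java
public class IndentationSketch {

	// Wrapped parameters get two levels of indentation...
	static int add(
			int first,
			int second) {
		// ...while the method body, being a block, gets one level.
		return first + second;
	}

	public static void main(String[] args) {
		// Wrapped call arguments also get two levels.
		int sum = add(
				1,
				2);
		System.out.println(sum);
	}
}
```

The extra level keeps the parameter list visually separate from the first statement of the body.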





[GitHub] [flink] flinkbot edited a comment on pull request #13530: [FLINK-19123] Use shared MiniCluster for executeAsync() in TestStreamEnvironment

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13530:
URL: https://github.com/apache/flink/pull/13530#issuecomment-702582807


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2513a954016debd48694a5097a5c2a48943ca532",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7161",
       "triggerID" : "2513a954016debd48694a5097a5c2a48943ca532",
       "triggerType" : "PUSH"
     }, {
       "hash" : "66eea9dec0e82a98bd20f1716caa0bbb3be9146e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7196",
       "triggerID" : "66eea9dec0e82a98bd20f1716caa0bbb3be9146e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "88ec25aa3dc2db1f5d35a4351d5a26a5e353c7cc",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7201",
       "triggerID" : "88ec25aa3dc2db1f5d35a4351d5a26a5e353c7cc",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47ca19a74e11c72842124852875262959477c459",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7207",
       "triggerID" : "47ca19a74e11c72842124852875262959477c459",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c0c3a012813e6bf76ee374073218ab647f1aa1be UNKNOWN
   * 88ec25aa3dc2db1f5d35a4351d5a26a5e353c7cc Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7201) 
   * 47ca19a74e11c72842124852875262959477c459 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7207) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] aljoscha merged pull request #13530: [FLINK-19123] Use shared MiniCluster for executeAsync() in TestStreamEnvironment

Posted by GitBox <gi...@apache.org>.
aljoscha merged pull request #13530:
URL: https://github.com/apache/flink/pull/13530


   





[GitHub] [flink] flinkbot edited a comment on pull request #13530: [FLINK-19123] Use shared MiniCluster for executeAsync() in TestStreamEnvironment

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13530:
URL: https://github.com/apache/flink/pull/13530#issuecomment-702582807


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2513a954016debd48694a5097a5c2a48943ca532",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7161",
       "triggerID" : "2513a954016debd48694a5097a5c2a48943ca532",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c0c3a012813e6bf76ee374073218ab647f1aa1be UNKNOWN
   * 2513a954016debd48694a5097a5c2a48943ca532 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7161) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #13530: [FLINK-19123] Use shared MiniCluster for executeAsync() in TestStreamEnvironment

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13530:
URL: https://github.com/apache/flink/pull/13530#issuecomment-702582807


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2513a954016debd48694a5097a5c2a48943ca532",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7161",
       "triggerID" : "2513a954016debd48694a5097a5c2a48943ca532",
       "triggerType" : "PUSH"
     }, {
       "hash" : "66eea9dec0e82a98bd20f1716caa0bbb3be9146e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7196",
       "triggerID" : "66eea9dec0e82a98bd20f1716caa0bbb3be9146e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "88ec25aa3dc2db1f5d35a4351d5a26a5e353c7cc",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7201",
       "triggerID" : "88ec25aa3dc2db1f5d35a4351d5a26a5e353c7cc",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c0c3a012813e6bf76ee374073218ab647f1aa1be UNKNOWN
   * 88ec25aa3dc2db1f5d35a4351d5a26a5e353c7cc Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7201) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #13530: [FLINK-19123] Use shared MiniCluster for executeAsync() in TestStreamEnvironment

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13530:
URL: https://github.com/apache/flink/pull/13530#issuecomment-702582807


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2513a954016debd48694a5097a5c2a48943ca532",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7161",
       "triggerID" : "2513a954016debd48694a5097a5c2a48943ca532",
       "triggerType" : "PUSH"
     }, {
       "hash" : "66eea9dec0e82a98bd20f1716caa0bbb3be9146e",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7196",
       "triggerID" : "66eea9dec0e82a98bd20f1716caa0bbb3be9146e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c0c3a012813e6bf76ee374073218ab647f1aa1be UNKNOWN
   * 2513a954016debd48694a5097a5c2a48943ca532 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7161) 
   * 66eea9dec0e82a98bd20f1716caa0bbb3be9146e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7196) 
   





[GitHub] [flink] tillrohrmann commented on pull request #13530: [FLINK-19123] Use shared MiniCluster for executeAsync() in TestStreamEnvironment

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on pull request #13530:
URL: https://github.com/apache/flink/pull/13530#issuecomment-703583316


   Thanks for the explanation of the formatting changes, @aljoscha. I agree that an automated way is superior to an inconsistent manual approach. I guess adopting this would be easiest if someone shared a consistent set of settings for `google-java-format` (potentially also for IDEs other than IntelliJ). But this PR should not be blocked on this. I am fine with either formatting because it is not a big problem.





[GitHub] [flink] flinkbot edited a comment on pull request #13530: [FLINK-19123] Use shared MiniCluster for executeAsync() in TestStreamEnvironment

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13530:
URL: https://github.com/apache/flink/pull/13530#issuecomment-702582807


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2513a954016debd48694a5097a5c2a48943ca532",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7161",
       "triggerID" : "2513a954016debd48694a5097a5c2a48943ca532",
       "triggerType" : "PUSH"
     }, {
       "hash" : "66eea9dec0e82a98bd20f1716caa0bbb3be9146e",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "66eea9dec0e82a98bd20f1716caa0bbb3be9146e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c0c3a012813e6bf76ee374073218ab647f1aa1be UNKNOWN
   * 2513a954016debd48694a5097a5c2a48943ca532 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7161) 
   * 66eea9dec0e82a98bd20f1716caa0bbb3be9146e UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #13530: [FLINK-19123] Use shared MiniCluster for executeAsync() in TestStreamEnvironment

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13530:
URL: https://github.com/apache/flink/pull/13530#issuecomment-702582807


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2513a954016debd48694a5097a5c2a48943ca532",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7161",
       "triggerID" : "2513a954016debd48694a5097a5c2a48943ca532",
       "triggerType" : "PUSH"
     }, {
       "hash" : "66eea9dec0e82a98bd20f1716caa0bbb3be9146e",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7196",
       "triggerID" : "66eea9dec0e82a98bd20f1716caa0bbb3be9146e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "88ec25aa3dc2db1f5d35a4351d5a26a5e353c7cc",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "88ec25aa3dc2db1f5d35a4351d5a26a5e353c7cc",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c0c3a012813e6bf76ee374073218ab647f1aa1be UNKNOWN
   * 2513a954016debd48694a5097a5c2a48943ca532 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7161) 
   * 66eea9dec0e82a98bd20f1716caa0bbb3be9146e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7196) 
   * 88ec25aa3dc2db1f5d35a4351d5a26a5e353c7cc UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #13530: [FLINK-19123] Use shared MiniCluster for executeAsync() in TestStreamEnvironment

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13530:
URL: https://github.com/apache/flink/pull/13530#issuecomment-702582807


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2513a954016debd48694a5097a5c2a48943ca532",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7161",
       "triggerID" : "2513a954016debd48694a5097a5c2a48943ca532",
       "triggerType" : "PUSH"
     }, {
       "hash" : "66eea9dec0e82a98bd20f1716caa0bbb3be9146e",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7196",
       "triggerID" : "66eea9dec0e82a98bd20f1716caa0bbb3be9146e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "88ec25aa3dc2db1f5d35a4351d5a26a5e353c7cc",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7201",
       "triggerID" : "88ec25aa3dc2db1f5d35a4351d5a26a5e353c7cc",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c0c3a012813e6bf76ee374073218ab647f1aa1be UNKNOWN
   * 2513a954016debd48694a5097a5c2a48943ca532 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7161) 
   * 66eea9dec0e82a98bd20f1716caa0bbb3be9146e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7196) 
   * 88ec25aa3dc2db1f5d35a4351d5a26a5e353c7cc Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7201) 
   





[GitHub] [flink] tillrohrmann commented on a change in pull request #13530: [FLINK-19123] Use shared MiniCluster for executeAsync() in TestStreamEnvironment

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #13530:
URL: https://github.com/apache/flink/pull/13530#discussion_r498922308



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobClient.java
##########
@@ -43,23 +43,30 @@
 /**
  * A {@link JobClient} for a {@link MiniCluster}.
  */
-public final class PerJobMiniClusterJobClient implements JobClient, CoordinationRequestGateway {
+public final class MiniClusterJobClient implements JobClient, CoordinationRequestGateway {
 
-	private static final Logger LOG = LoggerFactory.getLogger(PerJobMiniClusterJobClient.class);
+	private static final Logger LOG = LoggerFactory.getLogger(MiniClusterJobClient.class);
 
 	private final JobID jobID;
 	private final MiniCluster miniCluster;
-	private final CompletableFuture<JobResult> jobResultFuture;
 	private final ClassLoader classLoader;
+	private final CompletableFuture<JobResult> jobResultFuture;
 
-	public PerJobMiniClusterJobClient(JobID jobID, MiniCluster miniCluster, ClassLoader classLoader) {
+	/**
+	 * Creates a {@link MiniClusterJobClient} for the given {@link JobID} and {@link MiniCluster}.
+	 * This will shut down the {@code MiniCluster} after job result retrieval if {@code
+	 * shutdownCluster} is {@code true}.
+	 */
+	public MiniClusterJobClient(JobID jobID, MiniCluster miniCluster, ClassLoader classLoader, boolean shutdownCluster) {

Review comment:
       I tend to prefer enums over booleans for parameters because an enum makes explicit what `true` and `false` mean.
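
To illustrate the suggestion, here is a minimal sketch of replacing the boolean `shutdownCluster` parameter with an enum. The names `JobFinalizationBehavior`, `SHUTDOWN_CLUSTER`, `NOTHING`, and the simplified `Client` class are assumptions chosen for illustration; they are not taken from this thread:

```java
public class EnumParameterSketch {

    /** Hypothetical enum standing in for the boolean "shutdownCluster" flag. */
    enum JobFinalizationBehavior {
        SHUTDOWN_CLUSTER,
        NOTHING
    }

    /** Hypothetical, simplified client that only records what it would do on job completion. */
    static final class Client {
        private final JobFinalizationBehavior finalizationBehavior;

        Client(JobFinalizationBehavior finalizationBehavior) {
            this.finalizationBehavior = finalizationBehavior;
        }

        boolean shutsDownClusterOnCompletion() {
            return finalizationBehavior == JobFinalizationBehavior.SHUTDOWN_CLUSTER;
        }
    }

    public static void main(String[] args) {
        // The call site states its intent; compare with an opaque `new Client(true)`.
        Client perJob = new Client(JobFinalizationBehavior.SHUTDOWN_CLUSTER);
        Client shared = new Client(JobFinalizationBehavior.NOTHING);
        System.out.println(perJob.shutsDownClusterOnCompletion());
        System.out.println(shared.shutsDownClusterOnCompletion());
    }
}
```

An enum also leaves room for a third behavior later without changing the constructor signature.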

##########
File path: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
##########
@@ -19,52 +19,37 @@
 package org.apache.flink.test.util;
 
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.client.deployment.executors.LocalExecutor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.minicluster.JobExecutor;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.util.Preconditions;
 
 import java.net.URL;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 
 /**
- * A {@link ExecutionEnvironment} implementation which executes its jobs on a
- * {@link MiniCluster}.
+ * A {@link ExecutionEnvironment} implementation which executes its jobs on a {@link MiniCluster}.
  */
 public class TestEnvironment extends ExecutionEnvironment {
 
-	private final JobExecutor jobExecutor;
-
-	private final Collection<Path> jarFiles;
-
-	private final Collection<URL> classPaths;
+	private final MiniCluster miniCluster;
 
 	private TestEnvironment lastEnv;
 
 	public TestEnvironment(
-			JobExecutor jobExecutor,
+			MiniCluster miniCluster,
 			int parallelism,
 			boolean isObjectReuseEnabled,
 			Collection<Path> jarFiles,
 			Collection<URL> classPaths) {
-		this.jobExecutor = Preconditions.checkNotNull(jobExecutor);
-		this.jarFiles = Preconditions.checkNotNull(jarFiles);
-		this.classPaths = Preconditions.checkNotNull(classPaths);
-		getConfiguration().set(DeploymentOptions.TARGET, LocalExecutor.NAME);
-		getConfiguration().set(DeploymentOptions.ATTACHED, true);
+		super(
+				new MiniClusterPipelineExecutorServiceLoader(miniCluster),
+				MiniClusterPipelineExecutorServiceLoader.createConfiguration(jarFiles, classPaths),
+				null);

Review comment:
       nit: this looks too deeply indented.

##########
File path: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
##########
@@ -78,58 +63,33 @@ public TestEnvironment(
 	}
 
 	public TestEnvironment(
-			JobExecutor executor,
+			MiniCluster executor,
 			int parallelism,
 			boolean isObjectReuseEnabled) {
 		this(
-			executor,
-			parallelism,
-			isObjectReuseEnabled,
-			Collections.emptyList(),
-			Collections.emptyList());
+				executor,
+				parallelism,
+				isObjectReuseEnabled,
+				Collections.emptyList(),
+				Collections.emptyList());

Review comment:
       I think an extra tab of indentation is probably not correct here.

##########
File path: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
##########
@@ -174,15 +129,15 @@ public ExecutionEnvironment createExecutionEnvironment() {
 	 * environment executes the given jobs on a Flink mini cluster with the given default
 	 * parallelism and the additional jar files and class paths.
 	 *
-	 * @param jobExecutor The executor to run the jobs on
+	 * @param miniCluster The MiniCluster to execute jobs on.
 	 * @param parallelism The default parallelism
 	 */
-	public static void setAsContext(final JobExecutor jobExecutor, final int parallelism) {
+	public static void setAsContext(final MiniCluster miniCluster, final int parallelism) {
 		setAsContext(
-			jobExecutor,
-			parallelism,
-			Collections.emptyList(),
-			Collections.emptyList());
+				miniCluster,
+				parallelism,
+				Collections.emptyList(),
+				Collections.emptyList());

Review comment:
       nit: I am not sure about this indentation here.







[GitHub] [flink] flinkbot edited a comment on pull request #13530: [FLINK-19123] Use shared MiniCluster for executeAsync() in TestStreamEnvironment

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13530:
URL: https://github.com/apache/flink/pull/13530#issuecomment-702582807


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2513a954016debd48694a5097a5c2a48943ca532",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7161",
       "triggerID" : "2513a954016debd48694a5097a5c2a48943ca532",
       "triggerType" : "PUSH"
     }, {
       "hash" : "66eea9dec0e82a98bd20f1716caa0bbb3be9146e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7196",
       "triggerID" : "66eea9dec0e82a98bd20f1716caa0bbb3be9146e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "88ec25aa3dc2db1f5d35a4351d5a26a5e353c7cc",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7201",
       "triggerID" : "88ec25aa3dc2db1f5d35a4351d5a26a5e353c7cc",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47ca19a74e11c72842124852875262959477c459",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "47ca19a74e11c72842124852875262959477c459",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c0c3a012813e6bf76ee374073218ab647f1aa1be UNKNOWN
   * 88ec25aa3dc2db1f5d35a4351d5a26a5e353c7cc Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7201) 
   * 47ca19a74e11c72842124852875262959477c459 UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #13530: [FLINK-19123] Use shared MiniCluster for executeAsync() in TestStreamEnvironment

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13530:
URL: https://github.com/apache/flink/pull/13530#issuecomment-702582807


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2513a954016debd48694a5097a5c2a48943ca532",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7161",
       "triggerID" : "2513a954016debd48694a5097a5c2a48943ca532",
       "triggerType" : "PUSH"
     }, {
       "hash" : "66eea9dec0e82a98bd20f1716caa0bbb3be9146e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7196",
       "triggerID" : "66eea9dec0e82a98bd20f1716caa0bbb3be9146e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "88ec25aa3dc2db1f5d35a4351d5a26a5e353c7cc",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7201",
       "triggerID" : "88ec25aa3dc2db1f5d35a4351d5a26a5e353c7cc",
       "triggerType" : "PUSH"
     }, {
       "hash" : "47ca19a74e11c72842124852875262959477c459",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7207",
       "triggerID" : "47ca19a74e11c72842124852875262959477c459",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c0c3a012813e6bf76ee374073218ab647f1aa1be UNKNOWN
   * 47ca19a74e11c72842124852875262959477c459 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7207) 
   





[GitHub] [flink] flinkbot commented on pull request #13530: [FLINK-19123] Use shared MiniCluster for executeAsync() in TestStreamEnvironment

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13530:
URL: https://github.com/apache/flink/pull/13530#issuecomment-702575381


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit c0c3a012813e6bf76ee374073218ab647f1aa1be (Fri Oct 02 07:32:45 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.

   The bot tracks the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot commented on pull request #13530: [FLINK-19123] Use shared MiniCluster for executeAsync() in TestStreamEnvironment

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13530:
URL: https://github.com/apache/flink/pull/13530#issuecomment-702582807


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c0c3a012813e6bf76ee374073218ab647f1aa1be UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #13530: [FLINK-19123] Use shared MiniCluster for executeAsync() in TestStreamEnvironment

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13530:
URL: https://github.com/apache/flink/pull/13530#issuecomment-702582807


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2513a954016debd48694a5097a5c2a48943ca532",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "2513a954016debd48694a5097a5c2a48943ca532",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c0c3a012813e6bf76ee374073218ab647f1aa1be UNKNOWN
   * 2513a954016debd48694a5097a5c2a48943ca532 UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #13530: [FLINK-19123] Use shared MiniCluster for executeAsync() in TestStreamEnvironment

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13530:
URL: https://github.com/apache/flink/pull/13530#issuecomment-702582807


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2513a954016debd48694a5097a5c2a48943ca532",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7161",
       "triggerID" : "2513a954016debd48694a5097a5c2a48943ca532",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c0c3a012813e6bf76ee374073218ab647f1aa1be UNKNOWN
   * 2513a954016debd48694a5097a5c2a48943ca532 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7161) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #13530: [FLINK-19123] Use shared MiniCluster for executeAsync() in TestStreamEnvironment

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13530:
URL: https://github.com/apache/flink/pull/13530#issuecomment-702582807


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "c0c3a012813e6bf76ee374073218ab647f1aa1be",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2513a954016debd48694a5097a5c2a48943ca532",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7161",
       "triggerID" : "2513a954016debd48694a5097a5c2a48943ca532",
       "triggerType" : "PUSH"
     }, {
       "hash" : "66eea9dec0e82a98bd20f1716caa0bbb3be9146e",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7196",
       "triggerID" : "66eea9dec0e82a98bd20f1716caa0bbb3be9146e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "88ec25aa3dc2db1f5d35a4351d5a26a5e353c7cc",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7201",
       "triggerID" : "88ec25aa3dc2db1f5d35a4351d5a26a5e353c7cc",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c0c3a012813e6bf76ee374073218ab647f1aa1be UNKNOWN
   * 66eea9dec0e82a98bd20f1716caa0bbb3be9146e Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7196) 
   * 88ec25aa3dc2db1f5d35a4351d5a26a5e353c7cc Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7201) 
   

