You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by Matrix42 <gi...@git.apache.org> on 2018/07/15 17:30:22 UTC
[GitHub] flink pull request #5801: [FLINK-9121] [flip6] Remove Flip6 prefixes and oth...
Github user Matrix42 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5801#discussion_r202548776
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java ---
@@ -49,30 +54,45 @@ public YarnClusterDescriptor(
@Override
protected String getYarnSessionClusterEntrypoint() {
- return YarnApplicationMasterRunner.class.getName();
+ return YarnSessionClusterEntrypoint.class.getName();
}
@Override
protected String getYarnJobClusterEntrypoint() {
- throw new UnsupportedOperationException("The old Yarn descriptor does not support proper per-job mode.");
+ return YarnJobClusterEntrypoint.class.getName();
}
@Override
- public YarnClusterClient deployJobCluster(
- ClusterSpecification clusterSpecification,
- JobGraph jobGraph,
- boolean detached) {
- throw new UnsupportedOperationException("Cannot deploy a per-job yarn cluster yet.");
+ public ClusterClient<ApplicationId> deployJobCluster(
+ ClusterSpecification clusterSpecification,
+ JobGraph jobGraph,
+ boolean detached) throws ClusterDeploymentException {
+
+ // this is required because the slots are allocated lazily
+ jobGraph.setAllowQueuedScheduling(true);
+
+ try {
+ return deployInternal(
+ clusterSpecification,
+ "Flink per-job cluster",
+ getYarnJobClusterEntrypoint(),
+ jobGraph,
+ detached);
+ } catch (Exception e) {
+ throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
+ }
}
@Override
- protected ClusterClient<ApplicationId> createYarnClusterClient(AbstractYarnClusterDescriptor descriptor, int numberTaskManagers, int slotsPerTaskManager, ApplicationReport report, Configuration flinkConfiguration, boolean perJobCluster) throws Exception {
- return new YarnClusterClient(
- descriptor,
- numberTaskManagers,
- slotsPerTaskManager,
- report,
+ protected ClusterClient<ApplicationId> createYarnClusterClient(
+ AbstractYarnClusterDescriptor descriptor,
+ int numberTaskManagers,
+ int slotsPerTaskManager,
+ ApplicationReport report,
+ Configuration flinkConfiguration,
+ boolean perJobCluster) throws Exception {
+ return new RestClusterClient<>(
--- End diff --
Why not return a YarnClusterClient here?
---