You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by Matrix42 <gi...@git.apache.org> on 2018/07/15 17:30:22 UTC

[GitHub] flink pull request #5801: [FLINK-9121] [flip6] Remove Flip6 prefixes and oth...

Github user Matrix42 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5801#discussion_r202548776
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java ---
    @@ -49,30 +54,45 @@ public YarnClusterDescriptor(
     
     	@Override
     	protected String getYarnSessionClusterEntrypoint() {
    -		return YarnApplicationMasterRunner.class.getName();
    +		return YarnSessionClusterEntrypoint.class.getName();
     	}
     
     	@Override
     	protected String getYarnJobClusterEntrypoint() {
    -		throw new UnsupportedOperationException("The old Yarn descriptor does not support proper per-job mode.");
    +		return YarnJobClusterEntrypoint.class.getName();
     	}
     
     	@Override
    -	public YarnClusterClient deployJobCluster(
    -			ClusterSpecification clusterSpecification,
    -			JobGraph jobGraph,
    -			boolean detached) {
    -		throw new UnsupportedOperationException("Cannot deploy a per-job yarn cluster yet.");
    +	public ClusterClient<ApplicationId> deployJobCluster(
    +		ClusterSpecification clusterSpecification,
    +		JobGraph jobGraph,
    +		boolean detached) throws ClusterDeploymentException {
    +
    +		// this is required because the slots are allocated lazily
    +		jobGraph.setAllowQueuedScheduling(true);
    +
    +		try {
    +			return deployInternal(
    +				clusterSpecification,
    +				"Flink per-job cluster",
    +				getYarnJobClusterEntrypoint(),
    +				jobGraph,
    +				detached);
    +		} catch (Exception e) {
    +			throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
    +		}
     	}
     
     	@Override
    -	protected ClusterClient<ApplicationId> createYarnClusterClient(AbstractYarnClusterDescriptor descriptor, int numberTaskManagers, int slotsPerTaskManager, ApplicationReport report, Configuration flinkConfiguration, boolean perJobCluster) throws Exception {
    -		return new YarnClusterClient(
    -			descriptor,
    -			numberTaskManagers,
    -			slotsPerTaskManager,
    -			report,
    +	protected ClusterClient<ApplicationId> createYarnClusterClient(
    +			AbstractYarnClusterDescriptor descriptor,
    +			int numberTaskManagers,
    +			int slotsPerTaskManager,
    +			ApplicationReport report,
    +			Configuration flinkConfiguration,
    +			boolean perJobCluster) throws Exception {
    +		return new RestClusterClient<>(
    --- End diff --
    
    why don't return a YarnClusterClient here?


---