Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2017/07/06 14:58:50 UTC

[GitHub] flink pull request #4271: [FLINK-7113] Make ClusterDescriptor independent of...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/4271

    [FLINK-7113] Make ClusterDescriptor independent of cluster size

    This PR is based on #4270.
    
    In order to pull some of the cluster deployment logic out of the `AbstractYarnClusterDescriptor`, this PR introduces a `ClusterSpecification`. This specification contains information about the size of the cluster to be started. Instead of setting these values via setters directly on the `ClusterDescriptor`, we provide the information at deployment time via the `ClusterSpecification`.
    
    The way I envision it, the `ClusterDescriptor` should only contain the information necessary to talk to a resource manager, such as Mesos or Yarn. All Flink cluster specific information should be provided to the `deploy` methods. This also allows us to better share code between different `ClusterDescriptor` implementations, e.g. for Mesos and Yarn.
    
    This is only the start of a refactoring of the `ClusterDescriptor`. Much more information can be pulled out of the `AbstractYarnClusterDescriptor`.
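
As a rough illustration of the split described above, the following sketch shows a descriptor that holds only what is needed to reach the resource manager and receives the cluster size at deployment time. All type names, the `String` return value, and the address are hypothetical and chosen for illustration; only the four size fields are taken from the `ClusterSpecification` class in this PR.

```java
// Hypothetical, simplified types for illustration; these are not the actual Flink classes.

/** Immutable description of the cluster size, handed over at deployment time. */
final class ClusterSpecificationSketch {
    private final int masterMemoryMB;
    private final int taskManagerMemoryMB;
    private final int numberTaskManagers;
    private final int slotsPerTaskManager;

    ClusterSpecificationSketch(
            int masterMemoryMB, int taskManagerMemoryMB, int numberTaskManagers, int slotsPerTaskManager) {
        this.masterMemoryMB = masterMemoryMB;
        this.taskManagerMemoryMB = taskManagerMemoryMB;
        this.numberTaskManagers = numberTaskManagers;
        this.slotsPerTaskManager = slotsPerTaskManager;
    }

    int getMasterMemoryMB() { return masterMemoryMB; }
    int getTaskManagerMemoryMB() { return taskManagerMemoryMB; }
    int getNumberTaskManagers() { return numberTaskManagers; }
    int getSlotsPerTaskManager() { return slotsPerTaskManager; }
}

/** Knows how to talk to a resource manager; carries no cluster size state of its own. */
interface ClusterDescriptorSketch {
    String deploySession(ClusterSpecificationSketch spec);
}

/** Toy implementation that only reports what it would deploy. */
final class LoggingDescriptorSketch implements ClusterDescriptorSketch {
    private final String resourceManagerAddress; // assumed stand-in for connection details

    LoggingDescriptorSketch(String resourceManagerAddress) {
        this.resourceManagerAddress = resourceManagerAddress;
    }

    @Override
    public String deploySession(ClusterSpecificationSketch spec) {
        // The size arrives as an argument instead of via setters on the descriptor.
        int totalMemoryMB = spec.getMasterMemoryMB()
            + spec.getTaskManagerMemoryMB() * spec.getNumberTaskManagers();
        return "deploy to " + resourceManagerAddress + ": "
            + spec.getNumberTaskManagers() + " TaskManagers, " + totalMemoryMB + "MB total";
    }
}
```

Because the descriptor no longer stores the size, the same instance could in principle be reused to deploy differently sized clusters.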

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink refactorClusterDescriptor

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4271.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4271
    
----
commit 9cfb9718d7b64e253aaf636dc17b8939e2592a98
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-07-06T07:33:18Z

    [FLINK-7110] [client] Add per-job cluster deployment interface

commit 0143d7e33b7d99ce8371e455458b730ba2c13d06
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-07-06T12:00:21Z

    [FLINK-7113] Make ClusterDescriptor independent of cluster size
    
    The deploySession method is now given a ClusterSpecification which specifies the
    size of the cluster it is supposed to deploy.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4271: [FLINK-7113] Make ClusterDescriptor independent of...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4271#discussion_r125944216
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -520,72 +439,128 @@ protected YarnClusterClient deployInternal() throws Exception {
     			taskManagerMemoryMb =  yarnMinAllocationMB;
     		}
     
    -		// Create application via yarnClient
    -		final YarnClientApplication yarnApplication = yarnClient.createApplication();
    -		GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
    -
    -		Resource maxRes = appResponse.getMaximumResourceCapability();
     		final String note = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
    -		if (jobManagerMemoryMb > maxRes.getMemory()) {
    -			failSessionDuringDeployment(yarnClient, yarnApplication);
    +		if (jobManagerMemoryMb > maximumResourceCapability.getMemory()) {
     			throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
    -				+ "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + note);
    +				+ "Maximum Memory: " + maximumResourceCapability.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + note);
     		}
     
    -		if (taskManagerMemoryMb > maxRes.getMemory()) {
    -			failSessionDuringDeployment(yarnClient, yarnApplication);
    +		if (taskManagerMemoryMb > maximumResourceCapability.getMemory()) {
     			throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
    -				+ "Maximum Memory: " + maxRes.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + note);
    +				+ "Maximum Memory: " + maximumResourceCapability.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + note);
     		}
     
     		final String noteRsc = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " +
     			"connecting from the beginning because the resources are currently not available in the cluster. " +
     			"The allocation might take more time than usual because the Flink YARN client needs to wait until " +
     			"the resources become available.";
     		int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
    -		ClusterResourceDescription freeClusterMem;
    -		try {
    -			freeClusterMem = getCurrentFreeClusterResources(yarnClient);
    -		} catch (YarnException | IOException e) {
    -			failSessionDuringDeployment(yarnClient, yarnApplication);
    -			throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
    -		}
     
    -		if (freeClusterMem.totalFreeMemory < totalMemoryRequired) {
    +
    +		if (freeClusterResources.totalFreeMemory < totalMemoryRequired) {
     			LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
    -				+ "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + noteRsc);
    +				+ "There are currently only " + freeClusterResources.totalFreeMemory + "MB available." + noteRsc);
     
     		}
    -		if (taskManagerMemoryMb > freeClusterMem.containerLimit) {
    +		if (taskManagerMemoryMb > freeClusterResources.containerLimit) {
     			LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
    -				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc);
    +				+ "the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
     		}
    -		if (jobManagerMemoryMb > freeClusterMem.containerLimit) {
    +		if (jobManagerMemoryMb > freeClusterResources.containerLimit) {
     			LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
    -				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc);
    +				+ "the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
     		}
     
     		// ----------------- check if the requested containers fit into the cluster.
     
    -		int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length);
    +		int[] nmFree = Arrays.copyOf(freeClusterResources.nodeManagersFree, freeClusterResources.nodeManagersFree.length);
     		// first, allocate the jobManager somewhere.
     		if (!allocateResource(nmFree, jobManagerMemoryMb)) {
     			LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. " +
     				"The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " +
    -				Arrays.toString(freeClusterMem.nodeManagersFree) + noteRsc);
    +				Arrays.toString(freeClusterResources.nodeManagersFree) + noteRsc);
     		}
     		// allocate TaskManagers
     		for (int i = 0; i < taskManagerCount; i++) {
     			if (!allocateResource(nmFree, taskManagerMemoryMb)) {
     				LOG.warn("There is not enough memory available in the YARN cluster. " +
     					"The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " +
    -					"NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" +
    +					"NodeManagers available: " + Arrays.toString(freeClusterResources.nodeManagersFree) + "\n" +
     					"After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " +
     					"the following NodeManagers are available: " + Arrays.toString(nmFree)  + noteRsc);
     			}
     		}
     
    -		ApplicationReport report = startAppMaster(null, yarnClient, yarnApplication);
    +		return new ClusterSpecification(
    +			jobManagerMemoryMb,
    +			taskManagerMemoryMb,
    +			clusterSpecification.getNumberTaskManagers(),
    +			clusterSpecification.getSlotsPerTaskManager());
    +
    +	}
    +
    +	protected void logClusterSpecification(ClusterSpecification clusterSpecification) {
    --- End diff --
    
    I would remove this method as it is only used once and isn't really more readable than the inline alternative.
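
For readers following the diff above, the container fit check can be sketched as follows. The body of `allocateResource` is not part of this diff, so the first-fit behavior shown here is an assumption, and `FitCheckSketch` and `countFittingTaskManagers` are hypothetical names used only for illustration.

```java
// Hypothetical sketch; the first-fit strategy is an assumption, not confirmed by the diff.
final class FitCheckSketch {

    /** First-fit: deduct the request from the first NodeManager with enough free memory. */
    static boolean allocateResource(int[] nodeManagersFree, int toAllocateMb) {
        for (int i = 0; i < nodeManagersFree.length; i++) {
            if (nodeManagersFree[i] >= toAllocateMb) {
                nodeManagersFree[i] -= toAllocateMb;
                return true;
            }
        }
        return false;
    }

    /**
     * Places the JobManager first, then tries each TaskManager in turn.
     * Returns the number of TaskManagers that fit, or -1 if the JobManager does not fit.
     */
    static int countFittingTaskManagers(
            int[] nodeManagersFree, int jobManagerMemoryMb, int taskManagerMemoryMb, int taskManagerCount) {
        // Work on a copy so the caller's view of the free memory stays untouched.
        int[] nmFree = nodeManagersFree.clone();
        if (!allocateResource(nmFree, jobManagerMemoryMb)) {
            return -1;
        }
        int placed = 0;
        for (int i = 0; i < taskManagerCount; i++) {
            if (allocateResource(nmFree, taskManagerMemoryMb)) {
                placed++;
            }
        }
        return placed;
    }
}
```

In the diff, a failed placement only produces a warning and deployment continues; the sketch returns a count instead so the outcome is easy to inspect.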


[GitHub] flink pull request #4271: [FLINK-7113] Make ClusterDescriptor independent of...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4271


[GitHub] flink pull request #4271: [FLINK-7113] Make ClusterDescriptor independent of...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4271#discussion_r129531644
  
    --- Diff: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---
    @@ -631,12 +634,15 @@ protected static void sendOutput() {
     	protected static class Runner extends Thread {
     		private final String[] args;
     		private final int expectedReturnValue;
    +		private final org.apache.flink.configuration.Configuration configuration;
    +
     		private RunTypes type;
     		private FlinkYarnSessionCli yCli;
     		private Throwable runnerError;
     
    -		public Runner(String[] args, RunTypes type, int expectedReturnValue) {
    +		public Runner(String[] args, org.apache.flink.configuration.Configuration configuration, RunTypes type, int expectedReturnValue) {
    --- End diff --
    
    True. Then I think it is not needed.


[GitHub] flink pull request #4271: [FLINK-7113] Make ClusterDescriptor independent of...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4271#discussion_r129532134
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -520,72 +439,128 @@ protected YarnClusterClient deployInternal() throws Exception {
     			taskManagerMemoryMb =  yarnMinAllocationMB;
     		}
     
    -		// Create application via yarnClient
    -		final YarnClientApplication yarnApplication = yarnClient.createApplication();
    -		GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
    -
    -		Resource maxRes = appResponse.getMaximumResourceCapability();
     		final String note = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
    -		if (jobManagerMemoryMb > maxRes.getMemory()) {
    -			failSessionDuringDeployment(yarnClient, yarnApplication);
    +		if (jobManagerMemoryMb > maximumResourceCapability.getMemory()) {
     			throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
    -				+ "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + note);
    +				+ "Maximum Memory: " + maximumResourceCapability.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + note);
     		}
     
    -		if (taskManagerMemoryMb > maxRes.getMemory()) {
    -			failSessionDuringDeployment(yarnClient, yarnApplication);
    +		if (taskManagerMemoryMb > maximumResourceCapability.getMemory()) {
     			throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
    -				+ "Maximum Memory: " + maxRes.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + note);
    +				+ "Maximum Memory: " + maximumResourceCapability.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + note);
     		}
     
     		final String noteRsc = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " +
     			"connecting from the beginning because the resources are currently not available in the cluster. " +
     			"The allocation might take more time than usual because the Flink YARN client needs to wait until " +
     			"the resources become available.";
     		int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
    -		ClusterResourceDescription freeClusterMem;
    -		try {
    -			freeClusterMem = getCurrentFreeClusterResources(yarnClient);
    -		} catch (YarnException | IOException e) {
    -			failSessionDuringDeployment(yarnClient, yarnApplication);
    -			throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
    -		}
     
    -		if (freeClusterMem.totalFreeMemory < totalMemoryRequired) {
    +
    +		if (freeClusterResources.totalFreeMemory < totalMemoryRequired) {
     			LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
    -				+ "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + noteRsc);
    +				+ "There are currently only " + freeClusterResources.totalFreeMemory + "MB available." + noteRsc);
     
     		}
    -		if (taskManagerMemoryMb > freeClusterMem.containerLimit) {
    +		if (taskManagerMemoryMb > freeClusterResources.containerLimit) {
     			LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
    -				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc);
    +				+ "the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
     		}
    -		if (jobManagerMemoryMb > freeClusterMem.containerLimit) {
    +		if (jobManagerMemoryMb > freeClusterResources.containerLimit) {
     			LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
    -				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc);
    +				+ "the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
     		}
     
     		// ----------------- check if the requested containers fit into the cluster.
     
    -		int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length);
    +		int[] nmFree = Arrays.copyOf(freeClusterResources.nodeManagersFree, freeClusterResources.nodeManagersFree.length);
     		// first, allocate the jobManager somewhere.
     		if (!allocateResource(nmFree, jobManagerMemoryMb)) {
     			LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. " +
     				"The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " +
    -				Arrays.toString(freeClusterMem.nodeManagersFree) + noteRsc);
    +				Arrays.toString(freeClusterResources.nodeManagersFree) + noteRsc);
     		}
     		// allocate TaskManagers
     		for (int i = 0; i < taskManagerCount; i++) {
     			if (!allocateResource(nmFree, taskManagerMemoryMb)) {
     				LOG.warn("There is not enough memory available in the YARN cluster. " +
     					"The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " +
    -					"NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" +
    +					"NodeManagers available: " + Arrays.toString(freeClusterResources.nodeManagersFree) + "\n" +
     					"After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " +
     					"the following NodeManagers are available: " + Arrays.toString(nmFree)  + noteRsc);
     			}
     		}
     
    -		ApplicationReport report = startAppMaster(null, yarnClient, yarnApplication);
    +		return new ClusterSpecification(
    +			jobManagerMemoryMb,
    +			taskManagerMemoryMb,
    +			clusterSpecification.getNumberTaskManagers(),
    +			clusterSpecification.getSlotsPerTaskManager());
    +
    +	}
    +
    +	protected void logClusterSpecification(ClusterSpecification clusterSpecification) {
    --- End diff --
    
    True. Will change it.


[GitHub] flink issue #4271: [FLINK-7113] Make ClusterDescriptor independent of cluste...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4271
  
    Thanks for the review @zentol. I've addressed your comments, and once Travis gives the green light, I'll merge this PR.


[GitHub] flink pull request #4271: [FLINK-7113] Make ClusterDescriptor independent of...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4271#discussion_r125938365
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java ---
    @@ -0,0 +1,81 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.client.deployment;
    +
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.JobManagerOptions;
    +import org.apache.flink.configuration.TaskManagerOptions;
    +
    +/**
    + * Description of the cluster to start by the {@link ClusterDescriptor}.
    + */
    +public class ClusterSpecification {
    +	private final int masterMemoryMB;
    +	private final int taskManagerMemoryMB;
    +	private final int numberTaskManagers;
    +	private final int slotsPerTaskManager;
    +
    +	public ClusterSpecification(int masterMemoryMB, int taskManagerMemoryMB, int numberTaskManagers, int slotsPerTaskManager) {
    --- End diff --
    
    I would add a builder for this class, since constructor calls look like `new ClusterSpecification(1,2,3,4)`, which is rather unreadable and easily leads to mistakes.
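
A minimal sketch of what such a builder could look like is shown below. The class name `SpecWithBuilder`, the setter names, and the default values are hypothetical placeholders; only the four fields come from the diff, and this is not necessarily the shape the PR ended up with.

```java
// Hypothetical builder sketch; names and defaults are assumptions for illustration.
final class SpecWithBuilder {
    private final int masterMemoryMB;
    private final int taskManagerMemoryMB;
    private final int numberTaskManagers;
    private final int slotsPerTaskManager;

    private SpecWithBuilder(Builder builder) {
        this.masterMemoryMB = builder.masterMemoryMB;
        this.taskManagerMemoryMB = builder.taskManagerMemoryMB;
        this.numberTaskManagers = builder.numberTaskManagers;
        this.slotsPerTaskManager = builder.slotsPerTaskManager;
    }

    int getMasterMemoryMB() { return masterMemoryMB; }
    int getTaskManagerMemoryMB() { return taskManagerMemoryMB; }
    int getNumberTaskManagers() { return numberTaskManagers; }
    int getSlotsPerTaskManager() { return slotsPerTaskManager; }

    static Builder builder() {
        return new Builder();
    }

    /** Each setter names the value it sets, so call sites stay readable. */
    static final class Builder {
        // Placeholder defaults, chosen arbitrarily for this sketch.
        private int masterMemoryMB = 768;
        private int taskManagerMemoryMB = 768;
        private int numberTaskManagers = 1;
        private int slotsPerTaskManager = 1;

        Builder setMasterMemoryMB(int value) { this.masterMemoryMB = value; return this; }
        Builder setTaskManagerMemoryMB(int value) { this.taskManagerMemoryMB = value; return this; }
        Builder setNumberTaskManagers(int value) { this.numberTaskManagers = value; return this; }
        Builder setSlotsPerTaskManager(int value) { this.slotsPerTaskManager = value; return this; }

        SpecWithBuilder build() {
            return new SpecWithBuilder(this);
        }
    }
}
```

A call site then reads `SpecWithBuilder.builder().setMasterMemoryMB(1024).setNumberTaskManagers(3).build()`, which makes it much harder to swap two `int` arguments by accident.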


[GitHub] flink pull request #4271: [FLINK-7113] Make ClusterDescriptor independent of...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4271#discussion_r125938448
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java ---
    @@ -0,0 +1,81 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.client.deployment;
    +
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.JobManagerOptions;
    +import org.apache.flink.configuration.TaskManagerOptions;
    +
    +/**
    + * Description of the cluster to start by the {@link ClusterDescriptor}.
    + */
    +public class ClusterSpecification {
    +	private final int masterMemoryMB;
    +	private final int taskManagerMemoryMB;
    +	private final int numberTaskManagers;
    +	private final int slotsPerTaskManager;
    +
    +	public ClusterSpecification(int masterMemoryMB, int taskManagerMemoryMB, int numberTaskManagers, int slotsPerTaskManager) {
    +		this.masterMemoryMB = masterMemoryMB;
    +		this.taskManagerMemoryMB = taskManagerMemoryMB;
    +		this.numberTaskManagers = numberTaskManagers;
    +		this.slotsPerTaskManager = slotsPerTaskManager;
    +	}
    +
    +	public int getMasterMemoryMB() {
    +		return masterMemoryMB;
    +	}
    +
    +	public int getTaskManagerMemoryMB() {
    +		return taskManagerMemoryMB;
    +	}
    +
    +	public int getNumberTaskManagers() {
    +		return numberTaskManagers;
    +	}
    +
    +	public int getSlotsPerTaskManager() {
    +		return slotsPerTaskManager;
    +	}
    +
    +	@Override
    +	public String toString() {
    +		return "ClusterSpecification{" +
    +			"masterMemoryMB=" + masterMemoryMB +
    +			", taskManagerMemoryMB=" + taskManagerMemoryMB +
    +			", numberTaskManagers=" + numberTaskManagers +
    +			", slotsPerTaskManager=" + slotsPerTaskManager +
    +			'}';
    +	}
    +
    +	public static ClusterSpecification fromConfiguration(Configuration configuration) {
    +		int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
    +
    +		int jobManagerMemoryMb = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
    +		int taskManagerMemoryMb = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
    +
    +
    --- End diff --
    
    Remove the second empty line.


[GitHub] flink pull request #4271: [FLINK-7113] Make ClusterDescriptor independent of...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4271#discussion_r129526492
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java ---
    @@ -0,0 +1,81 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.client.deployment;
    +
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.JobManagerOptions;
    +import org.apache.flink.configuration.TaskManagerOptions;
    +
    +/**
    + * Description of the cluster to start by the {@link ClusterDescriptor}.
    + */
    +public class ClusterSpecification {
    +	private final int masterMemoryMB;
    +	private final int taskManagerMemoryMB;
    +	private final int numberTaskManagers;
    +	private final int slotsPerTaskManager;
    +
    +	public ClusterSpecification(int masterMemoryMB, int taskManagerMemoryMB, int numberTaskManagers, int slotsPerTaskManager) {
    +		this.masterMemoryMB = masterMemoryMB;
    +		this.taskManagerMemoryMB = taskManagerMemoryMB;
    +		this.numberTaskManagers = numberTaskManagers;
    +		this.slotsPerTaskManager = slotsPerTaskManager;
    +	}
    +
    +	public int getMasterMemoryMB() {
    +		return masterMemoryMB;
    +	}
    +
    +	public int getTaskManagerMemoryMB() {
    +		return taskManagerMemoryMB;
    +	}
    +
    +	public int getNumberTaskManagers() {
    +		return numberTaskManagers;
    +	}
    +
    +	public int getSlotsPerTaskManager() {
    +		return slotsPerTaskManager;
    +	}
    +
    +	@Override
    +	public String toString() {
    +		return "ClusterSpecification{" +
    +			"masterMemoryMB=" + masterMemoryMB +
    +			", taskManagerMemoryMB=" + taskManagerMemoryMB +
    +			", numberTaskManagers=" + numberTaskManagers +
    +			", slotsPerTaskManager=" + slotsPerTaskManager +
    +			'}';
    +	}
    +
    +	public static ClusterSpecification fromConfiguration(Configuration configuration) {
    +		int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
    +
    +		int jobManagerMemoryMb = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
    +		int taskManagerMemoryMb = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
    +
    +
    --- End diff --
    
    Good catch.


[GitHub] flink pull request #4271: [FLINK-7113] Make ClusterDescriptor independent of...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4271#discussion_r125944668
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -520,72 +439,128 @@ protected YarnClusterClient deployInternal() throws Exception {
     			taskManagerMemoryMb =  yarnMinAllocationMB;
     		}
     
    -		// Create application via yarnClient
    -		final YarnClientApplication yarnApplication = yarnClient.createApplication();
    -		GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
    -
    -		Resource maxRes = appResponse.getMaximumResourceCapability();
     		final String note = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
    -		if (jobManagerMemoryMb > maxRes.getMemory()) {
    -			failSessionDuringDeployment(yarnClient, yarnApplication);
    +		if (jobManagerMemoryMb > maximumResourceCapability.getMemory()) {
     			throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
    -				+ "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + note);
    +				+ "Maximum Memory: " + maximumResourceCapability.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + note);
     		}
     
    -		if (taskManagerMemoryMb > maxRes.getMemory()) {
    -			failSessionDuringDeployment(yarnClient, yarnApplication);
    +		if (taskManagerMemoryMb > maximumResourceCapability.getMemory()) {
     			throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
    -				+ "Maximum Memory: " + maxRes.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + note);
    +				+ "Maximum Memory: " + maximumResourceCapability.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + note);
     		}
     
     		final String noteRsc = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " +
     			"connecting from the beginning because the resources are currently not available in the cluster. " +
     			"The allocation might take more time than usual because the Flink YARN client needs to wait until " +
     			"the resources become available.";
     		int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
    -		ClusterResourceDescription freeClusterMem;
    -		try {
    -			freeClusterMem = getCurrentFreeClusterResources(yarnClient);
    -		} catch (YarnException | IOException e) {
    -			failSessionDuringDeployment(yarnClient, yarnApplication);
    -			throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
    -		}
     
    -		if (freeClusterMem.totalFreeMemory < totalMemoryRequired) {
    +
    +		if (freeClusterResources.totalFreeMemory < totalMemoryRequired) {
     			LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
    -				+ "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + noteRsc);
    +				+ "There are currently only " + freeClusterResources.totalFreeMemory + "MB available." + noteRsc);
     
     		}
    -		if (taskManagerMemoryMb > freeClusterMem.containerLimit) {
    +		if (taskManagerMemoryMb > freeClusterResources.containerLimit) {
     			LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
    -				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc);
    +				+ "the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
     		}
    -		if (jobManagerMemoryMb > freeClusterMem.containerLimit) {
    +		if (jobManagerMemoryMb > freeClusterResources.containerLimit) {
     			LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
    -				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc);
    +				+ "the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
     		}
     
     		// ----------------- check if the requested containers fit into the cluster.
     
    -		int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length);
    +		int[] nmFree = Arrays.copyOf(freeClusterResources.nodeManagersFree, freeClusterResources.nodeManagersFree.length);
     		// first, allocate the jobManager somewhere.
     		if (!allocateResource(nmFree, jobManagerMemoryMb)) {
     			LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. " +
     				"The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " +
    -				Arrays.toString(freeClusterMem.nodeManagersFree) + noteRsc);
    +				Arrays.toString(freeClusterResources.nodeManagersFree) + noteRsc);
     		}
     		// allocate TaskManagers
     		for (int i = 0; i < taskManagerCount; i++) {
     			if (!allocateResource(nmFree, taskManagerMemoryMb)) {
     				LOG.warn("There is not enough memory available in the YARN cluster. " +
     					"The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " +
    -					"NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" +
    +					"NodeManagers available: " + Arrays.toString(freeClusterResources.nodeManagersFree) + "\n" +
     					"After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " +
     					"the following NodeManagers are available: " + Arrays.toString(nmFree)  + noteRsc);
     			}
     		}
     
    -		ApplicationReport report = startAppMaster(null, yarnClient, yarnApplication);
    +		return new ClusterSpecification(
    +			jobManagerMemoryMb,
    +			taskManagerMemoryMb,
    +			clusterSpecification.getNumberTaskManagers(),
    +			clusterSpecification.getSlotsPerTaskManager());
    +
    +	}
    +
    +	protected void logClusterSpecification(ClusterSpecification clusterSpecification) {
    +		LOG.info("Cluster specification: {}", clusterSpecification);
    +	}
    +
    +	/**
    +	 * This method will block until the ApplicationMaster/JobManager have been
    +	 * deployed on YARN.
    +	 */
    +	protected YarnClusterClient deployInternal(ClusterSpecification clusterSpecification) throws Exception {
    +
    +		isReadyForDeployment(clusterSpecification);
    +
    +		final YarnClient yarnClient = getYarnClient();
    +
    +		// ------------------ Check if the specified queue exists --------------------
    +
    +		checkYarnQueues(yarnClient);
    +
    +		// ------------------ Add dynamic properties to local flinkConfiguraton ------
    +		Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded);
    +		for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
    +			flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
    +		}
    +
    --- End diff --
    
    Remove the second empty line.
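
For readers following the quoted diff: the hard checks at the top of the method can be read as "raise the request to the YARN minimum, then fail if it exceeds the maximum container size". The sketch below is only an illustration of that reading. The class name, method name and exception type are hypothetical (the PR throws `YarnDeploymentException`), and the condition that raises the value to `yarnMinAllocationMB` is assumed, since it is not visible in the quoted hunk.

```java
/** Hypothetical helper, not part of the PR: mirrors the memory checks in the quoted diff. */
final class MemoryValidationSketch {

    /**
     * Returns the memory that would actually be requested for one container.
     *
     * @param role            "JobManager" or "TaskManager", used only in the error message
     * @param requestedMb     memory requested by the user
     * @param minAllocationMb assumed to correspond to yarnMinAllocationMB in the diff
     * @param maxAllocationMb assumed to correspond to maximumResourceCapability.getMemory()
     */
    static int validateMemory(String role, int requestedMb, int minAllocationMb, int maxAllocationMb) {
        // Assumption: requests below the YARN minimum are raised to the minimum.
        int effectiveMb = Math.max(requestedMb, minAllocationMb);

        // Hard failure: no single container can ever satisfy this request.
        if (effectiveMb > maxAllocationMb) {
            throw new IllegalArgumentException(
                "The cluster does not have the requested resources for the " + role + " available! "
                    + "Maximum Memory: " + maxAllocationMb + "MB Requested: " + effectiveMb + "MB.");
        }
        return effectiveMb;
    }
}
```

Returning the adjusted value, rather than mutating a field, matches the way the new method in the diff ends by returning a fresh `ClusterSpecification`.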


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4271: [FLINK-7113] Make ClusterDescriptor independent of...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4271#discussion_r125943654
  
    --- Diff: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---
    @@ -631,12 +634,15 @@ protected static void sendOutput() {
     	protected static class Runner extends Thread {
     		private final String[] args;
     		private final int expectedReturnValue;
    +		private final org.apache.flink.configuration.Configuration configuration;
    +
     		private RunTypes type;
     		private FlinkYarnSessionCli yCli;
     		private Throwable runnerError;
     
    -		public Runner(String[] args, RunTypes type, int expectedReturnValue) {
    +		public Runner(String[] args, org.apache.flink.configuration.Configuration configuration, RunTypes type, int expectedReturnValue) {
    --- End diff --
    
    What is the `configuration` argument for? It doesn't appear to be used anywhere.



[GitHub] flink pull request #4271: [FLINK-7113] Make ClusterDescriptor independent of...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4271#discussion_r129526011
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java ---
    @@ -0,0 +1,81 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.client.deployment;
    +
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.JobManagerOptions;
    +import org.apache.flink.configuration.TaskManagerOptions;
    +
    +/**
    + * Description of the cluster to start by the {@link ClusterDescriptor}.
    + */
    +public class ClusterSpecification {
    +	private final int masterMemoryMB;
    +	private final int taskManagerMemoryMB;
    +	private final int numberTaskManagers;
    +	private final int slotsPerTaskManager;
    +
    +	public ClusterSpecification(int masterMemoryMB, int taskManagerMemoryMB, int numberTaskManagers, int slotsPerTaskManager) {
    --- End diff --
    
    Good point. I'll add it.
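
As context for the class quoted above, here is a minimal sketch of an immutable value class with the same four fields. It is not the PR's implementation: the class name is changed to make that clear, the two memory getters and `toString` are assumptions (only `getNumberTaskManagers()` and `getSlotsPerTaskManager()` appear in the quoted diffs), and `withMemory` is a hypothetical convenience.

```java
/** Hypothetical stand-in for the ClusterSpecification under review. */
final class ClusterSpecificationSketch {
    private final int masterMemoryMB;
    private final int taskManagerMemoryMB;
    private final int numberTaskManagers;
    private final int slotsPerTaskManager;

    ClusterSpecificationSketch(int masterMemoryMB, int taskManagerMemoryMB, int numberTaskManagers, int slotsPerTaskManager) {
        this.masterMemoryMB = masterMemoryMB;
        this.taskManagerMemoryMB = taskManagerMemoryMB;
        this.numberTaskManagers = numberTaskManagers;
        this.slotsPerTaskManager = slotsPerTaskManager;
    }

    // Assumed getter names; not shown in the quoted diff.
    int getMasterMemoryMB() { return masterMemoryMB; }
    int getTaskManagerMemoryMB() { return taskManagerMemoryMB; }

    // These two getters are called in the AbstractYarnClusterDescriptor diff.
    int getNumberTaskManagers() { return numberTaskManagers; }
    int getSlotsPerTaskManager() { return slotsPerTaskManager; }

    /** Hypothetical helper: copies the spec with adjusted memory, leaving the instance unchanged. */
    ClusterSpecificationSketch withMemory(int newMasterMemoryMB, int newTaskManagerMemoryMB) {
        return new ClusterSpecificationSketch(
            newMasterMemoryMB, newTaskManagerMemoryMB, numberTaskManagers, slotsPerTaskManager);
    }

    // Assumed: a readable toString is useful because the descriptor logs the spec.
    @Override
    public String toString() {
        return "ClusterSpecificationSketch{masterMemoryMB=" + masterMemoryMB
            + ", taskManagerMemoryMB=" + taskManagerMemoryMB
            + ", numberTaskManagers=" + numberTaskManagers
            + ", slotsPerTaskManager=" + slotsPerTaskManager + '}';
    }
}
```

Keeping the fields final means a descriptor can validate a specification and hand back an adjusted copy without affecting the caller's instance.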



[GitHub] flink pull request #4271: [FLINK-7113] Make ClusterDescriptor independent of...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4271#discussion_r129526528
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -520,72 +439,128 @@ protected YarnClusterClient deployInternal() throws Exception {
     			taskManagerMemoryMb =  yarnMinAllocationMB;
     		}
     
    -		// Create application via yarnClient
    -		final YarnClientApplication yarnApplication = yarnClient.createApplication();
    -		GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
    -
    -		Resource maxRes = appResponse.getMaximumResourceCapability();
     		final String note = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
    -		if (jobManagerMemoryMb > maxRes.getMemory()) {
    -			failSessionDuringDeployment(yarnClient, yarnApplication);
    +		if (jobManagerMemoryMb > maximumResourceCapability.getMemory()) {
     			throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
    -				+ "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + note);
    +				+ "Maximum Memory: " + maximumResourceCapability.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + note);
     		}
     
    -		if (taskManagerMemoryMb > maxRes.getMemory()) {
    -			failSessionDuringDeployment(yarnClient, yarnApplication);
    +		if (taskManagerMemoryMb > maximumResourceCapability.getMemory()) {
     			throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
    -				+ "Maximum Memory: " + maxRes.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + note);
    +				+ "Maximum Memory: " + maximumResourceCapability.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + note);
     		}
     
     		final String noteRsc = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " +
     			"connecting from the beginning because the resources are currently not available in the cluster. " +
     			"The allocation might take more time than usual because the Flink YARN client needs to wait until " +
     			"the resources become available.";
     		int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
    -		ClusterResourceDescription freeClusterMem;
    -		try {
    -			freeClusterMem = getCurrentFreeClusterResources(yarnClient);
    -		} catch (YarnException | IOException e) {
    -			failSessionDuringDeployment(yarnClient, yarnApplication);
    -			throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
    -		}
     
    -		if (freeClusterMem.totalFreeMemory < totalMemoryRequired) {
    +
    +		if (freeClusterResources.totalFreeMemory < totalMemoryRequired) {
     			LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
    -				+ "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + noteRsc);
    +				+ "There are currently only " + freeClusterResources.totalFreeMemory + "MB available." + noteRsc);
     
     		}
    -		if (taskManagerMemoryMb > freeClusterMem.containerLimit) {
    +		if (taskManagerMemoryMb > freeClusterResources.containerLimit) {
     			LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
    -				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc);
    +				+ "the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
     		}
    -		if (jobManagerMemoryMb > freeClusterMem.containerLimit) {
    +		if (jobManagerMemoryMb > freeClusterResources.containerLimit) {
     			LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
    -				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc);
    +				+ "the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
     		}
     
     		// ----------------- check if the requested containers fit into the cluster.
     
    -		int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length);
    +		int[] nmFree = Arrays.copyOf(freeClusterResources.nodeManagersFree, freeClusterResources.nodeManagersFree.length);
     		// first, allocate the jobManager somewhere.
     		if (!allocateResource(nmFree, jobManagerMemoryMb)) {
     			LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. " +
     				"The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " +
    -				Arrays.toString(freeClusterMem.nodeManagersFree) + noteRsc);
    +				Arrays.toString(freeClusterResources.nodeManagersFree) + noteRsc);
     		}
     		// allocate TaskManagers
     		for (int i = 0; i < taskManagerCount; i++) {
     			if (!allocateResource(nmFree, taskManagerMemoryMb)) {
     				LOG.warn("There is not enough memory available in the YARN cluster. " +
     					"The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " +
    -					"NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" +
    +					"NodeManagers available: " + Arrays.toString(freeClusterResources.nodeManagersFree) + "\n" +
     					"After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " +
     					"the following NodeManagers are available: " + Arrays.toString(nmFree)  + noteRsc);
     			}
     		}
     
    -		ApplicationReport report = startAppMaster(null, yarnClient, yarnApplication);
    +		return new ClusterSpecification(
    +			jobManagerMemoryMb,
    +			taskManagerMemoryMb,
    +			clusterSpecification.getNumberTaskManagers(),
    +			clusterSpecification.getSlotsPerTaskManager());
    +
    +	}
    +
    +	protected void logClusterSpecification(ClusterSpecification clusterSpecification) {
    +		LOG.info("Cluster specification: {}", clusterSpecification);
    +	}
    +
    +	/**
    +	 * This method will block until the ApplicationMaster/JobManager have been
    +	 * deployed on YARN.
    +	 */
    +	protected YarnClusterClient deployInternal(ClusterSpecification clusterSpecification) throws Exception {
    +
    +		isReadyForDeployment(clusterSpecification);
    +
    +		final YarnClient yarnClient = getYarnClient();
    +
    +		// ------------------ Check if the specified queue exists --------------------
    +
    +		checkYarnQueues(yarnClient);
    +
    +		// ------------------ Add dynamic properties to local flinkConfiguraton ------
    +		Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded);
    +		for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
    +			flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
    +		}
    +
    --- End diff --
    
    Good catch.
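
For anyone reading the quoted hunk without the full file: the "check if the requested containers fit" section works on a copy of the per-NodeManager free memory and tries to place the JobManager first, then each TaskManager. The body of `allocateResource` is not part of the quoted diff, so the version below is an assumption (a simple first-fit placement), and `countPlacedTaskManagers` is a hypothetical wrapper added only to make the loop testable.

```java
import java.util.Arrays;

/** Hypothetical sketch of the container-fit check; allocateResource's real body is not shown in the diff. */
final class ContainerFitSketch {

    /** First fit (assumed): reserve the memory on the first NodeManager that has enough free. */
    static boolean allocateResource(int[] nodeManagersFree, int toAllocate) {
        for (int i = 0; i < nodeManagersFree.length; i++) {
            if (nodeManagersFree[i] >= toAllocate) {
                nodeManagersFree[i] -= toAllocate;
                return true;
            }
        }
        return false;
    }

    /** Returns how many TaskManagers could be placed, or -1 if the JobManager itself does not fit. */
    static int countPlacedTaskManagers(int[] nodeManagersFree, int jobManagerMemoryMb,
                                       int taskManagerMemoryMb, int taskManagerCount) {
        // Work on a copy, as the diff does with Arrays.copyOf, so the input stays intact.
        int[] nmFree = Arrays.copyOf(nodeManagersFree, nodeManagersFree.length);

        // First, place the JobManager somewhere.
        if (!allocateResource(nmFree, jobManagerMemoryMb)) {
            return -1;
        }
        // Then place as many TaskManagers as fit.
        int placed = 0;
        for (int i = 0; i < taskManagerCount; i++) {
            if (allocateResource(nmFree, taskManagerMemoryMb)) {
                placed++;
            }
        }
        return placed;
    }
}
```

In the diff a failed placement only produces a `LOG.warn`, since YARN may free resources later; the sketch returns a count so the outcome can be inspected.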

