Posted to issues@flink.apache.org by shuai-xu <gi...@git.apache.org> on 2017/02/14 06:01:18 UTC

[GitHub] flink pull request #3304: [FLINK-5791] [runtime] Resource should be strictly...

GitHub user shuai-xu opened a pull request:

    https://github.com/apache/flink/pull/3304

    [FLINK-5791] [runtime] Resource should be strictly matched when allocating for yarn

    This PR addresses JIRA issue [FLINK-5791](https://issues.apache.org/jira/browse/FLINK-5791).
    
    It makes the following changes:
    1. The YARN RM passes the actual ResourceProfile to the TM so that it can initialize its slots.
    2. Adds an SMRFSlotManager, which allocates a slot only if its resource profile strictly equals the requested one.
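    The second change can be pictured with a minimal sketch of strict-equality matching. `SimpleProfile` and `StrictSlotMatcher` are hypothetical stand-ins, not Flink's `ResourceProfile` or the PR's `SMRFSlotManager`: a free slot is handed out only if its profile equals the requested one, so a larger slot is never used for a smaller request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for a slot resource profile (CPU cores, memory in MB).
final class SimpleProfile {
    final double cpuCores;
    final long memoryMb;

    SimpleProfile(double cpuCores, long memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof SimpleProfile)) {
            return false;
        }
        SimpleProfile that = (SimpleProfile) obj;
        return Double.compare(cpuCores, that.cpuCores) == 0 && memoryMb == that.memoryMb;
    }

    @Override
    public int hashCode() {
        return Objects.hash(cpuCores, memoryMb);
    }
}

// Hypothetical matcher: allocates a free slot only if its profile equals the request.
final class StrictSlotMatcher {
    private final List<SimpleProfile> freeSlots = new ArrayList<>();

    void addFreeSlot(SimpleProfile profile) {
        freeSlots.add(profile);
    }

    Optional<SimpleProfile> allocate(SimpleProfile request) {
        for (int i = 0; i < freeSlots.size(); i++) {
            if (freeSlots.get(i).equals(request)) {
                // Remove the slot so it cannot be allocated twice.
                return Optional.of(freeSlots.remove(i));
            }
        }
        // No exact match: a larger free slot is deliberately not used.
        return Optional.empty();
    }
}
```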

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shuai-xu/flink jira-5791

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3304.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3304
    
----
commit 5d00959feed419ade08218a5570b884d88108dcf
Author: shuai.xus <sh...@alibaba-inc.com>
Date:   2017-02-14T05:48:17Z

    [FLINK-5791] [runtime] Resource should be strictly matched when allocating for yarn

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3304: [FLINK-5791] [runtime] [FLIP-6] Slots should be st...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3304#discussion_r104160736
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java ---
    @@ -119,6 +122,9 @@ public boolean equals(Object obj) {
     		}
     		else if (obj != null && obj.getClass() == ResourceProfile.class) {
     			ResourceProfile that = (ResourceProfile) obj;
    +			if (this == ResourceProfile.UNIVERSAL || that == ResourceProfile.UNIVERSAL) {
    +				return true;
    +			}
    --- End diff --
    
    Here you're basically breaking the Java contract that if `a.equals(b)`, then `a.hashCode() == b.hashCode()`.
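    To make the concern concrete, here is a minimal sketch with a hypothetical `Profile` class whose `equals` treats a `UNIVERSAL` sentinel as equal to everything, mirroring the diff, while `hashCode` is still derived from the fields. Two "equal" objects then hash differently, so a `HashSet` lookup misses. Transitivity breaks as well: two different profiles both equal `UNIVERSAL` but not each other.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical profile class reproducing the pattern from the diff above.
final class Profile {
    static final Profile UNIVERSAL = new Profile(-1.0, -1L);

    final double cpuCores;
    final long memoryMb;

    Profile(double cpuCores, long memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || obj.getClass() != Profile.class) {
            return false;
        }
        Profile that = (Profile) obj;
        // The sentinel "equals" everything, as in the patch.
        if (this == UNIVERSAL || that == UNIVERSAL) {
            return true;
        }
        return Double.compare(cpuCores, that.cpuCores) == 0 && memoryMb == that.memoryMb;
    }

    @Override
    public int hashCode() {
        // Field-based hash: UNIVERSAL hashes differently from the profiles it "equals".
        return Objects.hash(cpuCores, memoryMb);
    }
}

final class ContractDemo {
    static boolean hashCodesAgree(Profile a, Profile b) {
        return a.hashCode() == b.hashCode();
    }

    static boolean setFindsEqualElement() {
        Set<Profile> set = new HashSet<>();
        set.add(new Profile(1.0, 1024L));
        // UNIVERSAL.equals(element) is true, but HashSet compares hash codes first.
        return set.contains(Profile.UNIVERSAL);
    }
}
```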



[GitHub] flink pull request #3304: [FLINK-5791] [runtime] Resource should be strictly...

Posted by shuai-xu <gi...@git.apache.org>.
Github user shuai-xu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3304#discussion_r102899339
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
    @@ -168,6 +168,11 @@
     	public static final String LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL = "library-cache-manager.cleanup.interval";
     
     	/**
    +     * The config parameter defining the task manager resource profile.
    +	 */
    +	public static final String TASK_MANAGER_RESOURCE_PROFILE_KEY = "taskmanager.resourceProfile";
    --- End diff --
    
    Yes, it is an internal config. I thought Flink put all config constants here and used @PublicEvolving to mark the ones that users can use. Do you have any suggestion for where to put internal configs?



[GitHub] flink pull request #3304: [FLINK-5791] [runtime] Resource should be strictly...

Posted by shuai-xu <gi...@git.apache.org>.
Github user shuai-xu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3304#discussion_r102907061
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -376,4 +388,23 @@ private int generatePriority(ResourceProfile resourceProfile) {
     		}
     	}
     
    +	/**
    +	 * Get resource profile by given the container priority. It should always be got from the mapping
    +	 * between resource profile and priority, otherwise it will throw the exception.
    +	 *
    +	 * @param priority The container priority used for distinguishing different resources.
    +	 * @return The resource profile corresponding with the priority.
    +	 * @throws Exception
    +	 */
    +	private ResourceProfile getResourceProfile(int priority) throws Exception{
    +		if (resourcePriorities.containsValue(priority)) {
    +			for (Map.Entry<ResourceProfile, Integer> entry : resourcePriorities.entrySet()) {
    +				if (entry.getValue() == priority) {
    +					return entry.getKey();
    +				}
    +			}
    +		}
    --- End diff --
    
    This is a confusing part of YARN. Currently the priority is used only to distinguish requests for different resources; it has nothing to do with importance. For slot requests with different resources, we generate a different priority for each and put the priority and resource profile into the `resourcePriorities` map. When YARN containers come back, we query the map with the container priority to find out which resource profile the task manager should be started with. Then, when the task manager offers a slot with that resource profile to the job master, we can match the slot exactly against the original request.
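    The bookkeeping described above can be kept in two maps, one per direction, so that looking up a profile by container priority is a constant-time `get` rather than a scan over the entries. This is a minimal sketch under assumptions: `PriorityRegistry` is hypothetical, the profile type is a generic `P` (it must have consistent `equals`/`hashCode`), and an unknown priority yields `null`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry assigning one distinct YARN priority per resource profile.
final class PriorityRegistry<P> {
    private final Map<P, Integer> priorityByProfile = new HashMap<>();
    private final Map<Integer, P> profileByPriority = new HashMap<>();
    private int nextPriority;

    PriorityRegistry(int firstPriority) {
        this.nextPriority = firstPriority;
    }

    // Returns the existing priority for the profile, or assigns the next unused one.
    int priorityFor(P profile) {
        Integer existing = priorityByProfile.get(profile);
        if (existing != null) {
            return existing;
        }
        int priority = nextPriority++;
        priorityByProfile.put(profile, priority);
        profileByPriority.put(priority, profile);
        return priority;
    }

    // Constant-time reverse lookup; null if the priority was never assigned.
    P profileFor(int priority) {
        return profileByPriority.get(priority);
    }
}
```

    Keeping both maps in one class means they are always updated together, which is the main thing that can go wrong with a bidirectional index.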



[GitHub] flink pull request #3304: [FLINK-5791] [runtime] [FLIP-6] Slots should be st...

Posted by shuai-xu <gi...@git.apache.org>.
Github user shuai-xu closed the pull request at:

    https://github.com/apache/flink/pull/3304



[GitHub] flink issue #3304: [FLINK-5791] [runtime] Resource should be strictly matche...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/3304
  
    Hi @shuai-xu, thanks for your contribution :-) I'm currently working on a new implementation of the slot manager, so we might have to rebase this PR afterwards. But the changes are not too big. I'll try to review your PR next week.



[GitHub] flink pull request #3304: [FLINK-5791] [runtime] Resource should be strictly...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3304#discussion_r102745192
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---
    @@ -198,9 +197,9 @@ public static TaskManagerServices fromConfiguration(
     
     		final FileCache fileCache = new FileCache(taskManagerServicesConfiguration.getTmpDirPaths());
     
    -		final List<ResourceProfile> resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots());
    +		final List<ResourceProfile> resourceProfiles = taskManagerServicesConfiguration.getResourceProfiles();
     
    -		for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
    +		for (int i = resourceProfiles.size(); i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
     			resourceProfiles.add(new ResourceProfile(1.0, 42L));
    --- End diff --
    
    I think we should make sure that Flink still works if no resource profile is specified, maybe by introducing a resource profile that matches all other resource profiles, or by trying to derive one somehow.
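    One way to get a "matches everything" profile without changing `equals` (and so without breaking the `equals`/`hashCode` contract) is a separate matching predicate. A minimal sketch, assuming a hypothetical `MatchableProfile` with a hypothetical `isMatching` method: the `ANY` sentinel matches every request and is matched by every slot, while concrete profiles still require equality.

```java
import java.util.Objects;

// Hypothetical profile with a separate matching predicate; equals/hashCode stay consistent.
final class MatchableProfile {
    // Sentinel used when no profile is configured (e.g. in standalone mode).
    static final MatchableProfile ANY = new MatchableProfile(Double.NaN, -1L);

    final double cpuCores;
    final long memoryMb;

    MatchableProfile(double cpuCores, long memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    // True if a slot with this profile may serve the given request.
    boolean isMatching(MatchableProfile request) {
        return this == ANY || request == ANY || this.equals(request);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof MatchableProfile)) {
            return false;
        }
        MatchableProfile that = (MatchableProfile) obj;
        return Double.compare(cpuCores, that.cpuCores) == 0 && memoryMb == that.memoryMb;
    }

    @Override
    public int hashCode() {
        return Objects.hash(cpuCores, memoryMb);
    }
}
```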



[GitHub] flink pull request #3304: [FLINK-5791] [runtime] Resource should be strictly...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3304#discussion_r102684801
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -245,7 +248,7 @@ public void onContainersAllocated(List<Container> containers) {
     			try {
     				/** Context information used to start a TaskExecutor Java process */
     				ContainerLaunchContext taskExecutorLaunchContext =
    -						createTaskExecutorLaunchContext(container.getResource(), container.getId().toString(), container.getNodeId().getHost());
    +						createTaskExecutorLaunchContext(container.getResource(), container.getId().toString(), container.getNodeId().getHost(), container.getPriority());
    --- End diff --
    
    The resource profile for the allocated container should be retrievable from `container` itself. This is more robust, because we don't know whether YARN could actually fulfil our request.
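    A sketch of deriving the slot profile from what the container actually received, rather than from what was requested. To stay self-contained it does not use YARN's `Resource` class: `GrantedResources` and `DerivedSlotProfile` are hypothetical, splitting the container evenly across slots is an assumption, and treating one vcore as one CPU core (with no JVM overhead deducted) is a simplification.

```java
// Hypothetical holder for the resources YARN actually granted to a container.
final class GrantedResources {
    final long memoryMb;
    final int virtualCores;

    GrantedResources(long memoryMb, int virtualCores) {
        this.memoryMb = memoryMb;
        this.virtualCores = virtualCores;
    }
}

// Hypothetical per-slot profile derived from the granted container resources.
final class DerivedSlotProfile {
    final double cpuCores;
    final long memoryMb;

    DerivedSlotProfile(double cpuCores, long memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    // Splits the container evenly across its slots.
    // Assumes 1 vcore == 1 CPU core and ignores JVM/off-heap overhead.
    static DerivedSlotProfile fromContainer(GrantedResources granted, int numberOfSlots) {
        if (numberOfSlots <= 0) {
            throw new IllegalArgumentException("numberOfSlots must be positive");
        }
        return new DerivedSlotProfile(
                (double) granted.virtualCores / numberOfSlots,
                granted.memoryMb / numberOfSlots);
    }
}
```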



[GitHub] flink issue #3304: [FLINK-5791] [runtime] Resource should be strictly matche...

Posted by shuai-xu <gi...@git.apache.org>.
Github user shuai-xu commented on the issue:

    https://github.com/apache/flink/pull/3304
  
    @tillrohrmann, OK, thank you.



[GitHub] flink pull request #3304: [FLINK-5791] [runtime] Resource should be strictly...

Posted by shuai-xu <gi...@git.apache.org>.
Github user shuai-xu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3304#discussion_r103130387
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -342,6 +346,14 @@ private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource
     		FiniteDuration teRegistrationTimeout = new FiniteDuration(timeout, TimeUnit.SECONDS);
     		final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration(
     				flinkConfig, "", 0, 1, teRegistrationTimeout);
    +		// Add resource profile of slots to task executor config. 
    +		// For yarn, all slots in a task executor have same resource profile 
    +		ByteArrayOutputStream output = new ByteArrayOutputStream();
    +		ObjectOutputStream rpOutput = new ObjectOutputStream(output);
    +		rpOutput.writeObject(getResourceProfile(priority.getPriority()));
    +		rpOutput.close();
    +		taskManagerConfig.setString(ConfigConstants.TASK_MANAGER_RESOURCE_PROFILE_KEY,
    +				new String(Base64.encodeBase64(output.toByteArray())));
    --- End diff --
    
    Thanks @rmetzger for the explanation. I think we can use flink-conf.yaml temporarily and add a separate config file later.



[GitHub] flink pull request #3304: [FLINK-5791] [runtime] [FLIP-6] Slots should be st...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3304#discussion_r104159696
  
    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
    @@ -0,0 +1,94 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.yarn;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
    +import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
    +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
    +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.rpc.TestingSerialRpcService;
    +import org.apache.flink.util.TestLogger;
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.util.LinkedList;
    +import java.util.List;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.mockito.Mockito.mock;
    +
    +public class YarnResourceManagerTest extends TestLogger {
    +
    +	private YarnResourceManager yarnResourceManager;
    +
    +	@Before
    +	public void setup() {
    +		Configuration conf = new Configuration();
    +		ResourceManagerConfiguration rmConfig =
    +				new ResourceManagerConfiguration(Time.seconds(1), Time.seconds(10));
    +		yarnResourceManager = new YarnResourceManager(conf, null, new TestingSerialRpcService(), rmConfig,
    +				mock(HighAvailabilityServices.class), mock(SlotManagerFactory.class),
    +				mock(MetricRegistry.class),	 mock(JobLeaderIdService.class),
    +                mock(FatalErrorHandler.class));
    +	}
    +
    +	@After
    +	public void teardown() {
    +	}
    --- End diff --
    
    If this method is empty, then it's better not to define it at all.



[GitHub] flink pull request #3304: [FLINK-5791] [runtime] Resource should be strictly...

Posted by shuai-xu <gi...@git.apache.org>.
Github user shuai-xu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3304#discussion_r103129000
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---
    @@ -198,9 +197,9 @@ public static TaskManagerServices fromConfiguration(
     
     		final FileCache fileCache = new FileCache(taskManagerServicesConfiguration.getTmpDirPaths());
     
    -		final List<ResourceProfile> resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots());
    +		final List<ResourceProfile> resourceProfiles = taskManagerServicesConfiguration.getResourceProfiles();
     
    -		for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
    +		for (int i = resourceProfiles.size(); i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
     			resourceProfiles.add(new ResourceProfile(1.0, 42L));
    --- End diff --
    
    Currently only the YARN resource manager adds resource profiles to the config; in standalone mode there are none, so `resourceProfiles.size()` will be zero and we still need to add a ResourceProfile here. I think adding one that matches all other resource profiles is a good idea; I will introduce it.



[GitHub] flink pull request #3304: [FLINK-5791] [runtime] Resource should be strictly...

Posted by shuai-xu <gi...@git.apache.org>.
Github user shuai-xu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3304#discussion_r102909687
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -245,7 +248,7 @@ public void onContainersAllocated(List<Container> containers) {
     			try {
     				/** Context information used to start a TaskExecutor Java process */
     				ContainerLaunchContext taskExecutorLaunchContext =
    -						createTaskExecutorLaunchContext(container.getResource(), container.getId().toString(), container.getNodeId().getHost());
    +						createTaskExecutorLaunchContext(container.getResource(), container.getId().toString(), container.getNodeId().getHost(), container.getPriority());
    --- End diff --
    
    Yes, this is a problem. But if we use the resource profile retrieved from the container, it may differ from the original request, and then the two cannot be matched. Also, YARN returns a container bigger than requested only when the requested resource is too small, to avoid resource fragmentation; otherwise it returns what we requested.
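    A small sketch of why a strict-equality match fails once the scheduler rounds a request up. It assumes a scheduler that rounds each memory request up to a multiple of a minimum allocation (compare YARN's `yarn.scheduler.minimum-allocation-mb`); actual behavior depends on the YARN version, scheduler, and resource calculator, and `RequestNormalizer` is hypothetical. Under this assumption, a request that is not a multiple of the minimum is also rounded up, not only a request below it.

```java
// Hypothetical model of scheduler-side request normalization (memory only).
final class RequestNormalizer {
    private final long minAllocationMb;

    RequestNormalizer(long minAllocationMb) {
        if (minAllocationMb <= 0) {
            throw new IllegalArgumentException("minAllocationMb must be positive");
        }
        this.minAllocationMb = minAllocationMb;
    }

    // Raises the request to at least the minimum, then rounds up to a multiple of it.
    long normalizeMemoryMb(long requestedMb) {
        long atLeastMin = Math.max(requestedMb, minAllocationMb);
        return ((atLeastMin + minAllocationMb - 1) / minAllocationMb) * minAllocationMb;
    }

    // A strict-equality matcher only succeeds if normalization left the request unchanged.
    boolean strictMatchSurvives(long requestedMb) {
        return normalizeMemoryMb(requestedMb) == requestedMb;
    }
}
```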



[GitHub] flink pull request #3304: [FLINK-5791] [runtime] Resource should be strictly...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3304#discussion_r102748761
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -342,6 +346,14 @@ private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource
     		FiniteDuration teRegistrationTimeout = new FiniteDuration(timeout, TimeUnit.SECONDS);
     		final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration(
     				flinkConfig, "", 0, 1, teRegistrationTimeout);
    +		// Add resource profile of slots to task executor config. 
    +		// For yarn, all slots in a task executor have same resource profile 
    +		ByteArrayOutputStream output = new ByteArrayOutputStream();
    +		ObjectOutputStream rpOutput = new ObjectOutputStream(output);
    +		rpOutput.writeObject(getResourceProfile(priority.getPriority()));
    +		rpOutput.close();
    +		taskManagerConfig.setString(ConfigConstants.TASK_MANAGER_RESOURCE_PROFILE_KEY,
    +				new String(Base64.encodeBase64(output.toByteArray())));
    --- End diff --
    
    I think we shouldn't Base64-encode the resource profile into the `taskManagerConfig`. Instead, use `InstantiationUtil.writeObjectToConfig` to write the serialized data to the configuration.
    
    But I'm a little torn here, because so far we have transferred this kind of information via environment variables. Maybe @rmetzger can chime in on the most idiomatic way to transfer TM data.
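    For reference, the round trip the diff performs (Java serialization, then Base64 into a config string) can be sketched with the JDK alone. This is not Flink code and does not reproduce `InstantiationUtil.writeObjectToConfig`: `ConfigCodec` is hypothetical, it uses `java.util.Base64` instead of commons-codec, and it closes the streams with try-with-resources so they are released even if serialization fails.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;

// Hypothetical helper: encodes a Serializable value as a Base64 string and back.
final class ConfigCodec {
    static String encode(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // Closing the ObjectOutputStream flushes all buffered data into 'bytes'.
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    static Object decode(String encoded) throws IOException, ClassNotFoundException {
        byte[] raw = Base64.getDecoder().decode(encoded);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return in.readObject();
        }
    }
}
```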



[GitHub] flink pull request #3304: [FLINK-5791] [runtime] [FLIP-6] Slots should be st...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3304#discussion_r104160029
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
    @@ -168,6 +168,11 @@
     	public static final String LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL = "library-cache-manager.cleanup.interval";
     
     	/**
    +     * The config parameter defining the task manager resource profile.
    +	 */
    +	public static final String TASK_MANAGER_RESOURCE_PROFILE_KEY = "taskmanager.resourceProfile";
    --- End diff --
    
    Please put it into `TaskManagerServicesConfiguration`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3304: [FLINK-5791] [runtime] Resource should be strictly...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3304#discussion_r102684306
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -376,4 +388,23 @@ private int generatePriority(ResourceProfile resourceProfile) {
     		}
     	}
     
    +	/**
    +	 * Get resource profile by given the container priority. It should always be got from the mapping
    +	 * between resource profile and priority, otherwise it will throw the exception.
    +	 *
    +	 * @param priority The container priority used for distinguishing different resources.
    +	 * @return The resource profile corresponding with the priority.
    +	 * @throws Exception
    +	 */
    +	private ResourceProfile getResourceProfile(int priority) throws Exception{
    +		if (resourcePriorities.containsValue(priority)) {
    +			for (Map.Entry<ResourceProfile, Integer> entry : resourcePriorities.entrySet()) {
    +				if (entry.getValue() == priority) {
    +					return entry.getKey();
    +				}
    +			}
    +		}
    +		throw new Exception("There is no related resource profile for the priority : " + priority);
    --- End diff --
    
    Throwing an exception here is a little bit heavy. Better to return `null` instead.



[GitHub] flink pull request #3304: [FLINK-5791] [runtime] [FLIP-6] Slots should be st...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3304#discussion_r104158582
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -245,7 +248,7 @@ public void onContainersAllocated(List<Container> containers) {
     			try {
     				/** Context information used to start a TaskExecutor Java process */
     				ContainerLaunchContext taskExecutorLaunchContext =
    -						createTaskExecutorLaunchContext(container.getResource(), container.getId().toString(), container.getNodeId().getHost());
    +						createTaskExecutorLaunchContext(container.getResource(), container.getId().toString(), container.getNodeId().getHost(), container.getPriority());
    --- End diff --
    
    Yes, but this should not be a problem. I think the resource manager should be able to deal with this kind of scenario. For example, if the allocated container offers more resources than requested, it could in the future also be used to fulfill other slot requests that require more resources.



[GitHub] flink pull request #3304: [FLINK-5791] [runtime] Resource should be strictly...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3304#discussion_r102744865
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
    @@ -168,6 +168,11 @@
     	public static final String LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL = "library-cache-manager.cleanup.interval";
     
     	/**
    +     * The config parameter defining the task manager resource profile.
    +	 */
    +	public static final String TASK_MANAGER_RESOURCE_PROFILE_KEY = "taskmanager.resourceProfile";
    --- End diff --
    
    I'm not sure whether we should expose this configuration parameter. The user should never have to do something with it.



[GitHub] flink pull request #3304: [FLINK-5791] [runtime] Resource should be strictly...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3304#discussion_r102679342
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -323,7 +326,8 @@ private void requestYarnContainer(Resource resource, Priority priority) {
     				numPendingContainerRequests);
     	}
     
    -	private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)
    +	private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, 
    +			String containerId, String host, Priority priority)
    --- End diff --
    
    When breaking a parameter list, every parameter should go on a separate line.



[GitHub] flink pull request #3304: [FLINK-5791] [runtime] Resource should be strictly...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3304#discussion_r102684171
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -376,4 +388,23 @@ private int generatePriority(ResourceProfile resourceProfile) {
     		}
     	}
     
    +	/**
    +	 * Get resource profile by given the container priority. It should always be got from the mapping
    +	 * between resource profile and priority, otherwise it will throw the exception.
    +	 *
    +	 * @param priority The container priority used for distinguishing different resources.
    +	 * @return The resource profile corresponding with the priority.
    +	 * @throws Exception
    +	 */
    +	private ResourceProfile getResourceProfile(int priority) throws Exception{
    +		if (resourcePriorities.containsValue(priority)) {
    +			for (Map.Entry<ResourceProfile, Integer> entry : resourcePriorities.entrySet()) {
    +				if (entry.getValue() == priority) {
    +					return entry.getKey();
    +				}
    +			}
    +		}
    --- End diff --
    
    I think that is not the right way to query a map. If you have to traverse the whole map to find a value, then there is something wrong with the way you arrange your data.
    
    Could you explain to me what `resourcePriorities` actually does? To me it looks as if you use it to assign some kind of ID to a YARN container, which you then use to map the allocated container to its resource profile. That does not seem right at all. Priorities should tell the system how important it is to allocate a container.



[GitHub] flink pull request #3304: [FLINK-5791] [runtime] [FLIP-6] Slots should be st...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3304#discussion_r104158286
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -376,4 +388,23 @@ private int generatePriority(ResourceProfile resourceProfile) {
     		}
     	}
     
    +	/**
    +	 * Get resource profile by given the container priority. It should always be got from the mapping
    +	 * between resource profile and priority, otherwise it will throw the exception.
    +	 *
    +	 * @param priority The container priority used for distinguishing different resources.
    +	 * @return The resource profile corresponding with the priority.
    +	 * @throws Exception
    +	 */
    +	private ResourceProfile getResourceProfile(int priority) throws Exception{
    +		if (resourcePriorities.containsValue(priority)) {
    +			for (Map.Entry<ResourceProfile, Integer> entry : resourcePriorities.entrySet()) {
    +				if (entry.getValue() == priority) {
    +					return entry.getKey();
    +				}
    +			}
    +		}
    --- End diff --
    
    I think this should be changed. Why not allow the `YarnResourceManager` to also accept containers whose resources are larger than those stated in the request? It should be OK for a slot request to be assigned a slot whose resources exceed those of the request.
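The relaxed matching suggested above can be sketched as follows, assuming a hypothetical `Resources` class that tracks only CPU cores and memory (Flink's real `ResourceProfile` carries more dimensions). An offer matches a request if it is at least as large in every dimension. The best-fit selection is an added assumption, so that oversized containers are used only when nothing closer fits.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical, simplified resource description (not Flink's ResourceProfile). */
final class Resources {
    final double cpuCores;
    final long memoryMB;

    Resources(double cpuCores, long memoryMB) {
        this.cpuCores = cpuCores;
        this.memoryMB = memoryMB;
    }

    /** True if these (offered) resources are at least the requested ones in every dimension. */
    boolean canFulfill(Resources requested) {
        return cpuCores >= requested.cpuCores && memoryMB >= requested.memoryMB;
    }

    /** Picks the least over-provisioned offer that still fulfills the request, if any. */
    static Optional<Resources> bestFit(List<Resources> offers, Resources requested) {
        return offers.stream()
                .filter(offer -> offer.canFulfill(requested))
                // smallest memory first, then fewest cores
                .min(Comparator.comparingLong((Resources r) -> r.memoryMB)
                        .thenComparingDouble(r -> r.cpuCores));
    }
}
```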


[GitHub] flink pull request #3304: [FLINK-5791] [runtime] Resource should be strictly...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3304#discussion_r102681685
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---
    @@ -198,9 +197,9 @@ public static TaskManagerServices fromConfiguration(
     
     		final FileCache fileCache = new FileCache(taskManagerServicesConfiguration.getTmpDirPaths());
     
    -		final List<ResourceProfile> resourceProfiles = new ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots());
    +		final List<ResourceProfile> resourceProfiles = taskManagerServicesConfiguration.getResourceProfiles();
     
    -		for (int i = 0; i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
    +		for (int i = resourceProfiles.size(); i < taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
     			resourceProfiles.add(new ResourceProfile(1.0, 42L));
    --- End diff --
    
    Why do you still add the default resource profiles if you already got them from the `taskManagerServicesConfiguration`?
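One way to avoid mixing configured and default profiles is sketched below with a hypothetical generic helper `SlotProfiles.resolve` (not in the PR): the configured profiles are used unchanged, and defaults are generated only when none were configured.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class SlotProfiles {
    /**
     * Hypothetical helper: use the configured per-slot profiles as they are; only when
     * none were configured, fall back to one default profile per slot.
     */
    static <P> List<P> resolve(List<P> configured, int numberOfSlots, P defaultProfile) {
        if (!configured.isEmpty()) {
            return new ArrayList<>(configured);
        }
        return new ArrayList<>(Collections.nCopies(numberOfSlots, defaultProfile));
    }
}
```

`Collections.nCopies` builds the fallback list in one step, replacing the index loop that started at `resourceProfiles.size()`.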


[GitHub] flink pull request #3304: [FLINK-5791] [runtime] [FLIP-6] Slots should be st...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3304#discussion_r104161807
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java ---
    @@ -119,6 +122,9 @@ public boolean equals(Object obj) {
     		}
     		else if (obj != null && obj.getClass() == ResourceProfile.class) {
     			ResourceProfile that = (ResourceProfile) obj;
    +			if (this == ResourceProfile.UNIVERSAL || that == ResourceProfile.UNIVERSAL) {
    +				return true;
    +			}
    --- End diff --
    
    The contract of the universal slot should not be reflected via the `equals` method of an `Object`. This is highly dangerous.
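The danger is concrete. If `UNIVERSAL` equals both `A` and `B` while `A` does not equal `B`, `equals` is no longer transitive. Moreover, since equal objects must have equal hash codes, a profile equal to every other profile forces all hash codes to be identical, which defeats hash-based collections keyed by profiles. A minimal sketch with a hypothetical `Profile` class keeps `equals`/`hashCode` structural and expresses "universal" through a separate, asymmetric `isMatching` relation. Modelling `UNIVERSAL` as the largest possible profile is an assumption that lets it match every request without any special case.

```java
import java.util.Objects;

/** Hypothetical, simplified profile (not Flink's ResourceProfile). */
final class Profile {
    /** Largest possible profile, so it satisfies every request through the normal comparison. */
    static final Profile UNIVERSAL = new Profile(Double.MAX_VALUE, Long.MAX_VALUE);

    final double cpuCores;
    final long memoryMB;

    Profile(double cpuCores, long memoryMB) {
        this.cpuCores = cpuCores;
        this.memoryMB = memoryMB;
    }

    /** Asymmetric: does a slot with this profile satisfy the requested profile? */
    boolean isMatching(Profile requested) {
        return cpuCores >= requested.cpuCores && memoryMB >= requested.memoryMB;
    }

    /** Strict structural equality: reflexive, symmetric, transitive, consistent with hashCode. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || obj.getClass() != Profile.class) {
            return false;
        }
        Profile that = (Profile) obj;
        return Double.compare(cpuCores, that.cpuCores) == 0 && memoryMB == that.memoryMB;
    }

    @Override
    public int hashCode() {
        return Objects.hash(cpuCores, memoryMB);
    }
}
```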


[GitHub] flink pull request #3304: [FLINK-5791] [runtime] [FLIP-6] Slots should be st...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3304#discussion_r104158707
  
    --- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
    @@ -168,6 +168,11 @@
     	public static final String LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL = "library-cache-manager.cleanup.interval";
     
     	/**
    +     * The config parameter defining the task manager resource profile.
    +	 */
    +	public static final String TASK_MANAGER_RESOURCE_PROFILE_KEY = "taskmanager.resourceProfile";
    --- End diff --
    
    `ConfigConstants` is annotated as `@Public`. Thus, all elements that don't have their own annotation inherit that annotation, so this new key would automatically become part of the stable public API.


[GitHub] flink pull request #3304: [FLINK-5791] [runtime] Resource should be strictly...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3304#discussion_r102682612
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java ---
    @@ -353,5 +376,16 @@ private static void checkConfigParameter(boolean condition, Object parameter, St
     					name + " : " + parameter + " - " + errorMessage);
     		}
     	}
    +
    +	private void addResourceProfile(ResourceProfile resourceProfile) {
    +		this.resourceProfiles.add(resourceProfile);
    +	}
    --- End diff --
    
    imo the configuration object should be immutable. That way one cannot introduce side effects after the object has been created.
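An immutable variant can be sketched as follows, assuming a hypothetical, trimmed-down `ServicesConfig` class standing in for `TaskManagerServicesConfiguration` (profiles are plain strings for brevity). All values arrive through the constructor, the list is defensively copied, and the getter exposes only a read-only view, so no `addResourceProfile`-style mutator is needed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical, trimmed-down stand-in for TaskManagerServicesConfiguration. */
final class ServicesConfig {
    private final int numberOfSlots;
    private final List<String> resourceProfiles;

    ServicesConfig(int numberOfSlots, List<String> resourceProfiles) {
        if (numberOfSlots < 0) {
            throw new IllegalArgumentException("The number of slots must be non-negative.");
        }
        this.numberOfSlots = numberOfSlots;
        // Copy into an ArrayList, then wrap it: later changes to the caller's list cannot
        // leak in, and callers cannot mutate the list returned by the getter.
        this.resourceProfiles = Collections.unmodifiableList(new ArrayList<>(resourceProfiles));
    }

    int getNumberOfSlots() {
        return numberOfSlots;
    }

    List<String> getResourceProfiles() {
        return resourceProfiles;
    }
}
```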


[GitHub] flink issue #3304: [FLINK-5791] [runtime] [FLIP-6] Slots should be strictly ...

Posted by shuai-xu <gi...@git.apache.org>.
Github user shuai-xu commented on the issue:

    https://github.com/apache/flink/pull/3304
  
    The failed test case passes in my local working copy; I think the failure may be caused by the Travis environment.


[GitHub] flink issue #3304: [FLINK-5791] [runtime] Resource should be strictly matche...

Posted by shuai-xu <gi...@git.apache.org>.
Github user shuai-xu commented on the issue:

    https://github.com/apache/flink/pull/3304
  
    Hi @tillrohrmann, thank you for your careful review. I have modified this PR according to your comments. A universal resource profile is now added if no resource profiles are passed to the task executor. For the resource manager, the priority is now only used to distinguish containers with different resources; this is also how priority is used in YARN.


[GitHub] flink pull request #3304: [FLINK-5791] [runtime] Resource should be strictly...

Posted by shuai-xu <gi...@git.apache.org>.
Github user shuai-xu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3304#discussion_r102907598
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -376,4 +388,23 @@ private int generatePriority(ResourceProfile resourceProfile) {
     		}
     	}
     
    +	/**
    +	 * Get resource profile by given the container priority. It should always be got from the mapping
    +	 * between resource profile and priority, otherwise it will throw the exception.
    +	 *
    +	 * @param priority The container priority used for distinguishing different resources.
    +	 * @return The resource profile corresponding with the priority.
    +	 * @throws Exception
    +	 */
    +	private ResourceProfile getResourceProfile(int priority) throws Exception{
    +		if (resourcePriorities.containsValue(priority)) {
    +			for (Map.Entry<ResourceProfile, Integer> entry : resourcePriorities.entrySet()) {
    +				if (entry.getValue() == priority) {
    +					return entry.getKey();
    +				}
    +			}
    +		}
    --- End diff --
    
    Querying the map this way does seem a little tricky, but the map is very small: for most Flink jobs it has only a few entries, since its size depends only on the number of operators with different resources.


[GitHub] flink pull request #3304: [FLINK-5791] [runtime] Resource should be strictly...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3304#discussion_r102684945
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java ---
    @@ -94,6 +102,8 @@ public TaskManagerServicesConfiguration(
     		checkArgument(timerServiceShutdownTimeout >= 0L, "The timer " +
     			"service shutdown timeout must be greater or equal to 0.");
     		this.timerServiceShutdownTimeout = timerServiceShutdownTimeout;
    +
    +		this.resourceProfiles = new LinkedList<>();
    --- End diff --
    
    `LinkedList`s are slow. Please use an `ArrayList`. Even better, pass the value for `resourceProfiles` to the constructor.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---