Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/01/24 19:04:15 UTC

[GitHub] [flink] azagrebin opened a new pull request #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource cofig, set to default for local execution

azagrebin opened a new pull request #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource cofig, set to default for local execution
URL: https://github.com/apache/flink/pull/10946
 
 
   ## What is the purpose of the change
   
   FLIP-49 calculates the memory setup needed to start the TM process. At the moment, we reuse this logic to obtain the necessary resource configuration options in the running TM, although the running TM does not need the full `TaskExecutorResourceSpec`; that class can be renamed to `TaskExecutorProcessSpec`.
   
   For local execution in a mini cluster, the TM process is started outside of the Flink framework and nothing is pre-calculated. Any configured process or Flink memory size may therefore make no sense and can be ignored, which keeps things simpler and more explicit. In addition, the result of the FLIP-49 calculation can contradict the default values, and the other derived FLIP-49 memory components are not used by the TM internally anyway. If some necessary options are not set, we can set them to reasonable defaults.
   
   The configuration options required for running TM, which are expected to be calculated and set before its start, are the following (with defaults for local execution):
   
   - CPU cores (potentially for FLIP-56, default: `Double.MAX_VALUE`)
   - task heap memory (potentially for FLIP-56, default: `MemorySize.MAX_VALUE`)
   - task off-heap memory (potentially for FLIP-56, default: `MemorySize.MAX_VALUE`)
   - network memory (default: 64 MB)
   - managed memory (default: 128 MB)
   
   Additionally, we can refactor the TM runner so that it no longer reuses the current FLIP-49 computation. Instead, it only checks that the necessary options are set and creates a `TaskExecutorResourceSpec` that contains only those options.
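
The two behaviours described above (fill in defaults for local execution, otherwise only verify that the required options are present) can be sketched roughly as follows. This is a minimal stand-alone illustration, not the code of this PR: a plain `Map` stands in for Flink's `Configuration`, and the class name, method names, and option keys are hypothetical. Only the 64m and 128m defaults are taken from the description.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Stand-alone sketch; a plain Map stands in for Flink's Configuration. */
public class LocalResourceDefaults {
    // Hypothetical keys for illustration only, not Flink's real option keys.
    public static final Map<String, String> LOCAL_DEFAULTS = new LinkedHashMap<>();
    static {
        LOCAL_DEFAULTS.put("cpu.cores", String.valueOf(Double.MAX_VALUE));
        LOCAL_DEFAULTS.put("task.heap", "max");      // stands in for MemorySize.MAX_VALUE
        LOCAL_DEFAULTS.put("task.off-heap", "max");  // stands in for MemorySize.MAX_VALUE
        LOCAL_DEFAULTS.put("network.min", "64m");
        LOCAL_DEFAULTS.put("network.max", "64m");
        LOCAL_DEFAULTS.put("managed", "128m");
    }

    /** Local execution: keep whatever the user set and fill the rest with defaults. */
    public static Map<String, String> withLocalDefaults(Map<String, String> config) {
        Map<String, String> result = new LinkedHashMap<>(config);
        LOCAL_DEFAULTS.forEach(result::putIfAbsent);
        return result;
    }

    /** Running TM: report every required option that has not been set. */
    public static List<String> missingRequiredOptions(Map<String, String> config) {
        List<String> missing = new ArrayList<>();
        for (String key : LOCAL_DEFAULTS.keySet()) {
            if (!config.containsKey(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Map<String, String> userConfig = new LinkedHashMap<>();
        userConfig.put("managed", "256m");
        System.out.println("missing before defaults: " + missingRequiredOptions(userConfig));
        System.out.println("after defaults: " + withLocalDefaults(userConfig));
    }
}
```

In this sketch an explicitly configured value always wins over the default, which matches the wording "set non-configured necessary options to defaults".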
   
   ## Brief change log
   
     - Rename `TaskExecutorResourceSpec` to `TaskExecutorProcessSpec`
     - Introduce new `TaskExecutorResourceSpec` to use in running TM
     - Introduce new `TaskExecutorResourceUtils` to create new `TaskExecutorResourceSpec` from config
     - Set non-configured necessary options to defaults for local execution
     - Ignore other FLIP-49 options and warn if they are set for local execution
     - Adjust/Add unit tests
   
   ## Verifying this change
   
   unit tests
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource cofig, set to default for local execution

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource cofig, set to default for local execution
URL: https://github.com/apache/flink/pull/10946#issuecomment-578270885
 
 
   <!--
   Meta data
   Hash:3df00757f4922238f9624bb9d92e1e4f2917a944 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/145998908 TriggerType:PUSH TriggerID:3df00757f4922238f9624bb9d92e1e4f2917a944
   Hash:3df00757f4922238f9624bb9d92e1e4f2917a944 Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4614 TriggerType:PUSH TriggerID:3df00757f4922238f9624bb9d92e1e4f2917a944
   Hash:53d83f31105ca3a2c89800c23ee2a2fb7885350d Status:SUCCESS URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4625 TriggerType:PUSH TriggerID:53d83f31105ca3a2c89800c23ee2a2fb7885350d
   Hash:53d83f31105ca3a2c89800c23ee2a2fb7885350d Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/146186647 TriggerType:PUSH TriggerID:53d83f31105ca3a2c89800c23ee2a2fb7885350d
   Hash:93cca213ab0be0c6f36ffa0dae176be7f0b790f1 Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/146196734 TriggerType:PUSH TriggerID:93cca213ab0be0c6f36ffa0dae176be7f0b790f1
   Hash:93cca213ab0be0c6f36ffa0dae176be7f0b790f1 Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4627 TriggerType:PUSH TriggerID:93cca213ab0be0c6f36ffa0dae176be7f0b790f1
   -->
   ## CI report:
   
   * 3df00757f4922238f9624bb9d92e1e4f2917a944 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145998908) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4614) 
   * 53d83f31105ca3a2c89800c23ee2a2fb7885350d Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/146186647) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4625) 
   * 93cca213ab0be0c6f36ffa0dae176be7f0b790f1 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/146196734) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4627) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] azagrebin commented on issue #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource cofig, set to default for local execution

Posted by GitBox <gi...@apache.org>.
azagrebin commented on issue #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource cofig, set to default for local execution
URL: https://github.com/apache/flink/pull/10946#issuecomment-578696849
 
 
   Thanks for the review @tillrohrmann 
   Comments are addressed.


[GitHub] [flink] tillrohrmann commented on a change in pull request #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource cofig, set to default for local execution

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource cofig, set to default for local execution
URL: https://github.com/apache/flink/pull/10946#discussion_r370944239
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorResourceUtils.java
 ##########
 @@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Utility class for {@link TaskExecutorResourceSpec} of running {@link TaskExecutor}.
+ */
+public class TaskExecutorResourceUtils {
+	private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorResourceUtils.class);
+
+	static final List<ConfigOption<?>> CONFIG_OPTIONS = Arrays.asList(
+		TaskManagerOptions.CPU_CORES,
+		TaskManagerOptions.TASK_HEAP_MEMORY,
+		TaskManagerOptions.TASK_OFF_HEAP_MEMORY,
+		TaskManagerOptions.NETWORK_MEMORY_MIN,
+		TaskManagerOptions.NETWORK_MEMORY_MAX,
+		TaskManagerOptions.MANAGED_MEMORY_SIZE
+	);
+
+	private static final List<ConfigOption<?>> UNUSED_CONFIG_OPTIONS = Arrays.asList(
+		TaskManagerOptions.TOTAL_PROCESS_MEMORY,
+		TaskManagerOptions.TOTAL_FLINK_MEMORY,
+		TaskManagerOptions.FRAMEWORK_HEAP_MEMORY,
+		TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY,
+		TaskManagerOptions.JVM_METASPACE,
+		TaskManagerOptions.JVM_OVERHEAD_MIN,
+		TaskManagerOptions.JVM_OVERHEAD_MAX,
+		TaskManagerOptions.JVM_OVERHEAD_FRACTION
+	);
+
+	static final MemorySize DEFAULT_SHUFFLE_MEMORY_SIZE = MemorySize.parse("64m");
+	static final MemorySize DEFAULT_MANAGED_MEMORY_SIZE = MemorySize.parse("128m");
+
+	private TaskExecutorResourceUtils() {}
+
+	static TaskExecutorResourceSpec resourceSpecFromConfig(Configuration config) {
+		try {
+			checkTaskExecutorResourceConfigSet(config);
+		} catch (IllegalConfigurationException e) {
+			throw new IllegalConfigurationException("Failed to create TaskExecutorResourceSpec", e);
+		}
+		return new TaskExecutorResourceSpec(
+			new CPUResource(config.getDouble(TaskManagerOptions.CPU_CORES)),
+			config.get(TaskManagerOptions.TASK_HEAP_MEMORY),
+			config.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY),
+			config.get(TaskManagerOptions.NETWORK_MEMORY_MIN),
+			config.get(TaskManagerOptions.MANAGED_MEMORY_SIZE)
+		);
+	}
+
+	private static void checkTaskExecutorResourceConfigSet(Configuration config) {
+		CONFIG_OPTIONS.forEach(option -> checkConfigOptionIsSet(config, option));
+		checkTaskExecutorNetworkConfigSet(config);
+	}
+
+	private static void checkTaskExecutorNetworkConfigSet(ReadableConfig config) {
+		if (!config.get(TaskManagerOptions.NETWORK_MEMORY_MIN).equals(config.get(TaskManagerOptions.NETWORK_MEMORY_MAX))) {
+			throw new IllegalConfigurationException(
+				"The network memory min (%s) and max (%s) mismatch, " +
+					"the network memory has to be fixed after task executor has started",
+				config.get(TaskManagerOptions.NETWORK_MEMORY_MIN),
+				config.get(TaskManagerOptions.NETWORK_MEMORY_MAX));
+		}
+	}
 
 Review comment:
   Is this condition required? If min and max are not equal, then we should be able to take any value from the specified range (e.g. the minimum).
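
To make the two options in this comment concrete, here is a small stand-alone sketch. It assumes sizes are plain byte counts and uses hypothetical names (`NetworkMemoryRange`, `requireFixed`, `pickFromRange`); it is not the Flink implementation. The strict variant mirrors the check in the diff above, and the lenient variant accepts any valid range and takes the minimum.

```java
/** Stand-alone sketch of the two ways to resolve network memory from a min/max pair. */
public class NetworkMemoryRange {

    /** Strict variant, as in the diff: min and max must be equal. */
    public static long requireFixed(long minBytes, long maxBytes) {
        if (minBytes != maxBytes) {
            throw new IllegalArgumentException(String.format(
                "The network memory min (%d) and max (%d) mismatch", minBytes, maxBytes));
        }
        return minBytes;
    }

    /** Lenient variant, as suggested in the review: accept a valid range and take the minimum. */
    public static long pickFromRange(long minBytes, long maxBytes) {
        if (minBytes > maxBytes) {
            throw new IllegalArgumentException(String.format(
                "The network memory min (%d) exceeds max (%d)", minBytes, maxBytes));
        }
        return minBytes;
    }

    public static void main(String[] args) {
        long min = 64L << 20;   // 64 MiB in bytes
        long max = 128L << 20;  // 128 MiB in bytes
        System.out.println("lenient pick: " + pickFromRange(min, max));
    }
}
```

Note that `resourceSpecFromConfig` in the diff already reads `NETWORK_MEMORY_MIN` when building the spec, so the lenient variant would change only the validation, not which value is used.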


[GitHub] [flink] azagrebin commented on issue #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource config, set to default for local execution

Posted by GitBox <gi...@apache.org>.
azagrebin commented on issue #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource config, set to default for local execution
URL: https://github.com/apache/flink/pull/10946#issuecomment-578787950
 
 
   Thanks a lot @tillrohrmann for the review and addressing the comment


[GitHub] [flink] azagrebin commented on a change in pull request #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource cofig, set to default for local execution

Posted by GitBox <gi...@apache.org>.
azagrebin commented on a change in pull request #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource cofig, set to default for local execution
URL: https://github.com/apache/flink/pull/10946#discussion_r371176550
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorResourceSpec.java
 ##########
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.MemorySize;
+
+/**
+ * Specification of resources used in running {@link org.apache.flink.runtime.taskexecutor.TaskExecutor}.
 
 Review comment:
   This class is meant specifically for only those resources that matter and are explicitly used in a running TM.


[GitHub] [flink] flinkbot commented on issue #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource cofig, set to default for local execution

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource cofig, set to default for local execution
URL: https://github.com/apache/flink/pull/10946#issuecomment-578259866
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 4c1ef854dc40df4200aa3bd897db39a42ea37e89 (Fri Jan 24 19:07:03 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


[GitHub] [flink] flinkbot edited a comment on issue #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource cofig, set to default for local execution

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource cofig, set to default for local execution
URL: https://github.com/apache/flink/pull/10946#issuecomment-578270885
 
 
   <!--
   Meta data
   Hash:3df00757f4922238f9624bb9d92e1e4f2917a944 Status:PENDING URL:https://travis-ci.com/flink-ci/flink/builds/145998908 TriggerType:PUSH TriggerID:3df00757f4922238f9624bb9d92e1e4f2917a944
   Hash:3df00757f4922238f9624bb9d92e1e4f2917a944 Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4614 TriggerType:PUSH TriggerID:3df00757f4922238f9624bb9d92e1e4f2917a944
   -->
   ## CI report:
   
   * 3df00757f4922238f9624bb9d92e1e4f2917a944 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/145998908) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4614) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] flinkbot commented on issue #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource cofig, set to default for local execution

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource cofig, set to default for local execution
URL: https://github.com/apache/flink/pull/10946#issuecomment-578270885
 
 
   <!--
   Meta data
   Hash:3df00757f4922238f9624bb9d92e1e4f2917a944 Status:UNKNOWN URL:TBD TriggerType:PUSH TriggerID:3df00757f4922238f9624bb9d92e1e4f2917a944
   -->
   ## CI report:
   
   * 3df00757f4922238f9624bb9d92e1e4f2917a944 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] flinkbot edited a comment on issue #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource cofig, set to default for local execution

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource cofig, set to default for local execution
URL: https://github.com/apache/flink/pull/10946#issuecomment-578270885
 
 
   <!--
   Meta data
   Hash:3df00757f4922238f9624bb9d92e1e4f2917a944 Status:FAILURE URL:https://travis-ci.com/flink-ci/flink/builds/145998908 TriggerType:PUSH TriggerID:3df00757f4922238f9624bb9d92e1e4f2917a944
   Hash:3df00757f4922238f9624bb9d92e1e4f2917a944 Status:FAILURE URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4614 TriggerType:PUSH TriggerID:3df00757f4922238f9624bb9d92e1e4f2917a944
   Hash:53d83f31105ca3a2c89800c23ee2a2fb7885350d Status:PENDING URL:https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4625 TriggerType:PUSH TriggerID:53d83f31105ca3a2c89800c23ee2a2fb7885350d
   Hash:53d83f31105ca3a2c89800c23ee2a2fb7885350d Status:SUCCESS URL:https://travis-ci.com/flink-ci/flink/builds/146186647 TriggerType:PUSH TriggerID:53d83f31105ca3a2c89800c23ee2a2fb7885350d
   Hash:93cca213ab0be0c6f36ffa0dae176be7f0b790f1 Status:UNKNOWN URL:TBD TriggerType:PUSH TriggerID:93cca213ab0be0c6f36ffa0dae176be7f0b790f1
   -->
   ## CI report:
   
   * 3df00757f4922238f9624bb9d92e1e4f2917a944 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145998908) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4614) 
   * 53d83f31105ca3a2c89800c23ee2a2fb7885350d Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/146186647) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4625) 
   * 93cca213ab0be0c6f36ffa0dae176be7f0b790f1 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] tillrohrmann commented on a change in pull request #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource cofig, set to default for local execution

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource cofig, set to default for local execution
URL: https://github.com/apache/flink/pull/10946#discussion_r370944521
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorResourceUtilsTest.java
 ##########
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.*;
+
+/** Test suite for {@link TaskExecutorResourceUtils}. */
+public class TaskExecutorResourceUtilsTest extends TestLogger {
+	@Test
+	public void testResourceSpecFromConfig() {
+		double cpuCores = 1.0;
+		MemorySize taskHeap = MemorySize.ofMebiBytes(1);
+		MemorySize taskOffHeap = MemorySize.ofMebiBytes(2);
+		MemorySize network = MemorySize.ofMebiBytes(3);
+		MemorySize managed = MemorySize.ofMebiBytes(4);
+
+		Configuration configuration = new Configuration();
+		configuration.set(TaskManagerOptions.CPU_CORES, cpuCores);
+		configuration.set(TaskManagerOptions.TASK_HEAP_MEMORY, taskHeap);
+		configuration.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, taskOffHeap);
+		configuration.set(TaskManagerOptions.NETWORK_MEMORY_MIN, network);
+		configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX, network);
+		configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, managed);
+
+		TaskExecutorResourceSpec resourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
+
+		assertThat(resourceSpec.getCpuCores(), is(new CPUResource(cpuCores)));
+		assertThat(resourceSpec.getTaskHeapSize(), is(taskHeap));
+		assertThat(resourceSpec.getTaskOffHeapSize(), is(taskOffHeap));
+		assertThat(resourceSpec.getNetworkMemSize(), is(network));
+		assertThat(resourceSpec.getManagedMemorySize(), is(managed));
+	}
+
+	@Test
+	public void testResourceSpecFromConfigFailsIfRequiredOptionIsNotSet() {
+		TaskExecutorResourceUtils.CONFIG_OPTIONS.forEach(option -> {
+			try {
+				TaskExecutorResourceUtils.resourceSpecFromConfig(setAllRequiredOptionsExceptOne(option));
+				fail("should fail with " + IllegalConfigurationException.class.getSimpleName());
+			} catch (IllegalConfigurationException e) {
+				// expected
+			}
+		});
+	}
+
+	private static Configuration setAllRequiredOptionsExceptOne(ConfigOption<?> optionToNotSet) {
+		Configuration configuration = new Configuration();
+		if (!TaskManagerOptions.CPU_CORES.equals(optionToNotSet)) {
+			configuration.set(TaskManagerOptions.CPU_CORES, 1.0);
+		}
+
+		//noinspection unchecked
+		TaskExecutorResourceUtils.CONFIG_OPTIONS
+			.stream()
+			.filter(option -> option.equals(TaskManagerOptions.CPU_CORES))
+			.filter(option -> option.equals(optionToNotSet))
 
 Review comment:
   same here. The negation is missing.
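
A stand-alone sketch of what the negated filters would look like, assuming the intent is to set every required option except CPU cores (handled separately before the stream) and the one option under test. Strings stand in for `ConfigOption` instances, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Stand-alone sketch of the two filters with the negation in place. */
public class FilterNegationSketch {
    // Hypothetical stand-ins for the entries of CONFIG_OPTIONS.
    public static final List<String> REQUIRED = Arrays.asList(
        "cpu.cores", "task.heap", "task.off-heap", "network.min", "network.max", "managed");

    /** Returns the options to set when one required option is deliberately left out. */
    public static List<String> optionsToSet(String optionToNotSet) {
        return REQUIRED.stream()
            .filter(option -> !option.equals("cpu.cores"))     // CPU cores are set separately
            .filter(option -> !option.equals(optionToNotSet))  // leave the tested option unset
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(optionsToSet("managed"));
    }
}
```

Without the negation, the two filters keep only an element that equals both `CPU_CORES` and `optionToNotSet`, so hardly any option gets set and the test passes no matter which option is missing.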


[GitHub] [flink] tillrohrmann commented on a change in pull request #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource cofig, set to default for local execution

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource cofig, set to default for local execution
URL: https://github.com/apache/flink/pull/10946#discussion_r370944515
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorResourceUtilsTest.java
 ##########
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.*;
+
+/** Test suite for {@link TaskExecutorResourceUtils}. */
+public class TaskExecutorResourceUtilsTest extends TestLogger {
+	@Test
+	public void testResourceSpecFromConfig() {
+		double cpuCores = 1.0;
+		MemorySize taskHeap = MemorySize.ofMebiBytes(1);
+		MemorySize taskOffHeap = MemorySize.ofMebiBytes(2);
+		MemorySize network = MemorySize.ofMebiBytes(3);
+		MemorySize managed = MemorySize.ofMebiBytes(4);
+
+		Configuration configuration = new Configuration();
+		configuration.set(TaskManagerOptions.CPU_CORES, cpuCores);
+		configuration.set(TaskManagerOptions.TASK_HEAP_MEMORY, taskHeap);
+		configuration.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, taskOffHeap);
+		configuration.set(TaskManagerOptions.NETWORK_MEMORY_MIN, network);
+		configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX, network);
+		configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, managed);
+
+		TaskExecutorResourceSpec resourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
+
+		assertThat(resourceSpec.getCpuCores(), is(new CPUResource(cpuCores)));
+		assertThat(resourceSpec.getTaskHeapSize(), is(taskHeap));
+		assertThat(resourceSpec.getTaskOffHeapSize(), is(taskOffHeap));
+		assertThat(resourceSpec.getNetworkMemSize(), is(network));
+		assertThat(resourceSpec.getManagedMemorySize(), is(managed));
+	}
+
+	@Test
+	public void testResourceSpecFromConfigFailsIfRequiredOptionIsNotSet() {
+		TaskExecutorResourceUtils.CONFIG_OPTIONS.forEach(option -> {
+			try {
+				TaskExecutorResourceUtils.resourceSpecFromConfig(setAllRequiredOptionsExceptOne(option));
+				fail("should fail with " + IllegalConfigurationException.class.getSimpleName());
+			} catch (IllegalConfigurationException e) {
+				// expected
+			}
+		});
+	}
+
+	private static Configuration setAllRequiredOptionsExceptOne(ConfigOption<?> optionToNotSet) {
+		Configuration configuration = new Configuration();
+		if (!TaskManagerOptions.CPU_CORES.equals(optionToNotSet)) {
+			configuration.set(TaskManagerOptions.CPU_CORES, 1.0);
+		}
+
+		//noinspection unchecked
+		TaskExecutorResourceUtils.CONFIG_OPTIONS
+			.stream()
+			.filter(option -> option.equals(TaskManagerOptions.CPU_CORES))
 
 Review comment:
   the negation is missing here


[GitHub] [flink] flinkbot edited a comment on issue #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource config, set to default for local execution

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource config, set to default for local execution
URL: https://github.com/apache/flink/pull/10946#issuecomment-578270885
 
 
   ## CI report:
   
   * 3df00757f4922238f9624bb9d92e1e4f2917a944 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145998908) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4614) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] tillrohrmann commented on a change in pull request #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource config, set to default for local execution

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource config, set to default for local execution
URL: https://github.com/apache/flink/pull/10946#discussion_r371257686
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorResourceUtilsTest.java
 ##########
 @@ -78,8 +78,9 @@ private static Configuration setAllRequiredOptionsExceptOne(ConfigOption<?> opti
 		//noinspection unchecked
 		TaskExecutorResourceUtils.CONFIG_OPTIONS
 			.stream()
-			.filter(option -> option.equals(TaskManagerOptions.CPU_CORES))
-			.filter(option -> option.equals(optionToNotSet))
+			.filter(option -> !option.equals(TaskManagerOptions.CPU_CORES))
+			.filter(ConfigOption::hasDefaultValue)
 
 Review comment:
   I think this filter belongs at the call site, where we filter out the `CONFIG_OPTIONS` that have a default value.
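
   One possible reading of this suggestion, sketched with a hypothetical minimal `Option` type and made-up defaults (not Flink's `ConfigOption`): the caller decides which options can be "missing" by filtering out those with a default, and the helper stays unaware of defaults.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class CallSiteFilterSketch {
    // Hypothetical minimal option type; it only models "has a default or not".
    static final class Option {
        final String key;
        final String defaultValue; // null means "no default"

        Option(String key, String defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }

        boolean hasDefaultValue() {
            return defaultValue != null;
        }
    }

    // Illustrative entries only; the keys and defaults are assumptions.
    static final List<Option> CONFIG_OPTIONS = Arrays.asList(
        new Option("cpu.cores", null),
        new Option("task.heap", null),
        new Option("network.min", "64m"));

    // Call-site filtering: only options without a default can fail the "is set" check.
    static List<String> optionsThatMustBeSet() {
        return CONFIG_OPTIONS.stream()
            .filter(option -> !option.hasDefaultValue())
            .map(option -> option.key)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(optionsThatMustBeSet());
    }
}
```

   A test would then iterate over `optionsThatMustBeSet()` instead of the full list when asserting that a missing option causes a failure.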


[GitHub] [flink] tillrohrmann commented on a change in pull request #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource config, set to default for local execution

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource config, set to default for local execution
URL: https://github.com/apache/flink/pull/10946#discussion_r370944071
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorResourceSpec.java
 ##########
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.MemorySize;
+
+/**
+ * Specification of resources used in running {@link org.apache.flink.runtime.taskexecutor.TaskExecutor}.
 
 Review comment:
   ```suggestion
    * Specification of resources for {@link org.apache.flink.runtime.taskexecutor.TaskExecutor}.
   ```


[GitHub] [flink] azagrebin commented on a change in pull request #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource config, set to default for local execution

Posted by GitBox <gi...@apache.org>.
azagrebin commented on a change in pull request #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource config, set to default for local execution
URL: https://github.com/apache/flink/pull/10946#discussion_r371169251
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorResourceUtils.java
 ##########
 @@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Utility class for {@link TaskExecutorResourceSpec} of running {@link TaskExecutor}.
+ */
+public class TaskExecutorResourceUtils {
+	private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorResourceUtils.class);
+
+	static final List<ConfigOption<?>> CONFIG_OPTIONS = Arrays.asList(
+		TaskManagerOptions.CPU_CORES,
+		TaskManagerOptions.TASK_HEAP_MEMORY,
+		TaskManagerOptions.TASK_OFF_HEAP_MEMORY,
+		TaskManagerOptions.NETWORK_MEMORY_MIN,
+		TaskManagerOptions.NETWORK_MEMORY_MAX,
+		TaskManagerOptions.MANAGED_MEMORY_SIZE
+	);
+
+	private static final List<ConfigOption<?>> UNUSED_CONFIG_OPTIONS = Arrays.asList(
+		TaskManagerOptions.TOTAL_PROCESS_MEMORY,
+		TaskManagerOptions.TOTAL_FLINK_MEMORY,
+		TaskManagerOptions.FRAMEWORK_HEAP_MEMORY,
+		TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY,
+		TaskManagerOptions.JVM_METASPACE,
+		TaskManagerOptions.JVM_OVERHEAD_MIN,
+		TaskManagerOptions.JVM_OVERHEAD_MAX,
+		TaskManagerOptions.JVM_OVERHEAD_FRACTION
+	);
+
+	static final MemorySize DEFAULT_SHUFFLE_MEMORY_SIZE = MemorySize.parse("64m");
+	static final MemorySize DEFAULT_MANAGED_MEMORY_SIZE = MemorySize.parse("128m");
+
+	private TaskExecutorResourceUtils() {}
+
+	static TaskExecutorResourceSpec resourceSpecFromConfig(Configuration config) {
+		try {
+			checkTaskExecutorResourceConfigSet(config);
+		} catch (IllegalConfigurationException e) {
+			throw new IllegalConfigurationException("Failed to create TaskExecutorResourceSpec", e);
+		}
+		return new TaskExecutorResourceSpec(
+			new CPUResource(config.getDouble(TaskManagerOptions.CPU_CORES)),
+			config.get(TaskManagerOptions.TASK_HEAP_MEMORY),
+			config.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY),
+			config.get(TaskManagerOptions.NETWORK_MEMORY_MIN),
+			config.get(TaskManagerOptions.MANAGED_MEMORY_SIZE)
+		);
+	}
+
+	private static void checkTaskExecutorResourceConfigSet(Configuration config) {
+		CONFIG_OPTIONS.forEach(option -> checkConfigOptionIsSet(config, option));
+		checkTaskExecutorNetworkConfigSet(config);
+	}
+
+	private static void checkTaskExecutorNetworkConfigSet(ReadableConfig config) {
+		if (!config.get(TaskManagerOptions.NETWORK_MEMORY_MIN).equals(config.get(TaskManagerOptions.NETWORK_MEMORY_MAX))) {
+			throw new IllegalConfigurationException(
+				"The network memory min (%s) and max (%s) mismatch, " +
+					"the network memory has to be fixed after task executor has started",
+				config.get(TaskManagerOptions.NETWORK_MEMORY_MIN),
+				config.get(TaskManagerOptions.NETWORK_MEMORY_MAX));
+		}
+	}
 
 Review comment:
   As discussed offline:
   If min is not equal to max, the network memory has not been resolved to a fixed value before the TM starts, and we want to avoid this.
   
   We now use FLIP-49 and some default values for local execution to prepare the TM process before it runs. Keeping this check strict ensures that no implicit resolution happens inside the TM and signals when something was not prepared before starting it. This can be annoying in tests that create a TM runner without a mini cluster but with an empty configuration. At the moment we do not have many such tests; later we can consider a refactoring that provides a default `TaskExecutorResourceSpec`.
   
   Another idea to investigate is to add an explicit (maybe only internal) [network memory size](https://issues.apache.org/jira/browse/FLINK-15774) option as the result of TM config resolution and to expect it in the TM, instead of the confusing min=max convention.
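
   A minimal sketch of the min=max check described here, assuming a plain `Map<String, Long>` with hypothetical keys in place of Flink's `Configuration` and `TaskManagerOptions`: the network memory counts as resolved only when both bounds are set and equal.

```java
import java.util.HashMap;
import java.util.Map;

final class FixedNetworkMemorySketch {
    static final String NETWORK_MIN = "network.min"; // hypothetical key
    static final String NETWORK_MAX = "network.max"; // hypothetical key

    // Returns the fixed network memory size in bytes, or fails if it is not resolved.
    static long resolveNetworkMemoryBytes(Map<String, Long> config) {
        Long min = config.get(NETWORK_MIN);
        Long max = config.get(NETWORK_MAX);
        if (min == null || max == null) {
            throw new IllegalArgumentException("Network memory min and max must both be set");
        }
        if (!min.equals(max)) {
            // A range means nobody resolved the size before the task executor started.
            throw new IllegalArgumentException(String.format(
                "The network memory min (%d) and max (%d) mismatch", min, max));
        }
        return min;
    }

    public static void main(String[] args) {
        Map<String, Long> config = new HashMap<>();
        config.put(NETWORK_MIN, 64L << 20);
        config.put(NETWORK_MAX, 64L << 20);
        System.out.println(resolveNetworkMemoryBytes(config));
    }
}
```

   An explicit single size option, as proposed in the last paragraph, would make the second check unnecessary.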


[GitHub] [flink] flinkbot edited a comment on issue #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource config, set to default for local execution

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource config, set to default for local execution
URL: https://github.com/apache/flink/pull/10946#issuecomment-578270885
 
 
   ## CI report:
   
   * 3df00757f4922238f9624bb9d92e1e4f2917a944 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145998908) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4614) 
   * 53d83f31105ca3a2c89800c23ee2a2fb7885350d Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/146186647) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4625) 
   

[GitHub] [flink] tillrohrmann closed pull request #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource config, set to default for local execution

Posted by GitBox <gi...@apache.org>.
tillrohrmann closed pull request #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource config, set to default for local execution
URL: https://github.com/apache/flink/pull/10946
 
 
   


[GitHub] [flink] tillrohrmann commented on a change in pull request #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource config, set to default for local execution

Posted by GitBox <gi...@apache.org>.
tillrohrmann commented on a change in pull request #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource config, set to default for local execution
URL: https://github.com/apache/flink/pull/10946#discussion_r370944739
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorResourceUtils.java
 ##########
 @@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Utility class for {@link TaskExecutorResourceSpec} of running {@link TaskExecutor}.
+ */
+public class TaskExecutorResourceUtils {
+	private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorResourceUtils.class);
+
+	static final List<ConfigOption<?>> CONFIG_OPTIONS = Arrays.asList(
+		TaskManagerOptions.CPU_CORES,
+		TaskManagerOptions.TASK_HEAP_MEMORY,
+		TaskManagerOptions.TASK_OFF_HEAP_MEMORY,
+		TaskManagerOptions.NETWORK_MEMORY_MIN,
+		TaskManagerOptions.NETWORK_MEMORY_MAX,
+		TaskManagerOptions.MANAGED_MEMORY_SIZE
+	);
+
+	private static final List<ConfigOption<?>> UNUSED_CONFIG_OPTIONS = Arrays.asList(
+		TaskManagerOptions.TOTAL_PROCESS_MEMORY,
+		TaskManagerOptions.TOTAL_FLINK_MEMORY,
+		TaskManagerOptions.FRAMEWORK_HEAP_MEMORY,
+		TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY,
+		TaskManagerOptions.JVM_METASPACE,
+		TaskManagerOptions.JVM_OVERHEAD_MIN,
+		TaskManagerOptions.JVM_OVERHEAD_MAX,
+		TaskManagerOptions.JVM_OVERHEAD_FRACTION
+	);
+
+	static final MemorySize DEFAULT_SHUFFLE_MEMORY_SIZE = MemorySize.parse("64m");
+	static final MemorySize DEFAULT_MANAGED_MEMORY_SIZE = MemorySize.parse("128m");
+
+	private TaskExecutorResourceUtils() {}
+
+	static TaskExecutorResourceSpec resourceSpecFromConfig(Configuration config) {
+		try {
+			checkTaskExecutorResourceConfigSet(config);
+		} catch (IllegalConfigurationException e) {
+			throw new IllegalConfigurationException("Failed to create TaskExecutorResourceSpec", e);
+		}
+		return new TaskExecutorResourceSpec(
+			new CPUResource(config.getDouble(TaskManagerOptions.CPU_CORES)),
+			config.get(TaskManagerOptions.TASK_HEAP_MEMORY),
+			config.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY),
+			config.get(TaskManagerOptions.NETWORK_MEMORY_MIN),
+			config.get(TaskManagerOptions.MANAGED_MEMORY_SIZE)
+		);
+	}
+
+	private static void checkTaskExecutorResourceConfigSet(Configuration config) {
+		CONFIG_OPTIONS.forEach(option -> checkConfigOptionIsSet(config, option));
+		checkTaskExecutorNetworkConfigSet(config);
+	}
+
+	private static void checkTaskExecutorNetworkConfigSet(ReadableConfig config) {
+		if (!config.get(TaskManagerOptions.NETWORK_MEMORY_MIN).equals(config.get(TaskManagerOptions.NETWORK_MEMORY_MAX))) {
+			throw new IllegalConfigurationException(
+				"The network memory min (%s) and max (%s) mismatch, " +
+					"the network memory has to be fixed after task executor has started",
+				config.get(TaskManagerOptions.NETWORK_MEMORY_MIN),
+				config.get(TaskManagerOptions.NETWORK_MEMORY_MAX));
+		}
+	}
+
+	private static void checkConfigOptionIsSet(Configuration config, ConfigOption<?> option) {
+		if (!config.contains(option)) {
 
 Review comment:
   Why aren't default values allowed here? I think this check is too strict: it forces us to always set the values explicitly, even where we could simply take the default values of some `ConfigOption`s.
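
   A minimal sketch of a less strict check, assuming a plain `Map<String, String>` and a nullable default as hypothetical stand-ins for `Configuration` and `ConfigOption`: an explicit value wins, a default is accepted as a fallback, and only an option with neither is an error.

```java
import java.util.HashMap;
import java.util.Map;

final class DefaultAwareCheckSketch {
    // Resolves an option value; defaultValue == null models "the option has no default".
    static String resolve(Map<String, String> config, String key, String defaultValue) {
        String explicit = config.get(key);
        if (explicit != null) {
            return explicit;
        }
        if (defaultValue != null) {
            return defaultValue;
        }
        throw new IllegalStateException(
            "Required option '" + key + "' is not set and has no default value");
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("managed.size", "256m");
        System.out.println(resolve(config, "managed.size", "128m"));
        System.out.println(resolve(config, "network.min", "64m"));
    }
}
```

   The keys and default sizes above are illustrative only.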


[GitHub] [flink] flinkbot edited a comment on issue #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource config, set to default for local execution

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource config, set to default for local execution
URL: https://github.com/apache/flink/pull/10946#issuecomment-578270885
 
 
   ## CI report:
   
   * 3df00757f4922238f9624bb9d92e1e4f2917a944 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145998908) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4614) 
   * 53d83f31105ca3a2c89800c23ee2a2fb7885350d Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/146186647) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4625) 
   * 93cca213ab0be0c6f36ffa0dae176be7f0b790f1 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/146196734) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4627) 
   

[GitHub] [flink] azagrebin commented on issue #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource config, set to default for local execution

Posted by GitBox <gi...@apache.org>.
azagrebin commented on issue #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource config, set to default for local execution
URL: https://github.com/apache/flink/pull/10946#issuecomment-578259001
 
 
   cc @tillrohrmann @StephanEwen 


[GitHub] [flink] flinkbot edited a comment on issue #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource config, set to default for local execution

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #10946: [FLINK-15763][Runtime] Running TM checks only necessary resource config, set to default for local execution
URL: https://github.com/apache/flink/pull/10946#issuecomment-578270885
 
 
   ## CI report:
   
   * 3df00757f4922238f9624bb9d92e1e4f2917a944 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/145998908) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=4614) 
   * 53d83f31105ca3a2c89800c23ee2a2fb7885350d UNKNOWN
   