You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/07/26 23:24:03 UTC
flink git commit: [FLINK-7113] Make ClusterDescriptor independent of
cluster size
Repository: flink
Updated Branches:
refs/heads/master 5e19a0da6 -> 7cf997d16
[FLINK-7113] Make ClusterDescriptor independent of cluster size
The deploySessionCluster method is now given a ClusterSpecification that specifies the
size of the cluster it is supposed to deploy.
Remove two line breaks and unnecessary parameters for YarnTestBase#Runner, and add a builder for ClusterSpecification.
This closes #4271.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7cf997d1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7cf997d1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7cf997d1
Branch: refs/heads/master
Commit: 7cf997d16bbf8c7a6bbb3522aeba0d9e7c9af6b6
Parents: 5e19a0d
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jul 6 14:00:21 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 26 18:55:10 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/client/CliFrontend.java | 56 ++--
.../org/apache/flink/client/cli/DefaultCLI.java | 5 +-
.../client/deployment/ClusterDescriptor.java | 3 +-
.../client/deployment/ClusterSpecification.java | 119 +++++++
.../deployment/StandaloneClusterDescriptor.java | 2 +-
...CliFrontendYarnAddressConfigurationTest.java | 5 +-
.../flink/yarn/FlinkYarnSessionCliTest.java | 39 ++-
.../flink/yarn/YARNHighAvailabilityITCase.java | 13 +-
.../flink/yarn/YARNSessionFIFOITCase.java | 13 +-
.../org/apache/flink/yarn/YarnTestBase.java | 8 +-
.../yarn/AbstractYarnClusterDescriptor.java | 318 ++++++++++---------
.../apache/flink/yarn/YarnClusterClient.java | 19 +-
.../apache/flink/yarn/YarnClusterClientV2.java | 25 +-
.../org/apache/flink/yarn/cli/FlinkYarnCLI.java | 10 +-
.../flink/yarn/cli/FlinkYarnSessionCli.java | 94 +++---
.../flink/yarn/YarnClusterDescriptorTest.java | 108 +++++--
16 files changed, 540 insertions(+), 297 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index f89b016..4c7f6a8 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -90,7 +90,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -126,16 +125,27 @@ public class CliFrontend {
// --------------------------------------------------------------------------------------------
- private static final List<CustomCommandLine> customCommandLine = new LinkedList<>();
+ private static final List<CustomCommandLine<?>> customCommandLines = new ArrayList<>(3);
static {
// Command line interface of the YARN session, with a special initialization here
// to prefix all options with y/yarn.
// Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the
// active CustomCommandLine in order and DefaultCLI isActive always return true.
- loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", "yarn");
- loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnCLI", "y", "yarn");
- customCommandLine.add(new DefaultCLI());
+ final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
+ final String flinkYarnCLI = "org.apache.flink.yarn.cli.FlinkYarnCLI";
+ try {
+ customCommandLines.add(loadCustomCommandLine(flinkYarnSessionCLI, "y", "yarn"));
+ } catch (Exception e) {
+ LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
+ }
+
+ try {
+ customCommandLines.add(loadCustomCommandLine(flinkYarnCLI, "y", "yarn"));
+ } catch (Exception e) {
+ LOG.warn("Could not load CLI class {}.", flinkYarnCLI, e);
+ }
+ customCommandLines.add(new DefaultCLI());
}
// --------------------------------------------------------------------------------------------
@@ -1172,7 +1182,7 @@ public class CliFrontend {
* @return custom command-line which is active (may only be one at a time)
*/
public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
- for (CustomCommandLine cli : customCommandLine) {
+ for (CustomCommandLine cli : customCommandLines) {
if (cli.isActive(commandLine, config)) {
return cli;
}
@@ -1184,8 +1194,8 @@ public class CliFrontend {
* Retrieves the loaded custom command-lines.
* @return An unmodifiyable list of loaded custom command-lines.
*/
- public static List<CustomCommandLine> getCustomCommandLineList() {
- return Collections.unmodifiableList(customCommandLine);
+ public static List<CustomCommandLine<?>> getCustomCommandLineList() {
+ return Collections.unmodifiableList(customCommandLines);
}
/**
@@ -1193,29 +1203,21 @@ public class CliFrontend {
* @param className The fully-qualified class name to load.
* @param params The constructor parameters
*/
- private static void loadCustomCommandLine(String className, Object... params) {
+ private static CustomCommandLine<?> loadCustomCommandLine(String className, Object... params) throws IllegalAccessException, InvocationTargetException, InstantiationException, ClassNotFoundException, NoSuchMethodException {
- try {
- Class<? extends CustomCommandLine> customCliClass =
- Class.forName(className).asSubclass(CustomCommandLine.class);
-
- // construct class types from the parameters
- Class<?>[] types = new Class<?>[params.length];
- for (int i = 0; i < params.length; i++) {
- Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
- types[i] = params[i].getClass();
- }
+ Class<? extends CustomCommandLine> customCliClass =
+ Class.forName(className).asSubclass(CustomCommandLine.class);
- Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types);
- final CustomCommandLine cli = constructor.newInstance(params);
+ // construct class types from the parameters
+ Class<?>[] types = new Class<?>[params.length];
+ for (int i = 0; i < params.length; i++) {
+ Preconditions.checkNotNull(params[i], "Parameters for custom command-lines may not be null.");
+ types[i] = params[i].getClass();
+ }
- customCommandLine.add(cli);
+ Constructor<? extends CustomCommandLine> constructor = customCliClass.getConstructor(types);
- } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException
- | InvocationTargetException e) {
- LOG.warn("Unable to locate custom CLI class {}. " +
- "Flink is not compiled with support for this class.", className, e);
- }
+ return constructor.newInstance(params);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index 042a44a..ec55c91 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -19,6 +19,7 @@
package org.apache.flink.client.cli;
import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.Configuration;
@@ -83,6 +84,8 @@ public class DefaultCLI implements CustomCommandLine<StandaloneClusterClient> {
List<URL> userJarFiles) throws UnsupportedOperationException {
StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor(config);
- return descriptor.deploySessionCluster();
+ ClusterSpecification clusterSpecification = ClusterSpecification.fromConfiguration(config);
+
+ return descriptor.deploySessionCluster(clusterSpecification);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
index e2ffafa..ba83607 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
@@ -42,10 +42,11 @@ public interface ClusterDescriptor<ClientType extends ClusterClient> {
/**
* Triggers deployment of a cluster.
+ * @param clusterSpecification Cluster specification defining the cluster to deploy
* @return Client for the cluster
* @throws UnsupportedOperationException if this cluster descriptor doesn't support the operation
*/
- ClientType deploySessionCluster() throws UnsupportedOperationException;
+ ClientType deploySessionCluster(ClusterSpecification clusterSpecification) throws UnsupportedOperationException;
/**
* Deploys a per-job cluster with the given job on the cluster.
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
new file mode 100644
index 0000000..8650cab
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterSpecification.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+/**
+ * Description of the cluster to start by the {@link ClusterDescriptor}.
+ */
+public final class ClusterSpecification {
+ private final int masterMemoryMB;
+ private final int taskManagerMemoryMB;
+ private final int numberTaskManagers;
+ private final int slotsPerTaskManager;
+
+ private ClusterSpecification(int masterMemoryMB, int taskManagerMemoryMB, int numberTaskManagers, int slotsPerTaskManager) {
+ this.masterMemoryMB = masterMemoryMB;
+ this.taskManagerMemoryMB = taskManagerMemoryMB;
+ this.numberTaskManagers = numberTaskManagers;
+ this.slotsPerTaskManager = slotsPerTaskManager;
+ }
+
+ public int getMasterMemoryMB() {
+ return masterMemoryMB;
+ }
+
+ public int getTaskManagerMemoryMB() {
+ return taskManagerMemoryMB;
+ }
+
+ public int getNumberTaskManagers() {
+ return numberTaskManagers;
+ }
+
+ public int getSlotsPerTaskManager() {
+ return slotsPerTaskManager;
+ }
+
+ @Override
+ public String toString() {
+ return "ClusterSpecification{" +
+ "masterMemoryMB=" + masterMemoryMB +
+ ", taskManagerMemoryMB=" + taskManagerMemoryMB +
+ ", numberTaskManagers=" + numberTaskManagers +
+ ", slotsPerTaskManager=" + slotsPerTaskManager +
+ '}';
+ }
+
+ public static ClusterSpecification fromConfiguration(Configuration configuration) {
+ int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+
+ int jobManagerMemoryMb = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
+ int taskManagerMemoryMb = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+
+ return new ClusterSpecificationBuilder()
+ .setMasterMemoryMB(jobManagerMemoryMb)
+ .setTaskManagerMemoryMB(taskManagerMemoryMb)
+ .setNumberTaskManagers(1)
+ .setSlotsPerTaskManager(slots)
+ .createClusterSpecification();
+ }
+
+ /**
+ * Builder for the {@link ClusterSpecification} instance.
+ */
+ public static class ClusterSpecificationBuilder {
+ private int masterMemoryMB = 768;
+ private int taskManagerMemoryMB = 768;
+ private int numberTaskManagers = 1;
+ private int slotsPerTaskManager = 1;
+
+ public ClusterSpecificationBuilder setMasterMemoryMB(int masterMemoryMB) {
+ this.masterMemoryMB = masterMemoryMB;
+ return this;
+ }
+
+ public ClusterSpecificationBuilder setTaskManagerMemoryMB(int taskManagerMemoryMB) {
+ this.taskManagerMemoryMB = taskManagerMemoryMB;
+ return this;
+ }
+
+ public ClusterSpecificationBuilder setNumberTaskManagers(int numberTaskManagers) {
+ this.numberTaskManagers = numberTaskManagers;
+ return this;
+ }
+
+ public ClusterSpecificationBuilder setSlotsPerTaskManager(int slotsPerTaskManager) {
+ this.slotsPerTaskManager = slotsPerTaskManager;
+ return this;
+ }
+
+ public ClusterSpecification createClusterSpecification() {
+ return new ClusterSpecification(
+ masterMemoryMB,
+ taskManagerMemoryMB,
+ numberTaskManagers,
+ slotsPerTaskManager);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
index ebdd0a8..435692f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
@@ -51,7 +51,7 @@ public class StandaloneClusterDescriptor implements ClusterDescriptor<Standalone
}
@Override
- public StandaloneClusterClient deploySessionCluster() throws UnsupportedOperationException {
+ public StandaloneClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) throws UnsupportedOperationException {
throw new UnsupportedOperationException("Can't deploy a standalone cluster.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index d59e5b4..6811511 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.TestLogger;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.commons.cli.CommandLine;
@@ -66,7 +67,7 @@ import static org.junit.Assert.assertEquals;
* Tests that verify that the CLI client picks up the correct address for the JobManager
* from configuration and configs.
*/
-public class CliFrontendYarnAddressConfigurationTest {
+public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -378,6 +379,8 @@ public class CliFrontendYarnAddressConfigurationTest {
@Override
protected YarnClusterClient createYarnClusterClient(
AbstractYarnClusterDescriptor descriptor,
+ int numberTaskManagers,
+ int slotsPerTaskManager,
YarnClient yarnClient,
ApplicationReport report,
Configuration flinkConfiguration,
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index a2497a1..f6c8bbb 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -21,10 +21,11 @@ package org.apache.flink.yarn;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.RunOptions;
+import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.TestLogger;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.commons.cli.CommandLine;
@@ -48,7 +49,7 @@ import java.util.Map;
/**
* Tests for the FlinkYarnSessionCli.
*/
-public class FlinkYarnSessionCliTest {
+public class FlinkYarnSessionCliTest extends TestLogger {
@Rule
public TemporaryFolder tmp = new TemporaryFolder();
@@ -57,12 +58,11 @@ public class FlinkYarnSessionCliTest {
public void testDynamicProperties() throws Exception {
Map<String, String> map = new HashMap<String, String>(System.getenv());
- File tmpFolder = tmp.newFolder();
- File fakeConf = new File(tmpFolder, "flink-conf.yaml");
- fakeConf.createNewFile();
- map.put(ConfigConstants.ENV_FLINK_CONF_DIR, tmpFolder.getAbsolutePath());
TestBaseUtils.setEnv(map);
- FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", false);
+ FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
+ "",
+ "",
+ false);
Options options = new Options();
cli.addGeneralOptions(options);
cli.addRunOptions(options);
@@ -96,11 +96,11 @@ public class FlinkYarnSessionCliTest {
FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn");
- AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor("", runOptions.getCommandLine());
+ ClusterSpecification clusterSpecification = yarnCLI.createClusterSpecification(new Configuration(), runOptions.getCommandLine());
// each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased.
- Assert.assertEquals(4, descriptor.getTaskManagerSlots());
- Assert.assertEquals(2, descriptor.getTaskManagerCount());
+ Assert.assertEquals(4, clusterSpecification.getSlotsPerTaskManager());
+ Assert.assertEquals(2, clusterSpecification.getNumberTaskManagers());
}
@Test
@@ -118,14 +118,19 @@ public class FlinkYarnSessionCliTest {
FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn");
AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor("", runOptions.getCommandLine());
+ final ClusterSpecification clusterSpecification = yarnCLI.createClusterSpecification(new Configuration(), runOptions.getCommandLine());
// each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased.
- Assert.assertEquals(3, descriptor.getTaskManagerSlots());
- Assert.assertEquals(2, descriptor.getTaskManagerCount());
+ Assert.assertEquals(3, clusterSpecification.getSlotsPerTaskManager());
+ Assert.assertEquals(2, clusterSpecification.getNumberTaskManagers());
Configuration config = new Configuration();
CliFrontend.setJobManagerAddressInConfig(config, new InetSocketAddress("localhost", 9000));
- ClusterClient client = new TestingYarnClusterClient(descriptor, config);
+ ClusterClient client = new TestingYarnClusterClient(
+ descriptor,
+ clusterSpecification.getNumberTaskManagers(),
+ clusterSpecification.getSlotsPerTaskManager(),
+ config);
Assert.assertEquals(6, client.getMaxSlots());
}
@@ -170,8 +175,14 @@ public class FlinkYarnSessionCliTest {
private static class TestingYarnClusterClient extends YarnClusterClient {
- public TestingYarnClusterClient(AbstractYarnClusterDescriptor descriptor, Configuration config) throws Exception {
+ public TestingYarnClusterClient(
+ AbstractYarnClusterDescriptor descriptor,
+ int numberTaskManagers,
+ int slotsPerTaskManager,
+ Configuration config) throws Exception {
super(descriptor,
+ numberTaskManagers,
+ slotsPerTaskManager,
Mockito.mock(YarnClient.class),
Mockito.mock(ApplicationReport.class),
config,
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index df8133d..308cdc1 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.yarn;
+import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.CoreOptions;
@@ -107,9 +108,6 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
TestingYarnClusterDescriptor flinkYarnClient = new TestingYarnClusterDescriptor();
Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
- flinkYarnClient.setTaskManagerCount(1);
- flinkYarnClient.setJobManagerMemory(768);
- flinkYarnClient.setTaskManagerMemory(1024);
flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
@@ -136,8 +134,15 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
HighAvailabilityServices highAvailabilityServices = null;
+ final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
+ .setMasterMemoryMB(768)
+ .setTaskManagerMemoryMB(1024)
+ .setNumberTaskManagers(1)
+ .setSlotsPerTaskManager(1)
+ .createClusterSpecification();
+
try {
- yarnCluster = flinkYarnClient.deploySessionCluster();
+ yarnCluster = flinkYarnClient.deploySessionCluster(clusterSpecification);
final ClusterClient finalYarnCluster = yarnCluster;
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 8e1e877..2bbaadf 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.yarn;
+import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -225,9 +226,6 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
AbstractYarnClusterDescriptor flinkYarnClient = new YarnClusterDescriptor();
Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
- flinkYarnClient.setTaskManagerCount(1);
- flinkYarnClient.setJobManagerMemory(768);
- flinkYarnClient.setTaskManagerMemory(1024);
flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
@@ -235,10 +233,17 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.loadConfiguration());
flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
+ final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
+ .setMasterMemoryMB(768)
+ .setTaskManagerMemoryMB(1024)
+ .setNumberTaskManagers(1)
+ .setSlotsPerTaskManager(1)
+ .createClusterSpecification();
+
// deploy
ClusterClient yarnCluster = null;
try {
- yarnCluster = flinkYarnClient.deploySessionCluster();
+ yarnCluster = flinkYarnClient.deploySessionCluster(clusterSpecification);
} catch (Exception e) {
LOG.warn("Failing test", e);
Assert.fail("Error while deploying YARN cluster: " + e.getMessage());
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 68a1509..e701104 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -136,6 +136,8 @@ public abstract class YarnTestBase extends TestLogger {
*/
protected static File tempConfPathForSecureRun = null;
+ protected static org.apache.flink.configuration.Configuration flinkConfiguration = new org.apache.flink.configuration.Configuration();
+
static {
YARN_CONFIGURATION = new YarnConfiguration();
YARN_CONFIGURATION.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
@@ -631,6 +633,7 @@ public abstract class YarnTestBase extends TestLogger {
protected static class Runner extends Thread {
private final String[] args;
private final int expectedReturnValue;
+
private RunTypes type;
private FlinkYarnSessionCli yCli;
private Throwable runnerError;
@@ -647,7 +650,10 @@ public abstract class YarnTestBase extends TestLogger {
int returnValue;
switch (type) {
case YARN_SESSION:
- yCli = new FlinkYarnSessionCli("", "", false);
+ yCli = new FlinkYarnSessionCli(
+ "",
+ "",
+ false);
returnValue = yCli.run(args);
break;
case CLI_FRONTEND:
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index c5277a1..51b7600 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -27,7 +28,6 @@ import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -115,17 +115,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
*/
private Path sessionFilesDir;
- /**
- * If the user has specified a different number of slots, we store them here.
- */
- private int slots = -1;
-
- private int jobManagerMemoryMb = JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.defaultValue();
-
- private int taskManagerMemoryMb = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.defaultValue();
-
- private int taskManagerCount = 1;
-
private String yarnQueue;
private String configurationDirectory;
@@ -174,11 +163,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
flinkConfigurationPath = new Path(confFile.getAbsolutePath());
- slots = flinkConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
-
- jobManagerMemoryMb = flinkConfiguration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
- taskManagerMemoryMb = flinkConfiguration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
-
userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
} catch (Exception e) {
LOG.debug("Config couldn't be loaded from environment variable.", e);
@@ -190,22 +174,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
*/
protected abstract Class<?> getApplicationMasterClass();
- public void setJobManagerMemory(int memoryMb) {
- if (memoryMb < MIN_JM_MEMORY) {
- throw new IllegalArgumentException("The JobManager memory (" + memoryMb + ") is below the minimum required memory amount "
- + "of " + MIN_JM_MEMORY + " MB");
- }
- this.jobManagerMemoryMb = memoryMb;
- }
-
- public void setTaskManagerMemory(int memoryMb) {
- if (memoryMb < MIN_TM_MEMORY) {
- throw new IllegalArgumentException("The TaskManager memory (" + memoryMb + ") is below the minimum required memory amount "
- + "of " + MIN_TM_MEMORY + " MB");
- }
- this.taskManagerMemoryMb = memoryMb;
- }
-
public void setFlinkConfiguration(org.apache.flink.configuration.Configuration conf) {
this.flinkConfiguration = conf;
@@ -216,17 +184,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
return flinkConfiguration;
}
- public void setTaskManagerSlots(int slots) {
- if (slots <= 0) {
- throw new IllegalArgumentException("Number of TaskManager slots must be positive");
- }
- this.slots = slots;
- }
-
- public int getTaskManagerSlots() {
- return this.slots;
- }
-
public void setQueue(String queue) {
this.yarnQueue = queue;
}
@@ -246,17 +203,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
this.configurationDirectory = configurationDirectory;
}
- public void setTaskManagerCount(int tmCount) {
- if (tmCount < 1) {
- throw new IllegalArgumentException("The TaskManager count has to be at least 1.");
- }
- this.taskManagerCount = tmCount;
- }
-
- public int getTaskManagerCount() {
- return this.taskManagerCount;
- }
-
public void addShipFiles(List<File> shipFiles) {
for (File shipFile: shipFiles) {
// remove uberjar from ship list (by default everything in the lib/ folder is added to
@@ -311,8 +257,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
return this.dynamicPropertiesEncoded;
}
- private void isReadyForDeployment() throws YarnDeploymentException {
- if (taskManagerCount <= 0) {
+ private void isReadyForDeployment(ClusterSpecification clusterSpecification) throws YarnDeploymentException {
+
+ if (clusterSpecification.getNumberTaskManagers() <= 0) {
throw new YarnDeploymentException("Taskmanager count must be positive");
}
if (this.flinkJarPath == null) {
@@ -332,7 +279,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// The number of cores can be configured in the config.
// If not configured, it is set to the number of task slots
int numYarnVcores = conf.getInt(YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
- int configuredVcores = flinkConfiguration.getInteger(ConfigConstants.YARN_VCORES, slots);
+ int configuredVcores = flinkConfiguration.getInteger(ConfigConstants.YARN_VCORES, clusterSpecification.getSlotsPerTaskManager());
// don't configure more than the maximum configured number of vcores
if (configuredVcores > numYarnVcores) {
throw new IllegalConfigurationException(
@@ -419,7 +366,14 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
flinkConfiguration.setString(JobManagerOptions.ADDRESS, appReport.getHost());
flinkConfiguration.setInteger(JobManagerOptions.PORT, appReport.getRpcPort());
- return createYarnClusterClient(this, yarnClient, appReport, flinkConfiguration, false);
+ return createYarnClusterClient(
+ this,
+ -1, // we don't know the number of task managers of a started Flink cluster
+ -1, // we don't know how many slots each task manager has for a started Flink cluster
+ yarnClient,
+ appReport,
+ flinkConfiguration,
+ false);
} catch (Exception e) {
if (null != yarnClient) {
yarnClient.stop();
@@ -429,7 +383,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
@Override
- public YarnClusterClient deploySessionCluster() {
+ public YarnClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) {
try {
if (UserGroupInformation.isSecurityEnabled()) {
// note: UGI::hasKerberosCredentials inaccurately reports false
@@ -445,66 +399,32 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
"does not have Kerberos credentials");
}
}
- return deployInternal();
+ return deployInternal(clusterSpecification);
} catch (Exception e) {
throw new RuntimeException("Couldn't deploy Yarn cluster", e);
}
}
- /**
- * This method will block until the ApplicationMaster/JobManager have been
- * deployed on YARN.
- */
- protected YarnClusterClient deployInternal() throws Exception {
- isReadyForDeployment();
- LOG.info("Using values:");
- LOG.info("\tTaskManager count = {}", taskManagerCount);
- LOG.info("\tJobManager memory = {}", jobManagerMemoryMb);
- LOG.info("\tTaskManager memory = {}", taskManagerMemoryMb);
+ protected ClusterSpecification validateClusterResources(
+ ClusterSpecification clusterSpecification,
+ int yarnMinAllocationMB,
+ Resource maximumResourceCapability,
+ ClusterResourceDescription freeClusterResources) throws YarnDeploymentException {
- final YarnClient yarnClient = getYarnClient();
-
- // ------------------ Check if the specified queue exists --------------------
+ int taskManagerCount = clusterSpecification.getNumberTaskManagers();
+ int jobManagerMemoryMb = clusterSpecification.getMasterMemoryMB();
+ int taskManagerMemoryMb = clusterSpecification.getTaskManagerMemoryMB();
- try {
- List<QueueInfo> queues = yarnClient.getAllQueues();
- if (queues.size() > 0 && this.yarnQueue != null) { // check only if there are queues configured in yarn and for this session.
- boolean queueFound = false;
- for (QueueInfo queue : queues) {
- if (queue.getQueueName().equals(this.yarnQueue)) {
- queueFound = true;
- break;
- }
- }
- if (!queueFound) {
- String queueNames = "";
- for (QueueInfo queue : queues) {
- queueNames += queue.getQueueName() + ", ";
- }
- LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. " +
- "Available queues: " + queueNames);
- }
- } else {
- LOG.debug("The YARN cluster does not have any queues configured");
- }
- } catch (Throwable e) {
- LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Error details", e);
- }
+ if (jobManagerMemoryMb < MIN_JM_MEMORY) {
+ LOG.warn("The minimum JobManager memory is {}. Will set the JobManager memory to this value.", MIN_JM_MEMORY);
+ jobManagerMemoryMb = MIN_JM_MEMORY;
}
- // ------------------ Add dynamic properties to local flinkConfiguraton ------
- Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded);
- for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
- flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
+ if (taskManagerMemoryMb < MIN_TM_MEMORY) {
+ LOG.warn("The minimum TaskManager memory is {}. Will set the Taskmanager memory to this value.", MIN_TM_MEMORY);
+ taskManagerMemoryMb = MIN_TM_MEMORY;
}
- // ------------------ Check if the YARN ClusterClient has the requested resources --------------
-
- // the yarnMinAllocationMB specifies the smallest possible container allocation size.
- // all allocations below this value are automatically set to this value.
- final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
if (jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) {
LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. "
+ "The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please increase the memory size." +
@@ -520,22 +440,15 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
taskManagerMemoryMb = yarnMinAllocationMB;
}
- // Create application via yarnClient
- final YarnClientApplication yarnApplication = yarnClient.createApplication();
- GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
-
- Resource maxRes = appResponse.getMaximumResourceCapability();
final String note = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
- if (jobManagerMemoryMb > maxRes.getMemory()) {
- failSessionDuringDeployment(yarnClient, yarnApplication);
+ if (jobManagerMemoryMb > maximumResourceCapability.getMemory()) {
throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
- + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + note);
+ + "Maximum Memory: " + maximumResourceCapability.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + note);
}
- if (taskManagerMemoryMb > maxRes.getMemory()) {
- failSessionDuringDeployment(yarnClient, yarnApplication);
+ if (taskManagerMemoryMb > maximumResourceCapability.getMemory()) {
throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
- + "Maximum Memory: " + maxRes.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + note);
+ + "Maximum Memory: " + maximumResourceCapability.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + note);
}
final String noteRsc = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " +
@@ -543,49 +456,107 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
"The allocation might take more time than usual because the Flink YARN client needs to wait until " +
"the resources become available.";
int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
- ClusterResourceDescription freeClusterMem;
- try {
- freeClusterMem = getCurrentFreeClusterResources(yarnClient);
- } catch (YarnException | IOException e) {
- failSessionDuringDeployment(yarnClient, yarnApplication);
- throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
- }
- if (freeClusterMem.totalFreeMemory < totalMemoryRequired) {
+ if (freeClusterResources.totalFreeMemory < totalMemoryRequired) {
LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
- + "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + noteRsc);
+ + "There are currently only " + freeClusterResources.totalFreeMemory + "MB available." + noteRsc);
}
- if (taskManagerMemoryMb > freeClusterMem.containerLimit) {
+ if (taskManagerMemoryMb > freeClusterResources.containerLimit) {
LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
- + "the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc);
+ + "the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
}
- if (jobManagerMemoryMb > freeClusterMem.containerLimit) {
+ if (jobManagerMemoryMb > freeClusterResources.containerLimit) {
LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
- + "the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc);
+ + "the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
}
// ----------------- check if the requested containers fit into the cluster.
- int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length);
+ int[] nmFree = Arrays.copyOf(freeClusterResources.nodeManagersFree, freeClusterResources.nodeManagersFree.length);
// first, allocate the jobManager somewhere.
if (!allocateResource(nmFree, jobManagerMemoryMb)) {
LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. " +
"The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " +
- Arrays.toString(freeClusterMem.nodeManagersFree) + noteRsc);
+ Arrays.toString(freeClusterResources.nodeManagersFree) + noteRsc);
}
// allocate TaskManagers
for (int i = 0; i < taskManagerCount; i++) {
if (!allocateResource(nmFree, taskManagerMemoryMb)) {
LOG.warn("There is not enough memory available in the YARN cluster. " +
"The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " +
- "NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" +
+ "NodeManagers available: " + Arrays.toString(freeClusterResources.nodeManagersFree) + "\n" +
"After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " +
"the following NodeManagers are available: " + Arrays.toString(nmFree) + noteRsc);
}
}
- ApplicationReport report = startAppMaster(null, yarnClient, yarnApplication);
+ return new ClusterSpecification.ClusterSpecificationBuilder()
+ .setMasterMemoryMB(jobManagerMemoryMb)
+ .setTaskManagerMemoryMB(taskManagerMemoryMb)
+ .setNumberTaskManagers(clusterSpecification.getNumberTaskManagers())
+ .setSlotsPerTaskManager(clusterSpecification.getSlotsPerTaskManager())
+ .createClusterSpecification();
+
+ }
+
+ /**
+ * This method will block until the ApplicationMaster/JobManager have been
+ * deployed on YARN.
+ */
+ protected YarnClusterClient deployInternal(ClusterSpecification clusterSpecification) throws Exception {
+
+ isReadyForDeployment(clusterSpecification);
+
+ final YarnClient yarnClient = getYarnClient();
+
+ // ------------------ Check if the specified queue exists --------------------
+
+ checkYarnQueues(yarnClient);
+
+ // ------------------ Add dynamic properties to local flinkConfiguration ------
+ Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded);
+ for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
+ flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
+ }
+
+ // ------------------ Check if the YARN ClusterClient has the requested resources --------------
+
+ // Create application via yarnClient
+ final YarnClientApplication yarnApplication = yarnClient.createApplication();
+ final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
+
+ Resource maxRes = appResponse.getMaximumResourceCapability();
+
+ final ClusterResourceDescription freeClusterMem;
+ try {
+ freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+ } catch (YarnException | IOException e) {
+ failSessionDuringDeployment(yarnClient, yarnApplication);
+ throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
+ }
+
+ final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
+
+ final ClusterSpecification validClusterSpecification;
+ try {
+ validClusterSpecification = validateClusterResources(
+ clusterSpecification,
+ yarnMinAllocationMB,
+ maxRes,
+ freeClusterMem);
+ } catch (YarnDeploymentException yde) {
+ failSessionDuringDeployment(yarnClient, yarnApplication);
+ throw yde;
+ }
+
+ LOG.info("Cluster specification: {}", validClusterSpecification);
+
+ ApplicationReport report = startAppMaster(
+ null,
+ yarnClient,
+ yarnApplication,
+ clusterSpecification);
String host = report.getHost();
int port = report.getRpcPort();
@@ -595,10 +566,51 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
flinkConfiguration.setInteger(JobManagerOptions.PORT, port);
// the Flink cluster is deployed in YARN. Represent cluster
- return createYarnClusterClient(this, yarnClient, report, flinkConfiguration, true);
+ return createYarnClusterClient(
+ this,
+ clusterSpecification.getNumberTaskManagers(),
+ clusterSpecification.getSlotsPerTaskManager(),
+ yarnClient,
+ report,
+ flinkConfiguration,
+ true);
+ }
+
+ private void checkYarnQueues(YarnClient yarnClient) {
+ try {
+ List<QueueInfo> queues = yarnClient.getAllQueues();
+ if (queues.size() > 0 && this.yarnQueue != null) { // check only if there are queues configured in yarn and for this session.
+ boolean queueFound = false;
+ for (QueueInfo queue : queues) {
+ if (queue.getQueueName().equals(this.yarnQueue)) {
+ queueFound = true;
+ break;
+ }
+ }
+ if (!queueFound) {
+ String queueNames = "";
+ for (QueueInfo queue : queues) {
+ queueNames += queue.getQueueName() + ", ";
+ }
+ LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. " +
+ "Available queues: " + queueNames);
+ }
+ } else {
+ LOG.debug("The YARN cluster does not have any queues configured");
+ }
+ } catch (Throwable e) {
+ LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Error details", e);
+ }
+ }
}
- public ApplicationReport startAppMaster(JobGraph jobGraph, YarnClient yarnClient, YarnClientApplication yarnApplication) throws Exception {
+ public ApplicationReport startAppMaster(
+ JobGraph jobGraph,
+ YarnClient yarnClient,
+ YarnClientApplication yarnApplication,
+ ClusterSpecification clusterSpecification) throws Exception {
// ------------------ Set default file system scheme -------------------------
@@ -802,7 +814,11 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
localResources.put(Utils.KEYTAB_FILE_NAME, keytabResource);
}
- final ContainerLaunchContext amContainer = setupApplicationMasterContainer(hasLogback, hasLog4j, hasKrb5);
+ final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
+ hasLogback,
+ hasLog4j,
+ hasKrb5,
+ clusterSpecification.getMasterMemoryMB());
if (UserGroupInformation.isSecurityEnabled()) {
// set HDFS delegation tokens when security is enabled
@@ -821,13 +837,13 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString());
// set Flink on YARN internal configuration values
- appMasterEnv.put(YarnConfigKeys.ENV_TM_COUNT, String.valueOf(taskManagerCount));
- appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, String.valueOf(taskManagerMemoryMb));
+ appMasterEnv.put(YarnConfigKeys.ENV_TM_COUNT, String.valueOf(clusterSpecification.getNumberTaskManagers()));
+ appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, String.valueOf(clusterSpecification.getTaskManagerMemoryMB()));
appMasterEnv.put(YarnConfigKeys.FLINK_JAR_PATH, remotePathJar.toString());
appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
- appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(slots));
+ appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(clusterSpecification.getSlotsPerTaskManager()));
appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));
appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace());
appMasterEnv.put(YarnConfigKeys.FLINK_YARN_FILES, yarnFilesDir.toUri().toString());
@@ -858,12 +874,12 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// Set up resource type requirements for ApplicationMaster
Resource capability = Records.newRecord(Resource.class);
- capability.setMemory(jobManagerMemoryMb);
+ capability.setMemory(clusterSpecification.getMasterMemoryMB());
capability.setVirtualCores(1);
String name;
if (customName == null) {
- name = "Flink session with " + taskManagerCount + " TaskManagers";
+ name = "Flink session with " + clusterSpecification.getNumberTaskManagers() + " TaskManagers";
if (detached) {
name += " (detached)";
}
@@ -1288,9 +1304,11 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
}
- protected ContainerLaunchContext setupApplicationMasterContainer(boolean hasLogback,
- boolean hasLog4j,
- boolean hasKrb5) {
+ protected ContainerLaunchContext setupApplicationMasterContainer(
+ boolean hasLogback,
+ boolean hasLog4j,
+ boolean hasKrb5,
+ int jobManagerMemoryMb) {
// ------------------ Prepare Application Master Container ------------------------------
// respect custom JVM options in the YAML file
@@ -1365,12 +1383,16 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
*/
protected YarnClusterClient createYarnClusterClient(
AbstractYarnClusterDescriptor descriptor,
+ int numberTaskManagers,
+ int slotsPerTaskManager,
YarnClient yarnClient,
ApplicationReport report,
org.apache.flink.configuration.Configuration flinkConfiguration,
boolean perJobCluster) throws Exception {
return new YarnClusterClient(
descriptor,
+ numberTaskManagers,
+ slotsPerTaskManager,
yarnClient,
report,
flinkConfiguration,
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index a435ef7..ceca29d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -78,6 +78,8 @@ public class YarnClusterClient extends ClusterClient {
//---------- Class internal fields -------------------
private final AbstractYarnClusterDescriptor clusterDescriptor;
+ private final int numberTaskManagers;
+ private final int slotsPerTaskManager;
private final LazApplicationClientLoader applicationClient;
private final FiniteDuration akkaDuration;
private final ApplicationReport appReport;
@@ -93,6 +95,8 @@ public class YarnClusterClient extends ClusterClient {
* Create a new Flink on YARN cluster.
*
* @param clusterDescriptor The descriptor used at cluster creation
+ * @param numberTaskManagers The number of task managers, -1 if unknown
+ * @param slotsPerTaskManager Slots per task manager, -1 if unknown
* @param yarnClient Client to talk to YARN
* @param appReport the YARN application ID
* @param flinkConfig Flink configuration
@@ -102,6 +106,8 @@ public class YarnClusterClient extends ClusterClient {
*/
public YarnClusterClient(
final AbstractYarnClusterDescriptor clusterDescriptor,
+ final int numberTaskManagers,
+ final int slotsPerTaskManager,
final YarnClient yarnClient,
final ApplicationReport appReport,
Configuration flinkConfig,
@@ -111,6 +117,8 @@ public class YarnClusterClient extends ClusterClient {
this.akkaDuration = AkkaUtils.getTimeout(flinkConfig);
this.clusterDescriptor = clusterDescriptor;
+ this.numberTaskManagers = numberTaskManagers;
+ this.slotsPerTaskManager = slotsPerTaskManager;
this.yarnClient = yarnClient;
this.appReport = appReport;
this.appId = appReport.getApplicationId();
@@ -186,7 +194,8 @@ public class YarnClusterClient extends ClusterClient {
@Override
public int getMaxSlots() {
- int maxSlots = clusterDescriptor.getTaskManagerCount() * clusterDescriptor.getTaskManagerSlots();
+ // TODO: this should be retrieved from the running Flink cluster
+ int maxSlots = numberTaskManagers * slotsPerTaskManager;
return maxSlots > 0 ? maxSlots : -1;
}
@@ -506,8 +515,8 @@ public class YarnClusterClient extends ClusterClient {
currentStatus = getClusterStatus();
if (currentStatus != null && !currentStatus.equals(lastStatus)) {
logAndSysout("TaskManager status (" + currentStatus.numRegisteredTaskManagers() + "/"
- + clusterDescriptor.getTaskManagerCount() + ")");
- if (currentStatus.numRegisteredTaskManagers() >= clusterDescriptor.getTaskManagerCount()) {
+ + numberTaskManagers + ")");
+ if (currentStatus.numRegisteredTaskManagers() >= numberTaskManagers) {
logAndSysout("All TaskManagers are connected");
break;
}
@@ -529,14 +538,14 @@ public class YarnClusterClient extends ClusterClient {
private static class LazApplicationClientLoader {
- private final org.apache.flink.configuration.Configuration flinkConfig;
+ private final Configuration flinkConfig;
private final LazyActorSystemLoader actorSystemLoader;
private final HighAvailabilityServices highAvailabilityServices;
private ActorRef applicationClient;
private LazApplicationClientLoader(
- org.apache.flink.configuration.Configuration flinkConfig,
+ Configuration flinkConfig,
LazyActorSystemLoader actorSystemLoader,
HighAvailabilityServices highAvailabilityServices) {
this.flinkConfig = Preconditions.checkNotNull(flinkConfig, "flinkConfig");
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
index f58e6aa..d1af6dc 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientV2.java
@@ -26,10 +26,7 @@ import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusRespon
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,7 +76,7 @@ public class YarnClusterClientV2 extends ClusterClient {
@Override
public int getMaxSlots() {
- // Now need not set max slot
+ // No need to set max slot
return 0;
}
@@ -90,25 +87,7 @@ public class YarnClusterClientV2 extends ClusterClient {
@Override
protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
- try {
- // Create application via yarnClient
- final YarnClientApplication yarnApplication = yarnClient.createApplication();
- ApplicationReport report = this.clusterDescriptor.startAppMaster(jobGraph, yarnClient, yarnApplication);
- if (report.getYarnApplicationState().equals(YarnApplicationState.RUNNING)) {
- appId = report.getApplicationId();
- trackingURL = report.getTrackingUrl();
- logAndSysout("Please refer to " + getWebInterfaceURL()
- + " for the running status of job " + jobGraph.getJobID().toString());
- //TODO: not support attach mode now
- return new JobSubmissionResult(jobGraph.getJobID());
- }
- else {
- throw new ProgramInvocationException("Fail to submit the job.");
- }
- }
- catch (Exception e) {
- throw new ProgramInvocationException("Fail to submit the job", e.getCause());
- }
+ throw new UnsupportedOperationException("Not yet implemented.");
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
index aaa9bac..af49fa8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnCLI.java
@@ -144,12 +144,6 @@ public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
yarnClusterDescriptor.setQueue(cmd.getOptionValue(queue.getOpt()));
}
- // JobManager Memory
- if (cmd.hasOption(jmMemory.getOpt())) {
- int jmMemory = Integer.valueOf(cmd.getOptionValue(this.jmMemory.getOpt()));
- yarnClusterDescriptor.setJobManagerMemory(jmMemory);
- }
-
String[] dynamicProperties = null;
if (cmd.hasOption(this.dynamicProperties.getOpt())) {
dynamicProperties = cmd.getOptionValues(this.dynamicProperties.getOpt());
@@ -232,7 +226,9 @@ public class FlinkYarnCLI implements CustomCommandLine<YarnClusterClientV2> {
yarnClusterDescriptor.setFlinkConfiguration(config);
yarnClusterDescriptor.setProvidedUserJarFiles(userJarFiles);
- return new YarnClusterClientV2(yarnClusterDescriptor, config);
+ return new YarnClusterClientV2(
+ yarnClusterDescriptor,
+ config);
}
private void logAndSysout(String message) {
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 60223ef..a887e54 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -20,11 +20,14 @@ package org.apache.flink.yarn.cli;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.security.SecurityUtils;
@@ -258,13 +261,6 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
AbstractYarnClusterDescriptor yarnClusterDescriptor = getClusterDescriptor();
- if (!cmd.hasOption(container.getOpt())) { // number of containers is required option!
- LOG.error("Missing required argument {}", container.getOpt());
- printUsage();
- throw new IllegalArgumentException("Missing required argument " + container.getOpt());
- }
- yarnClusterDescriptor.setTaskManagerCount(Integer.valueOf(cmd.getOptionValue(container.getOpt())));
-
// Jar Path
Path localJarPath;
if (cmd.hasOption(flinkJar.getOpt())) {
@@ -309,23 +305,6 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
yarnClusterDescriptor.setQueue(cmd.getOptionValue(queue.getOpt()));
}
- // JobManager Memory
- if (cmd.hasOption(jmMemory.getOpt())) {
- int jmMemory = Integer.valueOf(cmd.getOptionValue(this.jmMemory.getOpt()));
- yarnClusterDescriptor.setJobManagerMemory(jmMemory);
- }
-
- // Task Managers memory
- if (cmd.hasOption(tmMemory.getOpt())) {
- int tmMemory = Integer.valueOf(cmd.getOptionValue(this.tmMemory.getOpt()));
- yarnClusterDescriptor.setTaskManagerMemory(tmMemory);
- }
-
- if (cmd.hasOption(slots.getOpt())) {
- int slots = Integer.valueOf(cmd.getOptionValue(this.slots.getOpt()));
- yarnClusterDescriptor.setTaskManagerSlots(slots);
- }
-
String[] dynamicProperties = null;
if (cmd.hasOption(dynamicproperties.getOpt())) {
dynamicProperties = cmd.getOptionValues(dynamicproperties.getOpt());
@@ -353,28 +332,60 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
yarnClusterDescriptor.setZookeeperNamespace(zookeeperNamespace);
}
- // ----- Convenience -----
+ return yarnClusterDescriptor;
+ }
+
+ public ClusterSpecification createClusterSpecification(Configuration configuration, CommandLine cmd) {
+ if (!cmd.hasOption(container.getOpt())) { // number of containers is required option!
+ LOG.error("Missing required argument {}", container.getOpt());
+ printUsage();
+ throw new IllegalArgumentException("Missing required argument " + container.getOpt());
+ }
+
+ int numberTaskManagers = Integer.valueOf(cmd.getOptionValue(container.getOpt()));
- // the number of slots available from YARN:
- int yarnTmSlots = yarnClusterDescriptor.getTaskManagerSlots();
- if (yarnTmSlots == -1) {
- yarnTmSlots = 1;
- yarnClusterDescriptor.setTaskManagerSlots(yarnTmSlots);
+ // JobManager Memory
+ final int jobManagerMemoryMB;
+ if (cmd.hasOption(jmMemory.getOpt())) {
+ jobManagerMemoryMB = Integer.valueOf(cmd.getOptionValue(this.jmMemory.getOpt()));
+ } else {
+ jobManagerMemoryMB = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
}
- int maxSlots = yarnTmSlots * yarnClusterDescriptor.getTaskManagerCount();
+ // Task Managers memory
+ final int taskManagerMemoryMB;
+ if (cmd.hasOption(tmMemory.getOpt())) {
+ taskManagerMemoryMB = Integer.valueOf(cmd.getOptionValue(this.tmMemory.getOpt()));
+ } else {
+ taskManagerMemoryMB = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+ }
+
+ int slotsPerTaskManager;
+ if (cmd.hasOption(slots.getOpt())) {
+ slotsPerTaskManager = Integer.valueOf(cmd.getOptionValue(this.slots.getOpt()));
+ } else {
+ slotsPerTaskManager = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+ }
+
+ // convenience
int userParallelism = Integer.valueOf(cmd.getOptionValue(CliFrontendParser.PARALLELISM_OPTION.getOpt(), "-1"));
+ int maxSlots = slotsPerTaskManager * numberTaskManagers;
if (userParallelism != -1) {
- int slotsPerTM = (int) Math.ceil((double) userParallelism / yarnClusterDescriptor.getTaskManagerCount());
+ int slotsPerTM = (int) Math.ceil((double) userParallelism / numberTaskManagers);
String message = "The YARN cluster has " + maxSlots + " slots available, " +
"but the user requested a parallelism of " + userParallelism + " on YARN. " +
- "Each of the " + yarnClusterDescriptor.getTaskManagerCount() + " TaskManagers " +
+ "Each of the " + numberTaskManagers + " TaskManagers " +
"will get " + slotsPerTM + " slots.";
logAndSysout(message);
- yarnClusterDescriptor.setTaskManagerSlots(slotsPerTM);
+ slotsPerTaskManager = slotsPerTM;
}
- return yarnClusterDescriptor;
+ return new ClusterSpecification.ClusterSpecificationBuilder()
+ .setMasterMemoryMB(jobManagerMemoryMB)
+ .setTaskManagerMemoryMB(taskManagerMemoryMB)
+ .setNumberTaskManagers(numberTaskManagers)
+ .setSlotsPerTaskManager(slotsPerTaskManager)
+ .createClusterSpecification();
}
private void printUsage() {
@@ -477,8 +488,8 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
}
public static void main(final String[] args) throws Exception {
- final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
+ final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
SecurityUtils.install(new SecurityUtils.SecurityConfiguration(flinkConfiguration));
int retCode = SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
@Override
@@ -547,11 +558,12 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
Preconditions.checkNotNull(userJarFiles, "User jar files should not be null.");
AbstractYarnClusterDescriptor yarnClusterDescriptor = createDescriptor(applicationName, cmdLine);
+ ClusterSpecification clusterSpecification = createClusterSpecification(config, cmdLine);
yarnClusterDescriptor.setFlinkConfiguration(config);
yarnClusterDescriptor.setProvidedUserJarFiles(userJarFiles);
try {
- return yarnClusterDescriptor.deploySessionCluster();
+ return yarnClusterDescriptor.deploySessionCluster(clusterSpecification);
} catch (Exception e) {
throw new RuntimeException("Error deploying the YARN cluster", e);
}
@@ -626,8 +638,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
return 1;
}
+ final ClusterSpecification clusterSpecification = createClusterSpecification(yarnDescriptor.getFlinkConfiguration(), cmd);
+
try {
- yarnCluster = yarnDescriptor.deploySessionCluster();
+ yarnCluster = yarnDescriptor.deploySessionCluster(clusterSpecification);
} catch (Exception e) {
System.err.println("Error while deploying YARN cluster: " + e.getMessage());
e.printStackTrace(System.err);
@@ -646,9 +660,9 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
Properties yarnProps = new Properties();
yarnProps.setProperty(YARN_APPLICATION_ID_KEY, yarnCluster.getApplicationId().toString());
- if (yarnDescriptor.getTaskManagerSlots() != -1) {
+ if (clusterSpecification.getSlotsPerTaskManager() != -1) {
String parallelism =
- Integer.toString(yarnDescriptor.getTaskManagerSlots() * yarnDescriptor.getTaskManagerCount());
+ Integer.toString(clusterSpecification.getSlotsPerTaskManager() * clusterSpecification.getNumberTaskManagers());
yarnProps.setProperty(YARN_PROPERTIES_PARALLELISM, parallelism);
}
// add dynamic properties
http://git-wip-us.apache.org/repos/asf/flink/blob/7cf997d1/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 14738e6..a862b50 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -18,10 +18,12 @@
package org.apache.flink.yarn;
+import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.util.TestLogger;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.hadoop.fs.Path;
@@ -40,7 +42,7 @@ import static org.junit.Assert.fail;
/**
* Tests for the {@link YarnClusterDescriptor}.
*/
-public class YarnClusterDescriptorTest {
+public class YarnClusterDescriptorTest extends TestLogger {
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -65,11 +67,15 @@ public class YarnClusterDescriptorTest {
clusterDescriptor.setConfigurationDirectory(temporaryFolder.getRoot().getAbsolutePath());
clusterDescriptor.setConfigurationFilePath(new Path(flinkConf.getPath()));
- // configure slots too high
- clusterDescriptor.setTaskManagerSlots(Integer.MAX_VALUE);
+ ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
+ .setMasterMemoryMB(-1)
+ .setTaskManagerMemoryMB(-1)
+ .setNumberTaskManagers(1)
+ .setSlotsPerTaskManager(Integer.MAX_VALUE)
+ .createClusterSpecification();
try {
- clusterDescriptor.deploySessionCluster();
+ clusterDescriptor.deploySessionCluster(clusterSpecification);
fail("The deploy call should have failed.");
} catch (RuntimeException e) {
@@ -95,10 +101,15 @@ public class YarnClusterDescriptorTest {
clusterDescriptor.setConfigurationFilePath(new Path(flinkConf.getPath()));
// configure slots
- clusterDescriptor.setTaskManagerSlots(1);
+ ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
+ .setMasterMemoryMB(-1)
+ .setTaskManagerMemoryMB(-1)
+ .setNumberTaskManagers(1)
+ .setSlotsPerTaskManager(1)
+ .createClusterSpecification();
try {
- clusterDescriptor.deploySessionCluster();
+ clusterDescriptor.deploySessionCluster(clusterSpecification);
fail("The deploy call should have failed.");
} catch (RuntimeException e) {
@@ -132,6 +143,7 @@ public class YarnClusterDescriptorTest {
final String redirects =
"1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out " +
"2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err";
+ final int jobManagerMemory = 1024;
// no logging, with/out krb5
assertEquals(
@@ -140,7 +152,11 @@ public class YarnClusterDescriptorTest {
" " + // logging
" " + mainClass + " " + args + " " + redirects,
clusterDescriptor
- .setupApplicationMasterContainer(false, false, false)
+ .setupApplicationMasterContainer(
+ false,
+ false,
+ false,
+ jobManagerMemory)
.getCommands().get(0));
assertEquals(
@@ -149,7 +165,11 @@ public class YarnClusterDescriptorTest {
" " + // logging
" " + mainClass + " " + args + " " + redirects,
clusterDescriptor
- .setupApplicationMasterContainer(false, false, true)
+ .setupApplicationMasterContainer(
+ false,
+ false,
+ true,
+ jobManagerMemory)
.getCommands().get(0));
// logback only, with/out krb5
@@ -159,7 +179,11 @@ public class YarnClusterDescriptorTest {
" " + logfile + " " + logback +
" " + mainClass + " " + args + " " + redirects,
clusterDescriptor
- .setupApplicationMasterContainer(true, false, false)
+ .setupApplicationMasterContainer(
+ true,
+ false,
+ false,
+ jobManagerMemory)
.getCommands().get(0));
assertEquals(
@@ -168,7 +192,11 @@ public class YarnClusterDescriptorTest {
" " + logfile + " " + logback +
" " + mainClass + " " + args + " " + redirects,
clusterDescriptor
- .setupApplicationMasterContainer(true, false, true)
+ .setupApplicationMasterContainer(
+ true,
+ false,
+ true,
+ jobManagerMemory)
.getCommands().get(0));
// log4j, with/out krb5
@@ -178,7 +206,11 @@ public class YarnClusterDescriptorTest {
" " + logfile + " " + log4j +
" " + mainClass + " " + args + " " + redirects,
clusterDescriptor
- .setupApplicationMasterContainer(false, true, false)
+ .setupApplicationMasterContainer(
+ false,
+ true,
+ false,
+ jobManagerMemory)
.getCommands().get(0));
assertEquals(
@@ -187,7 +219,11 @@ public class YarnClusterDescriptorTest {
" " + logfile + " " + log4j +
" " + mainClass + " " + args + " " + redirects,
clusterDescriptor
- .setupApplicationMasterContainer(false, true, true)
+ .setupApplicationMasterContainer(
+ false,
+ true,
+ true,
+ jobManagerMemory)
.getCommands().get(0));
// logback + log4j, with/out krb5
@@ -197,7 +233,11 @@ public class YarnClusterDescriptorTest {
" " + logfile + " " + logback + " " + log4j +
" " + mainClass + " " + args + " " + redirects,
clusterDescriptor
- .setupApplicationMasterContainer(true, true, false)
+ .setupApplicationMasterContainer(
+ true,
+ true,
+ false,
+ jobManagerMemory)
.getCommands().get(0));
assertEquals(
@@ -206,7 +246,11 @@ public class YarnClusterDescriptorTest {
" " + logfile + " " + logback + " " + log4j +
" " + mainClass + " " + args + " " + redirects,
clusterDescriptor
- .setupApplicationMasterContainer(true, true, true)
+ .setupApplicationMasterContainer(
+ true,
+ true,
+ true,
+ jobManagerMemory)
.getCommands().get(0));
// logback + log4j, with/out krb5, different JVM opts
@@ -217,7 +261,11 @@ public class YarnClusterDescriptorTest {
" " + logfile + " " + logback + " " + log4j +
" " + mainClass + " " + args + " " + redirects,
clusterDescriptor
- .setupApplicationMasterContainer(true, true, false)
+ .setupApplicationMasterContainer(
+ true,
+ true,
+ false,
+ jobManagerMemory)
.getCommands().get(0));
assertEquals(
@@ -226,7 +274,11 @@ public class YarnClusterDescriptorTest {
" " + logfile + " " + logback + " " + log4j +
" " + mainClass + " " + args + " " + redirects,
clusterDescriptor
- .setupApplicationMasterContainer(true, true, true)
+ .setupApplicationMasterContainer(
+ true,
+ true,
+ true,
+ jobManagerMemory)
.getCommands().get(0));
// logback + log4j, with/out krb5, different JVM opts
@@ -237,7 +289,11 @@ public class YarnClusterDescriptorTest {
" " + logfile + " " + logback + " " + log4j +
" " + mainClass + " " + args + " " + redirects,
clusterDescriptor
- .setupApplicationMasterContainer(true, true, false)
+ .setupApplicationMasterContainer(
+ true,
+ true,
+ false,
+ jobManagerMemory)
.getCommands().get(0));
assertEquals(
@@ -246,7 +302,11 @@ public class YarnClusterDescriptorTest {
" " + logfile + " " + logback + " " + log4j +
" " + mainClass + " " + args + " " + redirects,
clusterDescriptor
- .setupApplicationMasterContainer(true, true, true)
+ .setupApplicationMasterContainer(
+ true,
+ true,
+ true,
+ jobManagerMemory)
.getCommands().get(0));
// now try some configurations with different yarn.container-start-command-template
@@ -259,7 +319,11 @@ public class YarnClusterDescriptorTest {
" 3 " + logfile + " " + logback + " " + log4j +
" 4 " + mainClass + " 5 " + args + " 6 " + redirects,
clusterDescriptor
- .setupApplicationMasterContainer(true, true, true)
+ .setupApplicationMasterContainer(
+ true,
+ true,
+ true,
+ jobManagerMemory)
.getCommands().get(0));
cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
@@ -271,7 +335,11 @@ public class YarnClusterDescriptorTest {
" " + jvmmem +
" " + mainClass + " " + args + " " + redirects,
clusterDescriptor
- .setupApplicationMasterContainer(true, true, true)
+ .setupApplicationMasterContainer(
+ true,
+ true,
+ true,
+ jobManagerMemory)
.getCommands().get(0));
}
}