You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/11 16:15:01 UTC
[2/5] flink git commit: [FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor
[FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor
Moves the YarnClient from the YarnClusterClient to the AbstractYarnClusterDescriptor.
This makes the latter responsible for the lifecycle management of the client and gives
a better separation of concerns.
This closes #5216.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/156b8935
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/156b8935
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/156b8935
Branch: refs/heads/master
Commit: 156b8935ef76eb53456cea1d40fd528ccefa21d8
Parents: 2ce5b98
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Dec 20 16:43:21 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 11 16:59:29 2018 +0100
----------------------------------------------------------------------
.../client/deployment/ClusterDescriptor.java | 2 +-
.../Flip6StandaloneClusterDescriptor.java | 5 +
.../deployment/StandaloneClusterDescriptor.java | 5 +
...CliFrontendYarnAddressConfigurationTest.java | 80 ++--
.../flink/yarn/FlinkYarnSessionCliTest.java | 6 +-
.../yarn/TestingYarnClusterDescriptor.java | 7 +-
.../java/org/apache/flink/yarn/YARNITCase.java | 47 +-
.../flink/yarn/YARNSessionFIFOITCase.java | 81 ++--
.../flink/yarn/YarnClusterDescriptorTest.java | 94 ++--
.../yarn/AbstractYarnClusterDescriptor.java | 87 ++--
.../apache/flink/yarn/YarnClusterClient.java | 14 +-
.../flink/yarn/YarnClusterDescriptor.java | 9 +-
.../flink/yarn/YarnClusterDescriptorV2.java | 9 +-
.../flink/yarn/cli/FlinkYarnSessionCli.java | 6 +-
.../flink/yarn/YarnClusterDescriptorTest.java | 451 ++++++++++---------
15 files changed, 483 insertions(+), 420 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
index a62ceff..1603930 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
/**
* A descriptor to deploy a cluster (e.g. Yarn or Mesos) and return a Client for Cluster communication.
*/
-public interface ClusterDescriptor<ClientType extends ClusterClient> {
+public interface ClusterDescriptor<ClientType extends ClusterClient> extends AutoCloseable {
/**
* Returns a String containing details about the cluster (NodeManagers, available memory, ...).
http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
index 9d88f59..b8eb534 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
@@ -60,4 +60,9 @@ public class Flip6StandaloneClusterDescriptor implements ClusterDescriptor<RestC
public RestClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) {
throw new UnsupportedOperationException("Can't deploy a standalone FLIP-6 per-job cluster.");
}
+
+ @Override
+ public void close() throws Exception {
+ // nothing to do
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
index 51e267a..3808efa 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
@@ -59,4 +59,9 @@ public class StandaloneClusterDescriptor implements ClusterDescriptor<Standalone
public StandaloneClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) {
throw new UnsupportedOperationException("Can't deploy a standalone per-job cluster.");
}
+
+ @Override
+ public void close() throws Exception {
+ // nothing to do
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 1b457a5..56087a1 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -38,7 +38,6 @@ import org.apache.commons.cli.CommandLine;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.junit.AfterClass;
@@ -375,12 +374,10 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
private class TestingYarnClusterDescriptor extends YarnClusterDescriptor {
public TestingYarnClusterDescriptor(Configuration flinkConfiguration, String configurationDirectory) {
- super(flinkConfiguration, configurationDirectory);
- }
-
- @Override
- public YarnClient getYarnClient() {
- return new TestYarnClient();
+ super(
+ flinkConfiguration,
+ configurationDirectory,
+ new TestYarnClient(finalApplicationStatus));
}
@Override
@@ -388,52 +385,51 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
AbstractYarnClusterDescriptor descriptor,
int numberTaskManagers,
int slotsPerTaskManager,
- YarnClient yarnClient,
ApplicationReport report,
Configuration flinkConfiguration,
boolean perJobCluster) throws IOException, YarnException {
return Mockito.mock(YarnClusterClient.class);
}
+ }
+ }
- private class TestYarnClient extends YarnClientImpl {
-
- private final List<ApplicationReport> reports = new LinkedList<>();
-
- TestYarnClient() {
- { // a report that of our Yarn application we want to resume from
- ApplicationReport report = Mockito.mock(ApplicationReport.class);
- Mockito.when(report.getHost()).thenReturn(TEST_YARN_JOB_MANAGER_ADDRESS);
- Mockito.when(report.getRpcPort()).thenReturn(TEST_YARN_JOB_MANAGER_PORT);
- Mockito.when(report.getApplicationId()).thenReturn(TEST_YARN_APPLICATION_ID);
- Mockito.when(report.getFinalApplicationStatus()).thenReturn(finalApplicationStatus);
- this.reports.add(report);
- }
- { // a second report, just for noise
- ApplicationReport report = Mockito.mock(ApplicationReport.class);
- Mockito.when(report.getHost()).thenReturn("1.2.3.4");
- Mockito.when(report.getRpcPort()).thenReturn(-123);
- Mockito.when(report.getApplicationId()).thenReturn(ApplicationId.newInstance(0, 0));
- Mockito.when(report.getFinalApplicationStatus()).thenReturn(finalApplicationStatus);
- this.reports.add(report);
- }
- }
+ private static class TestYarnClient extends YarnClientImpl {
- @Override
- public List<ApplicationReport> getApplications() throws YarnException, IOException {
- return reports;
- }
+ private final List<ApplicationReport> reports = new LinkedList<>();
- @Override
- public ApplicationReport getApplicationReport(ApplicationId appId) throws YarnException, IOException {
- for (ApplicationReport report : reports) {
- if (report.getApplicationId().equals(appId)) {
- return report;
- }
- }
- throw new YarnException();
+ TestYarnClient(FinalApplicationStatus finalApplicationStatus) {
+ { // a report that of our Yarn application we want to resume from
+ ApplicationReport report = Mockito.mock(ApplicationReport.class);
+ Mockito.when(report.getHost()).thenReturn(TEST_YARN_JOB_MANAGER_ADDRESS);
+ Mockito.when(report.getRpcPort()).thenReturn(TEST_YARN_JOB_MANAGER_PORT);
+ Mockito.when(report.getApplicationId()).thenReturn(TEST_YARN_APPLICATION_ID);
+ Mockito.when(report.getFinalApplicationStatus()).thenReturn(finalApplicationStatus);
+ this.reports.add(report);
+ }
+ { // a second report, just for noise
+ ApplicationReport report = Mockito.mock(ApplicationReport.class);
+ Mockito.when(report.getHost()).thenReturn("1.2.3.4");
+ Mockito.when(report.getRpcPort()).thenReturn(-123);
+ Mockito.when(report.getApplicationId()).thenReturn(ApplicationId.newInstance(0, 0));
+ Mockito.when(report.getFinalApplicationStatus()).thenReturn(finalApplicationStatus);
+ this.reports.add(report);
+ }
+ }
+
+ @Override
+ public List<ApplicationReport> getApplications() throws YarnException, IOException {
+ return reports;
+ }
+
+ @Override
+ public ApplicationReport getApplicationReport(ApplicationId appId) throws YarnException, IOException {
+ for (ApplicationReport report : reports) {
+ if (report.getApplicationId().equals(appId)) {
+ return report;
}
}
+ throw new YarnException();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 8eef8f0..3fe8d2f 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -174,7 +174,10 @@ public class FlinkYarnSessionCliTest extends TestLogger {
private static class JarAgnosticClusterDescriptor extends YarnClusterDescriptor {
public JarAgnosticClusterDescriptor(Configuration flinkConfiguration, String configurationDirectory) {
- super(flinkConfiguration, configurationDirectory);
+ super(
+ flinkConfiguration,
+ configurationDirectory,
+ YarnClient.createYarnClient());
}
@Override
@@ -202,7 +205,6 @@ public class FlinkYarnSessionCliTest extends TestLogger {
super(descriptor,
numberTaskManagers,
slotsPerTaskManager,
- Mockito.mock(YarnClient.class),
Mockito.mock(ApplicationReport.class),
config,
false);
http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
index 30d2798..e66d2e0 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
@@ -23,6 +23,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+
import java.io.File;
import java.io.FilenameFilter;
import java.util.ArrayList;
@@ -36,7 +38,10 @@ import java.util.List;
public class TestingYarnClusterDescriptor extends AbstractYarnClusterDescriptor {
public TestingYarnClusterDescriptor(Configuration configuration, String configurationDirectory) {
- super(configuration, configurationDirectory);
+ super(
+ configuration,
+ configurationDirectory,
+ YarnClient.createYarnClient());
List<File> filesToShip = new ArrayList<>();
File testingJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn-tests"));
http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index bc28c5b..069f68a 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.client.api.YarnClient;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
@@ -49,35 +50,43 @@ public class YARNITCase extends YarnTestBase {
@Ignore("The cluster cannot be stopped yet.")
@Test
- public void testPerJobMode() {
+ public void testPerJobMode() throws Exception {
Configuration configuration = new Configuration();
configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
- YarnClusterDescriptorV2 yarnClusterDescriptorV2 = new YarnClusterDescriptorV2(configuration, System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR));
+ final YarnClient yarnClient = YarnClient.createYarnClient();
- yarnClusterDescriptorV2.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
- yarnClusterDescriptorV2.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
+ try (final YarnClusterDescriptorV2 yarnClusterDescriptorV2 = new YarnClusterDescriptorV2(
+ configuration,
+ System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR),
+ yarnClient)) {
- final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
- .setMasterMemoryMB(768)
- .setTaskManagerMemoryMB(1024)
- .setSlotsPerTaskManager(1)
- .setNumberTaskManagers(1)
- .createClusterSpecification();
+ yarnClusterDescriptorV2.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
+ yarnClusterDescriptorV2.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(2);
+ final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
+ .setMasterMemoryMB(768)
+ .setTaskManagerMemoryMB(1024)
+ .setSlotsPerTaskManager(1)
+ .setNumberTaskManagers(1)
+ .createClusterSpecification();
- env.addSource(new InfiniteSource())
- .shuffle()
- .addSink(new DiscardingSink<Integer>());
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
- final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+ env.addSource(new InfiniteSource())
+ .shuffle()
+ .addSink(new DiscardingSink<Integer>());
- File testingJar = YarnTestBase.findFile("..", new TestingYarnClusterDescriptor.TestJarFinder("flink-yarn-tests"));
+ final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
+ File testingJar = YarnTestBase.findFile("..", new TestingYarnClusterDescriptor.TestJarFinder("flink-yarn-tests"));
- YarnClusterClient clusterClient = yarnClusterDescriptorV2.deployJobCluster(clusterSpecification, jobGraph);
+ jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
+
+ YarnClusterClient clusterClient = yarnClusterDescriptorV2.deployJobCluster(clusterSpecification, jobGraph);
+
+ clusterClient.shutdown();
+ }
}
private static class InfiniteSource implements ParallelSourceFunction<Integer> {
http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index dd56f2f..ec8ef50 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -227,51 +227,56 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
Configuration configuration = GlobalConfiguration.loadConfiguration();
- AbstractYarnClusterDescriptor flinkYarnClient = new YarnClusterDescriptor(configuration, confDirPath);
- Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
- flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
- flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
+ final YarnClient yarnClient = YarnClient.createYarnClient();
- final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
- .setMasterMemoryMB(768)
- .setTaskManagerMemoryMB(1024)
- .setNumberTaskManagers(1)
- .setSlotsPerTaskManager(1)
- .createClusterSpecification();
+ try (final AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
+ configuration,
+ confDirPath,
+ yarnClient)) {
+ Assert.assertNotNull("unable to get yarn client", clusterDescriptor);
+ clusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
+ clusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
- // deploy
- ClusterClient yarnCluster = null;
- try {
- yarnCluster = flinkYarnClient.deploySessionCluster(clusterSpecification);
- } catch (Exception e) {
- LOG.warn("Failing test", e);
- Assert.fail("Error while deploying YARN cluster: " + e.getMessage());
- }
- GetClusterStatusResponse expectedStatus = new GetClusterStatusResponse(1, 1);
- for (int second = 0; second < waitTime * 2; second++) { // run "forever"
+ final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
+ .setMasterMemoryMB(768)
+ .setTaskManagerMemoryMB(1024)
+ .setNumberTaskManagers(1)
+ .setSlotsPerTaskManager(1)
+ .createClusterSpecification();
+ // deploy
+ ClusterClient yarnCluster = null;
try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- LOG.warn("Interrupted", e);
- }
- GetClusterStatusResponse status = yarnCluster.getClusterStatus();
- if (status != null && status.equals(expectedStatus)) {
- LOG.info("ClusterClient reached status " + status);
- break; // all good, cluster started
+ yarnCluster = clusterDescriptor.deploySessionCluster(clusterSpecification);
+ } catch (Exception e) {
+ LOG.warn("Failing test", e);
+ Assert.fail("Error while deploying YARN cluster: " + e.getMessage());
}
- if (second > waitTime) {
- // we waited for 15 seconds. cluster didn't come up correctly
- Assert.fail("The custer didn't start after " + waitTime + " seconds");
+ GetClusterStatusResponse expectedStatus = new GetClusterStatusResponse(1, 1);
+ for (int second = 0; second < waitTime * 2; second++) { // run "forever"
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted", e);
+ }
+ GetClusterStatusResponse status = yarnCluster.getClusterStatus();
+ if (status != null && status.equals(expectedStatus)) {
+ LOG.info("ClusterClient reached status " + status);
+ break; // all good, cluster started
+ }
+ if (second > waitTime) {
+ // we waited for 15 seconds. cluster didn't come up correctly
+ Assert.fail("The custer didn't start after " + waitTime + " seconds");
+ }
}
- }
- // use the cluster
- Assert.assertNotNull(yarnCluster.getJobManagerAddress());
- Assert.assertNotNull(yarnCluster.getWebInterfaceURL());
+ // use the cluster
+ Assert.assertNotNull(yarnCluster.getJobManagerAddress());
+ Assert.assertNotNull(yarnCluster.getWebInterfaceURL());
- LOG.info("Shutting down cluster. All tests passed");
- // shutdown cluster
- yarnCluster.shutdown();
+ LOG.info("Shutting down cluster. All tests passed");
+ // shutdown cluster
+ yarnCluster.shutdown();
+ }
LOG.info("Finished testJavaAPI()");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index f3e48c5..5144550 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.TestLogger;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.client.api.YarnClient;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -50,32 +51,40 @@ public class YarnClusterDescriptorTest extends TestLogger {
*/
@Test
public void testExplicitLibShipping() throws Exception {
- AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor(new Configuration(), temporaryFolder.getRoot().getAbsolutePath());
- descriptor.setLocalJarPath(new Path("/path/to/flink.jar"));
+ AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor(
+ new Configuration(),
+ temporaryFolder.getRoot().getAbsolutePath(),
+ YarnClient.createYarnClient());
- File libFile = temporaryFolder.newFile("libFile.jar");
- File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
+ try {
+ descriptor.setLocalJarPath(new Path("/path/to/flink.jar"));
+
+ File libFile = temporaryFolder.newFile("libFile.jar");
+ File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
- Assert.assertFalse(descriptor.shipFiles.contains(libFile));
- Assert.assertFalse(descriptor.shipFiles.contains(libFolder));
+ Assert.assertFalse(descriptor.shipFiles.contains(libFile));
+ Assert.assertFalse(descriptor.shipFiles.contains(libFolder));
- List<File> shipFiles = new ArrayList<>();
- shipFiles.add(libFile);
- shipFiles.add(libFolder);
+ List<File> shipFiles = new ArrayList<>();
+ shipFiles.add(libFile);
+ shipFiles.add(libFolder);
- descriptor.addShipFiles(shipFiles);
+ descriptor.addShipFiles(shipFiles);
- Assert.assertTrue(descriptor.shipFiles.contains(libFile));
- Assert.assertTrue(descriptor.shipFiles.contains(libFolder));
+ Assert.assertTrue(descriptor.shipFiles.contains(libFile));
+ Assert.assertTrue(descriptor.shipFiles.contains(libFolder));
- // only execute part of the deployment to test for shipped files
- Set<File> effectiveShipFiles = new HashSet<>();
- descriptor.addLibFolderToShipFiles(effectiveShipFiles);
+ // only execute part of the deployment to test for shipped files
+ Set<File> effectiveShipFiles = new HashSet<>();
+ descriptor.addLibFolderToShipFiles(effectiveShipFiles);
- Assert.assertEquals(0, effectiveShipFiles.size());
- Assert.assertEquals(2, descriptor.shipFiles.size());
- Assert.assertTrue(descriptor.shipFiles.contains(libFile));
- Assert.assertTrue(descriptor.shipFiles.contains(libFolder));
+ Assert.assertEquals(0, effectiveShipFiles.size());
+ Assert.assertEquals(2, descriptor.shipFiles.size());
+ Assert.assertTrue(descriptor.shipFiles.contains(libFile));
+ Assert.assertTrue(descriptor.shipFiles.contains(libFolder));
+ } finally {
+ descriptor.close();
+ }
}
/**
@@ -83,30 +92,37 @@ public class YarnClusterDescriptorTest extends TestLogger {
*/
@Test
public void testEnvironmentLibShipping() throws Exception {
- AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor(new Configuration(), temporaryFolder.getRoot().getAbsolutePath());
+ AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor(
+ new Configuration(),
+ temporaryFolder.getRoot().getAbsolutePath(),
+ YarnClient.createYarnClient());
- File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
- File libFile = new File(libFolder, "libFile.jar");
- libFile.createNewFile();
-
- Set<File> effectiveShipFiles = new HashSet<>();
-
- final Map<String, String> oldEnv = System.getenv();
try {
- Map<String, String> env = new HashMap<>(1);
- env.put(ConfigConstants.ENV_FLINK_LIB_DIR, libFolder.getAbsolutePath());
- TestBaseUtils.setEnv(env);
- // only execute part of the deployment to test for shipped files
- descriptor.addLibFolderToShipFiles(effectiveShipFiles);
+ File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
+ File libFile = new File(libFolder, "libFile.jar");
+ libFile.createNewFile();
+
+ Set<File> effectiveShipFiles = new HashSet<>();
+
+ final Map<String, String> oldEnv = System.getenv();
+ try {
+ Map<String, String> env = new HashMap<>(1);
+ env.put(ConfigConstants.ENV_FLINK_LIB_DIR, libFolder.getAbsolutePath());
+ TestBaseUtils.setEnv(env);
+ // only execute part of the deployment to test for shipped files
+ descriptor.addLibFolderToShipFiles(effectiveShipFiles);
+ } finally {
+ TestBaseUtils.setEnv(oldEnv);
+ }
+
+ // only add the ship the folder, not the contents
+ Assert.assertFalse(effectiveShipFiles.contains(libFile));
+ Assert.assertTrue(effectiveShipFiles.contains(libFolder));
+ Assert.assertFalse(descriptor.shipFiles.contains(libFile));
+ Assert.assertFalse(descriptor.shipFiles.contains(libFolder));
} finally {
- TestBaseUtils.setEnv(oldEnv);
+ descriptor.close();
}
-
- // only add the ship the folder, not the contents
- Assert.assertFalse(effectiveShipFiles.contains(libFile));
- Assert.assertTrue(effectiveShipFiles.contains(libFolder));
- Assert.assertFalse(descriptor.shipFiles.contains(libFile));
- Assert.assertFalse(descriptor.shipFiles.contains(libFolder));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 86ddd9b..0372319 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
@@ -35,7 +36,6 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
@@ -108,14 +108,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
private static final int MIN_JM_MEMORY = 768; // the minimum memory should be higher than the min heap cutoff
private static final int MIN_TM_MEMORY = 768;
- private Configuration conf = new YarnConfiguration();
+ private final YarnConfiguration yarnConfiguration;
- /**
- * If the user has specified a different number of slots, we store them here
- * Files (usually in a distributed file system) used for the YARN session of Flink.
- * Contains configuration files and jar files.
- */
- private Path sessionFilesDir;
+ private final YarnClient yarnClient;
private String yarnQueue;
@@ -128,7 +123,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
/** Lazily initialized list of files to ship. */
protected List<File> shipFiles = new LinkedList<>();
- private final org.apache.flink.configuration.Configuration flinkConfiguration;
+ private final Configuration flinkConfiguration;
private boolean detached;
@@ -143,31 +138,48 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
private YarnConfigOptions.UserJarInclusion userJarInclusion;
public AbstractYarnClusterDescriptor(
- org.apache.flink.configuration.Configuration flinkConfiguration,
- String configurationDirectory) {
+ Configuration flinkConfiguration,
+ String configurationDirectory,
+ YarnClient yarnClient) {
+
+ yarnConfiguration = new YarnConfiguration();
+
// for unit tests only
if (System.getenv("IN_TESTS") != null) {
try {
- conf.addResource(new File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL());
+ yarnConfiguration.addResource(new File(System.getenv("YARN_CONF_DIR"), "yarn-site.xml").toURI().toURL());
} catch (Throwable t) {
throw new RuntimeException("Error", t);
}
}
+ this.yarnClient = Preconditions.checkNotNull(yarnClient);
+ yarnClient.init(yarnConfiguration);
+ yarnClient.start();
+
this.flinkConfiguration = Preconditions.checkNotNull(flinkConfiguration);
userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory);
}
+ public YarnClient getYarnClient() {
+ return yarnClient;
+ }
+
/**
- * The class to bootstrap the application master of the Yarn cluster (runs main method).
+ * The class to start the application master with. This class runs the main
+ * method in case of session cluster.
*/
protected abstract String getYarnSessionClusterEntrypoint();
+ /**
+ * The class to start the application master with. This class runs the main
+ * method in case of the job cluster.
+ */
protected abstract String getYarnJobClusterEntrypoint();
- public org.apache.flink.configuration.Configuration getFlinkConfiguration() {
+ public Configuration getFlinkConfiguration() {
return flinkConfiguration;
}
@@ -257,7 +269,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// Check if we don't exceed YARN's maximum virtual cores.
// The number of cores can be configured in the config.
// If not configured, it is set to the number of task slots
- int numYarnVcores = conf.getInt(YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
+ int numYarnVcores = yarnConfiguration.getInt(YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
int configuredVcores = flinkConfiguration.getInteger(YarnConfigOptions.VCORES, clusterSpecification.getSlotsPerTaskManager());
// don't configure more than the maximum configured number of vcores
if (configuredVcores > numYarnVcores) {
@@ -304,21 +316,22 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
this.zookeeperNamespace = zookeeperNamespace;
}
- /**
- * Gets a Hadoop Yarn client.
- * @return Returns a YarnClient which has to be shutdown manually
- */
- public YarnClient getYarnClient() {
- YarnClient yarnClient = YarnClient.createYarnClient();
- yarnClient.init(conf);
- yarnClient.start();
- return yarnClient;
+ // -------------------------------------------------------------
+ // Lifecycle management
+ // -------------------------------------------------------------
+
+ @Override
+ public void close() {
+ yarnClient.stop();
}
+ // -------------------------------------------------------------
+ // ClusterDescriptor overrides
+ // -------------------------------------------------------------
+
@Override
public YarnClusterClient retrieve(String applicationID) {
- YarnClient yarnClient = null;
try {
// check if required Hadoop environment variables are set. If not, warn user
if (System.getenv("HADOOP_CONF_DIR") == null &&
@@ -329,7 +342,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
final ApplicationId yarnAppId = ConverterUtils.toApplicationId(applicationID);
- yarnClient = getYarnClient();
final ApplicationReport appReport = yarnClient.getApplicationReport(yarnAppId);
if (appReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
@@ -349,14 +361,10 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
this,
-1, // we don't know the number of task managers of a started Flink cluster
-1, // we don't know how many slots each task manager has for a started Flink cluster
- yarnClient,
appReport,
flinkConfiguration,
false);
} catch (Exception e) {
- if (null != yarnClient) {
- yarnClient.stop();
- }
throw new RuntimeException("Couldn't retrieve Yarn cluster", e);
}
}
@@ -414,8 +422,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
isReadyForDeployment(clusterSpecification);
- final YarnClient yarnClient = getYarnClient();
-
// ------------------ Check if the specified queue exists --------------------
checkYarnQueues(yarnClient);
@@ -442,7 +448,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
}
- final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
+ final int yarnMinAllocationMB = yarnConfiguration.getInt("yarn.scheduler.minimum-allocation-mb", 0);
final ClusterSpecification validClusterSpecification;
try {
@@ -477,7 +483,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
this,
clusterSpecification.getNumberTaskManagers(),
clusterSpecification.getSlotsPerTaskManager(),
- yarnClient,
report,
flinkConfiguration,
true);
@@ -627,7 +632,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// initialize file system
// Copy the application master jar to the filesystem
// Create a local resource to point to the destination jar path
- final FileSystem fs = FileSystem.get(conf);
+ final FileSystem fs = FileSystem.get(yarnConfiguration);
final Path homeDir = fs.getHomeDirectory();
// hard-coded check for the GoogleHDFS client because it's not overriding the getScheme() method.
@@ -881,7 +886,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
if (UserGroupInformation.isSecurityEnabled()) {
// set HDFS delegation tokens when security is enabled
LOG.info("Adding delegation token to the AM container..");
- Utils.setTokensFor(amContainer, paths, conf);
+ Utils.setTokensFor(amContainer, paths, yarnConfiguration);
}
amContainer.setLocalResources(localResources);
@@ -926,7 +931,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
// set classpath from YARN configuration
- Utils.setupYarnClassPath(conf, appMasterEnv);
+ Utils.setupYarnClassPath(yarnConfiguration, appMasterEnv);
amContainer.setEnvironment(appMasterEnv);
@@ -1196,7 +1201,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
- YarnClient yarnClient = getYarnClient();
YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
ps.append("NodeManagers in the ClusterClient " + metrics.getNumNodeManagers());
@@ -1223,7 +1227,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
ps.println("Queue: " + q.getQueueName() + ", Current Capacity: " + q.getCurrentCapacity() + " Max Capacity: " +
q.getMaximumCapacity() + " Applications: " + q.getApplications().size());
}
- yarnClient.stop();
return baos.toString();
} catch (Exception e) {
throw new RuntimeException("Couldn't get cluster description", e);
@@ -1411,7 +1414,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
failSessionDuringDeployment(yarnClient, yarnApplication);
LOG.info("Deleting files in {}.", yarnFilesDir);
try {
- FileSystem fs = FileSystem.get(conf);
+ FileSystem fs = FileSystem.get(yarnConfiguration);
if (!fs.delete(yarnFilesDir, true)) {
throw new IOException("Deleting files in " + yarnFilesDir + " was unsuccessful");
@@ -1419,7 +1422,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
fs.close();
} catch (IOException e) {
- LOG.error("Failed to delete Flink Jar and conf files in HDFS", e);
+ LOG.error("Failed to delete Flink Jar and configuration files in HDFS", e);
}
}
}
@@ -1525,7 +1528,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
AbstractYarnClusterDescriptor descriptor,
int numberTaskManagers,
int slotsPerTaskManager,
- YarnClient yarnClient,
ApplicationReport report,
org.apache.flink.configuration.Configuration flinkConfiguration,
boolean perJobCluster) throws Exception {
@@ -1533,7 +1535,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
descriptor,
numberTaskManagers,
slotsPerTaskManager,
- yarnClient,
report,
flinkConfiguration,
perJobCluster);
http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 80d0943..63421f9 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -43,7 +43,6 @@ import akka.util.Timeout;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,8 +66,6 @@ public class YarnClusterClient extends ClusterClient {
private static final Logger LOG = LoggerFactory.getLogger(YarnClusterClient.class);
- private YarnClient yarnClient;
-
private Thread clientShutdownHook = new ClientShutdownHook();
//---------- Class internal fields -------------------
@@ -93,7 +90,6 @@ public class YarnClusterClient extends ClusterClient {
* @param clusterDescriptor The descriptor used at cluster creation
* @param numberTaskManagers The number of task managers, -1 if unknown
* @param slotsPerTaskManager Slots per task manager, -1 if unknown
- * @param yarnClient Client to talk to YARN
* @param appReport the YARN application report
* @param flinkConfig Flink configuration
* @param newlyCreatedCluster Indicator whether this cluster has just been created
@@ -104,7 +100,6 @@ public class YarnClusterClient extends ClusterClient {
final AbstractYarnClusterDescriptor clusterDescriptor,
final int numberTaskManagers,
final int slotsPerTaskManager,
- final YarnClient yarnClient,
final ApplicationReport appReport,
Configuration flinkConfig,
boolean newlyCreatedCluster) throws Exception {
@@ -115,7 +110,6 @@ public class YarnClusterClient extends ClusterClient {
this.clusterDescriptor = clusterDescriptor;
this.numberTaskManagers = numberTaskManagers;
this.slotsPerTaskManager = slotsPerTaskManager;
- this.yarnClient = yarnClient;
this.appReport = appReport;
this.appId = appReport.getApplicationId();
this.trackingURL = appReport.getTrackingUrl();
@@ -328,7 +322,7 @@ public class YarnClusterClient extends ClusterClient {
Future<Object> response =
Patterns.ask(applicationClient.get(),
new YarnMessages.LocalStopYarnSession(ApplicationStatus.CANCELED,
- "Flink YARN Client requested shutdown"),
+ "Flink YARN Client requested shutdown"),
new Timeout(akkaDuration));
Await.ready(response, akkaDuration);
} catch (Exception e) {
@@ -349,7 +343,7 @@ public class YarnClusterClient extends ClusterClient {
}
try {
- ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+ ApplicationReport appReport = clusterDescriptor.getYarnClient().getApplicationReport(appId);
LOG.info("Application " + appId + " finished with state " + appReport
.getYarnApplicationState() + " and final state " + appReport
@@ -368,10 +362,6 @@ public class YarnClusterClient extends ClusterClient {
} catch (Exception e) {
LOG.warn("Couldn't get final report", e);
}
-
- LOG.info("YARN Client is shutting down");
- yarnClient.stop(); // actorRunner is using the yarnClient.
- yarnClient = null; // set null to clearly see if somebody wants to access it afterwards.
}
public boolean hasBeenShutdown() {
http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 8759c3e..76f9154 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -22,13 +22,18 @@ import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+
/**
* Default implementation of {@link AbstractYarnClusterDescriptor} which starts a {@link YarnApplicationMasterRunner}.
*/
public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor {
- public YarnClusterDescriptor(Configuration flinkConfiguration, String configurationDirectory) {
- super(flinkConfiguration, configurationDirectory);
+ public YarnClusterDescriptor(
+ Configuration flinkConfiguration,
+ String configurationDirectory,
+ YarnClient yarnClient) {
+ super(flinkConfiguration, configurationDirectory, yarnClient);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
index ed04523..6ce192c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
@@ -22,14 +22,19 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+
/**
* Implementation of {@link org.apache.flink.yarn.AbstractYarnClusterDescriptor} which is used to start the
* new application master for a job under flip-6.
*/
public class YarnClusterDescriptorV2 extends AbstractYarnClusterDescriptor {
- public YarnClusterDescriptorV2(Configuration flinkConfiguration, String configurationDirectory) {
- super(flinkConfiguration, configurationDirectory);
+ public YarnClusterDescriptorV2(
+ Configuration flinkConfiguration,
+ String configurationDirectory,
+ YarnClient yarnCLient) {
+ super(flinkConfiguration, configurationDirectory, yarnCLient);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 5483758..c045082 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -50,6 +50,7 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -862,10 +863,11 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
}
protected AbstractYarnClusterDescriptor getClusterDescriptor(Configuration configuration, String configurationDirectory, boolean flip6) {
+ final YarnClient yarnClient = YarnClient.createYarnClient();
if (flip6) {
- return new YarnClusterDescriptorV2(configuration, configurationDirectory);
+ return new YarnClusterDescriptorV2(configuration, configurationDirectory, yarnClient);
} else {
- return new YarnClusterDescriptor(configuration, configurationDirectory);
+ return new YarnClusterDescriptor(configuration, configurationDirectory, yarnClient);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index c11c413..0d1bf65 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.client.api.YarnClient;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -59,9 +60,12 @@ public class YarnClusterDescriptorTest extends TestLogger {
@Test
public void testFailIfTaskSlotsHigherThanMaxVcores() {
+ final YarnClient yarnClient = YarnClient.createYarnClient();
+
YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
new Configuration(),
- temporaryFolder.getRoot().getAbsolutePath());
+ temporaryFolder.getRoot().getAbsolutePath(),
+ yarnClient);
clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));
@@ -81,6 +85,8 @@ public class YarnClusterDescriptorTest extends TestLogger {
if (!(e.getCause() instanceof IllegalConfigurationException)) {
throw e;
}
+ } finally {
+ clusterDescriptor.close();
}
}
@@ -90,9 +96,12 @@ public class YarnClusterDescriptorTest extends TestLogger {
// overwrite vcores in config
configuration.setInteger(YarnConfigOptions.VCORES, Integer.MAX_VALUE);
+ final YarnClient yarnClient = YarnClient.createYarnClient();
+
YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
configuration,
- temporaryFolder.getRoot().getAbsolutePath());
+ temporaryFolder.getRoot().getAbsolutePath(),
+ yarnClient);
clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));
@@ -113,15 +122,19 @@ public class YarnClusterDescriptorTest extends TestLogger {
if (!(e.getCause() instanceof IllegalConfigurationException)) {
throw e;
}
+ } finally {
+ clusterDescriptor.close();
}
}
@Test
public void testSetupApplicationMasterContainer() {
Configuration cfg = new Configuration();
+ final YarnClient yarnClient = YarnClient.createYarnClient();
YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
cfg,
- temporaryFolder.getRoot().getAbsolutePath());
+ temporaryFolder.getRoot().getAbsolutePath(),
+ yarnClient);
final String java = "$JAVA_HOME/bin/java";
final String jvmmem = "-Xmx424m";
@@ -142,219 +155,223 @@ public class YarnClusterDescriptorTest extends TestLogger {
"2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err";
final int jobManagerMemory = 1024;
- // no logging, with/out krb5
- assertEquals(
- java + " " + jvmmem +
- " " + // jvmOpts
- " " + // logging
- " " + mainClass + " " + args + " " + redirects,
- clusterDescriptor
- .setupApplicationMasterContainer(
- mainClass,
- false,
- false,
- false,
- jobManagerMemory)
- .getCommands().get(0));
-
- assertEquals(
- java + " " + jvmmem +
- " " + " " + krb5 + // jvmOpts
- " " + // logging
- " " + mainClass + " " + args + " " + redirects,
- clusterDescriptor
- .setupApplicationMasterContainer(
- mainClass,
- false,
- false,
- true,
- jobManagerMemory)
- .getCommands().get(0));
-
- // logback only, with/out krb5
- assertEquals(
- java + " " + jvmmem +
- " " + // jvmOpts
- " " + logfile + " " + logback +
- " " + mainClass + " " + args + " " + redirects,
- clusterDescriptor
- .setupApplicationMasterContainer(
- mainClass,
- true,
- false,
- false,
- jobManagerMemory)
- .getCommands().get(0));
-
- assertEquals(
- java + " " + jvmmem +
- " " + " " + krb5 + // jvmOpts
- " " + logfile + " " + logback +
- " " + mainClass + " " + args + " " + redirects,
- clusterDescriptor
- .setupApplicationMasterContainer(
- mainClass,
- true,
- false,
- true,
- jobManagerMemory)
- .getCommands().get(0));
-
- // log4j, with/out krb5
- assertEquals(
- java + " " + jvmmem +
- " " + // jvmOpts
- " " + logfile + " " + log4j +
- " " + mainClass + " " + args + " " + redirects,
- clusterDescriptor
- .setupApplicationMasterContainer(
- mainClass,
- false,
- true,
- false,
- jobManagerMemory)
- .getCommands().get(0));
-
- assertEquals(
- java + " " + jvmmem +
- " " + " " + krb5 + // jvmOpts
- " " + logfile + " " + log4j +
- " " + mainClass + " " + args + " " + redirects,
- clusterDescriptor
- .setupApplicationMasterContainer(
- mainClass,
- false,
- true,
- true,
- jobManagerMemory)
- .getCommands().get(0));
-
- // logback + log4j, with/out krb5
- assertEquals(
- java + " " + jvmmem +
- " " + // jvmOpts
- " " + logfile + " " + logback + " " + log4j +
- " " + mainClass + " " + args + " " + redirects,
- clusterDescriptor
- .setupApplicationMasterContainer(
- mainClass,
- true,
- true,
- false,
- jobManagerMemory)
- .getCommands().get(0));
-
- assertEquals(
- java + " " + jvmmem +
- " " + " " + krb5 + // jvmOpts
- " " + logfile + " " + logback + " " + log4j +
- " " + mainClass + " " + args + " " + redirects,
- clusterDescriptor
- .setupApplicationMasterContainer(
- mainClass,
- true,
- true,
- true,
- jobManagerMemory)
- .getCommands().get(0));
-
- // logback + log4j, with/out krb5, different JVM opts
- // IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor,
- // because we have a reference to the ClusterDescriptor's configuration which we modify continuously
- cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
- assertEquals(
- java + " " + jvmmem +
- " " + jvmOpts +
- " " + logfile + " " + logback + " " + log4j +
- " " + mainClass + " " + args + " " + redirects,
- clusterDescriptor
- .setupApplicationMasterContainer(
- mainClass,
- true,
- true,
- false,
- jobManagerMemory)
- .getCommands().get(0));
-
- assertEquals(
- java + " " + jvmmem +
- " " + jvmOpts + " " + krb5 + // jvmOpts
- " " + logfile + " " + logback + " " + log4j +
- " " + mainClass + " " + args + " " + redirects,
- clusterDescriptor
- .setupApplicationMasterContainer(
- mainClass,
- true,
- true,
- true,
- jobManagerMemory)
- .getCommands().get(0));
-
- // logback + log4j, with/out krb5, different JVM opts
- // IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor
- cfg.setString(CoreOptions.FLINK_JM_JVM_OPTIONS, jmJvmOpts);
- assertEquals(
- java + " " + jvmmem +
- " " + jvmOpts + " " + jmJvmOpts +
- " " + logfile + " " + logback + " " + log4j +
- " " + mainClass + " " + args + " " + redirects,
- clusterDescriptor
- .setupApplicationMasterContainer(
- mainClass,
- true,
- true,
- false,
- jobManagerMemory)
- .getCommands().get(0));
-
- assertEquals(
- java + " " + jvmmem +
- " " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
- " " + logfile + " " + logback + " " + log4j +
- " " + mainClass + " " + args + " " + redirects,
- clusterDescriptor
- .setupApplicationMasterContainer(
- mainClass,
- true,
- true,
- true,
- jobManagerMemory)
- .getCommands().get(0));
-
- // now try some configurations with different yarn.container-start-command-template
- // IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor
- cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
- "%java% 1 %jvmmem% 2 %jvmopts% 3 %logging% 4 %class% 5 %args% 6 %redirects%");
- assertEquals(
- java + " 1 " + jvmmem +
- " 2 " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
- " 3 " + logfile + " " + logback + " " + log4j +
- " 4 " + mainClass + " 5 " + args + " 6 " + redirects,
- clusterDescriptor
- .setupApplicationMasterContainer(
- mainClass,
- true,
- true,
- true,
- jobManagerMemory)
- .getCommands().get(0));
-
- cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
- "%java% %logging% %jvmopts% %jvmmem% %class% %args% %redirects%");
- // IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor
- assertEquals(
- java +
- " " + logfile + " " + logback + " " + log4j +
- " " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
- " " + jvmmem +
- " " + mainClass + " " + args + " " + redirects,
- clusterDescriptor
- .setupApplicationMasterContainer(
- mainClass,
- true,
- true,
- true,
- jobManagerMemory)
- .getCommands().get(0));
+ try {
+ // no logging, with/out krb5
+ assertEquals(
+ java + " " + jvmmem +
+ " " + // jvmOpts
+ " " + // logging
+ " " + mainClass + " " + args + " " + redirects,
+ clusterDescriptor
+ .setupApplicationMasterContainer(
+ mainClass,
+ false,
+ false,
+ false,
+ jobManagerMemory)
+ .getCommands().get(0));
+
+ assertEquals(
+ java + " " + jvmmem +
+ " " + " " + krb5 + // jvmOpts
+ " " + // logging
+ " " + mainClass + " " + args + " " + redirects,
+ clusterDescriptor
+ .setupApplicationMasterContainer(
+ mainClass,
+ false,
+ false,
+ true,
+ jobManagerMemory)
+ .getCommands().get(0));
+
+ // logback only, with/out krb5
+ assertEquals(
+ java + " " + jvmmem +
+ " " + // jvmOpts
+ " " + logfile + " " + logback +
+ " " + mainClass + " " + args + " " + redirects,
+ clusterDescriptor
+ .setupApplicationMasterContainer(
+ mainClass,
+ true,
+ false,
+ false,
+ jobManagerMemory)
+ .getCommands().get(0));
+
+ assertEquals(
+ java + " " + jvmmem +
+ " " + " " + krb5 + // jvmOpts
+ " " + logfile + " " + logback +
+ " " + mainClass + " " + args + " " + redirects,
+ clusterDescriptor
+ .setupApplicationMasterContainer(
+ mainClass,
+ true,
+ false,
+ true,
+ jobManagerMemory)
+ .getCommands().get(0));
+
+ // log4j, with/out krb5
+ assertEquals(
+ java + " " + jvmmem +
+ " " + // jvmOpts
+ " " + logfile + " " + log4j +
+ " " + mainClass + " " + args + " " + redirects,
+ clusterDescriptor
+ .setupApplicationMasterContainer(
+ mainClass,
+ false,
+ true,
+ false,
+ jobManagerMemory)
+ .getCommands().get(0));
+
+ assertEquals(
+ java + " " + jvmmem +
+ " " + " " + krb5 + // jvmOpts
+ " " + logfile + " " + log4j +
+ " " + mainClass + " " + args + " " + redirects,
+ clusterDescriptor
+ .setupApplicationMasterContainer(
+ mainClass,
+ false,
+ true,
+ true,
+ jobManagerMemory)
+ .getCommands().get(0));
+
+ // logback + log4j, with/out krb5
+ assertEquals(
+ java + " " + jvmmem +
+ " " + // jvmOpts
+ " " + logfile + " " + logback + " " + log4j +
+ " " + mainClass + " " + args + " " + redirects,
+ clusterDescriptor
+ .setupApplicationMasterContainer(
+ mainClass,
+ true,
+ true,
+ false,
+ jobManagerMemory)
+ .getCommands().get(0));
+
+ assertEquals(
+ java + " " + jvmmem +
+ " " + " " + krb5 + // jvmOpts
+ " " + logfile + " " + logback + " " + log4j +
+ " " + mainClass + " " + args + " " + redirects,
+ clusterDescriptor
+ .setupApplicationMasterContainer(
+ mainClass,
+ true,
+ true,
+ true,
+ jobManagerMemory)
+ .getCommands().get(0));
+
+ // logback + log4j, with/out krb5, different JVM opts
+ // IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor,
+ // because we have a reference to the ClusterDescriptor's configuration which we modify continuously
+ cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
+ assertEquals(
+ java + " " + jvmmem +
+ " " + jvmOpts +
+ " " + logfile + " " + logback + " " + log4j +
+ " " + mainClass + " " + args + " " + redirects,
+ clusterDescriptor
+ .setupApplicationMasterContainer(
+ mainClass,
+ true,
+ true,
+ false,
+ jobManagerMemory)
+ .getCommands().get(0));
+
+ assertEquals(
+ java + " " + jvmmem +
+ " " + jvmOpts + " " + krb5 + // jvmOpts
+ " " + logfile + " " + logback + " " + log4j +
+ " " + mainClass + " " + args + " " + redirects,
+ clusterDescriptor
+ .setupApplicationMasterContainer(
+ mainClass,
+ true,
+ true,
+ true,
+ jobManagerMemory)
+ .getCommands().get(0));
+
+ // logback + log4j, with/out krb5, different JVM opts
+ // IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor
+ cfg.setString(CoreOptions.FLINK_JM_JVM_OPTIONS, jmJvmOpts);
+ assertEquals(
+ java + " " + jvmmem +
+ " " + jvmOpts + " " + jmJvmOpts +
+ " " + logfile + " " + logback + " " + log4j +
+ " " + mainClass + " " + args + " " + redirects,
+ clusterDescriptor
+ .setupApplicationMasterContainer(
+ mainClass,
+ true,
+ true,
+ false,
+ jobManagerMemory)
+ .getCommands().get(0));
+
+ assertEquals(
+ java + " " + jvmmem +
+ " " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
+ " " + logfile + " " + logback + " " + log4j +
+ " " + mainClass + " " + args + " " + redirects,
+ clusterDescriptor
+ .setupApplicationMasterContainer(
+ mainClass,
+ true,
+ true,
+ true,
+ jobManagerMemory)
+ .getCommands().get(0));
+
+ // now try some configurations with different yarn.container-start-command-template
+ // IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor
+ cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
+ "%java% 1 %jvmmem% 2 %jvmopts% 3 %logging% 4 %class% 5 %args% 6 %redirects%");
+ assertEquals(
+ java + " 1 " + jvmmem +
+ " 2 " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
+ " 3 " + logfile + " " + logback + " " + log4j +
+ " 4 " + mainClass + " 5 " + args + " 6 " + redirects,
+ clusterDescriptor
+ .setupApplicationMasterContainer(
+ mainClass,
+ true,
+ true,
+ true,
+ jobManagerMemory)
+ .getCommands().get(0));
+
+ cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
+ "%java% %logging% %jvmopts% %jvmmem% %class% %args% %redirects%");
+ // IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor
+ assertEquals(
+ java +
+ " " + logfile + " " + logback + " " + log4j +
+ " " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
+ " " + jvmmem +
+ " " + mainClass + " " + args + " " + redirects,
+ clusterDescriptor
+ .setupApplicationMasterContainer(
+ mainClass,
+ true,
+ true,
+ true,
+ jobManagerMemory)
+ .getCommands().get(0));
+ } finally {
+ clusterDescriptor.close();
+ }
}
}