You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/10/08 14:23:59 UTC
[3/4] flink git commit: [FLINK-2790] [yarn] [ha] Add high availability support for Yarn
http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 5c5a465..c7f7698 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -100,36 +100,14 @@ class ForkableFlinkMiniCluster(
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
}
- val (executionContext,
- instanceManager,
- scheduler,
- libraryCacheManager,
- _,
- executionRetries,
- delayBetweenRetries,
- timeout,
- archiveCount,
- leaderElectionService) = JobManager.createJobManagerComponents(config)
- val testArchiveProps = Props(
- new TestingMemoryArchivist(archiveCount))
-
- val archiver = actorSystem.actorOf(testArchiveProps, archiveName)
-
- val jobManagerProps = Props(
- new TestingJobManager(
- configuration,
- executionContext,
- instanceManager,
- scheduler,
- libraryCacheManager,
- archiver,
- executionRetries,
- delayBetweenRetries,
- timeout,
- streamingMode,
- leaderElectionService))
-
- val jobManager = actorSystem.actorOf(jobManagerProps, jobManagerName)
+ val (jobManager, _) = JobManager.startJobManagerActors(
+ config,
+ actorSystem,
+ Some(jobManagerName),
+ Some(archiveName),
+ streamingMode,
+ classOf[TestingJobManager],
+ classOf[TestingMemoryArchivist])
jobManager
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
index affe134..7e16baf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractProcessFailureRecoveryTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -125,7 +126,12 @@ public abstract class AbstractProcessFailureRecoveryTest extends TestLogger {
jmConfig.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "10 s");
jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
- ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem, StreamingMode.STREAMING)._1();
+ ActorRef jmActor = JobManager.startJobManagerActors(
+ jmConfig,
+ jmActorSystem,
+ StreamingMode.STREAMING,
+ JobManager.class,
+ MemoryArchivist.class)._1();
// the TaskManager java command
String[] command = new String[] {
http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index ac308dd..945a78c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.NetUtils;
@@ -100,7 +101,12 @@ public class ProcessFailureCancelingITCase {
jmConfig.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "10 s");
jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
- ActorRef jmActor = JobManager.startJobManagerActors(jmConfig, jmActorSystem, StreamingMode.BATCH_ONLY)._1();
+ ActorRef jmActor = JobManager.startJobManagerActors(
+ jmConfig,
+ jmActorSystem,
+ StreamingMode.BATCH_ONLY,
+ JobManager.class,
+ MemoryArchivist.class)._1();
// the TaskManager java command
String[] command = new String[] {
http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index 2444ee4..2d251d9 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -101,6 +101,11 @@ under the License.
<version>${guava.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-testkit_${scala.binary.version}</artifactId>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<build>
@@ -177,6 +182,140 @@ under the License.
</executions>
</plugin>
+ <!-- Scala Compiler -->
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.1.4</version>
+ <executions>
+ <!-- Run scala compiler in the process-resources phase, so that dependencies on
+ scala classes can be resolved later in the (Java) compile phase -->
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+
+ <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+ scala classes can be resolved later in the (Java) test-compile phase -->
+ <execution>
+ <id>scala-test-compile</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <jvmArgs>
+ <jvmArg>-Xms128m</jvmArg>
+ <jvmArg>-Xmx512m</jvmArg>
+ </jvmArgs>
+ </configuration>
+ </plugin>
+
+ <!-- Eclipse Integration -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-eclipse-plugin</artifactId>
+ <version>2.8</version>
+ <configuration>
+ <downloadSources>true</downloadSources>
+ <projectnatures>
+ <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+ <projectnature>org.eclipse.jdt.core.javanature</projectnature>
+ </projectnatures>
+ <buildcommands>
+ <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+ </buildcommands>
+ <classpathContainers>
+ <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+ <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+ </classpathContainers>
+ <excludes>
+ <exclude>org.scala-lang:scala-library</exclude>
+ <exclude>org.scala-lang:scala-compiler</exclude>
+ </excludes>
+ <sourceIncludes>
+ <sourceInclude>**/*.scala</sourceInclude>
+ <sourceInclude>**/*.java</sourceInclude>
+ </sourceIncludes>
+ </configuration>
+ </plugin>
+
+ <!-- Adding scala source directories to build path -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <!-- Add src/main/scala to eclipse build path -->
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ <!-- Add src/test/scala to eclipse build path -->
+ <execution>
+ <id>add-test-source</id>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>add-test-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/test/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.scalastyle</groupId>
+ <artifactId>scalastyle-maven-plugin</artifactId>
+ <version>0.5.0</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <verbose>false</verbose>
+ <failOnViolation>true</failOnViolation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <failOnWarning>false</failOnWarning>
+ <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+ <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+ <configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation>
+ <outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+ <outputEncoding>UTF-8</outputEncoding>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
new file mode 100644
index 0000000..fb644c3
--- /dev/null
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.FlinkYarnSessionCli;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FlinkYarnSessionCliTest {
+
+	@Rule
+	public TemporaryFolder tmp = new TemporaryFolder();
+
+	/**
+	 * Checks that a dynamic property given via "-D key=value" to the YARN session CLI ends up in
+	 * the created client and can be decoded again by {@link CliFrontend#getDynamicProperties}.
+	 */
+	@Test
+	public void testDynamicProperties() throws Exception {
+		// point FLINK_CONF_DIR to a temporary directory that contains an empty flink-conf.yaml
+		Map<String, String> map = new HashMap<String, String>(System.getenv());
+		File tmpFolder = tmp.newFolder();
+		File fakeConf = new File(tmpFolder, "flink-conf.yaml");
+		Assert.assertTrue("Could not create " + fakeConf, fakeConf.createNewFile());
+		map.put("FLINK_CONF_DIR", tmpFolder.getAbsolutePath());
+		TestBaseUtils.setEnv(map);
+		Options options = new Options();
+		FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "");
+		cli.getYARNSessionCLIOptions(options);
+
+		// parse errors propagate so that JUnit reports them together with their stack trace
+		CommandLineParser parser = new PosixParser();
+		CommandLine cmd = parser.parse(options,
+			new String[]{"run", "-j", "fake.jar", "-n", "15", "-D", "akka.ask.timeout=5 min"});
+
+		AbstractFlinkYarnClient flinkYarnClient = cli.createFlinkYarnClient(cmd);
+		Assert.assertNotNull(flinkYarnClient);
+
+		List<Tuple2<String, String>> dynProperties =
+			CliFrontend.getDynamicProperties(flinkYarnClient.getDynamicPropertiesEncoded());
+		Assert.assertEquals(1, dynProperties.size());
+		Assert.assertEquals("akka.ask.timeout", dynProperties.get(0).f0);
+		Assert.assertEquals("5 min", dynProperties.get(0).f1);
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingApplicationMaster.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingApplicationMaster.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingApplicationMaster.java
new file mode 100644
index 0000000..6671eb4
--- /dev/null
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingApplicationMaster.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
+
+/**
+ * Yarn application master which starts the {@link TestingYarnJobManager} and the
+ * {@link TestingMemoryArchivist}.
+ */
+public class TestingApplicationMaster extends ApplicationMasterBase {
+
+	/** Creates a testing application master and runs it with the given arguments. */
+	public static void main(String[] args) {
+		new TestingApplicationMaster().run(args);
+	}
+
+	@Override
+	public Class<? extends JobManager> getJobManagerClass() {
+		return TestingYarnJobManager.class;
+	}
+
+	@Override
+	public Class<? extends MemoryArchivist> getArchivistClass() {
+		return TestingMemoryArchivist.class;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingFlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingFlinkYarnClient.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingFlinkYarnClient.java
new file mode 100644
index 0000000..1efc336
--- /dev/null
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingFlinkYarnClient.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import com.google.common.base.Preconditions;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Yarn client which starts a {@link TestingApplicationMaster}. Additionally the client adds the
+ * flink-yarn-tests-XXX-tests.jar and the flink-runtime-XXX-tests.jar to the set of files which
+ * are shipped to the yarn cluster. This is necessary to load the testing classes.
+ */
+public class TestingFlinkYarnClient extends FlinkYarnClientBase {
+
+	public TestingFlinkYarnClient() {
+		List<File> filesToShip = new ArrayList<>();
+		filesToShip.add(findTestJar("flink-yarn-tests"));
+		filesToShip.add(findTestJar("flink-runtime"));
+		setShipFiles(filesToShip);
+	}
+
+	@Override
+	protected Class<?> getApplicationMasterClass() {
+		return TestingApplicationMaster.class;
+	}
+
+	/** Looks up the module's tests jar via {@code YarnTestBase.findFile("..", ...)}; NPE with a hint if absent. */
+	private static File findTestJar(String moduleName) {
+		File testJar = YarnTestBase.findFile("..", new TestJarFinder(moduleName));
+		return Preconditions.checkNotNull(testJar, "Could not find the " + moduleName +
+			" tests jar. Make sure to package the " + moduleName + " module.");
+	}
+
+	/**
+	 * Accepts files named {@code <jarName>...-tests.jar} that are located in a directory whose
+	 * absolute path contains a {@code <jarName>} path component.
+	 */
+	public static class TestJarFinder implements FilenameFilter {
+		private final String jarName;
+
+		public TestJarFinder(final String jarName) {
+			this.jarName = jarName;
+		}
+
+		@Override
+		public boolean accept(File dir, String name) {
+			return name.startsWith(jarName) && name.endsWith("-tests.jar") &&
+				dir.getAbsolutePath().contains(File.separator + jarName + File.separator);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
new file mode 100644
index 0000000..8586a77
--- /dev/null
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/TestingYarnTaskManagerRunner.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import java.io.IOException;
+
+/**
+ * Starts a YARN TaskManager by delegating to {@link YarnTaskManagerRunner} with {@link TestingYarnTaskManager}.
+ */
+public class TestingYarnTaskManagerRunner {
+	public static void main(final String[] arguments) throws IOException {
+		YarnTaskManagerRunner.runYarnTaskManager(arguments, TestingYarnTaskManager.class);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
index 1e9e4fa..5c709b0 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/UtilsTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.flink.yarn;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
@@ -35,7 +37,7 @@ public class UtilsTest {
@Test
public void testUberjarLocator() {
- File dir = YarnTestBase.findFile(".", new YarnTestBase.RootDirFilenameFilter());
+ File dir = YarnTestBase.findFile("..", new YarnTestBase.RootDirFilenameFilter());
Assert.assertNotNull(dir);
Assert.assertTrue(dir.getName().endsWith(".jar"));
dir = dir.getParentFile().getParentFile(); // from uberjar to lib to root
@@ -47,6 +49,54 @@ public class UtilsTest {
Assert.assertTrue(files.contains("conf"));
}
+	/**
+	 * Tests that {@link Utils#calculateHeapSize} subtracts a cutoff of max(ratio * memory, minimum
+	 * cutoff) from the given memory, starting with 15% but at least 384MB.
+	 */
+	@Test
+	public void testHeapCutoff() {
+		Configuration conf = new Configuration();
+		conf.setDouble(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, 0.15);
+		conf.setInteger(ConfigConstants.YARN_HEAP_CUTOFF_MIN, 384);
+
+		Assert.assertEquals(616, Utils.calculateHeapSize(1000, conf)); // minimum cutoff applies
+		Assert.assertEquals(8500, Utils.calculateHeapSize(10000, conf)); // ratio applies
+		Assert.assertEquals(3400, Utils.calculateHeapSize(4000, conf));
+
+		// test different configuration
+		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_MIN, "1000");
+		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "0.1");
+		Assert.assertEquals(3000, Utils.calculateHeapSize(4000, conf));
+
+		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "0.5");
+		Assert.assertEquals(2000, Utils.calculateHeapSize(4000, conf));
+
+		// a ratio of 1 cuts off the complete memory
+		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "1");
+		Assert.assertEquals(0, Utils.calculateHeapSize(4000, conf));
+	}
+
+	/** A cutoff ratio greater than 1 must be rejected. */
+	@Test(expected = IllegalArgumentException.class)
+	public void illegalArgument() {
+		Configuration conf = new Configuration();
+		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "1.1");
+		Utils.calculateHeapSize(4000, conf);
+	}
+
+	/** A negative cutoff ratio must be rejected. */
+	@Test(expected = IllegalArgumentException.class)
+	public void illegalArgumentNegative() {
+		Configuration conf = new Configuration();
+		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_RATIO, "-0.01");
+		Utils.calculateHeapSize(4000, conf);
+	}
+
+	/** A minimum cutoff larger than the given memory must be rejected. */
+	@Test(expected = IllegalArgumentException.class)
+	public void tooMuchCutoff() {
+		Configuration conf = new Configuration();
+		conf.setString(ConfigConstants.YARN_HEAP_CUTOFF_MIN, "6000");
+		Utils.calculateHeapSize(4000, conf);
+	}
+
//
// --------------- Tools to test if a certain string has been logged with Log4j. -------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
new file mode 100644
index 0000000..94d0a81
--- /dev/null
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.testkit.JavaTestKit;
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+public class YARNHighAvailabilityITCase extends YarnTestBase {
+
+	private static TestingServer zkServer;
+
+	private static ActorSystem actorSystem;
+
+	private static final int numberApplicationAttempts = 10;
+
+	/**
+	 * Creates the actor system used by the test, starts the ZooKeeper testing server and starts
+	 * YARN with {@code RM_AM_MAX_ATTEMPTS} set to {@link #numberApplicationAttempts}.
+	 */
+	@BeforeClass
+	public static void setup() {
+		actorSystem = AkkaUtils.createDefaultActorSystem();
+
+		try {
+			zkServer = new TestingServer();
+			zkServer.start();
+		} catch (Exception e) {
+			// keep the cause so that the reason for the failed start shows up in the test report
+			throw new AssertionError("Could not start ZooKeeper testing cluster.", e);
+		}
+
+		yarnConfiguration.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-ha");
+		yarnConfiguration.set(YarnConfiguration.RM_AM_MAX_ATTEMPTS, "" + numberApplicationAttempts);
+
+		startYARNWithConfig(yarnConfiguration);
+	}
+
+	/**
+	 * Stops the ZooKeeper testing server and shuts down the actor system. The actor system is
+	 * shut down even if stopping the ZooKeeper server fails.
+	 */
+	@AfterClass
+	public static void teardown() throws IOException {
+		try {
+			if (zkServer != null) {
+				zkServer.stop();
+				zkServer = null;
+			}
+		} finally {
+			if (actorSystem != null) {
+				JavaTestKit.shutdownActorSystem(actorSystem);
+				actorSystem = null;
+			}
+		}
+	}
+
+	/**
+	 * Tests that the application master can be killed multiple times and that the surviving
+	 * TaskManager successfully reconnects to the newly started JobManager.
+	 *
+	 * <p>The leading JobManager is killed {@code numberApplicationAttempts - 1} times; the final
+	 * round only checks that the TaskManager has registered at the last leader.
+	 */
+	@Test
+	public void testMultipleAMKill() throws Exception {
+		final int numberKillingAttempts = numberApplicationAttempts - 1;
+
+		TestingFlinkYarnClient flinkYarnClient = new TestingFlinkYarnClient();
+		Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
+
+		flinkYarnClient.setTaskManagerCount(1);
+		flinkYarnClient.setJobManagerMemory(768);
+		flinkYarnClient.setTaskManagerMemory(1024);
+		flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
+		flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
+
+		String confDirPath = System.getenv("FLINK_CONF_DIR");
+		flinkYarnClient.setConfigurationDirectory(confDirPath);
+
+		flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration());
+		flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@ha.zookeeper.quorum=" +
+			zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts);
+		flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
+
+		final FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES);
+		AbstractFlinkYarnCluster yarnCluster = null;
+
+		try {
+			yarnCluster = flinkYarnClient.deploy();
+			yarnCluster.connectToCluster();
+			final Configuration config = yarnCluster.getFlinkConfiguration();
+
+			new JavaTestKit(actorSystem) {{
+				// each round waits until the TaskManager is registered at the current leader;
+				// every round except the last one then kills that leader
+				for (int attempt = 0; attempt <= numberKillingAttempts; attempt++) {
+					final boolean killLeader = attempt < numberKillingAttempts;
+					new Within(timeout) {
+						@Override
+						protected void run() {
+							try {
+								LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config);
+								ActorGateway gateway = LeaderRetrievalUtils.retrieveLeaderGateway(lrs, actorSystem, timeout);
+								ActorGateway selfGateway = new AkkaActorGateway(getRef(), gateway.leaderSessionID());
+
+								gateway.tell(new TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), selfGateway);
+								expectMsgEquals(Messages.getAcknowledge());
+								if (killLeader) {
+									gateway.tell(PoisonPill.getInstance());
+								}
+							} catch (Exception e) {
+								throw new AssertionError("Could not complete test.", e);
+							}
+						}
+					};
+				}
+			}};
+		} finally {
+			if (yarnCluster != null) {
+				yarnCluster.shutdown(false);
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 9d72c90..c2e9a45 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.TestBaseUtils;
-import org.apache.flink.yarn.appMaster.YarnTaskManagerRunner;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
index b7d0b33..5ce528e 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
@@ -356,9 +356,12 @@ public abstract class YarnTestBase extends TestLogger {
}
Map<String, String> map = new HashMap<String, String>(System.getenv());
- File flinkConfFilePath = findFile(flinkDistRootDir, new ContainsName(new String[] {"flink-conf.yaml"}));
- Assert.assertNotNull(flinkConfFilePath);
- map.put("FLINK_CONF_DIR", flinkConfFilePath.getParent());
+
+ File flinkConfDirPath = findFile(flinkDistRootDir, new ContainsName(new String[]{"flink-conf.yaml"}));
+ Assert.assertNotNull(flinkConfDirPath);
+
+ map.put("FLINK_CONF_DIR", flinkConfDirPath.getParent());
+
File yarnConfFile = writeYarnSiteConfigXML(conf);
map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath());
map.put("IN_TESTS", "yes we are in tests"); // see FlinkYarnClient() for more infos
@@ -580,11 +583,7 @@ public abstract class YarnTestBase extends TestLogger {
// -------------------------- Tear down -------------------------- //
@AfterClass
- public static void tearDown() {
- /*
- We don't shut down the MiniCluster, as it is prone to blocking infinitely.
- */
-
+ public static void copyOnTravis() {
// When we are on travis, we copy the tmp files of JUnit (containing the MiniYARNCluster log files)
// to <flinkRoot>/target/flink-yarn-tests-*.
// The files from there are picked up by the ./tools/travis_watchdog.sh script
http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn-tests/src/main/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/resources/log4j-test.properties b/flink-yarn-tests/src/main/resources/log4j-test.properties
index dc02575..ebe0d37 100644
--- a/flink-yarn-tests/src/main/resources/log4j-test.properties
+++ b/flink-yarn-tests/src/main/resources/log4j-test.properties
@@ -27,5 +27,9 @@ log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x -
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
# log whats going on between the tests
-log4j.logger.org.apache.flink.yarn.YARNSessionFIFOITCase=INFO, console
-log4j.logger.org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase=INFO, console
+log4j.logger.org.apache.flink.yarn.YARNSessionFIFOITCase=INFO
+log4j.logger.org.apache.flink.yarn.YARNSessionCapacitySchedulerITCase=INFO
+log4j.logger.org.apache.flink.yarn.YarnHighAvailability=INFO
+log4j.logger.org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch=INFO
+log4j.logger.org.apache.flink.runtime.leaderelection=INFO
+log4j.logger.org.apache.flink.runtime.leaderretrieval=INFO
http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
new file mode 100644
index 0000000..83d1f3c
--- /dev/null
+++ b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn
+
+import akka.actor.ActorRef
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.testingUtils.TestingJobManagerLike
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.duration.FiniteDuration
+
+/** [[YarnJobManager]] implementation which mixes in the [[TestingJobManagerLike]] mixin.
+ *
+ * This actor class is used for testing purposes on Yarn. Here we use an explicit class definition
+ * instead of an anonymous class with the respective mixin to obtain a more readable logger name.
+ *
+ * @param flinkConfiguration Configuration object for the actor
+ * @param executionContext Execution context which is used to execute concurrent tasks in the
+ * [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
+ * @param instanceManager Instance manager to manage the registered
+ * [[org.apache.flink.runtime.taskmanager.TaskManager]]
+ * @param scheduler Scheduler to schedule Flink jobs
+ * @param libraryCacheManager Manager to manage uploaded jar files
+ * @param archive Archive for finished Flink jobs
+ * @param defaultExecutionRetries Number of default execution retries
+ * @param delayBetweenRetries Delay between retries
+ * @param timeout Timeout for futures
+ * @param mode StreamingMode in which the system shall be started
+ * @param leaderElectionService LeaderElectionService to participate in the leader election
+ */
+class TestingYarnJobManager(
+ flinkConfiguration: Configuration,
+ executionContext: ExecutionContext,
+ instanceManager: InstanceManager,
+ scheduler: Scheduler,
+ libraryCacheManager: BlobLibraryCacheManager,
+ archive: ActorRef,
+ defaultExecutionRetries: Int,
+ delayBetweenRetries: Long,
+ timeout: FiniteDuration,
+ mode: StreamingMode,
+ leaderElectionService: LeaderElectionService)
+ extends YarnJobManager(
+ flinkConfiguration,
+ executionContext,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ archive,
+ defaultExecutionRetries,
+ delayBetweenRetries,
+ timeout,
+ mode,
+ leaderElectionService)
+ with TestingJobManagerLike {
+
+ override val taskManagerRunnerClass = classOf[TestingYarnTaskManagerRunner]
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
new file mode 100644
index 0000000..11c7f88
--- /dev/null
+++ b/flink-yarn-tests/src/main/scala/org/apache/flink/yarn/TestingYarnTaskManager.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn
+
+import org.apache.flink.runtime.instance.InstanceConnectionInfo
+import org.apache.flink.runtime.io.disk.iomanager.IOManager
+import org.apache.flink.runtime.io.network.NetworkEnvironment
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
+import org.apache.flink.runtime.memory.MemoryManager
+import org.apache.flink.runtime.taskmanager.TaskManagerConfiguration
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerLike
+
+/** [[YarnTaskManager]] implementation which mixes in the [[TestingTaskManagerLike]] mixin.
+ *
+ * This actor class is used for testing purposes on Yarn. Here we use an explicit class definition
+ * instead of an anonymous class with the respective mixin to obtain a more readable logger name.
+ *
+ * @param config Configuration object for the actor
+ * @param connectionInfo Connection information of this actor
+ * @param memoryManager MemoryManager which is responsible for Flink's managed memory allocation
+ * @param ioManager IOManager responsible for I/O
+ * @param network NetworkEnvironment for this actor
+ * @param numberOfSlots Number of slots for this TaskManager
+ * @param leaderRetrievalService [[LeaderRetrievalService]] to retrieve the current leading
+ * JobManager
+ */
+class TestingYarnTaskManager(
+ config: TaskManagerConfiguration,
+ connectionInfo: InstanceConnectionInfo,
+ memoryManager: MemoryManager,
+ ioManager: IOManager,
+ network: NetworkEnvironment,
+ numberOfSlots: Int,
+ leaderRetrievalService: LeaderRetrievalService)
+ extends YarnTaskManager(
+ config,
+ connectionInfo,
+ memoryManager,
+ ioManager,
+ network,
+ numberOfSlots,
+ leaderRetrievalService)
+ with TestingTaskManagerLike {}
http://git-wip-us.apache.org/repos/asf/flink/blob/fa2bb8f1/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
index 44eca0d..4225e68 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
@@ -17,746 +17,12 @@
*/
package org.apache.flink.yarn;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.client.FlinkYarnSessionCli;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
-import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.QueueInfo;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.YarnClientApplication;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.Records;
-
/**
- * All classes in this package contain code taken from
- * https://github.com/apache/hadoop-common/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?source=cc
- * and
- * https://github.com/hortonworks/simple-yarn-app
- * and
- * https://github.com/yahoo/storm-yarn/blob/master/src/main/java/com/yahoo/storm/yarn/StormOnYarn.java
- *
- * The Flink jar is uploaded to HDFS by this client.
- * The application master and all the TaskManager containers get the jar file downloaded
- * by YARN into their local fs.
- *
+ * Default implementation of {@link FlinkYarnClientBase} which starts an {@link ApplicationMaster}.
*/
-public class FlinkYarnClient extends AbstractFlinkYarnClient {
- private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnClient.class);
-
- /**
- * Constants,
- * all starting with ENV_ are used as environment variables to pass values from the Client
- * to the Application Master.
- */
- public final static String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
- public final static String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
- public final static String ENV_APP_ID = "_APP_ID";
- public final static String FLINK_JAR_PATH = "_FLINK_JAR_PATH"; // the Flink jar resource location (in HDFS).
- public static final String ENV_CLIENT_HOME_DIR = "_CLIENT_HOME_DIR";
- public static final String ENV_CLIENT_SHIP_FILES = "_CLIENT_SHIP_FILES";
- public static final String ENV_CLIENT_USERNAME = "_CLIENT_USERNAME";
- public static final String ENV_SLOTS = "_SLOTS";
- public static final String ENV_DETACHED = "_DETACHED";
- public static final String ENV_STREAMING_MODE = "_STREAMING_MODE";
- public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";
-
-
- /**
- * Minimum memory requirements, checked by the Client.
- */
- private static final int MIN_JM_MEMORY = 768; // the minimum memory should be higher than the min heap cutoff
- private static final int MIN_TM_MEMORY = 768;
-
- private Configuration conf;
- private YarnClient yarnClient;
- private YarnClientApplication yarnApplication;
-
-
- /**
- * Files (usually in a distributed file system) used for the YARN session of Flink.
- * Contains configuration files and jar files.
- */
- private Path sessionFilesDir;
-
- /**
- * If the user has specified a different number of slots, we store them here
- */
- private int slots = -1;
-
- private int jobManagerMemoryMb = 1024;
-
- private int taskManagerMemoryMb = 1024;
-
- private int taskManagerCount = 1;
-
- private String yarnQueue = null;
-
- private String configurationDirectory;
-
- private Path flinkConfigurationPath;
-
- private Path flinkLoggingConfigurationPath; // optional
-
- private Path flinkJarPath;
-
- private String dynamicPropertiesEncoded;
-
- private List<File> shipFiles = new ArrayList<File>();
- private org.apache.flink.configuration.Configuration flinkConfiguration;
-
- private boolean detached;
- private boolean streamingMode;
-
- private String customName = null;
-
- public FlinkYarnClient() {
- conf = new YarnConfiguration();
- if(this.yarnClient == null) {
- // Create yarnClient
- yarnClient = YarnClient.createYarnClient();
- yarnClient.init(conf);
- yarnClient.start();
- }
-
- // for unit tests only
- if(System.getenv("IN_TESTS") != null) {
- try {
- conf.addResource(new File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL());
- } catch (Throwable t) {
- throw new RuntimeException("Error",t);
- }
- }
- }
-
- @Override
- public void setJobManagerMemory(int memoryMb) {
- if(memoryMb < MIN_JM_MEMORY) {
- throw new IllegalArgumentException("The JobManager memory (" + memoryMb + ") is below the minimum required memory amount "
- + "of " + MIN_JM_MEMORY+ " MB");
- }
- this.jobManagerMemoryMb = memoryMb;
- }
-
- @Override
- public void setTaskManagerMemory(int memoryMb) {
- if(memoryMb < MIN_TM_MEMORY) {
- throw new IllegalArgumentException("The TaskManager memory (" + memoryMb + ") is below the minimum required memory amount "
- + "of " + MIN_TM_MEMORY+ " MB");
- }
- this.taskManagerMemoryMb = memoryMb;
- }
-
- @Override
- public void setFlinkConfigurationObject(org.apache.flink.configuration.Configuration conf) {
- this.flinkConfiguration = conf;
- }
-
- @Override
- public void setTaskManagerSlots(int slots) {
- if(slots <= 0) {
- throw new IllegalArgumentException("Number of TaskManager slots must be positive");
- }
- this.slots = slots;
- }
-
- @Override
- public int getTaskManagerSlots() {
- return this.slots;
- }
-
+public class FlinkYarnClient extends FlinkYarnClientBase {
@Override
- public void setQueue(String queue) {
- this.yarnQueue = queue;
+ protected Class<?> getApplicationMasterClass() {
+ return ApplicationMaster.class;
}
-
- @Override
- public void setLocalJarPath(Path localJarPath) {
- if(!localJarPath.toString().endsWith("jar")) {
- throw new IllegalArgumentException("The passed jar path ('" + localJarPath + "') does not end with the 'jar' extension");
- }
- this.flinkJarPath = localJarPath;
- }
-
- @Override
- public void setConfigurationFilePath(Path confPath) {
- flinkConfigurationPath = confPath;
- }
-
- public void setConfigurationDirectory(String configurationDirectory) {
- this.configurationDirectory = configurationDirectory;
- }
-
- @Override
- public void setFlinkLoggingConfigurationPath(Path logConfPath) {
- flinkLoggingConfigurationPath = logConfPath;
- }
-
- @Override
- public Path getFlinkLoggingConfigurationPath() {
- return flinkLoggingConfigurationPath;
- }
-
- @Override
- public void setTaskManagerCount(int tmCount) {
- if(tmCount < 1) {
- throw new IllegalArgumentException("The TaskManager count has to be at least 1.");
- }
- this.taskManagerCount = tmCount;
- }
-
- @Override
- public int getTaskManagerCount() {
- return this.taskManagerCount;
- }
-
- @Override
- public void setShipFiles(List<File> shipFiles) {
- File shipFile;
- for (File shipFile1 : shipFiles) {
- shipFile = shipFile1;
- // remove uberjar from ship list (by default everything in the lib/ folder is added to
- // the list of files to ship, but we handle the uberjar separately.
- if (!(shipFile.getName().startsWith("flink-dist-") && shipFile.getName().endsWith("jar"))) {
- this.shipFiles.add(shipFile);
- }
- }
- }
-
- public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) {
- this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
- }
-
- @Override
- public String getDynamicPropertiesEncoded() {
- return this.dynamicPropertiesEncoded;
- }
-
-
- public void isReadyForDepoyment() throws YarnDeploymentException {
- if(taskManagerCount <= 0) {
- throw new YarnDeploymentException("Taskmanager count must be positive");
- }
- if(this.flinkJarPath == null) {
- throw new YarnDeploymentException("The Flink jar path is null");
- }
- if(this.configurationDirectory == null) {
- throw new YarnDeploymentException("Configuration directory not set");
- }
- if(this.flinkConfigurationPath == null) {
- throw new YarnDeploymentException("Configuration path not set");
- }
- if(this.flinkConfiguration == null) {
- throw new YarnDeploymentException("Flink configuration object has not been set");
- }
-
- // check if required Hadoop environment variables are set. If not, warn user
- if(System.getenv("HADOOP_CONF_DIR") == null &&
- System.getenv("YARN_CONF_DIR") == null) {
- LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set." +
- "The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
- "configuration for accessing YARN.");
- }
- }
-
- public static boolean allocateResource(int[] nodeManagers, int toAllocate) {
- for(int i = 0; i < nodeManagers.length; i++) {
- if(nodeManagers[i] >= toAllocate) {
- nodeManagers[i] -= toAllocate;
- return true;
- }
- }
- return false;
- }
-
- @Override
- public void setDetachedMode(boolean detachedMode) {
- this.detached = detachedMode;
- }
-
- @Override
- public boolean isDetached() {
- return detached;
- }
-
- public AbstractFlinkYarnCluster deploy() throws Exception {
-
- UserGroupInformation.setConfiguration(conf);
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-
- if (UserGroupInformation.isSecurityEnabled()) {
- if (!ugi.hasKerberosCredentials()) {
- throw new YarnDeploymentException("In secure mode. Please provide Kerberos credentials in order to authenticate. " +
- "You may use kinit to authenticate and request a TGT from the Kerberos server.");
- }
- return ugi.doAs(new PrivilegedExceptionAction<AbstractFlinkYarnCluster>() {
- @Override
- public AbstractFlinkYarnCluster run() throws Exception {
- return deployInternal();
- }
- });
- } else {
- return deployInternal();
- }
- }
-
-
-
- /**
- * This method will block until the ApplicationMaster/JobManager have been
- * deployed on YARN.
- */
- protected AbstractFlinkYarnCluster deployInternal() throws Exception {
- isReadyForDepoyment();
-
- LOG.info("Using values:");
- LOG.info("\tTaskManager count = {}", taskManagerCount);
- LOG.info("\tJobManager memory = {}", jobManagerMemoryMb);
- LOG.info("\tTaskManager memory = {}", taskManagerMemoryMb);
-
- // Create application via yarnClient
- yarnApplication = yarnClient.createApplication();
- GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
-
- // ------------------ Add dynamic properties to local flinkConfiguraton ------
-
- List<Tuple2<String, String>> dynProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncoded);
- for (Tuple2<String, String> dynProperty : dynProperties) {
- flinkConfiguration.setString(dynProperty.f0, dynProperty.f1);
- }
-
- // ------------------ Check if the specified queue exists --------------
-
- try {
- List<QueueInfo> queues = yarnClient.getAllQueues();
- if (queues.size() > 0 && this.yarnQueue != null) { // check only if there are queues configured in yarn and for this session.
- boolean queueFound = false;
- for (QueueInfo queue : queues) {
- if (queue.getQueueName().equals(this.yarnQueue)) {
- queueFound = true;
- break;
- }
- }
- if (!queueFound) {
- String queueNames = "";
- for (QueueInfo queue : queues) {
- queueNames += queue.getQueueName() + ", ";
- }
- LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. " +
- "Available queues: " + queueNames);
- }
- } else {
- LOG.debug("The YARN cluster does not have any queues configured");
- }
- } catch(Throwable e) {
- LOG.warn("Error while getting queue information from YARN: " + e.getMessage());
- if(LOG.isDebugEnabled()) {
- LOG.debug("Error details", e);
- }
- }
-
- // ------------------ Check if the YARN Cluster has the requested resources --------------
-
- // the yarnMinAllocationMB specifies the smallest possible container allocation size.
- // all allocations below this value are automatically set to this value.
- final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
- if(jobManagerMemoryMb < yarnMinAllocationMB || taskManagerMemoryMb < yarnMinAllocationMB) {
- LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. "
- + "The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please increase the memory size." +
- "YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances " +
- "you requested will start.");
- }
-
- // set the memory to minAllocationMB to do the next checks correctly
- if(jobManagerMemoryMb < yarnMinAllocationMB) {
- jobManagerMemoryMb = yarnMinAllocationMB;
- }
- if(taskManagerMemoryMb < yarnMinAllocationMB) {
- taskManagerMemoryMb = yarnMinAllocationMB;
- }
-
- Resource maxRes = appResponse.getMaximumResourceCapability();
- final String NOTE = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
- if(jobManagerMemoryMb > maxRes.getMemory() ) {
- failSessionDuringDeployment();
- throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
- + "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + NOTE);
- }
-
- if(taskManagerMemoryMb > maxRes.getMemory() ) {
- failSessionDuringDeployment();
- throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
- + "Maximum Memory: " + maxRes.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + NOTE);
- }
-
- final String NOTE_RSC = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " +
- "connecting from the beginning because the resources are currently not available in the cluster. " +
- "The allocation might take more time than usual because the Flink YARN client needs to wait until " +
- "the resources become available.";
- int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
- ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
- if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
- LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
- + "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + NOTE_RSC);
-
- }
- if(taskManagerMemoryMb > freeClusterMem.containerLimit) {
- LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
- + "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
- }
- if(jobManagerMemoryMb > freeClusterMem.containerLimit) {
- LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
- + "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC);
- }
-
- // ----------------- check if the requested containers fit into the cluster.
-
- int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length);
- // first, allocate the jobManager somewhere.
- if(!allocateResource(nmFree, jobManagerMemoryMb)) {
- LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. " +
- "The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: " +
- Arrays.toString(freeClusterMem.nodeManagersFree) + NOTE_RSC);
- }
- // allocate TaskManagers
- for(int i = 0; i < taskManagerCount; i++) {
- if(!allocateResource(nmFree, taskManagerMemoryMb)) {
- LOG.warn("There is not enough memory available in the YARN cluster. " +
- "The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " +
- "NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) + "\n" +
- "After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/" + taskManagerCount + ") TaskManagers, " +
- "the following NodeManagers are available: " + Arrays.toString(nmFree) + NOTE_RSC );
- }
- }
-
- // ------------------ Prepare Application Master Container ------------------------------
-
- // respect custom JVM options in the YAML file
- final String javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
-
- String logbackFile = configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
- boolean hasLogback = new File(logbackFile).exists();
- String log4jFile = configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
-
- boolean hasLog4j = new File(log4jFile).exists();
- if(hasLogback) {
- shipFiles.add(new File(logbackFile));
- }
- if(hasLog4j) {
- shipFiles.add(new File(log4jFile));
- }
-
- // Set up the container launch context for the application master
- ContainerLaunchContext amContainer = Records
- .newRecord(ContainerLaunchContext.class);
-
- String amCommand = "$JAVA_HOME/bin/java"
- + " -Xmx" + Utils.calculateHeapSize(jobManagerMemoryMb, flinkConfiguration) + "M " +javaOpts;
-
- if(hasLogback || hasLog4j) {
- amCommand += " -Dlog.file=\"" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-main.log\"";
- }
-
- if(hasLogback) {
- amCommand += " -Dlogback.configurationFile=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME;
- }
- if(hasLog4j) {
- amCommand += " -Dlog4j.configuration=file:" + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME;
- }
-
- amCommand += " " + ApplicationMaster.class.getName() + " "
- + " 1>"
- + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stdout.log"
- + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager-stderr.log";
- amContainer.setCommands(Collections.singletonList(amCommand));
-
- LOG.debug("Application Master start command: " + amCommand);
-
- // intialize HDFS
- // Copy the application master jar to the filesystem
- // Create a local resource to point to the destination jar path
- final FileSystem fs = FileSystem.get(conf);
-
- // hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
- if( !fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
- fs.getScheme().startsWith("file")) {
- LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
- + "specified Hadoop configuration path is wrong and the sytem is using the default Hadoop configuration values."
- + "The Flink YARN client needs to store its files in a distributed file system");
- }
-
- // Set-up ApplicationSubmissionContext for the application
- ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
- appContext.setMaxAppAttempts(flinkConfiguration.getInteger(ConfigConstants.YARN_APPLICATION_ATTEMPTS, 1));
-
- final ApplicationId appId = appContext.getApplicationId();
-
- // Setup jar for ApplicationMaster
- LocalResource appMasterJar = Records.newRecord(LocalResource.class);
- LocalResource flinkConf = Records.newRecord(LocalResource.class);
- Path remotePathJar = Utils.setupLocalResource(conf, fs, appId.toString(), flinkJarPath, appMasterJar, fs.getHomeDirectory());
- Path remotePathConf = Utils.setupLocalResource(conf, fs, appId.toString(), flinkConfigurationPath, flinkConf, fs.getHomeDirectory());
- Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
- localResources.put("flink.jar", appMasterJar);
- localResources.put("flink-conf.yaml", flinkConf);
-
-
- // setup security tokens (code from apache storm)
- final Path[] paths = new Path[2 + shipFiles.size()];
- StringBuilder envShipFileList = new StringBuilder();
- // upload ship files
- for (int i = 0; i < shipFiles.size(); i++) {
- File shipFile = shipFiles.get(i);
- LocalResource shipResources = Records.newRecord(LocalResource.class);
- Path shipLocalPath = new Path("file://" + shipFile.getAbsolutePath());
- paths[2 + i] = Utils.setupLocalResource(conf, fs, appId.toString(),
- shipLocalPath, shipResources, fs.getHomeDirectory());
- localResources.put(shipFile.getName(), shipResources);
-
- envShipFileList.append(paths[2 + i]);
- if(i+1 < shipFiles.size()) {
- envShipFileList.append(',');
- }
- }
-
- paths[0] = remotePathJar;
- paths[1] = remotePathConf;
- sessionFilesDir = new Path(fs.getHomeDirectory(), ".flink/" + appId.toString() + "/");
-
- FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
- fs.setPermission(sessionFilesDir, permission); // set permission for path.
-
- Utils.setTokensFor(amContainer, paths, conf);
-
- amContainer.setLocalResources(localResources);
- fs.close();
-
- // Setup CLASSPATH for ApplicationMaster
- Map<String, String> appMasterEnv = new HashMap<String, String>();
- Utils.setupEnv(conf, appMasterEnv);
- // set configuration values
- appMasterEnv.put(FlinkYarnClient.ENV_TM_COUNT, String.valueOf(taskManagerCount));
- appMasterEnv.put(FlinkYarnClient.ENV_TM_MEMORY, String.valueOf(taskManagerMemoryMb));
- appMasterEnv.put(FlinkYarnClient.FLINK_JAR_PATH, remotePathJar.toString() );
- appMasterEnv.put(FlinkYarnClient.ENV_APP_ID, appId.toString());
- appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_HOME_DIR, fs.getHomeDirectory().toString());
- appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
- appMasterEnv.put(FlinkYarnClient.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
- appMasterEnv.put(FlinkYarnClient.ENV_SLOTS, String.valueOf(slots));
- appMasterEnv.put(FlinkYarnClient.ENV_DETACHED, String.valueOf(detached));
- appMasterEnv.put(FlinkYarnClient.ENV_STREAMING_MODE, String.valueOf(streamingMode));
-
- if(dynamicPropertiesEncoded != null) {
- appMasterEnv.put(FlinkYarnClient.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
- }
-
- amContainer.setEnvironment(appMasterEnv);
-
- // Set up resource type requirements for ApplicationMaster
- Resource capability = Records.newRecord(Resource.class);
- capability.setMemory(jobManagerMemoryMb);
- capability.setVirtualCores(1);
-
- String name;
- if(customName == null) {
- name = "Flink session with " + taskManagerCount + " TaskManagers";
- if(detached) {
- name += " (detached)";
- }
- } else {
- name = customName;
- }
-
- appContext.setApplicationName(name); // application name
- appContext.setApplicationType("Apache Flink");
- appContext.setAMContainerSpec(amContainer);
- appContext.setResource(capability);
- if(yarnQueue != null) {
- appContext.setQueue(yarnQueue);
- }
-
- LOG.info("Submitting application master " + appId);
- yarnClient.submitApplication(appContext);
-
- LOG.info("Waiting for the cluster to be allocated");
- int waittime = 0;
- loop: while( true ) {
- ApplicationReport report = yarnClient.getApplicationReport(appId);
- YarnApplicationState appState = report.getYarnApplicationState();
- switch(appState) {
- case FAILED:
- case FINISHED:
- case KILLED:
- throw new YarnDeploymentException("The YARN application unexpectedly switched to state "
- + appState + " during deployment. \n" +
- "Diagnostics from YARN: " + report.getDiagnostics() + "\n" +
- "If log aggregation is enabled on your cluster, use this command to further invesitage the issue:\n" +
- "yarn logs -applicationId " + appId);
- //break ..
- case RUNNING:
- LOG.info("YARN application has been deployed successfully.");
- break loop;
- default:
- LOG.info("Deploying cluster, current state " + appState);
- if(waittime > 60000) {
- LOG.info("Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
- }
-
- }
- waittime += 1000;
- Thread.sleep(1000);
- }
- // the Flink cluster is deployed in YARN. Represent cluster
- return new FlinkYarnCluster(yarnClient, appId, conf, flinkConfiguration, sessionFilesDir, detached);
- }
-
- /**
- * Kills YARN application and stops YARN client.
- *
- * Use this method to kill the App before it has been properly deployed
- */
- private void failSessionDuringDeployment() {
- LOG.info("Killing YARN application");
-
- try {
- yarnClient.killApplication(yarnApplication.getNewApplicationResponse().getApplicationId());
- } catch (Exception e) {
- // we only log a debug message here because the "killApplication" call is a best-effort
- // call (we don't know if the application has been deployed when the error occured).
- LOG.debug("Error while killing YARN application", e);
- }
- yarnClient.stop();
- }
-
-
- private static class ClusterResourceDescription {
- final public int totalFreeMemory;
- final public int containerLimit;
- final public int[] nodeManagersFree;
-
- public ClusterResourceDescription(int totalFreeMemory, int containerLimit, int[] nodeManagersFree) {
- this.totalFreeMemory = totalFreeMemory;
- this.containerLimit = containerLimit;
- this.nodeManagersFree = nodeManagersFree;
- }
- }
-
- private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
- List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
-
- int totalFreeMemory = 0;
- int containerLimit = 0;
- int[] nodeManagersFree = new int[nodes.size()];
-
- for(int i = 0; i < nodes.size(); i++) {
- NodeReport rep = nodes.get(i);
- int free = rep.getCapability().getMemory() - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0 );
- nodeManagersFree[i] = free;
- totalFreeMemory += free;
- if(free > containerLimit) {
- containerLimit = free;
- }
- }
- return new ClusterResourceDescription(totalFreeMemory, containerLimit, nodeManagersFree);
- }
-
- public String getClusterDescription() throws Exception {
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PrintStream ps = new PrintStream(baos);
-
- YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
-
- ps.append("NodeManagers in the Cluster " + metrics.getNumNodeManagers());
- List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
- final String format = "|%-16s |%-16s %n";
- ps.printf("|Property |Value %n");
- ps.println("+---------------------------------------+");
- int totalMemory = 0;
- int totalCores = 0;
- for(NodeReport rep : nodes) {
- final Resource res = rep.getCapability();
- totalMemory += res.getMemory();
- totalCores += res.getVirtualCores();
- ps.format(format, "NodeID", rep.getNodeId());
- ps.format(format, "Memory", res.getMemory() + " MB");
- ps.format(format, "vCores", res.getVirtualCores());
- ps.format(format, "HealthReport", rep.getHealthReport());
- ps.format(format, "Containers", rep.getNumContainers());
- ps.println("+---------------------------------------+");
- }
- ps.println("Summary: totalMemory " + totalMemory + " totalCores " + totalCores);
- List<QueueInfo> qInfo = yarnClient.getAllQueues();
- for(QueueInfo q : qInfo) {
- ps.println("Queue: " + q.getQueueName() + ", Current Capacity: " + q.getCurrentCapacity() + " Max Capacity: " +
- q.getMaximumCapacity() + " Applications: " + q.getApplications().size());
- }
- yarnClient.stop();
- return baos.toString();
- }
-
- public String getSessionFilesDir() {
- return sessionFilesDir.toString();
- }
-
- @Override
- public void setStreamingMode(boolean streamingMode) {
- this.streamingMode = streamingMode;
- }
-
- @Override
- public void setName(String name) {
- if(name == null) {
- throw new IllegalArgumentException("The passed name is null");
- }
- customName = name;
- }
-
- public static class YarnDeploymentException extends RuntimeException {
- public YarnDeploymentException() {
- }
-
- public YarnDeploymentException(String message) {
- super(message);
- }
-
- public YarnDeploymentException(String message, Throwable cause) {
- super(message, cause);
- }
- }
-
}