You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/12 22:25:25 UTC
[04/14] flink git commit: [FLINK-8349] [flip6] Remove Yarn specific commands from YarnClusterDescriptor
[FLINK-8349] [flip6] Remove Yarn specific commands from YarnClusterDescriptor
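Remove Yarn-specific commands from YarnClusterDescriptor. This is a preparatory
step to make the FlinkYarnSessionCli work with the Flip-6 RestClusterClient.
Cluster termination now goes through the new ClusterDescriptor#terminateCluster
method instead of the removed ClusterClient#finalizeCluster.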
This closes #5229.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/402499f0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/402499f0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/402499f0
Branch: refs/heads/master
Commit: 402499f06a4b590ac47df64ecc01055c06b0399b
Parents: 10e900b
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jan 11 23:25:58 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jan 12 16:14:03 2018 +0100
----------------------------------------------------------------------
.../apache/flink/client/cli/CliFrontend.java | 9 +
.../client/deployment/ClusterDescriptor.java | 9 +
.../Flip6StandaloneClusterDescriptor.java | 6 +
.../deployment/StandaloneClusterDescriptor.java | 6 +
.../flink/client/program/ClusterClient.java | 13 +-
.../client/program/StandaloneClusterClient.java | 4 -
.../client/program/rest/RestClusterClient.java | 7 +-
.../client/cli/util/DummyClusterDescriptor.java | 6 +
.../YARNSessionCapacitySchedulerITCase.java | 24 +-
.../flink/yarn/YARNSessionFIFOITCase.java | 9 +-
.../org/apache/flink/yarn/YarnTestBase.java | 59 ++-
.../yarn/AbstractYarnClusterDescriptor.java | 21 +-
.../apache/flink/yarn/YarnClusterClient.java | 155 +------
.../flink/yarn/cli/FlinkYarnSessionCli.java | 415 ++++++++++---------
.../yarn/cli/YarnApplicationStatusMonitor.java | 3 +-
15 files changed, 352 insertions(+), 394 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 453d086..847a5f8 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -232,6 +232,15 @@ public class CliFrontend {
executeProgram(program, client, userParallelism);
} finally {
+ if (clusterId == null && !client.isDetached()) {
+ // terminate the cluster only if we have started it before and if it's not detached
+ try {
+ clusterDescriptor.terminateCluster(client.getClusterIdentifier());
+ } catch (FlinkException e) {
+ LOG.info("Could not properly terminate the Flink cluster.", e);
+ }
+ }
+
try {
client.shutdown();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
index 07eeabc..b1f566c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
@@ -20,6 +20,7 @@ package org.apache.flink.client.deployment;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
/**
* A descriptor to deploy a cluster (e.g. Yarn or Mesos) and return a Client for Cluster communication.
@@ -59,4 +60,12 @@ public interface ClusterDescriptor extends AutoCloseable {
ClusterClient deployJobCluster(
final ClusterSpecification clusterSpecification,
final JobGraph jobGraph);
+
+ /**
+ * Terminates the cluster with the given cluster id.
+ *
+ * @param clusterId identifying the cluster to shut down
+ * @throws FlinkException if the cluster could not be terminated
+ */
+ void terminateCluster(String clusterId) throws FlinkException;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
index a35a68b..70fd9f7 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
@@ -22,6 +22,7 @@ import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
/**
@@ -62,6 +63,11 @@ public class Flip6StandaloneClusterDescriptor implements ClusterDescriptor {
}
@Override
+ public void terminateCluster(String clusterId) throws FlinkException {
+ throw new UnsupportedOperationException("Cannot terminate a standalone Flip-6 cluster.");
+ }
+
+ @Override
public void close() throws Exception {
// nothing to do
}
http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
index ae25194..5638232 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
@@ -22,6 +22,7 @@ import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
/**
* A deployment descriptor for an existing cluster.
@@ -61,6 +62,11 @@ public class StandaloneClusterDescriptor implements ClusterDescriptor {
}
@Override
+ public void terminateCluster(String clusterId) throws FlinkException {
+ throw new UnsupportedOperationException("Cannot terminate a standalone cluster.");
+ }
+
+ @Override
public void close() throws Exception {
// nothing to do
}
http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 7c05e3e..efa23fb 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -262,11 +262,7 @@ public abstract class ClusterClient {
*/
public void shutdown() throws Exception {
synchronized (this) {
- try {
- finalizeCluster();
- } finally {
- actorSystemLoader.shutdown();
- }
+ actorSystemLoader.shutdown();
if (highAvailabilityServices != null) {
highAvailabilityServices.close();
@@ -938,7 +934,7 @@ public abstract class ClusterClient {
* May return new messages from the cluster.
* Messages can be for example about failed containers or container launch requests.
*/
- protected abstract List<String> getNewMessages();
+ public abstract List<String> getNewMessages();
/**
* Returns a string representation of the cluster.
@@ -946,11 +942,6 @@ public abstract class ClusterClient {
public abstract String getClusterIdentifier();
/**
- * Request the cluster to shut down or disconnect.
- */
- protected abstract void finalizeCluster();
-
- /**
* Set the mode of this client (detached or blocking job execution).
* @param isDetached If true, the client will submit programs detached via the {@code run} method
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index 10e1bdd..1782a25 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -105,8 +105,4 @@ public class StandaloneClusterClient extends ClusterClient {
return super.run(jobGraph, classLoader);
}
}
-
- @Override
- protected void finalizeCluster() {}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 6b21cff..a6bff1a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -329,12 +329,7 @@ public class RestClusterClient extends ClusterClient {
}
@Override
- protected List<String> getNewMessages() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected void finalizeCluster() {
+ public List<String> getNewMessages() {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
index d300055..2bb3ed0 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
@@ -22,6 +22,7 @@ import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
/**
@@ -56,6 +57,11 @@ public class DummyClusterDescriptor implements ClusterDescriptor {
}
@Override
+ public void terminateCluster(String clusterId) throws FlinkException {
+ throw new UnsupportedOperationException("DummyClusterDescriptor does not support cluster termination.");
+ }
+
+ @Override
public void close() {
// nothing to do
}
http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 5bed22e..c806c5e 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.test.testdata.WordCountData;
import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
@@ -74,6 +75,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import static junit.framework.TestCase.assertTrue;
import static org.apache.flink.yarn.UtilsTest.addTestAppender;
import static org.apache.flink.yarn.UtilsTest.checkForLogString;
@@ -98,7 +100,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
* Test regular operation, including command line parameter parsing.
*/
@Test
- public void testClientStartup() {
+ public void testClientStartup() throws IOException {
LOG.info("Starting testClientStartup()");
runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
"-n", "1",
@@ -116,7 +118,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
* The parallelism is requested at the YARN client (-ys).
*/
@Test
- public void perJobYarnCluster() {
+ public void perJobYarnCluster() throws IOException {
LOG.info("Starting perJobYarnCluster()");
addTestAppender(JobClient.class, Level.INFO);
File exampleJarLocation = new File("target/programs/BatchWordCount.jar");
@@ -145,7 +147,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
* memory remains.
*/
@Test
- public void perJobYarnClusterOffHeap() {
+ public void perJobYarnClusterOffHeap() throws IOException {
LOG.info("Starting perJobYarnCluster()");
addTestAppender(JobClient.class, Level.INFO);
File exampleJarLocation = new File("target/programs/BatchWordCount.jar");
@@ -363,15 +365,19 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
* target queue. With an error message, we can help users identifying the issue)
*/
@Test
- public void testNonexistingQueueWARNmessage() {
+ public void testNonexistingQueueWARNmessage() throws IOException {
LOG.info("Starting testNonexistingQueueWARNmessage()");
addTestAppender(AbstractYarnClusterDescriptor.class, Level.WARN);
- runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+ try {
+ runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
"-t", flinkLibFolder.getAbsolutePath(),
"-n", "1",
"-jm", "768",
"-tm", "1024",
"-qu", "doesntExist"}, "to unknown queue: doesntExist", null, RunTypes.YARN_SESSION, 1);
+ } catch (Exception e) {
+ assertTrue(ExceptionUtils.findThrowableWithMessage(e, "to unknown queue: doesntExist").isPresent());
+ }
checkForLogString("The specified queue 'doesntExist' does not exist. Available queues");
LOG.info("Finished testNonexistingQueueWARNmessage()");
}
@@ -380,7 +386,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
* Test per-job yarn cluster with the parallelism set at the CliFrontend instead of the YARN client.
*/
@Test
- public void perJobYarnClusterWithParallelism() {
+ public void perJobYarnClusterWithParallelism() throws IOException {
LOG.info("Starting perJobYarnClusterWithParallelism()");
// write log messages to stdout as well, so that the runWithArgs() method
// is catching the log output
@@ -407,7 +413,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
* Test a fire-and-forget job submission to a YARN cluster.
*/
@Test(timeout = 60000)
- public void testDetachedPerJobYarnCluster() {
+ public void testDetachedPerJobYarnCluster() throws IOException {
LOG.info("Starting testDetachedPerJobYarnCluster()");
File exampleJarLocation = new File("target/programs/BatchWordCount.jar");
@@ -423,7 +429,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
* Test a fire-and-forget job submission to a YARN cluster.
*/
@Test(timeout = 60000)
- public void testDetachedPerJobYarnClusterWithStreamingJob() {
+ public void testDetachedPerJobYarnClusterWithStreamingJob() throws IOException {
LOG.info("Starting testDetachedPerJobYarnClusterWithStreamingJob()");
File exampleJarLocation = new File("target/programs/StreamingWordCount.jar");
@@ -435,7 +441,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
LOG.info("Finished testDetachedPerJobYarnClusterWithStreamingJob()");
}
- private void testDetachedPerJobYarnClusterInternal(String job) {
+ private void testDetachedPerJobYarnClusterInternal(String job) throws IOException {
YarnClient yc = YarnClient.createYarnClient();
yc.init(YARN_CONFIGURATION);
yc.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index cc26350..15fe355 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -45,6 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
@@ -80,7 +81,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
* Test regular operation, including command line parameter parsing.
*/
@Test(timeout = 60000) // timeout after a minute.
- public void testDetachedMode() throws InterruptedException {
+ public void testDetachedMode() throws InterruptedException, IOException {
LOG.info("Starting testDetachedMode()");
addTestAppender(FlinkYarnSessionCli.class, Level.INFO);
Runner runner =
@@ -158,7 +159,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
* <p>This test validates through 666*2 cores in the "cluster".
*/
@Test
- public void testQueryCluster() {
+ public void testQueryCluster() throws IOException {
LOG.info("Starting testQueryCluster()");
runWithArgs(new String[] {"-q"}, "Summary: totalMemory 8192 totalCores 1332", null, RunTypes.YARN_SESSION, 0); // we have 666*2 cores.
LOG.info("Finished testQueryCluster()");
@@ -178,7 +179,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
*/
@Ignore("The test is too resource consuming (8.5 GB of memory)")
@Test
- public void testResourceComputation() {
+ public void testResourceComputation() throws IOException {
addTestAppender(AbstractYarnClusterDescriptor.class, Level.WARN);
LOG.info("Starting testResourceComputation()");
runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
@@ -206,7 +207,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
*/
@Ignore("The test is too resource consuming (8 GB of memory)")
@Test
- public void testfullAlloc() {
+ public void testfullAlloc() throws IOException {
addTestAppender(AbstractYarnClusterDescriptor.class, Level.WARN);
LOG.info("Starting testfullAlloc()");
runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index ed02892..eeda32d 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -61,6 +61,9 @@ import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.FilenameFilter;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.util.ArrayList;
@@ -86,6 +89,7 @@ public abstract class YarnTestBase extends TestLogger {
protected static final PrintStream ORIGINAL_STDOUT = System.out;
protected static final PrintStream ORIGINAL_STDERR = System.err;
+ private static final InputStream ORIGINAL_STDIN = System.in;
protected static final String TEST_CLUSTER_NAME_KEY = "flink-yarn-minicluster-name";
@@ -510,13 +514,18 @@ public abstract class YarnTestBase extends TestLogger {
/**
* This method returns once the "startedAfterString" has been seen.
*/
- protected Runner startWithArgs(String[] args, String startedAfterString, RunTypes type) {
+ protected Runner startWithArgs(String[] args, String startedAfterString, RunTypes type) throws IOException {
LOG.info("Running with args {}", Arrays.toString(args));
outContent = new ByteArrayOutputStream();
errContent = new ByteArrayOutputStream();
+ PipedOutputStream out = new PipedOutputStream();
+ PipedInputStream in = new PipedInputStream(out);
+ PrintStream stdinPrintStream = new PrintStream(out);
+
System.setOut(new PrintStream(outContent));
System.setErr(new PrintStream(errContent));
+ System.setIn(in);
final int startTimeoutSeconds = 60;
@@ -525,7 +534,8 @@ public abstract class YarnTestBase extends TestLogger {
flinkConfiguration,
CliFrontend.getConfigurationDirectoryFromEnv(),
type,
- 0);
+ 0,
+ stdinPrintStream);
runner.setName("Frontend (CLI/YARN Client) runner thread (startWithArgs()).");
runner.start();
@@ -539,7 +549,7 @@ public abstract class YarnTestBase extends TestLogger {
}
// check if thread died
if (!runner.isAlive()) {
- sendOutput();
+ resetStreamsAndSendOutput();
if (runner.getRunnerError() != null) {
throw new RuntimeException("Runner failed with exception.", runner.getRunnerError());
}
@@ -547,13 +557,13 @@ public abstract class YarnTestBase extends TestLogger {
}
}
- sendOutput();
+ resetStreamsAndSendOutput();
Assert.fail("During the timeout period of " + startTimeoutSeconds + " seconds the " +
"expected string did not show up");
return null;
}
- protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnStrings, RunTypes type, int returnCode) {
+ protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnStrings, RunTypes type, int returnCode) throws IOException {
runWithArgs(args, terminateAfterString, failOnStrings, type, returnCode, false);
}
@@ -566,13 +576,17 @@ public abstract class YarnTestBase extends TestLogger {
* @param expectedReturnValue Expected return code from the runner.
* @param checkLogForTerminateString If true, the runner checks also the log4j logger for the terminate string
*/
- protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnPatterns, RunTypes type, int expectedReturnValue, boolean checkLogForTerminateString) {
+ protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnPatterns, RunTypes type, int expectedReturnValue, boolean checkLogForTerminateString) throws IOException {
LOG.info("Running with args {}", Arrays.toString(args));
outContent = new ByteArrayOutputStream();
errContent = new ByteArrayOutputStream();
+ PipedOutputStream out = new PipedOutputStream();
+ PipedInputStream in = new PipedInputStream(out);
+ PrintStream stdinPrintStream = new PrintStream(out);
System.setOut(new PrintStream(outContent));
System.setErr(new PrintStream(errContent));
+ System.setIn(in);
// we wait for at most three minutes
final int startTimeoutSeconds = 180;
@@ -583,11 +597,13 @@ public abstract class YarnTestBase extends TestLogger {
flinkConfiguration,
CliFrontend.getConfigurationDirectoryFromEnv(),
type,
- expectedReturnValue);
+ expectedReturnValue,
+ stdinPrintStream);
runner.start();
boolean expectedStringSeen = false;
boolean testPassedFromLog4j = false;
+ long shutdownTimeout = 30000L;
do {
sleep(1000);
String outContentString = outContent.toString();
@@ -597,9 +613,15 @@ public abstract class YarnTestBase extends TestLogger {
Pattern pattern = Pattern.compile(failOnString);
if (pattern.matcher(outContentString).find() || pattern.matcher(errContentString).find()) {
LOG.warn("Failing test. Output contained illegal string '" + failOnString + "'");
- sendOutput();
+ resetStreamsAndSendOutput();
// stopping runner.
runner.sendStop();
+ // wait for the thread to stop
+ try {
+ runner.join(shutdownTimeout);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while stopping runner", e);
+ }
Assert.fail("Output contained illegal string '" + failOnString + "'");
}
}
@@ -622,7 +644,7 @@ public abstract class YarnTestBase extends TestLogger {
runner.sendStop();
// wait for the thread to stop
try {
- runner.join(30000);
+ runner.join(shutdownTimeout);
}
catch (InterruptedException e) {
LOG.warn("Interrupted while stopping runner", e);
@@ -639,7 +661,7 @@ public abstract class YarnTestBase extends TestLogger {
}
while (runner.getRunnerError() == null && !expectedStringSeen && System.currentTimeMillis() < deadline);
- sendOutput();
+ resetStreamsAndSendOutput();
if (runner.getRunnerError() != null) {
// this lets the test fail.
@@ -651,9 +673,10 @@ public abstract class YarnTestBase extends TestLogger {
LOG.info("Test was successful");
}
- protected static void sendOutput() {
+ protected static void resetStreamsAndSendOutput() {
System.setOut(ORIGINAL_STDOUT);
System.setErr(ORIGINAL_STDERR);
+ System.setIn(ORIGINAL_STDIN);
LOG.info("Sending stdout content through logger: \n\n{}\n\n", outContent.toString());
LOG.info("Sending stderr content through logger: \n\n{}\n\n", errContent.toString());
@@ -668,6 +691,8 @@ public abstract class YarnTestBase extends TestLogger {
private final String configurationDirectory;
private final int expectedReturnValue;
+ private final PrintStream stdinPrintStream;
+
private RunTypes type;
private FlinkYarnSessionCli yCli;
private Throwable runnerError;
@@ -677,13 +702,15 @@ public abstract class YarnTestBase extends TestLogger {
org.apache.flink.configuration.Configuration configuration,
String configurationDirectory,
RunTypes type,
- int expectedReturnValue) {
+ int expectedReturnValue,
+ PrintStream stdinPrintStream) {
this.args = args;
this.configuration = Preconditions.checkNotNull(configuration);
this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory);
this.type = type;
this.expectedReturnValue = expectedReturnValue;
+ this.stdinPrintStream = Preconditions.checkNotNull(stdinPrintStream);
}
@Override
@@ -697,8 +724,8 @@ public abstract class YarnTestBase extends TestLogger {
configurationDirectory,
"",
"",
- false);
- returnValue = yCli.run(args, configuration, configurationDirectory);
+ true);
+ returnValue = yCli.run(args);
break;
case CLI_FRONTEND:
try {
@@ -727,9 +754,7 @@ public abstract class YarnTestBase extends TestLogger {
/** Stops the Yarn session. */
public void sendStop() {
- if (yCli != null) {
- yCli.stop();
- }
+ stdinPrintStream.println("stop");
}
public Throwable getRunnerError() {
http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 0a977df..4affb78 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.flink.yarn;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
@@ -33,6 +34,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
@@ -330,7 +332,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
// -------------------------------------------------------------
@Override
- public YarnClusterClient retrieve(String applicationID) {
+ public ClusterClient retrieve(String applicationID) {
try {
// check if required Hadoop environment variables are set. If not, warn user
@@ -393,6 +395,23 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
}
+ @Override
+ public void terminateCluster(String clusterId) throws FlinkException {
+ try {
+ yarnClient.killApplication(ConverterUtils.toApplicationId(clusterId));
+ } catch (IOException | YarnException e) {
+ throw new FlinkException("Could not terminate cluster with id " + clusterId + '.', e);
+ }
+ }
+
+ public void terminateCluster(ApplicationId applicationId) throws FlinkException {
+ try {
+ yarnClient.killApplication(applicationId);
+ } catch (YarnException | IOException e) {
+ throw new FlinkException("Could not kill the Yarn Flink cluster with id " + applicationId + '.', e);
+ }
+ }
+
/**
* This method will block until the ApplicationMaster/JobManager have been
* deployed on YARN.
http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 5fb7f90..991b3b9 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -24,7 +24,6 @@ import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
@@ -33,8 +32,6 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
-import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
-import org.apache.flink.yarn.configuration.YarnConfigOptions;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
@@ -43,17 +40,15 @@ import akka.pattern.Patterns;
import akka.util.Timeout;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
import scala.Option;
import scala.concurrent.Await;
@@ -67,8 +62,6 @@ public class YarnClusterClient extends ClusterClient {
private static final Logger LOG = LoggerFactory.getLogger(YarnClusterClient.class);
- private Thread clientShutdownHook = new ClientShutdownHook();
-
//---------- Class internal fields -------------------
private final AbstractYarnClusterDescriptor clusterDescriptor;
@@ -80,8 +73,6 @@ public class YarnClusterClient extends ClusterClient {
private final ApplicationId appId;
private final String trackingURL;
- private boolean isConnected = true;
-
/** Indicator whether this cluster has just been created. */
private final boolean newlyCreatedCluster;
@@ -120,32 +111,6 @@ public class YarnClusterClient extends ClusterClient {
flinkConfig,
actorSystemLoader,
highAvailabilityServices);
-
- Runtime.getRuntime().addShutdownHook(clientShutdownHook);
- }
-
- /**
- * Disconnect from the Yarn cluster.
- */
- public void disconnect() {
-
- if (hasBeenShutDown.getAndSet(true)) {
- return;
- }
-
- if (!isConnected) {
- throw new IllegalStateException("Can not disconnect from an unconnected cluster.");
- }
-
- LOG.info("Disconnecting YarnClusterClient from ApplicationMaster");
-
- try {
- Runtime.getRuntime().removeShutdownHook(clientShutdownHook);
- } catch (IllegalStateException e) {
- // we are already in the shutdown hook
- }
-
- isConnected = false;
}
// -------------------------- Interaction with the cluster ------------------------
@@ -207,7 +172,7 @@ public class YarnClusterClient extends ClusterClient {
@Override
public String getClusterIdentifier() {
- return "Yarn cluster with application id " + appReport.getApplicationId();
+ return ConverterUtils.toString(appReport.getApplicationId()); // bare application id string; FlinkYarnSessionCli parses it back via ConverterUtils.toApplicationId
}
/**
@@ -215,13 +180,6 @@ public class YarnClusterClient extends ClusterClient {
*/
@Override
public GetClusterStatusResponse getClusterStatus() {
- if (!isConnected) {
- throw new IllegalStateException("The cluster is not connected to the cluster.");
- }
- if (hasBeenShutdown()) {
- throw new IllegalStateException("The cluster has already been shutdown.");
- }
-
try {
final Future<Object> clusterStatusOption =
getJobManagerGateway().ask(
@@ -236,15 +194,7 @@ public class YarnClusterClient extends ClusterClient {
@Override
public List<String> getNewMessages() {
- if (hasBeenShutdown()) {
- throw new RuntimeException("The YarnClusterClient has already been stopped");
- }
-
- if (!isConnected) {
- throw new IllegalStateException("The cluster has been connected to the ApplicationMaster.");
- }
-
- List<String> ret = new ArrayList<String>();
+ List<String> ret = new ArrayList<>();
// get messages from ApplicationClient (locally)
while (true) {
Object result;
@@ -283,105 +233,6 @@ public class YarnClusterClient extends ClusterClient {
return ret;
}
- // -------------------------- Shutdown handling ------------------------
-
- private AtomicBoolean hasBeenShutDown = new AtomicBoolean(false);
-
- /**
- * Shuts down or disconnects from the YARN cluster.
- */
- @Override
- public void finalizeCluster() {
- if (isDetached() || !newlyCreatedCluster) {
- disconnect();
- } else {
- shutdownCluster();
- }
- }
-
- /**
- * Shuts down the Yarn application.
- */
- public void shutdownCluster() {
-
- if (hasBeenShutDown.getAndSet(true)) {
- return;
- }
-
- if (!isConnected) {
- throw new IllegalStateException("The cluster has been not been connected to the ApplicationMaster.");
- }
-
- try {
- Runtime.getRuntime().removeShutdownHook(clientShutdownHook);
- } catch (IllegalStateException e) {
- // we are already in the shutdown hook
- }
-
- LOG.info("Sending shutdown request to the Application Master");
- try {
- Future<Object> response =
- Patterns.ask(applicationClient.get(),
- new YarnMessages.LocalStopYarnSession(ApplicationStatus.CANCELED,
- "Flink YARN Client requested shutdown"),
- new Timeout(akkaDuration));
- Await.ready(response, akkaDuration);
- } catch (Exception e) {
- LOG.warn("Error while stopping YARN cluster.", e);
- }
-
- try {
- File propertiesFile = FlinkYarnSessionCli.getYarnPropertiesLocation(flinkConfig.getValue(YarnConfigOptions.PROPERTIES_FILE_LOCATION));
- if (propertiesFile.isFile()) {
- if (propertiesFile.delete()) {
- LOG.info("Deleted Yarn properties file at {}", propertiesFile.getAbsoluteFile().toString());
- } else {
- LOG.warn("Couldn't delete Yarn properties file at {}", propertiesFile.getAbsoluteFile().toString());
- }
- }
- } catch (Exception e) {
- LOG.warn("Exception while deleting the JobManager address file", e);
- }
-
- try {
- ApplicationReport appReport = clusterDescriptor.getYarnClient().getApplicationReport(appId);
-
- LOG.info("Application " + appId + " finished with state " + appReport
- .getYarnApplicationState() + " and final state " + appReport
- .getFinalApplicationStatus() + " at " + appReport.getFinishTime());
-
- if (appReport.getYarnApplicationState() == YarnApplicationState.FAILED || appReport.getYarnApplicationState()
- == YarnApplicationState.KILLED) {
- LOG.warn("Application failed. Diagnostics " + appReport.getDiagnostics());
- LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retrieve "
- + "the full application log using this command:"
- + System.lineSeparator()
- + "\tyarn logs -applicationId " + appReport.getApplicationId()
- + System.lineSeparator()
- + "(It sometimes takes a few seconds until the logs are aggregated)");
- }
- } catch (Exception e) {
- LOG.warn("Couldn't get final report", e);
- }
- }
-
- public boolean hasBeenShutdown() {
- return hasBeenShutDown.get();
- }
-
- private class ClientShutdownHook extends Thread {
- @Override
- public void run() {
- LOG.info("Shutting down YarnClusterClient from the client shutdown hook");
-
- try {
- shutdown();
- } catch (Throwable t) {
- LOG.warn("Could not properly shut down the yarn cluster client.", t);
- }
- }
- }
-
@Override
public boolean isDetached() {
return super.isDetached() || clusterDescriptor.isDetachedMode();
http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index d797f47..75a270d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -19,9 +19,11 @@
package org.apache.flink.yarn.cli;
import org.apache.flink.client.cli.AbstractCustomCommandLine;
+import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -36,20 +38,21 @@ import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
-import org.apache.flink.yarn.YarnClusterClient;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterDescriptorV2;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,8 +76,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
import static org.apache.flink.configuration.HighAvailabilityOptions.HA_CLUSTER_ID;
@@ -96,7 +101,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
// YARN-session related constants
private static final String YARN_PROPERTIES_FILE = ".yarn-properties-";
- static final String YARN_APPLICATION_ID_KEY = "applicationID";
+ private static final String YARN_APPLICATION_ID_KEY = "applicationID";
private static final String YARN_PROPERTIES_PARALLELISM = "parallelism";
private static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = "dynamicPropertiesString";
@@ -148,8 +153,9 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
private final String yarnApplicationIdFromYarnProperties;
+ private final String yarnPropertiesFileLocation;
+
//------------------------------------ Internal fields -------------------------
- private YarnClusterClient yarnCluster;
private boolean detachedMode = false;
public FlinkYarnSessionCli(
@@ -181,7 +187,12 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
tmMemory = new Option(shortPrefix + "tm", longPrefix + "taskManagerMemory", true, "Memory per TaskManager Container [in MB]");
container = new Option(shortPrefix + "n", longPrefix + "container", true, "Number of YARN container to allocate (=Number of Task Managers)");
slots = new Option(shortPrefix + "s", longPrefix + "slots", true, "Number of slots per TaskManager");
- dynamicproperties = new Option(shortPrefix + "D", true, "Dynamic properties");
+ dynamicproperties = Option.builder(shortPrefix + "D")
+ .argName("property=value")
+ .numberOfArgs(2)
+ .valueSeparator()
+ .desc("use value for given property")
+ .build();
detached = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached");
streaming = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode");
name = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN");
@@ -206,7 +217,8 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
allOptions.addOption(flip6);
// try loading a potential yarn properties file
- final File yarnPropertiesLocation = getYarnPropertiesLocation(configuration.getString(YarnConfigOptions.PROPERTIES_FILE_LOCATION));
+ this.yarnPropertiesFileLocation = configuration.getString(YarnConfigOptions.PROPERTIES_FILE_LOCATION);
+ final File yarnPropertiesLocation = getYarnPropertiesLocation(yarnPropertiesFileLocation);
yarnPropertiesFile = new Properties();
@@ -216,7 +228,8 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
try (InputStream is = new FileInputStream(yarnPropertiesLocation)) {
yarnPropertiesFile.load(is);
} catch (IOException ioe) {
- throw new FlinkException("Could not read the Yarn properties file " + yarnPropertiesLocation + '.');
+ throw new FlinkException("Could not read the Yarn properties file " + yarnPropertiesLocation +
+ ". Please delete the file at " + yarnPropertiesLocation.getAbsolutePath() + '.', ioe);
}
yarnApplicationIdFromYarnProperties = yarnPropertiesFile.getProperty(YARN_APPLICATION_ID_KEY);
@@ -305,10 +318,21 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
yarnClusterDescriptor.setQueue(cmd.getOptionValue(queue.getOpt()));
}
- String[] dynamicProperties = null;
- if (cmd.hasOption(dynamicproperties.getOpt())) {
- dynamicProperties = cmd.getOptionValues(dynamicproperties.getOpt());
- }
+ final Properties properties = cmd.getOptionProperties(dynamicproperties.getOpt());
+
+ String[] dynamicProperties = properties.stringPropertyNames().stream()
+ .flatMap(
+ (String key) -> {
+ final String value = properties.getProperty(key);
+
+ if (value != null) {
+ return Stream.of(key + dynamicproperties.getValueSeparator() + value);
+ } else {
+ return Stream.empty();
+ }
+ })
+ .toArray(String[]::new);
+
String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, YARN_DYNAMIC_PROPERTIES_SEPARATOR);
yarnClusterDescriptor.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
@@ -534,184 +558,168 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
return effectiveConfiguration;
}
- public int run(
- String[] args,
- Configuration configuration,
- String configurationDirectory) {
+ public int run(String[] args) throws CliArgsException, FlinkException {
//
// Command Line Options
//
- Options options = new Options();
- addGeneralOptions(options);
- addRunOptions(options);
+ final CommandLine cmd = parseCommandLineOptions(args, true);
- CommandLineParser parser = new PosixParser();
- CommandLine cmd;
- try {
- cmd = parser.parse(options, args);
- } catch (Exception e) {
- System.out.println(e.getMessage());
- printUsage();
- return 1;
- }
+ final AbstractYarnClusterDescriptor yarnClusterDescriptor = createClusterDescriptor(cmd);
- // Query cluster for metrics
- if (cmd.hasOption(query.getOpt())) {
- AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(
- configuration,
- configurationDirectory,
- cmd.hasOption(flip6.getOpt()));
- String description;
- try {
- description = yarnDescriptor.getClusterDescription();
- } catch (Exception e) {
- System.err.println("Error while querying the YARN cluster for available resources: " + e.getMessage());
- e.printStackTrace(System.err);
- return 1;
- }
- System.out.println(description);
- return 0;
- } else if (cmd.hasOption(applicationId.getOpt())) {
+ try {
+ // Query cluster for metrics
+ if (cmd.hasOption(query.getOpt())) {
+ final String description = yarnClusterDescriptor.getClusterDescription();
+ System.out.println(description);
+ return 0;
+ } else {
+ final ClusterClient clusterClient;
+ final ApplicationId yarnApplicationId;
- AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(
- configuration,
- configurationDirectory,
- cmd.hasOption(flip6.getOpt()));
+ if (cmd.hasOption(applicationId.getOpt())) {
+ yarnApplicationId = ConverterUtils.toApplicationId(cmd.getOptionValue(applicationId.getOpt()));
- //configure ZK namespace depending on the value passed
- String zkNamespace = cmd.hasOption(zookeeperNamespace.getOpt()) ?
- cmd.getOptionValue(zookeeperNamespace.getOpt())
- : yarnDescriptor.getFlinkConfiguration()
- .getString(HA_CLUSTER_ID, cmd.getOptionValue(applicationId.getOpt()));
- LOG.info("Going to use the ZK namespace: {}", zkNamespace);
- yarnDescriptor.getFlinkConfiguration().setString(HA_CLUSTER_ID, zkNamespace);
+ clusterClient = yarnClusterDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt()));
+ } else {
+ final ClusterSpecification clusterSpecification = getClusterSpecification(cmd);
- try {
- yarnCluster = yarnDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt()));
- } catch (Exception e) {
- throw new RuntimeException("Could not retrieve existing Yarn application", e);
- }
+ clusterClient = yarnClusterDescriptor.deploySessionCluster(clusterSpecification);
- if (detachedMode) {
- LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
- "Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
- "yarn application -kill " + applicationId.getOpt());
- yarnCluster.disconnect();
- } else {
- ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
-
- try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
- yarnDescriptor.getYarnClient(),
- yarnCluster.getApplicationId(),
- new ScheduledExecutorServiceAdapter(scheduledExecutorService))) {
- runInteractiveCli(
- yarnCluster,
- yarnApplicationStatusMonitor,
- true);
- } finally {
- // shut down the scheduled executor service
- ExecutorUtils.gracefulShutdown(
- 1000L,
- TimeUnit.MILLISECONDS,
- scheduledExecutorService);
- }
- }
- } else {
+ //------------------ ClusterClient deployed, handle connection details
+ yarnApplicationId = ConverterUtils.toApplicationId(clusterClient.getClusterIdentifier()); // NOTE: relies on the cluster identifier being the Yarn application id string
- try (AbstractYarnClusterDescriptor yarnDescriptor = createClusterDescriptor(cmd)){
- final ClusterSpecification clusterSpecification;
+ String jobManagerAddress =
+ clusterClient.getJobManagerAddress().getAddress().getHostName() +
+ ':' + clusterClient.getJobManagerAddress().getPort();
- try {
- clusterSpecification = getClusterSpecification(cmd);
- } catch (FlinkException e) {
- System.err.println("Error while creating the cluster specification: " + e.getMessage());
- e.printStackTrace();
- return 1;
- }
+ System.out.println("Flink JobManager is now running on " + jobManagerAddress);
+ System.out.println("JobManager Web Interface: " + clusterClient.getWebInterfaceURL());
- try {
- yarnCluster = yarnDescriptor.deploySessionCluster(clusterSpecification);
- } catch (Exception e) {
- System.err.println("Error while deploying YARN cluster: " + e.getMessage());
- e.printStackTrace(System.err);
- return 1;
- }
- //------------------ ClusterClient deployed, handle connection details
- String jobManagerAddress =
- yarnCluster.getJobManagerAddress().getAddress().getHostName() +
- ":" + yarnCluster.getJobManagerAddress().getPort();
-
- System.out.println("Flink JobManager is now running on " + jobManagerAddress);
- System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL());
-
- // file that we write into the conf/ dir containing the jobManager address and the dop.
- File yarnPropertiesFile = getYarnPropertiesLocation(configuration.getValue(YarnConfigOptions.PROPERTIES_FILE_LOCATION));
-
- Properties yarnProps = new Properties();
- yarnProps.setProperty(YARN_APPLICATION_ID_KEY, yarnCluster.getApplicationId().toString());
- if (clusterSpecification.getSlotsPerTaskManager() != -1) {
- String parallelism =
- Integer.toString(clusterSpecification.getSlotsPerTaskManager() * clusterSpecification.getNumberTaskManagers());
- yarnProps.setProperty(YARN_PROPERTIES_PARALLELISM, parallelism);
- }
- // add dynamic properties
- if (yarnDescriptor.getDynamicPropertiesEncoded() != null) {
- yarnProps.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING,
- yarnDescriptor.getDynamicPropertiesEncoded());
+ writeYarnPropertiesFile(
+ yarnApplicationId,
+ clusterSpecification.getNumberTaskManagers() * clusterSpecification.getSlotsPerTaskManager(),
+ yarnClusterDescriptor.getDynamicPropertiesEncoded());
}
- writeYarnProperties(yarnProps, yarnPropertiesFile);
-
- //------------------ ClusterClient running, let user control it ------------
if (detachedMode) {
- // print info and quit:
LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
- "yarn application -kill " + yarnCluster.getApplicationId());
- yarnCluster.waitForClusterToBeReady();
- yarnCluster.disconnect();
+ "yarn application -kill " + yarnApplicationId);
} else {
+ ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
- ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+ final YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
+ yarnClusterDescriptor.getYarnClient(),
+ yarnApplicationId,
+ new ScheduledExecutorServiceAdapter(scheduledExecutorService));
- try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
- yarnDescriptor.getYarnClient(),
- yarnCluster.getApplicationId(),
- new ScheduledExecutorServiceAdapter(scheduledExecutorService))) {
+ try {
runInteractiveCli(
- yarnCluster,
+ clusterClient,
yarnApplicationStatusMonitor,
acceptInteractiveInput);
} finally {
+ try {
+ yarnApplicationStatusMonitor.close();
+ } catch (Exception e) {
+ LOG.warn("Could not properly close the Yarn application status monitor.", e);
+ }
+ // like the step above, every cleanup step is best-effort: a failure is logged and later steps still run
+ try {
+ clusterClient.shutdown();
+ } catch (Exception e) {
+ LOG.warn("Could not properly shutdown cluster client.", e);
+ }
+ // in attached mode, leaving the CLI also terminates the Yarn application
+ try {
+ yarnClusterDescriptor.terminateCluster(yarnApplicationId);
+ } catch (FlinkException e) {
+ LOG.warn("Could not properly terminate the Flink cluster.", e);
+ }
+
// shut down the scheduled executor service
ExecutorUtils.gracefulShutdown(
1000L,
TimeUnit.MILLISECONDS,
scheduledExecutorService);
+
+ deleteYarnPropertiesFile();
+ // the final application report is only logged; failing to fetch it does not change the exit code
+ try {
+ final ApplicationReport applicationReport = yarnClusterDescriptor
+ .getYarnClient()
+ .getApplicationReport(yarnApplicationId);
+
+ logFinalApplicationReport(applicationReport);
+ } catch (YarnException | IOException e) {
+ LOG.warn("Could not log the final application report.", e);
+ }
}
}
- } catch (FlinkException e) {
- System.err.println("Error while deploying a Flink cluster: " + e.getMessage());
- e.printStackTrace();
- return 1;
+ }
+ } finally {
+ try {
+ yarnClusterDescriptor.close();
+ } catch (Exception e) {
+ LOG.warn("Could not properly close the yarn cluster descriptor.", e);
}
}
+
return 0;
}
- /**
- * Utility method for tests.
- */
- public void stop() {
- if (yarnCluster != null) {
- LOG.info("Command line interface is shutting down the yarnCluster");
+ private static void logFinalApplicationReport(ApplicationReport appReport) {
+ final YarnApplicationState state = appReport.getYarnApplicationState();
+ LOG.info("Application {} finished with state {} and final state {} at {}",
+ appReport.getApplicationId(), state, appReport.getFinalApplicationStatus(), appReport.getFinishTime());
+
+ // for FAILED or KILLED applications, also surface the diagnostics and how to fetch the container logs
+ if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
+ LOG.warn("Application failed. Diagnostics {}", appReport.getDiagnostics());
+ LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retrieve "
+ + "the full application log using this command:"
+ + System.lineSeparator()
+ + "\tyarn logs -applicationId " + appReport.getApplicationId()
+ + System.lineSeparator()
+ + "(It sometimes takes a few seconds until the logs are aggregated)");
+ }
+ }
- try {
- yarnCluster.shutdown();
- } catch (Throwable t) {
- LOG.warn("Could not properly shutdown the yarn cluster.", t);
+ private void deleteYarnPropertiesFile() {
+ // best-effort removal of the Yarn properties file; failures are only logged, never rethrown
+ try {
+ final File propertiesFile = getYarnPropertiesLocation(yarnPropertiesFileLocation);
+ if (propertiesFile.isFile()) {
+ if (propertiesFile.delete()) {
+ LOG.info("Deleted Yarn properties file at {}", propertiesFile.getAbsoluteFile());
+ } else {
+ LOG.warn("Couldn't delete Yarn properties file at {}", propertiesFile.getAbsoluteFile());
+ }
}
+ } catch (Exception e) {
+ LOG.warn("Exception while deleting the Yarn properties file", e);
+ }
+ }
+
+ private void writeYarnPropertiesFile(
+ ApplicationId yarnApplicationId,
+ int parallelism,
+ @Nullable String dynamicProperties) {
+ // persists the application id, the parallelism (only if positive) and the dynamic properties (only if non-null)
+ final File yarnPropertiesFile = getYarnPropertiesLocation(yarnPropertiesFileLocation);
+
+ Properties yarnProps = new Properties();
+ yarnProps.setProperty(YARN_APPLICATION_ID_KEY, yarnApplicationId.toString());
+ if (parallelism > 0) {
+ yarnProps.setProperty(YARN_PROPERTIES_PARALLELISM, Integer.toString(parallelism));
+ }
+
+ // add dynamic properties
+ if (dynamicProperties != null) {
+ yarnProps.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, dynamicProperties);
}
+
+ writeYarnProperties(yarnProps, yarnPropertiesFile);
}
private void logAndSysout(String message) {
@@ -719,28 +727,64 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
System.out.println(message);
}
- public static void main(final String[] args) throws Exception {
+ public static Map<String, String> getDynamicProperties(String dynamicPropertiesEncoded) {
+ // decodes "key=value" entries joined by YARN_DYNAMIC_PROPERTIES_SEPARATOR; null/empty input yields an empty map
+ if (dynamicPropertiesEncoded == null || dynamicPropertiesEncoded.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ final Map<String, String> properties = new HashMap<>();
+
+ for (String propLine : dynamicPropertiesEncoded.split(YARN_DYNAMIC_PROPERTIES_SEPARATOR)) {
+ // split at the first '=' only, so that values may themselves contain '='
+ final int firstEquals = propLine.indexOf('=');
+
+ // entries without '=' are ignored
+ if (firstEquals < 0) {
+ continue;
+ }
+
+ final String key = propLine.substring(0, firstEquals).trim();
+ final String value = propLine.substring(firstEquals + 1).trim();
+
+ if (!key.isEmpty()) { // entries with a blank key are ignored as well
+ properties.put(key, value);
+ }
+ }
+
+ return properties;
+ }
+
+ public static void main(final String[] args) {
final String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
final Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
- final FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
- flinkConfiguration,
- configurationDirectory,
- "",
- ""); // no prefix for the YARN session
+ int retCode;
+
+ try {
+ final FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
+ flinkConfiguration, configurationDirectory, "", ""); // no prefix for the YARN session
- SecurityUtils.install(new SecurityConfiguration(flinkConfiguration));
+ SecurityUtils.install(new SecurityConfiguration(flinkConfiguration));
- final int retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.run(args, flinkConfiguration, configurationDirectory));
+ retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.run(args));
+ } catch (CliArgsException e) {
+ retCode = handleCliArgsException(e);
+ } catch (Exception e) {
+ // a secured context running the callable via Hadoop's UGI.doAs wraps checked exceptions in an UndeclaredThrowableException
+ retCode = e instanceof java.lang.reflect.UndeclaredThrowableException && e.getCause() instanceof CliArgsException
+ ? handleCliArgsException((CliArgsException) e.getCause())
+ : handleError(e);
+ }
System.exit(retCode);
}
private static void runInteractiveCli(
- YarnClusterClient clusterClient,
- YarnApplicationStatusMonitor yarnApplicationStatusMonitor,
- boolean readConsoleInput) {
+ ClusterClient clusterClient,
+ YarnApplicationStatusMonitor yarnApplicationStatusMonitor,
+ boolean readConsoleInput) {
try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) {
boolean continueRepl = true;
int numTaskmanagers = 0;
@@ -799,7 +843,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
}
}
- private static void printClusterMessages(YarnClusterClient clusterClient) {
+ private static void printClusterMessages(ClusterClient clusterClient) {
final List<String> messages = clusterClient.getNewMessages();
if (!messages.isEmpty()) {
System.err.println("New messages from the YARN cluster: ");
@@ -819,8 +863,8 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
* @throws InterruptedException
*/
private static boolean repStep(
- BufferedReader in,
- boolean readConsoleInput) throws IOException, InterruptedException {
+ BufferedReader in,
+ boolean readConsoleInput) throws IOException, InterruptedException {
// wait until CLIENT_POLLING_INTERVAL is over or the user entered something.
long startTime = System.currentTimeMillis();
@@ -859,32 +903,25 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
propertiesFile.setReadable(true, false); // readable for all.
}
- public static Map<String, String> getDynamicProperties(String dynamicPropertiesEncoded) {
- if (dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) {
- Map<String, String> properties = new HashMap<>();
+ private static int handleCliArgsException(CliArgsException e) { // logs the parse error, prints it with a help hint to stdout, returns exit code 1
+ LOG.error("Could not parse the command line arguments.", e);
- String[] propertyLines = dynamicPropertiesEncoded.split(YARN_DYNAMIC_PROPERTIES_SEPARATOR);
- for (String propLine : propertyLines) {
- if (propLine == null) {
- continue;
- }
+ System.out.println(e.getMessage());
+ System.out.println();
+ System.out.println("Use the help option (-h or --help) to get help on the command.");
+ return 1;
+ }
- int firstEquals = propLine.indexOf("=");
+ private static int handleError(Exception e) { // logs the failure, prints a banner and the stack trace to stderr, returns exit code 1
+ LOG.error("Error while running the Flink Yarn session.", e);
- if (firstEquals >= 0) {
- String key = propLine.substring(0, firstEquals).trim();
- String value = propLine.substring(firstEquals + 1, propLine.length()).trim();
+ System.err.println();
+ System.err.println("------------------------------------------------------------");
+ System.err.println(" The program finished with the following exception:");
+ System.err.println();
- if (!key.isEmpty()) {
- properties.put(key, value);
- }
- }
- }
- return properties;
- }
- else {
- return Collections.emptyMap();
- }
+ e.printStackTrace();
+ return 1;
}
public static File getYarnPropertiesLocation(@Nullable String yarnPropertiesFileLocation) {
@@ -902,7 +939,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
return new File(propertiesFileLocation, YARN_PROPERTIES_FILE + currentUser);
}
- protected AbstractYarnClusterDescriptor getClusterDescriptor(Configuration configuration, String configurationDirectory, boolean flip6) {
+ private static AbstractYarnClusterDescriptor getClusterDescriptor(Configuration configuration, String configurationDirectory, boolean flip6) {
final YarnClient yarnClient = YarnClient.createYarnClient();
if (flip6) {
return new YarnClusterDescriptorV2(configuration, configurationDirectory, yarnClient);
http://git-wip-us.apache.org/repos/asf/flink/blob/402499f0/flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnApplicationStatusMonitor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnApplicationStatusMonitor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnApplicationStatusMonitor.java
index 88d7747..f96b581 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnApplicationStatusMonitor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnApplicationStatusMonitor.java
@@ -59,7 +59,7 @@ public class YarnApplicationStatusMonitor implements AutoCloseable {
applicationStatusUpdateFuture = scheduledExecutor.scheduleWithFixedDelay(
this::updateApplicationStatus,
- UPDATE_INTERVAL,
+ 0L,
UPDATE_INTERVAL,
TimeUnit.MILLISECONDS);
@@ -83,6 +83,7 @@ public class YarnApplicationStatusMonitor implements AutoCloseable {
applicationReport = yarnClient.getApplicationReport(yarnApplicationId);
} catch (Exception e) {
LOG.info("Could not retrieve the Yarn application report for {}.", yarnApplicationId);
+ applicationStatus = ApplicationStatus.UNKNOWN;
return;
}