You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/04/13 19:17:26 UTC
[1/4] flink git commit: [hotfix][typo] fix some typos
Repository: flink
Updated Branches:
refs/heads/master f3d4011f8 -> 635612886
[hotfix][typo] fix some typos
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57b3ce8c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57b3ce8c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57b3ce8c
Branch: refs/heads/master
Commit: 57b3ce8ce84b5bfa47ca6afcff811842dad8e7cc
Parents: f3d4011
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Mar 9 12:11:55 2018 +0100
Committer: Nico Kruber <ni...@data-artisans.com>
Committed: Thu Apr 12 10:40:14 2018 -0700
----------------------------------------------------------------------
.../org/apache/flink/client/program/ContextEnvironmentFactory.java | 2 +-
.../java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3ce8c/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
index 64b1863..8c1d9eb 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -64,7 +64,7 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
@Override
public ExecutionEnvironment createExecutionEnvironment() {
if (isDetached && lastEnvCreated != null) {
- throw new InvalidProgramException("Multiple enviornments cannot be created in detached mode");
+ throw new InvalidProgramException("Multiple environments cannot be created in detached mode");
}
lastEnvCreated = isDetached
http://git-wip-us.apache.org/repos/asf/flink/blob/57b3ce8c/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index ef266f0..9d504d3 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -1070,7 +1070,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop " +
"it:\nyarn application -kill " + appId + "\nPlease also note that the " +
- "temporary files of the YARN session in the home directoy will not be removed.");
+ "temporary files of the YARN session in the home directory will not be removed.");
}
// since deployment was successful, remove the hook
ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
[3/4] flink git commit: [FLINK-8872][flip6] fix yarn detached mode
command parsing
Posted by tr...@apache.org.
[FLINK-8872][flip6] fix yarn detached mode command parsing
The detached flag, if given via "-yd", was not passed correctly into the
CliFrontend, which resulted in the CLI waiting for submitted jobs to finish
instead of detaching from the execution.
[FLINK-8872][yarn] add tests for YARN detached mode command line parsing with CliFrontend
- create a test-jar of flink-clients
- create CliFrontendRunWithYarnTest based on CliFrontendRunTest that verifies
CliFrontend's parsing in conjunction with FlinkYarnSessionCli
-> verify detached mode in this test (can be extended further in the future)
This closes #5672.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fdd1c6ed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fdd1c6ed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fdd1c6ed
Branch: refs/heads/master
Commit: fdd1c6ed0fed229612ecde1565d90a06dbe6ff55
Parents: ca5573b
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Mar 5 18:24:17 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Apr 13 10:38:26 2018 -0700
----------------------------------------------------------------------
flink-clients/pom.xml | 14 +-
.../flink/client/cli/CliFrontendParser.java | 9 ++
.../org/apache/flink/client/cli/DefaultCLI.java | 1 +
.../apache/flink/client/cli/ProgramOptions.java | 4 +-
.../flink/client/program/ClusterClient.java | 2 +-
.../client/program/rest/RestClusterClient.java | 2 +-
.../flink/client/cli/CliFrontendRunTest.java | 2 +-
flink-yarn-tests/pom.xml | 8 +
.../flink/yarn/CliFrontendRunWithYarnTest.java | 148 +++++++++++++++++++
.../flink/yarn/util/FakeClusterClient.java | 79 ++++++++++
.../util/NonDeployingYarnClusterDescriptor.java | 98 ++++++++++++
.../flink/yarn/cli/FlinkYarnSessionCli.java | 22 +--
.../flink/yarn/FlinkYarnSessionCliTest.java | 19 +++
13 files changed, 392 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-clients/pom.xml
----------------------------------------------------------------------
diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index d15e3ff..c96f275 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -96,6 +96,18 @@ under the License.
<build>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version><!--$NO-MVN-MAN-VER$-->
<executions>
@@ -120,7 +132,7 @@ under the License.
</execution>
</executions>
</plugin>
- <!--Remove the external jar test code from the test-classes directory since it musn't be in the
+ <!--Remove the external jar test code from the test-classes directory since it mustn't be in the
classpath when running the tests to actually test whether the user code class loader
is properly used.-->
<plugin>
http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 5a6c0ff..1588aac 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -62,6 +62,13 @@ public class CliFrontendParser {
public static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs " +
"the job in detached mode");
+ /**
+ * @deprecated use non-prefixed variant {@link #DETACHED_OPTION} for both YARN and non-YARN deployments
+ */
+ @Deprecated
+ public static final Option YARN_DETACHED_OPTION = new Option("yd", "yarndetached", false, "If present, runs " +
+ "the job in detached mode (deprecated; use non-YARN specific option instead)");
+
static final Option ARGS_OPTION = new Option("a", "arguments", true,
"Program arguments. Arguments can also be added without -a, simply as trailing parameters.");
@@ -117,6 +124,7 @@ public class CliFrontendParser {
LOGGING_OPTION.setRequired(false);
DETACHED_OPTION.setRequired(false);
+ YARN_DETACHED_OPTION.setRequired(false);
ARGS_OPTION.setRequired(false);
ARGS_OPTION.setArgName("programArgs");
@@ -158,6 +166,7 @@ public class CliFrontendParser {
options.addOption(ARGS_OPTION);
options.addOption(LOGGING_OPTION);
options.addOption(DETACHED_OPTION);
+ options.addOption(YARN_DETACHED_OPTION);
return options;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index 43efc63..e9ed9af 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -40,6 +40,7 @@ public class DefaultCLI extends AbstractCustomCommandLine<StandaloneClusterId> {
@Override
public boolean isActive(CommandLine commandLine) {
+ // always active because we can try to read a JobManager address from the config
return true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index df25e67..1acda1b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -38,6 +38,7 @@ import static org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_ALLOW_NON_RESTORED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_PATH_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
/**
* Base class for command line options that refer to a JAR file program.
@@ -112,7 +113,8 @@ public abstract class ProgramOptions extends CommandLineOptions {
}
stdoutLogging = !line.hasOption(LOGGING_OPTION.getOpt());
- detachedMode = line.hasOption(DETACHED_OPTION.getOpt());
+ detachedMode = line.hasOption(DETACHED_OPTION.getOpt()) || line.hasOption(
+ YARN_DETACHED_OPTION.getOpt());
if (line.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
String savepointPath = line.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index fbaa515..2ef0b2e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -385,7 +385,7 @@ public abstract class ClusterClient<T> {
return run(jobWithJars, parallelism, prog.getSavepointSettings());
}
else if (prog.isUsingInteractiveMode()) {
- log.info("Starting program in interactive mode");
+ log.info("Starting program in interactive mode (detached: {})", isDetached());
final List<URL> libraries;
if (hasUserJarsInClassPath(prog.getAllLibraries())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 4a4f993..a6f676e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -235,7 +235,7 @@ public class RestClusterClient<T> extends ClusterClient<T> implements NewCluster
@Override
public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
- log.info("Submitting job {}.", jobGraph.getJobID());
+ log.info("Submitting job {} (detached: {}).", jobGraph.getJobID(), isDetached());
final CompletableFuture<JobSubmissionResult> jobSubmissionFuture = submitJob(jobGraph);
http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index efa6a39..ba51b69 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -144,7 +144,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
// --------------------------------------------------------------------------------------------
- private static void verifyCliFrontend(
+ public static void verifyCliFrontend(
AbstractCustomCommandLine<?> cli,
String[] parameters,
int expectedParallelism,
http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-yarn-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index 0b05b28..a2d75c9 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -67,6 +67,14 @@ under the License.
<type>test-jar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
<!-- Needed for the streaming wordcount example -->
<dependency>
<groupId>org.apache.flink</groupId>
http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
new file mode 100644
index 0000000..d6a029f
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendRunWithYarnTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.client.cli.CliFrontendTestBase;
+import org.apache.flink.client.cli.CliFrontendTestUtils;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.flink.yarn.util.FakeClusterClient;
+import org.apache.flink.yarn.util.NonDeployingYarnClusterDescriptor;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.apache.flink.client.cli.CliFrontendRunTest.verifyCliFrontend;
+import static org.apache.flink.yarn.util.YarnTestUtils.getTestJarPath;
+
+/**
+ * Tests for the RUN command using a {@link org.apache.flink.yarn.cli.FlinkYarnSessionCli} inside
+ * the {@link org.apache.flink.client.cli.CliFrontend}.
+ * <p>Currently asserts that both {@code -d} and the deprecated {@code -yd} lead to a detached job cluster deployment.
+ * @see org.apache.flink.client.cli.CliFrontendRunTest
+ */
+@RunWith(Parameterized.class)
+public class CliFrontendRunWithYarnTest extends CliFrontendTestBase {
+
+ @Rule
+ public TemporaryFolder tmp = new TemporaryFolder();
+
+ @BeforeClass
+ public static void init() {
+ CliFrontendTestUtils.pipeSystemOutToNull();
+ }
+
+ @AfterClass
+ public static void shutdown() {
+ CliFrontendTestUtils.restoreSystemOut();
+ }
+
+ @Test
+ public void testRun() throws Exception {
+ String testJarPath = getTestJarPath("BatchWordCount.jar").getAbsolutePath();
+
+ Configuration configuration = new Configuration();
+ configuration.setString(CoreOptions.MODE, mode);
+ configuration.setString(JobManagerOptions.ADDRESS, "localhost");
+ configuration.setInteger(JobManagerOptions.PORT, 8081);
+
+ FlinkYarnSessionCli yarnCLI = new TestingFlinkYarnSessionCli(
+ configuration,
+ tmp.getRoot().getAbsolutePath(),
+ "y",
+ "yarn");
+
+ // test detached mode requested via the generic "-d" option
+ {
+ String[] parameters = {"-m", "yarn-cluster", "-yn", "1", "-p", "2", "-d", testJarPath};
+ verifyCliFrontend(yarnCLI, parameters, 2, true, true);
+ }
+
+ // test detached mode requested via the deprecated YARN-specific "-yd" option
+ {
+ String[] parameters = {"-m", "yarn-cluster", "-yn", "1", "-p", "2", "-yd", testJarPath};
+ verifyCliFrontend(yarnCLI, parameters, 2, true, true);
+ }
+ }
+
+ private static class TestingFlinkYarnSessionCli extends FlinkYarnSessionCli {
+ @SuppressWarnings("unchecked") // NOTE(review): nothing unchecked is visible on this field; suppression looks unnecessary - confirm
+ private final ClusterClient<ApplicationId> clusterClient;
+ private final String configurationDirectory;
+
+ private TestingFlinkYarnSessionCli(
+ Configuration configuration,
+ String configurationDirectory,
+ String shortPrefix,
+ String longPrefix) throws Exception {
+ super(configuration, configurationDirectory, shortPrefix, longPrefix);
+
+ this.clusterClient = new FakeClusterClient(configuration);
+ this.configurationDirectory = configurationDirectory;
+ }
+
+ @Override
+ public AbstractYarnClusterDescriptor createClusterDescriptor(CommandLine commandLine)
+ throws FlinkException {
+ AbstractYarnClusterDescriptor parent = super.createClusterDescriptor(commandLine); // parsed by the real CLI, then wrapped below
+ return new NonDeployingDetachedYarnClusterDescriptor(
+ parent.getFlinkConfiguration(),
+ (YarnConfiguration) parent.getYarnClient().getConfig(),
+ configurationDirectory,
+ parent.getYarnClient(),
+ clusterClient);
+ }
+ }
+
+ private static class NonDeployingDetachedYarnClusterDescriptor extends NonDeployingYarnClusterDescriptor {
+
+ NonDeployingDetachedYarnClusterDescriptor(
+ Configuration flinkConfiguration,
+ YarnConfiguration yarnConfiguration, String configurationDirectory,
+ YarnClient yarnClient,
+ ClusterClient<ApplicationId> clusterClient) {
+ super(flinkConfiguration, yarnConfiguration, configurationDirectory, yarnClient,
+ clusterClient);
+ }
+
+ @Override
+ public ClusterClient<ApplicationId> deployJobCluster(
+ ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) {
+ assertTrue(detached); // the actual check of this test: the CLI must request a detached deployment
+ return super.deployJobCluster(clusterSpecification, jobGraph, true);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/FakeClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/FakeClusterClient.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/FakeClusterClient.java
new file mode 100644
index 0000000..f6f6a71
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/FakeClusterClient.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.util;
+
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+import java.net.URL;
+import java.util.List;
+
+/**
+ * Dummy {@link ClusterClient} for testing purposes (extend as needed): returns 10 max slots, an empty web interface URL and no user jars in the class path; cluster status, messages, cluster id and job submission throw {@link UnsupportedOperationException}.
+ */
+public class FakeClusterClient extends ClusterClient<ApplicationId> {
+
+ public FakeClusterClient(Configuration flinkConfig) throws Exception {
+ super(flinkConfig);
+ }
+
+ @Override
+ public void waitForClusterToBeReady() {
+ }
+
+ @Override
+ public String getWebInterfaceURL() {
+ return "";
+ }
+
+ @Override
+ public GetClusterStatusResponse getClusterStatus() {
+ throw new UnsupportedOperationException("Not needed in test.");
+ }
+
+ @Override
+ public List<String> getNewMessages() {
+ throw new UnsupportedOperationException("Not needed in test.");
+ }
+
+ @Override
+ public ApplicationId getClusterId() {
+ throw new UnsupportedOperationException("Not needed in test.");
+ }
+
+ @Override
+ public int getMaxSlots() {
+ return 10;
+ }
+
+ @Override
+ public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
+ return false;
+ }
+
+ @Override
+ public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) {
+ throw new UnsupportedOperationException("Not needed in test.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java
new file mode 100644
index 0000000..4916b73
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/NonDeployingYarnClusterDescriptor.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.util;
+
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Dummy {@link AbstractYarnClusterDescriptor} for tests: deploy, retrieve and client-creation methods all return the {@link ClusterClient} given at construction instead of deploying anything.
+ */
+public class NonDeployingYarnClusterDescriptor extends AbstractYarnClusterDescriptor {
+
+ private final ClusterClient<ApplicationId> clusterClient;
+
+ public NonDeployingYarnClusterDescriptor(
+ Configuration flinkConfiguration,
+ YarnConfiguration yarnConfiguration,
+ String configurationDirectory,
+ YarnClient yarnClient,
+ ClusterClient<ApplicationId> clusterClient) {
+ super(flinkConfiguration, yarnConfiguration, configurationDirectory, yarnClient, true);
+
+ // reject a null client up front: every deploy/retrieve method below hands it out
+ this.clusterClient = Preconditions.checkNotNull(clusterClient);
+ }
+
+ @Override
+ public String getClusterDescription() {
+ // fixed description; nothing is looked up for it
+ return "NonDeployingYarnClusterDescriptor";
+ }
+
+ @Override
+ protected ClusterClient<ApplicationId> createYarnClusterClient(
+ AbstractYarnClusterDescriptor descriptor,
+ int numberTaskManagers,
+ int slotsPerTaskManager,
+ ApplicationReport report,
+ Configuration flinkConfiguration,
+ boolean perJobCluster) {
+ return clusterClient;
+ }
+
+ @Override
+ public ClusterClient<ApplicationId> retrieve(ApplicationId clusterId) {
+ return clusterClient;
+ }
+
+ @Override
+ public ClusterClient<ApplicationId> deploySessionCluster(ClusterSpecification clusterSpecification) {
+ return clusterClient;
+ }
+
+ @Override
+ public ClusterClient<ApplicationId> deployJobCluster(
+ ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) {
+ return clusterClient;
+ }
+
+ @Override
+ public void terminateCluster(ApplicationId clusterId) {
+ }
+
+ @Override
+ protected String getYarnSessionClusterEntrypoint() {
+ throw new UnsupportedOperationException("Not needed in test.");
+ }
+
+ @Override
+ protected String getYarnJobClusterEntrypoint() {
+ throw new UnsupportedOperationException("Not needed in test.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 16abffa..fe04662 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -21,7 +21,6 @@ package org.apache.flink.yarn.cli;
import org.apache.flink.client.cli.AbstractCustomCommandLine;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CliFrontend;
-import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
@@ -84,6 +83,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
+import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.YARN_DETACHED_OPTION;
import static org.apache.flink.configuration.HighAvailabilityOptions.HA_CLUSTER_ID;
/**
@@ -127,7 +128,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
private final Option tmMemory;
private final Option container;
private final Option slots;
- private final Option detached;
private final Option zookeeperNamespace;
private final Option help;
@@ -162,9 +162,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
private final YarnConfiguration yarnConfiguration;
- //------------------------------------ Internal fields -------------------------
- private boolean detachedMode = false;
-
public FlinkYarnSessionCli(
Configuration configuration,
String configurationDirectory,
@@ -202,7 +199,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
.valueSeparator()
.desc("use value for given property")
.build();
- detached = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached");
streaming = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode");
name = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN");
zookeeperNamespace = new Option(shortPrefix + "z", longPrefix + "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for high availability mode");
@@ -218,7 +214,8 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
allOptions.addOption(shipPath);
allOptions.addOption(slots);
allOptions.addOption(dynamicproperties);
- allOptions.addOption(detached);
+ allOptions.addOption(DETACHED_OPTION);
+ allOptions.addOption(YARN_DETACHED_OPTION);
allOptions.addOption(streaming);
allOptions.addOption(name);
allOptions.addOption(applicationId);
@@ -348,8 +345,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
yarnClusterDescriptor.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
- if (cmd.hasOption(detached.getOpt()) || cmd.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt())) {
- this.detachedMode = true;
+ if (cmd.hasOption(YARN_DETACHED_OPTION.getOpt()) || cmd.hasOption(DETACHED_OPTION.getOpt())) {
yarnClusterDescriptor.setDetachedMode(true);
}
@@ -519,7 +515,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
for (Option option : commandLine.getOptions()) {
if (allOptions.hasOption(option.getOpt())) {
- if (!option.getOpt().equals(detached.getOpt())) {
+ if (!isDetachedOption(option)) {
// don't resume from properties file if yarn options have been specified
canApplyYarnProperties = false;
break;
@@ -530,6 +526,10 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
return canApplyYarnProperties;
}
+ private boolean isDetachedOption(Option option) { // true if the option's short name equals that of DETACHED_OPTION or the deprecated YARN_DETACHED_OPTION
+ return option.getOpt().equals(YARN_DETACHED_OPTION.getOpt()) || option.getOpt().equals(DETACHED_OPTION.getOpt());
+ }
+
private Configuration applyYarnProperties(Configuration configuration) throws FlinkException {
final Configuration effectiveConfiguration = new Configuration(configuration);
@@ -621,7 +621,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
}
}
- if (detachedMode) {
+ if (yarnClusterDescriptor.isDetachedMode()) {
LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
"yarn application -kill " + yarnApplicationId);
http://git-wip-us.apache.org/repos/asf/flink/blob/fdd1c6ed/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 62110ed..12c0354 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -119,6 +119,25 @@ public class FlinkYarnSessionCliTest extends TestLogger {
}
@Test
+ public void testCorrectSettingOfDetachedMode() throws Exception {
+ String[] params =
+ new String[] {"-yd"};
+
+ FlinkYarnSessionCli yarnCLI = new FlinkYarnSessionCli(
+ new Configuration(),
+ tmp.getRoot().getAbsolutePath(),
+ "y",
+ "yarn");
+
+ final CommandLine commandLine = yarnCLI.parseCommandLineOptions(params, true);
+
+ AbstractYarnClusterDescriptor descriptor = yarnCLI.createClusterDescriptor(commandLine);
+
+ // the "-yd" flag must result in a cluster descriptor that is in detached mode
+ assertTrue(descriptor.isDetachedMode());
+ }
+
+ @Test
public void testZookeeperNamespaceProperty() throws Exception {
String zkNamespaceCliInput = "flink_test_namespace";
[4/4] flink git commit: [hotfix] Deprecate
AbstractYarnClusterDescriptor#setDetachedMode and isDetachedMode
Posted by tr...@apache.org.
[hotfix] Deprecate AbstractYarnClusterDescriptor#setDetachedMode and isDetachedMode
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63561288
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63561288
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63561288
Branch: refs/heads/master
Commit: 6356128865bff7463bf03185d18b129ed3633bc2
Parents: fdd1c6e
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Apr 13 10:41:27 2018 -0700
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Apr 13 10:41:27 2018 -0700
----------------------------------------------------------------------
.../org/apache/flink/yarn/AbstractYarnClusterDescriptor.java | 8 ++++++++
1 file changed, 8 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/63561288/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 9d504d3..8538c1f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -313,10 +313,18 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
return false;
}
+ /**
+ * @deprecated The cluster descriptor should not know about this option.
+ */
+ @Deprecated
public void setDetachedMode(boolean detachedMode) {
this.detached = detachedMode;
}
+ /**
+ * @deprecated The cluster descriptor should not know about this option.
+ */
+ @Deprecated
public boolean isDetachedMode() {
return detached;
}
[2/4] flink git commit: [hotfix][tests] fix ineffective file
existence check and remove duplicate code
Posted by tr...@apache.org.
[hotfix][tests] fix ineffective file existence check and remove duplicate code
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ca5573b2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ca5573b2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ca5573b2
Branch: refs/heads/master
Commit: ca5573b21867930709f094306733bf854d82fda7
Parents: 57b3ce8
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Apr 3 14:13:41 2018 +0200
Committer: Nico Kruber <ni...@data-artisans.com>
Committed: Thu Apr 12 10:40:15 2018 -0700
----------------------------------------------------------------------
.../YARNSessionCapacitySchedulerITCase.java | 18 ++++------
.../flink/yarn/YarnConfigurationITCase.java | 5 ++-
.../apache/flink/yarn/util/YarnTestUtils.java | 36 ++++++++++++++++++++
3 files changed, 44 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ca5573b2/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 3767629..ac8eaa0 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -78,6 +78,7 @@ import java.util.regex.Pattern;
import static junit.framework.TestCase.assertTrue;
import static org.apache.flink.yarn.UtilsTest.addTestAppender;
import static org.apache.flink.yarn.UtilsTest.checkForLogString;
+import static org.apache.flink.yarn.util.YarnTestUtils.getTestJarPath;
import static org.junit.Assume.assumeTrue;
/**
@@ -123,8 +124,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
public void perJobYarnCluster() throws IOException {
LOG.info("Starting perJobYarnCluster()");
addTestAppender(JobClient.class, Level.INFO);
- File exampleJarLocation = new File("target/programs/BatchWordCount.jar");
- Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
+ File exampleJarLocation = getTestJarPath("BatchWordCount.jar");
runWithArgs(new String[]{"run", "-m", "yarn-cluster",
"-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(),
"-yn", "1",
@@ -152,8 +152,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
public void perJobYarnClusterOffHeap() throws IOException {
LOG.info("Starting perJobYarnCluster()");
addTestAppender(JobClient.class, Level.INFO);
- File exampleJarLocation = new File("target/programs/BatchWordCount.jar");
- Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
+ File exampleJarLocation = getTestJarPath("BatchWordCount.jar");
// set memory constraints (otherwise this is the same test as perJobYarnCluster() above)
final long taskManagerMemoryMB = 1024;
@@ -394,8 +393,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
// write log messages to stdout as well, so that the runWithArgs() method
// is catching the log output
addTestAppender(JobClient.class, Level.INFO);
- File exampleJarLocation = new File("target/programs/BatchWordCount.jar");
- Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
+ File exampleJarLocation = getTestJarPath("BatchWordCount.jar");
runWithArgs(new String[]{"run",
"-p", "2", //test that the job is executed with a DOP of 2
"-m", "yarn-cluster",
@@ -419,9 +417,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
public void testDetachedPerJobYarnCluster() throws Exception {
LOG.info("Starting testDetachedPerJobYarnCluster()");
- File exampleJarLocation = new File("target/programs/BatchWordCount.jar");
-
- Assert.assertNotNull("Could not find batch wordcount jar", exampleJarLocation);
+ File exampleJarLocation = getTestJarPath("BatchWordCount.jar");
testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath());
@@ -435,9 +431,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
public void testDetachedPerJobYarnClusterWithStreamingJob() throws Exception {
LOG.info("Starting testDetachedPerJobYarnClusterWithStreamingJob()");
- File exampleJarLocation = new File("target/programs/StreamingWordCount.jar");
-
- Assert.assertNotNull("Could not find streaming wordcount jar", exampleJarLocation);
+ File exampleJarLocation = getTestJarPath("StreamingWordCount.jar");
testDetachedPerJobYarnClusterInternal(exampleJarLocation.getAbsolutePath());
http://git-wip-us.apache.org/repos/asf/flink/blob/ca5573b2/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index c5d683e..3a2f957 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -56,6 +56,7 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import static org.apache.flink.yarn.util.YarnTestUtils.getTestJarPath;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
@@ -100,9 +101,7 @@ public class YarnConfigurationITCase extends YarnTestBase {
clusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
clusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
- final File streamingWordCountFile = new File("target/programs/WindowJoin.jar");
-
- assertThat(streamingWordCountFile.exists(), is(true));
+ final File streamingWordCountFile = getTestJarPath("WindowJoin.jar");
final PackagedProgram packagedProgram = new PackagedProgram(streamingWordCountFile);
final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, 1);
http://git-wip-us.apache.org/repos/asf/flink/blob/ca5573b2/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/YarnTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/YarnTestUtils.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/YarnTestUtils.java
new file mode 100644
index 0000000..25d833b
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/util/YarnTestUtils.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.util;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+
+/**
+ * Utility methods for YARN tests.
+ */
+public class YarnTestUtils {
+ public static File getTestJarPath(String fileName) throws FileNotFoundException {
+ File f = new File("target/programs/" + fileName);
+ if (!f.exists()) {
+ throw new FileNotFoundException("Test jar " + f.getPath() + " not present. Invoke tests using maven "
+ + "or build the jar using 'mvn process-test-classes' in flink-yarn-tests");
+ }
+ return f;
+ }
+}