You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2020/05/13 14:28:58 UTC
[flink] 04/08: [FLINK-11086][yarn] Use YARN_APPLICATION_CLASSPATH
instead of flink-shaded-hadoop fat jar in tests
This is an automated email from the ASF dual-hosted git repository.
rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 082061da5ec52c7d4257adc272869c1ecb7fa222
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Mon May 4 10:15:41 2020 +0200
[FLINK-11086][yarn] Use YARN_APPLICATION_CLASSPATH instead of flink-shaded-hadoop fat jar in tests
---
flink-yarn-tests/pom.xml | 76 +++++++++++++---------
.../flink/yarn/YARNFileReplicationITCase.java | 1 -
.../flink/yarn/YARNHighAvailabilityITCase.java | 5 --
.../yarn/YARNSessionCapacitySchedulerITCase.java | 10 +--
.../apache/flink/yarn/YARNSessionFIFOITCase.java | 5 --
.../apache/flink/yarn/YarnConfigurationITCase.java | 1 -
.../flink/yarn/YarnPrioritySchedulingITCase.java | 1 -
.../java/org/apache/flink/yarn/YarnTestBase.java | 24 +++++--
.../flink/yarn/YarnFileStageTestS3ITCase.java | 5 ++
9 files changed, 70 insertions(+), 58 deletions(-)
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index 2f5e9f4..a72a988 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -379,39 +379,53 @@ under the License.
<goals>
<goal>copy</goal>
</goals>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
+ <type>jar</type>
+ <classifier>WordCount</classifier>
+ <overWrite>true</overWrite>
+ <destFileName>BatchWordCount.jar</destFileName>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-examples-streaming_${scala.binary.version}</artifactId>
+ <type>jar</type>
+ <classifier>WordCount</classifier>
+ <overWrite>true</overWrite>
+ <destFileName>StreamingWordCount.jar</destFileName>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-examples-streaming_${scala.binary.version}</artifactId>
+ <type>jar</type>
+ <classifier>WindowJoin</classifier>
+ <overWrite>true</overWrite>
+ <destFileName>WindowJoin.jar</destFileName>
+ </artifactItem>
+ </artifactItems>
+ <outputDirectory>${project.build.directory}/programs</outputDirectory>
+ <overWriteReleases>false</overWriteReleases>
+ <overWriteSnapshots>true</overWriteSnapshots>
+ </configuration>
+ </execution>
+ <!-- Write classpath of flink-yarn to a file, so that the yarn tests can use it as their classpath
+ for the YARN "containers".
+ -->
+ <execution>
+ <id>store-classpath-in-target-for-tests</id>
+ <phase>package</phase>
+ <goals>
+ <goal>build-classpath</goal>
+ </goals>
+ <configuration>
+ <outputFile>${project.build.directory}/yarn.classpath</outputFile>
+ <excludeGroupIds>org.apache.flink</excludeGroupIds>
+ </configuration>
</execution>
</executions>
- <configuration>
- <artifactItems>
- <artifactItem>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
- <type>jar</type>
- <classifier>WordCount</classifier>
- <overWrite>true</overWrite>
- <destFileName>BatchWordCount.jar</destFileName>
- </artifactItem>
- <artifactItem>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-examples-streaming_${scala.binary.version}</artifactId>
- <type>jar</type>
- <classifier>WordCount</classifier>
- <overWrite>true</overWrite>
- <destFileName>StreamingWordCount.jar</destFileName>
- </artifactItem>
- <artifactItem>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-examples-streaming_${scala.binary.version}</artifactId>
- <type>jar</type>
- <classifier>WindowJoin</classifier>
- <overWrite>true</overWrite>
- <destFileName>WindowJoin.jar</destFileName>
- </artifactItem>
- </artifactItems>
- <outputDirectory>${project.build.directory}/programs</outputDirectory>
- <overWriteReleases>false</overWriteReleases>
- <overWriteSnapshots>true</overWriteSnapshots>
- </configuration>
</plugin>
</plugins>
</build>
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
index 993adbf..b2d6da5 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
@@ -83,7 +83,6 @@ public class YARNFileReplicationITCase extends YarnTestBase {
yarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
- yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));
final int masterMemory = yarnClusterDescriptor.getFlinkConfiguration().get(JobManagerOptions.TOTAL_PROCESS_MEMORY).getMebiBytes();
final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index a8b799a..b3a9269 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -74,7 +74,6 @@ import javax.annotation.Nonnull;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -160,8 +159,6 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
OperatingSystem.isLinux() || OperatingSystem.isMac() || OperatingSystem.isFreeBSD() || OperatingSystem.isSolaris());
final YarnClusterDescriptor yarnClusterDescriptor = setupYarnClusterDescriptor();
- yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));
-
final RestClusterClient<ApplicationId> restClusterClient = deploySessionCluster(yarnClusterDescriptor);
try {
@@ -186,8 +183,6 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
public void testJobRecoversAfterKillingTaskManager() throws Exception {
runTest(() -> {
final YarnClusterDescriptor yarnClusterDescriptor = setupYarnClusterDescriptor();
- yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));
-
final RestClusterClient<ApplicationId> restClusterClient = deploySessionCluster(yarnClusterDescriptor);
try {
final JobID jobId = submitJob(restClusterClient);
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index c8b6280..490c858 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -155,9 +155,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
public void testStartYarnSessionClusterInQaTeamQueue() throws Exception {
runTest(() -> runWithArgs(new String[]{
"-j", flinkUberjar.getAbsolutePath(),
- "-t", flinkLibFolder.getAbsolutePath(),
- "-t", flinkShadedHadoopDir.getAbsolutePath(),
- "-jm", "768m",
+ "-t", flinkLibFolder.getAbsolutePath(), "-jm", "768m",
"-tm", "1024m", "-qu", "qa-team"},
"JobManager Web Interface:", null, RunTypes.YARN_SESSION, 0));
}
@@ -177,7 +175,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
runWithArgs(new String[]{"run", "-m", "yarn-cluster",
"-yj", flinkUberjar.getAbsolutePath(),
"-yt", flinkLibFolder.getAbsolutePath(),
- "-yt", flinkShadedHadoopDir.getAbsolutePath(),
"-ys", "2", //test that the job is executed with a DOP of 2
"-yjm", "768m",
"-ytm", "1024m", exampleJarLocation.getAbsolutePath()},
@@ -213,7 +210,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
runWithArgs(new String[]{"run", "-m", "yarn-cluster",
"-yj", flinkUberjar.getAbsolutePath(),
"-yt", flinkLibFolder.getAbsolutePath(),
- "-yt", flinkShadedHadoopDir.getAbsolutePath(),
"-ys", "2", //test that the job is executed with a DOP of 2
"-yjm", "768m",
"-ytm", taskManagerMemoryMB + "m",
@@ -251,7 +247,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
final Runner yarnSessionClusterRunner = startWithArgs(new String[]{
"-j", flinkUberjar.getAbsolutePath(),
"-t", flinkLibFolder.getAbsolutePath(),
- "-t", flinkShadedHadoopDir.getAbsolutePath(),
"-jm", "768m",
"-tm", "1024m",
"-s", "3", // set the slots 3 to check if the vCores are set properly!
@@ -393,7 +388,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
try {
runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
"-t", flinkLibFolder.getAbsolutePath(),
- "-t", flinkShadedHadoopDir.getAbsolutePath(),
"-jm", "768m",
"-tm", "1024m",
"-qu", "doesntExist"}, "to unknown queue: doesntExist", null, RunTypes.YARN_SESSION, 1);
@@ -420,7 +414,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
"-m", "yarn-cluster",
"-yj", flinkUberjar.getAbsolutePath(),
"-yt", flinkLibFolder.getAbsolutePath(),
- "-yt", flinkShadedHadoopDir.getAbsolutePath(),
"-ys", "2",
"-yjm", "768m",
"-ytm", "1024m", exampleJarLocation.getAbsolutePath()},
@@ -495,7 +488,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
"run", "-m", "yarn-cluster",
"-yj", flinkUberjar.getAbsolutePath(),
"-yt", flinkLibFolder.getAbsolutePath(),
- "-yt", flinkShadedHadoopDir.getAbsolutePath(),
"-yjm", "768m",
"-yD", YarnConfigOptions.APPLICATION_TAGS.key() + "=test-tag",
"-ytm", "1024m",
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index fbff39d..bb1d98b 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -107,9 +107,6 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
args.add("-t");
args.add(flinkLibFolder.getAbsolutePath());
- args.add("-t");
- args.add(flinkShadedHadoopDir.getAbsolutePath());
-
args.add("-jm");
args.add("768m");
@@ -247,7 +244,6 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
runWithArgs(new String[]{
"-j", flinkUberjar.getAbsolutePath(),
"-t", flinkLibFolder.getAbsolutePath(),
- "-t", flinkShadedHadoopDir.getAbsolutePath(),
"-jm", "256m",
"-tm", "1585m"}, "Number of connected TaskManagers changed to", null, RunTypes.YARN_SESSION, 0);
LOG.info("Finished testResourceComputation()");
@@ -280,7 +276,6 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
runWithArgs(new String[]{
"-j", flinkUberjar.getAbsolutePath(),
"-t", flinkLibFolder.getAbsolutePath(),
- "-t", flinkShadedHadoopDir.getAbsolutePath(),
"-jm", "256m",
"-tm", "3840m"}, "Number of connected TaskManagers changed to", null, RunTypes.YARN_SESSION, 0);
LOG.info("Finished testfullAlloc()");
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 900abb6..cceb3a0 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -102,7 +102,6 @@ public class YarnConfigurationITCase extends YarnTestBase {
clusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
clusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
- clusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));
final File streamingWordCountFile = getTestJarPath("WindowJoin.jar");
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnPrioritySchedulingITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnPrioritySchedulingITCase.java
index 280cd01..16fd866 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnPrioritySchedulingITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnPrioritySchedulingITCase.java
@@ -52,7 +52,6 @@ public class YarnPrioritySchedulingITCase extends YarnTestBase {
final Runner yarnSessionClusterRunner = startWithArgs(new String[]{
"-j", flinkUberjar.getAbsolutePath(),
"-t", flinkLibFolder.getAbsolutePath(),
- "-t", flinkShadedHadoopDir.getAbsolutePath(),
"-jm", "768m",
"-tm", "1024m",
"-Dyarn.application.priority=" + priority},
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 5334e93..003c7f7 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -74,7 +74,6 @@ import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PrintStream;
-import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -172,7 +171,6 @@ public abstract class YarnTestBase extends TestLogger {
* Temporary folder where Flink configurations will be kept for secure run.
*/
protected static File tempConfPathForSecureRun = null;
- protected static File flinkShadedHadoopDir;
protected static File yarnSiteXML = null;
protected static File hdfsSiteXML = null;
@@ -197,6 +195,24 @@ public abstract class YarnTestBase extends TestLogger {
// so we have to change the number of cores for testing.
YARN_CONFIGURATION.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000); // 20 seconds expiry (to ensure we properly heartbeat with YARN).
YARN_CONFIGURATION.setFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 99.0F);
+
+ YARN_CONFIGURATION.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, getYarnClasspath());
+ }
+
+ /**
+ * Searches for the yarn.classpath file generated by the "dependency:build-classpath" maven plugin in
+ * "flink-yarn-tests".
+ * @return a classpath suitable for running all YARN-launched JVMs
+ */
+ private static String getYarnClasspath() {
+ final String start = "../flink-yarn-tests";
+ try {
+ File classPathFile = findFile(start, (dir, name) -> name.equals("yarn.classpath"));
+ return FileUtils.readFileToString(classPathFile); // potential NPE is supposed to be fatal
+ } catch (Throwable t) {
+ LOG.error("Error while getting YARN classpath in {}", new File(start).getAbsoluteFile(), t);
+ throw new RuntimeException("Error while getting YARN classpath", t);
+ }
}
public static void populateYarnSecureConfigurations(Configuration conf, String principal, String keytab) {
@@ -431,7 +447,7 @@ public abstract class YarnTestBase extends TestLogger {
if (!whitelistedFound) {
// logging in FATAL to see the actual message in TRAVIS tests.
Marker fatal = MarkerFactory.getMarker("FATAL");
- LOG.error(fatal, "Prohibited String '{}' in line '{}'", aProhibited, lineFromFile);
+ LOG.error(fatal, "Prohibited String '{}' in '{}:{}'", aProhibited, f.getAbsolutePath(), lineFromFile);
StringBuilder logExcerpt = new StringBuilder();
@@ -637,8 +653,6 @@ public abstract class YarnTestBase extends TestLogger {
Assert.assertNotNull("Flink uberjar not found", flinkUberjar);
String flinkDistRootDir = flinkUberjar.getParentFile().getParent();
flinkLibFolder = flinkUberjar.getParentFile(); // the uberjar is located in lib/
- // the hadoop jar was copied into the target/shaded-hadoop directory during the build
- flinkShadedHadoopDir = Paths.get("target/shaded-hadoop").toFile();
Assert.assertNotNull("Flink flinkLibFolder not found", flinkLibFolder);
Assert.assertTrue("lib folder not found", flinkLibFolder.exists());
Assert.assertTrue("lib folder not found", flinkLibFolder.isDirectory());
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java
index 11d48bd..bfde0c9 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java
@@ -29,7 +29,9 @@ import org.apache.flink.testutils.junit.RetryRule;
import org.apache.flink.testutils.s3.S3TestCredentials;
import org.apache.flink.util.TestLogger;
+import org.apache.hadoop.util.VersionUtil;
import org.junit.AfterClass;
+import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
@@ -174,6 +176,9 @@ public class YarnFileStageTestS3ITCase extends TestLogger {
@Test
@RetryOnFailure(times = 3)
public void testRecursiveUploadForYarnS3n() throws Exception {
+ // skip test on Hadoop 3: https://issues.apache.org/jira/browse/HADOOP-14738
+ Assume.assumeTrue("This test is skipped for Hadoop versions 3.0.0 and above", VersionUtil.compareVersions(System.getProperty("hadoop.version"), "3.0.0") < 0);
+
try {
Class.forName("org.apache.hadoop.fs.s3native.NativeS3FileSystem");
} catch (ClassNotFoundException e) {