You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2020/05/13 14:28:58 UTC

[flink] 04/08: [FLINK-11086][yarn] Use YARN_APPLICATION_CLASSPATH instead of flink-shaded-hadoop fat jar in tests

This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 082061da5ec52c7d4257adc272869c1ecb7fa222
Author: Robert Metzger <rm...@apache.org>
AuthorDate: Mon May 4 10:15:41 2020 +0200

    [FLINK-11086][yarn] Use YARN_APPLICATION_CLASSPATH instead of flink-shaded-hadoop fat jar in tests
---
 flink-yarn-tests/pom.xml                           | 76 +++++++++++++---------
 .../flink/yarn/YARNFileReplicationITCase.java      |  1 -
 .../flink/yarn/YARNHighAvailabilityITCase.java     |  5 --
 .../yarn/YARNSessionCapacitySchedulerITCase.java   | 10 +--
 .../apache/flink/yarn/YARNSessionFIFOITCase.java   |  5 --
 .../apache/flink/yarn/YarnConfigurationITCase.java |  1 -
 .../flink/yarn/YarnPrioritySchedulingITCase.java   |  1 -
 .../java/org/apache/flink/yarn/YarnTestBase.java   | 24 +++++--
 .../flink/yarn/YarnFileStageTestS3ITCase.java      |  5 ++
 9 files changed, 70 insertions(+), 58 deletions(-)

diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index 2f5e9f4..a72a988 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -379,39 +379,53 @@ under the License.
 						<goals>
 							<goal>copy</goal>
 						</goals>
+						<configuration>
+							<artifactItems>
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
+									<type>jar</type>
+									<classifier>WordCount</classifier>
+									<overWrite>true</overWrite>
+									<destFileName>BatchWordCount.jar</destFileName>
+								</artifactItem>
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-examples-streaming_${scala.binary.version}</artifactId>
+									<type>jar</type>
+									<classifier>WordCount</classifier>
+									<overWrite>true</overWrite>
+									<destFileName>StreamingWordCount.jar</destFileName>
+								</artifactItem>
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-examples-streaming_${scala.binary.version}</artifactId>
+									<type>jar</type>
+									<classifier>WindowJoin</classifier>
+									<overWrite>true</overWrite>
+									<destFileName>WindowJoin.jar</destFileName>
+								</artifactItem>
+							</artifactItems>
+							<outputDirectory>${project.build.directory}/programs</outputDirectory>
+							<overWriteReleases>false</overWriteReleases>
+							<overWriteSnapshots>true</overWriteSnapshots>
+						</configuration>
+					</execution>
+					<!-- Write classpath of flink-yarn to a file, so that the yarn tests can use it as their classpath
+						for the YARN "containers".
+					-->
+					<execution>
+						<id>store-classpath-in-target-for-tests</id>
+						<phase>package</phase>
+						<goals>
+							<goal>build-classpath</goal>
+						</goals>
+						<configuration>
+							<outputFile>${project.build.directory}/yarn.classpath</outputFile>
+							<excludeGroupIds>org.apache.flink</excludeGroupIds>
+						</configuration>
 					</execution>
 				</executions>
-				<configuration>
-					<artifactItems>
-						<artifactItem>
-							<groupId>org.apache.flink</groupId>
-							<artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
-							<type>jar</type>
-							<classifier>WordCount</classifier>
-							<overWrite>true</overWrite>
-							<destFileName>BatchWordCount.jar</destFileName>
-						</artifactItem>
-						<artifactItem>
-							<groupId>org.apache.flink</groupId>
-							<artifactId>flink-examples-streaming_${scala.binary.version}</artifactId>
-							<type>jar</type>
-							<classifier>WordCount</classifier>
-							<overWrite>true</overWrite>
-							<destFileName>StreamingWordCount.jar</destFileName>
-						</artifactItem>
-						<artifactItem>
-							<groupId>org.apache.flink</groupId>
-							<artifactId>flink-examples-streaming_${scala.binary.version}</artifactId>
-							<type>jar</type>
-							<classifier>WindowJoin</classifier>
-							<overWrite>true</overWrite>
-							<destFileName>WindowJoin.jar</destFileName>
-						</artifactItem>
-					</artifactItems>
-					<outputDirectory>${project.build.directory}/programs</outputDirectory>
-					<overWriteReleases>false</overWriteReleases>
-					<overWriteSnapshots>true</overWriteSnapshots>
-				</configuration>
 			</plugin>
 		</plugins>
 	</build>
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
index 993adbf..b2d6da5 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNFileReplicationITCase.java
@@ -83,7 +83,6 @@ public class YARNFileReplicationITCase extends YarnTestBase {
 
 			yarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
 			yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
-			yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));
 
 			final int masterMemory = yarnClusterDescriptor.getFlinkConfiguration().get(JobManagerOptions.TOTAL_PROCESS_MEMORY).getMebiBytes();
 			final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index a8b799a..b3a9269 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -74,7 +74,6 @@ import javax.annotation.Nonnull;
 import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -160,8 +159,6 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 				OperatingSystem.isLinux() || OperatingSystem.isMac() || OperatingSystem.isFreeBSD() || OperatingSystem.isSolaris());
 
 			final YarnClusterDescriptor yarnClusterDescriptor = setupYarnClusterDescriptor();
-			yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));
-
 			final RestClusterClient<ApplicationId> restClusterClient = deploySessionCluster(yarnClusterDescriptor);
 
 			try {
@@ -186,8 +183,6 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 	public void testJobRecoversAfterKillingTaskManager() throws Exception {
 		runTest(() -> {
 			final YarnClusterDescriptor yarnClusterDescriptor = setupYarnClusterDescriptor();
-			yarnClusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));
-
 			final RestClusterClient<ApplicationId> restClusterClient = deploySessionCluster(yarnClusterDescriptor);
 			try {
 				final JobID jobId = submitJob(restClusterClient);
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index c8b6280..490c858 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -155,9 +155,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 	public void testStartYarnSessionClusterInQaTeamQueue() throws Exception {
 		runTest(() -> runWithArgs(new String[]{
 				"-j", flinkUberjar.getAbsolutePath(),
-				"-t", flinkLibFolder.getAbsolutePath(),
-				"-t", flinkShadedHadoopDir.getAbsolutePath(),
-				"-jm", "768m",
+				"-t", flinkLibFolder.getAbsolutePath(), "-jm", "768m",
 				"-tm", "1024m", "-qu", "qa-team"},
 			"JobManager Web Interface:", null, RunTypes.YARN_SESSION, 0));
 	}
@@ -177,7 +175,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 			runWithArgs(new String[]{"run", "-m", "yarn-cluster",
 					"-yj", flinkUberjar.getAbsolutePath(),
 					"-yt", flinkLibFolder.getAbsolutePath(),
-					"-yt", flinkShadedHadoopDir.getAbsolutePath(),
 					"-ys", "2", //test that the job is executed with a DOP of 2
 					"-yjm", "768m",
 					"-ytm", "1024m", exampleJarLocation.getAbsolutePath()},
@@ -213,7 +210,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 			runWithArgs(new String[]{"run", "-m", "yarn-cluster",
 					"-yj", flinkUberjar.getAbsolutePath(),
 					"-yt", flinkLibFolder.getAbsolutePath(),
-					"-yt", flinkShadedHadoopDir.getAbsolutePath(),
 					"-ys", "2", //test that the job is executed with a DOP of 2
 					"-yjm", "768m",
 					"-ytm", taskManagerMemoryMB + "m",
@@ -251,7 +247,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 			final Runner yarnSessionClusterRunner = startWithArgs(new String[]{
 					"-j", flinkUberjar.getAbsolutePath(),
 					"-t", flinkLibFolder.getAbsolutePath(),
-					"-t", flinkShadedHadoopDir.getAbsolutePath(),
 					"-jm", "768m",
 					"-tm", "1024m",
 					"-s", "3", // set the slots 3 to check if the vCores are set properly!
@@ -393,7 +388,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 			try {
 				runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
 					"-t", flinkLibFolder.getAbsolutePath(),
-					"-t", flinkShadedHadoopDir.getAbsolutePath(),
 					"-jm", "768m",
 					"-tm", "1024m",
 					"-qu", "doesntExist"}, "to unknown queue: doesntExist", null, RunTypes.YARN_SESSION, 1);
@@ -420,7 +414,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 					"-m", "yarn-cluster",
 					"-yj", flinkUberjar.getAbsolutePath(),
 					"-yt", flinkLibFolder.getAbsolutePath(),
-					"-yt", flinkShadedHadoopDir.getAbsolutePath(),
 					"-ys", "2",
 					"-yjm", "768m",
 					"-ytm", "1024m", exampleJarLocation.getAbsolutePath()},
@@ -495,7 +488,6 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 				"run", "-m", "yarn-cluster",
 				"-yj", flinkUberjar.getAbsolutePath(),
 				"-yt", flinkLibFolder.getAbsolutePath(),
-				"-yt", flinkShadedHadoopDir.getAbsolutePath(),
 				"-yjm", "768m",
 				"-yD", YarnConfigOptions.APPLICATION_TAGS.key() + "=test-tag",
 				"-ytm", "1024m",
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index fbff39d..bb1d98b 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -107,9 +107,6 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 			args.add("-t");
 			args.add(flinkLibFolder.getAbsolutePath());
 
-			args.add("-t");
-			args.add(flinkShadedHadoopDir.getAbsolutePath());
-
 			args.add("-jm");
 			args.add("768m");
 
@@ -247,7 +244,6 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 			runWithArgs(new String[]{
 				"-j", flinkUberjar.getAbsolutePath(),
 				"-t", flinkLibFolder.getAbsolutePath(),
-				"-t", flinkShadedHadoopDir.getAbsolutePath(),
 				"-jm", "256m",
 				"-tm", "1585m"}, "Number of connected TaskManagers changed to", null, RunTypes.YARN_SESSION, 0);
 			LOG.info("Finished testResourceComputation()");
@@ -280,7 +276,6 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 			runWithArgs(new String[]{
 				"-j", flinkUberjar.getAbsolutePath(),
 				"-t", flinkLibFolder.getAbsolutePath(),
-				"-t", flinkShadedHadoopDir.getAbsolutePath(),
 				"-jm", "256m",
 				"-tm", "3840m"}, "Number of connected TaskManagers changed to", null, RunTypes.YARN_SESSION, 0);
 			LOG.info("Finished testfullAlloc()");
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 900abb6..cceb3a0 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -102,7 +102,6 @@ public class YarnConfigurationITCase extends YarnTestBase {
 
 			clusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
 			clusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
-			clusterDescriptor.addShipFiles(Arrays.asList(flinkShadedHadoopDir.listFiles()));
 
 			final File streamingWordCountFile = getTestJarPath("WindowJoin.jar");
 
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnPrioritySchedulingITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnPrioritySchedulingITCase.java
index 280cd01..16fd866 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnPrioritySchedulingITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnPrioritySchedulingITCase.java
@@ -52,7 +52,6 @@ public class YarnPrioritySchedulingITCase extends YarnTestBase {
 			final Runner yarnSessionClusterRunner = startWithArgs(new String[]{
 					"-j", flinkUberjar.getAbsolutePath(),
 					"-t", flinkLibFolder.getAbsolutePath(),
-					"-t", flinkShadedHadoopDir.getAbsolutePath(),
 					"-jm", "768m",
 					"-tm", "1024m",
 					"-Dyarn.application.priority=" + priority},
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 5334e93..003c7f7 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -74,7 +74,6 @@ import java.io.InputStream;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
 import java.io.PrintStream;
-import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -172,7 +171,6 @@ public abstract class YarnTestBase extends TestLogger {
 	 * Temporary folder where Flink configurations will be kept for secure run.
 	 */
 	protected static File tempConfPathForSecureRun = null;
-	protected static File flinkShadedHadoopDir;
 
 	protected static File yarnSiteXML = null;
 	protected static File hdfsSiteXML = null;
@@ -197,6 +195,24 @@ public abstract class YarnTestBase extends TestLogger {
 		// so we have to change the number of cores for testing.
 		YARN_CONFIGURATION.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000); // 20 seconds expiry (to ensure we properly heartbeat with YARN).
 		YARN_CONFIGURATION.setFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 99.0F);
+
+		YARN_CONFIGURATION.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, getYarnClasspath());
+	}
+
+	/**
+	 * Searches for the yarn.classpath file generated by the "dependency:build-classpath" maven plugin in
+	 * "flink-yarn-tests".
+	 * @return a classpath suitable for running all YARN-launched JVMs
+	 */
+	private static String getYarnClasspath() {
+		final String start = "../flink-yarn-tests";
+		try {
+			File classPathFile = findFile(start, (dir, name) -> name.equals("yarn.classpath"));
+			return FileUtils.readFileToString(classPathFile); // potential NPE is supposed to be fatal
+		} catch (Throwable t) {
+			LOG.error("Error while getting YARN classpath in {}", new File(start).getAbsoluteFile(), t);
+			throw new RuntimeException("Error while getting YARN classpath", t);
+		}
 	}
 
 	public static void populateYarnSecureConfigurations(Configuration conf, String principal, String keytab) {
@@ -431,7 +447,7 @@ public abstract class YarnTestBase extends TestLogger {
 							if (!whitelistedFound) {
 								// logging in FATAL to see the actual message in TRAVIS tests.
 								Marker fatal = MarkerFactory.getMarker("FATAL");
-								LOG.error(fatal, "Prohibited String '{}' in line '{}'", aProhibited, lineFromFile);
+								LOG.error(fatal, "Prohibited String '{}' in '{}:{}'", aProhibited, f.getAbsolutePath(), lineFromFile);
 
 								StringBuilder logExcerpt = new StringBuilder();
 
@@ -637,8 +653,6 @@ public abstract class YarnTestBase extends TestLogger {
 		Assert.assertNotNull("Flink uberjar not found", flinkUberjar);
 		String flinkDistRootDir = flinkUberjar.getParentFile().getParent();
 		flinkLibFolder = flinkUberjar.getParentFile(); // the uberjar is located in lib/
-		// the hadoop jar was copied into the target/shaded-hadoop directory during the build
-		flinkShadedHadoopDir = Paths.get("target/shaded-hadoop").toFile();
 		Assert.assertNotNull("Flink flinkLibFolder not found", flinkLibFolder);
 		Assert.assertTrue("lib folder not found", flinkLibFolder.exists());
 		Assert.assertTrue("lib folder not found", flinkLibFolder.isDirectory());
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java
index 11d48bd..bfde0c9 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTestS3ITCase.java
@@ -29,7 +29,9 @@ import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.testutils.s3.S3TestCredentials;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.hadoop.util.VersionUtil;
 import org.junit.AfterClass;
+import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -174,6 +176,9 @@ public class YarnFileStageTestS3ITCase extends TestLogger {
 	@Test
 	@RetryOnFailure(times = 3)
 	public void testRecursiveUploadForYarnS3n() throws Exception {
+		// skip test on Hadoop 3 and above: https://issues.apache.org/jira/browse/HADOOP-14738
+		Assume.assumeTrue("This test is skipped for Hadoop 3 and above", VersionUtil.compareVersions(System.getProperty("hadoop.version"), "3.0.0") < 0);
+
 		try {
 			Class.forName("org.apache.hadoop.fs.s3native.NativeS3FileSystem");
 		} catch (ClassNotFoundException e) {