Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/01/11 17:13:23 UTC

[GitHub] tillrohrmann commented on a change in pull request #6953: [FLINK-10558] [yarn-test] Port YARNHighAvailabilityITCase to new code…

tillrohrmann commented on a change in pull request #6953: [FLINK-10558] [yarn-test] Port YARNHighAvailabilityITCase to new code…
URL: https://github.com/apache/flink/pull/6953#discussion_r246423635
 
 

 ##########
 File path: flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
 ##########
 @@ -18,198 +18,207 @@
 
 package org.apache.flink.yarn;
 
-import org.apache.flink.client.deployment.ClusterSpecification;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.util.YarnTestUtils;
+
+import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Rule;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.util.Arrays;
+import java.text.NumberFormat;
+import java.util.EnumSet;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
+import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
-import static org.junit.Assume.assumeTrue;
-
 /**
  * Tests that verify correct HA behavior.
  */
 public class YARNHighAvailabilityITCase extends YarnTestBase {
 
-	private static TestingServer zkServer;
+	private static final Logger LOG = LoggerFactory.getLogger(YARNHighAvailabilityITCase.class);
 
-	private static ActorSystem actorSystem;
+	@ClassRule
+	public static final TemporaryFolder FOLDER = new TemporaryFolder();
 
-	private static final int numberApplicationAttempts = 3;
+	private static final String LOG_DIR = "flink-yarn-tests-ha";
+	private static final NumberFormat FORMAT = NumberFormat.getInstance();
+	private static final Pattern PATTERN = Pattern.compile("(Source|Sink).*switched from DEPLOYING to RUNNING");
+	private static final FiniteDuration TIMEOUT = FiniteDuration.apply(200000L, TimeUnit.MILLISECONDS);
 
-	@Rule
-	public TemporaryFolder temp = new TemporaryFolder();
+	private static TestingServer zkServer;
+	private static String storageDir;
+	private static String zkQuorum;
 
 	@BeforeClass
 	public static void setup() {
-		actorSystem = AkkaUtils.createDefaultActorSystem();
-
 		try {
 			zkServer = new TestingServer();
 			zkServer.start();
+
+			storageDir = FOLDER.newFolder().getAbsolutePath();
+			zkQuorum = zkServer.getConnectString();
 		} catch (Exception e) {
 			e.printStackTrace();
-			Assert.fail("Could not start ZooKeeper testing cluster.");
+			Assert.fail("Cannot start ZooKeeper Server.");
 		}
 
-		YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-ha");
-		YARN_CONFIGURATION.set(YarnConfiguration.RM_AM_MAX_ATTEMPTS, "" + numberApplicationAttempts);
+		FORMAT.setGroupingUsed(false);
+		FORMAT.setMinimumIntegerDigits(4);
 
+		// startYARNWithConfig should be implemented by subclass
+		YARN_CONFIGURATION.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class);
+		YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, LOG_DIR);
+		YARN_CONFIGURATION.setInt(YarnConfiguration.NM_PMEM_MB, 4096);
 		startYARNWithConfig(YARN_CONFIGURATION);
 	}
 
 	@AfterClass
 	public static void teardown() throws Exception {
 		if (zkServer != null) {
 			zkServer.stop();
+			zkServer = null;
 		}
-
-		JavaTestKit.shutdownActorSystem(actorSystem);
-		actorSystem = null;
 	}
 
-	/**
-	 * Tests that the application master can be killed multiple times and that the surviving
-	 * TaskManager successfully reconnects to the newly started JobManager.
-	 * @throws Exception
-	 */
 	@Test
-	public void testMultipleAMKill() throws Exception {
-		assumeTrue("This test only works with the old actor based code.", !isNewMode);
-		final int numberKillingAttempts = numberApplicationAttempts - 1;
-		String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
-		final Configuration configuration = GlobalConfiguration.loadConfiguration();
-		TestingYarnClusterDescriptor flinkYarnClient = new TestingYarnClusterDescriptor(
-			configuration,
-			getYarnConfiguration(),
-			confDirPath,
-			getYarnClient(),
-			true);
-
-		Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
-		flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
-		flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
-
-		String fsStateHandlePath = temp.getRoot().getPath();
-
-		// load the configuration
-		File configDirectory = new File(confDirPath);
-		GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
-
-		flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" +
-			zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
-			"@@" + CheckpointingOptions.STATE_BACKEND.key() + "=FILESYSTEM" +
-			"@@" + CheckpointingOptions.CHECKPOINTS_DIRECTORY + "=" + fsStateHandlePath + "/checkpoints" +
-			"@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + fsStateHandlePath + "/recovery");
-
-		ClusterClient<ApplicationId> yarnClusterClient = null;
-
-		final FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES);
-
-		HighAvailabilityServices highAvailabilityServices = null;
-
-		final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
-			.setMasterMemoryMB(768)
-			.setTaskManagerMemoryMB(1024)
-			.setNumberTaskManagers(1)
-			.setSlotsPerTaskManager(1)
-			.createClusterSpecification();
+	public void testKillJobManager() throws Exception {
+		final Runner clusterRunner = startWithArgs(
+			new String[]{
+				"-j", flinkUberjar.getAbsolutePath(),
+				"-t", flinkLibFolder.getAbsolutePath(),
+				"-n", "2",
+				"-jm", "768",
+				"-tm", "1024",
+				"-s", "1",
+				"-nm", "test-cluster",
+				"-D" + TaskManagerOptions.MANAGED_MEMORY_SIZE.key() + "=128",
+				"-D" + YarnConfigOptions.APPLICATION_ATTEMPTS.key() + "=10",
+				"-D" + HighAvailabilityOptions.HA_MODE.key() + "=ZOOKEEPER",
+				"-D" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + storageDir,
+				"-D" + HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM.key() + "=" + zkQuorum,
+				"-D" + ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY.key() + "=3 s",
+				"-D" + ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS + "=10",
+				"--detached"
+			},
+			"Flink JobManager is now running",
+			RunTypes.YARN_SESSION);
+
+		// before checking any strings outputted by the CLI, first give it time to return
+		clusterRunner.join();
+
+		// actually run a program, otherwise we wouldn't necessarily see any TaskManagers
+		// be brought up
+		final File testingJar =
+			YarnTestBase.findFile("..", new YarnTestUtils.TestJarFinder("flink-yarn-tests"));
+		final String job = "org.apache.flink.yarn.testjob.StreamCase";
+
+		Runner jobRunner = startWithArgs(new String[]{"run",
+			"--detached",
+			"-c", job,
+			testingJar.getAbsolutePath(),
+			"-yD", HighAvailabilityOptions.HA_MODE.key() + "=ZOOKEEPER",
+			"-yD", HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + storageDir,
+			"-yD", HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM.key() + "=" + zkQuorum,
+		}, "Job has been submitted with JobID", RunTypes.CLI_FRONTEND);
+
+		jobRunner.join();
+
+		while (getRunningContainers() < 3) {
+			sleep(500);
+		}
 
-		try {
-			yarnClusterClient = flinkYarnClient.deploySessionCluster(clusterSpecification);
-
-			highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
-				yarnClusterClient.getFlinkConfiguration(),
-				Executors.directExecutor(),
-				HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
-
-			final HighAvailabilityServices finalHighAvailabilityServices = highAvailabilityServices;
-
-			new JavaTestKit(actorSystem) {{
-				for (int attempt = 0; attempt < numberKillingAttempts; attempt++) {
-					new Within(timeout) {
-						@Override
-						protected void run() {
-							try {
-								ActorGateway gateway = LeaderRetrievalUtils.retrieveLeaderGateway(
-									finalHighAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
-									actorSystem,
-									timeout);
-								ActorGateway selfGateway = new AkkaActorGateway(getRef(), gateway.leaderSessionID());
-
-								gateway.tell(new TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), selfGateway);
-
-								expectMsgEquals(Acknowledge.get());
-
-								gateway.tell(PoisonPill.getInstance());
-							} catch (Exception e) {
-								throw new AssertionError("Could not complete test.", e);
-							}
-						}
-					};
+		final YarnClient yarnClient = getYarnClient();
+		Assert.assertNotNull(yarnClient);
+
+		Assert.assertEquals(1, yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING)).size());
+		final ApplicationReport report1 = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING)).get(0);
+		Assert.assertEquals(1, report1.getCurrentApplicationAttemptId().getAttemptId());
+
+		final ApplicationId id = report1.getApplicationId();
+
+		waitUntilCondition(
+			() -> {
+				final File jmLog = findFile("..", (dir, name) ->
+					name.contains("jobmanager.log") && dir.getAbsolutePath().contains("_01_")
+						&& dir.getAbsolutePath().contains(LOG_DIR)
+						&& dir.getAbsolutePath().contains(FORMAT.format(id.getId())));
+				if (jmLog != null) {
+					final String jmLogText = FileUtils.readFileToString(jmLog);
+					final Matcher m = PATTERN.matcher(jmLogText);
+					// match 4 times, all vertices running
+					return m.find() && m.find() && m.find() && m.find();
 				}
+				return false;
+			}, TIMEOUT.fromNow());
 
-				new Within(timeout) {
-					@Override
-					protected void run() {
-						try {
-							ActorGateway gateway = LeaderRetrievalUtils.retrieveLeaderGateway(
-								finalHighAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
-								actorSystem,
-								timeout);
-							ActorGateway selfGateway = new AkkaActorGateway(getRef(), gateway.leaderSessionID());
-
-							gateway.tell(new TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), selfGateway);
-
-							expectMsgEquals(Acknowledge.get());
-						} catch (Exception e) {
-							throw new AssertionError("Could not complete test.", e);
-						}
-					}
-				};
-
-			}};
-		} finally {
-			if (yarnClusterClient != null) {
-				log.info("Shutting down the Flink Yarn application.");
-				yarnClusterClient.shutDownCluster();
-				yarnClusterClient.shutdown();
-			}
+		Runtime.getRuntime().exec(new String[]{
+			"/bin/sh", "-c", "kill $(ps aux | grep -v bash | grep jobmanager | grep -v grep | FS=' \\t' awk '{print $2}')"
 
 Review comment:
   This might make the test fail on Windows. Would be good to check it. Maybe @zentol could help us with that.
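   One way to avoid the Windows failure mode raised above (only a sketch, not part of this PR; it assumes Flink's org.apache.flink.util.OperatingSystem utility and JUnit's Assume are on the test classpath) would be to skip the test on Windows before the /bin/sh command is ever reached:

       import org.apache.flink.util.OperatingSystem;
       import org.junit.Assume;

       @Test
       public void testKillJobManager() throws Exception {
           // The kill command below runs through /bin/sh and relies on ps, grep
           // and awk, none of which are available on a stock Windows setup.
           // Skipping the test there avoids a spurious failure until a portable
           // way of terminating the JobManager process is available.
           Assume.assumeTrue(
               "Test uses /bin/sh, ps and kill, which are not available on Windows.",
               !OperatingSystem.isWindows());

           // ... rest of the test as in the PR ...
       }

   A longer-term fix would be a platform-independent way of locating and terminating the JobManager process; the guard above only documents the limitation instead of removing it.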
