Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/15 07:52:10 UTC

[03/12] flink git commit: [FLINK-9194][history] Rework and extend the HistoryServer test

[FLINK-9194][history] Rework and extend the HistoryServer test


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d8cc733
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d8cc733
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d8cc733

Branch: refs/heads/master
Commit: 6d8cc733d45477fc784fb68be01c2b2b4d16cd87
Parents: bb06ba9
Author: zentol <ch...@apache.org>
Authored: Mon Apr 23 15:13:41 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon May 14 23:40:48 2018 +0200

----------------------------------------------------------------------
 .../webmonitor/history/HistoryServerTest.java   | 100 ++++++++++++-------
 1 file changed, 63 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
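
For readers skimming the diff below: the reworked test drops the actor-based MemoryArchivist setup and instead runs real jobs on a MiniClusterResource, waits for the asynchronously written archives, then starts a HistoryServer over the archive directory and queries its REST overview. The following is a condensed sketch assembled from the '+' side of the patch (imports, exception handling, and the @Before/@After wiring are as in the test itself; this is not a drop-in class):

	// 1. Run a job against a mini cluster that archives to jmDirectory.
	Configuration clusterConfig = new Configuration();
	clusterConfig.setString(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString());

	MiniClusterResource cluster = new MiniClusterResource(
		new MiniClusterResource.MiniClusterResourceConfiguration(clusterConfig, 1, 1),
		MiniClusterResource.MiniClusterType.NEW);
	cluster.before();

	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.fromElements(1, 2, 3).print();
	env.execute();

	// 2. The archive is written asynchronously after env.execute() returns, so poll for it.
	File[] archives = jmDirectory.listFiles();
	while (archives == null || archives.length < 1) {
		Thread.sleep(50);
		archives = jmDirectory.listFiles();
	}

	// 3. Start a HistoryServer over the archive directory and fetch the job overview via REST.
	Configuration historyServerConfig = new Configuration();
	historyServerConfig.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, jmDirectory.toURI().toString());
	historyServerConfig.setString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR, hsDirectory.getAbsolutePath());
	historyServerConfig.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0);

	HistoryServer hs = new HistoryServer(historyServerConfig, new CountDownLatch(1));
	hs.start();
	String response = getFromHTTP("http://localhost:" + hs.getWebPort() + JobsOverviewHeaders.URL);
	hs.stop();
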


http://git-wip-us.apache.org/repos/asf/flink/blob/6d8cc733/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index a16f6fb..580d80f 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -21,25 +21,19 @@ package org.apache.flink.runtime.webmonitor.history;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HistoryServerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.messages.ArchiveMessages;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.TestActorRef;
 import org.apache.commons.io.IOUtils;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.Rule;
+import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
@@ -50,43 +44,67 @@ import java.net.URL;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import scala.Option;
-
 /**
  * Tests for the HistoryServer.
  */
 public class HistoryServerTest extends TestLogger {
 
-	@Rule
-	public TemporaryFolder tmpDir = new TemporaryFolder();
-
-	@Test
-	public void testFullArchiveLifecycle() throws Exception {
-		ArchivedExecutionGraph graph = (ArchivedExecutionGraph) ArchivedJobGenerationUtils.getTestJob();
-
-		File jmDirectory = tmpDir.newFolder("jm");
-		File hsDirectory = tmpDir.newFolder("hs");
-
-		Configuration config = new Configuration();
-		config.setString(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString());
+	@ClassRule
+	public static final TemporaryFolder TMP = new TemporaryFolder();
+
+	private MiniClusterResource cluster;
+	private File jmDirectory;
+	private File hsDirectory;
+
+	@Before
+	public void setUp() throws Exception {
+		jmDirectory = TMP.newFolder("jm");
+		hsDirectory = TMP.newFolder("hs");
+
+		Configuration clusterConfig = new Configuration();
+		clusterConfig.setString(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString());
+
+		cluster = new MiniClusterResource(
+			new MiniClusterResource.MiniClusterResourceConfiguration(
+				clusterConfig,
+				1,
+				1
+			),
+			MiniClusterResource.MiniClusterType.NEW
+		);
+		cluster.before();
+	}
 
-		config.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, jmDirectory.toURI().toString());
-		config.setString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR, hsDirectory.getAbsolutePath());
+	@After
+	public void tearDown() {
+		if (cluster != null) {
+			cluster.after();
+		}
+	}
 
-		config.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0);
+	@Test
+	public void testHistoryServerIntegration() throws Exception {
+		final int numJobs = 2;
+		for (int x = 0; x < numJobs; x++) {
+			runJob();
+		}
 
-		ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(config);
-		Option<Path> archivePath = Option.apply(new Path(jmDirectory.toURI().toString()));
+		CountDownLatch numFinishedPolls = new CountDownLatch(1);
 
-		ActorRef memoryArchivist = TestActorRef.apply(JobManager.getArchiveProps(MemoryArchivist.class, 1, archivePath), actorSystem);
-		memoryArchivist.tell(new ArchiveMessages.ArchiveExecutionGraph(graph.getJobID(), graph), null);
+		Configuration historyServerConfig = new Configuration();
+		historyServerConfig.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, jmDirectory.toURI().toString());
+		historyServerConfig.setString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR, hsDirectory.getAbsolutePath());
 
-		File archive = new File(jmDirectory, graph.getJobID().toString());
-		Assert.assertTrue(archive.exists());
+		historyServerConfig.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0);
 
-		CountDownLatch numFinishedPolls = new CountDownLatch(1);
+		// the job is archived asynchronously after env.execute() returns
+		File[] archives = jmDirectory.listFiles();
+		while (archives == null || archives.length != numJobs) {
+			Thread.sleep(50);
+			archives = jmDirectory.listFiles();
+		}
 
-		HistoryServer hs = new HistoryServer(config, numFinishedPolls);
+		HistoryServer hs = new HistoryServer(historyServerConfig, numFinishedPolls);
 		try {
 			hs.start();
 			String baseUrl = "http://localhost:" + hs.getWebPort();
@@ -96,12 +114,20 @@ public class HistoryServerTest extends TestLogger {
 			String response = getFromHTTP(baseUrl + JobsOverviewHeaders.URL);
 			MultipleJobsDetails overview = mapper.readValue(response, MultipleJobsDetails.class);
 
-			Assert.assertEquals(1, overview.getJobs().size());
+			Assert.assertEquals(numJobs, overview.getJobs().size());
 		} finally {
 			hs.stop();
 		}
 	}
 
+	private static void runJob() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.fromElements(1, 2, 3)
+			.print();
+
+		env.execute();
+	}
+
 	public static String getFromHTTP(String url) throws Exception {
 		URL u = new URL(url);
 		HttpURLConnection connection = (HttpURLConnection) u.openConnection();