You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/15 07:52:10 UTC
[03/12] flink git commit: [FLINK-9194][history] Rework and extend the
HistoryServer test
[FLINK-9194][history] Rework and extend the HistoryServer test
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d8cc733
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d8cc733
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d8cc733
Branch: refs/heads/master
Commit: 6d8cc733d45477fc784fb68be01c2b2b4d16cd87
Parents: bb06ba9
Author: zentol <ch...@apache.org>
Authored: Mon Apr 23 15:13:41 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon May 14 23:40:48 2018 +0200
----------------------------------------------------------------------
.../webmonitor/history/HistoryServerTest.java | 100 ++++++++++++-------
1 file changed, 63 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6d8cc733/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index a16f6fb..580d80f 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -21,25 +21,19 @@ package org.apache.flink.runtime.webmonitor.history;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HistoryServerOptions;
import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.messages.ArchiveMessages;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.TestActorRef;
import org.apache.commons.io.IOUtils;
+import org.junit.After;
import org.junit.Assert;
-import org.junit.Rule;
+import org.junit.Before;
+import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -50,43 +44,67 @@ import java.net.URL;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import scala.Option;
-
/**
* Tests for the HistoryServer.
*/
public class HistoryServerTest extends TestLogger {
- @Rule
- public TemporaryFolder tmpDir = new TemporaryFolder();
-
- @Test
- public void testFullArchiveLifecycle() throws Exception {
- ArchivedExecutionGraph graph = (ArchivedExecutionGraph) ArchivedJobGenerationUtils.getTestJob();
-
- File jmDirectory = tmpDir.newFolder("jm");
- File hsDirectory = tmpDir.newFolder("hs");
-
- Configuration config = new Configuration();
- config.setString(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString());
+ @ClassRule
+ public static final TemporaryFolder TMP = new TemporaryFolder();
+
+ private MiniClusterResource cluster;
+ private File jmDirectory;
+ private File hsDirectory;
+
+ @Before
+ public void setUp() throws Exception {
+ jmDirectory = TMP.newFolder("jm");
+ hsDirectory = TMP.newFolder("hs");
+
+ Configuration clusterConfig = new Configuration();
+ clusterConfig.setString(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString());
+
+ cluster = new MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ clusterConfig,
+ 1,
+ 1
+ ),
+ MiniClusterResource.MiniClusterType.NEW
+ );
+ cluster.before();
+ }
- config.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, jmDirectory.toURI().toString());
- config.setString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR, hsDirectory.getAbsolutePath());
+ @After
+ public void tearDown() {
+ if (cluster != null) {
+ cluster.after();
+ }
+ }
- config.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0);
+ @Test
+ public void testHistoryServerIntegration() throws Exception {
+ final int numJobs = 2;
+ for (int x = 0; x < numJobs; x++) {
+ runJob();
+ }
- ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(config);
- Option<Path> archivePath = Option.apply(new Path(jmDirectory.toURI().toString()));
+ CountDownLatch numFinishedPolls = new CountDownLatch(1);
- ActorRef memoryArchivist = TestActorRef.apply(JobManager.getArchiveProps(MemoryArchivist.class, 1, archivePath), actorSystem);
- memoryArchivist.tell(new ArchiveMessages.ArchiveExecutionGraph(graph.getJobID(), graph), null);
+ Configuration historyServerConfig = new Configuration();
+ historyServerConfig.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, jmDirectory.toURI().toString());
+ historyServerConfig.setString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR, hsDirectory.getAbsolutePath());
- File archive = new File(jmDirectory, graph.getJobID().toString());
- Assert.assertTrue(archive.exists());
+ historyServerConfig.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0);
- CountDownLatch numFinishedPolls = new CountDownLatch(1);
+ // the job is archived asynchronously after env.execute() returns
+ File[] archives = jmDirectory.listFiles();
+ while (archives == null || archives.length != numJobs) {
+ Thread.sleep(50);
+ archives = jmDirectory.listFiles();
+ }
- HistoryServer hs = new HistoryServer(config, numFinishedPolls);
+ HistoryServer hs = new HistoryServer(historyServerConfig, numFinishedPolls);
try {
hs.start();
String baseUrl = "http://localhost:" + hs.getWebPort();
@@ -96,12 +114,20 @@ public class HistoryServerTest extends TestLogger {
String response = getFromHTTP(baseUrl + JobsOverviewHeaders.URL);
MultipleJobsDetails overview = mapper.readValue(response, MultipleJobsDetails.class);
- Assert.assertEquals(1, overview.getJobs().size());
+ Assert.assertEquals(numJobs, overview.getJobs().size());
} finally {
hs.stop();
}
}
+ private static void runJob() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.fromElements(1, 2, 3)
+ .print();
+
+ env.execute();
+ }
+
public static String getFromHTTP(String url) throws Exception {
URL u = new URL(url);
HttpURLConnection connection = (HttpURLConnection) u.openConnection();