You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/02 11:52:19 UTC

[flink] 01/02: [FLINK-12184][hs] HistoryServerArchiveFetcher incompatible with old version

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b561a5eb8fdb19af77d0a3ca3d115b11fbeaba9b
Author: klion26 <qc...@gmail.com>
AuthorDate: Mon Apr 29 23:54:05 2019 +0800

    [FLINK-12184][hs] HistoryServerArchiveFetcher incompatible with old version
    
    This closes #8313.
---
 .../history/HistoryServerArchiveFetcher.java       | 24 +++++++++++++++++----
 .../webmonitor/history/HistoryServerTest.java      | 25 +++++++++++++++++++---
 2 files changed, 42 insertions(+), 7 deletions(-)

diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 86da704..f95b14c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -248,7 +248,23 @@ class HistoryServerArchiveFetcher {
 
 		JsonNode tasks = job.get("tasks");
 		int numTasks = tasks.get("total").asInt();
-		int pending = tasks.get("pending").asInt();
+		JsonNode pendingNode = tasks.get("pending");
+		// Flink versions < 1.4 have a single "pending" field;
+		// since version 1.4, pending has been split into created, scheduled, and deploying.
+		boolean versionLessThan14 = pendingNode != null;
+		int created = 0;
+		int scheduled;
+		int deploying = 0;
+
+		if (versionLessThan14) {
+			// "pending" is a mix of CREATED/SCHEDULED/DEPLOYING;
+			// to keep the total number of task states correct, we count all of it as SCHEDULED
+			scheduled = pendingNode.asInt();
+		} else {
+			created = tasks.get("created").asInt();
+			scheduled = tasks.get("scheduled").asInt();
+			deploying = tasks.get("deploying").asInt();
+		}
 		int running = tasks.get("running").asInt();
 		int finished = tasks.get("finished").asInt();
 		int canceling = tasks.get("canceling").asInt();
@@ -256,9 +272,9 @@ class HistoryServerArchiveFetcher {
 		int failed = tasks.get("failed").asInt();
 
 		int[] tasksPerState = new int[ExecutionState.values().length];
-		// pending is a mix of CREATED/SCHEDULED/DEPLOYING
-		// to maintain the correct number of task states we have to pick one of them
-		tasksPerState[ExecutionState.SCHEDULED.ordinal()] = pending;
+		tasksPerState[ExecutionState.CREATED.ordinal()] = created;
+		tasksPerState[ExecutionState.SCHEDULED.ordinal()] = scheduled;
+		tasksPerState[ExecutionState.DEPLOYING.ordinal()] = deploying;
 		tasksPerState[ExecutionState.RUNNING.ordinal()] = running;
 		tasksPerState[ExecutionState.FINISHED.ordinal()] = finished;
 		tasksPerState[ExecutionState.CANCELING.ordinal()] = canceling;
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 2a05a88..f1a45e0 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -42,6 +42,8 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.File;
 import java.io.IOException;
@@ -50,6 +52,8 @@ import java.io.StringWriter;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -57,6 +61,7 @@ import java.util.concurrent.TimeUnit;
 /**
  * Tests for the HistoryServer.
  */
+@RunWith(Parameterized.class)
 public class HistoryServerTest extends TestLogger {
 
 	@ClassRule
@@ -70,10 +75,18 @@ public class HistoryServerTest extends TestLogger {
 	private File jmDirectory;
 	private File hsDirectory;
 
+	@Parameterized.Parameters(name = "Flink version less than 1.4: {0}")
+	public static Collection<Boolean> parameters() {
+		return Arrays.asList(true, false);
+	}
+
+	@Parameterized.Parameter
+	public static boolean versionLessThan14;
+
 	@Before
 	public void setUp() throws Exception {
-		jmDirectory = TMP.newFolder("jm");
-		hsDirectory = TMP.newFolder("hs");
+		jmDirectory = TMP.newFolder("jm_" + versionLessThan14);
+		hsDirectory = TMP.newFolder("hs_" + versionLessThan14);
 
 		Configuration clusterConfig = new Configuration();
 		clusterConfig.setString(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString());
@@ -177,7 +190,13 @@ public class HistoryServerTest extends TestLogger {
 						try (JsonObject tasks = new JsonObject(gen, "tasks")) {
 							gen.writeNumberField("total", 0);
 
-							gen.writeNumberField("pending", 0);
+							if (versionLessThan14) {
+								gen.writeNumberField("pending", 0);
+							} else {
+								gen.writeNumberField("created", 0);
+								gen.writeNumberField("deploying", 0);
+								gen.writeNumberField("scheduled", 0);
+							}
 							gen.writeNumberField("running", 0);
 							gen.writeNumberField("finished", 0);
 							gen.writeNumberField("canceling", 0);