You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/02 11:52:18 UTC

[flink] branch master updated (b909ecf -> bf08853)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from b909ecf  [hotfix][travis] Enable flink-runtime on Java 9
     new b561a5e  [FLINK-12184][hs] HistoryServerArchiveFetcher incompatible with old version
     new bf08853  [hotfix][tests] Prevent HistoryServerTest to print to STDOUT

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../history/HistoryServerArchiveFetcher.java       | 24 +++++++++++++++---
 .../webmonitor/history/HistoryServerTest.java      | 29 ++++++++++++++++++----
 2 files changed, 44 insertions(+), 9 deletions(-)


[flink] 01/02: [FLINK-12184][hs] HistoryServerArchiveFetcher incompatible with old version

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b561a5eb8fdb19af77d0a3ca3d115b11fbeaba9b
Author: klion26 <qc...@gmail.com>
AuthorDate: Mon Apr 29 23:54:05 2019 +0800

    [FLINK-12184][hs] HistoryServerArchiveFetcher incompatible with old version
    
    This closes #8313.
---
 .../history/HistoryServerArchiveFetcher.java       | 24 +++++++++++++++++----
 .../webmonitor/history/HistoryServerTest.java      | 25 +++++++++++++++++++---
 2 files changed, 42 insertions(+), 7 deletions(-)

diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 86da704..f95b14c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -248,7 +248,23 @@ class HistoryServerArchiveFetcher {
 
 		JsonNode tasks = job.get("tasks");
 		int numTasks = tasks.get("total").asInt();
-		int pending = tasks.get("pending").asInt();
+		JsonNode pendingNode = tasks.get("pending");
+		// for flink version < 1.4 we have pending field,
+		// when version >= 1.4 pending has been split into scheduled, deploying, and created.
+		boolean versionLessThan14 = pendingNode != null;
+		int created = 0;
+		int scheduled;
+		int deploying = 0;
+
+		if (versionLessThan14) {
+			// pending is a mix of CREATED/SCHEDULED/DEPLOYING
+			// to maintain the correct number of task states we pick SCHEDULED
+			scheduled = pendingNode.asInt();
+		} else {
+			created = tasks.get("created").asInt();
+			scheduled = tasks.get("scheduled").asInt();
+			deploying = tasks.get("deploying").asInt();
+		}
 		int running = tasks.get("running").asInt();
 		int finished = tasks.get("finished").asInt();
 		int canceling = tasks.get("canceling").asInt();
@@ -256,9 +272,9 @@ class HistoryServerArchiveFetcher {
 		int failed = tasks.get("failed").asInt();
 
 		int[] tasksPerState = new int[ExecutionState.values().length];
-		// pending is a mix of CREATED/SCHEDULED/DEPLOYING
-		// to maintain the correct number of task states we have to pick one of them
-		tasksPerState[ExecutionState.SCHEDULED.ordinal()] = pending;
+		tasksPerState[ExecutionState.CREATED.ordinal()] = created;
+		tasksPerState[ExecutionState.SCHEDULED.ordinal()] = scheduled;
+		tasksPerState[ExecutionState.DEPLOYING.ordinal()] = deploying;
 		tasksPerState[ExecutionState.RUNNING.ordinal()] = running;
 		tasksPerState[ExecutionState.FINISHED.ordinal()] = finished;
 		tasksPerState[ExecutionState.CANCELING.ordinal()] = canceling;
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 2a05a88..f1a45e0 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -42,6 +42,8 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.File;
 import java.io.IOException;
@@ -50,6 +52,8 @@ import java.io.StringWriter;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -57,6 +61,7 @@ import java.util.concurrent.TimeUnit;
 /**
  * Tests for the HistoryServer.
  */
+@RunWith(Parameterized.class)
 public class HistoryServerTest extends TestLogger {
 
 	@ClassRule
@@ -70,10 +75,18 @@ public class HistoryServerTest extends TestLogger {
 	private File jmDirectory;
 	private File hsDirectory;
 
+	@Parameterized.Parameters(name = "Flink version less than 1.4: {0}")
+	public static Collection<Boolean> parameters() {
+		return Arrays.asList(true, false);
+	}
+
+	@Parameterized.Parameter
+	public static boolean versionLessThan14;
+
 	@Before
 	public void setUp() throws Exception {
-		jmDirectory = TMP.newFolder("jm");
-		hsDirectory = TMP.newFolder("hs");
+		jmDirectory = TMP.newFolder("jm_" + versionLessThan14);
+		hsDirectory = TMP.newFolder("hs_" + versionLessThan14);
 
 		Configuration clusterConfig = new Configuration();
 		clusterConfig.setString(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString());
@@ -177,7 +190,13 @@ public class HistoryServerTest extends TestLogger {
 						try (JsonObject tasks = new JsonObject(gen, "tasks")) {
 							gen.writeNumberField("total", 0);
 
-							gen.writeNumberField("pending", 0);
+							if (versionLessThan14) {
+								gen.writeNumberField("pending", 0);
+							} else {
+								gen.writeNumberField("created", 0);
+								gen.writeNumberField("deploying", 0);
+								gen.writeNumberField("scheduled", 0);
+							}
 							gen.writeNumberField("running", 0);
 							gen.writeNumberField("finished", 0);
 							gen.writeNumberField("canceling", 0);


[flink] 02/02: [hotfix][tests] Prevent HistoryServerTest to print to STDOUT

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bf0885307414493091e983f4bc57069212e75ca3
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Apr 30 15:29:28 2019 +0200

    [hotfix][tests] Prevent HistoryServerTest to print to STDOUT
---
 .../apache/flink/runtime/webmonitor/history/HistoryServerTest.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index f1a45e0..a1ace56 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.TestLogger;
 
@@ -148,8 +149,7 @@ public class HistoryServerTest extends TestLogger {
 
 	private static void runJob() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.fromElements(1, 2, 3)
-			.print();
+		env.fromElements(1, 2, 3).addSink(new DiscardingSink<>());
 
 		env.execute();
 	}