You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/02 11:54:02 UTC

[flink] branch release-1.7 updated (1ce2efd -> b219890)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 1ce2efd  [FLINK-12296][StateBackend] Fix local state directory collision with state loss for chained keyed operators (#8323)
     new 0706791  [FLINK-12184][hs] HistoryServerArchiveFetcher incompatible with old version
     new b219890  [hotfix][tests] Prevent HistoryServerTest from printing to STDOUT

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../history/HistoryServerArchiveFetcher.java       | 24 +++++++++++++++---
 .../webmonitor/history/HistoryServerTest.java      | 29 ++++++++++++++++++----
 2 files changed, 44 insertions(+), 9 deletions(-)


[flink] 02/02: [hotfix][tests] Prevent HistoryServerTest from printing to STDOUT

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b21989074509675c27a8fd9b7b29db9233eaee9d
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Apr 30 15:29:28 2019 +0200

    [hotfix][tests] Prevent HistoryServerTest from printing to STDOUT
---
 .../apache/flink/runtime/webmonitor/history/HistoryServerTest.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 58f8f2b..03bcfa8 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.TestLogger;
 
@@ -145,8 +146,7 @@ public class HistoryServerTest extends TestLogger {
 
 	private static void runJob() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.fromElements(1, 2, 3)
-			.print();
+		env.fromElements(1, 2, 3).addSink(new DiscardingSink<>());
 
 		env.execute();
 	}


[flink] 01/02: [FLINK-12184][hs] HistoryServerArchiveFetcher incompatible with old version

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 070679124406c6223064956fad05b778a0fa9f47
Author: klion26 <qc...@gmail.com>
AuthorDate: Mon Apr 29 23:54:05 2019 +0800

    [FLINK-12184][hs] HistoryServerArchiveFetcher incompatible with old version
    
    This closes #8313.
---
 .../history/HistoryServerArchiveFetcher.java       | 24 +++++++++++++++++----
 .../webmonitor/history/HistoryServerTest.java      | 25 +++++++++++++++++++---
 2 files changed, 42 insertions(+), 7 deletions(-)

diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index e6a189d..ad6863f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -248,7 +248,23 @@ class HistoryServerArchiveFetcher {
 
 		JsonNode tasks = job.get("tasks");
 		int numTasks = tasks.get("total").asInt();
-		int pending = tasks.get("pending").asInt();
+		JsonNode pendingNode = tasks.get("pending");
+		// Flink versions < 1.4 write a single "pending" field;
+		// since version 1.4, "pending" has been split into "scheduled", "deploying", and "created".
+		boolean versionLessThan14 = pendingNode != null;
+		int created = 0;
+		int scheduled;
+		int deploying = 0;
+
+		if (versionLessThan14) {
+			// pending is a mix of CREATED/SCHEDULED/DEPLOYING
+			// to maintain the correct number of task states we pick SCHEDULED
+			scheduled = pendingNode.asInt();
+		} else {
+			created = tasks.get("created").asInt();
+			scheduled = tasks.get("scheduled").asInt();
+			deploying = tasks.get("deploying").asInt();
+		}
 		int running = tasks.get("running").asInt();
 		int finished = tasks.get("finished").asInt();
 		int canceling = tasks.get("canceling").asInt();
@@ -256,9 +272,9 @@ class HistoryServerArchiveFetcher {
 		int failed = tasks.get("failed").asInt();
 
 		int[] tasksPerState = new int[ExecutionState.values().length];
-		// pending is a mix of CREATED/SCHEDULED/DEPLOYING
-		// to maintain the correct number of task states we have to pick one of them
-		tasksPerState[ExecutionState.SCHEDULED.ordinal()] = pending;
+		tasksPerState[ExecutionState.CREATED.ordinal()] = created;
+		tasksPerState[ExecutionState.SCHEDULED.ordinal()] = scheduled;
+		tasksPerState[ExecutionState.DEPLOYING.ordinal()] = deploying;
 		tasksPerState[ExecutionState.RUNNING.ordinal()] = running;
 		tasksPerState[ExecutionState.FINISHED.ordinal()] = finished;
 		tasksPerState[ExecutionState.CANCELING.ordinal()] = canceling;
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index fce2b6d..58f8f2b 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -41,6 +41,8 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.File;
 import java.io.IOException;
@@ -49,6 +51,8 @@ import java.io.StringWriter;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -58,6 +62,7 @@ import static org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGene
 /**
  * Tests for the HistoryServer.
  */
+@RunWith(Parameterized.class)
 public class HistoryServerTest extends TestLogger {
 
 	@ClassRule
@@ -67,10 +72,18 @@ public class HistoryServerTest extends TestLogger {
 	private File jmDirectory;
 	private File hsDirectory;
 
+	@Parameterized.Parameters(name = "Flink version less than 1.4: {0}")
+	public static Collection<Boolean> parameters() {
+		return Arrays.asList(true, false);
+	}
+
+	@Parameterized.Parameter
+	public static boolean versionLessThan14;
+
 	@Before
 	public void setUp() throws Exception {
-		jmDirectory = TMP.newFolder("jm");
-		hsDirectory = TMP.newFolder("hs");
+		jmDirectory = TMP.newFolder("jm_" + versionLessThan14);
+		hsDirectory = TMP.newFolder("hs_" + versionLessThan14);
 
 		Configuration clusterConfig = new Configuration();
 		clusterConfig.setString(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString());
@@ -174,7 +187,13 @@ public class HistoryServerTest extends TestLogger {
 						try (JsonObject tasks = new JsonObject(gen, "tasks")) {
 							gen.writeNumberField("total", 0);
 
-							gen.writeNumberField("pending", 0);
+							if (versionLessThan14) {
+								gen.writeNumberField("pending", 0);
+							} else {
+								gen.writeNumberField("created", 0);
+								gen.writeNumberField("deploying", 0);
+								gen.writeNumberField("scheduled", 0);
+							}
 							gen.writeNumberField("running", 0);
 							gen.writeNumberField("finished", 0);
 							gen.writeNumberField("canceling", 0);