You are viewing a plain-text version of this content; the canonical link to the original message was not preserved in this copy.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/10/16 13:12:54 UTC

[flink] branch release-1.8 updated (d5285d5 -> 4c4dc3d)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from d5285d5  [FLINK-14215][docs] Add how to configure environment variables to documentation
     new 5501858  [hotfix][hs] Deduplicate variables
     new 5ec9627  [hotfix][hs] Clarify write access to webJobDir
     new f44f598  [FLINK-14337][hs] Prevent NPE on corrupt archives
     new 4c4dc3d  [FLINK-14337][hs] Only mark archives as processed on success

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../history/HistoryServerArchiveFetcher.java          | 10 ++++------
 .../apache/flink/runtime/history/FsJobArchivist.java  | 19 ++++++++++++-------
 2 files changed, 16 insertions(+), 13 deletions(-)


[flink] 01/04: [hotfix][hs] Deduplicate variables

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5501858245cfe8700f9ca5f081da13fb4f73154d
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Oct 8 11:26:55 2019 +0200

    [hotfix][hs] Deduplicate variables
---
 .../flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index fed220f..4cd8b89 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -152,7 +152,6 @@ class HistoryServerArchiveFetcher {
 					if (jobArchives == null) {
 						continue;
 					}
-					boolean updateOverview = false;
 					int numFetchedArchives = 0;
 					for (FileStatus jobArchive : jobArchives) {
 						Path jobArchivePath = jobArchive.getPath();
@@ -200,7 +199,6 @@ class HistoryServerArchiveFetcher {
 										fw.flush();
 									}
 								}
-								updateOverview = true;
 								numFetchedArchives++;
 							} catch (IOException e) {
 								LOG.error("Failure while fetching/processing job archive for job {}.", jobID, e);
@@ -223,7 +221,7 @@ class HistoryServerArchiveFetcher {
 							}
 						}
 					}
-					if (updateOverview) {
+					if (numFetchedArchives > 0) {
 						updateJobOverview(webOverviewDir, webDir);
 						for (int x = 0; x < numFetchedArchives; x++) {
 							numArchivedJobs.countDown();


[flink] 03/04: [FLINK-14337][hs] Prevent NPE on corrupt archives

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f44f598c472d470a99a6916870a30066d08657cc
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Oct 8 10:48:39 2019 +0200

    [FLINK-14337][hs] Prevent NPE on corrupt archives
---
 .../apache/flink/runtime/history/FsJobArchivist.java  | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
index d0fbc5e..ab1e34d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
@@ -109,15 +109,20 @@ public class FsJobArchivist {
 			ByteArrayOutputStream output = new ByteArrayOutputStream()) {
 			IOUtils.copyBytes(input, output);
 
-			JsonNode archive = mapper.readTree(output.toByteArray());
+			try {
+				JsonNode archive = mapper.readTree(output.toByteArray());
 
-			Collection<ArchivedJson> archives = new ArrayList<>();
-			for (JsonNode archivePart : archive.get(ARCHIVE)) {
-				String path = archivePart.get(PATH).asText();
-				String json = archivePart.get(JSON).asText();
-				archives.add(new ArchivedJson(path, json));
+				Collection<ArchivedJson> archives = new ArrayList<>();
+				for (JsonNode archivePart : archive.get(ARCHIVE)) {
+					String path = archivePart.get(PATH).asText();
+					String json = archivePart.get(JSON).asText();
+					archives.add(new ArchivedJson(path, json));
+				}
+				return archives;
+			} catch (NullPointerException npe) {
+				// occurs if the archive is empty or any of the expected fields are not present
+				throw new IOException("Job archive (" + file.getPath() + ") did not conform to expected format.");
 			}
-			return archives;
 		}
 	}
 }


[flink] 02/04: [hotfix][hs] Clarify write access to webJobDir

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5ec962775d196eece26317283c2438b05fe78211
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Oct 8 11:32:59 2019 +0200

    [hotfix][hs] Clarify write access to webJobDir
---
 .../flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java    | 1 +
 1 file changed, 1 insertion(+)

diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 4cd8b89..6f52cef 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -176,6 +176,7 @@ class HistoryServerArchiveFetcher {
 										json = convertLegacyJobOverview(json);
 										target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
 									} else {
+										// this implicitly writes into webJobDir
 										target = new File(webDir, path + JSON_FILE_ENDING);
 									}
 


[flink] 04/04: [FLINK-14337][hs] Only mark archives as processed on success

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4c4dc3d0e72d0b5277af9440e4a3168a88555403
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Tue Oct 8 11:27:34 2019 +0200

    [FLINK-14337][hs] Only mark archives as processed on success
---
 .../runtime/webmonitor/history/HistoryServerArchiveFetcher.java      | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 6f52cef..79add41f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -163,7 +163,7 @@ class HistoryServerArchiveFetcher {
 								refreshDir, jobID, iae);
 							continue;
 						}
-						if (cachedArchives.add(jobID)) {
+						if (!cachedArchives.contains(jobID)) {
 							try {
 								for (ArchivedJson archive : FsJobArchivist.getArchivedJsons(jobArchive.getPath())) {
 									String path = archive.getPath();
@@ -200,11 +200,10 @@ class HistoryServerArchiveFetcher {
 										fw.flush();
 									}
 								}
+								cachedArchives.add(jobID);
 								numFetchedArchives++;
 							} catch (IOException e) {
 								LOG.error("Failure while fetching/processing job archive for job {}.", jobID, e);
-								// Make sure we attempt to fetch the archive again
-								cachedArchives.remove(jobID);
 								// Make sure we do not include this job in the overview
 								try {
 									Files.delete(new File(webOverviewDir, jobID + JSON_FILE_ENDING).toPath());