You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/05/15 07:52:08 UTC

[01/12] flink git commit: [hotfix] Add import for linked component in ArchivedJson

Repository: flink
Updated Branches:
  refs/heads/master bcd028d75 -> 4de72bbee


[hotfix] Add import for linked component in ArchivedJson


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd37feb4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd37feb4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd37feb4

Branch: refs/heads/master
Commit: cd37feb4d3fb75a8f9dabf228c22216efd9ac636
Parents: 6958380
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon May 14 19:14:06 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon May 14 23:40:48 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/webmonitor/history/ArchivedJson.java   | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cd37feb4/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
index 9200248..a9ddce5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.history;
 
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
 import org.apache.flink.util.Preconditions;


[07/12] flink git commit: [FLINK-9194][history] Add archiving routine to Dispatcher

Posted by tr...@apache.org.
[FLINK-9194][history] Add archiving routine to Dispatcher


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd374b83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd374b83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd374b83

Branch: refs/heads/master
Commit: fd374b832f830adfa59b7f834b11c38080486f1c
Parents: 6b6603f
Author: zentol <ch...@apache.org>
Authored: Wed Apr 18 14:33:04 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon May 14 23:40:48 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    | 43 +++++++++++++++++++-
 .../runtime/dispatcher/MiniDispatcher.java      |  5 ++-
 .../dispatcher/StandaloneDispatcher.java        |  7 +++-
 .../runtime/entrypoint/ClusterEntrypoint.java   |  7 +++-
 .../entrypoint/JobClusterEntrypoint.java        |  5 ++-
 .../entrypoint/SessionClusterEntrypoint.java    |  7 +++-
 .../flink/runtime/history/FsJobArchivist.java   | 42 +++++++++++++++++++
 .../flink/runtime/minicluster/MiniCluster.java  |  3 +-
 .../runtime/webmonitor/WebMonitorEndpoint.java  | 22 +++++++++-
 .../runtime/dispatcher/DispatcherTest.java      |  1 +
 .../runtime/dispatcher/MiniDispatcherTest.java  |  1 +
 11 files changed, 132 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 58ffda3..82b9291 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.client.JobSubmissionException;
@@ -32,6 +34,7 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.history.FsJobArchivist;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -61,6 +64,8 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -124,6 +129,12 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	@Nullable
 	protected final String restAddress;
 
+	@Nullable
+	private final JsonArchivist jsonArchivist;
+
+	@Nullable
+	private final Path archivePath;
+
 	private CompletableFuture<Void> orphanedJobManagerRunnersTerminationFuture = CompletableFuture.completedFuture(null);
 
 	public Dispatcher(
@@ -140,7 +151,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 			ArchivedExecutionGraphStore archivedExecutionGraphStore,
 			JobManagerRunnerFactory jobManagerRunnerFactory,
 			FatalErrorHandler fatalErrorHandler,
-			@Nullable String restAddress) throws Exception {
+			@Nullable String restAddress,
+			@Nullable JsonArchivist jsonArchivist) throws Exception {
 		super(rpcService, endpointId);
 
 		this.configuration = Preconditions.checkNotNull(configuration);
@@ -165,6 +177,22 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 		this.restAddress = restAddress;
 
+		this.jsonArchivist = jsonArchivist;
+
+		String configuredArchivePath = configuration.getString(JobManagerOptions.ARCHIVE_DIR);
+		if (configuredArchivePath != null) {
+			Path tmpArchivePath = null;
+			try {
+				tmpArchivePath = WebMonitorUtils.validateAndNormalizeUri(new Path(configuredArchivePath).toUri());
+			} catch (Exception e) {
+				log.warn("Failed to validate specified archive directory in '{}'. " +
+					"Jobs will not be archived for the HistoryServer.", configuredArchivePath, e);
+			}
+			archivePath = tmpArchivePath;
+		} else {
+			archivePath = null;
+		}
+
 		this.archivedExecutionGraphStore = Preconditions.checkNotNull(archivedExecutionGraphStore);
 
 		this.jobManagerRunnerFactory = Preconditions.checkNotNull(jobManagerRunnerFactory);
@@ -621,6 +649,19 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 				e);
 		}
 
+		try {
+			if (jsonArchivist != null && archivePath != null) {
+				FsJobArchivist.archiveJob(archivePath, archivedExecutionGraph.getJobID(), jsonArchivist.archiveJsonWithPath(archivedExecutionGraph));
+				log.info("Archived job {} to {}", archivedExecutionGraph.getJobID(), archivePath);
+			}
+		} catch (IOException e) {
+			log.info(
+				"Could not archive completed job {}({}).",
+				archivedExecutionGraph.getJobName(),
+				archivedExecutionGraph.getJobID(),
+				e);
+		}
+
 		final JobID jobId = archivedExecutionGraph.getJobID();
 
 		removeJob(jobId, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index 4361e08..38e74fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.FlinkException;
 
 import javax.annotation.Nullable;
@@ -71,6 +72,7 @@ public class MiniDispatcher extends Dispatcher {
 			JobManagerRunnerFactory jobManagerRunnerFactory,
 			FatalErrorHandler fatalErrorHandler,
 			@Nullable String restAddress,
+			@Nullable JsonArchivist jsonArchivist,
 			JobGraph jobGraph,
 			JobClusterEntrypoint.ExecutionMode executionMode) throws Exception {
 		super(
@@ -87,7 +89,8 @@ public class MiniDispatcher extends Dispatcher {
 			archivedExecutionGraphStore,
 			jobManagerRunnerFactory,
 			fatalErrorHandler,
-			restAddress);
+			restAddress,
+			jsonArchivist);
 
 		this.executionMode = checkNotNull(executionMode);
 		this.jobTerminationFuture = new CompletableFuture<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 52ac7a0..5c6a7ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import javax.annotation.Nullable;
 
@@ -50,7 +51,8 @@ public class StandaloneDispatcher extends Dispatcher {
 			ArchivedExecutionGraphStore archivedExecutionGraphStore,
 			JobManagerRunnerFactory jobManagerRunnerFactory,
 			FatalErrorHandler fatalErrorHandler,
-			@Nullable String restAddress) throws Exception {
+			@Nullable String restAddress,
+			@Nullable JsonArchivist jsonArchivist) throws Exception {
 		super(
 			rpcService,
 			endpointId,
@@ -65,6 +67,7 @@ public class StandaloneDispatcher extends Dispatcher {
 			archivedExecutionGraphStore,
 			jobManagerRunnerFactory,
 			fatalErrorHandler,
-			restAddress);
+			restAddress,
+			jsonArchivist);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index f823ea7..933add8 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
@@ -357,7 +358,8 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 				metricRegistry.getMetricQueryServicePath(),
 				archivedExecutionGraphStore,
 				this,
-				webMonitorEndpoint.getRestBaseUrl());
+				webMonitorEndpoint.getRestBaseUrl(),
+				webMonitorEndpoint);
 
 			LOG.debug("Starting ResourceManager.");
 			resourceManager.start();
@@ -657,7 +659,8 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		@Nullable String metricQueryServicePath,
 		ArchivedExecutionGraphStore archivedExecutionGraphStore,
 		FatalErrorHandler fatalErrorHandler,
-		@Nullable String restAddress) throws Exception;
+		@Nullable String restAddress,
+		@Nullable JsonArchivist jsonArchivist) throws Exception;
 
 	protected abstract ResourceManager<?> createResourceManager(
 		Configuration configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index ea7cbe2..2a5b8ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.FlinkException;
@@ -100,7 +101,8 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			@Nullable String metricQueryServicePath,
 			ArchivedExecutionGraphStore archivedExecutionGraphStore,
 			FatalErrorHandler fatalErrorHandler,
-			@Nullable String restAddress) throws Exception {
+			@Nullable String restAddress,
+			@Nullable JsonArchivist jsonArchivist) throws Exception {
 
 		final JobGraph jobGraph = retrieveJobGraph(configuration);
 
@@ -122,6 +124,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
 			fatalErrorHandler,
 			restAddress,
+			jsonArchivist,
 			jobGraph,
 			executionMode);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index fcab796..85446eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
@@ -114,7 +115,8 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			@Nullable String metricQueryServicePath,
 			ArchivedExecutionGraphStore archivedExecutionGraphStore,
 			FatalErrorHandler fatalErrorHandler,
-			@Nullable String restAddress) throws Exception {
+			@Nullable String restAddress,
+			@Nullable JsonArchivist jsonArchivist) throws Exception {
 
 		// create the default dispatcher
 		return new StandaloneDispatcher(
@@ -130,6 +132,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			archivedExecutionGraphStore,
 			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
 			fatalErrorHandler,
-			restAddress);
+			restAddress,
+			jsonArchivist);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
index 3a6ea4f..1cfbf96 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.history;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
@@ -67,7 +68,9 @@ public class FsJobArchivist {
 	 * @param graph  graph to archive
 	 * @return path to where the archive was written, or null if no archive was created
 	 * @throws IOException
+	 * @deprecated only kept for legacy reasons
 	 */
+	@Deprecated
 	public static Path archiveJob(Path rootPath, AccessExecutionGraph graph) throws IOException {
 		try {
 			FileSystem fs = rootPath.getFileSystem();
@@ -100,6 +103,45 @@ public class FsJobArchivist {
 	}
 
 	/**
+	 * Writes the given {@link AccessExecutionGraph} to the {@link FileSystem} pointed to by
+	 * {@link JobManagerOptions#ARCHIVE_DIR}.
+	 *
+	 * @param rootPath directory to which the archive should be written to
+	 * @param jobId  job id
+	 * @param jsonToArchive collection of json-path pairs to that should be archived
+	 * @return path to where the archive was written, or null if no archive was created
+	 * @throws IOException
+	 */
+	public static Path archiveJob(Path rootPath, JobID jobId, Collection<ArchivedJson> jsonToArchive) throws IOException {
+		try {
+			FileSystem fs = rootPath.getFileSystem();
+			Path path = new Path(rootPath, jobId.toString());
+			OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE);
+
+			try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) {
+				gen.writeStartObject();
+				gen.writeArrayFieldStart(ARCHIVE);
+				for (ArchivedJson archive : jsonToArchive) {
+					gen.writeStartObject();
+					gen.writeStringField(PATH, archive.getPath());
+					gen.writeStringField(JSON, archive.getJson());
+					gen.writeEndObject();
+				}
+				gen.writeEndArray();
+				gen.writeEndObject();
+			} catch (Exception e) {
+				fs.delete(path, false);
+				throw e;
+			}
+			LOG.info("Job {} has been archived at {}.", jobId, path);
+			return path;
+		} catch (IOException e) {
+			LOG.error("Failed to archive job.", e);
+			throw e;
+		}
+	}
+
+	/**
 	 * Reads the given archive file and returns a {@link Collection} of contained {@link ArchivedJson}.
 	 *
 	 * @param file archive to extract

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index a86aa2e..09f8bf4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -370,7 +370,8 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 					new MemoryArchivedExecutionGraphStore(),
 					Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
 					new ShutDownFatalErrorHandler(),
-					dispatcherRestEndpoint.getRestBaseUrl());
+					dispatcherRestEndpoint.getRestBaseUrl(),
+					dispatcherRestEndpoint);
 
 				dispatcher.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 5cb57d3..f9b2fa8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -113,6 +114,8 @@ import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogFileHead
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerStdoutFileHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.ExceptionUtils;
@@ -126,6 +129,7 @@ import javax.annotation.Nonnull;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.UUID;
@@ -137,7 +141,7 @@ import java.util.concurrent.Executor;
  *
  * @param <T> type of the leader gateway
  */
-public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndpoint implements LeaderContender {
+public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndpoint implements LeaderContender, JsonArchivist {
 
 	protected final GatewayRetriever<? extends T> leaderRetriever;
 	protected final Configuration clusterConfiguration;
@@ -157,6 +161,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 
 	private boolean hasWebUI = false;
 
+	private final Collection<JsonArchivist> archivingHandlers = new ArrayList<>(16);
+
 	public WebMonitorEndpoint(
 			RestServerEndpointConfiguration endpointConfiguration,
 			GatewayRetriever<? extends T> leaderRetriever,
@@ -667,6 +673,11 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 		handlers.add(Tuple2.of(TaskManagerLogFileHeaders.getInstance(), taskManagerLogFileHandler));
 		handlers.add(Tuple2.of(TaskManagerStdoutFileHeaders.getInstance(), taskManagerStdoutFileHandler));
 
+		handlers.stream()
+			.map(tuple -> tuple.f1)
+			.filter(handler -> handler instanceof JsonArchivist)
+			.forEachOrdered(handler -> archivingHandlers.add((JsonArchivist) handler));
+
 		return handlers;
 	}
 
@@ -756,4 +767,13 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 		fatalErrorHandler.onFatalError(exception);
 	}
 
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		Collection<ArchivedJson> archivedJson = new ArrayList<>(archivingHandlers.size());
+		for (JsonArchivist archivist : archivingHandlers) {
+			Collection<ArchivedJson> subArchive = archivist.archiveJsonWithPath(graph);
+			archivedJson.addAll(subArchive);
+		}
+		return archivedJson;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 18a8ec1..d9482e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -567,6 +567,7 @@ public class DispatcherTest extends TestLogger {
 				archivedExecutionGraphStore,
 				jobManagerRunnerFactory,
 				fatalErrorHandler,
+				null,
 				null);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd374b83/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 6dfb243..157fca7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -258,6 +258,7 @@ public class MiniDispatcherTest extends TestLogger {
 			testingJobManagerRunnerFactory,
 			testingFatalErrorHandler,
 			null,
+			null,
 			jobGraph,
 			executionMode);
 	}


[05/12] flink git commit: [hotfix][history] Read/Write MultipleJobsDetails instead of manual JSON

Posted by tr...@apache.org.
[hotfix][history] Read/Write MultipleJobsDetails instead of manual JSON


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b6603f9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b6603f9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b6603f9

Branch: refs/heads/master
Commit: 6b6603f91ac17ab02a7147d96a81297067245ac9
Parents: 2cef5fd
Author: zentol <ch...@apache.org>
Authored: Tue Apr 24 10:26:32 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon May 14 23:40:48 2018 +0200

----------------------------------------------------------------------
 .../history/HistoryServerArchiveFetcher.java     | 19 ++++++++-----------
 .../webmonitor/history/HistoryServerTest.java    |  8 +++-----
 2 files changed, 11 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6b6603f9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 413473b..ac19197 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -24,6 +24,8 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.handler.legacy.JobsOverviewHandler;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
@@ -31,7 +33,6 @@ import org.apache.flink.util.FileUtils;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.slf4j.Logger;
@@ -42,6 +43,8 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -233,21 +236,15 @@ class HistoryServerArchiveFetcher {
 	 */
 	private static void updateJobOverview(File webOverviewDir, File webDir) {
 		try (JsonGenerator gen = jacksonFactory.createGenerator(HistoryServer.createOrGetFile(webDir, JobsOverviewHeaders.URL))) {
-			gen.writeStartObject();
-			gen.writeArrayFieldStart("jobs");
-
 			File[] overviews = new File(webOverviewDir.getPath()).listFiles();
 			if (overviews != null) {
+				Collection<JobDetails> allJobs = new ArrayList<>(overviews.length);
 				for (File overview : overviews) {
-					JsonNode root = mapper.readTree(overview);
-					JsonNode finished = root.get("jobs");
-					JsonNode job = finished.get(0);
-					mapper.writeTree(gen, job);
+					MultipleJobsDetails subJobs = mapper.readValue(overview, MultipleJobsDetails.class);
+					allJobs.addAll(subJobs.getJobs());
 				}
+				mapper.writeValue(gen, new MultipleJobsDetails(allJobs));
 			}
-
-			gen.writeEndArray();
-			gen.writeEndObject();
 		} catch (IOException ioe) {
 			LOG.error("Failed to update job overview.", ioe);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b6603f9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index de63b43..a16f6fb 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -27,11 +27,11 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.ArchiveMessages;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.util.TestLogger;
 
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import akka.actor.ActorRef;
@@ -94,11 +94,9 @@ public class HistoryServerTest extends TestLogger {
 
 			ObjectMapper mapper = new ObjectMapper();
 			String response = getFromHTTP(baseUrl + JobsOverviewHeaders.URL);
-			JsonNode overview = mapper.readTree(response);
+			MultipleJobsDetails overview = mapper.readValue(response, MultipleJobsDetails.class);
 
-			String jobID = overview.get("jobs").get(0).get("jid").asText();
-			JsonNode jobDetails = mapper.readTree(getFromHTTP(baseUrl + "/jobs/" + jobID));
-			Assert.assertNotNull(jobDetails.get("jid"));
+			Assert.assertEquals(1, overview.getJobs().size());
 		} finally {
 			hs.stop();
 		}


[12/12] flink git commit: [FLINK-9304] Timer service shutdown should not stop if interrupted

Posted by tr...@apache.org.
[FLINK-9304] Timer service shutdown should not stop if interrupted

This closes #5962.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f4e03689
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f4e03689
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f4e03689

Branch: refs/heads/master
Commit: f4e03689dd5fef8eafeb0996a31ea021c5ea2203
Parents: d734032
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon May 7 11:55:35 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 00:23:10 2018 +0200

----------------------------------------------------------------------
 .../runtime/tasks/ProcessingTimeService.java    |  11 ++
 .../streaming/runtime/tasks/StreamTask.java     |  44 +++---
 .../tasks/SystemProcessingTimeService.java      |  32 +++++
 .../tasks/TestProcessingTimeService.java        |   6 +
 .../tasks/SystemProcessingTimeServiceTest.java  | 133 ++++++++++++++-----
 5 files changed, 168 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f4e03689/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
index 2516299..4515ce2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
@@ -96,6 +96,17 @@ public abstract class ProcessingTimeService {
 	public abstract void shutdownService();
 
 	/**
+	 * Shuts down and cleans up the timer service provider hard and immediately, then waits for pending timers
+	 * to complete. Any further call to {@link #registerTimer(long, ProcessingTimeCallback)}
+	 * will result in a hard exception. This call cannot be interrupted and will block until the shutdown is completed
+	 * or the timeout is exceeded.
+	 *
+	 * @param timeoutMs timeout for blocking on the service shutdown in milliseconds.
+	 * @return true iff the shutdown completed before the timeout expired.
+	 */
+	public abstract boolean shutdownServiceUninterruptible(long timeoutMs);
+
+	/**
 	 * Shuts down and clean up the timer service provider hard and immediately. This does wait
 	 * for all timers to complete or until the time limit is exceeded. Any call to
 	 * {@link #registerTimer(long, ProcessingTimeCallback)} will result in a hard exception after calling this method.

http://git-wip-us.apache.org/repos/asf/flink/blob/f4e03689/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 6790949..2cc8886 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -73,7 +73,6 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -348,30 +347,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			// clean up everything we initialized
 			isRunning = false;
 
-			// clear the interrupted status so that we can wait for the following resource shutdowns to complete
-			Thread.interrupted();
-
 			// stop all timers and threads
-			if (timerService != null && !timerService.isTerminated()) {
-				try {
-
-					final long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration().
-						getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
-
-					// wait for a reasonable time for all pending timer threads to finish
-					boolean timerShutdownComplete =
-						timerService.shutdownAndAwaitPending(timeoutMs, TimeUnit.MILLISECONDS);
-
-					if (!timerShutdownComplete) {
-						LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " +
-							"timers. Will continue with shutdown procedure.", timeoutMs);
-					}
-				}
-				catch (Throwable t) {
-					// catch and log the exception to not replace the original exception
-					LOG.error("Could not shut down timer service", t);
-				}
-			}
+			tryShutdownTimerService();
 
 			// stop all asynchronous checkpoint threads
 			try {
@@ -706,6 +683,25 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		}
 	}
 
+	private void tryShutdownTimerService() {
+
+		if (timerService != null && !timerService.isTerminated()) {
+
+			try {
+				final long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration().
+					getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
+
+				if (!timerService.shutdownServiceUninterruptible(timeoutMs)) {
+					LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " +
+						"timers. Will continue with shutdown procedure.", timeoutMs);
+				}
+			} catch (Throwable t) {
+				// catch and log the exception to not replace the original exception
+				LOG.error("Could not shut down timer service", t);
+			}
+		}
+	}
+
 	private void checkpointState(
 			CheckpointMetaData checkpointMetaData,
 			CheckpointOptions checkpointOptions,

http://git-wip-us.apache.org/repos/asf/flink/blob/f4e03689/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index be8b23c..4e4208f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -18,10 +18,15 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nonnull;
 
+import java.time.Duration;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.Delayed;
@@ -41,6 +46,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class SystemProcessingTimeService extends ProcessingTimeService {
 
+	private static final Logger LOG = LoggerFactory.getLogger(SystemProcessingTimeService.class);
+
 	private static final int STATUS_ALIVE = 0;
 	private static final int STATUS_QUIESCED = 1;
 	private static final int STATUS_SHUTDOWN = 2;
@@ -197,6 +204,31 @@ public class SystemProcessingTimeService extends ProcessingTimeService {
 		return timerService.awaitTermination(time, timeUnit);
 	}
 
+	@Override
+	public boolean shutdownServiceUninterruptible(long timeoutMs) {
+
+		final Deadline deadline = Deadline.fromNow(Duration.ofMillis(timeoutMs));
+
+		boolean shutdownComplete = false;
+		boolean receivedInterrupt = false;
+
+		do {
+			try {
+				// wait for a reasonable time for all pending timer threads to finish
+				shutdownComplete = shutdownAndAwaitPending(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			} catch (InterruptedException iex) {
+				receivedInterrupt = true;
+				LOG.trace("Intercepted attempt to interrupt timer service shutdown.", iex);
+			}
+		} while (deadline.hasTimeLeft() && !shutdownComplete);
+
+		if (receivedInterrupt) {
+			Thread.currentThread().interrupt();
+		}
+
+		return shutdownComplete;
+	}
+
 	// safety net to destroy the thread pool
 	@Override
 	protected void finalize() throws Throwable {

http://git-wip-us.apache.org/repos/asf/flink/blob/f4e03689/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index 2081f19..f4a5f37 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -135,6 +135,12 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 	}
 
 	@Override
+	public boolean shutdownServiceUninterruptible(long timeoutMs) {
+		shutdownService();
+		return true;
+	}
+
+	@Override
 	public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException {
 		shutdownService();
 		return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/f4e03689/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index 01fd778..cfcaf72 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
@@ -449,41 +450,11 @@ public class SystemProcessingTimeServiceTest extends TestLogger {
 	public void testShutdownAndWaitPending() {
 
 		final Object lock = new Object();
-		final OneShotLatch waitUntilTimerStarted = new OneShotLatch();
-		final OneShotLatch blockUntilTerminationInterrupts = new OneShotLatch();
 		final OneShotLatch blockUntilTriggered = new OneShotLatch();
-		final AtomicBoolean check = new AtomicBoolean(true);
-
-		final SystemProcessingTimeService timeService = new SystemProcessingTimeService(
-			(message, exception) -> {
-			},
-			lock);
-
-		timeService.scheduleAtFixedRate(
-			timestamp -> {
-
-				waitUntilTimerStarted.trigger();
-
-				try {
-					blockUntilTerminationInterrupts.await();
-					check.set(false);
-				} catch (InterruptedException ignore) {
-				}
-
-				try {
-					blockUntilTriggered.await();
-				} catch (InterruptedException ignore) {
-					check.set(false);
-				}
-			},
-			0L,
-			10L);
+		final AtomicBoolean timerExecutionFinished = new AtomicBoolean(false);
 
-		try {
-			waitUntilTimerStarted.await();
-		} catch (InterruptedException e) {
-			Assert.fail();
-		}
+		final SystemProcessingTimeService timeService =
+			createBlockingSystemProcessingTimeService(lock, blockUntilTriggered, timerExecutionFinished);
 
 		Assert.assertFalse(timeService.isTerminated());
 
@@ -504,7 +475,101 @@ public class SystemProcessingTimeServiceTest extends TestLogger {
 			Assert.fail("Unexpected interruption.");
 		}
 
-		Assert.assertTrue(check.get());
+		Assert.assertTrue(timerExecutionFinished.get());
+		Assert.assertTrue(timeService.isTerminated());
+	}
+
+	@Test
+	public void testShutdownServiceUninterruptible() {
+		final Object lock = new Object();
+		final OneShotLatch blockUntilTriggered = new OneShotLatch();
+		final AtomicBoolean timerFinished = new AtomicBoolean(false);
+
+		final SystemProcessingTimeService timeService =
+			createBlockingSystemProcessingTimeService(lock, blockUntilTriggered, timerFinished);
+
+		Assert.assertFalse(timeService.isTerminated());
+
+		final Thread interruptTarget = Thread.currentThread();
+		final AtomicBoolean runInterrupts = new AtomicBoolean(true);
+		final Thread interruptCallerThread = new Thread(() -> {
+			while (runInterrupts.get()) {
+				interruptTarget.interrupt();
+				try {
+					Thread.sleep(1);
+				} catch (InterruptedException ignore) {
+				}
+			}
+		});
+
+		interruptCallerThread.start();
+
+		final long timeoutMs = 50L;
+		final long startTime = System.nanoTime();
+		Assert.assertFalse(timeService.isTerminated());
+		// check that termination did not succeed (because of blocking timer execution)
+		Assert.assertFalse(timeService.shutdownServiceUninterruptible(timeoutMs));
+		// check that termination flag was set.
 		Assert.assertTrue(timeService.isTerminated());
+		// check that the blocked timer is still in flight.
+		Assert.assertFalse(timerFinished.get());
+		// check that we waited until timeout
+		Assert.assertTrue((System.nanoTime() - startTime) >= (1_000_000L * timeoutMs));
+
+		runInterrupts.set(false);
+
+		do {
+			try {
+				interruptCallerThread.join();
+			} catch (InterruptedException ignore) {
+			}
+		} while (interruptCallerThread.isAlive());
+
+		blockUntilTriggered.trigger();
+		Assert.assertTrue(timeService.shutdownServiceUninterruptible(timeoutMs));
+		Assert.assertTrue(timerFinished.get());
+	}
+
+	private static SystemProcessingTimeService createBlockingSystemProcessingTimeService(
+		final Object lock,
+		final OneShotLatch blockUntilTriggered,
+		final AtomicBoolean check) {
+
+		final OneShotLatch waitUntilTimerStarted = new OneShotLatch();
+
+		Preconditions.checkState(!check.get());
+
+		final SystemProcessingTimeService timeService = new SystemProcessingTimeService(
+			(message, exception) -> {
+			},
+			lock);
+
+		timeService.scheduleAtFixedRate(
+			timestamp -> {
+
+				waitUntilTimerStarted.trigger();
+
+				boolean unblocked = false;
+
+				while (!unblocked) {
+					try {
+						blockUntilTriggered.await();
+						unblocked = true;
+					} catch (InterruptedException ignore) {
+					}
+				}
+
+				check.set(true);
+			},
+			0L,
+			10L);
+
+		try {
+			waitUntilTimerStarted.await();
+		} catch (InterruptedException e) {
+			Assert.fail("Problem while starting up service.");
+		}
+
+		return timeService;
 	}
 }


[04/12] flink git commit: [FLINK-9246][HS] Adjust HistoryServer for job overview changes

Posted by tr...@apache.org.
[FLINK-9246][HS] Adjust HistoryServer for job overview changes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cef5fde
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cef5fde
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cef5fde

Branch: refs/heads/master
Commit: 2cef5fded65575eb16511271c5820a992890195b
Parents: bcd028d
Author: hzyuqi1 <hz...@corp.netease.com>
Authored: Tue Apr 24 10:25:10 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon May 14 23:40:48 2018 +0200

----------------------------------------------------------------------
 .../history/HistoryServerArchiveFetcher.java        | 16 +++++++---------
 1 file changed, 7 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2cef5fde/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 450436f..413473b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.history.FsJobArchivist;
 import org.apache.flink.runtime.rest.handler.legacy.JobsOverviewHandler;
+import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.FileUtils;
 
@@ -162,7 +163,7 @@ class HistoryServerArchiveFetcher {
 									String json = archive.getJson();
 
 									File target;
-									if (path.equals("/joboverview")) {
+									if (path.equals(JobsOverviewHeaders.URL)) {
 										target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
 									} else {
 										target = new File(webDir, path + JSON_FILE_ENDING);
@@ -211,7 +212,7 @@ class HistoryServerArchiveFetcher {
 						}
 					}
 					if (updateOverview) {
-						updateJobOverview(webDir);
+						updateJobOverview(webOverviewDir, webDir);
 					}
 				}
 			} catch (Exception e) {
@@ -230,19 +231,16 @@ class HistoryServerArchiveFetcher {
 	 *
 	 * <p>For the display in the HistoryServer WebFrontend we have to combine these overviews.
 	 */
-	private static void updateJobOverview(File webDir) {
-		File webOverviewDir = new File(webDir, "overviews");
-		try (JsonGenerator gen = jacksonFactory.createGenerator(HistoryServer.createOrGetFile(webDir, "joboverview"))) {
+	private static void updateJobOverview(File webOverviewDir, File webDir) {
+		try (JsonGenerator gen = jacksonFactory.createGenerator(HistoryServer.createOrGetFile(webDir, JobsOverviewHeaders.URL))) {
 			gen.writeStartObject();
-			gen.writeArrayFieldStart("running");
-			gen.writeEndArray();
-			gen.writeArrayFieldStart("finished");
+			gen.writeArrayFieldStart("jobs");
 
 			File[] overviews = new File(webOverviewDir.getPath()).listFiles();
 			if (overviews != null) {
 				for (File overview : overviews) {
 					JsonNode root = mapper.readTree(overview);
-					JsonNode finished = root.get("finished");
+					JsonNode finished = root.get("jobs");
 					JsonNode job = finished.get(0);
 					mapper.writeTree(gen, job);
 				}


[06/12] flink git commit: [FLINK-9194][history] Adjust handlers

Posted by tr...@apache.org.
[FLINK-9194][history] Adjust handlers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb06ba95
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb06ba95
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb06ba95

Branch: refs/heads/master
Commit: bb06ba954b8c65407039edd239da973d8e97e75b
Parents: 5753b74
Author: zentol <ch...@apache.org>
Authored: Wed Apr 18 14:34:25 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon May 14 23:40:48 2018 +0200

----------------------------------------------------------------------
 .../handler/job/JobAccumulatorsHandler.java     | 22 ++++-
 .../rest/handler/job/JobConfigHandler.java      | 21 ++++-
 .../rest/handler/job/JobDetailsHandler.java     | 21 ++++-
 .../rest/handler/job/JobExceptionsHandler.java  | 21 ++++-
 .../rest/handler/job/JobPlanHandler.java        | 21 ++++-
 .../handler/job/JobVertexDetailsHandler.java    | 27 +++++-
 .../job/JobVertexTaskManagersHandler.java       | 29 ++++++-
 .../rest/handler/job/JobsOverviewHandler.java   | 19 ++++-
 ...taskExecutionAttemptAccumulatorsHandler.java | 50 ++++++++++-
 .../SubtaskExecutionAttemptDetailsHandler.java  | 59 ++++++++++++-
 .../rest/handler/job/SubtasksTimesHandler.java  | 27 +++++-
 .../checkpoints/CheckpointConfigHandler.java    | 28 ++++++-
 .../CheckpointStatisticDetailsHandler.java      | 33 +++++++-
 .../CheckpointingStatisticsHandler.java         | 26 +++++-
 .../TaskCheckpointStatisticDetailsHandler.java  | 87 ++++++++++++++------
 15 files changed, 447 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
index 964aee3..edb529c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
@@ -28,13 +28,19 @@ import org.apache.flink.runtime.rest.messages.AccumulatorsIncludeSerializedValue
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo;
 import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedValue;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -44,7 +50,7 @@ import java.util.concurrent.Executor;
 /**
  * Request handler that returns the aggregated accumulators of a job.
  */
-public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAccumulatorsInfo, JobAccumulatorsMessageParameters> {
+public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAccumulatorsInfo, JobAccumulatorsMessageParameters> implements JsonArchivist {
 
 	public JobAccumulatorsHandler(
 			CompletableFuture<String> localRestAddress,
@@ -66,7 +72,6 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAcc
 
 	@Override
 	protected JobAccumulatorsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobAccumulatorsMessageParameters> request, AccessExecutionGraph graph) throws RestHandlerException {
-		JobAccumulatorsInfo accumulatorsInfo;
 		List<Boolean> queryParams = request.getQueryParameter(AccumulatorsIncludeSerializedValueQueryParameter.class);
 
 		final boolean includeSerializedValue;
@@ -76,6 +81,18 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAcc
 			includeSerializedValue = false;
 		}
 
+		return createJobAccumulatorsInfo(graph, includeSerializedValue);
+	}
+
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		ResponseBody json = createJobAccumulatorsInfo(graph, true);
+		String path = getMessageHeaders().getTargetRestEndpointURL()
+			.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
+		return Collections.singleton(new ArchivedJson(path, json));
+	}
+
+	private static JobAccumulatorsInfo createJobAccumulatorsInfo(AccessExecutionGraph graph, boolean includeSerializedValue) {
 		StringifiedAccumulatorResult[] stringifiedAccs = graph.getAccumulatorResultsStringified();
 		List<JobAccumulatorsInfo.UserTaskAccumulator> userTaskAccumulators = new ArrayList<>(stringifiedAccs.length);
 
@@ -87,6 +104,7 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAcc
 					acc.getValue()));
 		}
 
+		JobAccumulatorsInfo accumulatorsInfo;
 		if (includeSerializedValue) {
 			Map<String, SerializedValue<OptionalFailure<Object>>> serializedUserTaskAccumulators = graph.getAccumulatorsSerialized();
 			accumulatorsInfo = new JobAccumulatorsInfo(Collections.emptyList(), userTaskAccumulators, serializedUserTaskAccumulators);

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
index 7154246..3231668 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
@@ -25,11 +25,18 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobConfigInfo;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -37,7 +44,7 @@ import java.util.concurrent.Executor;
 /**
  * Handler serving the job configuration.
  */
-public class JobConfigHandler extends AbstractExecutionGraphHandler<JobConfigInfo, JobMessageParameters> {
+public class JobConfigHandler extends AbstractExecutionGraphHandler<JobConfigInfo, JobMessageParameters> implements JsonArchivist {
 
 	public JobConfigHandler(
 			CompletableFuture<String> localRestAddress,
@@ -60,6 +67,18 @@ public class JobConfigHandler extends AbstractExecutionGraphHandler<JobConfigInf
 
 	@Override
 	protected JobConfigInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) {
+		return createJobConfigInfo(executionGraph);
+	}
+
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		ResponseBody json = createJobConfigInfo(graph);
+		String path = getMessageHeaders().getTargetRestEndpointURL()
+			.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
+		return Collections.singleton(new ArchivedJson(path, json));
+	}
+
+	private static JobConfigInfo createJobConfigInfo(AccessExecutionGraph executionGraph) {
 		final ArchivedExecutionConfig executionConfig = executionGraph.getArchivedExecutionConfig();
 		final JobConfigInfo.ExecutionConfigInfo executionConfigInfo;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
index 82f24d3..d1383eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
@@ -32,16 +32,24 @@ import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -50,7 +58,7 @@ import java.util.concurrent.Executor;
 /**
  * Handler returning the details for the specified job.
  */
-public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsInfo, JobMessageParameters> {
+public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsInfo, JobMessageParameters> implements JsonArchivist {
 
 	private final MetricFetcher<?> metricFetcher;
 
@@ -79,7 +87,19 @@ public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsI
 	protected JobDetailsInfo handleRequest(
 			HandlerRequest<EmptyRequestBody, JobMessageParameters> request,
 			AccessExecutionGraph executionGraph) throws RestHandlerException {
+		return createJobDetailsInfo(executionGraph, metricFetcher);
+	}
+
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		// Archiving supplies no MetricFetcher; createJobDetailsInfo accepts a @Nullable one.
+		final ResponseBody details = createJobDetailsInfo(graph, null);
+		final String endpointUrl = getMessageHeaders().getTargetRestEndpointURL();
+		final String jobPath = endpointUrl.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
+		return Collections.singleton(new ArchivedJson(jobPath, details));
+	}
 
+	private static JobDetailsInfo createJobDetailsInfo(AccessExecutionGraph executionGraph, @Nullable MetricFetcher<?> metricFetcher) {
 		final long now = System.currentTimeMillis();
 		final long startTime = executionGraph.getStatusTimestamp(JobStatus.CREATED);
 		final long endTime = executionGraph.getState().isGloballyTerminalState() ?

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index 63dc604..8ec1af0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -27,14 +27,21 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobExceptionsInfo;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -43,7 +50,7 @@ import java.util.concurrent.Executor;
 /**
  * Handler serving the job exceptions.
  */
-public class JobExceptionsHandler extends AbstractExecutionGraphHandler<JobExceptionsInfo, JobMessageParameters> {
+public class JobExceptionsHandler extends AbstractExecutionGraphHandler<JobExceptionsInfo, JobMessageParameters> implements JsonArchivist {
 
 	static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
 
@@ -68,6 +75,19 @@ public class JobExceptionsHandler extends AbstractExecutionGraphHandler<JobExcep
 
 	@Override
 	protected JobExceptionsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) {
+		return createJobExceptionsInfo(executionGraph);
+	}
+
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		// One archived entry: the exceptions info, stored under the endpoint URL with the job ID filled in.
+		final ResponseBody exceptions = createJobExceptionsInfo(graph);
+		final String endpointUrl = getMessageHeaders().getTargetRestEndpointURL();
+		final String jobPath = endpointUrl.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
+		return Collections.singletonList(new ArchivedJson(jobPath, exceptions));
+	}
+
+	private static JobExceptionsInfo createJobExceptionsInfo(AccessExecutionGraph executionGraph) {
 		ErrorInfo rootException = executionGraph.getFailureInfo();
 		String rootExceptionMessage = null;
 		Long rootTimestamp = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
index e7a30fb..6eb2573 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
@@ -23,12 +23,19 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobPlanInfo;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -36,7 +43,7 @@ import java.util.concurrent.Executor;
 /**
  * Handler serving the job execution plan.
  */
-public class JobPlanHandler extends AbstractExecutionGraphHandler<JobPlanInfo, JobMessageParameters> {
+public class JobPlanHandler extends AbstractExecutionGraphHandler<JobPlanInfo, JobMessageParameters> implements JsonArchivist {
 
 	public JobPlanHandler(
 		CompletableFuture<String> localRestAddress,
@@ -59,6 +66,19 @@ public class JobPlanHandler extends AbstractExecutionGraphHandler<JobPlanInfo, J
 
 	@Override
 	protected JobPlanInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException {
+		return createJobPlanInfo(executionGraph);
+	}
+
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		// One archived entry: the JSON plan, stored under the endpoint URL with the job ID filled in.
+		final ResponseBody plan = createJobPlanInfo(graph);
+		final String endpointUrl = getMessageHeaders().getTargetRestEndpointURL();
+		final String jobPath = endpointUrl.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
+		return Collections.singleton(new ArchivedJson(jobPath, plan));
+	}
+
+	private static JobPlanInfo createJobPlanInfo(AccessExecutionGraph executionGraph) {
 		return new JobPlanInfo(executionGraph.getJsonPlan());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
index b4693a5..034f01d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
@@ -36,12 +36,19 @@ import org.apache.flink.runtime.rest.messages.JobVertexDetailsInfo;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -50,7 +57,7 @@ import java.util.concurrent.Executor;
 /**
  * Request handler for the job vertex details.
  */
-public class JobVertexDetailsHandler extends AbstractExecutionGraphHandler<JobVertexDetailsInfo, JobVertexMessageParameters> {
+public class JobVertexDetailsHandler extends AbstractExecutionGraphHandler<JobVertexDetailsInfo, JobVertexMessageParameters> implements JsonArchivist {
 	private final MetricFetcher<? extends RestfulGateway> metricFetcher;
 
 	public JobVertexDetailsHandler(
@@ -85,6 +92,26 @@ public class JobVertexDetailsHandler extends AbstractExecutionGraphHandler<JobVe
 			throw new NotFoundException(String.format("JobVertex %s not found", jobVertexID));
 		}
 
+		return createJobVertexDetailsInfo(jobVertex, jobID, metricFetcher);
+	}
+
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		final JobID jobId = graph.getJobID();
+		// Entries differ only in the vertex ID, so the job-level part of the URL is resolved once.
+		final String jobUrl = getMessageHeaders().getTargetRestEndpointURL()
+			.replace(':' + JobIDPathParameter.KEY, jobId.toString());
+		final Collection<? extends AccessExecutionJobVertex> vertices = graph.getAllVertices().values();
+		final List<ArchivedJson> archive = new ArrayList<>(vertices.size());
+		for (AccessExecutionJobVertex vertex : vertices) {
+			final ResponseBody details = createJobVertexDetailsInfo(vertex, jobId, null);
+			final String vertexUrl = jobUrl.replace(':' + JobVertexIdPathParameter.KEY, vertex.getJobVertexId().toString());
+			archive.add(new ArchivedJson(vertexUrl, details));
+		}
+		return archive;
+	}
+
+	private static JobVertexDetailsInfo createJobVertexDetailsInfo(AccessExecutionJobVertex jobVertex, JobID jobID, @Nullable MetricFetcher<?> metricFetcher) {
 		List<JobVertexDetailsInfo.VertexTaskDetail> subtasks = new ArrayList<>();
 		final long now = System.currentTimeMillis();
 		int num = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
index 24650a3..efb6fc0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
@@ -38,13 +38,20 @@ import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobVertexTaskManagersInfo;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -55,7 +62,7 @@ import java.util.concurrent.Executor;
  * A request handler that provides the details of a job vertex, including id, name, and the
  * runtime and metrics of all its subtasks aggregated by TaskManager.
  */
-public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, JobVertexMessageParameters> {
+public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, JobVertexMessageParameters> implements JsonArchivist {
 	private MetricFetcher<?> metricFetcher;
 
 	public JobVertexTaskManagersHandler(
@@ -83,6 +90,26 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
 			throw new NotFoundException(String.format("JobVertex %s not found", jobVertexID));
 		}
 
+		return createJobVertexTaskManagersInfo(jobVertex, jobID, metricFetcher);
+	}
+
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		final JobID jobId = graph.getJobID();
+		// Entries differ only in the vertex ID, so the job-level part of the URL is resolved once.
+		final String jobUrl = getMessageHeaders().getTargetRestEndpointURL()
+			.replace(':' + JobIDPathParameter.KEY, jobId.toString());
+		final Collection<? extends AccessExecutionJobVertex> vertices = graph.getAllVertices().values();
+		final List<ArchivedJson> archive = new ArrayList<>(vertices.size());
+		for (AccessExecutionJobVertex vertex : vertices) {
+			final ResponseBody taskManagers = createJobVertexTaskManagersInfo(vertex, jobId, null);
+			final String vertexUrl = jobUrl.replace(':' + JobVertexIdPathParameter.KEY, vertex.getJobVertexId().toString());
+			archive.add(new ArchivedJson(vertexUrl, taskManagers));
+		}
+		return archive;
+	}
+
+	private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(AccessExecutionJobVertex jobVertex, JobID jobID, @Nullable MetricFetcher<?> metricFetcher) {
 		// Build a map that groups tasks by TaskManager
 		Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
 		for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
@@ -173,6 +200,6 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<
 				statusCounts));
 		}
 
-		return new JobVertexTaskManagersInfo(jobVertexID, jobVertex.getName(), now, taskManagersInfoList);
+		return new JobVertexTaskManagersInfo(jobVertex.getJobVertexId(), jobVertex.getName(), now, taskManagersInfoList);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
index 94bdbd2..6d2b1e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
@@ -19,25 +19,34 @@
 package org.apache.flink.runtime.rest.handler.job;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import javax.annotation.Nonnull;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 /**
  * Overview handler for jobs.
  */
-public class JobsOverviewHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> {
+public class JobsOverviewHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> implements JsonArchivist {
 
 	public JobsOverviewHandler(
 			CompletableFuture<String> localRestAddress,
@@ -57,4 +66,15 @@ public class JobsOverviewHandler extends AbstractRestHandler<RestfulGateway, Emp
 	protected CompletableFuture<MultipleJobsDetails> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
 		return gateway.requestMultipleJobDetails(timeout);
 	}
+
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		// The archived overview lists exactly one job: the one being archived.
+		final ResponseBody overview = new MultipleJobsDetails(
+			Collections.singleton(WebMonitorUtils.createDetailsForJob(graph)));
+		final String endpointUrl = getMessageHeaders().getTargetRestEndpointURL();
+		// NOTE(review): String.replace leaves the URL unchanged if it has no job ID placeholder; confirm the overview URL needs this substitution.
+		final String overviewPath = endpointUrl.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
+		return Collections.singletonList(new ArchivedJson(overviewPath, overview));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
index e3b1719..e335238 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -21,18 +21,31 @@ package org.apache.flink.runtime.rest.handler.job;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
 import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptPathParameter;
 import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsInfo;
 import org.apache.flink.runtime.rest.messages.job.UserAccumulator;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -40,7 +53,10 @@ import java.util.concurrent.Executor;
 /**
  * Request handler for the subtask execution attempt accumulators.
  */
-public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskAttemptHandler<SubtaskExecutionAttemptAccumulatorsInfo, SubtaskAttemptMessageParameters> {
+public class SubtaskExecutionAttemptAccumulatorsHandler
+	extends AbstractSubtaskAttemptHandler<SubtaskExecutionAttemptAccumulatorsInfo, SubtaskAttemptMessageParameters>
+	implements JsonArchivist {
+
 	/**
 	 * Instantiates a new Abstract job vertex handler.
 	 *
@@ -68,7 +84,48 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
 	protected SubtaskExecutionAttemptAccumulatorsInfo handleRequest(
 			HandlerRequest<EmptyRequestBody, SubtaskAttemptMessageParameters> request,
 			AccessExecution execution) throws RestHandlerException {
+		return createAccumulatorInfo(execution);
+	}
+
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		List<ArchivedJson> archive = new ArrayList<>(16);
+		for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
+			for (AccessExecutionVertex subtask : task.getTaskVertices()) {
+				AccessExecution currentAttempt = subtask.getCurrentExecutionAttempt();
+				archive.add(archiveAttempt(graph, task, subtask, currentAttempt));
+
+				for (int x = 0; x < currentAttempt.getAttemptNumber(); x++) {
+					AccessExecution attempt = subtask.getPriorExecutionAttempt(x);
+					// NOTE(review): a prior attempt may come back as null (e.g. no longer retained in the attempt history); skip it instead of failing with an NPE.
+					if (attempt != null) {
+						archive.add(archiveAttempt(graph, task, subtask, attempt));
+					}
+				}
+			}
+		}
+		return archive;
+	}
+
+	/**
+	 * Builds the archived accumulators JSON of one execution attempt, keyed by the REST path of that
+	 * attempt (job ID, vertex ID, subtask index and attempt number substituted into the endpoint URL).
+	 */
+	private ArchivedJson archiveAttempt(
+			AccessExecutionGraph graph,
+			AccessExecutionJobVertex task,
+			AccessExecutionVertex subtask,
+			AccessExecution attempt) throws IOException {
+		ResponseBody json = createAccumulatorInfo(attempt);
+		String path = getMessageHeaders().getTargetRestEndpointURL()
+			.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString())
+			.replace(':' + JobVertexIdPathParameter.KEY, task.getJobVertexId().toString())
+			.replace(':' + SubtaskIndexPathParameter.KEY, String.valueOf(subtask.getParallelSubtaskIndex()))
+			.replace(':' + SubtaskAttemptPathParameter.KEY, String.valueOf(attempt.getAttemptNumber()));
+		return new ArchivedJson(path, json);
+	}
 
+	private static SubtaskExecutionAttemptAccumulatorsInfo createAccumulatorInfo(AccessExecution execution) {
 		final StringifiedAccumulatorResult[] accs = execution.getUserAccumulatorsStringified();
 		final ArrayList<UserAccumulator> userAccumulatorList = new ArrayList<>(accs.length);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
index b781ee7..bae80c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
@@ -21,6 +21,9 @@ package org.apache.flink.runtime.rest.handler.job;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -31,12 +34,23 @@ import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
 import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptPathParameter;
 import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -44,7 +58,9 @@ import java.util.concurrent.Executor;
 /**
  * Handler of specific sub task execution attempt.
  */
-public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemptHandler<SubtaskExecutionAttemptDetailsInfo, SubtaskAttemptMessageParameters> {
+public class SubtaskExecutionAttemptDetailsHandler
+	extends AbstractSubtaskAttemptHandler<SubtaskExecutionAttemptDetailsInfo, SubtaskAttemptMessageParameters>
+	implements JsonArchivist {
 
 	private final MetricFetcher<?> metricFetcher;
 
@@ -79,11 +95,57 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
 			HandlerRequest<EmptyRequestBody, SubtaskAttemptMessageParameters> request,
 			AccessExecution execution) throws RestHandlerException {
 
-		final MutableIOMetrics ioMetrics = new MutableIOMetrics();
-
 		final JobID jobID = request.getPathParameter(JobIDPathParameter.class);
 		final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
 
+		return createDetailsInfo(execution, jobID, jobVertexID, metricFetcher);
+	}
+
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		List<ArchivedJson> archive = new ArrayList<>(16);
+		for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
+			for (AccessExecutionVertex subtask : task.getTaskVertices()) {
+				AccessExecution currentAttempt = subtask.getCurrentExecutionAttempt();
+				archive.add(archiveAttempt(graph, task, subtask, currentAttempt));
+
+				for (int x = 0; x < currentAttempt.getAttemptNumber(); x++) {
+					AccessExecution attempt = subtask.getPriorExecutionAttempt(x);
+					// NOTE(review): a prior attempt may come back as null (e.g. no longer retained in the attempt history); skip it instead of failing with an NPE.
+					if (attempt != null) {
+						archive.add(archiveAttempt(graph, task, subtask, attempt));
+					}
+				}
+			}
+		}
+		return archive;
+	}
+
+	/**
+	 * Builds the archived details JSON of one execution attempt (without a MetricFetcher), keyed by the
+	 * REST path of that attempt (job ID, vertex ID, subtask index and attempt number substituted).
+	 */
+	private ArchivedJson archiveAttempt(
+			AccessExecutionGraph graph,
+			AccessExecutionJobVertex task,
+			AccessExecutionVertex subtask,
+			AccessExecution attempt) throws IOException {
+		ResponseBody json = createDetailsInfo(attempt, graph.getJobID(), task.getJobVertexId(), null);
+		String path = getMessageHeaders().getTargetRestEndpointURL()
+			.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString())
+			.replace(':' + JobVertexIdPathParameter.KEY, task.getJobVertexId().toString())
+			.replace(':' + SubtaskIndexPathParameter.KEY, String.valueOf(subtask.getParallelSubtaskIndex()))
+			.replace(':' + SubtaskAttemptPathParameter.KEY, String.valueOf(attempt.getAttemptNumber()));
+		return new ArchivedJson(path, json);
+	}
+
+	private static SubtaskExecutionAttemptDetailsInfo createDetailsInfo(
+			AccessExecution execution,
+			JobID jobID,
+			JobVertexID jobVertexID,
+			@Nullable MetricFetcher<?> metricFetcher) {
+		final MutableIOMetrics ioMetrics = new MutableIOMetrics();
+
 		ioMetrics.addIOMetrics(
 			execution,
 			metricFetcher,

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
index 29c0f93..5026951 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
@@ -20,19 +20,27 @@ package org.apache.flink.runtime.rest.handler.job;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.SubtasksTimesInfo;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,7 +50,7 @@ import java.util.concurrent.Executor;
 /**
  * Request handler for the subtasks times info.
  */
-public class SubtasksTimesHandler extends AbstractJobVertexHandler<SubtasksTimesInfo, JobVertexMessageParameters>  {
+public class SubtasksTimesHandler extends AbstractJobVertexHandler<SubtasksTimesInfo, JobVertexMessageParameters> implements JsonArchivist {
 	public SubtasksTimesHandler(
 			CompletableFuture<String> localRestAddress,
 			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -63,7 +71,25 @@ public class SubtasksTimesHandler extends AbstractJobVertexHandler<SubtasksTimes
 
 	@Override
 	protected SubtasksTimesInfo handleRequest(HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request, AccessExecutionJobVertex jobVertex) {
+		return createSubtaskTimesInfo(jobVertex);
+	}
+
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		// Entries differ only in the vertex ID, so the job-level part of the URL is resolved once.
+		final String jobUrl = getMessageHeaders().getTargetRestEndpointURL()
+			.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
+		final Collection<? extends AccessExecutionJobVertex> allVertices = graph.getAllVertices().values();
+		final List<ArchivedJson> archive = new ArrayList<>(allVertices.size());
+		for (AccessExecutionJobVertex vertex : allVertices) {
+			final ResponseBody times = createSubtaskTimesInfo(vertex);
+			final String vertexUrl = jobUrl.replace(':' + JobVertexIdPathParameter.KEY, vertex.getJobVertexId().toString());
+			archive.add(new ArchivedJson(vertexUrl, times));
+		}
+		return archive;
+	}
 
+	private static SubtasksTimesInfo createSubtaskTimesInfo(AccessExecutionJobVertex jobVertex) {
 		final String id = jobVertex.getJobVertexId().toString();
 		final String name = jobVertex.getName();
 		final long now = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
index b88183e..3a03575 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
@@ -27,14 +27,23 @@ import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -42,7 +51,7 @@ import java.util.concurrent.Executor;
 /**
  * Handler which serves the checkpoint configuration.
  */
-public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<CheckpointConfigInfo, JobMessageParameters> {
+public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<CheckpointConfigInfo, JobMessageParameters> implements JsonArchivist {
 
 	public CheckpointConfigHandler(
 			CompletableFuture<String> localRestAddress,
@@ -64,6 +73,23 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<Check
 
 	@Override
 	protected CheckpointConfigInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException {
+		return createCheckpointConfigInfo(executionGraph);
+	}
+
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		ResponseBody response;
+		try {
+			response = createCheckpointConfigInfo(graph);
+		} catch (RestHandlerException rhe) {
+			response = new ErrorResponseBody(rhe.getMessage());
+		}
+		String path = CheckpointConfigHeaders.getInstance().getTargetRestEndpointURL()
+			.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
+		return Collections.singletonList(new ArchivedJson(path, response));
+	}
+
+	private static CheckpointConfigInfo createCheckpointConfigInfo(AccessExecutionGraph executionGraph) throws RestHandlerException {
 		final CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = executionGraph.getCheckpointCoordinatorConfiguration();
 
 		if (checkpointCoordinatorConfiguration == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
index 2816336..8f2d713 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
@@ -20,15 +20,28 @@ package org.apache.flink.runtime.rest.handler.job.checkpoints;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointIdPathParameter;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointMessageParameters;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -36,7 +49,7 @@ import java.util.concurrent.Executor;
 /**
  * REST handler which returns the details for a checkpoint.
  */
-public class CheckpointStatisticDetailsHandler extends AbstractCheckpointHandler<CheckpointStatistics, CheckpointMessageParameters> {
+public class CheckpointStatisticDetailsHandler extends AbstractCheckpointHandler<CheckpointStatistics, CheckpointMessageParameters> implements JsonArchivist {
 
 	public CheckpointStatisticDetailsHandler(
 			CompletableFuture<String> localRestAddress,
@@ -62,4 +75,22 @@ public class CheckpointStatisticDetailsHandler extends AbstractCheckpointHandler
 	protected CheckpointStatistics handleCheckpointRequest(HandlerRequest<EmptyRequestBody, CheckpointMessageParameters> ignored, AbstractCheckpointStats checkpointStats) {
 		return CheckpointStatistics.generateCheckpointStatistics(checkpointStats, true);
 	}
+
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		CheckpointStatsSnapshot stats = graph.getCheckpointStatsSnapshot();
+		if (stats == null) {
+			return Collections.emptyList();
+		}
+		CheckpointStatsHistory history = stats.getHistory();
+		List<ArchivedJson> archive = new ArrayList<>(history.getCheckpoints().size());
+		for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) {
+			ResponseBody json = CheckpointStatistics.generateCheckpointStatistics(checkpoint, true);
+			String path = getMessageHeaders().getTargetRestEndpointURL()
+				.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString())
+				.replace(':' + CheckpointIdPathParameter.KEY, String.valueOf(checkpoint.getCheckpointId()));
+			archive.add(new ArchivedJson(path, json));
+		}
+		return archive;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
index b9db367..d4345ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
@@ -32,17 +32,25 @@ import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
 import org.apache.flink.runtime.rest.messages.checkpoints.MinMaxAvgStatistics;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -51,7 +59,7 @@ import java.util.concurrent.Executor;
 /**
  * Handler which serves the checkpoint statistics.
  */
-public class CheckpointingStatisticsHandler extends AbstractExecutionGraphHandler<CheckpointingStatistics, JobMessageParameters> {
+public class CheckpointingStatisticsHandler extends AbstractExecutionGraphHandler<CheckpointingStatistics, JobMessageParameters> implements JsonArchivist {
 
 	public CheckpointingStatisticsHandler(
 			CompletableFuture<String> localRestAddress,
@@ -66,7 +74,23 @@ public class CheckpointingStatisticsHandler extends AbstractExecutionGraphHandle
 
 	@Override
 	protected CheckpointingStatistics handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException {
+		return createCheckpointingStatistics(executionGraph);
+	}
+
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		ResponseBody json;
+		try {
+			json = createCheckpointingStatistics(graph);
+		} catch (RestHandlerException rhe) {
+			json = new ErrorResponseBody(rhe.getMessage());
+		}
+		String path = getMessageHeaders().getTargetRestEndpointURL()
+			.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
+		return Collections.singletonList(new ArchivedJson(path, json));
+	}
 
+	private static CheckpointingStatistics createCheckpointingStatistics(AccessExecutionGraph executionGraph) throws RestHandlerException {
 		final CheckpointStatsSnapshot checkpointStatsSnapshot = executionGraph.getCheckpointStatsSnapshot();
 
 		if (checkpointStatsSnapshot == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
index cff3bf0..2084c50 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
@@ -20,26 +20,36 @@ package org.apache.flink.runtime.rest.handler.job.checkpoints;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
 import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.NotFoundException;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointIdPathParameter;
 import org.apache.flink.runtime.rest.messages.checkpoints.MinMaxAvgStatistics;
 import org.apache.flink.runtime.rest.messages.checkpoints.SubtaskCheckpointStatistics;
 import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointMessageParameters;
 import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsWithSubtaskDetails;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -48,7 +58,9 @@ import java.util.concurrent.Executor;
 /**
  * REST handler which serves checkpoint statistics for subtasks.
  */
-public class TaskCheckpointStatisticDetailsHandler extends AbstractCheckpointHandler<TaskCheckpointStatisticsWithSubtaskDetails, TaskCheckpointMessageParameters> {
+public class TaskCheckpointStatisticDetailsHandler
+	extends AbstractCheckpointHandler<TaskCheckpointStatisticsWithSubtaskDetails, TaskCheckpointMessageParameters>
+	implements JsonArchivist {
 
 	public TaskCheckpointStatisticDetailsHandler(
 			CompletableFuture<String> localRestAddress,
@@ -79,30 +91,61 @@ public class TaskCheckpointStatisticDetailsHandler extends AbstractCheckpointHan
 
 		final TaskStateStats taskStatistics = checkpointStats.getTaskStateStats(jobVertexId);
 
-		if (taskStatistics != null) {
-
-			final TaskCheckpointStatisticsWithSubtaskDetails.Summary summary = createSummary(
-				taskStatistics.getSummaryStats(),
-				checkpointStats.getTriggerTimestamp());
-
-			final List<SubtaskCheckpointStatistics> subtaskCheckpointStatistics = createSubtaskCheckpointStatistics(
-				taskStatistics.getSubtaskStats(),
-				checkpointStats.getTriggerTimestamp());
-
-			return new TaskCheckpointStatisticsWithSubtaskDetails(
-				checkpointStats.getCheckpointId(),
-				checkpointStats.getStatus(),
-				taskStatistics.getLatestAckTimestamp(),
-				taskStatistics.getStateSize(),
-				taskStatistics.getEndToEndDuration(checkpointStats.getTriggerTimestamp()),
-				taskStatistics.getAlignmentBuffered(),
-				taskStatistics.getNumberOfSubtasks(),
-				taskStatistics.getNumberOfAcknowledgedSubtasks(),
-				summary,
-				subtaskCheckpointStatistics);
-		} else {
-			throw new RestHandlerException("There is no checkpoint statistics for task " + jobVertexId + '.', HttpResponseStatus.NOT_FOUND);
+		if (taskStatistics == null) {
+			throw new NotFoundException("There is no checkpoint statistics for task " + jobVertexId + '.');
+		}
+
+		return createCheckpointDetails(checkpointStats, taskStatistics);
+	}
+
+	/**
+	 * Archives the checkpoint details of every task of every checkpoint in the job's checkpoint history,
+	 * each under this handler's REST path with the job id, checkpoint id and vertex id filled in.
+	 */
+	@Override
+	public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+		CheckpointStatsSnapshot stats = graph.getCheckpointStatsSnapshot();
+		if (stats == null) {
+			return Collections.emptyList();
+		}
+		CheckpointStatsHistory history = stats.getHistory();
+		List<ArchivedJson> archive = new ArrayList<>(history.getCheckpoints().size());
+		for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) {
+			for (TaskStateStats taskStats : checkpoint.getAllTaskStateStats()) {
+				ResponseBody json = createCheckpointDetails(checkpoint, taskStats);
+				String path = getMessageHeaders().getTargetRestEndpointURL()
+					.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString())
+					.replace(':' + CheckpointIdPathParameter.KEY, String.valueOf(checkpoint.getCheckpointId()))
+					.replace(':' + JobVertexIdPathParameter.KEY, taskStats.getJobVertexId().toString());
+				archive.add(new ArchivedJson(path, json));
+			}
 		}
+		return archive;
+	}
+
+	/**
+	 * Creates the statistics of the given task for the given checkpoint, including per-subtask details.
+	 */
+	private static TaskCheckpointStatisticsWithSubtaskDetails createCheckpointDetails(AbstractCheckpointStats checkpointStats, TaskStateStats taskStatistics) {
+		final TaskCheckpointStatisticsWithSubtaskDetails.Summary summary = createSummary(
+			taskStatistics.getSummaryStats(),
+			checkpointStats.getTriggerTimestamp());
+
+		final List<SubtaskCheckpointStatistics> subtaskCheckpointStatistics = createSubtaskCheckpointStatistics(
+			taskStatistics.getSubtaskStats(),
+			checkpointStats.getTriggerTimestamp());
+
+		return new TaskCheckpointStatisticsWithSubtaskDetails(
+			checkpointStats.getCheckpointId(),
+			checkpointStats.getStatus(),
+			taskStatistics.getLatestAckTimestamp(),
+			taskStatistics.getStateSize(),
+			taskStatistics.getEndToEndDuration(checkpointStats.getTriggerTimestamp()),
+			taskStatistics.getAlignmentBuffered(),
+			taskStatistics.getNumberOfSubtasks(),
+			taskStatistics.getNumberOfAcknowledgedSubtasks(),
+			summary,
+			subtaskCheckpointStatistics);
 	}
 
 	private static TaskCheckpointStatisticsWithSubtaskDetails.Summary createSummary(TaskStateStats.TaskStateStatsSummary taskStatisticsSummary, long triggerTimestamp) {


[02/12] flink git commit: [FLINK-9194] Add support for legacy history server formats

Posted by tr...@apache.org.
[FLINK-9194] Add support for legacy history server formats


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/69583800
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/69583800
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/69583800

Branch: refs/heads/master
Commit: 6958380038b82f43ba44ebb3e0af8091399da93b
Parents: 6d8cc73
Author: zentol <ch...@apache.org>
Authored: Tue May 8 13:13:59 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon May 14 23:40:48 2018 +0200

----------------------------------------------------------------------
 .../history/HistoryServerArchiveFetcher.java    | 49 +++++++++++
 .../webmonitor/history/HistoryServerTest.java   | 88 +++++++++++++++++++-
 2 files changed, 135 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/69583800/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index ac19197..5acbe54 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -23,7 +23,9 @@ import org.apache.flink.configuration.HistoryServerOptions;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.handler.legacy.JobsOverviewHandler;
@@ -33,6 +35,7 @@ import org.apache.flink.util.FileUtils;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.slf4j.Logger;
@@ -41,10 +44,12 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.io.StringWriter;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -168,6 +173,9 @@ class HistoryServerArchiveFetcher {
 									File target;
 									if (path.equals(JobsOverviewHeaders.URL)) {
 										target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
+									} else if (path.equals("/joboverview")) { // legacy path
+										json = convertLegacyJobOverview(json);
+										target = new File(webOverviewDir, jobID + JSON_FILE_ENDING);
 									} else {
 										target = new File(webDir, path + JSON_FILE_ENDING);
 									}
@@ -225,6 +233,75 @@ class HistoryServerArchiveFetcher {
 		}
 	}
 
+	/**
+	 * Converts the job overview of a legacy archive (stored under the path {@code /joboverview}) into the
+	 * {@link MultipleJobsDetails} JSON format used for archives stored under {@link JobsOverviewHeaders#URL}.
+	 *
+	 * <p>Only the first entry of the legacy "finished" array is converted.
+	 *
+	 * @param legacyOverview JSON of the legacy job overview
+	 * @return JSON of a {@link MultipleJobsDetails} containing the single converted job
+	 * @throws IOException if the JSON cannot be parsed, contains no finished job, or lacks a required field
+	 */
+	private static String convertLegacyJobOverview(String legacyOverview) throws IOException {
+		JsonNode root = mapper.readTree(legacyOverview);
+		if (root == null) {
+			throw new IOException("Legacy job overview is empty.");
+		}
+		JsonNode finishedJobs = getRequiredField(root, "finished");
+		if (!finishedJobs.isArray() || finishedJobs.size() == 0) {
+			throw new IOException("Legacy job overview does not contain a finished job.");
+		}
+		JsonNode job = finishedJobs.get(0);
+
+		JobID jobId = JobID.fromHexString(getRequiredField(job, "jid").asText());
+		String name = getRequiredField(job, "name").asText();
+		JobStatus state = JobStatus.valueOf(getRequiredField(job, "state").asText());
+
+		long startTime = getRequiredField(job, "start-time").asLong();
+		long endTime = getRequiredField(job, "end-time").asLong();
+		long duration = getRequiredField(job, "duration").asLong();
+		long lastMod = getRequiredField(job, "last-modification").asLong();
+
+		JsonNode tasks = getRequiredField(job, "tasks");
+		int numTasks = getRequiredField(tasks, "total").asInt();
+		int pending = getRequiredField(tasks, "pending").asInt();
+		int running = getRequiredField(tasks, "running").asInt();
+		int finished = getRequiredField(tasks, "finished").asInt();
+		int canceling = getRequiredField(tasks, "canceling").asInt();
+		int canceled = getRequiredField(tasks, "canceled").asInt();
+		int failed = getRequiredField(tasks, "failed").asInt();
+
+		int[] tasksPerState = new int[ExecutionState.values().length];
+		// pending is a mix of CREATED/SCHEDULED/DEPLOYING
+		// to maintain the correct number of task states we have to pick one of them
+		tasksPerState[ExecutionState.SCHEDULED.ordinal()] = pending;
+		tasksPerState[ExecutionState.RUNNING.ordinal()] = running;
+		tasksPerState[ExecutionState.FINISHED.ordinal()] = finished;
+		tasksPerState[ExecutionState.CANCELING.ordinal()] = canceling;
+		tasksPerState[ExecutionState.CANCELED.ordinal()] = canceled;
+		tasksPerState[ExecutionState.FAILED.ordinal()] = failed;
+
+		JobDetails jobDetails = new JobDetails(jobId, name, startTime, endTime, duration, state, lastMod, tasksPerState, numTasks);
+		MultipleJobsDetails multipleJobsDetails = new MultipleJobsDetails(Collections.singleton(jobDetails));
+
+		StringWriter sw = new StringWriter();
+		mapper.writeValue(sw, multipleJobsDetails);
+		return sw.toString();
+	}
+
+	/**
+	 * Returns the child {@code field} of {@code node}, so that a malformed legacy archive surfaces as the
+	 * declared {@link IOException} instead of a {@link NullPointerException}.
+	 */
+	private static JsonNode getRequiredField(JsonNode node, String field) throws IOException {
+		JsonNode value = node.get(field);
+		if (value == null) {
+			throw new IOException("Legacy job overview is missing the field '" + field + "'.");
+		}
+		return value;
+	}
+
 	/**
 	 * This method replicates the JSON response that would be given by the {@link JobsOverviewHandler} when
 	 * listing both running and finished jobs.

http://git-wip-us.apache.org/repos/asf/flink/blob/69583800/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 580d80f..9407af2 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -18,15 +18,19 @@
 
 package org.apache.flink.runtime.webmonitor.history;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HistoryServerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.commons.io.IOUtils;
@@ -38,12 +42,18 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.InputStream;
+import java.io.StringWriter;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.nio.file.Path;
+import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils.JACKSON_FACTORY;
+
 /**
  * Tests for the HistoryServer.
  */
@@ -88,6 +98,7 @@ public class HistoryServerTest extends TestLogger {
 		for (int x = 0; x < numJobs; x++) {
 			runJob();
 		}
+		createLegacyArchive(jmDirectory.toPath());
 
 		CountDownLatch numFinishedPolls = new CountDownLatch(1);
 
@@ -99,7 +110,7 @@ public class HistoryServerTest extends TestLogger {
 
 		// the job is archived asynchronously after env.execute() returns
 		File[] archives = jmDirectory.listFiles();
-		while (archives == null || archives.length != numJobs) {
+		while (archives == null || archives.length != numJobs + 1) {
 			Thread.sleep(50);
 			archives = jmDirectory.listFiles();
 		}
@@ -114,7 +125,7 @@ public class HistoryServerTest extends TestLogger {
 			String response = getFromHTTP(baseUrl + JobsOverviewHeaders.URL);
 			MultipleJobsDetails overview = mapper.readValue(response, MultipleJobsDetails.class);
 
-			Assert.assertEquals(numJobs, overview.getJobs().size());
+			Assert.assertEquals(numJobs + 1, overview.getJobs().size());
 		} finally {
 			hs.stop();
 		}
@@ -143,4 +154,77 @@ public class HistoryServerTest extends TestLogger {
 
 		return IOUtils.toString(is, connection.getContentEncoding() != null ? connection.getContentEncoding() : "UTF-8");
 	}
+
+	private static void createLegacyArchive(Path directory) throws IOException {
+		JobID jobID = JobID.generate();
+
+		StringWriter sw = new StringWriter();
+		try (JsonGenerator gen = JACKSON_FACTORY.createGenerator(sw)) {
+			try (JsonObject root = new JsonObject(gen)) {
+				try (JsonArray finished = new JsonArray(gen, "finished")) {
+					try (JsonObject job = new JsonObject(gen)) {
+						gen.writeStringField("jid", jobID.toString());
+						gen.writeStringField("name", "testjob");
+						gen.writeStringField("state", JobStatus.FINISHED.name());
+
+						gen.writeNumberField("start-time", 0L);
+						gen.writeNumberField("end-time", 1L);
+						gen.writeNumberField("duration", 1L);
+						gen.writeNumberField("last-modification", 1L);
+
+						try (JsonObject tasks = new JsonObject(gen, "tasks")) {
+							gen.writeNumberField("total", 0);
+
+							gen.writeNumberField("pending", 0);
+							gen.writeNumberField("running", 0);
+							gen.writeNumberField("finished", 0);
+							gen.writeNumberField("canceling", 0);
+							gen.writeNumberField("canceled", 0);
+							gen.writeNumberField("failed", 0);
+						}
+					}
+				}
+			}
+		}
+		String json = sw.toString();
+
+		ArchivedJson archivedJson = new ArchivedJson("/joboverview", json);
+
+		FsJobArchivist.archiveJob(new org.apache.flink.core.fs.Path(directory.toUri()), jobID, Collections.singleton(archivedJson));
+	}
+
+	private static final class JsonObject implements AutoCloseable {
+
+		private final JsonGenerator gen;
+
+		JsonObject(JsonGenerator gen) throws IOException {
+			this.gen = gen;
+			gen.writeStartObject();
+		}
+
+		private JsonObject(JsonGenerator gen, String name) throws IOException {
+			this.gen = gen;
+			gen.writeObjectFieldStart(name);
+		}
+
+		@Override
+		public void close() throws IOException {
+			gen.writeEndObject();
+		}
+	}
+
+	private static final class JsonArray implements AutoCloseable {
+
+		private final JsonGenerator gen;
+
+		JsonArray(JsonGenerator gen, String name) throws IOException {
+			this.gen = gen;
+			gen.writeArrayFieldStart(name);
+		}
+
+		@Override
+		public void close() throws IOException {
+			gen.writeEndArray();
+		}
+	}
 }


[03/12] flink git commit: [FLINK-9194][history] Rework and extend the HistoryServer test

Posted by tr...@apache.org.
[FLINK-9194][history] Rework and extend the HistoryServer test


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d8cc733
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d8cc733
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d8cc733

Branch: refs/heads/master
Commit: 6d8cc733d45477fc784fb68be01c2b2b4d16cd87
Parents: bb06ba9
Author: zentol <ch...@apache.org>
Authored: Mon Apr 23 15:13:41 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon May 14 23:40:48 2018 +0200

----------------------------------------------------------------------
 .../webmonitor/history/HistoryServerTest.java   | 100 ++++++++++++-------
 1 file changed, 63 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6d8cc733/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index a16f6fb..580d80f 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -21,25 +21,19 @@ package org.apache.flink.runtime.webmonitor.history;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HistoryServerOptions;
 import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.messages.ArchiveMessages;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.TestActorRef;
 import org.apache.commons.io.IOUtils;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.Rule;
+import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
@@ -50,43 +44,67 @@ import java.net.URL;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import scala.Option;
-
 /**
  * Tests for the HistoryServer.
  */
 public class HistoryServerTest extends TestLogger {
 
-	@Rule
-	public TemporaryFolder tmpDir = new TemporaryFolder();
-
-	@Test
-	public void testFullArchiveLifecycle() throws Exception {
-		ArchivedExecutionGraph graph = (ArchivedExecutionGraph) ArchivedJobGenerationUtils.getTestJob();
-
-		File jmDirectory = tmpDir.newFolder("jm");
-		File hsDirectory = tmpDir.newFolder("hs");
-
-		Configuration config = new Configuration();
-		config.setString(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString());
+	@ClassRule
+	public static final TemporaryFolder TMP = new TemporaryFolder();
+
+	private MiniClusterResource cluster;
+	private File jmDirectory;
+	private File hsDirectory;
+
+	/**
+	 * Creates fresh job manager and history server directories and starts a MiniCluster whose
+	 * {@link JobManagerOptions#ARCHIVE_DIR} points to the job manager directory.
+	 *
+	 * <p>The folders get unique names because {@link #TMP} is a class rule shared by all tests
+	 * of this class; fixed names would make {@code newFolder} fail on the second test.
+	 */
+	@Before
+	public void setUp() throws Exception {
+		jmDirectory = TMP.newFolder();
+		hsDirectory = TMP.newFolder();
+
+		Configuration clusterConfig = new Configuration();
+		clusterConfig.setString(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString());
+
+		cluster = new MiniClusterResource(
+			new MiniClusterResource.MiniClusterResourceConfiguration(
+				clusterConfig,
+				1,
+				1
+			),
+			MiniClusterResource.MiniClusterType.NEW
+		);
+		cluster.before();
+	}
 
-		config.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, jmDirectory.toURI().toString());
-		config.setString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR, hsDirectory.getAbsolutePath());
+	/**
+	 * Shuts down the MiniCluster started in {@link #setUp()}, if one was created.
+	 */
+	@After
+	public void tearDown() {
+		final MiniClusterResource miniCluster = cluster;
+		if (miniCluster == null) {
+			return;
+		}
+		miniCluster.after();
+	}
 
-		config.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0);
+	@Test
+	public void testHistoryServerIntegration() throws Exception {
+		final int numJobs = 2;
+		for (int x = 0; x < numJobs; x++) {
+			runJob();
+		}
 
-		ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(config);
-		Option<Path> archivePath = Option.apply(new Path(jmDirectory.toURI().toString()));
+		CountDownLatch numFinishedPolls = new CountDownLatch(1);
 
-		ActorRef memoryArchivist = TestActorRef.apply(JobManager.getArchiveProps(MemoryArchivist.class, 1, archivePath), actorSystem);
-		memoryArchivist.tell(new ArchiveMessages.ArchiveExecutionGraph(graph.getJobID(), graph), null);
+		Configuration historyServerConfig = new Configuration();
+		historyServerConfig.setString(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_DIRS, jmDirectory.toURI().toString());
+		historyServerConfig.setString(HistoryServerOptions.HISTORY_SERVER_WEB_DIR, hsDirectory.getAbsolutePath());
 
-		File archive = new File(jmDirectory, graph.getJobID().toString());
-		Assert.assertTrue(archive.exists());
+		historyServerConfig.setInteger(HistoryServerOptions.HISTORY_SERVER_WEB_PORT, 0);
 
-		CountDownLatch numFinishedPolls = new CountDownLatch(1);
+		// the job is archived asynchronously after env.execute() returns
+		File[] archives = jmDirectory.listFiles();
+		while (archives == null || archives.length != numJobs) {
+			Thread.sleep(50);
+			archives = jmDirectory.listFiles();
+		}
 
-		HistoryServer hs = new HistoryServer(config, numFinishedPolls);
+		HistoryServer hs = new HistoryServer(historyServerConfig, numFinishedPolls);
 		try {
 			hs.start();
 			String baseUrl = "http://localhost:" + hs.getWebPort();
@@ -96,12 +114,20 @@ public class HistoryServerTest extends TestLogger {
 			String response = getFromHTTP(baseUrl + JobsOverviewHeaders.URL);
 			MultipleJobsDetails overview = mapper.readValue(response, MultipleJobsDetails.class);
 
-			Assert.assertEquals(1, overview.getJobs().size());
+			Assert.assertEquals(numJobs, overview.getJobs().size());
 		} finally {
 			hs.stop();
 		}
 	}
 
+	private static void runJob() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.fromElements(1, 2, 3)
+			.print();
+
+		env.execute();
+	}
+
 	public static String getFromHTTP(String url) throws Exception {
 		URL u = new URL(url);
 		HttpURLConnection connection = (HttpURLConnection) u.openConnection();


[09/12] flink git commit: [FLINK-9194] Introduce HistoryServerArchivist interface

Posted by tr...@apache.org.
[FLINK-9194] Introduce HistoryServerArchivist interface

The HistoryServerArchivist interface encapsulates the logic for archiving an
AccessExecutionGraph for the history server. Currently, this means generating
the JSON responses for all possible HTTP requests and writing them to a
target directory.

This closes #5902.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d734032e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d734032e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d734032e

Branch: refs/heads/master
Commit: d734032ee3f0de1c786f61aa52e7a054e7fca034
Parents: cd37feb
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon May 14 19:58:20 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 00:22:46 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    | 65 +++++++-------------
 .../dispatcher/HistoryServerArchivist.java      | 55 +++++++++++++++++
 .../JsonResponseHistoryServerArchivist.java     | 56 +++++++++++++++++
 .../runtime/dispatcher/MiniDispatcher.java      |  5 +-
 .../dispatcher/StandaloneDispatcher.java        |  5 +-
 .../dispatcher/VoidHistoryServerArchivist.java  | 36 +++++++++++
 .../runtime/entrypoint/ClusterEntrypoint.java   |  8 ++-
 .../entrypoint/JobClusterEntrypoint.java        |  6 +-
 .../entrypoint/SessionClusterEntrypoint.java    |  6 +-
 .../flink/runtime/minicluster/MiniCluster.java  |  5 +-
 .../runtime/dispatcher/DispatcherTest.java      |  2 +-
 .../runtime/dispatcher/MiniDispatcherTest.java  |  2 +-
 12 files changed, 191 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 82b9291..4d30870 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -23,8 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.client.JobSubmissionException;
@@ -34,7 +32,6 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
-import org.apache.flink.runtime.history.FsJobArchivist;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -64,8 +61,6 @@ import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
@@ -123,18 +118,14 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 	private final JobManagerMetricGroup jobManagerMetricGroup;
 
+	private final HistoryServerArchivist historyServerArchivist;
+
 	@Nullable
 	private final String metricQueryServicePath;
 
 	@Nullable
 	protected final String restAddress;
 
-	@Nullable
-	private final JsonArchivist jsonArchivist;
-
-	@Nullable
-	private final Path archivePath;
-
 	private CompletableFuture<Void> orphanedJobManagerRunnersTerminationFuture = CompletableFuture.completedFuture(null);
 
 	public Dispatcher(
@@ -152,7 +143,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 			JobManagerRunnerFactory jobManagerRunnerFactory,
 			FatalErrorHandler fatalErrorHandler,
 			@Nullable String restAddress,
-			@Nullable JsonArchivist jsonArchivist) throws Exception {
+			HistoryServerArchivist historyServerArchivist) throws Exception {
 		super(rpcService, endpointId);
 
 		this.configuration = Preconditions.checkNotNull(configuration);
@@ -177,21 +168,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 		this.restAddress = restAddress;
 
-		this.jsonArchivist = jsonArchivist;
-
-		String configuredArchivePath = configuration.getString(JobManagerOptions.ARCHIVE_DIR);
-		if (configuredArchivePath != null) {
-			Path tmpArchivePath = null;
-			try {
-				tmpArchivePath = WebMonitorUtils.validateAndNormalizeUri(new Path(configuredArchivePath).toUri());
-			} catch (Exception e) {
-				log.warn("Failed to validate specified archive directory in '{}'. " +
-					"Jobs will not be archived for the HistoryServer.", configuredArchivePath, e);
-			}
-			archivePath = tmpArchivePath;
-		} else {
-			archivePath = null;
-		}
+		this.historyServerArchivist = Preconditions.checkNotNull(historyServerArchivist);
 
 		this.archivedExecutionGraphStore = Preconditions.checkNotNull(archivedExecutionGraphStore);
 
@@ -639,6 +616,14 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 		log.info("Job {} reached globally terminal state {}.", archivedExecutionGraph.getJobID(), archivedExecutionGraph.getState());
 
+		archiveExecutionGraph(archivedExecutionGraph);
+
+		final JobID jobId = archivedExecutionGraph.getJobID();
+
+		removeJob(jobId, true);
+	}
+
+	private void archiveExecutionGraph(ArchivedExecutionGraph archivedExecutionGraph) {
 		try {
 			archivedExecutionGraphStore.put(archivedExecutionGraph);
 		} catch (IOException e) {
@@ -649,22 +634,18 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 				e);
 		}
 
-		try {
-			if (jsonArchivist != null && archivePath != null) {
-				FsJobArchivist.archiveJob(archivePath, archivedExecutionGraph.getJobID(), jsonArchivist.archiveJsonWithPath(archivedExecutionGraph));
-				log.info("Archived job {} to {}", archivedExecutionGraph.getJobID(), archivePath);
-			}
-		} catch (IOException e) {
-			log.info(
-				"Could not archive completed job {}({}).",
-				archivedExecutionGraph.getJobName(),
-				archivedExecutionGraph.getJobID(),
-				e);
-		}
-
-		final JobID jobId = archivedExecutionGraph.getJobID();
+		final CompletableFuture<Acknowledge> executionGraphFuture = historyServerArchivist.archiveExecutionGraph(archivedExecutionGraph);
 
-		removeJob(jobId, true);
+		executionGraphFuture.whenComplete(
+			(Acknowledge ignored, Throwable throwable) -> {
+				if (throwable != null) {
+					log.info(
+						"Could not archive completed job {}({}) to the history server.",
+						archivedExecutionGraph.getJobName(),
+						archivedExecutionGraph.getJobID(),
+						throwable);
+				}
+			});
 	}
 
 	protected void jobNotFinished(JobID jobId) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
new file mode 100644
index 0000000..0030159
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/HistoryServerArchivist.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Archives a finished job, given as an {@link AccessExecutionGraph}, for the history server.
+ */
+public interface HistoryServerArchivist {
+
+	/**
+	 * Archives the given {@link AccessExecutionGraph} for the history server.
+	 *
+	 * @param executionGraph the execution graph of the job to archive
+	 * @return future that completes once archiving has finished; implementations report
+	 *         archiving failures by completing it exceptionally
+	 */
+	CompletableFuture<Acknowledge> archiveExecutionGraph(AccessExecutionGraph executionGraph);
+
+	/**
+	 * Creates the {@link HistoryServerArchivist} matching the given configuration.
+	 *
+	 * <p>If {@link JobManagerOptions#ARCHIVE_DIR} is set, the configured directory is passed
+	 * through {@link WebMonitorUtils#validateAndNormalizeUri} and an archivist writing the JSON
+	 * produced by {@code jsonArchivist} to it is returned. Otherwise a no-op archivist is returned.
+	 *
+	 * <p>NOTE(review): exceptions from the validation are not caught here, so an invalid archive
+	 * directory presumably fails the caller instead of just disabling archiving — confirm intended.
+	 *
+	 * @param configuration configuration to read the archive directory from
+	 * @param jsonArchivist producer of the JSON to archive; must be non-null if an archive
+	 *                      directory is configured
+	 * @return the archivist to use for finished jobs
+	 */
+	static HistoryServerArchivist createHistoryServerArchivist(Configuration configuration, JsonArchivist jsonArchivist) {
+		final String archiveDir = configuration.getString(JobManagerOptions.ARCHIVE_DIR);
+
+		if (archiveDir == null) {
+			return VoidHistoryServerArchivist.INSTANCE;
+		}
+
+		final Path normalizedArchivePath = WebMonitorUtils.validateAndNormalizeUri(new Path(archiveDir).toUri());
+		return new JsonResponseHistoryServerArchivist(jsonArchivist, normalizedArchivePath);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
new file mode 100644
index 0000000..be58399
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JsonResponseHistoryServerArchivist.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Implementation which archives an {@link AccessExecutionGraph} such that it stores
+ * the JSON responses for all possible history server requests.
+ *
+ * <p>The archive is generated and written synchronously in the calling thread.
+ * NOTE(review): this blocks the caller on file system I/O; consider an I/O executor.
+ */
+class JsonResponseHistoryServerArchivist implements HistoryServerArchivist {
+
+	private final JsonArchivist jsonArchivist;
+
+	private final Path archivePath;
+
+	/**
+	 * @param jsonArchivist produces the JSON to archive for an execution graph; must not be null
+	 * @param archivePath base path under which the job archives are written; must not be null
+	 */
+	JsonResponseHistoryServerArchivist(JsonArchivist jsonArchivist, Path archivePath) {
+		this.jsonArchivist = Preconditions.checkNotNull(jsonArchivist);
+		this.archivePath = Preconditions.checkNotNull(archivePath);
+	}
+
+	/**
+	 * Generates the JSON for the given execution graph and writes it below {@link #archivePath}.
+	 *
+	 * <p>Errors are never thrown synchronously: I/O errors as well as unchecked exceptions raised
+	 * while generating or writing the archive complete the returned future exceptionally, so
+	 * callers can rely on the future alone for error handling.
+	 */
+	@Override
+	public CompletableFuture<Acknowledge> archiveExecutionGraph(AccessExecutionGraph executionGraph) {
+		try {
+			FsJobArchivist.archiveJob(archivePath, executionGraph.getJobID(), jsonArchivist.archiveJsonWithPath(executionGraph));
+			return CompletableFuture.completedFuture(Acknowledge.get());
+		} catch (IOException | RuntimeException e) {
+			// unchecked exceptions are captured too: a synchronous throw would escape the
+			// caller's future-based error handling and skip whatever it does afterwards
+			return FutureUtils.completedExceptionally(e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index 38e74fb..8b497b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -35,7 +35,6 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.FlinkException;
 
 import javax.annotation.Nullable;
@@ -72,7 +71,7 @@ public class MiniDispatcher extends Dispatcher {
 			JobManagerRunnerFactory jobManagerRunnerFactory,
 			FatalErrorHandler fatalErrorHandler,
 			@Nullable String restAddress,
-			@Nullable JsonArchivist jsonArchivist,
+			HistoryServerArchivist historyServerArchivist,
 			JobGraph jobGraph,
 			JobClusterEntrypoint.ExecutionMode executionMode) throws Exception {
 		super(
@@ -90,7 +89,7 @@ public class MiniDispatcher extends Dispatcher {
 			jobManagerRunnerFactory,
 			fatalErrorHandler,
 			restAddress,
-			jsonArchivist);
+			historyServerArchivist);
 
 		this.executionMode = checkNotNull(executionMode);
 		this.jobTerminationFuture = new CompletableFuture<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 5c6a7ab..c39615c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import javax.annotation.Nullable;
 
@@ -52,7 +51,7 @@ public class StandaloneDispatcher extends Dispatcher {
 			JobManagerRunnerFactory jobManagerRunnerFactory,
 			FatalErrorHandler fatalErrorHandler,
 			@Nullable String restAddress,
-			@Nullable JsonArchivist jsonArchivist) throws Exception {
+			HistoryServerArchivist historyServerArchivist) throws Exception {
 		super(
 			rpcService,
 			endpointId,
@@ -68,6 +67,6 @@ public class StandaloneDispatcher extends Dispatcher {
 			jobManagerRunnerFactory,
 			fatalErrorHandler,
 			restAddress,
-			jsonArchivist);
+			historyServerArchivist);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
new file mode 100644
index 0000000..2d34d83
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/VoidHistoryServerArchivist.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * No-op implementation of the {@link HistoryServerArchivist}: nothing is archived and every
+ * call returns an already successfully completed future.
+ */
+public enum VoidHistoryServerArchivist implements HistoryServerArchivist {
+	INSTANCE;
+
+	@Override
+	public CompletableFuture<Acknowledge> archiveExecutionGraph(AccessExecutionGraph executionGraph) {
+		// nothing to archive, so report success right away
+		final Acknowledge ack = Acknowledge.get();
+		return CompletableFuture.completedFuture(ack);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 933add8..a267abb 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
+import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.MiniDispatcher;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -64,7 +65,6 @@ import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
@@ -347,6 +347,8 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 
 			jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, rpcService.getAddress());
 
+			final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, webMonitorEndpoint);
+
 			dispatcher = createDispatcher(
 				configuration,
 				rpcService,
@@ -359,7 +361,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 				archivedExecutionGraphStore,
 				this,
 				webMonitorEndpoint.getRestBaseUrl(),
-				webMonitorEndpoint);
+				historyServerArchivist);
 
 			LOG.debug("Starting ResourceManager.");
 			resourceManager.start();
@@ -660,7 +662,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		ArchivedExecutionGraphStore archivedExecutionGraphStore,
 		FatalErrorHandler fatalErrorHandler,
 		@Nullable String restAddress,
-		@Nullable JsonArchivist jsonArchivist) throws Exception;
+		HistoryServerArchivist historyServerArchivist) throws Exception;
 
 	protected abstract ResourceManager<?> createResourceManager(
 		Configuration configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 2a5b8ea..80a9da2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.MiniDispatcher;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
@@ -39,7 +40,6 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.FlinkException;
@@ -102,7 +102,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			ArchivedExecutionGraphStore archivedExecutionGraphStore,
 			FatalErrorHandler fatalErrorHandler,
 			@Nullable String restAddress,
-			@Nullable JsonArchivist jsonArchivist) throws Exception {
+			HistoryServerArchivist historyServerArchivist) throws Exception {
 
 		final JobGraph jobGraph = retrieveJobGraph(configuration);
 
@@ -124,7 +124,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
 			fatalErrorHandler,
 			restAddress,
-			jsonArchivist,
+			historyServerArchivist,
 			jobGraph,
 			executionMode);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 85446eb..40eb8b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
 import org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore;
+import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -40,7 +41,6 @@ import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
@@ -116,7 +116,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			ArchivedExecutionGraphStore archivedExecutionGraphStore,
 			FatalErrorHandler fatalErrorHandler,
 			@Nullable String restAddress,
-			@Nullable JsonArchivist jsonArchivist) throws Exception {
+			HistoryServerArchivist historyServerArchivist) throws Exception {
 
 		// create the default dispatcher
 		return new StandaloneDispatcher(
@@ -133,6 +133,6 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
 			fatalErrorHandler,
 			restAddress,
-			jsonArchivist);
+			historyServerArchivist);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 09f8bf4..aca4fdb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
 import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
 import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
 import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
@@ -357,6 +358,8 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 
 				this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, "localhost");
 
+				final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, dispatcherRestEndpoint);
+
 				dispatcher = new StandaloneDispatcher(
 					jobManagerRpcService,
 					Dispatcher.DISPATCHER_NAME + UUID.randomUUID(),
@@ -371,7 +374,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 					Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
 					new ShutDownFatalErrorHandler(),
 					dispatcherRestEndpoint.getRestBaseUrl(),
-					dispatcherRestEndpoint);
+					historyServerArchivist);
 
 				dispatcher.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index d9482e7..eff63e7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -568,7 +568,7 @@ public class DispatcherTest extends TestLogger {
 				jobManagerRunnerFactory,
 				fatalErrorHandler,
 				null,
-				null);
+				VoidHistoryServerArchivist.INSTANCE);
 		}
 
 		@VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flink/blob/d734032e/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index 157fca7..914531d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -258,7 +258,7 @@ public class MiniDispatcherTest extends TestLogger {
 			testingJobManagerRunnerFactory,
 			testingFatalErrorHandler,
 			null,
-			null,
+			VoidHistoryServerArchivist.INSTANCE,
 			jobGraph,
 			executionMode);
 	}


[11/12] flink git commit: [FLINK-9358] Avoid NPE when closing an unestablished ResourceManager connection

Posted by tr...@apache.org.
[FLINK-9358] Avoid NPE when closing an unestablished ResourceManager connection

An NPE occurred when trying to disconnect an unestablished ResourceManager connection.
To fix this problem, we now check whether the connection has been established
before dissolving it.

This closes #6011.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a95ec5ac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a95ec5ac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a95ec5ac

Branch: refs/heads/master
Commit: a95ec5acf259884347ae539913bcffcad5bfc340
Parents: f4e0368
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon May 14 14:14:45 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 00:23:10 2018 +0200

----------------------------------------------------------------------
 .../EstablishedResourceManagerConnection.java   | 59 ++++++++++++++
 .../flink/runtime/jobmaster/JobMaster.java      | 83 +++++++++++---------
 .../flink/runtime/jobmaster/JobMasterTest.java  | 48 +++++++++++
 3 files changed, 155 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a95ec5ac/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
new file mode 100644
index 0000000..46c1b4b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Class which contains the connection details of an established
+ * connection with the ResourceManager.
+ */
+class EstablishedResourceManagerConnection {
+
+	private final ResourceManagerGateway resourceManagerGateway;
+
+	private final ResourceManagerId resourceManagerId;
+
+	private final ResourceID resourceManagerResourceID;
+
+	EstablishedResourceManagerConnection(
+			@Nonnull ResourceManagerGateway resourceManagerGateway,
+			@Nonnull ResourceManagerId resourceManagerId,
+			@Nonnull ResourceID resourceManagerResourceID) {
+		this.resourceManagerGateway = resourceManagerGateway;
+		this.resourceManagerId = resourceManagerId;
+		this.resourceManagerResourceID = resourceManagerResourceID;
+	}
+
+	public ResourceManagerGateway getResourceManagerGateway() {
+		return resourceManagerGateway;
+	}
+
+	public ResourceManagerId getResourceManagerId() {
+		return resourceManagerId;
+	}
+
+	public ResourceID getResourceManagerResourceID() {
+		return resourceManagerResourceID;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a95ec5ac/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index f30c119..aff3280 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -191,9 +191,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 	private LeaderRetrievalService resourceManagerLeaderRetriever;
 
-	@Nullable
-	private ResourceManagerConnection resourceManagerConnection;
-
 	// --------- TaskManagers --------
 
 	private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;
@@ -211,6 +208,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	@Nullable
 	private String lastInternalSavepoint;
 
+	@Nullable
+	private ResourceManagerConnection resourceManagerConnection;
+
+	@Nullable
+	private EstablishedResourceManagerConnection establishedResourceManagerConnection;
+
 	// ------------------------------------------------------------------------
 
 	public JobMaster(
@@ -290,6 +293,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		this.jobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
 		this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
 		this.jobStatusListener = null;
+
+		this.resourceManagerConnection = null;
+		this.establishedResourceManagerConnection = null;
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -881,12 +887,16 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			final ResourceManagerId resourceManagerId,
 			final Exception cause) {
 
-		if (resourceManagerConnection != null
-				&& resourceManagerConnection.getTargetLeaderId().equals(resourceManagerId)) {
+		if (isConnectingToResourceManager(resourceManagerId)) {
 			closeResourceManagerConnection(cause);
 		}
 	}
 
+	private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerId) {
+		return resourceManagerConnection != null
+				&& resourceManagerConnection.getTargetLeaderId().equals(resourceManagerId);
+	}
+
 	@Override
 	public void heartbeatFromTaskManager(final ResourceID resourceID, AccumulatorReport accumulatorReport) {
 		taskManagerHeartbeatManager.receiveHeartbeat(resourceID, accumulatorReport);
@@ -1238,11 +1248,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 					return;
 				}
 
-				closeResourceManagerConnection(new Exception(
-					"ResourceManager leader changed to new address " + resourceManagerAddress));
-
 				log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
 					resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
+
+				closeResourceManagerConnection(new Exception(
+					"ResourceManager leader changed to new address " + resourceManagerAddress));
 			} else {
 				log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
 					resourceManagerConnection.getTargetAddress());
@@ -1277,9 +1287,16 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 			final ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
 
+			final ResourceID resourceManagerResourceId = success.getResourceManagerResourceId();
+
+			establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
+				resourceManagerGateway,
+				success.getResourceManagerId(),
+				resourceManagerResourceId);
+
 			slotPoolGateway.connectToResourceManager(resourceManagerGateway);
 
-			resourceManagerHeartbeatManager.monitorTarget(success.getResourceManagerResourceId(), new HeartbeatTarget<Void>() {
+			resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<Void>() {
 				@Override
 				public void receiveHeartbeat(ResourceID resourceID, Void payload) {
 					resourceManagerGateway.heartbeatFromJobManager(resourceID);
@@ -1297,22 +1314,31 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	}
 
 	private void closeResourceManagerConnection(Exception cause) {
-		if (resourceManagerConnection != null) {
-			if (log.isDebugEnabled()) {
-				log.debug("Close ResourceManager connection {}.", resourceManagerConnection.getResourceManagerResourceID(), cause);
-			} else {
-				log.info("Close ResourceManager connection {}: {}.", resourceManagerConnection.getResourceManagerResourceID(), cause.getMessage());
-			}
-
-			resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerResourceID());
-
-			ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
-			resourceManagerGateway.disconnectJobManager(resourceManagerConnection.getJobID(), cause);
+		if (establishedResourceManagerConnection != null) {
+			dissolveResourceManagerConnection(establishedResourceManagerConnection, cause);
+			establishedResourceManagerConnection = null;
+		}
 
+		if (resourceManagerConnection != null) {
+			// stop a potentially ongoing registration process
 			resourceManagerConnection.close();
 			resourceManagerConnection = null;
 		}
+	}
+
+	private void dissolveResourceManagerConnection(EstablishedResourceManagerConnection establishedResourceManagerConnection, Exception cause) {
+		final ResourceID resourceManagerResourceID = establishedResourceManagerConnection.getResourceManagerResourceID();
 
+		if (log.isDebugEnabled()) {
+			log.debug("Close ResourceManager connection {}.", resourceManagerResourceID, cause);
+		} else {
+			log.info("Close ResourceManager connection {}: {}.", resourceManagerResourceID, cause.getMessage());
+		}
+
+		resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerResourceID);
+
+		ResourceManagerGateway resourceManagerGateway = establishedResourceManagerConnection.getResourceManagerGateway();
+		resourceManagerGateway.disconnectJobManager(jobGraph.getJobID(), cause);
 		slotPoolGateway.disconnectResourceManager();
 	}
 
@@ -1473,8 +1499,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 		private final JobMasterId jobMasterId;
 
-		private ResourceID resourceManagerResourceID;
-
 		ResourceManagerConnection(
 				final Logger log,
 				final JobID jobID,
@@ -1498,7 +1522,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 					getTargetAddress(), getTargetLeaderId()) {
 				@Override
 				protected CompletableFuture<RegistrationResponse> invokeRegistration(
-						ResourceManagerGateway gateway, ResourceManagerId fencingToken, long timeoutMillis) throws Exception {
+						ResourceManagerGateway gateway, ResourceManagerId fencingToken, long timeoutMillis) {
 					Time timeout = Time.milliseconds(timeoutMillis);
 
 					return gateway.registerJobManager(
@@ -1513,24 +1537,13 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 		@Override
 		protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
-			runAsync(() -> {
-				resourceManagerResourceID = success.getResourceManagerResourceId();
-				establishResourceManagerConnection(success);
-			});
+			runAsync(() -> establishResourceManagerConnection(success));
 		}
 
 		@Override
 		protected void onRegistrationFailure(final Throwable failure) {
 			handleJobMasterError(failure);
 		}
-
-		public ResourceID getResourceManagerResourceID() {
-			return resourceManagerResourceID;
-		}
-
-		public JobID getJobID() {
-			return jobID;
-		}
 	}
 
 	//----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a95ec5ac/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 2f61681..c0c9162 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
@@ -361,6 +362,53 @@ public class JobMasterTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that we can close an unestablished ResourceManager connection.
+	 */
+	@Test
+	public void testCloseUnestablishedResourceManagerConnection() throws Exception {
+		final JobMaster jobMaster = createJobMaster(
+			JobMasterConfiguration.fromConfiguration(configuration),
+			jobGraph,
+			haServices,
+			new TestingJobManagerSharedServicesBuilder().build());
+
+		try {
+			jobMaster.start(JobMasterId.generate(), testingTimeout).get();
+			final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+			final String firstResourceManagerAddress = "address1";
+			final String secondResourceManagerAddress = "address2";
+
+			final TestingResourceManagerGateway firstResourceManagerGateway = new TestingResourceManagerGateway();
+			final TestingResourceManagerGateway secondResourceManagerGateway = new TestingResourceManagerGateway();
+
+			rpcService.registerGateway(firstResourceManagerAddress, firstResourceManagerGateway);
+			rpcService.registerGateway(secondResourceManagerAddress, secondResourceManagerGateway);
+
+			final OneShotLatch firstJobManagerRegistration = new OneShotLatch();
+			final OneShotLatch secondJobManagerRegistration = new OneShotLatch();
+
+			firstResourceManagerGateway.setRegisterJobManagerConsumer(
+				jobMasterIdResourceIDStringJobIDTuple4 -> firstJobManagerRegistration.trigger());
+
+			secondResourceManagerGateway.setRegisterJobManagerConsumer(
+				jobMasterIdResourceIDStringJobIDTuple4 -> secondJobManagerRegistration.trigger());
+
+			rmLeaderRetrievalService.notifyListener(firstResourceManagerAddress, resourceManagerId.toUUID());
+
+			// wait until we have seen the first registration attempt
+			firstJobManagerRegistration.await();
+
+			// this should stop the connection attempts towards the first RM
+			rmLeaderRetrievalService.notifyListener(secondResourceManagerAddress, resourceManagerId.toUUID());
+
+			// check that we start registering at the second RM
+			secondJobManagerRegistration.await();
+		} finally {
+			RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+		}
+	}
+
 	private File createSavepoint(long savepointId) throws IOException {
 		final File savepointFile = temporaryFolder.newFile();
 		final SavepointV2 savepoint = new SavepointV2(savepointId, Collections.emptyList(), Collections.emptyList());


[08/12] flink git commit: [FLINK-9194][history] Add convenience ArchivedJson constructor

Posted by tr...@apache.org.
[FLINK-9194][history] Add convenience ArchivedJson constructor


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5753b746
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5753b746
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5753b746

Branch: refs/heads/master
Commit: 5753b746c1dade748b6576f7e6024cc18c32d6a3
Parents: fd374b8
Author: zentol <ch...@apache.org>
Authored: Wed Apr 18 14:33:16 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon May 14 23:40:48 2018 +0200

----------------------------------------------------------------------
 .../runtime/webmonitor/history/ArchivedJson.java     | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5753b746/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
index a15dd52..9200248 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
@@ -18,8 +18,14 @@
 
 package org.apache.flink.runtime.webmonitor.history;
 
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.StringWriter;
 import java.util.Objects;
 
 /**
@@ -30,6 +36,8 @@ import java.util.Objects;
  */
 public class ArchivedJson {
 
+	private static final ObjectMapper MAPPER = RestMapperUtils.getStrictObjectMapper();
+
 	private final String path;
 	private final String json;
 
@@ -38,6 +46,13 @@ public class ArchivedJson {
 		this.json = Preconditions.checkNotNull(json);
 	}
 
+	public ArchivedJson(String path, ResponseBody json) throws IOException {
+		this.path = Preconditions.checkNotNull(path);
+		StringWriter sw = new StringWriter();
+		MAPPER.writeValue(sw, Preconditions.checkNotNull(json));
+		this.json = sw.toString();
+	}
+
 	public String getPath() {
 		return path;
 	}


[10/12] flink git commit: [hotfix] Make field JobMaster#resourceManagerLeaderRetriever final

Posted by tr...@apache.org.
[hotfix] Make field JobMaster#resourceManagerLeaderRetriever final


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4de72bbe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4de72bbe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4de72bbe

Branch: refs/heads/master
Commit: 4de72bbee189ab357e4d9e6fea33e27ff1ab233f
Parents: a95ec5a
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon May 14 17:30:55 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue May 15 00:23:10 2018 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/runtime/jobmaster/JobMaster.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4de72bbe/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index aff3280..f1dbbb4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -189,7 +189,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 	// --------- ResourceManager --------
 
-	private LeaderRetrievalService resourceManagerLeaderRetriever;
+	private final LeaderRetrievalService resourceManagerLeaderRetriever;
 
 	// --------- TaskManagers --------