You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/03 22:00:18 UTC
[2/3] flink git commit: [FLINK-7409] [web] Make WebRuntimeMonitor
reactive
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
index 513dc08..1a7d868 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
@@ -20,11 +20,14 @@ package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Request handler for the CANCEL request.
@@ -36,7 +39,8 @@ public class JobCancellationHandler extends AbstractJsonRequestHandler {
private final Time timeout;
- public JobCancellationHandler(Time timeout) {
+ public JobCancellationHandler(Executor executor, Time timeout) {
+ super(executor);
this.timeout = Preconditions.checkNotNull(timeout);
}
@@ -46,19 +50,23 @@ public class JobCancellationHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
- try {
- JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
- if (jobManagerGateway != null) {
- jobManagerGateway.cancelJob(jobId, timeout);
- return "{}";
- }
- else {
- throw new Exception("No connection to the leading JobManager.");
- }
- }
- catch (Exception e) {
- throw new Exception("Failed to cancel the job with id: " + pathParams.get("jobid") + e.getMessage(), e);
- }
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
+ if (jobManagerGateway != null) {
+ jobManagerGateway.cancelJob(jobId, timeout);
+ return "{}";
+ }
+ else {
+ throw new Exception("No connection to the leading JobManager.");
+ }
+ }
+ catch (Exception e) {
+ throw new FlinkFutureException("Failed to cancel the job with id: " + pathParams.get("jobid"), e);
+ }
+ },
+ executor);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
index 9b474aa..4e41447 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
@@ -24,12 +24,13 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.NotFoundException;
-import org.apache.flink.util.FlinkException;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -140,48 +141,48 @@ public class JobCancellationWithSavepointHandlers {
@Override
@SuppressWarnings("unchecked")
- public FullHttpResponse handleRequest(
+ public CompletableFuture<FullHttpResponse> handleRequest(
Map<String, String> pathParams,
Map<String, String> queryParams,
- JobManagerGateway jobManagerGateway) throws Exception {
+ JobManagerGateway jobManagerGateway) {
- try {
- if (jobManagerGateway != null) {
- JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
- final Optional<AccessExecutionGraph> optGraph;
+ if (jobManagerGateway != null) {
+ JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
+ final CompletableFuture<Optional<AccessExecutionGraph>> graphFuture;
- try {
- optGraph = currentGraphs.getExecutionGraph(jobId, jobManagerGateway);
- } catch (Exception e) {
- throw new FlinkException("Could not retrieve the execution with jobId " + jobId + " from the JobManager.", e);
- }
+ graphFuture = currentGraphs.getExecutionGraph(jobId, jobManagerGateway);
- final AccessExecutionGraph graph = optGraph.orElseThrow(
- () -> new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.'));
+ return graphFuture.thenApplyAsync(
+ (Optional<AccessExecutionGraph> optGraph) -> {
+ final AccessExecutionGraph graph = optGraph.orElseThrow(
+ () -> new FlinkFutureException(
+ new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.')));
- CheckpointCoordinator coord = graph.getCheckpointCoordinator();
- if (coord == null) {
- throw new Exception("Cannot find CheckpointCoordinator for job.");
- }
+ CheckpointCoordinator coord = graph.getCheckpointCoordinator();
+ if (coord == null) {
+ throw new FlinkFutureException(new Exception("Cannot find CheckpointCoordinator for job."));
+ }
- String targetDirectory = pathParams.get("targetDirectory");
- if (targetDirectory == null) {
- if (defaultSavepointDirectory == null) {
- throw new IllegalStateException("No savepoint directory configured. " +
+ String targetDirectory = pathParams.get("targetDirectory");
+ if (targetDirectory == null) {
+ if (defaultSavepointDirectory == null) {
+ throw new IllegalStateException("No savepoint directory configured. " +
"You can either specify a directory when triggering this savepoint or " +
"configure a cluster-wide default via key '" +
CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.");
- } else {
- targetDirectory = defaultSavepointDirectory;
+ } else {
+ targetDirectory = defaultSavepointDirectory;
+ }
}
- }
- return handleNewRequest(jobManagerGateway, jobId, targetDirectory, coord.getCheckpointTimeout());
- } else {
- throw new Exception("No connection to the leading JobManager.");
- }
- } catch (Exception e) {
- throw new Exception("Failed to cancel the job: " + e.getMessage(), e);
+ try {
+ return handleNewRequest(jobManagerGateway, jobId, targetDirectory, coord.getCheckpointTimeout());
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not cancel job with savepoint.", e);
+ }
+ }, executor);
+ } else {
+ return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
}
}
@@ -288,64 +289,63 @@ public class JobCancellationWithSavepointHandlers {
@Override
@SuppressWarnings("unchecked")
- public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
- try {
- if (jobManagerGateway != null) {
- JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
- long requestId = Long.parseLong(pathParams.get("requestId"));
+ public CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
+ long requestId = Long.parseLong(pathParams.get("requestId"));
- synchronized (lock) {
- Object result = completed.remove(requestId);
-
- if (result != null) {
- // Add to recent history
- recentlyCompleted.add(new Tuple2<>(requestId, result));
- if (recentlyCompleted.size() > NUM_GHOST_REQUEST_IDS) {
- recentlyCompleted.remove();
- }
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ synchronized (lock) {
+ Object result = completed.remove(requestId);
+
+ if (result != null) {
+ // Add to recent history
+ recentlyCompleted.add(new Tuple2<>(requestId, result));
+ if (recentlyCompleted.size() > NUM_GHOST_REQUEST_IDS) {
+ recentlyCompleted.remove();
+ }
- if (result.getClass() == String.class) {
- String savepointPath = (String) result;
- return createSuccessResponse(requestId, savepointPath);
- } else {
- Throwable cause = (Throwable) result;
- return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
- }
- } else {
- // Check in-progress
- Long inProgressRequestId = inProgress.get(jobId);
- if (inProgressRequestId != null) {
- // Sanity check
- if (inProgressRequestId == requestId) {
- return createInProgressResponse(requestId);
+ if (result.getClass() == String.class) {
+ String savepointPath = (String) result;
+ return createSuccessResponse(requestId, savepointPath);
} else {
- String msg = "Request ID does not belong to JobID";
- return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg);
+ Throwable cause = (Throwable) result;
+ return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
}
- }
-
- // Check recent history
- for (Tuple2<Long, Object> recent : recentlyCompleted) {
- if (recent.f0 == requestId) {
- if (recent.f1.getClass() == String.class) {
- String savepointPath = (String) recent.f1;
- return createSuccessResponse(requestId, savepointPath);
+ } else {
+ // Check in-progress
+ Long inProgressRequestId = inProgress.get(jobId);
+ if (inProgressRequestId != null) {
+ // Sanity check
+ if (inProgressRequestId == requestId) {
+ return createInProgressResponse(requestId);
} else {
- Throwable cause = (Throwable) recent.f1;
- return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
+ String msg = "Request ID does not belong to JobID";
+ return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg);
}
}
- }
- return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, "Unknown job/request ID");
+ // Check recent history
+ for (Tuple2<Long, Object> recent : recentlyCompleted) {
+ if (recent.f0 == requestId) {
+ if (recent.f1.getClass() == String.class) {
+ String savepointPath = (String) recent.f1;
+ return createSuccessResponse(requestId, savepointPath);
+ } else {
+ Throwable cause = (Throwable) recent.f1;
+ return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
+ }
+ }
+ }
+
+ return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, "Unknown job/request ID");
+ }
}
+ } catch (Exception e) {
+ throw new FlinkFutureException("Could not handle in progress request.", e);
}
- } else {
- throw new Exception("No connection to the leading JobManager.");
- }
- } catch (Exception e) {
- throw new Exception("Failed to cancel the job: " + e.getMessage(), e);
- }
+ });
}
private FullHttpResponse createSuccessResponse(long requestId, String savepointPath) throws IOException {
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
index 72cf8b7..0b15b37 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
@@ -31,6 +32,8 @@ import java.io.StringWriter;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Request handler that returns the execution config of a job.
@@ -39,8 +42,8 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
private static final String JOB_CONFIG_REST_PATH = "/jobs/:jobid/config";
- public JobConfigHandler(ExecutionGraphHolder executionGraphHolder) {
- super(executionGraphHolder);
+ public JobConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+ super(executionGraphHolder, executor);
}
@Override
@@ -49,8 +52,17 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
}
@Override
- public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
- return createJobConfigJson(graph);
+ public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createJobConfigJson(graph);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not write job config json.", e);
+ }
+ },
+ executor);
+
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index 87ac7c3..8a50f87 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -39,6 +40,8 @@ import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Request handler that returns details about a job. This includes:
@@ -57,8 +60,8 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
private final MetricFetcher fetcher;
- public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
- super(executionGraphHolder);
+ public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+ super(executionGraphHolder, executor);
this.fetcher = fetcher;
}
@@ -68,8 +71,16 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
}
@Override
- public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
- return createJobDetailsJson(graph, fetcher);
+ public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createJobDetailsJson(graph, fetcher);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not create job details json.", e);
+ }
+ },
+ executor);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
index e31299b..6ffd443 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
@@ -35,6 +36,8 @@ import java.io.StringWriter;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Request handler that returns the exceptions reported for a job.
@@ -45,8 +48,8 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
- public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) {
- super(executionGraphHolder);
+ public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+ super(executionGraphHolder, executor);
}
@Override
@@ -55,8 +58,17 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
}
@Override
- public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
- return createJobExceptionsJson(graph);
+ public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createJobExceptionsJson(graph);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not create job exceptions json.", e);
+ }
+ },
+ executor
+ );
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
index e2437e6..cb6d8c0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
@@ -19,12 +19,16 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import com.fasterxml.jackson.core.JsonGenerator;
+import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Returns the Job Manager's configuration.
@@ -35,7 +39,8 @@ public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
private final Configuration config;
- public JobManagerConfigHandler(Configuration config) {
+ public JobManagerConfigHandler(Executor executor, Configuration config) {
+ super(executor);
this.config = config;
}
@@ -45,31 +50,38 @@ public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
- StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
- gen.writeStartArray();
- for (String key : config.keySet()) {
- gen.writeStartObject();
- gen.writeStringField("key", key);
+ gen.writeStartArray();
+ for (String key : config.keySet()) {
+ gen.writeStartObject();
+ gen.writeStringField("key", key);
- // Mask key values which contain sensitive information
- if (key.toLowerCase().contains("password")) {
- String value = config.getString(key, null);
- if (value != null) {
- value = "******";
- }
- gen.writeStringField("value", value);
- }
- else {
- gen.writeStringField("value", config.getString(key, null));
- }
- gen.writeEndObject();
- }
- gen.writeEndArray();
+ // Mask key values which contain sensitive information
+ if (key.toLowerCase().contains("password")) {
+ String value = config.getString(key, null);
+ if (value != null) {
+ value = "******";
+ }
+ gen.writeStringField("value", value);
+ } else {
+ gen.writeStringField("value", config.getString(key, null));
+ }
+ gen.writeEndObject();
+ }
+ gen.writeEndArray();
- gen.close();
- return writer.toString();
+ gen.close();
+ return writer.toString();
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not write configuration.", e);
+ }
+ },
+ executor);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
index d17b6bb..b3a9dd5 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
@@ -27,6 +27,8 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Request handler that returns the JSON program plan of a job graph.
@@ -35,8 +37,8 @@ public class JobPlanHandler extends AbstractExecutionGraphRequestHandler {
private static final String JOB_PLAN_REST_PATH = "/jobs/:jobid/plan";
- public JobPlanHandler(ExecutionGraphHolder executionGraphHolder) {
- super(executionGraphHolder);
+ public JobPlanHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+ super(executionGraphHolder, executor);
}
@Override
@@ -45,8 +47,8 @@ public class JobPlanHandler extends AbstractExecutionGraphRequestHandler {
}
@Override
- public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
- return graph.getJsonPlan();
+ public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+ return CompletableFuture.completedFuture(graph.getJsonPlan());
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
index 3526734..f63403f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
@@ -20,11 +20,14 @@ package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Request handler for the STOP request.
@@ -36,7 +39,8 @@ public class JobStoppingHandler extends AbstractJsonRequestHandler {
private final Time timeout;
- public JobStoppingHandler(Time timeout) {
+ public JobStoppingHandler(Executor executor, Time timeout) {
+ super(executor);
this.timeout = Preconditions.checkNotNull(timeout);
}
@@ -46,19 +50,23 @@ public class JobStoppingHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
- try {
- JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
- if (jobManagerGateway != null) {
- jobManagerGateway.stopJob(jobId, timeout);
- return "{}";
- }
- else {
- throw new Exception("No connection to the leading JobManager.");
- }
- }
- catch (Exception e) {
- throw new Exception("Failed to stop the job with id: " + pathParams.get("jobid") + e.getMessage(), e);
- }
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
+ if (jobManagerGateway != null) {
+ jobManagerGateway.stopJob(jobId, timeout);
+ return "{}";
+ }
+ else {
+ throw new Exception("No connection to the leading JobManager.");
+ }
+ }
+ catch (Exception e) {
+ throw new FlinkFutureException("Failed to stop the job with id: " + pathParams.get("jobid") + '.', e);
+ }
+ },
+ executor);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
index 8e90dfc..9c613ff 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -33,6 +34,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Request handler that returns the accumulators for a given vertex.
@@ -41,8 +44,8 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle
private static final String JOB_VERTEX_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/accumulators";
- public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
- super(executionGraphHolder);
+ public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+ super(executionGraphHolder, executor);
}
@Override
@@ -51,8 +54,17 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle
}
@Override
- public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
- return createVertexAccumulatorsJson(jobVertex);
+ public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createVertexAccumulatorsJson(jobVertex);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not create job vertex accumulators json.", e);
+ }
+ },
+ executor);
+
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
index cde8ca9..963153f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -27,8 +28,11 @@ import org.apache.flink.runtime.webmonitor.OperatorBackPressureStats;
import com.fasterxml.jackson.core.JsonGenerator;
+import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import scala.Option;
@@ -51,10 +55,11 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle
public JobVertexBackPressureHandler(
ExecutionGraphHolder executionGraphHolder,
+ Executor executor,
BackPressureStatsTracker backPressureStatsTracker,
int refreshInterval) {
- super(executionGraphHolder);
+ super(executionGraphHolder, executor);
this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker, "Stats tracker");
checkArgument(refreshInterval >= 0, "Negative timeout");
this.refreshInterval = refreshInterval;
@@ -66,11 +71,11 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle
}
@Override
- public String handleRequest(
+ public CompletableFuture<String> handleRequest(
AccessExecutionJobVertex accessJobVertex,
- Map<String, String> params) throws Exception {
+ Map<String, String> params) {
if (accessJobVertex instanceof ArchivedExecutionJobVertex) {
- return "";
+ return CompletableFuture.completedFuture("");
}
ExecutionJobVertex jobVertex = (ExecutionJobVertex) accessJobVertex;
try (StringWriter writer = new StringWriter();
@@ -116,7 +121,9 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle
gen.writeEndObject();
gen.close();
- return writer.toString();
+ return CompletableFuture.completedFuture(writer.toString());
+ } catch (IOException e) {
+ return FutureUtils.completedExceptionally(e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index 7757fdd..bd1745c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -39,6 +40,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* A request handler that provides the details of a job vertex, including id, name, parallelism,
@@ -50,8 +53,8 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
private final MetricFetcher fetcher;
- public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
- super(executionGraphHolder);
+ public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+ super(executionGraphHolder, executor);
this.fetcher = fetcher;
}
@@ -61,8 +64,16 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
}
@Override
- public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
- return createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher);
+ public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not write the vertex details json.", e);
+ }
+ },
+ executor);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
index a612782..0827720 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -41,6 +42,8 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* A request handler that provides the details of a job vertex, including id, name, and the
@@ -52,8 +55,8 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
private final MetricFetcher fetcher;
- public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
- super(executionGraphHolder);
+ public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+ super(executionGraphHolder, executor);
this.fetcher = fetcher;
}
@@ -63,8 +66,16 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
}
@Override
- public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
- return createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher);
+ public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not create TaskManager json.", e);
+ }
+ },
+ executor);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
index 079be8f..8ca785f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
@@ -19,11 +19,11 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.webmonitor.NotFoundException;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
/**
* Base interface for all request handlers.
@@ -44,13 +44,8 @@ public interface RequestHandler {
* @param jobManagerGateway to talk to the JobManager.
*
* @return The full http response.
- *
- * @throws Exception Handlers may forward exceptions. Exceptions of type
- * {@link NotFoundException} will cause a HTTP 404
- * response with the exception message, other exceptions will cause a HTTP 500 response
- * with the exception stack trace.
*/
- FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception;
+ CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway);
/**
* Returns an array of REST URL's under which this handler can be registered.
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
index 28e9ddf..301b217 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Request handler providing details about a single task execution attempt.
@@ -31,8 +33,8 @@ public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttempt
public static final String SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum";
- public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
- super(executionGraphHolder, fetcher);
+ public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+ super(executionGraphHolder, executor, fetcher);
}
@Override
@@ -41,7 +43,7 @@ public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttempt
}
@Override
- public String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception {
+ public CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params) {
return handleRequest(vertex.getCurrentExecutionAttempt(), params);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index 171277f..3c0d1d9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -35,6 +36,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Base class for request handlers whose response depends on a specific job vertex (defined
@@ -44,8 +47,8 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
private static final String SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators";
- public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
- super(executionGraphHolder);
+ public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+ super(executionGraphHolder, executor);
}
@Override
@@ -54,8 +57,16 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
}
@Override
- public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
- return createAttemptAccumulatorsJson(execAttempt);
+ public CompletableFuture<String> handleRequest(AccessExecution execAttempt, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createAttemptAccumulatorsJson(execAttempt);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not create accumulator json.", e);
+ }
+ },
+ executor);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
index 37c0e50..ad836df 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -40,6 +41,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import static org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler.SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH;
@@ -52,8 +55,8 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
private final MetricFetcher fetcher;
- public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
- super(executionGraphHolder);
+ public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+ super(executionGraphHolder, executor);
this.fetcher = fetcher;
}
@@ -63,8 +66,16 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
}
@Override
- public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
- return createAttemptDetailsJson(execAttempt, params.get("jobid"), params.get("vertexid"), fetcher);
+ public CompletableFuture<String> handleRequest(AccessExecution execAttempt, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createAttemptDetailsJson(execAttempt, params.get("jobid"), params.get("vertexid"), fetcher);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not create attempt details json.", e);
+ }
+ },
+ executor);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
index 64bdfb4..8142548 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
@@ -35,6 +36,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Request handler that returns the accumulators for all subtasks of job vertex.
@@ -43,8 +46,8 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
private static final String SUBTASKS_ALL_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/accumulators";
- public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
- super(executionGraphHolder);
+ public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+ super(executionGraphHolder, executor);
}
@Override
@@ -53,8 +56,16 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
}
@Override
- public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
- return createSubtasksAccumulatorsJson(jobVertex);
+ public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createSubtasksAccumulatorsJson(jobVertex);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not create subtasks accumulator json.", e);
+ }
+ },
+ executor);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
index ea88587..d766206 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -35,6 +36,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Request handler that returns the state transition timestamps for all subtasks, plus their
@@ -44,8 +47,8 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
private static final String SUBTASK_TIMES_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasktimes";
- public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder) {
- super(executionGraphHolder);
+ public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+ super(executionGraphHolder, executor);
}
@Override
@@ -54,8 +57,16 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
}
@Override
- public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
- return createSubtaskTimesJson(jobVertex);
+ public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createSubtaskTimesJson(jobVertex);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not write subtask time json.", e);
+ }
+ },
+ executor);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
index a8ab7a3..9f83ed0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
@@ -28,14 +30,14 @@ import org.apache.flink.util.StringUtils;
import com.fasterxml.jackson.core.JsonGenerator;
+import java.io.IOException;
import java.io.StringWriter;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
+import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
import static java.util.Objects.requireNonNull;
@@ -53,7 +55,8 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler {
private final MetricFetcher fetcher;
- public TaskManagersHandler(Time timeout, MetricFetcher fetcher) {
+ public TaskManagersHandler(Executor executor, Time timeout, MetricFetcher fetcher) {
+ super(executor);
this.timeout = requireNonNull(timeout);
this.fetcher = fetcher;
}
@@ -64,134 +67,139 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
- try {
- if (jobManagerGateway != null) {
- // whether one task manager's metrics are requested, or all task manager, we
- // return them in an array. This avoids unnecessary code complexity.
- // If only one task manager is requested, we only fetch one task manager metrics.
- final List<Instance> instances = new ArrayList<>();
- if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
- try {
- InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY)));
- CompletableFuture<Optional<Instance>> tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout);
-
- Optional<Instance> instance = tmInstanceFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-
- instance.ifPresent(instances::add);
- }
- // this means the id string was invalid. Keep the list empty.
- catch (IllegalArgumentException e){
- // do nothing.
- }
- } else {
- CompletableFuture<Collection<Instance>> tmInstancesFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
-
- Collection<Instance> tmInstances = tmInstancesFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
- instances.addAll(tmInstances);
- }
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ if (jobManagerGateway != null) {
+ // whether one task manager's metrics are requested, or all task manager, we
+ // return them in an array. This avoids unnecessary code complexity.
+ // If only one task manager is requested, we only fetch one task manager metrics.
+ if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
+ InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY)));
+ CompletableFuture<Optional<Instance>> tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout);
+
+ return tmInstanceFuture.thenApplyAsync(
+ (Optional<Instance> optTaskManager) -> {
+ try {
+ return writeTaskManagersJson(
+ optTaskManager.map(Collections::singleton).orElse(Collections.emptySet()),
+ pathParams);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not write TaskManagers JSON.", e);
+ }
+ },
+ executor);
+ } else {
+ CompletableFuture<Collection<Instance>> tmInstancesFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
+
+ return tmInstancesFuture.thenApplyAsync(
+ (Collection<Instance> taskManagers) -> {
+ try {
+ return writeTaskManagersJson(taskManagers, pathParams);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not write TaskManagers JSON.", e);
+ }
+ },
+ executor);
+ }
+ }
+ else {
+ return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
+ }
+ }
- StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
- gen.writeStartObject();
- gen.writeArrayFieldStart("taskmanagers");
-
- for (Instance instance : instances) {
- gen.writeStartObject();
- gen.writeStringField("id", instance.getId().toString());
- gen.writeStringField("path", instance.getTaskManagerGateway().getAddress());
- gen.writeNumberField("dataPort", instance.getTaskManagerLocation().dataPort());
- gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat());
- gen.writeNumberField("slotsNumber", instance.getTotalNumberOfSlots());
- gen.writeNumberField("freeSlots", instance.getNumberOfAvailableSlots());
- gen.writeNumberField("cpuCores", instance.getResources().getNumberOfCPUCores());
- gen.writeNumberField("physicalMemory", instance.getResources().getSizeOfPhysicalMemory());
- gen.writeNumberField("freeMemory", instance.getResources().getSizeOfJvmHeap());
- gen.writeNumberField("managedMemory", instance.getResources().getSizeOfManagedMemory());
-
- // only send metrics when only one task manager requests them.
- if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
- fetcher.update();
- MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
- if (metrics != null) {
- gen.writeObjectFieldStart("metrics");
- long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
- long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
- long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
-
- gen.writeNumberField("heapCommitted", heapCommitted);
- gen.writeNumberField("heapUsed", heapUsed);
- gen.writeNumberField("heapMax", heapTotal);
-
- long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
- long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
- long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
-
- gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
- gen.writeNumberField("nonHeapUsed", nonHeapUsed);
- gen.writeNumberField("nonHeapMax", nonHeapTotal);
-
- gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
- gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
- gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
-
- long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
- long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
- long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
-
- gen.writeNumberField("directCount", directCount);
- gen.writeNumberField("directUsed", directUsed);
- gen.writeNumberField("directMax", directMax);
-
- long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
- long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
- long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
-
- gen.writeNumberField("mappedCount", mappedCount);
- gen.writeNumberField("mappedUsed", mappedUsed);
- gen.writeNumberField("mappedMax", mappedMax);
-
- long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
- long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
-
- gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
- gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
-
- gen.writeArrayFieldStart("garbageCollectors");
-
- for (String gcName : metrics.garbageCollectorNames) {
- String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
- String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
- if (count != null && time != null) {
- gen.writeStartObject();
- gen.writeStringField("name", gcName);
- gen.writeNumberField("count", Long.valueOf(count));
- gen.writeNumberField("time", Long.valueOf(time));
- gen.writeEndObject();
- }
- }
-
- gen.writeEndArray();
+ private String writeTaskManagersJson(Collection<Instance> instances, Map<String, String> pathParams) throws IOException {
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+ gen.writeStartObject();
+ gen.writeArrayFieldStart("taskmanagers");
+
+ for (Instance instance : instances) {
+ gen.writeStartObject();
+ gen.writeStringField("id", instance.getId().toString());
+ gen.writeStringField("path", instance.getTaskManagerGateway().getAddress());
+ gen.writeNumberField("dataPort", instance.getTaskManagerLocation().dataPort());
+ gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat());
+ gen.writeNumberField("slotsNumber", instance.getTotalNumberOfSlots());
+ gen.writeNumberField("freeSlots", instance.getNumberOfAvailableSlots());
+ gen.writeNumberField("cpuCores", instance.getResources().getNumberOfCPUCores());
+ gen.writeNumberField("physicalMemory", instance.getResources().getSizeOfPhysicalMemory());
+ gen.writeNumberField("freeMemory", instance.getResources().getSizeOfJvmHeap());
+ gen.writeNumberField("managedMemory", instance.getResources().getSizeOfManagedMemory());
+
+ // only send metrics when only one task manager requests them.
+ if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
+ fetcher.update();
+ MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
+ if (metrics != null) {
+ gen.writeObjectFieldStart("metrics");
+ long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
+ long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
+ long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
+
+ gen.writeNumberField("heapCommitted", heapCommitted);
+ gen.writeNumberField("heapUsed", heapUsed);
+ gen.writeNumberField("heapMax", heapTotal);
+
+ long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
+ long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
+ long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
+
+ gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
+ gen.writeNumberField("nonHeapUsed", nonHeapUsed);
+ gen.writeNumberField("nonHeapMax", nonHeapTotal);
+
+ gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
+ gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
+ gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
+
+ long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
+ long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
+ long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
+
+ gen.writeNumberField("directCount", directCount);
+ gen.writeNumberField("directUsed", directUsed);
+ gen.writeNumberField("directMax", directMax);
+
+ long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
+ long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
+ long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
+
+ gen.writeNumberField("mappedCount", mappedCount);
+ gen.writeNumberField("mappedUsed", mappedUsed);
+ gen.writeNumberField("mappedMax", mappedMax);
+
+ long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
+ long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
+
+ gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
+ gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
+
+ gen.writeArrayFieldStart("garbageCollectors");
+
+ for (String gcName : metrics.garbageCollectorNames) {
+ String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
+ String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
+ if (count != null && time != null) {
+ gen.writeStartObject();
+ gen.writeStringField("name", gcName);
+ gen.writeNumberField("count", Long.valueOf(count));
+ gen.writeNumberField("time", Long.valueOf(time));
gen.writeEndObject();
}
}
+ gen.writeEndArray();
gen.writeEndObject();
}
-
- gen.writeEndArray();
- gen.writeEndObject();
-
- gen.close();
- return writer.toString();
- }
- else {
- throw new Exception("No connection to the leading JobManager.");
}
+
+ gen.writeEndObject();
}
- catch (Exception e) {
- throw new RuntimeException("Failed to fetch list of all task managers: " + e.getMessage(), e);
- }
+
+ gen.writeEndArray();
+ gen.writeEndObject();
+
+ gen.close();
+ return writer.toString();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
index d4c9b2a..3affd7c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
@@ -34,6 +35,8 @@ import java.io.StringWriter;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Handler that returns a job's snapshotting settings.
@@ -42,8 +45,8 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
private static final String CHECKPOINT_CONFIG_REST_PATH = "/jobs/:jobid/checkpoints/config";
- public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder) {
- super(executionGraphHolder);
+ public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+ super(executionGraphHolder, executor);
}
@Override
@@ -52,8 +55,16 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
}
@Override
- public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
- return createCheckpointConfigJson(graph);
+ public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createCheckpointConfigJson(graph);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not create checkpoint config json.", e);
+ }
+ },
+ executor);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
index 664744b..96cc3e0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
@@ -40,6 +41,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Request handler that returns checkpoint stats for a single job vertex.
@@ -50,8 +53,8 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
private final CheckpointStatsCache cache;
- public CheckpointStatsDetailsHandler(ExecutionGraphHolder executionGraphHolder, CheckpointStatsCache cache) {
- super(executionGraphHolder);
+ public CheckpointStatsDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) {
+ super(executionGraphHolder, executor);
this.cache = cache;
}
@@ -61,30 +64,38 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
}
@Override
- public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
- long checkpointId = parseCheckpointId(params);
- if (checkpointId == -1) {
- return "{}";
- }
-
- CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
- if (snapshot == null) {
- return "{}";
- }
-
- AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
-
- if (checkpoint != null) {
- cache.tryAdd(checkpoint);
- } else {
- checkpoint = cache.tryGet(checkpointId);
-
- if (checkpoint == null) {
- return "{}";
- }
- }
-
- return createCheckpointDetailsJson(checkpoint);
+ public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ long checkpointId = parseCheckpointId(params);
+ if (checkpointId == -1) {
+ return "{}";
+ }
+
+ CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
+ if (snapshot == null) {
+ return "{}";
+ }
+
+ AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
+
+ if (checkpoint != null) {
+ cache.tryAdd(checkpoint);
+ } else {
+ checkpoint = cache.tryGet(checkpointId);
+
+ if (checkpoint == null) {
+ return "{}";
+ }
+ }
+
+ try {
+ return createCheckpointDetailsJson(checkpoint);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not create checkpoint details json.", e);
+ }
+ },
+ executor);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
index d116c56..045248b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
@@ -43,6 +44,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import static org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler.writeMinMaxAvg;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -57,8 +60,8 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
private final CheckpointStatsCache cache;
- public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder executionGraphHolder, CheckpointStatsCache cache) {
- super(executionGraphHolder);
+ public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) {
+ super(executionGraphHolder, executor);
this.cache = checkNotNull(cache);
}
@@ -68,28 +71,28 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
}
@Override
- public String handleJsonRequest(
- Map<String, String> pathParams,
- Map<String, String> queryParams,
- JobManagerGateway jobManagerGateway) throws Exception {
+ public CompletableFuture<String> handleJsonRequest(
+ Map<String, String> pathParams,
+ Map<String, String> queryParams,
+ JobManagerGateway jobManagerGateway) {
return super.handleJsonRequest(pathParams, queryParams, jobManagerGateway);
}
@Override
- public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
+ public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
long checkpointId = CheckpointStatsDetailsHandler.parseCheckpointId(params);
if (checkpointId == -1) {
- return "{}";
+ return CompletableFuture.completedFuture("{}");
}
JobVertexID vertexId = AbstractJobVertexRequestHandler.parseJobVertexId(params);
if (vertexId == null) {
- return "{}";
+ return CompletableFuture.completedFuture("{}");
}
CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
if (snapshot == null) {
- return "{}";
+ return CompletableFuture.completedFuture("{}");
}
AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
@@ -100,16 +103,20 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
checkpoint = cache.tryGet(checkpointId);
if (checkpoint == null) {
- return "{}";
+ return CompletableFuture.completedFuture("{}");
}
}
TaskStateStats taskStats = checkpoint.getTaskStateStats(vertexId);
if (taskStats == null) {
- return "{}";
+ return CompletableFuture.completedFuture("{}");
}
- return createSubtaskCheckpointDetailsJson(checkpoint, taskStats);
+ try {
+ return CompletableFuture.completedFuture(createSubtaskCheckpointDetailsJson(checkpoint, taskStats));
+ } catch (IOException e) {
+ return FutureUtils.completedExceptionally(e);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
index a86c5fd..a60aee0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
@@ -43,6 +44,8 @@ import java.io.StringWriter;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Handler that returns checkpoint statistics for a job.
@@ -51,8 +54,8 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
private static final String CHECKPOINT_STATS_REST_PATH = "/jobs/:jobid/checkpoints";
- public CheckpointStatsHandler(ExecutionGraphHolder executionGraphHolder) {
- super(executionGraphHolder);
+ public CheckpointStatsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+ super(executionGraphHolder, executor);
}
@Override
@@ -61,8 +64,16 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
}
@Override
- public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
- return createCheckpointStatsJson(graph);
+ public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createCheckpointStatsJson(graph);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not create checkpoint stats json.", e);
+ }
+ },
+ executor);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
index b95f2c4..cf286ce 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.metrics;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.handlers.AbstractJsonRequestHandler;
import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
@@ -28,6 +29,8 @@ import com.fasterxml.jackson.core.JsonGenerator;
import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Abstract request handler that returns a list of all available metrics or the values for a set of metrics.
@@ -43,17 +46,27 @@ import java.util.Map;
public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler {
private final MetricFetcher fetcher;
- public AbstractMetricsHandler(MetricFetcher fetcher) {
+ public AbstractMetricsHandler(Executor executor, MetricFetcher fetcher) {
+ super(executor);
this.fetcher = Preconditions.checkNotNull(fetcher);
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
- fetcher.update();
- String requestedMetricsList = queryParams.get("get");
- return requestedMetricsList != null
- ? getMetricsValues(pathParams, requestedMetricsList)
- : getAvailableMetricsList(pathParams);
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ fetcher.update();
+ String requestedMetricsList = queryParams.get("get");
+ try {
+ return requestedMetricsList != null
+ ? getMetricsValues(pathParams, requestedMetricsList)
+ : getAvailableMetricsList(pathParams);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not retrieve metrics.", e);
+ }
+ },
+ executor);
+
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
index 7252d8a..2bd6683 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.metrics;
import java.util.Map;
+import java.util.concurrent.Executor;
/**
* Request handler that returns for the job manager a list of all available metrics or the values for a set of metrics.
@@ -35,8 +36,8 @@ public class JobManagerMetricsHandler extends AbstractMetricsHandler {
private static final String JOBMANAGER_METRICS_REST_PATH = "/jobmanager/metrics";
- public JobManagerMetricsHandler(MetricFetcher fetcher) {
- super(fetcher);
+ public JobManagerMetricsHandler(Executor executor, MetricFetcher fetcher) {
+ super(executor, fetcher);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
index a193457..e5e2500 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.metrics;
import java.util.Map;
+import java.util.concurrent.Executor;
/**
* Request handler that returns for a given job a list of all available metrics or the values for a set of metrics.
@@ -35,8 +36,8 @@ public class JobMetricsHandler extends AbstractMetricsHandler {
public static final String PARAMETER_JOB_ID = "jobid";
private static final String JOB_METRICS_REST_PATH = "/jobs/:jobid/metrics";
- public JobMetricsHandler(MetricFetcher fetcher) {
- super(fetcher);
+ public JobMetricsHandler(Executor executor, MetricFetcher fetcher) {
+ super(executor, fetcher);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
index e893da4..1d2cd84 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.metrics;
import java.util.Map;
+import java.util.concurrent.Executor;
/**
* Request handler that returns for a given task a list of all available metrics or the values for a set of metrics.
@@ -35,8 +36,8 @@ public class JobVertexMetricsHandler extends AbstractMetricsHandler {
public static final String PARAMETER_VERTEX_ID = "vertexid";
private static final String JOB_VERTEX_METRICS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/metrics";
- public JobVertexMetricsHandler(MetricFetcher fetcher) {
- super(fetcher);
+ public JobVertexMetricsHandler(Executor executor, MetricFetcher fetcher) {
+ super(executor, fetcher);
}
@Override