You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/03 22:00:17 UTC
[1/3] flink git commit: [FLINK-7409] [web] Make WebRuntimeMonitor reactive
Repository: flink
Updated Branches:
refs/heads/master 1804aa33d -> ab1fbfdfe
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
index bcc62cb..fde16fc 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor.metrics;
import org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler;
import java.util.Map;
+import java.util.concurrent.Executor;
/**
* Request handler that returns for a given task manager a list of all available metrics or the values for a set of metrics.
@@ -37,8 +38,8 @@ public class TaskManagerMetricsHandler extends AbstractMetricsHandler {
private static final String TASKMANAGER_METRICS_REST_PATH = "/taskmanagers/:taskmanagerid/metrics";
- public TaskManagerMetricsHandler(MetricFetcher fetcher) {
- super(fetcher);
+ public TaskManagerMetricsHandler(Executor executor, MetricFetcher fetcher) {
+ super(executor, fetcher);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
index 865385f..69ee762 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Executors;
import org.junit.Assert;
import org.junit.Test;
@@ -29,7 +30,7 @@ import org.junit.Test;
public class ClusterOverviewHandlerTest {
@Test
public void testGetPaths() {
- ClusterOverviewHandler handler = new ClusterOverviewHandler(Time.seconds(0L));
+ ClusterOverviewHandler handler = new ClusterOverviewHandler(Executors.directExecutor(), Time.seconds(0L));
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/overview", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
index ea26f5d..6061e4b 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Executors;
import org.junit.Assert;
import org.junit.Test;
@@ -29,7 +30,7 @@ import org.junit.Test;
public class CurrentJobIdsHandlerTest {
@Test
public void testGetPaths() {
- CurrentJobIdsHandler handler = new CurrentJobIdsHandler(Time.seconds(0L));
+ CurrentJobIdsHandler handler = new CurrentJobIdsHandler(Executors.directExecutor(), Time.seconds(0L));
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
index 64360d3..ccfafd4 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
@@ -66,17 +67,17 @@ public class CurrentJobsOverviewHandlerTest {
@Test
public void testGetPaths() {
- CurrentJobsOverviewHandler handlerAll = new CurrentJobsOverviewHandler(Time.seconds(0L), true, true);
+ CurrentJobsOverviewHandler handlerAll = new CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), true, true);
String[] pathsAll = handlerAll.getPaths();
Assert.assertEquals(1, pathsAll.length);
Assert.assertEquals("/joboverview", pathsAll[0]);
- CurrentJobsOverviewHandler handlerRunning = new CurrentJobsOverviewHandler(Time.seconds(0L), true, false);
+ CurrentJobsOverviewHandler handlerRunning = new CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), true, false);
String[] pathsRunning = handlerRunning.getPaths();
Assert.assertEquals(1, pathsRunning.length);
Assert.assertEquals("/joboverview/running", pathsRunning[0]);
- CurrentJobsOverviewHandler handlerCompleted = new CurrentJobsOverviewHandler(Time.seconds(0L), false, true);
+ CurrentJobsOverviewHandler handlerCompleted = new CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), false, true);
String[] pathsCompleted = handlerCompleted.getPaths();
Assert.assertEquals(1, pathsCompleted.length);
Assert.assertEquals("/joboverview/completed", pathsCompleted[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
index d17b55f..22b3e5e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
@@ -33,7 +34,7 @@ import java.util.TimeZone;
public class DashboardConfigHandlerTest {
@Test
public void testGetPaths() {
- DashboardConfigHandler handler = new DashboardConfigHandler(10000L);
+ DashboardConfigHandler handler = new DashboardConfigHandler(Executors.directExecutor(), 10000L);
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/config", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java
index a498cf2..97091cf 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.Executors;
+
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.junit.Assert;
@@ -31,7 +33,7 @@ import java.util.List;
public class JarAccessDeniedHandlerTest {
@Test
public void testGetPaths() {
- JarAccessDeniedHandler handler = new JarAccessDeniedHandler();
+ JarAccessDeniedHandler handler = new JarAccessDeniedHandler(Executors.directExecutor());
String[] paths = handler.getPaths();
Assert.assertEquals(5, paths.length);
List<String> pathsList = Lists.newArrayList(paths);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java
index bcbb1ea..a067d8f 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.Executors;
+
import org.junit.Assert;
import org.junit.Test;
@@ -27,7 +29,7 @@ import org.junit.Test;
public class JarDeleteHandlerTest {
@Test
public void testGetPaths() {
- JarDeleteHandler handler = new JarDeleteHandler(null);
+ JarDeleteHandler handler = new JarDeleteHandler(Executors.directExecutor(), null);
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jars/:jarid", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandlerTest.java
index 863c248..5da4913 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandlerTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.Executors;
+
import org.junit.Assert;
import org.junit.Test;
@@ -27,7 +29,7 @@ import org.junit.Test;
public class JarListHandlerTest {
@Test
public void testGetPaths() {
- JarListHandler handler = new JarListHandler(null);
+ JarListHandler handler = new JarListHandler(Executors.directExecutor(), null);
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jars", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerTest.java
index a3ded83..f5ed339 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.Executors;
+
import org.junit.Assert;
import org.junit.Test;
@@ -27,7 +29,7 @@ import org.junit.Test;
public class JarPlanHandlerTest {
@Test
public void testGetPaths() {
- JarPlanHandler handler = new JarPlanHandler(null);
+ JarPlanHandler handler = new JarPlanHandler(Executors.directExecutor(), null);
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jars/:jarid/plan", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
index 82aa87a..67dad13 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.Executors;
import org.junit.Assert;
import org.junit.Test;
@@ -30,7 +31,7 @@ import org.junit.Test;
public class JarRunHandlerTest {
@Test
public void testGetPaths() {
- JarRunHandler handler = new JarRunHandler(null, Time.seconds(0L), new Configuration());
+ JarRunHandler handler = new JarRunHandler(Executors.directExecutor(), null, Time.seconds(0L), new Configuration());
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jars/:jarid/run", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
index e57ca34..ea8b524 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.Executors;
+
import org.junit.Assert;
import org.junit.Test;
@@ -27,7 +29,7 @@ import org.junit.Test;
public class JarUploadHandlerTest {
@Test
public void testGetPaths() {
- JarUploadHandler handler = new JarUploadHandler(null);
+ JarUploadHandler handler = new JarUploadHandler(Executors.directExecutor(), null);
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jars/upload", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
index fe55f51..5510fed 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
@@ -54,7 +55,7 @@ public class JobAccumulatorsHandlerTest {
@Test
public void testGetPaths() {
- JobAccumulatorsHandler handler = new JobAccumulatorsHandler(mock(ExecutionGraphHolder.class));
+ JobAccumulatorsHandler handler = new JobAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/accumulators", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
index a436b2d..86c5295 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -33,7 +34,7 @@ import java.util.List;
public class JobCancellationHandlerTest {
@Test
public void testGetPaths() {
- JobCancellationHandler handler = new JobCancellationHandler(TestingUtils.TIMEOUT());
+ JobCancellationHandler handler = new JobCancellationHandler(Executors.directExecutor(), TestingUtils.TIMEOUT());
String[] paths = handler.getPaths();
Assert.assertEquals(2, paths.length);
List<String> pathsList = Lists.newArrayList(paths);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
index b48ee66..529d130 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
@@ -92,7 +92,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
ExecutionGraph graph = mock(ExecutionGraph.class);
CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
- when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(Optional.of(graph));
+ when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
when(graph.getCheckpointCoordinator()).thenReturn(coord);
when(coord.getCheckpointTimeout()).thenReturn(timeout);
@@ -121,7 +121,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
ExecutionGraph graph = mock(ExecutionGraph.class);
CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
- when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(Optional.of(graph));
+ when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
when(graph.getCheckpointCoordinator()).thenReturn(coord);
when(coord.getCheckpointTimeout()).thenReturn(timeout);
@@ -152,7 +152,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
handler = handlers.getTriggerHandler();
try {
- handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+ handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
fail("Did not throw expected test Exception");
} catch (Exception e) {
IllegalStateException cause = (IllegalStateException) e.getCause();
@@ -169,7 +169,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
ExecutionGraph graph = mock(ExecutionGraph.class);
CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
- when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(Optional.of(graph));
+ when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
when(graph.getCheckpointCoordinator()).thenReturn(coord);
JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
@@ -187,7 +187,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
when(jobManager.cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class))).thenReturn(successfulCancelWithSavepoint);
// Trigger
- FullHttpResponse response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+ FullHttpResponse response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
@@ -206,7 +206,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
assertEquals(location, root.get("location").asText());
// Trigger again
- response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+ response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
@@ -225,7 +225,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
// Query progress
params.put("requestId", "1");
- response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+ response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
@@ -239,7 +239,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
// Complete
successfulCancelWithSavepoint.complete("_path-savepoint_");
- response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+ response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
assertEquals(HttpResponseStatus.CREATED, response.getStatus());
assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
@@ -255,7 +255,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
// Query again, keep recent history
- response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+ response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
assertEquals(HttpResponseStatus.CREATED, response.getStatus());
assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
@@ -272,7 +272,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
// Query for unknown request
params.put("requestId", "9929");
- response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+ response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
assertEquals(HttpResponseStatus.BAD_REQUEST, response.getStatus());
assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
@@ -295,7 +295,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
ExecutionGraph graph = mock(ExecutionGraph.class);
CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
- when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(Optional.of(graph));
+ when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
when(graph.getCheckpointCoordinator()).thenReturn(coord);
JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
@@ -319,7 +319,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
// Query progress
params.put("requestId", "1");
- FullHttpResponse response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+ FullHttpResponse response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, response.getStatus());
assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
index 104b0a3..1c08ae8 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
@@ -55,7 +56,7 @@ public class JobConfigHandlerTest {
@Test
public void testGetPaths() {
- JobConfigHandler handler = new JobConfigHandler(mock(ExecutionGraphHolder.class));
+ JobConfigHandler handler = new JobConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/config", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
index bfeb40a..ee0498e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -68,7 +69,7 @@ public class JobDetailsHandlerTest {
@Test
public void testGetPaths() {
- JobDetailsHandler handler = new JobDetailsHandler(mock(ExecutionGraphHolder.class), null);
+ JobDetailsHandler handler = new JobDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
String[] paths = handler.getPaths();
Assert.assertEquals(2, paths.length);
List<String> pathsList = Lists.newArrayList(paths);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
index f54ab06..6e0f918 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
@@ -58,7 +59,7 @@ public class JobExceptionsHandlerTest {
@Test
public void testGetPaths() {
- JobExceptionsHandler handler = new JobExceptionsHandler(mock(ExecutionGraphHolder.class));
+ JobExceptionsHandler handler = new JobExceptionsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/exceptions", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java
index 8e16e8a..94fd5a8 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.Executors;
+
import org.junit.Assert;
import org.junit.Test;
@@ -27,7 +29,7 @@ import org.junit.Test;
public class JobManagerConfigHandlerTest {
@Test
public void testGetPaths() {
- JobManagerConfigHandler handler = new JobManagerConfigHandler(null);
+ JobManagerConfigHandler handler = new JobManagerConfigHandler(Executors.directExecutor(), null);
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobmanager/config", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
index 17b4c44..4a934ec 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
@@ -51,7 +52,7 @@ public class JobPlanHandlerTest {
@Test
public void testGetPaths() {
- JobPlanHandler handler = new JobPlanHandler(mock(ExecutionGraphHolder.class));
+ JobPlanHandler handler = new JobPlanHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/plan", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
index 89bf426..8c05c83 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
@@ -34,7 +35,7 @@ import java.util.List;
public class JobStoppingHandlerTest extends TestLogger {
@Test
public void testGetPaths() {
- JobStoppingHandler handler = new JobStoppingHandler(TestingUtils.TIMEOUT());
+ JobStoppingHandler handler = new JobStoppingHandler(Executors.directExecutor(), TestingUtils.TIMEOUT());
String[] paths = handler.getPaths();
Assert.assertEquals(2, paths.length);
List<String> pathsList = Lists.newArrayList(paths);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
index b7af323..5af9aa6 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -57,7 +58,7 @@ public class JobVertexAccumulatorsHandlerTest {
@Test
public void testGetPaths() {
- JobVertexAccumulatorsHandler handler = new JobVertexAccumulatorsHandler(mock(ExecutionGraphHolder.class));
+ JobVertexAccumulatorsHandler handler = new JobVertexAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/accumulators", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
index d2ac0d6..0d15e08 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.webmonitor.BackPressureStatsTracker;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -46,7 +47,7 @@ import static org.mockito.Mockito.when;
public class JobVertexBackPressureHandlerTest {
@Test
public void testGetPaths() {
- JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(mock(ExecutionGraphHolder.class), mock(BackPressureStatsTracker.class), 0);
+ JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), mock(BackPressureStatsTracker.class), 0);
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/backpressure", paths[0]);
@@ -63,10 +64,11 @@ public class JobVertexBackPressureHandlerTest {
JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
mock(ExecutionGraphHolder.class),
+ Executors.directExecutor(),
statsTracker,
9999);
- String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap());
+ String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap()).get();
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(response);
@@ -96,10 +98,11 @@ public class JobVertexBackPressureHandlerTest {
JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
mock(ExecutionGraphHolder.class),
+ Executors.directExecutor(),
statsTracker,
9999);
- String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap());
+ String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap()).get();
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(response);
@@ -157,10 +160,11 @@ public class JobVertexBackPressureHandlerTest {
JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
mock(ExecutionGraphHolder.class),
+ Executors.directExecutor(),
statsTracker,
0); // <----- refresh interval should fire immediately
- String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap());
+ String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap()).get();
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(response);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
index bc4fe9c..1b8d9aa 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -59,7 +60,7 @@ public class JobVertexDetailsHandlerTest {
@Test
public void testGetPaths() {
- JobVertexDetailsHandler handler = new JobVertexDetailsHandler(mock(ExecutionGraphHolder.class), null);
+ JobVertexDetailsHandler handler = new JobVertexDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
index d5d877a..badb952 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -61,7 +62,7 @@ public class JobVertexTaskManagersHandlerTest {
@Test
public void testGetPaths() {
- JobVertexTaskManagersHandler handler = new JobVertexTaskManagersHandler(mock(ExecutionGraphHolder.class), null);
+ JobVertexTaskManagersHandler handler = new JobVertexTaskManagersHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/taskmanagers", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
index d992b85..a80bac9 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.junit.Assert;
@@ -31,7 +32,7 @@ import static org.mockito.Mockito.mock;
public class SubtaskCurrentAttemptDetailsHandlerTest {
@Test
public void testGetPaths() {
- SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(mock(ExecutionGraphHolder.class), null);
+ SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
index ce8e72f..6773fd4 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -64,7 +65,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest {
@Test
public void testGetPaths() {
- SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(mock(ExecutionGraphHolder.class));
+ SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
index e1fbf92..7777d2d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -73,7 +74,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest {
@Test
public void testGetPaths() {
- SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(mock(ExecutionGraphHolder.class), null);
+ SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
index f33da80..7b400da 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
@@ -58,7 +59,7 @@ public class SubtasksAllAccumulatorsHandlerTest {
@Test
public void testGetPaths() {
- SubtasksAllAccumulatorsHandler handler = new SubtasksAllAccumulatorsHandler(mock(ExecutionGraphHolder.class));
+ SubtasksAllAccumulatorsHandler handler = new SubtasksAllAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
index 548efaf..31c2212 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -59,7 +60,7 @@ public class SubtasksTimesHandlerTest {
@Test
public void testGetPaths() {
- SubtasksTimesHandler handler = new SubtasksTimesHandler(mock(ExecutionGraphHolder.class));
+ SubtasksTimesHandler handler = new SubtasksTimesHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasktimes", paths[0]);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
index afc2764..e3a71a1 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -33,7 +34,7 @@ import java.util.List;
public class TaskManagersHandlerTest {
@Test
public void testGetPaths() {
- TaskManagersHandler handler = new TaskManagersHandler(Time.seconds(0L), null);
+ TaskManagersHandler handler = new TaskManagersHandler(Executors.directExecutor(), Time.seconds(0L), null);
String[] paths = handler.getPaths();
Assert.assertEquals(2, paths.length);
List<String> pathsList = Lists.newArrayList(paths);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
index ce943b1..47298be 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
@@ -79,7 +80,7 @@ public class CheckpointConfigHandlerTest {
@Test
public void testGetPaths() {
- CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
+ CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/checkpoints/config", paths[0]);
@@ -95,8 +96,8 @@ public class CheckpointConfigHandlerTest {
AccessExecutionGraph graph = graphAndSettings.graph;
JobCheckpointingSettings settings = graphAndSettings.snapshottingSettings;
- CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
- String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+ CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+ String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get();
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(json);
@@ -121,8 +122,8 @@ public class CheckpointConfigHandlerTest {
AccessExecutionGraph graph = graphAndSettings.graph;
- CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
- String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+ CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+ String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get();
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(json);
@@ -140,8 +141,8 @@ public class CheckpointConfigHandlerTest {
AccessExecutionGraph graph = graphAndSettings.graph;
ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings;
- CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
- String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+ CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+ String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get();
ObjectMapper mapper = new ObjectMapper();
JsonNode externalizedNode = mapper.readTree(json).get("externalization");
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
index 0259aa5..f16d623 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -101,7 +102,7 @@ public class CheckpointStatsDetailsHandlerTest {
@Test
public void testGetPaths() {
- CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+ CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/checkpoints/details/:checkpointid", paths[0]);
@@ -113,10 +114,10 @@ public class CheckpointStatsDetailsHandlerTest {
@Test
public void testIllegalCheckpointId() throws Exception {
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+ CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
Map<String, String> params = new HashMap<>();
params.put("checkpointid", "illegal checkpoint");
- String json = handler.handleRequest(graph, params);
+ String json = handler.handleRequest(graph, params).get();
assertEquals("{}", json);
}
@@ -127,8 +128,8 @@ public class CheckpointStatsDetailsHandlerTest {
@Test
public void testNoCheckpointIdParam() throws Exception {
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
- String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+ CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
+ String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get();
assertEquals("{}", json);
}
@@ -147,10 +148,10 @@ public class CheckpointStatsDetailsHandlerTest {
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
- CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+ CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
Map<String, String> params = new HashMap<>();
params.put("checkpointid", "123");
- String json = handler.handleRequest(graph, params);
+ String json = handler.handleRequest(graph, params).get();
assertEquals("{}", json);
verify(history, times(1)).getCheckpointById(anyLong());
@@ -318,10 +319,10 @@ public class CheckpointStatsDetailsHandlerTest {
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
- CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+ CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
Map<String, String> params = new HashMap<>();
params.put("checkpointid", "123");
- String json = handler.handleRequest(graph, params);
+ String json = handler.handleRequest(graph, params).get();
ObjectMapper mapper = new ObjectMapper();
return mapper.readTree(json);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
index 9425a4c..ed73a62 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
@@ -86,7 +87,7 @@ public class CheckpointStatsHandlerTest {
@Test
public void testGetPaths() {
- CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class));
+ CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/checkpoints", paths[0]);
@@ -99,8 +100,8 @@ public class CheckpointStatsHandlerTest {
public void testCheckpointStatsRequest() throws Exception {
TestCheckpointStats testCheckpointStats = createTestCheckpointStats();
- CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class));
- String json = handler.handleRequest(testCheckpointStats.graph, Collections.<String, String>emptyMap());
+ CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+ String json = handler.handleRequest(testCheckpointStats.graph, Collections.<String, String>emptyMap()).get();
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(json);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
index b8eb715..9c5e168 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -100,7 +101,7 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
@Test
public void testGetPaths() {
- CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+ CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid", paths[0]);
@@ -149,10 +150,10 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
@Test
public void testIllegalCheckpointId() throws Exception {
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+ CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
Map<String, String> params = new HashMap<>();
params.put("checkpointid", "illegal checkpoint");
- String json = handler.handleRequest(graph, params);
+ String json = handler.handleRequest(graph, params).get();
assertEquals("{}", json);
}
@@ -163,8 +164,8 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
@Test
public void testNoCheckpointIdParam() throws Exception {
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
- String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+ CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
+ String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get();
assertEquals("{}", json);
}
@@ -183,11 +184,11 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
- CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+ CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
Map<String, String> params = new HashMap<>();
params.put("checkpointid", "123");
params.put("vertexid", new JobVertexID().toString());
- String json = handler.handleRequest(graph, params);
+ String json = handler.handleRequest(graph, params).get();
assertEquals("{}", json);
verify(history, times(1)).getCheckpointById(anyLong());
@@ -199,11 +200,11 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
@Test
public void testIllegalJobVertexIdParam() throws Exception {
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+ CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
Map<String, String> params = new HashMap<>();
params.put("checkpointid", "1");
params.put("vertexid", "illegal vertex id");
- String json = handler.handleRequest(graph, params);
+ String json = handler.handleRequest(graph, params).get();
assertEquals("{}", json);
}
@@ -214,10 +215,10 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
@Test
public void testNoJobVertexIdParam() throws Exception {
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+ CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
Map<String, String> params = new HashMap<>();
params.put("checkpointid", "1");
- String json = handler.handleRequest(graph, params);
+ String json = handler.handleRequest(graph, params).get();
assertEquals("{}", json);
}
@@ -238,11 +239,11 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
- CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+ CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
Map<String, String> params = new HashMap<>();
params.put("checkpointid", "123");
params.put("vertexid", new JobVertexID().toString());
- String json = handler.handleRequest(graph, params);
+ String json = handler.handleRequest(graph, params).get();
assertEquals("{}", json);
verify(inProgress, times(1)).getTaskStateStats(any(JobVertexID.class));
@@ -259,11 +260,11 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
- CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+ CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
Map<String, String> params = new HashMap<>();
params.put("checkpointid", "123");
params.put("vertexid", new JobVertexID().toString());
- String json = handler.handleRequest(graph, params);
+ String json = handler.handleRequest(graph, params).get();
ObjectMapper mapper = new ObjectMapper();
return mapper.readTree(json);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
index 90e032d..5296d33 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
@@ -49,7 +49,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
TestingUtils.TIMEOUT());
MetricStoreTest.setupStore(fetcher.getMetricStore());
- JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+ JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
Map<String, String> pathParams = new HashMap<>();
Map<String, String> queryParams = new HashMap<>();
@@ -58,7 +58,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
pathParams.put("vertexid", "taskid");
// get list of available metrics
- String availableList = handler.handleJsonRequest(pathParams, queryParams, null);
+ String availableList = handler.handleJsonRequest(pathParams, queryParams, null).get();
assertEquals("[" +
"{\"id\":\"8.opname.abc.metric5\"}," +
@@ -69,7 +69,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
// get value for a single metric
queryParams.put("get", "8.opname.abc.metric5");
- String metricValue = handler.handleJsonRequest(pathParams, queryParams, null);
+ String metricValue = handler.handleJsonRequest(pathParams, queryParams, null).get();
assertEquals("[" +
"{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}" +
@@ -80,7 +80,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
// get values for multiple metrics
queryParams.put("get", "8.opname.abc.metric5,8.abc.metric4");
- String metricValues = handler.handleJsonRequest(pathParams, queryParams, null);
+ String metricValues = handler.handleJsonRequest(pathParams, queryParams, null).get();
assertEquals("[" +
"{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}," +
@@ -102,7 +102,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
TestingUtils.TIMEOUT());
MetricStoreTest.setupStore(fetcher.getMetricStore());
- JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+ JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
Map<String, String> pathParams = new HashMap<>();
Map<String, String> queryParams = new HashMap<>();
@@ -114,7 +114,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
pathParams.put("jobid", "nonexistent");
try {
- assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null));
+ assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get());
} catch (Exception e) {
fail();
}
@@ -132,7 +132,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
TestingUtils.TIMEOUT());
MetricStoreTest.setupStore(fetcher.getMetricStore());
- JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+ JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
Map<String, String> pathParams = new HashMap<>();
Map<String, String> queryParams = new HashMap<>();
@@ -144,7 +144,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
queryParams.put("get", "");
try {
- assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null));
+ assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get());
} catch (Exception e) {
fail(e.getMessage());
}
@@ -154,7 +154,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
queryParams.put("get", "subindex.opname.abc.metric5");
try {
- assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null));
+ assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get());
} catch (Exception e) {
fail(e.getMessage());
}
@@ -164,7 +164,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
queryParams.put("get", "subindex.opname.abc.nonexistant");
try {
- assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null));
+ assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get());
} catch (Exception e) {
fail(e.getMessage());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
index 994fc5e..b02949a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
@@ -40,7 +40,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
public class JobManagerMetricsHandlerTest extends TestLogger {
@Test
public void testGetPaths() {
- JobManagerMetricsHandler handler = new JobManagerMetricsHandler(mock(MetricFetcher.class));
+ JobManagerMetricsHandler handler = new JobManagerMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobmanager/metrics", paths[0]);
@@ -55,7 +55,7 @@ public class JobManagerMetricsHandlerTest extends TestLogger {
TestingUtils.TIMEOUT());
MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
- JobManagerMetricsHandler handler = new JobManagerMetricsHandler(fetcher);
+ JobManagerMetricsHandler handler = new JobManagerMetricsHandler(Executors.directExecutor(), fetcher);
Map<String, String> pathParams = new HashMap<>();
@@ -73,7 +73,7 @@ public class JobManagerMetricsHandlerTest extends TestLogger {
TestingUtils.TIMEOUT());
MetricStore store = fetcher.getMetricStore();
- JobManagerMetricsHandler handler = new JobManagerMetricsHandler(fetcher);
+ JobManagerMetricsHandler handler = new JobManagerMetricsHandler(Executors.directExecutor(), fetcher);
Map<String, String> pathParams = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
index a35af22..569f772 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
@@ -41,7 +41,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
public class JobMetricsHandlerTest extends TestLogger {
@Test
public void testGetPaths() {
- JobMetricsHandler handler = new JobMetricsHandler(mock(MetricFetcher.class));
+ JobMetricsHandler handler = new JobMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/metrics", paths[0]);
@@ -56,7 +56,7 @@ public class JobMetricsHandlerTest extends TestLogger {
TestingUtils.TIMEOUT());
MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
- JobMetricsHandler handler = new JobMetricsHandler(fetcher);
+ JobMetricsHandler handler = new JobMetricsHandler(Executors.directExecutor(), fetcher);
Map<String, String> pathParams = new HashMap<>();
pathParams.put(PARAMETER_JOB_ID, "jobid");
@@ -75,7 +75,7 @@ public class JobMetricsHandlerTest extends TestLogger {
TestingUtils.TIMEOUT());
MetricStore store = fetcher.getMetricStore();
- JobMetricsHandler handler = new JobMetricsHandler(fetcher);
+ JobMetricsHandler handler = new JobMetricsHandler(Executors.directExecutor(), fetcher);
Map<String, String> pathParams = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
index e84b11d..e6bbd2e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
@@ -42,7 +42,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
public class JobVertexMetricsHandlerTest extends TestLogger {
@Test
public void testGetPaths() {
- JobVertexMetricsHandler handler = new JobVertexMetricsHandler(mock(MetricFetcher.class));
+ JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/metrics", paths[0]);
@@ -57,7 +57,7 @@ public class JobVertexMetricsHandlerTest extends TestLogger {
TestingUtils.TIMEOUT());
MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
- JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+ JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
Map<String, String> pathParams = new HashMap<>();
pathParams.put(PARAMETER_JOB_ID, "jobid");
@@ -79,7 +79,7 @@ public class JobVertexMetricsHandlerTest extends TestLogger {
TestingUtils.TIMEOUT());
MetricStore store = fetcher.getMetricStore();
- JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+ JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
Map<String, String> pathParams = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
index c20ea98..c4c1c7a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
@@ -41,7 +41,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
public class TaskManagerMetricsHandlerTest extends TestLogger {
@Test
public void testGetPaths() {
- TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(mock(MetricFetcher.class));
+ TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
String[] paths = handler.getPaths();
Assert.assertEquals(1, paths.length);
Assert.assertEquals("/taskmanagers/:taskmanagerid/metrics", paths[0]);
@@ -56,7 +56,7 @@ public class TaskManagerMetricsHandlerTest extends TestLogger {
TestingUtils.TIMEOUT());
MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
- TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(fetcher);
+ TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(Executors.directExecutor(), fetcher);
Map<String, String> pathParams = new HashMap<>();
pathParams.put(TASK_MANAGER_ID_KEY, "tmid");
@@ -75,7 +75,7 @@ public class TaskManagerMetricsHandlerTest extends TestLogger {
TestingUtils.TIMEOUT());
MetricStore store = fetcher.getMetricStore();
- TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(fetcher);
+ TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(Executors.directExecutor(), fetcher);
Map<String, String> pathParams = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 8c551a7..f0073db 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2232,7 +2232,7 @@ object JobManager {
new AkkaJobManagerRetriever(jobManagerSystem, timeout),
new AkkaQueryServiceRetriever(jobManagerSystem, timeout),
timeout,
- jobManagerSystem.dispatcher)
+ futureExecutor)
Option(webServer)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index e622130..41e5629 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -1094,7 +1094,7 @@ public class TaskExecutorTest extends TestLogger {
fail("The slot request should have failed.");
} catch (Exception e) {
- assertTrue(ExceptionUtils.containsThrowable(e, SlotAllocationException.class));
+ assertTrue(ExceptionUtils.findThrowable(e, SlotAllocationException.class).isPresent());
}
// re-register
[3/3] flink git commit: [FLINK-7409] [web] Make WebRuntimeMonitor
reactive
Posted by tr...@apache.org.
[FLINK-7409] [web] Make WebRuntimeMonitor reactive
This commit changes the behaviour of the WebRuntimeMonitor to no longer block serving
threads by waiting on the result of futures. Instead, the RequestHandler now returns a
CompletableFuture<FullHttpResponse> which is written out to the Netty channel upon
completion. This will improve the performance of our WebRuntimeMonitor.
This closes #4527.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ab1fbfdf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ab1fbfdf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ab1fbfdf
Branch: refs/heads/master
Commit: ab1fbfdfe6c1f2b6885710f98b9480cb90396ac0
Parents: 1804aa3
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Aug 10 10:56:12 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Sun Sep 3 23:43:58 2017 +0200
----------------------------------------------------------------------
.../RetryRejectedExecutionFailureHandler.java | 2 +-
.../org/apache/flink/util/ExceptionUtils.java | 13 +-
.../webmonitor/ExecutionGraphHolder.java | 16 +-
.../webmonitor/RuntimeMonitorHandler.java | 60 +++--
.../runtime/webmonitor/WebRuntimeMonitor.java | 90 +++----
.../AbstractExecutionGraphRequestHandler.java | 35 ++-
.../AbstractJobVertexRequestHandler.java | 10 +-
.../handlers/AbstractJsonRequestHandler.java | 33 ++-
.../AbstractSubtaskAttemptRequestHandler.java | 22 +-
.../handlers/AbstractSubtaskRequestHandler.java | 18 +-
.../handlers/ClusterOverviewHandler.java | 61 +++--
.../handlers/CurrentJobIdsHandler.java | 101 ++++----
.../handlers/CurrentJobsOverviewHandler.java | 89 ++++---
.../handlers/DashboardConfigHandler.java | 9 +-
.../handlers/JarAccessDeniedHandler.java | 10 +-
.../webmonitor/handlers/JarActionHandler.java | 4 +-
.../webmonitor/handlers/JarDeleteHandler.java | 62 +++--
.../webmonitor/handlers/JarListHandler.java | 157 ++++++------
.../webmonitor/handlers/JarPlanHandler.java | 43 ++--
.../webmonitor/handlers/JarRunHandler.java | 59 +++--
.../webmonitor/handlers/JarUploadHandler.java | 51 ++--
.../handlers/JobAccumulatorsHandler.java | 19 +-
.../handlers/JobCancellationHandler.java | 38 +--
.../JobCancellationWithSavepointHandlers.java | 158 ++++++------
.../webmonitor/handlers/JobConfigHandler.java | 20 +-
.../webmonitor/handlers/JobDetailsHandler.java | 19 +-
.../handlers/JobExceptionsHandler.java | 20 +-
.../handlers/JobManagerConfigHandler.java | 60 +++--
.../webmonitor/handlers/JobPlanHandler.java | 10 +-
.../webmonitor/handlers/JobStoppingHandler.java | 38 +--
.../handlers/JobVertexAccumulatorsHandler.java | 20 +-
.../handlers/JobVertexBackPressureHandler.java | 17 +-
.../handlers/JobVertexDetailsHandler.java | 19 +-
.../handlers/JobVertexTaskManagersHandler.java | 19 +-
.../webmonitor/handlers/RequestHandler.java | 9 +-
.../SubtaskCurrentAttemptDetailsHandler.java | 8 +-
...taskExecutionAttemptAccumulatorsHandler.java | 19 +-
.../SubtaskExecutionAttemptDetailsHandler.java | 19 +-
.../SubtasksAllAccumulatorsHandler.java | 19 +-
.../handlers/SubtasksTimesHandler.java | 19 +-
.../handlers/TaskManagersHandler.java | 256 ++++++++++---------
.../checkpoints/CheckpointConfigHandler.java | 19 +-
.../CheckpointStatsDetailsHandler.java | 63 +++--
.../CheckpointStatsDetailsSubtasksHandler.java | 33 ++-
.../checkpoints/CheckpointStatsHandler.java | 19 +-
.../metrics/AbstractMetricsHandler.java | 27 +-
.../metrics/JobManagerMetricsHandler.java | 5 +-
.../webmonitor/metrics/JobMetricsHandler.java | 5 +-
.../metrics/JobVertexMetricsHandler.java | 5 +-
.../metrics/TaskManagerMetricsHandler.java | 5 +-
.../handlers/ClusterOverviewHandlerTest.java | 3 +-
.../handlers/CurrentJobIdsHandlerTest.java | 3 +-
.../CurrentJobsOverviewHandlerTest.java | 7 +-
.../handlers/DashboardConfigHandlerTest.java | 3 +-
.../handlers/JarAccessDeniedHandlerTest.java | 4 +-
.../handlers/JarDeleteHandlerTest.java | 4 +-
.../webmonitor/handlers/JarListHandlerTest.java | 4 +-
.../webmonitor/handlers/JarPlanHandlerTest.java | 4 +-
.../webmonitor/handlers/JarRunHandlerTest.java | 3 +-
.../handlers/JarUploadHandlerTest.java | 4 +-
.../handlers/JobAccumulatorsHandlerTest.java | 3 +-
.../handlers/JobCancellationHandlerTest.java | 3 +-
...obCancellationWithSavepointHandlersTest.java | 24 +-
.../handlers/JobConfigHandlerTest.java | 3 +-
.../handlers/JobDetailsHandlerTest.java | 3 +-
.../handlers/JobExceptionsHandlerTest.java | 3 +-
.../handlers/JobManagerConfigHandlerTest.java | 4 +-
.../webmonitor/handlers/JobPlanHandlerTest.java | 3 +-
.../handlers/JobStoppingHandlerTest.java | 3 +-
.../JobVertexAccumulatorsHandlerTest.java | 3 +-
.../JobVertexBackPressureHandlerTest.java | 12 +-
.../handlers/JobVertexDetailsHandlerTest.java | 3 +-
.../JobVertexTaskManagersHandlerTest.java | 3 +-
...SubtaskCurrentAttemptDetailsHandlerTest.java | 3 +-
...ExecutionAttemptAccumulatorsHandlerTest.java | 3 +-
...btaskExecutionAttemptDetailsHandlerTest.java | 3 +-
.../SubtasksAllAccumulatorsHandlerTest.java | 3 +-
.../handlers/SubtasksTimesHandlerTest.java | 3 +-
.../handlers/TaskManagersHandlerTest.java | 3 +-
.../CheckpointConfigHandlerTest.java | 15 +-
.../CheckpointStatsDetailsHandlerTest.java | 19 +-
.../checkpoints/CheckpointStatsHandlerTest.java | 7 +-
...heckpointStatsSubtaskDetailsHandlerTest.java | 31 +--
.../metrics/AbstractMetricsHandlerTest.java | 20 +-
.../metrics/JobManagerMetricsHandlerTest.java | 6 +-
.../metrics/JobMetricsHandlerTest.java | 6 +-
.../metrics/JobVertexMetricsHandlerTest.java | 6 +-
.../metrics/TaskManagerMetricsHandlerTest.java | 6 +-
.../flink/runtime/jobmanager/JobManager.scala | 2 +-
.../runtime/taskexecutor/TaskExecutorTest.java | 2 +-
90 files changed, 1278 insertions(+), 901 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
index 9380959..3706257 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
@@ -36,7 +36,7 @@ public class RetryRejectedExecutionFailureHandler implements ActionRequestFailur
@Override
public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
- if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
+ if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
indexer.add(action);
} else {
// rethrow all other failures
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 9c8907b..d141ecb 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -276,27 +277,27 @@ public final class ExceptionUtils {
}
/**
- * Checks whether a throwable chain contains a specific type of exception.
+ * Checks whether a throwable chain contains a specific type of exception and returns it.
+ *
+ * <p>The chain is walked from {@code throwable} itself along {@link Throwable#getCause()}, and
+ * the first element whose class is assignable to {@code searchType} is returned.
*
* @param throwable the throwable chain to check.
* @param searchType the type of exception to search for in the chain.
- * @return True, if the searched type is nested in the throwable, false otherwise.
+ * @return Optional containing the first throwable of the requested type found in the chain;
+ *         empty if there is none or if either argument is {@code null}
*/
- public static boolean containsThrowable(Throwable throwable, Class<?> searchType) {
+ public static Optional<Throwable> findThrowable(Throwable throwable, Class<?> searchType) {
if (throwable == null || searchType == null) {
- return false;
+ return Optional.empty();
}
Throwable t = throwable;
while (t != null) {
if (searchType.isAssignableFrom(t.getClass())) {
- return true;
+ return Optional.of(t);
} else {
t = t.getCause();
}
}
- return false;
+ return Optional.empty();
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
index 739b375..8a96969 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
import java.util.Optional;
import java.util.WeakHashMap;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -65,26 +64,23 @@ public class ExecutionGraphHolder {
*
* @param jid jobID of the execution graph to be retrieved
* @return Optional ExecutionGraph if it has been retrievable, empty if there has been no ExecutionGraph
- * @throws Exception if the ExecutionGraph retrieval failed.
*/
- public Optional<AccessExecutionGraph> getExecutionGraph(JobID jid, JobManagerGateway jobManagerGateway) throws Exception {
+ public CompletableFuture<Optional<AccessExecutionGraph>> getExecutionGraph(JobID jid, JobManagerGateway jobManagerGateway) {
AccessExecutionGraph cached = cache.get(jid);
if (cached != null) {
if (cached.getState() == JobStatus.SUSPENDED) {
cache.remove(jid);
} else {
- return Optional.of(cached);
+ return CompletableFuture.completedFuture(Optional.of(cached)); // cache hit: answered without issuing a requestJob call
}
}
CompletableFuture<Optional<AccessExecutionGraph>> executionGraphFuture = jobManagerGateway.requestJob(jid, timeout);
- Optional<AccessExecutionGraph> result = executionGraphFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ executionGraphFuture.thenAcceptAsync( // no executor is passed, so the cache update runs on CompletableFuture's default async executor
+ optExecutionGraph ->
+ optExecutionGraph.ifPresent(executionGraph -> cache.put(jid, executionGraph))); // NOTE(review): cache is written from that async thread -- confirm the cache map tolerates concurrent access
- return result.map((executionGraph) -> {
- cache.put(jid, executionGraph);
-
- return executionGraph;
- });
+ return executionGraphFuture; // the raw request future is returned; it does not wait for the cache update above
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 35d13dd..6305537 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
@@ -45,6 +46,7 @@ import java.net.URLDecoder;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -88,7 +90,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
@Override
protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, JobManagerGateway jobManagerGateway) {
- FullHttpResponse response;
+ CompletableFuture<FullHttpResponse> responseFuture;
try {
// we only pass the first element in the list to the handlers.
@@ -106,29 +108,41 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
queryParams.put(WEB_MONITOR_ADDRESS_KEY,
(httpsEnabled ? "https://" : "http://") + address.getHostName() + ":" + address.getPort());
- response = handler.handleRequest(pathParams, queryParams, jobManagerGateway);
+ responseFuture = handler.handleRequest(pathParams, queryParams, jobManagerGateway);
+ } catch (Exception e) {
+ responseFuture = FutureUtils.completedExceptionally(e);
}
- catch (NotFoundException e) {
- // this should result in a 404 error code (not found)
- ByteBuf message = e.getMessage() == null ? Unpooled.buffer(0)
- : Unpooled.wrappedBuffer(e.getMessage().getBytes(ENCODING));
- response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, message);
- response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
- response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
- LOG.debug("Error while handling request", e);
- }
- catch (Exception e) {
- byte[] bytes = ExceptionUtils.stringifyException(e).getBytes(ENCODING);
- response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
- HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes));
- response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
- response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
-
- LOG.debug("Error while handling request", e);
- }
-
- response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, allowOrigin);
- KeepAliveWrite.flush(ctx, routed.request(), response);
+ responseFuture.whenComplete(
+ (FullHttpResponse httpResponse, Throwable throwable) -> {
+ final FullHttpResponse finalResponse;
+
+ if (throwable != null) {
+ LOG.debug("Error while handling request.", throwable);
+
+ Optional<Throwable> optNotFound = ExceptionUtils.findThrowable(throwable, NotFoundException.class);
+
+ if (optNotFound.isPresent()) {
+ // this should result in a 404 error code (not found)
+ Throwable e = optNotFound.get();
+ ByteBuf message = e.getMessage() == null ? Unpooled.buffer(0)
+ : Unpooled.wrappedBuffer(e.getMessage().getBytes(ENCODING));
+ finalResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, message);
+ finalResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
+ finalResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, finalResponse.content().readableBytes());
+ } else {
+ byte[] bytes = ExceptionUtils.stringifyException(throwable).getBytes(ENCODING);
+ finalResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
+ HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes));
+ finalResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
+ finalResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, finalResponse.content().readableBytes());
+ }
+ } else {
+ finalResponse = httpResponse;
+ }
+
+ finalResponse.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, allowOrigin);
+ KeepAliveWrite.flush(ctx, routed.request(), finalResponse);
+ });
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 17f02f0..e74541e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -232,41 +232,41 @@ public class WebRuntimeMonitor implements WebMonitor {
Router router = new Router();
// config how to interact with this web server
- get(router, new DashboardConfigHandler(cfg.getRefreshInterval()));
+ get(router, new DashboardConfigHandler(executor, cfg.getRefreshInterval()));
// the overview - how many task managers, slots, free slots, ...
- get(router, new ClusterOverviewHandler(DEFAULT_REQUEST_TIMEOUT));
+ get(router, new ClusterOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT));
// job manager configuration
- get(router, new JobManagerConfigHandler(config));
+ get(router, new JobManagerConfigHandler(executor, config));
// overview over jobs
- get(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, true));
- get(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, false));
- get(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, false, true));
-
- get(router, new CurrentJobIdsHandler(DEFAULT_REQUEST_TIMEOUT));
-
- get(router, new JobDetailsHandler(currentGraphs, metricFetcher));
-
- get(router, new JobVertexDetailsHandler(currentGraphs, metricFetcher));
- get(router, new SubtasksTimesHandler(currentGraphs));
- get(router, new JobVertexTaskManagersHandler(currentGraphs, metricFetcher));
- get(router, new JobVertexAccumulatorsHandler(currentGraphs));
- get(router, new JobVertexBackPressureHandler(currentGraphs, backPressureStatsTracker, refreshInterval));
- get(router, new JobVertexMetricsHandler(metricFetcher));
- get(router, new SubtasksAllAccumulatorsHandler(currentGraphs));
- get(router, new SubtaskCurrentAttemptDetailsHandler(currentGraphs, metricFetcher));
- get(router, new SubtaskExecutionAttemptDetailsHandler(currentGraphs, metricFetcher));
- get(router, new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs));
-
- get(router, new JobPlanHandler(currentGraphs));
- get(router, new JobConfigHandler(currentGraphs));
- get(router, new JobExceptionsHandler(currentGraphs));
- get(router, new JobAccumulatorsHandler(currentGraphs));
- get(router, new JobMetricsHandler(metricFetcher));
-
- get(router, new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher));
+ get(router, new CurrentJobsOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT, true, true));
+ get(router, new CurrentJobsOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT, true, false));
+ get(router, new CurrentJobsOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT, false, true));
+
+ get(router, new CurrentJobIdsHandler(executor, DEFAULT_REQUEST_TIMEOUT));
+
+ get(router, new JobDetailsHandler(currentGraphs, executor, metricFetcher));
+
+ get(router, new JobVertexDetailsHandler(currentGraphs, executor, metricFetcher));
+ get(router, new SubtasksTimesHandler(currentGraphs, executor));
+ get(router, new JobVertexTaskManagersHandler(currentGraphs, executor, metricFetcher));
+ get(router, new JobVertexAccumulatorsHandler(currentGraphs, executor));
+ get(router, new JobVertexBackPressureHandler(currentGraphs, executor, backPressureStatsTracker, refreshInterval));
+ get(router, new JobVertexMetricsHandler(executor, metricFetcher));
+ get(router, new SubtasksAllAccumulatorsHandler(currentGraphs, executor));
+ get(router, new SubtaskCurrentAttemptDetailsHandler(currentGraphs, executor, metricFetcher));
+ get(router, new SubtaskExecutionAttemptDetailsHandler(currentGraphs, executor, metricFetcher));
+ get(router, new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs, executor));
+
+ get(router, new JobPlanHandler(currentGraphs, executor));
+ get(router, new JobConfigHandler(currentGraphs, executor));
+ get(router, new JobExceptionsHandler(currentGraphs, executor));
+ get(router, new JobAccumulatorsHandler(currentGraphs, executor));
+ get(router, new JobMetricsHandler(executor, metricFetcher));
+
+ get(router, new TaskManagersHandler(executor, DEFAULT_REQUEST_TIMEOUT, metricFetcher));
get(router,
new TaskManagerLogHandler(
retriever,
@@ -287,7 +287,7 @@ public class WebRuntimeMonitor implements WebMonitor {
config,
enableSSL,
blobView));
- get(router, new TaskManagerMetricsHandler(metricFetcher));
+ get(router, new TaskManagerMetricsHandler(executor, metricFetcher));
router
// log and stdout
@@ -299,48 +299,48 @@ public class WebRuntimeMonitor implements WebMonitor {
new StaticFileServerHandler(retriever, jobManagerAddressFuture, timeout, logFiles.stdOutFile,
enableSSL));
- get(router, new JobManagerMetricsHandler(metricFetcher));
+ get(router, new JobManagerMetricsHandler(executor, metricFetcher));
// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
- get(router, new JobCancellationHandler(timeout));
+ get(router, new JobCancellationHandler(executor, timeout));
// DELETE is the preferred way of canceling a job (Rest-conform)
- delete(router, new JobCancellationHandler(timeout));
+ delete(router, new JobCancellationHandler(executor, timeout));
get(router, triggerHandler);
get(router, inProgressHandler);
// stop a job via GET (for proper integration with YARN this has to be performed via GET)
- get(router, new JobStoppingHandler(timeout));
+ get(router, new JobStoppingHandler(executor, timeout));
// DELETE is the preferred way of stopping a job (Rest-conform)
- delete(router, new JobStoppingHandler(timeout));
+ delete(router, new JobStoppingHandler(executor, timeout));
int maxCachedEntries = config.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
CheckpointStatsCache cache = new CheckpointStatsCache(maxCachedEntries);
// Register the checkpoint stats handlers
- get(router, new CheckpointStatsHandler(currentGraphs));
- get(router, new CheckpointConfigHandler(currentGraphs));
- get(router, new CheckpointStatsDetailsHandler(currentGraphs, cache));
- get(router, new CheckpointStatsDetailsSubtasksHandler(currentGraphs, cache));
+ get(router, new CheckpointStatsHandler(currentGraphs, executor));
+ get(router, new CheckpointConfigHandler(currentGraphs, executor));
+ get(router, new CheckpointStatsDetailsHandler(currentGraphs, executor, cache));
+ get(router, new CheckpointStatsDetailsSubtasksHandler(currentGraphs, executor, cache));
if (webSubmitAllow) {
// fetch the list of uploaded jars.
- get(router, new JarListHandler(uploadDir));
+ get(router, new JarListHandler(executor, uploadDir));
// get plan for an uploaded jar
- get(router, new JarPlanHandler(uploadDir));
+ get(router, new JarPlanHandler(executor, uploadDir));
// run a jar
- post(router, new JarRunHandler(uploadDir, timeout, config));
+ post(router, new JarRunHandler(executor, uploadDir, timeout, config));
// upload a jar
- post(router, new JarUploadHandler(uploadDir));
+ post(router, new JarUploadHandler(executor, uploadDir));
// delete an uploaded jar from submission interface
- delete(router, new JarDeleteHandler(uploadDir));
+ delete(router, new JarDeleteHandler(executor, uploadDir));
} else {
// send an Access Denied message
- JarAccessDeniedHandler jad = new JarAccessDeniedHandler();
+ JarAccessDeniedHandler jad = new JarAccessDeniedHandler(executor);
get(router, jad);
post(router, jad);
delete(router, jad);
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
index 89108db..053d3f7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -28,6 +30,8 @@ import org.apache.flink.util.Preconditions;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Base class for request handlers whose response depends on an ExecutionGraph
@@ -37,12 +41,16 @@ public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonR
private final ExecutionGraphHolder executionGraphHolder;
- public AbstractExecutionGraphRequestHandler(ExecutionGraphHolder executionGraphHolder) {
+ public AbstractExecutionGraphRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+ super(executor);
this.executionGraphHolder = Preconditions.checkNotNull(executionGraphHolder);
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
+ public CompletableFuture<String> handleJsonRequest(
+ Map<String, String> pathParams,
+ Map<String, String> queryParams,
+ JobManagerGateway jobManagerGateway) {
String jidString = pathParams.get("jobid");
if (jidString == null) {
throw new RuntimeException("JobId parameter missing");
@@ -53,21 +61,20 @@ public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonR
jid = JobID.fromHexString(jidString);
}
catch (Exception e) {
- throw new RuntimeException("Invalid JobID string '" + jidString + "': " + e.getMessage());
+ return FutureUtils.completedExceptionally(new FlinkException("Invalid JobID string '" + jidString + "'", e));
}
- final Optional<AccessExecutionGraph> optGraph;
+ final CompletableFuture<Optional<AccessExecutionGraph>> graphFuture = executionGraphHolder.getExecutionGraph(jid, jobManagerGateway);
- try {
- optGraph = executionGraphHolder.getExecutionGraph(jid, jobManagerGateway);
- } catch (Exception e) {
- throw new FlinkException("Could not retrieve ExecutionGraph for job with jobId " + jid + " from the JobManager.", e);
- }
-
- final AccessExecutionGraph graph = optGraph.orElseThrow(() -> new NotFoundException("Could not find job with jobId " + jid + '.'));
-
- return handleRequest(graph, pathParams);
+ return graphFuture.thenComposeAsync(
+ (Optional<AccessExecutionGraph> optGraph) -> {
+ if (optGraph.isPresent()) {
+ return handleRequest(optGraph.get(), pathParams);
+ } else {
+ throw new FlinkFutureException(new NotFoundException("Could not find job with jobId " + jid + '.'));
+ }
+ }, executor);
}
- public abstract String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception;
+ public abstract CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
index af9fc6c..df09225 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
@@ -24,6 +24,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Base class for request handlers whose response depends on a specific job vertex (defined
@@ -31,12 +33,12 @@ import java.util.Map;
*/
public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionGraphRequestHandler {
- public AbstractJobVertexRequestHandler(ExecutionGraphHolder executionGraphHolder) {
- super(executionGraphHolder);
+ public AbstractJobVertexRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+ super(executionGraphHolder, executor);
}
@Override
- public final String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
+ public final CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
final JobVertexID vid = parseJobVertexId(params);
final AccessExecutionJobVertex jobVertex = graph.getJobVertex(vid);
@@ -66,5 +68,5 @@ public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionG
}
}
- public abstract String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception;
+ public abstract CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
index 266ffb0..1ec3f9c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -29,6 +30,8 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
import java.nio.charset.Charset;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Base class for most request handlers. The handlers must produce a JSON response.
@@ -37,18 +40,28 @@ public abstract class AbstractJsonRequestHandler implements RequestHandler {
private static final Charset ENCODING = Charset.forName("UTF-8");
+ protected final Executor executor;
+
+ protected AbstractJsonRequestHandler(Executor executor) {
+ this.executor = Preconditions.checkNotNull(executor);
+ }
+
+ /**
+ * Delegates to {@link #handleJsonRequest(Map, Map, JobManagerGateway)} and wraps the resulting
+ * JSON string into an HTTP 200 response with content type {@code application/json}.
+ *
+ * <p>The response is assembled on this handler's {@code executor}; a failure of the JSON future
+ * is propagated unchanged through the returned future.
+ */
@Override
- public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
- String result = handleJsonRequest(pathParams, queryParams, jobManagerGateway);
- byte[] bytes = result.getBytes(ENCODING);
+ public CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ CompletableFuture<String> resultFuture = handleJsonRequest(pathParams, queryParams, jobManagerGateway);
+
+ return resultFuture.thenApplyAsync(
+ (String result) -> {
+ byte[] bytes = result.getBytes(ENCODING);
- DefaultFullHttpResponse response = new DefaultFullHttpResponse(
- HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));
+ DefaultFullHttpResponse response = new DefaultFullHttpResponse(
+ HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));
- response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name());
- response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+ response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name());
+ response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
- return response;
+ return response;
+ // run on the injected executor instead of the default async pool
+ }, executor);
}
/**
@@ -66,9 +79,9 @@ public abstract class AbstractJsonRequestHandler implements RequestHandler {
* response with the exception message, other exceptions will cause a HTTP 500 response
* with the exception stack trace.
*/
- public abstract String handleJsonRequest(
+ public abstract CompletableFuture<String> handleJsonRequest(
Map<String, String> pathParams,
Map<String, String> queryParams,
- JobManagerGateway jobManagerGateway) throws Exception;
+ JobManagerGateway jobManagerGateway);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
index 2792008..1b20673 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
@@ -18,11 +18,15 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.util.FlinkException;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Base class for request handlers whose response depends on a specific subtask execution attempt
@@ -32,15 +36,15 @@ import java.util.Map;
*/
public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubtaskRequestHandler {
- public AbstractSubtaskAttemptRequestHandler(ExecutionGraphHolder executionGraphHolder) {
- super(executionGraphHolder);
+ public AbstractSubtaskAttemptRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+ super(executionGraphHolder, executor);
}
+ /**
+ * Resolves the execution attempt named by the {@code attempt} parameter and delegates to
+ * {@link #handleRequest(AccessExecution, Map)}.
+ *
+ * <p>A missing, malformed, already deleted or unknown attempt is reported through an
+ * exceptionally completed future.
+ */
@Override
- public String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception {
+ public CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params) {
final String attemptNumberString = params.get("attempt");
if (attemptNumberString == null) {
- throw new RuntimeException("Attempt number parameter missing");
+ return FutureUtils.completedExceptionally(new FlinkException("Attempt number parameter missing"));
}
final int attempt;
@@ -48,7 +52,7 @@ public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubta
attempt = Integer.parseInt(attemptNumberString);
}
catch (NumberFormatException e) {
- throw new RuntimeException("Invalid attempt number parameter");
+ // keep the NumberFormatException as cause so the offending input shows up in the stack trace
+ return FutureUtils.completedExceptionally(new FlinkException("Invalid attempt number parameter", e));
}
final AccessExecution currentAttempt = vertex.getCurrentExecutionAttempt();
@@ -61,14 +65,14 @@ public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubta
if (exec != null) {
return handleRequest(exec, params);
} else {
- throw new RequestHandlerException("Execution for attempt " + attempt +
- " has already been deleted.");
+ return FutureUtils.completedExceptionally(new RequestHandlerException("Execution for attempt " + attempt +
+ " has already been deleted."));
}
}
else {
- throw new RuntimeException("Attempt does not exist: " + attempt);
+ return FutureUtils.completedExceptionally(new FlinkException("Attempt does not exist: " + attempt));
}
}
- public abstract String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception;
+ public abstract CompletableFuture<String> handleRequest(AccessExecution execAttempt, Map<String, String> params);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
index b977228..ab85034 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
@@ -18,11 +18,15 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.util.FlinkException;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Base class for request handlers whose response depends on a specific subtask (defined via the
@@ -31,15 +35,15 @@ import java.util.Map;
*/
public abstract class AbstractSubtaskRequestHandler extends AbstractJobVertexRequestHandler {
- public AbstractSubtaskRequestHandler(ExecutionGraphHolder executionGraphHolder) {
- super(executionGraphHolder);
+ public AbstractSubtaskRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+ super(executionGraphHolder, executor);
}
@Override
- public final String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+ public final CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
final String subtaskNumberString = params.get("subtasknum");
if (subtaskNumberString == null) {
- throw new RuntimeException("Subtask number parameter missing");
+ return FutureUtils.completedExceptionally(new FlinkException("Subtask number parameter missing"));
}
final int subtask;
@@ -47,16 +51,16 @@ public abstract class AbstractSubtaskRequestHandler extends AbstractJobVertexReq
subtask = Integer.parseInt(subtaskNumberString);
}
catch (NumberFormatException e) {
- throw new RuntimeException("Invalid subtask number parameter");
+ return FutureUtils.completedExceptionally(new FlinkException("Invalid subtask number parameter", e));
}
if (subtask < 0 || subtask >= jobVertex.getParallelism()) {
- throw new RuntimeException("subtask does not exist: " + subtask);
+ return FutureUtils.completedExceptionally(new FlinkException("subtask does not exist: " + subtask));
}
final AccessExecutionVertex vertex = jobVertex.getTaskVertices()[subtask];
return handleRequest(vertex, params);
}
- public abstract String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception;
+ public abstract CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
index 4ebc4e7..17db2e8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
@@ -19,16 +19,20 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
+import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -46,7 +50,8 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
private final Time timeout;
- public ClusterOverviewHandler(Time timeout) {
+ public ClusterOverviewHandler(Executor executor, Time timeout) {
+ super(executor);
this.timeout = checkNotNull(timeout);
}
@@ -56,39 +61,45 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
// we need no parameters, get all requests
try {
if (jobManagerGateway != null) {
CompletableFuture<StatusOverview> overviewFuture = jobManagerGateway.requestStatusOverview(timeout);
- StatusOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-
- StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
- gen.writeStartObject();
- gen.writeNumberField("taskmanagers", overview.getNumTaskManagersConnected());
- gen.writeNumberField("slots-total", overview.getNumSlotsTotal());
- gen.writeNumberField("slots-available", overview.getNumSlotsAvailable());
- gen.writeNumberField("jobs-running", overview.getNumJobsRunningOrPending());
- gen.writeNumberField("jobs-finished", overview.getNumJobsFinished());
- gen.writeNumberField("jobs-cancelled", overview.getNumJobsCancelled());
- gen.writeNumberField("jobs-failed", overview.getNumJobsFailed());
- gen.writeStringField("flink-version", version);
- if (!commitID.equals(EnvironmentInformation.UNKNOWN)) {
- gen.writeStringField("flink-commit", commitID);
- }
- gen.writeEndObject();
-
- gen.close();
- return writer.toString();
+ return overviewFuture.thenApplyAsync(
+ (StatusOverview overview) -> {
+ StringWriter writer = new StringWriter();
+ try {
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+ gen.writeStartObject();
+ gen.writeNumberField("taskmanagers", overview.getNumTaskManagersConnected());
+ gen.writeNumberField("slots-total", overview.getNumSlotsTotal());
+ gen.writeNumberField("slots-available", overview.getNumSlotsAvailable());
+ gen.writeNumberField("jobs-running", overview.getNumJobsRunningOrPending());
+ gen.writeNumberField("jobs-finished", overview.getNumJobsFinished());
+ gen.writeNumberField("jobs-cancelled", overview.getNumJobsCancelled());
+ gen.writeNumberField("jobs-failed", overview.getNumJobsFailed());
+ gen.writeStringField("flink-version", version);
+ if (!commitID.equals(EnvironmentInformation.UNKNOWN)) {
+ gen.writeStringField("flink-commit", commitID);
+ }
+ gen.writeEndObject();
+
+ gen.close();
+ return writer.toString();
+ } catch (IOException exception) {
+ throw new FlinkFutureException("Could not write cluster overview.", exception);
+ }
+ },
+ executor);
} else {
throw new Exception("No connection to the leading JobManager.");
}
}
catch (Exception e) {
- throw new RuntimeException("Failed to fetch list of all running jobs: " + e.getMessage(), e);
+ return FutureUtils.completedExceptionally(new FlinkException("Failed to fetch list of all running jobs: ", e));
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
index 778a300..acf1cd0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
@@ -28,6 +29,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
import java.io.StringWriter;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import static java.util.Objects.requireNonNull;
@@ -43,7 +45,8 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
private final Time timeout;
- public CurrentJobIdsHandler(Time timeout) {
+ public CurrentJobIdsHandler(Executor executor, Time timeout) {
+ super(executor);
this.timeout = requireNonNull(timeout);
}
@@ -53,53 +56,57 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
- // we need no parameters, get all requests
- try {
- if (jobManagerGateway != null) {
- CompletableFuture<JobsWithIDsOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
- JobsWithIDsOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-
- StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
- gen.writeStartObject();
-
- gen.writeArrayFieldStart("jobs-running");
- for (JobID jid : overview.getJobsRunningOrPending()) {
- gen.writeString(jid.toString());
- }
- gen.writeEndArray();
-
- gen.writeArrayFieldStart("jobs-finished");
- for (JobID jid : overview.getJobsFinished()) {
- gen.writeString(jid.toString());
- }
- gen.writeEndArray();
-
- gen.writeArrayFieldStart("jobs-cancelled");
- for (JobID jid : overview.getJobsCancelled()) {
- gen.writeString(jid.toString());
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ // we need no parameters, get all requests
+ try {
+ if (jobManagerGateway != null) {
+ CompletableFuture<JobsWithIDsOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
+ JobsWithIDsOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+ gen.writeStartObject();
+
+ gen.writeArrayFieldStart("jobs-running");
+ for (JobID jid : overview.getJobsRunningOrPending()) {
+ gen.writeString(jid.toString());
+ }
+ gen.writeEndArray();
+
+ gen.writeArrayFieldStart("jobs-finished");
+ for (JobID jid : overview.getJobsFinished()) {
+ gen.writeString(jid.toString());
+ }
+ gen.writeEndArray();
+
+ gen.writeArrayFieldStart("jobs-cancelled");
+ for (JobID jid : overview.getJobsCancelled()) {
+ gen.writeString(jid.toString());
+ }
+ gen.writeEndArray();
+
+ gen.writeArrayFieldStart("jobs-failed");
+ for (JobID jid : overview.getJobsFailed()) {
+ gen.writeString(jid.toString());
+ }
+ gen.writeEndArray();
+
+ gen.writeEndObject();
+
+ gen.close();
+ return writer.toString();
+ }
+ else {
+ throw new Exception("No connection to the leading JobManager.");
+ }
}
- gen.writeEndArray();
-
- gen.writeArrayFieldStart("jobs-failed");
- for (JobID jid : overview.getJobsFailed()) {
- gen.writeString(jid.toString());
+ catch (Exception e) {
+ throw new FlinkFutureException("Failed to fetch list of all running jobs.", e);
}
- gen.writeEndArray();
-
- gen.writeEndObject();
-
- gen.close();
- return writer.toString();
- }
- else {
- throw new Exception("No connection to the leading JobManager.");
- }
- }
- catch (Exception e) {
- throw new RuntimeException("Failed to fetch list of all running jobs: " + e.getMessage(), e);
- }
+ },
+ executor);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
index b324426..a5b116c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
@@ -36,7 +38,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -55,10 +57,12 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
private final boolean includeFinishedJobs;
public CurrentJobsOverviewHandler(
+ Executor executor,
Time timeout,
boolean includeRunningJobs,
boolean includeFinishedJobs) {
+ super(executor);
this.timeout = checkNotNull(timeout);
this.includeRunningJobs = includeRunningJobs;
this.includeFinishedJobs = includeFinishedJobs;
@@ -77,49 +81,50 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
- try {
- if (jobManagerGateway != null) {
- CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(includeRunningJobs, includeFinishedJobs, timeout);
- MultipleJobsDetails result = jobDetailsFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-
- final long now = System.currentTimeMillis();
-
- StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
- gen.writeStartObject();
-
- if (includeRunningJobs && includeFinishedJobs) {
- gen.writeArrayFieldStart("running");
- for (JobDetails detail : result.getRunningJobs()) {
- writeJobDetailOverviewAsJson(detail, gen, now);
- }
- gen.writeEndArray();
-
- gen.writeArrayFieldStart("finished");
- for (JobDetails detail : result.getFinishedJobs()) {
- writeJobDetailOverviewAsJson(detail, gen, now);
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ if (jobManagerGateway != null) {
+ CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(includeRunningJobs, includeFinishedJobs, timeout);
+
+ return jobDetailsFuture.thenApplyAsync(
+ (MultipleJobsDetails result) -> {
+ final long now = System.currentTimeMillis();
+
+ StringWriter writer = new StringWriter();
+ try {
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+ gen.writeStartObject();
+
+ if (includeRunningJobs && includeFinishedJobs) {
+ gen.writeArrayFieldStart("running");
+ for (JobDetails detail : result.getRunningJobs()) {
+ writeJobDetailOverviewAsJson(detail, gen, now);
+ }
+ gen.writeEndArray();
+
+ gen.writeArrayFieldStart("finished");
+ for (JobDetails detail : result.getFinishedJobs()) {
+ writeJobDetailOverviewAsJson(detail, gen, now);
+ }
+ gen.writeEndArray();
+ } else {
+ gen.writeArrayFieldStart("jobs");
+ for (JobDetails detail : includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs()) {
+ writeJobDetailOverviewAsJson(detail, gen, now);
+ }
+ gen.writeEndArray();
+ }
+
+ gen.writeEndObject();
+ gen.close();
+ return writer.toString();
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not write current jobs overview json.", e);
}
- gen.writeEndArray();
- }
- else {
- gen.writeArrayFieldStart("jobs");
- for (JobDetails detail : includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs()) {
- writeJobDetailOverviewAsJson(detail, gen, now);
- }
- gen.writeEndArray();
- }
-
- gen.writeEndObject();
- gen.close();
- return writer.toString();
- }
- else {
- throw new Exception("No connection to the leading JobManager.");
- }
+ },
+ executor);
}
- catch (Exception e) {
- throw new Exception("Failed to fetch the status overview: " + e.getMessage(), e);
+ else {
+ return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
index fe1d06b..39984b1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
@@ -27,6 +27,8 @@ import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
import java.util.TimeZone;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Responder that returns the parameters that define how the asynchronous requests
@@ -39,7 +41,8 @@ public class DashboardConfigHandler extends AbstractJsonRequestHandler {
private final String configString;
- public DashboardConfigHandler(long refreshInterval) {
+ public DashboardConfigHandler(Executor executor, long refreshInterval) {
+ super(executor);
try {
this.configString = createConfigJson(refreshInterval);
}
@@ -55,8 +58,8 @@ public class DashboardConfigHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
- return this.configString;
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ return CompletableFuture.completedFuture(configString);
}
public static String createConfigJson(long refreshInterval) throws IOException {
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
index db55169..978432b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Handler to deny access to jar-related REST calls.
@@ -30,6 +32,10 @@ public class JarAccessDeniedHandler extends AbstractJsonRequestHandler {
private static final String ERROR_MESSAGE = "{\"error\": \"Web submission interface is not " +
"available for this cluster. To enable it, set the configuration key ' jobmanager.web.submit.enable.'\"}";
+ public JarAccessDeniedHandler(Executor executor) {
+ super(executor);
+ }
+
@Override
public String[] getPaths() {
return new String[]{
@@ -42,7 +48,7 @@ public class JarAccessDeniedHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
- return ERROR_MESSAGE;
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ return CompletableFuture.completedFuture(ERROR_MESSAGE);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
index d86a21b..0b0d32e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
@@ -46,6 +46,7 @@ import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
/**
* Abstract handler for fetching plan for a jar or running a jar.
@@ -54,7 +55,8 @@ public abstract class JarActionHandler extends AbstractJsonRequestHandler {
private final File jarDir;
- public JarActionHandler(File jarDirectory) {
+ public JarActionHandler(Executor executor, File jarDirectory) {
+ super(executor);
jarDir = jarDirectory;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
index 73771bd..d9df1d4 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -26,6 +27,8 @@ import java.io.File;
import java.io.FilenameFilter;
import java.io.StringWriter;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Handles requests for deletion of jars.
@@ -36,7 +39,8 @@ public class JarDeleteHandler extends AbstractJsonRequestHandler {
private final File jarDir;
- public JarDeleteHandler(File jarDirectory) {
+ public JarDeleteHandler(Executor executor, File jarDirectory) {
+ super(executor);
jarDir = jarDirectory;
}
@@ -46,33 +50,37 @@ public class JarDeleteHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
final String file = pathParams.get("jarid");
- try {
- File[] list = jarDir.listFiles(new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return name.equals(file);
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ File[] list = jarDir.listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.equals(file);
+ }
+ });
+ boolean success = false;
+ for (File f: list) {
+ // although next to impossible for multiple files, we still delete them.
+ success = success || f.delete();
+ }
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+ gen.writeStartObject();
+ if (!success) {
+ // this seems to always fail on Windows.
+ gen.writeStringField("error", "The requested jar couldn't be deleted. Please try again.");
+ }
+ gen.writeEndObject();
+ gen.close();
+ return writer.toString();
}
- });
- boolean success = false;
- for (File f: list) {
- // although next to impossible for multiple files, we still delete them.
- success = success || f.delete();
- }
- StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
- gen.writeStartObject();
- if (!success) {
- // this seems to always fail on Windows.
- gen.writeStringField("error", "The requested jar couldn't be deleted. Please try again.");
- }
- gen.writeEndObject();
- gen.close();
- return writer.toString();
- }
- catch (Exception e) {
- throw new RuntimeException("Failed to delete jar id " + pathParams.get("jarid") + ": " + e.getMessage(), e);
- }
+ catch (Exception e) {
+ throw new FlinkFutureException("Failed to delete jar id " + pathParams.get("jarid") + '.', e);
+ }
+ },
+ executor);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
index 4f9b188..95281a4 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler;
@@ -29,6 +30,8 @@ import java.io.FilenameFilter;
import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.jar.JarFile;
import java.util.jar.Manifest;
@@ -41,7 +44,8 @@ public class JarListHandler extends AbstractJsonRequestHandler {
private final File jarDir;
- public JarListHandler(File jarDirectory) {
+ public JarListHandler(Executor executor, File jarDirectory) {
+ super(executor);
jarDir = jarDirectory;
}
@@ -51,88 +55,93 @@ public class JarListHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
- try {
- StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
- gen.writeStartObject();
- gen.writeStringField("address", queryParams.get(RuntimeMonitorHandler.WEB_MONITOR_ADDRESS_KEY));
- gen.writeArrayFieldStart("files");
-
- File[] list = jarDir.listFiles(new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return name.endsWith(".jar");
- }
- });
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
- for (File f : list) {
- // separate the uuid and the name parts.
- String id = f.getName();
+ gen.writeStartObject();
+ gen.writeStringField("address", queryParams.get(RuntimeMonitorHandler.WEB_MONITOR_ADDRESS_KEY));
+ gen.writeArrayFieldStart("files");
- int startIndex = id.indexOf("_");
- if (startIndex < 0) {
- continue;
- }
- String name = id.substring(startIndex + 1);
- if (name.length() < 5 || !name.endsWith(".jar")) {
- continue;
- }
+ File[] list = jarDir.listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.endsWith(".jar");
+ }
+ });
- gen.writeStartObject();
- gen.writeStringField("id", id);
- gen.writeStringField("name", name);
- gen.writeNumberField("uploaded", f.lastModified());
- gen.writeArrayFieldStart("entry");
+ for (File f : list) {
+ // separate the uuid and the name parts.
+ String id = f.getName();
- String[] classes = new String[0];
- try {
- JarFile jar = new JarFile(f);
- Manifest manifest = jar.getManifest();
- String assemblerClass = null;
-
- if (manifest != null) {
- assemblerClass = manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS);
- if (assemblerClass == null) {
- assemblerClass = manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_MAIN_CLASS);
+ int startIndex = id.indexOf("_");
+ if (startIndex < 0) {
+ continue;
+ }
+ String name = id.substring(startIndex + 1);
+ if (name.length() < 5 || !name.endsWith(".jar")) {
+ continue;
}
- }
- if (assemblerClass != null) {
- classes = assemblerClass.split(",");
- }
- } catch (IOException ignored) {
- // we simply show no entries here
- }
-
- // show every entry class that can be loaded later on.
- for (String clazz : classes) {
- clazz = clazz.trim();
- PackagedProgram program = null;
- try {
- program = new PackagedProgram(f, clazz, new String[0]);
- } catch (Exception ignored) {
- // ignore jar files which throw an error upon creating a PackagedProgram
- }
- if (program != null) {
gen.writeStartObject();
- gen.writeStringField("name", clazz);
- String desc = program.getDescription();
- gen.writeStringField("description", desc == null ? "No description provided" : desc);
+ gen.writeStringField("id", id);
+ gen.writeStringField("name", name);
+ gen.writeNumberField("uploaded", f.lastModified());
+ gen.writeArrayFieldStart("entry");
+
+ String[] classes = new String[0];
+ try {
+ JarFile jar = new JarFile(f);
+ Manifest manifest = jar.getManifest();
+ String assemblerClass = null;
+
+ if (manifest != null) {
+ assemblerClass = manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS);
+ if (assemblerClass == null) {
+ assemblerClass = manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_MAIN_CLASS);
+ }
+ }
+ if (assemblerClass != null) {
+ classes = assemblerClass.split(",");
+ }
+ } catch (IOException ignored) {
+ // we simply show no entries here
+ }
+
+ // show every entry class that can be loaded later on.
+ for (String clazz : classes) {
+ clazz = clazz.trim();
+
+ PackagedProgram program = null;
+ try {
+ program = new PackagedProgram(f, clazz, new String[0]);
+ } catch (Exception ignored) {
+ // ignore jar files which throw an error upon creating a PackagedProgram
+ }
+ if (program != null) {
+ gen.writeStartObject();
+ gen.writeStringField("name", clazz);
+ String desc = program.getDescription();
+ gen.writeStringField("description", desc == null ? "No description provided" : desc);
+ gen.writeEndObject();
+ }
+ }
+ gen.writeEndArray();
gen.writeEndObject();
}
+ gen.writeEndArray();
+ gen.writeEndObject();
+ gen.close();
+ return writer.toString();
}
- gen.writeEndArray();
- gen.writeEndObject();
- }
- gen.writeEndArray();
- gen.writeEndObject();
- gen.close();
- return writer.toString();
- }
- catch (Exception e) {
- throw new RuntimeException("Failed to fetch jar list: " + e.getMessage(), e);
- }
+ catch (Exception e) {
+ throw new FlinkFutureException("Failed to fetch jar list.", e);
+ }
+ },
+ executor);
+
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
index b239160..b117b3d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
@@ -27,6 +28,8 @@ import com.fasterxml.jackson.core.JsonGenerator;
import java.io.File;
import java.io.StringWriter;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* This handler handles requests to fetch plan for a jar.
@@ -35,8 +38,8 @@ public class JarPlanHandler extends JarActionHandler {
static final String JAR_PLAN_REST_PATH = "/jars/:jarid/plan";
- public JarPlanHandler(File jarDirectory) {
- super(jarDirectory);
+ public JarPlanHandler(Executor executor, File jarDirectory) {
+ super(executor, jarDirectory);
}
@Override
@@ -45,21 +48,25 @@ public class JarPlanHandler extends JarActionHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
- try {
- JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
- JobGraph graph = getJobGraphAndClassLoader(config).f0;
- StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
- gen.writeStartObject();
- gen.writeFieldName("plan");
- gen.writeRawValue(JsonPlanGenerator.generatePlan(graph));
- gen.writeEndObject();
- gen.close();
- return writer.toString();
- }
- catch (Exception e) {
- return sendError(e);
- }
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
+ JobGraph graph = getJobGraphAndClassLoader(config).f0;
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+ gen.writeStartObject();
+ gen.writeFieldName("plan");
+ gen.writeRawValue(JsonPlanGenerator.generatePlan(graph));
+ gen.writeEndObject();
+ gen.close();
+ return writer.toString();
+ }
+ catch (Exception e) {
+ throw new FlinkFutureException(e);
+ }
+ },
+ executor);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 12ffa4f..7ada0b4 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.util.Preconditions;
@@ -33,6 +34,8 @@ import com.fasterxml.jackson.core.JsonGenerator;
import java.io.File;
import java.io.StringWriter;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* This handler handles requests to fetch plan for a jar.
@@ -44,8 +47,8 @@ public class JarRunHandler extends JarActionHandler {
private final Time timeout;
private final Configuration clientConfig;
- public JarRunHandler(File jarDirectory, Time timeout, Configuration clientConfig) {
- super(jarDirectory);
+ public JarRunHandler(Executor executor, File jarDirectory, Time timeout, Configuration clientConfig) {
+ super(executor, jarDirectory);
this.timeout = Preconditions.checkNotNull(timeout);
this.clientConfig = Preconditions.checkNotNull(clientConfig);
}
@@ -56,31 +59,35 @@ public class JarRunHandler extends JarActionHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
- try {
- JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
- Tuple2<JobGraph, ClassLoader> graph = getJobGraphAndClassLoader(config);
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
+ Tuple2<JobGraph, ClassLoader> graph = getJobGraphAndClassLoader(config);
- try {
- JobClient.submitJobDetached(
- jobManagerGateway,
- clientConfig,
- graph.f0,
- timeout,
- graph.f1);
- } catch (JobExecutionException e) {
- throw new ProgramInvocationException("Failed to submit the job to the job manager", e);
- }
+ try {
+ JobClient.submitJobDetached(
+ jobManagerGateway,
+ clientConfig,
+ graph.f0,
+ timeout,
+ graph.f1);
+ } catch (JobExecutionException e) {
+ throw new ProgramInvocationException("Failed to submit the job to the job manager", e);
+ }
- StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
- gen.writeStartObject();
- gen.writeStringField("jobid", graph.f0.getJobID().toString());
- gen.writeEndObject();
- gen.close();
- return writer.toString();
- } catch (Exception e) {
- return sendError(e);
- }
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+ gen.writeStartObject();
+ gen.writeStringField("jobid", graph.f0.getJobID().toString());
+ gen.writeEndObject();
+ gen.close();
+ return writer.toString();
+ } catch (Exception e) {
+ throw new FlinkFutureException("Could not run the jar.", e);
+ }
+ },
+ executor);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index 705c321..61b3f58 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import java.io.File;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Handles requests for uploading of jars.
@@ -33,7 +35,8 @@ public class JarUploadHandler extends AbstractJsonRequestHandler {
private final File jarDir;
- public JarUploadHandler(File jarDir) {
+ public JarUploadHandler(Executor executor, File jarDir) {
+ super(executor);
this.jarDir = jarDir;
}
@@ -43,34 +46,38 @@ public class JarUploadHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(
+ public CompletableFuture<String> handleJsonRequest(
Map<String, String> pathParams,
Map<String, String> queryParams,
- JobManagerGateway jobManagerGateway) throws Exception {
+ JobManagerGateway jobManagerGateway) {
String tempFilePath = queryParams.get("filepath");
String filename = queryParams.get("filename");
- File tempFile;
- if (tempFilePath != null && (tempFile = new File(tempFilePath)).exists()) {
- if (!tempFile.getName().endsWith(".jar")) {
- //noinspection ResultOfMethodCallIgnored
- tempFile.delete();
- return "{\"error\": \"Only Jar files are allowed.\"}";
- }
+ return CompletableFuture.supplyAsync(
+ () -> {
+ File tempFile;
+ if (tempFilePath != null && (tempFile = new File(tempFilePath)).exists()) {
+ if (!tempFile.getName().endsWith(".jar")) {
+ //noinspection ResultOfMethodCallIgnored
+ tempFile.delete();
+ return "{\"error\": \"Only Jar files are allowed.\"}";
+ }
- String filenameWithUUID = UUID.randomUUID() + "_" + filename;
- File newFile = new File(jarDir, filenameWithUUID);
- if (tempFile.renameTo(newFile)) {
- // all went well
- return "{\"status\": \"success\", \"filename\": \"" + filenameWithUUID + "\"}";
- }
- else {
- //noinspection ResultOfMethodCallIgnored
- tempFile.delete();
- }
- }
+ String filenameWithUUID = UUID.randomUUID() + "_" + filename;
+ File newFile = new File(jarDir, filenameWithUUID);
+ if (tempFile.renameTo(newFile)) {
+ // all went well
+ return "{\"status\": \"success\", \"filename\": \"" + filenameWithUUID + "\"}";
+ }
+ else {
+ //noinspection ResultOfMethodCallIgnored
+ tempFile.delete();
+ }
+ }
- return "{\"error\": \"Failed to upload the file.\"}";
+ return "{\"error\": \"Failed to upload the file.\"}";
+ },
+ executor);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
index 163e583..4dede3a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
@@ -31,6 +32,8 @@ import java.io.StringWriter;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Request handler that returns the aggregated user accumulators of a job.
@@ -39,8 +42,8 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler
private static final String JOB_ACCUMULATORS_REST_PATH = "/jobs/:jobid/accumulators";
- public JobAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
- super(executionGraphHolder);
+ public JobAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+ super(executionGraphHolder, executor);
}
@Override
@@ -49,8 +52,16 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler
}
@Override
- public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
- return createJobAccumulatorsJson(graph);
+ public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createJobAccumulatorsJson(graph);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not create job accumulators json.", e);
+ }
+ },
+ executor);
}
/**
[2/3] flink git commit: [FLINK-7409] [web] Make WebRuntimeMonitor
reactive
Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
index 513dc08..1a7d868 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
@@ -20,11 +20,14 @@ package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Request handler for the CANCEL request.
@@ -36,7 +39,8 @@ public class JobCancellationHandler extends AbstractJsonRequestHandler {
private final Time timeout;
- public JobCancellationHandler(Time timeout) {
+ public JobCancellationHandler(Executor executor, Time timeout) {
+ super(executor);
this.timeout = Preconditions.checkNotNull(timeout);
}
@@ -46,19 +50,23 @@ public class JobCancellationHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
- try {
- JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
- if (jobManagerGateway != null) {
- jobManagerGateway.cancelJob(jobId, timeout);
- return "{}";
- }
- else {
- throw new Exception("No connection to the leading JobManager.");
- }
- }
- catch (Exception e) {
- throw new Exception("Failed to cancel the job with id: " + pathParams.get("jobid") + e.getMessage(), e);
- }
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
+ if (jobManagerGateway != null) {
+ jobManagerGateway.cancelJob(jobId, timeout);
+ return "{}";
+ }
+ else {
+ throw new Exception("No connection to the leading JobManager.");
+ }
+ }
+ catch (Exception e) {
+ throw new FlinkFutureException("Failed to cancel the job with id: " + pathParams.get("jobid"), e);
+ }
+ },
+ executor);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
index 9b474aa..4e41447 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
@@ -24,12 +24,13 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.NotFoundException;
-import org.apache.flink.util.FlinkException;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -140,48 +141,48 @@ public class JobCancellationWithSavepointHandlers {
@Override
@SuppressWarnings("unchecked")
- public FullHttpResponse handleRequest(
+ public CompletableFuture<FullHttpResponse> handleRequest(
Map<String, String> pathParams,
Map<String, String> queryParams,
- JobManagerGateway jobManagerGateway) throws Exception {
+ JobManagerGateway jobManagerGateway) {
- try {
- if (jobManagerGateway != null) {
- JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
- final Optional<AccessExecutionGraph> optGraph;
+ if (jobManagerGateway != null) {
+ JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
+ final CompletableFuture<Optional<AccessExecutionGraph>> graphFuture;
- try {
- optGraph = currentGraphs.getExecutionGraph(jobId, jobManagerGateway);
- } catch (Exception e) {
- throw new FlinkException("Could not retrieve the execution with jobId " + jobId + " from the JobManager.", e);
- }
+ graphFuture = currentGraphs.getExecutionGraph(jobId, jobManagerGateway);
- final AccessExecutionGraph graph = optGraph.orElseThrow(
- () -> new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.'));
+ return graphFuture.thenApplyAsync(
+ (Optional<AccessExecutionGraph> optGraph) -> {
+ final AccessExecutionGraph graph = optGraph.orElseThrow(
+ () -> new FlinkFutureException(
+ new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.')));
- CheckpointCoordinator coord = graph.getCheckpointCoordinator();
- if (coord == null) {
- throw new Exception("Cannot find CheckpointCoordinator for job.");
- }
+ CheckpointCoordinator coord = graph.getCheckpointCoordinator();
+ if (coord == null) {
+ throw new FlinkFutureException(new Exception("Cannot find CheckpointCoordinator for job."));
+ }
- String targetDirectory = pathParams.get("targetDirectory");
- if (targetDirectory == null) {
- if (defaultSavepointDirectory == null) {
- throw new IllegalStateException("No savepoint directory configured. " +
+ String targetDirectory = pathParams.get("targetDirectory");
+ if (targetDirectory == null) {
+ if (defaultSavepointDirectory == null) {
+ throw new IllegalStateException("No savepoint directory configured. " +
"You can either specify a directory when triggering this savepoint or " +
"configure a cluster-wide default via key '" +
CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.");
- } else {
- targetDirectory = defaultSavepointDirectory;
+ } else {
+ targetDirectory = defaultSavepointDirectory;
+ }
}
- }
- return handleNewRequest(jobManagerGateway, jobId, targetDirectory, coord.getCheckpointTimeout());
- } else {
- throw new Exception("No connection to the leading JobManager.");
- }
- } catch (Exception e) {
- throw new Exception("Failed to cancel the job: " + e.getMessage(), e);
+ try {
+ return handleNewRequest(jobManagerGateway, jobId, targetDirectory, coord.getCheckpointTimeout());
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not cancel job with savepoint.", e);
+ }
+ }, executor);
+ } else {
+ return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
}
}
@@ -288,64 +289,63 @@ public class JobCancellationWithSavepointHandlers {
@Override
@SuppressWarnings("unchecked")
- public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
- try {
- if (jobManagerGateway != null) {
- JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
- long requestId = Long.parseLong(pathParams.get("requestId"));
+ public CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
+ long requestId = Long.parseLong(pathParams.get("requestId"));
- synchronized (lock) {
- Object result = completed.remove(requestId);
-
- if (result != null) {
- // Add to recent history
- recentlyCompleted.add(new Tuple2<>(requestId, result));
- if (recentlyCompleted.size() > NUM_GHOST_REQUEST_IDS) {
- recentlyCompleted.remove();
- }
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ synchronized (lock) {
+ Object result = completed.remove(requestId);
+
+ if (result != null) {
+ // Add to recent history
+ recentlyCompleted.add(new Tuple2<>(requestId, result));
+ if (recentlyCompleted.size() > NUM_GHOST_REQUEST_IDS) {
+ recentlyCompleted.remove();
+ }
- if (result.getClass() == String.class) {
- String savepointPath = (String) result;
- return createSuccessResponse(requestId, savepointPath);
- } else {
- Throwable cause = (Throwable) result;
- return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
- }
- } else {
- // Check in-progress
- Long inProgressRequestId = inProgress.get(jobId);
- if (inProgressRequestId != null) {
- // Sanity check
- if (inProgressRequestId == requestId) {
- return createInProgressResponse(requestId);
+ if (result.getClass() == String.class) {
+ String savepointPath = (String) result;
+ return createSuccessResponse(requestId, savepointPath);
} else {
- String msg = "Request ID does not belong to JobID";
- return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg);
+ Throwable cause = (Throwable) result;
+ return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
}
- }
-
- // Check recent history
- for (Tuple2<Long, Object> recent : recentlyCompleted) {
- if (recent.f0 == requestId) {
- if (recent.f1.getClass() == String.class) {
- String savepointPath = (String) recent.f1;
- return createSuccessResponse(requestId, savepointPath);
+ } else {
+ // Check in-progress
+ Long inProgressRequestId = inProgress.get(jobId);
+ if (inProgressRequestId != null) {
+ // Sanity check
+ if (inProgressRequestId == requestId) {
+ return createInProgressResponse(requestId);
} else {
- Throwable cause = (Throwable) recent.f1;
- return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
+ String msg = "Request ID does not belong to JobID";
+ return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg);
}
}
- }
- return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, "Unknown job/request ID");
+ // Check recent history
+ for (Tuple2<Long, Object> recent : recentlyCompleted) {
+ if (recent.f0 == requestId) {
+ if (recent.f1.getClass() == String.class) {
+ String savepointPath = (String) recent.f1;
+ return createSuccessResponse(requestId, savepointPath);
+ } else {
+ Throwable cause = (Throwable) recent.f1;
+ return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
+ }
+ }
+ }
+
+ return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, "Unknown job/request ID");
+ }
}
+ } catch (Exception e) {
+ throw new FlinkFutureException("Could not handle in progress request.", e);
}
- } else {
- throw new Exception("No connection to the leading JobManager.");
- }
- } catch (Exception e) {
- throw new Exception("Failed to cancel the job: " + e.getMessage(), e);
- }
+ });
}
private FullHttpResponse createSuccessResponse(long requestId, String savepointPath) throws IOException {
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
index 72cf8b7..0b15b37 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
@@ -31,6 +32,8 @@ import java.io.StringWriter;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Request handler that returns the execution config of a job.
@@ -39,8 +42,8 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
private static final String JOB_CONFIG_REST_PATH = "/jobs/:jobid/config";
- public JobConfigHandler(ExecutionGraphHolder executionGraphHolder) {
- super(executionGraphHolder);
+ public JobConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+ super(executionGraphHolder, executor);
}
@Override
@@ -49,8 +52,17 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
}
@Override
- public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
- return createJobConfigJson(graph);
+ public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createJobConfigJson(graph);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not write job config json.", e);
+ }
+ },
+ executor);
+
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index 87ac7c3..8a50f87 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -39,6 +40,8 @@ import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Request handler that returns details about a job. This includes:
@@ -57,8 +60,8 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
private final MetricFetcher fetcher;
- public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
- super(executionGraphHolder);
+ public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+ super(executionGraphHolder, executor);
this.fetcher = fetcher;
}
@@ -68,8 +71,16 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
}
@Override
- public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
- return createJobDetailsJson(graph, fetcher);
+ public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createJobDetailsJson(graph, fetcher);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not create job details json.", e);
+ }
+ },
+ executor);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
index e31299b..6ffd443 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
@@ -35,6 +36,8 @@ import java.io.StringWriter;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Request handler that returns the configuration of a job.
@@ -45,8 +48,8 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
- public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) {
- super(executionGraphHolder);
+ public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+ super(executionGraphHolder, executor);
}
@Override
@@ -55,8 +58,17 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
}
@Override
- public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
- return createJobExceptionsJson(graph);
+ public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createJobExceptionsJson(graph);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not create job exceptions json.", e);
+ }
+ },
+ executor
+ );
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
index e2437e6..cb6d8c0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
@@ -19,12 +19,16 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import com.fasterxml.jackson.core.JsonGenerator;
+import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Returns the Job Manager's configuration.
@@ -35,7 +39,8 @@ public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
private final Configuration config;
- public JobManagerConfigHandler(Configuration config) {
+ public JobManagerConfigHandler(Executor executor, Configuration config) {
+ super(executor);
this.config = config;
}
@@ -45,31 +50,38 @@ public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
- StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
- gen.writeStartArray();
- for (String key : config.keySet()) {
- gen.writeStartObject();
- gen.writeStringField("key", key);
+ gen.writeStartArray();
+ for (String key : config.keySet()) {
+ gen.writeStartObject();
+ gen.writeStringField("key", key);
- // Mask key values which contain sensitive information
- if (key.toLowerCase().contains("password")) {
- String value = config.getString(key, null);
- if (value != null) {
- value = "******";
- }
- gen.writeStringField("value", value);
- }
- else {
- gen.writeStringField("value", config.getString(key, null));
- }
- gen.writeEndObject();
- }
- gen.writeEndArray();
+ // Mask key values which contain sensitive information
+ if (key.toLowerCase().contains("password")) {
+ String value = config.getString(key, null);
+ if (value != null) {
+ value = "******";
+ }
+ gen.writeStringField("value", value);
+ } else {
+ gen.writeStringField("value", config.getString(key, null));
+ }
+ gen.writeEndObject();
+ }
+ gen.writeEndArray();
- gen.close();
- return writer.toString();
+ gen.close();
+ return writer.toString();
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not write configuration.", e);
+ }
+ },
+ executor);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
index d17b6bb..b3a9dd5 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
@@ -27,6 +27,8 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Request handler that returns the JSON program plan of a job graph.
@@ -35,8 +37,8 @@ public class JobPlanHandler extends AbstractExecutionGraphRequestHandler {
private static final String JOB_PLAN_REST_PATH = "/jobs/:jobid/plan";
- public JobPlanHandler(ExecutionGraphHolder executionGraphHolder) {
- super(executionGraphHolder);
+ public JobPlanHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+ super(executionGraphHolder, executor);
}
@Override
@@ -45,8 +47,8 @@ public class JobPlanHandler extends AbstractExecutionGraphRequestHandler {
}
@Override
- public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
- return graph.getJsonPlan();
+ public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+ return CompletableFuture.completedFuture(graph.getJsonPlan());
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
index 3526734..f63403f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
@@ -20,11 +20,14 @@ package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Request handler for the STOP request.
@@ -36,7 +39,8 @@ public class JobStoppingHandler extends AbstractJsonRequestHandler {
private final Time timeout;
- public JobStoppingHandler(Time timeout) {
+ public JobStoppingHandler(Executor executor, Time timeout) {
+ super(executor);
this.timeout = Preconditions.checkNotNull(timeout);
}
@@ -46,19 +50,23 @@ public class JobStoppingHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
- try {
- JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
- if (jobManagerGateway != null) {
- jobManagerGateway.stopJob(jobId, timeout);
- return "{}";
- }
- else {
- throw new Exception("No connection to the leading JobManager.");
- }
- }
- catch (Exception e) {
- throw new Exception("Failed to stop the job with id: " + pathParams.get("jobid") + e.getMessage(), e);
- }
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
+ if (jobManagerGateway != null) {
+ jobManagerGateway.stopJob(jobId, timeout);
+ return "{}";
+ }
+ else {
+ throw new Exception("No connection to the leading JobManager.");
+ }
+ }
+ catch (Exception e) {
+ throw new FlinkFutureException("Failed to stop the job with id: " + pathParams.get("jobid") + '.', e);
+ }
+ },
+ executor);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
index 8e90dfc..9c613ff 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -33,6 +34,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Request handler that returns the accumulators for a given vertex.
@@ -41,8 +44,8 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle
private static final String JOB_VERTEX_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/accumulators";
- public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
- super(executionGraphHolder);
+ public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+ super(executionGraphHolder, executor);
}
@Override
@@ -51,8 +54,17 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle
}
@Override
- public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
- return createVertexAccumulatorsJson(jobVertex);
+ public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createVertexAccumulatorsJson(jobVertex);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not create job vertex accumulators json.", e);
+ }
+ },
+ executor);
+
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
index cde8ca9..963153f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -27,8 +28,11 @@ import org.apache.flink.runtime.webmonitor.OperatorBackPressureStats;
import com.fasterxml.jackson.core.JsonGenerator;
+import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import scala.Option;
@@ -51,10 +55,11 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle
public JobVertexBackPressureHandler(
ExecutionGraphHolder executionGraphHolder,
+ Executor executor,
BackPressureStatsTracker backPressureStatsTracker,
int refreshInterval) {
- super(executionGraphHolder);
+ super(executionGraphHolder, executor);
this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker, "Stats tracker");
checkArgument(refreshInterval >= 0, "Negative timeout");
this.refreshInterval = refreshInterval;
@@ -66,11 +71,11 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle
}
@Override
- public String handleRequest(
+ public CompletableFuture<String> handleRequest(
AccessExecutionJobVertex accessJobVertex,
- Map<String, String> params) throws Exception {
+ Map<String, String> params) {
if (accessJobVertex instanceof ArchivedExecutionJobVertex) {
- return "";
+ return CompletableFuture.completedFuture("");
}
ExecutionJobVertex jobVertex = (ExecutionJobVertex) accessJobVertex;
try (StringWriter writer = new StringWriter();
@@ -116,7 +121,9 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle
gen.writeEndObject();
gen.close();
- return writer.toString();
+ return CompletableFuture.completedFuture(writer.toString());
+ } catch (IOException e) {
+ return FutureUtils.completedExceptionally(e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index 7757fdd..bd1745c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -39,6 +40,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* A request handler that provides the details of a job vertex, including id, name, parallelism,
@@ -50,8 +53,8 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
private final MetricFetcher fetcher;
- public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
- super(executionGraphHolder);
+ public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+ super(executionGraphHolder, executor);
this.fetcher = fetcher;
}
@@ -61,8 +64,16 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
}
@Override
- public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
- return createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher);
+ public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not write the vertex details json.", e);
+ }
+ },
+ executor);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
index a612782..0827720 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -41,6 +42,8 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* A request handler that provides the details of a job vertex, including id, name, and the
@@ -52,8 +55,8 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
private final MetricFetcher fetcher;
- public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
- super(executionGraphHolder);
+ public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+ super(executionGraphHolder, executor);
this.fetcher = fetcher;
}
@@ -63,8 +66,16 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
}
@Override
- public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
- return createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher);
+ public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not create TaskManager json.", e);
+ }
+ },
+ executor);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
index 079be8f..8ca785f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
@@ -19,11 +19,11 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.webmonitor.NotFoundException;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
/**
* Base interface for all request handlers.
@@ -44,13 +44,8 @@ public interface RequestHandler {
* @param jobManagerGateway to talk to the JobManager.
*
* @return The full http response.
- *
- * @throws Exception Handlers may forward exceptions. Exceptions of type
- * {@link NotFoundException} will cause a HTTP 404
- * response with the exception message, other exceptions will cause a HTTP 500 response
- * with the exception stack trace.
*/
- FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception;
+ CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway);
/**
* Returns an array of REST URL's under which this handler can be registered.
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
index 28e9ddf..301b217 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Request handler providing details about a single task execution attempt.
@@ -31,8 +33,8 @@ public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttempt
public static final String SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum";
- public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
- super(executionGraphHolder, fetcher);
+ public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+ super(executionGraphHolder, executor, fetcher);
}
@Override
@@ -41,7 +43,7 @@ public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttempt
}
@Override
- public String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception {
+ public CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params) {
return handleRequest(vertex.getCurrentExecutionAttempt(), params);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index 171277f..3c0d1d9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -35,6 +36,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Base class for request handlers whose response depends on a specific job vertex (defined
@@ -44,8 +47,8 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
private static final String SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators";
- public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
- super(executionGraphHolder);
+ public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+ super(executionGraphHolder, executor);
}
@Override
@@ -54,8 +57,16 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
}
@Override
- public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
- return createAttemptAccumulatorsJson(execAttempt);
+ public CompletableFuture<String> handleRequest(AccessExecution execAttempt, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createAttemptAccumulatorsJson(execAttempt);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not create accumulator json.", e);
+ }
+ },
+ executor);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
index 37c0e50..ad836df 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -40,6 +41,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import static org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler.SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH;
@@ -52,8 +55,8 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
private final MetricFetcher fetcher;
- public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
- super(executionGraphHolder);
+ public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+ super(executionGraphHolder, executor);
this.fetcher = fetcher;
}
@@ -63,8 +66,16 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
}
@Override
- public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
- return createAttemptDetailsJson(execAttempt, params.get("jobid"), params.get("vertexid"), fetcher);
+ public CompletableFuture<String> handleRequest(AccessExecution execAttempt, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createAttemptDetailsJson(execAttempt, params.get("jobid"), params.get("vertexid"), fetcher);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not create attempt details json.", e);
+ }
+ },
+ executor);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
index 64bdfb4..8142548 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
@@ -35,6 +36,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Request handler that returns the accumulators for all subtasks of job vertex.
@@ -43,8 +46,8 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
private static final String SUBTASKS_ALL_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/accumulators";
- public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
- super(executionGraphHolder);
+ public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+ super(executionGraphHolder, executor);
}
@Override
@@ -53,8 +56,16 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
}
@Override
- public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
- return createSubtasksAccumulatorsJson(jobVertex);
+ public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createSubtasksAccumulatorsJson(jobVertex);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not create subtasks accumulator json.", e);
+ }
+ },
+ executor);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
index ea88587..d766206 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -35,6 +36,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Request handler that returns the state transition timestamps for all subtasks, plus their
@@ -44,8 +47,8 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
private static final String SUBTASK_TIMES_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasktimes";
- public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder) {
- super(executionGraphHolder);
+ public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+ super(executionGraphHolder, executor);
}
@Override
@@ -54,8 +57,16 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
}
@Override
- public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
- return createSubtaskTimesJson(jobVertex);
+ public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createSubtaskTimesJson(jobVertex);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not write subtask time json.", e);
+ }
+ },
+ executor);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
index a8ab7a3..9f83ed0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
@@ -28,14 +30,14 @@ import org.apache.flink.util.StringUtils;
import com.fasterxml.jackson.core.JsonGenerator;
+import java.io.IOException;
import java.io.StringWriter;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
+import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
import static java.util.Objects.requireNonNull;
@@ -53,7 +55,8 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler {
private final MetricFetcher fetcher;
- public TaskManagersHandler(Time timeout, MetricFetcher fetcher) {
+ public TaskManagersHandler(Executor executor, Time timeout, MetricFetcher fetcher) {
+ super(executor);
this.timeout = requireNonNull(timeout);
this.fetcher = fetcher;
}
@@ -64,134 +67,139 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler {
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
- try {
- if (jobManagerGateway != null) {
- // whether one task manager's metrics are requested, or all task manager, we
- // return them in an array. This avoids unnecessary code complexity.
- // If only one task manager is requested, we only fetch one task manager metrics.
- final List<Instance> instances = new ArrayList<>();
- if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
- try {
- InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY)));
- CompletableFuture<Optional<Instance>> tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout);
-
- Optional<Instance> instance = tmInstanceFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-
- instance.ifPresent(instances::add);
- }
- // this means the id string was invalid. Keep the list empty.
- catch (IllegalArgumentException e){
- // do nothing.
- }
- } else {
- CompletableFuture<Collection<Instance>> tmInstancesFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
-
- Collection<Instance> tmInstances = tmInstancesFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
- instances.addAll(tmInstances);
- }
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ if (jobManagerGateway != null) {
+ // whether one task manager's metrics are requested, or all task manager, we
+ // return them in an array. This avoids unnecessary code complexity.
+ // If only one task manager is requested, we only fetch one task manager metrics.
+ if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
+ InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY)));
+ CompletableFuture<Optional<Instance>> tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout);
+
+ return tmInstanceFuture.thenApplyAsync(
+ (Optional<Instance> optTaskManager) -> {
+ try {
+ return writeTaskManagersJson(
+ optTaskManager.map(Collections::singleton).orElse(Collections.emptySet()),
+ pathParams);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not write TaskManagers JSON.", e);
+ }
+ },
+ executor);
+ } else {
+ CompletableFuture<Collection<Instance>> tmInstancesFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
+
+ return tmInstancesFuture.thenApplyAsync(
+ (Collection<Instance> taskManagers) -> {
+ try {
+ return writeTaskManagersJson(taskManagers, pathParams);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not write TaskManagers JSON.", e);
+ }
+ },
+ executor);
+ }
+ }
+ else {
+ return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
+ }
+ }
- StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
- gen.writeStartObject();
- gen.writeArrayFieldStart("taskmanagers");
-
- for (Instance instance : instances) {
- gen.writeStartObject();
- gen.writeStringField("id", instance.getId().toString());
- gen.writeStringField("path", instance.getTaskManagerGateway().getAddress());
- gen.writeNumberField("dataPort", instance.getTaskManagerLocation().dataPort());
- gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat());
- gen.writeNumberField("slotsNumber", instance.getTotalNumberOfSlots());
- gen.writeNumberField("freeSlots", instance.getNumberOfAvailableSlots());
- gen.writeNumberField("cpuCores", instance.getResources().getNumberOfCPUCores());
- gen.writeNumberField("physicalMemory", instance.getResources().getSizeOfPhysicalMemory());
- gen.writeNumberField("freeMemory", instance.getResources().getSizeOfJvmHeap());
- gen.writeNumberField("managedMemory", instance.getResources().getSizeOfManagedMemory());
-
- // only send metrics when only one task manager requests them.
- if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
- fetcher.update();
- MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
- if (metrics != null) {
- gen.writeObjectFieldStart("metrics");
- long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
- long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
- long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
-
- gen.writeNumberField("heapCommitted", heapCommitted);
- gen.writeNumberField("heapUsed", heapUsed);
- gen.writeNumberField("heapMax", heapTotal);
-
- long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
- long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
- long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
-
- gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
- gen.writeNumberField("nonHeapUsed", nonHeapUsed);
- gen.writeNumberField("nonHeapMax", nonHeapTotal);
-
- gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
- gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
- gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
-
- long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
- long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
- long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
-
- gen.writeNumberField("directCount", directCount);
- gen.writeNumberField("directUsed", directUsed);
- gen.writeNumberField("directMax", directMax);
-
- long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
- long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
- long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
-
- gen.writeNumberField("mappedCount", mappedCount);
- gen.writeNumberField("mappedUsed", mappedUsed);
- gen.writeNumberField("mappedMax", mappedMax);
-
- long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
- long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
-
- gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
- gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
-
- gen.writeArrayFieldStart("garbageCollectors");
-
- for (String gcName : metrics.garbageCollectorNames) {
- String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
- String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
- if (count != null && time != null) {
- gen.writeStartObject();
- gen.writeStringField("name", gcName);
- gen.writeNumberField("count", Long.valueOf(count));
- gen.writeNumberField("time", Long.valueOf(time));
- gen.writeEndObject();
- }
- }
-
- gen.writeEndArray();
+ private String writeTaskManagersJson(Collection<Instance> instances, Map<String, String> pathParams) throws IOException {
+ StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+ gen.writeStartObject();
+ gen.writeArrayFieldStart("taskmanagers");
+
+ for (Instance instance : instances) {
+ gen.writeStartObject();
+ gen.writeStringField("id", instance.getId().toString());
+ gen.writeStringField("path", instance.getTaskManagerGateway().getAddress());
+ gen.writeNumberField("dataPort", instance.getTaskManagerLocation().dataPort());
+ gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat());
+ gen.writeNumberField("slotsNumber", instance.getTotalNumberOfSlots());
+ gen.writeNumberField("freeSlots", instance.getNumberOfAvailableSlots());
+ gen.writeNumberField("cpuCores", instance.getResources().getNumberOfCPUCores());
+ gen.writeNumberField("physicalMemory", instance.getResources().getSizeOfPhysicalMemory());
+ gen.writeNumberField("freeMemory", instance.getResources().getSizeOfJvmHeap());
+ gen.writeNumberField("managedMemory", instance.getResources().getSizeOfManagedMemory());
+
+ // only send metrics when only one task manager requests them.
+ if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
+ fetcher.update();
+ MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
+ if (metrics != null) {
+ gen.writeObjectFieldStart("metrics");
+ long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
+ long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
+ long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
+
+ gen.writeNumberField("heapCommitted", heapCommitted);
+ gen.writeNumberField("heapUsed", heapUsed);
+ gen.writeNumberField("heapMax", heapTotal);
+
+ long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
+ long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
+ long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
+
+ gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
+ gen.writeNumberField("nonHeapUsed", nonHeapUsed);
+ gen.writeNumberField("nonHeapMax", nonHeapTotal);
+
+ gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
+ gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
+ gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
+
+ long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
+ long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
+ long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
+
+ gen.writeNumberField("directCount", directCount);
+ gen.writeNumberField("directUsed", directUsed);
+ gen.writeNumberField("directMax", directMax);
+
+ long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
+ long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
+ long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
+
+ gen.writeNumberField("mappedCount", mappedCount);
+ gen.writeNumberField("mappedUsed", mappedUsed);
+ gen.writeNumberField("mappedMax", mappedMax);
+
+ long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
+ long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
+
+ gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
+ gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
+
+ gen.writeArrayFieldStart("garbageCollectors");
+
+ for (String gcName : metrics.garbageCollectorNames) {
+ String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
+ String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
+ if (count != null && time != null) {
+ gen.writeStartObject();
+ gen.writeStringField("name", gcName);
+ gen.writeNumberField("count", Long.valueOf(count));
+ gen.writeNumberField("time", Long.valueOf(time));
gen.writeEndObject();
}
}
+ gen.writeEndArray();
gen.writeEndObject();
}
-
- gen.writeEndArray();
- gen.writeEndObject();
-
- gen.close();
- return writer.toString();
- }
- else {
- throw new Exception("No connection to the leading JobManager.");
}
+
+ gen.writeEndObject();
}
- catch (Exception e) {
- throw new RuntimeException("Failed to fetch list of all task managers: " + e.getMessage(), e);
- }
+
+ gen.writeEndArray();
+ gen.writeEndObject();
+
+ gen.close();
+ return writer.toString();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
index d4c9b2a..3affd7c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
@@ -34,6 +35,8 @@ import java.io.StringWriter;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Handler that returns a job's snapshotting settings.
@@ -42,8 +45,8 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
private static final String CHECKPOINT_CONFIG_REST_PATH = "/jobs/:jobid/checkpoints/config";
- public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder) {
- super(executionGraphHolder);
+ public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+ super(executionGraphHolder, executor);
}
@Override
@@ -52,8 +55,16 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
}
@Override
- public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
- return createCheckpointConfigJson(graph);
+ public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createCheckpointConfigJson(graph);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not create checkpoint config json.", e);
+ }
+ },
+ executor);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
index 664744b..96cc3e0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
@@ -40,6 +41,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Request handler that returns checkpoint stats for a single job vertex.
@@ -50,8 +53,8 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
private final CheckpointStatsCache cache;
- public CheckpointStatsDetailsHandler(ExecutionGraphHolder executionGraphHolder, CheckpointStatsCache cache) {
- super(executionGraphHolder);
+ public CheckpointStatsDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) {
+ super(executionGraphHolder, executor);
this.cache = cache;
}
@@ -61,30 +64,38 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
}
@Override
- public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
- long checkpointId = parseCheckpointId(params);
- if (checkpointId == -1) {
- return "{}";
- }
-
- CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
- if (snapshot == null) {
- return "{}";
- }
-
- AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
-
- if (checkpoint != null) {
- cache.tryAdd(checkpoint);
- } else {
- checkpoint = cache.tryGet(checkpointId);
-
- if (checkpoint == null) {
- return "{}";
- }
- }
-
- return createCheckpointDetailsJson(checkpoint);
+ public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ long checkpointId = parseCheckpointId(params);
+ if (checkpointId == -1) {
+ return "{}";
+ }
+
+ CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
+ if (snapshot == null) {
+ return "{}";
+ }
+
+ AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
+
+ if (checkpoint != null) {
+ cache.tryAdd(checkpoint);
+ } else {
+ checkpoint = cache.tryGet(checkpointId);
+
+ if (checkpoint == null) {
+ return "{}";
+ }
+ }
+
+ try {
+ return createCheckpointDetailsJson(checkpoint);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not create checkpoint details json.", e);
+ }
+ },
+ executor);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
index d116c56..045248b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
@@ -43,6 +44,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import static org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler.writeMinMaxAvg;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -57,8 +60,8 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
private final CheckpointStatsCache cache;
- public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder executionGraphHolder, CheckpointStatsCache cache) {
- super(executionGraphHolder);
+ public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) {
+ super(executionGraphHolder, executor);
this.cache = checkNotNull(cache);
}
@@ -68,28 +71,28 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
}
@Override
- public String handleJsonRequest(
- Map<String, String> pathParams,
- Map<String, String> queryParams,
- JobManagerGateway jobManagerGateway) throws Exception {
+ public CompletableFuture<String> handleJsonRequest(
+ Map<String, String> pathParams,
+ Map<String, String> queryParams,
+ JobManagerGateway jobManagerGateway) {
return super.handleJsonRequest(pathParams, queryParams, jobManagerGateway);
}
@Override
- public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
+ public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
long checkpointId = CheckpointStatsDetailsHandler.parseCheckpointId(params);
if (checkpointId == -1) {
- return "{}";
+ return CompletableFuture.completedFuture("{}");
}
JobVertexID vertexId = AbstractJobVertexRequestHandler.parseJobVertexId(params);
if (vertexId == null) {
- return "{}";
+ return CompletableFuture.completedFuture("{}");
}
CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
if (snapshot == null) {
- return "{}";
+ return CompletableFuture.completedFuture("{}");
}
AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
@@ -100,16 +103,20 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
checkpoint = cache.tryGet(checkpointId);
if (checkpoint == null) {
- return "{}";
+ return CompletableFuture.completedFuture("{}");
}
}
TaskStateStats taskStats = checkpoint.getTaskStateStats(vertexId);
if (taskStats == null) {
- return "{}";
+ return CompletableFuture.completedFuture("{}");
}
- return createSubtaskCheckpointDetailsJson(checkpoint, taskStats);
+ try {
+ return CompletableFuture.completedFuture(createSubtaskCheckpointDetailsJson(checkpoint, taskStats));
+ } catch (IOException e) {
+ return FutureUtils.completedExceptionally(e);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
index a86c5fd..a60aee0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
@@ -43,6 +44,8 @@ import java.io.StringWriter;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Handler that returns checkpoint statistics for a job.
@@ -51,8 +54,8 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
private static final String CHECKPOINT_STATS_REST_PATH = "/jobs/:jobid/checkpoints";
- public CheckpointStatsHandler(ExecutionGraphHolder executionGraphHolder) {
- super(executionGraphHolder);
+ public CheckpointStatsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+ super(executionGraphHolder, executor);
}
@Override
@@ -61,8 +64,16 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
}
@Override
- public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
- return createCheckpointStatsJson(graph);
+ public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return createCheckpointStatsJson(graph);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not create checkpoint stats json.", e);
+ }
+ },
+ executor);
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
index b95f2c4..cf286ce 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.webmonitor.metrics;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.handlers.AbstractJsonRequestHandler;
import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
@@ -28,6 +29,8 @@ import com.fasterxml.jackson.core.JsonGenerator;
import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
/**
* Abstract request handler that returns a list of all available metrics or the values for a set of metrics.
@@ -43,17 +46,27 @@ import java.util.Map;
public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler {
private final MetricFetcher fetcher;
- public AbstractMetricsHandler(MetricFetcher fetcher) {
+ public AbstractMetricsHandler(Executor executor, MetricFetcher fetcher) {
+ super(executor);
this.fetcher = Preconditions.checkNotNull(fetcher);
}
@Override
- public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
- fetcher.update();
- String requestedMetricsList = queryParams.get("get");
- return requestedMetricsList != null
- ? getMetricsValues(pathParams, requestedMetricsList)
- : getAvailableMetricsList(pathParams);
+ public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ fetcher.update();
+ String requestedMetricsList = queryParams.get("get");
+ try {
+ return requestedMetricsList != null
+ ? getMetricsValues(pathParams, requestedMetricsList)
+ : getAvailableMetricsList(pathParams);
+ } catch (IOException e) {
+ throw new FlinkFutureException("Could not retrieve metrics.", e);
+ }
+ },
+ executor);
+
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
index 7252d8a..2bd6683 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.metrics;
import java.util.Map;
+import java.util.concurrent.Executor;
/**
* Request handler that returns for the job manager a list of all available metrics or the values for a set of metrics.
@@ -35,8 +36,8 @@ public class JobManagerMetricsHandler extends AbstractMetricsHandler {
private static final String JOBMANAGER_METRICS_REST_PATH = "/jobmanager/metrics";
- public JobManagerMetricsHandler(MetricFetcher fetcher) {
- super(fetcher);
+ public JobManagerMetricsHandler(Executor executor, MetricFetcher fetcher) {
+ super(executor, fetcher);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
index a193457..e5e2500 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.metrics;
import java.util.Map;
+import java.util.concurrent.Executor;
/**
* Request handler that returns for a given job a list of all available metrics or the values for a set of metrics.
@@ -35,8 +36,8 @@ public class JobMetricsHandler extends AbstractMetricsHandler {
public static final String PARAMETER_JOB_ID = "jobid";
private static final String JOB_METRICS_REST_PATH = "/jobs/:jobid/metrics";
- public JobMetricsHandler(MetricFetcher fetcher) {
- super(fetcher);
+ public JobMetricsHandler(Executor executor, MetricFetcher fetcher) {
+ super(executor, fetcher);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
index e893da4..1d2cd84 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.metrics;
import java.util.Map;
+import java.util.concurrent.Executor;
/**
* Request handler that returns for a given task a list of all available metrics or the values for a set of metrics.
@@ -35,8 +36,8 @@ public class JobVertexMetricsHandler extends AbstractMetricsHandler {
public static final String PARAMETER_VERTEX_ID = "vertexid";
private static final String JOB_VERTEX_METRICS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/metrics";
- public JobVertexMetricsHandler(MetricFetcher fetcher) {
- super(fetcher);
+ public JobVertexMetricsHandler(Executor executor, MetricFetcher fetcher) {
+ super(executor, fetcher);
}
@Override