You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/19 22:44:25 UTC
[15/16] flink git commit: [FLINK-7531] Move Flink legacy rest handler
to flink-runtime
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
deleted file mode 100644
index 1ec3f9c..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
-
-import java.nio.charset.Charset;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Base class for most request handlers. The handlers must produce a JSON response.
- */
-public abstract class AbstractJsonRequestHandler implements RequestHandler {
-
- private static final Charset ENCODING = Charset.forName("UTF-8");
-
- protected final Executor executor;
-
- protected AbstractJsonRequestHandler(Executor executor) {
- this.executor = Preconditions.checkNotNull(executor);
- }
-
- @Override
- public CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
- CompletableFuture<String> resultFuture = handleJsonRequest(pathParams, queryParams, jobManagerGateway);
-
- return resultFuture.thenApplyAsync(
- (String result) -> {
- byte[] bytes = result.getBytes(ENCODING);
-
- DefaultFullHttpResponse response = new DefaultFullHttpResponse(
- HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));
-
- response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name());
- response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
-
- return response;
- });
- }
-
- /**
- * Core method that handles the request and generates the response. The method needs to
- * respond with a valid JSON string. Exceptions may be thrown and will be handled.
- *
- * @param pathParams The map of REST path parameters, decoded by the router.
- * @param queryParams The map of query parameters.
- * @param jobManagerGateway to communicate with the JobManager.
- *
- * @return The JSON string that is the HTTP response.
- *
- * @throws Exception Handlers may forward exceptions. Exceptions of type
- * {@link org.apache.flink.runtime.webmonitor.NotFoundException} will cause a HTTP 404
- * response with the exception message, other exceptions will cause a HTTP 500 response
- * with the exception stack trace.
- */
- public abstract CompletableFuture<String> handleJsonRequest(
- Map<String, String> pathParams,
- Map<String, String> queryParams,
- JobManagerGateway jobManagerGateway);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
deleted file mode 100644
index 1b20673..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.AccessExecution;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.util.FlinkException;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Base class for request handlers whose response depends on a specific subtask execution attempt
- * (defined via the "attempt" parameter) of a specific subtask (defined via the
- * "subtasknum" parameter) in a specific job vertex (defined via the "vertexid" parameter) in a
- * specific job (defined via the "jobid" parameter).
- */
-public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubtaskRequestHandler {
-
- public AbstractSubtaskAttemptRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
- super(executionGraphHolder, executor);
- }
-
- @Override
- public CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params) {
- final String attemptNumberString = params.get("attempt");
- if (attemptNumberString == null) {
- return FutureUtils.completedExceptionally(new FlinkException("Attempt number parameter missing"));
- }
-
- final int attempt;
- try {
- attempt = Integer.parseInt(attemptNumberString);
- }
- catch (NumberFormatException e) {
- return FutureUtils.completedExceptionally(new FlinkException("Invalid attempt number parameter"));
- }
-
- final AccessExecution currentAttempt = vertex.getCurrentExecutionAttempt();
- if (attempt == currentAttempt.getAttemptNumber()) {
- return handleRequest(currentAttempt, params);
- }
- else if (attempt >= 0 && attempt < currentAttempt.getAttemptNumber()) {
- AccessExecution exec = vertex.getPriorExecutionAttempt(attempt);
-
- if (exec != null) {
- return handleRequest(exec, params);
- } else {
- return FutureUtils.completedExceptionally(new RequestHandlerException("Execution for attempt " + attempt +
- " has already been deleted."));
- }
- }
- else {
- return FutureUtils.completedExceptionally(new FlinkException("Attempt does not exist: " + attempt));
- }
- }
-
- public abstract CompletableFuture<String> handleRequest(AccessExecution execAttempt, Map<String, String> params);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
deleted file mode 100644
index ab85034..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.util.FlinkException;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Base class for request handlers whose response depends on a specific subtask (defined via the
- * "subtasknum" parameter) in a specific job vertex (defined via the "vertexid" parameter) in a
- * specific job (defined via the "jobid" parameter).
- */
-public abstract class AbstractSubtaskRequestHandler extends AbstractJobVertexRequestHandler {
-
- public AbstractSubtaskRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
- super(executionGraphHolder, executor);
- }
-
- @Override
- public final CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
- final String subtaskNumberString = params.get("subtasknum");
- if (subtaskNumberString == null) {
- return FutureUtils.completedExceptionally(new FlinkException("Subtask number parameter missing"));
- }
-
- final int subtask;
- try {
- subtask = Integer.parseInt(subtaskNumberString);
- }
- catch (NumberFormatException e) {
- return FutureUtils.completedExceptionally(new FlinkException("Invalid subtask number parameter", e));
- }
-
- if (subtask < 0 || subtask >= jobVertex.getParallelism()) {
- return FutureUtils.completedExceptionally(new FlinkException("subtask does not exist: " + subtask));
- }
-
- final AccessExecutionVertex vertex = jobVertex.getTaskVertices()[subtask];
- return handleRequest(vertex, params);
- }
-
- public abstract CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
deleted file mode 100644
index 17db2e8..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.util.FlinkException;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Responder that returns the status of the Flink cluster, such as how many
- * TaskManagers are currently connected, and how many jobs are running.
- */
-public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
-
- private static final String CLUSTER_OVERVIEW_REST_PATH = "/overview";
-
- private static final String version = EnvironmentInformation.getVersion();
-
- private static final String commitID = EnvironmentInformation.getRevisionInformation().commitId;
-
- private final Time timeout;
-
- public ClusterOverviewHandler(Executor executor, Time timeout) {
- super(executor);
- this.timeout = checkNotNull(timeout);
- }
-
- @Override
- public String[] getPaths() {
- return new String[]{CLUSTER_OVERVIEW_REST_PATH};
- }
-
- @Override
- public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
- // we need no parameters, get all requests
- try {
- if (jobManagerGateway != null) {
- CompletableFuture<StatusOverview> overviewFuture = jobManagerGateway.requestStatusOverview(timeout);
-
- return overviewFuture.thenApplyAsync(
- (StatusOverview overview) -> {
- StringWriter writer = new StringWriter();
- try {
- JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
- gen.writeStartObject();
- gen.writeNumberField("taskmanagers", overview.getNumTaskManagersConnected());
- gen.writeNumberField("slots-total", overview.getNumSlotsTotal());
- gen.writeNumberField("slots-available", overview.getNumSlotsAvailable());
- gen.writeNumberField("jobs-running", overview.getNumJobsRunningOrPending());
- gen.writeNumberField("jobs-finished", overview.getNumJobsFinished());
- gen.writeNumberField("jobs-cancelled", overview.getNumJobsCancelled());
- gen.writeNumberField("jobs-failed", overview.getNumJobsFailed());
- gen.writeStringField("flink-version", version);
- if (!commitID.equals(EnvironmentInformation.UNKNOWN)) {
- gen.writeStringField("flink-commit", commitID);
- }
- gen.writeEndObject();
-
- gen.close();
- return writer.toString();
- } catch (IOException exception) {
- throw new FlinkFutureException("Could not write cluster overview.", exception);
- }
- },
- executor);
- } else {
- throw new Exception("No connection to the leading JobManager.");
- }
- }
- catch (Exception e) {
- return FutureUtils.completedExceptionally(new FlinkException("Failed to fetch list of all running jobs: ", e));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java
deleted file mode 100644
index 34898e7..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.configuration.ConfigConstants;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
-
-/**
- * Responder that returns a constant String.
- */
-@ChannelHandler.Sharable
-public class ConstantTextHandler extends SimpleChannelInboundHandler<Routed> {
-
- private final byte[] encodedText;
-
- public ConstantTextHandler(String text) {
- this.encodedText = text.getBytes(ConfigConstants.DEFAULT_CHARSET);
- }
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
- HttpResponse response = new DefaultFullHttpResponse(
- HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(encodedText));
-
- response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, encodedText.length);
- response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
-
- KeepAliveWrite.flush(ctx, routed.request(), response);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
deleted file mode 100644
index acf1cd0..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.StringWriter;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Responder that returns a list of all JobIDs of jobs found at the target actor.
- * May serve the IDs of current jobs, or past jobs, depending on whether this handler is
- * given the JobManager or Archive Actor Reference.
- */
-public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
-
- private static final String CURRENT_JOB_IDS_REST_PATH = "/jobs";
-
- private final Time timeout;
-
- public CurrentJobIdsHandler(Executor executor, Time timeout) {
- super(executor);
- this.timeout = requireNonNull(timeout);
- }
-
- @Override
- public String[] getPaths() {
- return new String[]{CURRENT_JOB_IDS_REST_PATH};
- }
-
- @Override
- public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
- return CompletableFuture.supplyAsync(
- () -> {
- // we need no parameters, get all requests
- try {
- if (jobManagerGateway != null) {
- CompletableFuture<JobsWithIDsOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
- JobsWithIDsOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-
- StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
- gen.writeStartObject();
-
- gen.writeArrayFieldStart("jobs-running");
- for (JobID jid : overview.getJobsRunningOrPending()) {
- gen.writeString(jid.toString());
- }
- gen.writeEndArray();
-
- gen.writeArrayFieldStart("jobs-finished");
- for (JobID jid : overview.getJobsFinished()) {
- gen.writeString(jid.toString());
- }
- gen.writeEndArray();
-
- gen.writeArrayFieldStart("jobs-cancelled");
- for (JobID jid : overview.getJobsCancelled()) {
- gen.writeString(jid.toString());
- }
- gen.writeEndArray();
-
- gen.writeArrayFieldStart("jobs-failed");
- for (JobID jid : overview.getJobsFailed()) {
- gen.writeString(jid.toString());
- }
- gen.writeEndArray();
-
- gen.writeEndObject();
-
- gen.close();
- return writer.toString();
- }
- else {
- throw new Exception("No connection to the leading JobManager.");
- }
- }
- catch (Exception e) {
- throw new FlinkFutureException("Failed to fetch list of all running jobs.", e);
- }
- },
- executor);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
deleted file mode 100644
index a5b116c..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Request handler that returns a summary of the job status.
- */
-public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
-
- private static final String ALL_JOBS_REST_PATH = "/joboverview";
- private static final String RUNNING_JOBS_REST_PATH = "/joboverview/running";
- private static final String COMPLETED_JOBS_REST_PATH = "/joboverview/completed";
-
- private final Time timeout;
-
- private final boolean includeRunningJobs;
- private final boolean includeFinishedJobs;
-
- public CurrentJobsOverviewHandler(
- Executor executor,
- Time timeout,
- boolean includeRunningJobs,
- boolean includeFinishedJobs) {
-
- super(executor);
- this.timeout = checkNotNull(timeout);
- this.includeRunningJobs = includeRunningJobs;
- this.includeFinishedJobs = includeFinishedJobs;
- }
-
- @Override
- public String[] getPaths() {
- if (includeRunningJobs && includeFinishedJobs) {
- return new String[]{ALL_JOBS_REST_PATH};
- }
- if (includeRunningJobs) {
- return new String[]{RUNNING_JOBS_REST_PATH};
- } else {
- return new String[]{COMPLETED_JOBS_REST_PATH};
- }
- }
-
- @Override
- public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
- if (jobManagerGateway != null) {
- CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(includeRunningJobs, includeFinishedJobs, timeout);
-
- return jobDetailsFuture.thenApplyAsync(
- (MultipleJobsDetails result) -> {
- final long now = System.currentTimeMillis();
-
- StringWriter writer = new StringWriter();
- try {
- JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
- gen.writeStartObject();
-
- if (includeRunningJobs && includeFinishedJobs) {
- gen.writeArrayFieldStart("running");
- for (JobDetails detail : result.getRunningJobs()) {
- writeJobDetailOverviewAsJson(detail, gen, now);
- }
- gen.writeEndArray();
-
- gen.writeArrayFieldStart("finished");
- for (JobDetails detail : result.getFinishedJobs()) {
- writeJobDetailOverviewAsJson(detail, gen, now);
- }
- gen.writeEndArray();
- } else {
- gen.writeArrayFieldStart("jobs");
- for (JobDetails detail : includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs()) {
- writeJobDetailOverviewAsJson(detail, gen, now);
- }
- gen.writeEndArray();
- }
-
- gen.writeEndObject();
- gen.close();
- return writer.toString();
- } catch (IOException e) {
- throw new FlinkFutureException("Could not write current jobs overview json.", e);
- }
- },
- executor);
- }
- else {
- return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
- }
- }
-
- /**
- * Archivist for the CurrentJobsOverviewHandler.
- */
- public static class CurrentJobsOverviewJsonArchivist implements JsonArchivist {
-
- @Override
- public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
- StringWriter writer = new StringWriter();
- try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
- gen.writeStartObject();
- gen.writeArrayFieldStart("running");
- gen.writeEndArray();
- gen.writeArrayFieldStart("finished");
- writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, System.currentTimeMillis());
- gen.writeEndArray();
- gen.writeEndObject();
- }
- String json = writer.toString();
- String path = ALL_JOBS_REST_PATH;
- return Collections.singleton(new ArchivedJson(path, json));
- }
- }
-
- public static void writeJobDetailOverviewAsJson(JobDetails details, JsonGenerator gen, long now) throws IOException {
- gen.writeStartObject();
-
- gen.writeStringField("jid", details.getJobId().toString());
- gen.writeStringField("name", details.getJobName());
- gen.writeStringField("state", details.getStatus().name());
-
- gen.writeNumberField("start-time", details.getStartTime());
- gen.writeNumberField("end-time", details.getEndTime());
- gen.writeNumberField("duration", (details.getEndTime() <= 0 ? now : details.getEndTime()) - details.getStartTime());
- gen.writeNumberField("last-modification", details.getLastUpdateTime());
-
- gen.writeObjectFieldStart("tasks");
- gen.writeNumberField("total", details.getNumTasks());
-
- final int[] perState = details.getNumVerticesPerExecutionState();
- gen.writeNumberField("pending", perState[ExecutionState.CREATED.ordinal()] +
- perState[ExecutionState.SCHEDULED.ordinal()] +
- perState[ExecutionState.DEPLOYING.ordinal()]);
- gen.writeNumberField("running", perState[ExecutionState.RUNNING.ordinal()]);
- gen.writeNumberField("finished", perState[ExecutionState.FINISHED.ordinal()]);
- gen.writeNumberField("canceling", perState[ExecutionState.CANCELING.ordinal()]);
- gen.writeNumberField("canceled", perState[ExecutionState.CANCELED.ordinal()]);
- gen.writeNumberField("failed", perState[ExecutionState.FAILED.ordinal()]);
- gen.writeEndObject();
-
- gen.writeEndObject();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
deleted file mode 100644
index 39984b1..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Map;
-import java.util.TimeZone;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Responder that returns the parameters that define how the asynchronous requests
- * against this web server should behave. It defines for example the refresh interval,
- * and time zone of the server timestamps.
- */
-public class DashboardConfigHandler extends AbstractJsonRequestHandler {
-
- private static final String DASHBOARD_CONFIG_REST_PATH = "/config";
-
- private final String configString;
-
- public DashboardConfigHandler(Executor executor, long refreshInterval) {
- super(executor);
- try {
- this.configString = createConfigJson(refreshInterval);
- }
- catch (Exception e) {
- // should never happen
- throw new RuntimeException(e.getMessage(), e);
- }
- }
-
- @Override
- public String[] getPaths() {
- return new String[]{DASHBOARD_CONFIG_REST_PATH};
- }
-
- @Override
- public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
- return CompletableFuture.completedFuture(configString);
- }
-
- public static String createConfigJson(long refreshInterval) throws IOException {
- StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
- TimeZone timeZone = TimeZone.getDefault();
- String timeZoneName = timeZone.getDisplayName();
- long timeZoneOffset = timeZone.getRawOffset();
-
- gen.writeStartObject();
- gen.writeNumberField("refresh-interval", refreshInterval);
- gen.writeNumberField("timezone-offset", timeZoneOffset);
- gen.writeStringField("timezone-name", timeZoneName);
- gen.writeStringField("flink-version", EnvironmentInformation.getVersion());
-
- EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation();
- if (revision != null) {
- gen.writeStringField("flink-revision", revision.commitId + " @ " + revision.commitDate);
- }
-
- gen.writeEndObject();
-
- gen.close();
-
- return writer.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
index 978432b..c95cc32 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
index 0b0d32e..760c836 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
@@ -35,6 +35,8 @@ import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
import org.apache.flink.util.ExceptionUtils;
import com.fasterxml.jackson.core.JsonGenerator;
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
index d9df1d4..04f663d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
index 95281a4..4248dd4 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler;
import com.fasterxml.jackson.core.JsonGenerator;
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
index b117b3d..4d79492 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 7ada0b4..16a1565 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
import org.apache.flink.util.Preconditions;
import com.fasterxml.jackson.core.JsonGenerator;
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index 61b3f58..9a0bac4 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
import java.io.File;
import java.util.Map;
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
deleted file mode 100644
index 4dede3a..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns the aggregated user accumulators of a job.
- */
-public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler {
-
- private static final String JOB_ACCUMULATORS_REST_PATH = "/jobs/:jobid/accumulators";
-
- public JobAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
- super(executionGraphHolder, executor);
- }
-
- @Override
- public String[] getPaths() {
- return new String[]{JOB_ACCUMULATORS_REST_PATH};
- }
-
- @Override
- public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
- return CompletableFuture.supplyAsync(
- () -> {
- try {
- return createJobAccumulatorsJson(graph);
- } catch (IOException e) {
- throw new FlinkFutureException("Could not create job accumulators json.", e);
- }
- },
- executor);
- }
-
- /**
- * Archivist for the JobAccumulatorsHandler.
- */
- public static class JobAccumulatorsJsonArchivist implements JsonArchivist {
-
- @Override
- public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
- String json = createJobAccumulatorsJson(graph);
- String path = JOB_ACCUMULATORS_REST_PATH
- .replace(":jobid", graph.getJobID().toString());
- return Collections.singletonList(new ArchivedJson(path, json));
- }
- }
-
- public static String createJobAccumulatorsJson(AccessExecutionGraph graph) throws IOException {
- StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
- StringifiedAccumulatorResult[] allAccumulators = graph.getAccumulatorResultsStringified();
-
- gen.writeStartObject();
-
- gen.writeArrayFieldStart("job-accumulators");
- // empty for now
- gen.writeEndArray();
-
- gen.writeArrayFieldStart("user-task-accumulators");
- for (StringifiedAccumulatorResult acc : allAccumulators) {
- gen.writeStartObject();
- gen.writeStringField("name", acc.getName());
- gen.writeStringField("type", acc.getType());
- gen.writeStringField("value", acc.getValue());
- gen.writeEndObject();
- }
- gen.writeEndArray();
- gen.writeEndObject();
-
- gen.close();
- return writer.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
deleted file mode 100644
index 1a7d868..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.StringUtils;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler for the CANCEL request.
- */
-public class JobCancellationHandler extends AbstractJsonRequestHandler {
-
- private static final String JOB_CONCELLATION_REST_PATH = "/jobs/:jobid/cancel";
- private static final String JOB_CONCELLATION_YARN_REST_PATH = "/jobs/:jobid/yarn-cancel";
-
- private final Time timeout;
-
- public JobCancellationHandler(Executor executor, Time timeout) {
- super(executor);
- this.timeout = Preconditions.checkNotNull(timeout);
- }
-
- @Override
- public String[] getPaths() {
- return new String[]{JOB_CONCELLATION_REST_PATH, JOB_CONCELLATION_YARN_REST_PATH};
- }
-
- @Override
- public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
- return CompletableFuture.supplyAsync(
- () -> {
- try {
- JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
- if (jobManagerGateway != null) {
- jobManagerGateway.cancelJob(jobId, timeout);
- return "{}";
- }
- else {
- throw new Exception("No connection to the leading JobManager.");
- }
- }
- catch (Exception e) {
- throw new FlinkFutureException("Failed to cancel the job with id: " + pathParams.get("jobid"), e);
- }
- },
- executor);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
deleted file mode 100644
index 4e41447..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.NotFoundException;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.nio.charset.Charset;
-import java.util.ArrayDeque;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Request handler for {@link CancelJobWithSavepoint} messages.
- */
-public class JobCancellationWithSavepointHandlers {
-
- private static final String CANCEL_WITH_SAVEPOINT_REST_PATH = "/jobs/:jobid/cancel-with-savepoint";
- private static final String CANCEL_WITH_SAVEPOINT_DIRECTORY_REST_PATH = "/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory";
-
- /** URL for in-progress cancellations. */
- private static final String CANCELLATION_IN_PROGRESS_REST_PATH = "/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId";
-
- /** Encodings for String. */
- private static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET;
-
- /** Shared lock between Trigger and In-Progress handlers. */
- private final Object lock = new Object();
-
- /** In-Progress requests. */
- private final Map<JobID, Long> inProgress = new HashMap<>();
-
- /** Succeeded/failed request. Either String or Throwable. */
- private final Map<Long, Object> completed = new HashMap<>();
-
- /** Atomic request counter. */
- private long requestCounter;
-
- /** Handler for trigger requests. */
- private final TriggerHandler triggerHandler;
-
- /** Handler for in-progress requests. */
- private final InProgressHandler inProgressHandler;
-
- /** Default savepoint directory. */
- private final String defaultSavepointDirectory;
-
- public JobCancellationWithSavepointHandlers(
- ExecutionGraphHolder currentGraphs,
- Executor executor) {
- this(currentGraphs, executor, null);
- }
-
- public JobCancellationWithSavepointHandlers(
- ExecutionGraphHolder currentGraphs,
- Executor executor,
- @Nullable String defaultSavepointDirectory) {
-
- this.triggerHandler = new TriggerHandler(currentGraphs, executor);
- this.inProgressHandler = new InProgressHandler();
- this.defaultSavepointDirectory = defaultSavepointDirectory;
- }
-
- public TriggerHandler getTriggerHandler() {
- return triggerHandler;
- }
-
- public InProgressHandler getInProgressHandler() {
- return inProgressHandler;
- }
-
- // ------------------------------------------------------------------------
- // New requests
- // ------------------------------------------------------------------------
-
- /**
- * Handler for triggering a {@link CancelJobWithSavepoint} message.
- */
- class TriggerHandler implements RequestHandler {
-
- /** Current execution graphs. */
- private final ExecutionGraphHolder currentGraphs;
-
- /** Execution context for futures. */
- private final Executor executor;
-
- public TriggerHandler(ExecutionGraphHolder currentGraphs, Executor executor) {
- this.currentGraphs = checkNotNull(currentGraphs);
- this.executor = checkNotNull(executor);
- }
-
- @Override
- public String[] getPaths() {
- return new String[]{CANCEL_WITH_SAVEPOINT_REST_PATH, CANCEL_WITH_SAVEPOINT_DIRECTORY_REST_PATH};
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public CompletableFuture<FullHttpResponse> handleRequest(
- Map<String, String> pathParams,
- Map<String, String> queryParams,
- JobManagerGateway jobManagerGateway) {
-
- if (jobManagerGateway != null) {
- JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
- final CompletableFuture<Optional<AccessExecutionGraph>> graphFuture;
-
- graphFuture = currentGraphs.getExecutionGraph(jobId, jobManagerGateway);
-
- return graphFuture.thenApplyAsync(
- (Optional<AccessExecutionGraph> optGraph) -> {
- final AccessExecutionGraph graph = optGraph.orElseThrow(
- () -> new FlinkFutureException(
- new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.')));
-
- CheckpointCoordinator coord = graph.getCheckpointCoordinator();
- if (coord == null) {
- throw new FlinkFutureException(new Exception("Cannot find CheckpointCoordinator for job."));
- }
-
- String targetDirectory = pathParams.get("targetDirectory");
- if (targetDirectory == null) {
- if (defaultSavepointDirectory == null) {
- throw new IllegalStateException("No savepoint directory configured. " +
- "You can either specify a directory when triggering this savepoint or " +
- "configure a cluster-wide default via key '" +
- CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.");
- } else {
- targetDirectory = defaultSavepointDirectory;
- }
- }
-
- try {
- return handleNewRequest(jobManagerGateway, jobId, targetDirectory, coord.getCheckpointTimeout());
- } catch (IOException e) {
- throw new FlinkFutureException("Could not cancel job with savepoint.", e);
- }
- }, executor);
- } else {
- return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
- }
- }
-
- @SuppressWarnings("unchecked")
- private FullHttpResponse handleNewRequest(JobManagerGateway jobManagerGateway, final JobID jobId, String targetDirectory, long checkpointTimeout) throws IOException {
- // Check whether a request exists
- final long requestId;
- final boolean isNewRequest;
- synchronized (lock) {
- if (inProgress.containsKey(jobId)) {
- requestId = inProgress.get(jobId);
- isNewRequest = false;
- } else {
- requestId = ++requestCounter;
- inProgress.put(jobId, requestId);
- isNewRequest = true;
- }
- }
-
- if (isNewRequest) {
- boolean success = false;
-
- try {
- // Trigger cancellation
- CompletableFuture<String> cancelJobFuture = jobManagerGateway
- .cancelJobWithSavepoint(jobId, targetDirectory, Time.milliseconds(checkpointTimeout));
-
- cancelJobFuture.whenCompleteAsync(
- (String path, Throwable throwable) -> {
- try {
- if (throwable != null) {
- completed.put(requestId, throwable);
- } else {
- completed.put(requestId, path);
- }
- } finally {
- inProgress.remove(jobId);
- }
- }, executor);
-
- success = true;
- } finally {
- synchronized (lock) {
- if (!success) {
- inProgress.remove(jobId);
- }
- }
- }
- }
-
- // In-progress location
- String location = CANCELLATION_IN_PROGRESS_REST_PATH
- .replace(":jobid", jobId.toString())
- .replace(":requestId", Long.toString(requestId));
-
- // Accepted response
- StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
- gen.writeStartObject();
- gen.writeStringField("status", "accepted");
- gen.writeNumberField("request-id", requestId);
- gen.writeStringField("location", location);
- gen.writeEndObject();
- gen.close();
-
- String json = writer.toString();
- byte[] bytes = json.getBytes(ENCODING);
-
- DefaultFullHttpResponse response = new DefaultFullHttpResponse(
- HttpVersion.HTTP_1_1,
- HttpResponseStatus.ACCEPTED,
- Unpooled.wrappedBuffer(bytes));
-
- response.headers().set(HttpHeaders.Names.LOCATION, location);
-
- response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name());
- response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
-
- FullHttpResponse accepted = response;
-
- return accepted;
- }
- }
-
- // ------------------------------------------------------------------------
- // In-progress requests
- // ------------------------------------------------------------------------
-
- /**
- * Handler for in-progress cancel with savepoint operations.
- */
- class InProgressHandler implements RequestHandler {
-
- /** The number of recent checkpoints whose IDs are remembered. */
- private static final int NUM_GHOST_REQUEST_IDS = 16;
-
- /** Remember some recently completed. */
- private final ArrayDeque<Tuple2<Long, Object>> recentlyCompleted = new ArrayDeque<>(NUM_GHOST_REQUEST_IDS);
-
- @Override
- public String[] getPaths() {
- return new String[]{CANCELLATION_IN_PROGRESS_REST_PATH};
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
- JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
- long requestId = Long.parseLong(pathParams.get("requestId"));
-
- return CompletableFuture.supplyAsync(
- () -> {
- try {
- synchronized (lock) {
- Object result = completed.remove(requestId);
-
- if (result != null) {
- // Add to recent history
- recentlyCompleted.add(new Tuple2<>(requestId, result));
- if (recentlyCompleted.size() > NUM_GHOST_REQUEST_IDS) {
- recentlyCompleted.remove();
- }
-
- if (result.getClass() == String.class) {
- String savepointPath = (String) result;
- return createSuccessResponse(requestId, savepointPath);
- } else {
- Throwable cause = (Throwable) result;
- return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
- }
- } else {
- // Check in-progress
- Long inProgressRequestId = inProgress.get(jobId);
- if (inProgressRequestId != null) {
- // Sanity check
- if (inProgressRequestId == requestId) {
- return createInProgressResponse(requestId);
- } else {
- String msg = "Request ID does not belong to JobID";
- return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg);
- }
- }
-
- // Check recent history
- for (Tuple2<Long, Object> recent : recentlyCompleted) {
- if (recent.f0 == requestId) {
- if (recent.f1.getClass() == String.class) {
- String savepointPath = (String) recent.f1;
- return createSuccessResponse(requestId, savepointPath);
- } else {
- Throwable cause = (Throwable) recent.f1;
- return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
- }
- }
- }
-
- return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, "Unknown job/request ID");
- }
- }
- } catch (Exception e) {
- throw new FlinkFutureException("Could not handle in progress request.", e);
- }
- });
- }
-
- private FullHttpResponse createSuccessResponse(long requestId, String savepointPath) throws IOException {
- StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
- gen.writeStartObject();
-
- gen.writeStringField("status", "success");
- gen.writeNumberField("request-id", requestId);
- gen.writeStringField("savepoint-path", savepointPath);
-
- gen.writeEndObject();
- gen.close();
-
- String json = writer.toString();
- byte[] bytes = json.getBytes(ENCODING);
-
- DefaultFullHttpResponse response = new DefaultFullHttpResponse(
- HttpVersion.HTTP_1_1,
- HttpResponseStatus.CREATED,
- Unpooled.wrappedBuffer(bytes));
-
- response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name());
- response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
-
- return response;
- }
-
- private FullHttpResponse createInProgressResponse(long requestId) throws IOException {
- StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
- gen.writeStartObject();
-
- gen.writeStringField("status", "in-progress");
- gen.writeNumberField("request-id", requestId);
-
- gen.writeEndObject();
- gen.close();
-
- String json = writer.toString();
- byte[] bytes = json.getBytes(ENCODING);
-
- DefaultFullHttpResponse response = new DefaultFullHttpResponse(
- HttpVersion.HTTP_1_1,
- HttpResponseStatus.ACCEPTED,
- Unpooled.wrappedBuffer(bytes));
-
- response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name());
- response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
-
- return response;
- }
-
- private FullHttpResponse createFailureResponse(HttpResponseStatus code, long requestId, String errMsg) throws IOException {
- StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
- gen.writeStartObject();
-
- gen.writeStringField("status", "failed");
- gen.writeNumberField("request-id", requestId);
- gen.writeStringField("cause", errMsg);
-
- gen.writeEndObject();
- gen.close();
-
- String json = writer.toString();
- byte[] bytes = json.getBytes(ENCODING);
-
- DefaultFullHttpResponse response = new DefaultFullHttpResponse(
- HttpVersion.HTTP_1_1,
- code,
- Unpooled.wrappedBuffer(bytes));
-
- response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name());
- response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
-
- return response;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
deleted file mode 100644
index 0b15b37..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.ArchivedExecutionConfig;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns the execution config of a job.
- */
-public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
-
- private static final String JOB_CONFIG_REST_PATH = "/jobs/:jobid/config";
-
- public JobConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
- super(executionGraphHolder, executor);
- }
-
- @Override
- public String[] getPaths() {
- return new String[]{JOB_CONFIG_REST_PATH};
- }
-
- @Override
- public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
- return CompletableFuture.supplyAsync(
- () -> {
- try {
- return createJobConfigJson(graph);
- } catch (IOException e) {
- throw new FlinkFutureException("Could not write job config json.", e);
- }
- },
- executor);
-
- }
-
- /**
- * Archivist for the JobConfigHandler.
- */
- public static class JobConfigJsonArchivist implements JsonArchivist {
-
- @Override
- public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
- String json = createJobConfigJson(graph);
- String path = JOB_CONFIG_REST_PATH
- .replace(":jobid", graph.getJobID().toString());
- return Collections.singletonList(new ArchivedJson(path, json));
- }
- }
-
- public static String createJobConfigJson(AccessExecutionGraph graph) throws IOException {
- StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
- gen.writeStartObject();
- gen.writeStringField("jid", graph.getJobID().toString());
- gen.writeStringField("name", graph.getJobName());
-
- final ArchivedExecutionConfig summary = graph.getArchivedExecutionConfig();
-
- if (summary != null) {
- gen.writeObjectFieldStart("execution-config");
-
- gen.writeStringField("execution-mode", summary.getExecutionMode());
-
- gen.writeStringField("restart-strategy", summary.getRestartStrategyDescription());
- gen.writeNumberField("job-parallelism", summary.getParallelism());
- gen.writeBooleanField("object-reuse-mode", summary.getObjectReuseEnabled());
-
- Map<String, String> ucVals = summary.getGlobalJobParameters();
- if (ucVals != null) {
- gen.writeObjectFieldStart("user-config");
-
- for (Map.Entry<String, String> ucVal : ucVals.entrySet()) {
- gen.writeStringField(ucVal.getKey(), ucVal.getValue());
- }
-
- gen.writeEndObject();
- }
-
- gen.writeEndObject();
- }
- gen.writeEndObject();
-
- gen.close();
- return writer.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
deleted file mode 100644
index 8a50f87..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
-import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns details about a job. This includes:
- * <ul>
- * <li>Dataflow plan</li>
- * <li>id, name, and current status</li>
- * <li>start time, end time, duration</li>
- * <li>number of job vertices in each state (pending, running, finished, failed)</li>
- * <li>info about job vertices, including runtime, status, I/O bytes and records, subtasks in each status</li>
- * </ul>
- */
-public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
-
- private static final String JOB_DETAILS_REST_PATH = "/jobs/:jobid";
- private static final String JOB_DETAILS_VERTICES_REST_PATH = "/jobs/:jobid/vertices";
-
- private final MetricFetcher fetcher;
-
- public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
- super(executionGraphHolder, executor);
- this.fetcher = fetcher;
- }
-
- @Override
- public String[] getPaths() {
- return new String[]{JOB_DETAILS_REST_PATH, JOB_DETAILS_VERTICES_REST_PATH};
- }
-
- @Override
- public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
- return CompletableFuture.supplyAsync(
- () -> {
- try {
- return createJobDetailsJson(graph, fetcher);
- } catch (IOException e) {
- throw new FlinkFutureException("Could not create job details json.", e);
- }
- },
- executor);
- }
-
- /**
- * Archivist for the JobDetailsHandler.
- */
- public static class JobDetailsJsonArchivist implements JsonArchivist {
-
- @Override
- public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
- String json = createJobDetailsJson(graph, null);
- String path1 = JOB_DETAILS_REST_PATH
- .replace(":jobid", graph.getJobID().toString());
- String path2 = JOB_DETAILS_VERTICES_REST_PATH
- .replace(":jobid", graph.getJobID().toString());
- Collection<ArchivedJson> archives = new ArrayList<>();
- archives.add(new ArchivedJson(path1, json));
- archives.add(new ArchivedJson(path2, json));
- return archives;
- }
- }
-
- public static String createJobDetailsJson(AccessExecutionGraph graph, @Nullable MetricFetcher fetcher) throws IOException {
- final StringWriter writer = new StringWriter();
- final JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
- final long now = System.currentTimeMillis();
-
- gen.writeStartObject();
-
- // basic info
- gen.writeStringField("jid", graph.getJobID().toString());
- gen.writeStringField("name", graph.getJobName());
- gen.writeBooleanField("isStoppable", graph.isStoppable());
- gen.writeStringField("state", graph.getState().name());
-
- // times and duration
- final long jobStartTime = graph.getStatusTimestamp(JobStatus.CREATED);
- final long jobEndTime = graph.getState().isGloballyTerminalState() ?
- graph.getStatusTimestamp(graph.getState()) : -1L;
- gen.writeNumberField("start-time", jobStartTime);
- gen.writeNumberField("end-time", jobEndTime);
- gen.writeNumberField("duration", (jobEndTime > 0 ? jobEndTime : now) - jobStartTime);
- gen.writeNumberField("now", now);
-
- // timestamps
- gen.writeObjectFieldStart("timestamps");
- for (JobStatus status : JobStatus.values()) {
- gen.writeNumberField(status.name(), graph.getStatusTimestamp(status));
- }
- gen.writeEndObject();
-
- // job vertices
- int[] jobVerticesPerState = new int[ExecutionState.values().length];
- gen.writeArrayFieldStart("vertices");
-
- for (AccessExecutionJobVertex ejv : graph.getVerticesTopologically()) {
- int[] tasksPerState = new int[ExecutionState.values().length];
- long startTime = Long.MAX_VALUE;
- long endTime = 0;
- boolean allFinished = true;
-
- for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
- final ExecutionState state = vertex.getExecutionState();
- tasksPerState[state.ordinal()]++;
-
- // take the earliest start time
- long started = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
- if (started > 0) {
- startTime = Math.min(startTime, started);
- }
-
- allFinished &= state.isTerminal();
- endTime = Math.max(endTime, vertex.getStateTimestamp(state));
- }
-
- long duration;
- if (startTime < Long.MAX_VALUE) {
- if (allFinished) {
- duration = endTime - startTime;
- }
- else {
- endTime = -1L;
- duration = now - startTime;
- }
- }
- else {
- startTime = -1L;
- endTime = -1L;
- duration = -1L;
- }
-
- ExecutionState jobVertexState =
- ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, ejv.getParallelism());
- jobVerticesPerState[jobVertexState.ordinal()]++;
-
- gen.writeStartObject();
- gen.writeStringField("id", ejv.getJobVertexId().toString());
- gen.writeStringField("name", ejv.getName());
- gen.writeNumberField("parallelism", ejv.getParallelism());
- gen.writeStringField("status", jobVertexState.name());
-
- gen.writeNumberField("start-time", startTime);
- gen.writeNumberField("end-time", endTime);
- gen.writeNumberField("duration", duration);
-
- gen.writeObjectFieldStart("tasks");
- for (ExecutionState state : ExecutionState.values()) {
- gen.writeNumberField(state.name(), tasksPerState[state.ordinal()]);
- }
- gen.writeEndObject();
-
- MutableIOMetrics counts = new MutableIOMetrics();
-
- for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
- counts.addIOMetrics(
- vertex.getCurrentExecutionAttempt(),
- fetcher,
- graph.getJobID().toString(),
- ejv.getJobVertexId().toString());
- }
-
- counts.writeIOMetricsAsJson(gen);
-
- gen.writeEndObject();
- }
- gen.writeEndArray();
-
- gen.writeObjectFieldStart("status-counts");
- for (ExecutionState state : ExecutionState.values()) {
- gen.writeNumberField(state.name(), jobVerticesPerState[state.ordinal()]);
- }
- gen.writeEndObject();
-
- gen.writeFieldName("plan");
- gen.writeRawValue(graph.getJsonPlan());
-
- gen.writeEndObject();
-
- gen.close();
- return writer.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
deleted file mode 100644
index 6ffd443..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ErrorInfo;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.util.ExceptionUtils;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns the exceptions that occurred in a job.
- */
-public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
-
- private static final String JOB_EXCEPTIONS_REST_PATH = "/jobs/:jobid/exceptions";
-
- static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
-
- public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
- super(executionGraphHolder, executor);
- }
-
- @Override
- public String[] getPaths() {
- return new String[]{JOB_EXCEPTIONS_REST_PATH};
- }
-
- @Override
- public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
- return CompletableFuture.supplyAsync(
- () -> {
- try {
- return createJobExceptionsJson(graph);
- } catch (IOException e) {
- throw new FlinkFutureException("Could not create job exceptions json.", e);
- }
- },
- executor
- );
- }
-
- /**
- * Archivist for the JobExceptionsHandler.
- */
- public static class JobExceptionsJsonArchivist implements JsonArchivist {
-
- @Override
- public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
- String json = createJobExceptionsJson(graph);
- String path = JOB_EXCEPTIONS_REST_PATH
- .replace(":jobid", graph.getJobID().toString());
- return Collections.singletonList(new ArchivedJson(path, json));
- }
- }
-
- public static String createJobExceptionsJson(AccessExecutionGraph graph) throws IOException {
- StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
- gen.writeStartObject();
-
- // most important is the root failure cause
- ErrorInfo rootException = graph.getFailureCause();
- if (rootException != null && !rootException.getExceptionAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
- gen.writeStringField("root-exception", rootException.getExceptionAsString());
- gen.writeNumberField("timestamp", rootException.getTimestamp());
- }
-
- // we additionally collect all exceptions (up to a limit) that occurred in the individual tasks
- gen.writeArrayFieldStart("all-exceptions");
-
- int numExceptionsSoFar = 0;
- boolean truncated = false;
-
- for (AccessExecutionVertex task : graph.getAllExecutionVertices()) {
- String t = task.getFailureCauseAsString();
- if (t != null && !t.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
- if (numExceptionsSoFar >= MAX_NUMBER_EXCEPTION_TO_REPORT) {
- truncated = true;
- break;
- }
-
- TaskManagerLocation location = task.getCurrentAssignedResourceLocation();
- String locationString = location != null ?
- location.getFQDNHostname() + ':' + location.dataPort() : "(unassigned)";
-
- gen.writeStartObject();
- gen.writeStringField("exception", t);
- gen.writeStringField("task", task.getTaskNameWithSubtaskIndex());
- gen.writeStringField("location", locationString);
- long timestamp = task.getStateTimestamp(ExecutionState.FAILED);
- gen.writeNumberField("timestamp", timestamp == 0 ? -1 : timestamp);
- gen.writeEndObject();
- numExceptionsSoFar++;
- }
- }
- gen.writeEndArray();
-
- gen.writeBooleanField("truncated", truncated);
- gen.writeEndObject();
-
- gen.close();
- return writer.toString();
- }
-}