You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/24 19:59:58 UTC
[4/5] flink git commit: [FLINK-6687] [web] Activate strict checkstyle
for flink-runtime-web
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index ec8516d..745a110 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -47,10 +47,10 @@ public class JarUploadHandler extends AbstractJsonRequestHandler {
Map<String, String> pathParams,
Map<String, String> queryParams,
ActorGateway jobManager) throws Exception {
-
+
String tempFilePath = queryParams.get("filepath");
String filename = queryParams.get("filename");
-
+
File tempFile;
if (tempFilePath != null && (tempFile = new File(tempFilePath)).exists()) {
if (!tempFile.getName().endsWith(".jar")) {
@@ -58,7 +58,7 @@ public class JarUploadHandler extends AbstractJsonRequestHandler {
tempFile.delete();
return "{\"error\": \"Only Jar files are allowed.\"}";
}
-
+
String filenameWithUUID = UUID.randomUUID() + "_" + filename;
File newFile = new File(jarDir, filenameWithUUID);
if (tempFile.renameTo(newFile)) {
@@ -70,7 +70,7 @@ public class JarUploadHandler extends AbstractJsonRequestHandler {
tempFile.delete();
}
}
-
+
return "{\"error\": \"Failed to upload the file.\"}";
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
index c403aa2..163e583 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
@@ -18,13 +18,14 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import com.fasterxml.jackson.core.JsonGenerator;
+
import java.io.IOException;
import java.io.StringWriter;
import java.util.Collection;
@@ -37,7 +38,7 @@ import java.util.Map;
public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler {
private static final String JOB_ACCUMULATORS_REST_PATH = "/jobs/:jobid/accumulators";
-
+
public JobAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
super(executionGraphHolder);
}
@@ -52,6 +53,9 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler
return createJobAccumulatorsJson(graph);
}
+ /**
+ * Archivist for the JobAccumulatorsHandler.
+ */
public static class JobAccumulatorsJsonArchivist implements JsonArchivist {
@Override
@@ -65,7 +69,7 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler
public static String createJobAccumulatorsJson(AccessExecutionGraph graph) throws IOException {
StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
StringifiedAccumulatorResult[] allAccumulators = graph.getAccumulatorResultsStringified();
@@ -74,7 +78,7 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler
gen.writeArrayFieldStart("job-accumulators");
// empty for now
gen.writeEndArray();
-
+
gen.writeArrayFieldStart("user-task-accumulators");
for (StringifiedAccumulatorResult acc : allAccumulators) {
gen.writeStartObject();
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
index f5d6853..3f7b824 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
@@ -18,14 +18,6 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import akka.dispatch.OnComplete;
-import com.fasterxml.jackson.core.JsonGenerator;
-import io.netty.buffer.Unpooled;
-import io.netty.handler.codec.http.DefaultFullHttpResponse;
-import io.netty.handler.codec.http.FullHttpResponse;
-import io.netty.handler.codec.http.HttpHeaders;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.HttpVersion;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
@@ -36,11 +28,18 @@ import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoi
import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+
+import akka.dispatch.OnComplete;
+import com.fasterxml.jackson.core.JsonGenerator;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
import javax.annotation.Nullable;
+
import java.io.IOException;
import java.io.StringWriter;
import java.nio.charset.Charset;
@@ -48,6 +47,10 @@ import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -67,13 +70,13 @@ public class JobCancellationWithSavepointHandlers {
/** Shared lock between Trigger and In-Progress handlers. */
private final Object lock = new Object();
- /** In-Progress requests */
+ /** In-Progress requests. */
private final Map<JobID, Long> inProgress = new HashMap<>();
/** Succeeded/failed request. Either String or Throwable. */
private final Map<Long, Object> completed = new HashMap<>();
- /** Atomic request counter */
+ /** Atomic request counter. */
private long requestCounter;
/** Handler for trigger requests. */
@@ -244,7 +247,7 @@ public class JobCancellationWithSavepointHandlers {
// Accepted response
StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
gen.writeStartObject();
gen.writeStringField("status", "accepted");
gen.writeNumberField("request-id", requestId);
@@ -283,7 +286,7 @@ public class JobCancellationWithSavepointHandlers {
/** The number of recent checkpoints whose IDs are remembered. */
private static final int NUM_GHOST_REQUEST_IDS = 16;
- /** Remember some recently completed */
+ /** Remember some recently completed requests. */
private final ArrayDeque<Tuple2<Long, Object>> recentlyCompleted = new ArrayDeque<>(NUM_GHOST_REQUEST_IDS);
@Override
@@ -324,7 +327,7 @@ public class JobCancellationWithSavepointHandlers {
if (inProgressRequestId == requestId) {
return createInProgressResponse(requestId);
} else {
- String msg= "Request ID does not belong to JobID";
+ String msg = "Request ID does not belong to JobID";
return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg);
}
}
@@ -355,7 +358,7 @@ public class JobCancellationWithSavepointHandlers {
private FullHttpResponse createSuccessResponse(long requestId, String savepointPath) throws IOException {
StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
gen.writeStartObject();
gen.writeStringField("status", "success");
@@ -381,7 +384,7 @@ public class JobCancellationWithSavepointHandlers {
private FullHttpResponse createInProgressResponse(long requestId) throws IOException {
StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
gen.writeStartObject();
gen.writeStringField("status", "in-progress");
@@ -406,7 +409,7 @@ public class JobCancellationWithSavepointHandlers {
private FullHttpResponse createFailureResponse(HttpResponseStatus code, long requestId, String errMsg) throws IOException {
StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
gen.writeStartObject();
gen.writeStringField("status", "failed");
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
index 2b96456..72cf8b7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
@@ -18,19 +18,19 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Map;
-
-import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
import java.util.Collection;
import java.util.Collections;
+import java.util.Map;
/**
* Request handler that returns the execution config of a job.
@@ -53,6 +53,9 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
return createJobConfigJson(graph);
}
+ /**
+ * Archivist for the JobConfigHandler.
+ */
public static class JobConfigJsonArchivist implements JsonArchivist {
@Override
@@ -66,7 +69,7 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
public static String createJobConfigJson(AccessExecutionGraph graph) throws IOException {
StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
gen.writeStartObject();
gen.writeStringField("jid", graph.getJobID().toString());
@@ -86,7 +89,7 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
Map<String, String> ucVals = summary.getGlobalJobParameters();
if (ucVals != null) {
gen.writeObjectFieldStart("user-config");
-
+
for (Map.Entry<String, String> ucVal : ucVals.entrySet()) {
gen.writeStringField(ucVal.getKey(), ucVal.getValue());
}
@@ -97,7 +100,7 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
gen.writeEndObject();
}
gen.writeEndObject();
-
+
gen.close();
return writer.toString();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index 37a1c19..87ac7c3 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import com.fasterxml.jackson.core.JsonGenerator;
-
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -32,7 +30,10 @@ import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
+import com.fasterxml.jackson.core.JsonGenerator;
+
import javax.annotation.Nullable;
+
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
@@ -40,7 +41,7 @@ import java.util.Collection;
import java.util.Map;
/**
- * Request handler that returns details about a job, including:
+ * Request handler that returns details about a job. This includes:
* <ul>
* <li>Dataflow plan</li>
* <li>id, name, and current status</li>
@@ -71,6 +72,9 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
return createJobDetailsJson(graph, fetcher);
}
+ /**
+ * Archivist for the JobDetailsHandler.
+ */
public static class JobDetailsJsonArchivist implements JsonArchivist {
@Override
@@ -89,18 +93,18 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
public static String createJobDetailsJson(AccessExecutionGraph graph, @Nullable MetricFetcher fetcher) throws IOException {
final StringWriter writer = new StringWriter();
- final JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+ final JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
final long now = System.currentTimeMillis();
-
+
gen.writeStartObject();
-
+
// basic info
gen.writeStringField("jid", graph.getJobID().toString());
gen.writeStringField("name", graph.getJobName());
gen.writeBooleanField("isStoppable", graph.isStoppable());
gen.writeStringField("state", graph.getState().name());
-
+
// times and duration
final long jobStartTime = graph.getStatusTimestamp(JobStatus.CREATED);
final long jobEndTime = graph.getState().isGloballyTerminalState() ?
@@ -109,14 +113,14 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
gen.writeNumberField("end-time", jobEndTime);
gen.writeNumberField("duration", (jobEndTime > 0 ? jobEndTime : now) - jobStartTime);
gen.writeNumberField("now", now);
-
+
// timestamps
gen.writeObjectFieldStart("timestamps");
for (JobStatus status : JobStatus.values()) {
gen.writeNumberField(status.name(), graph.getStatusTimestamp(status));
}
gen.writeEndObject();
-
+
// job vertices
int[] jobVerticesPerState = new int[ExecutionState.values().length];
gen.writeArrayFieldStart("vertices");
@@ -126,7 +130,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
long startTime = Long.MAX_VALUE;
long endTime = 0;
boolean allFinished = true;
-
+
for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
final ExecutionState state = vertex.getExecutionState();
tasksPerState[state.ordinal()]++;
@@ -136,11 +140,11 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
if (started > 0) {
startTime = Math.min(startTime, started);
}
-
+
allFinished &= state.isTerminal();
endTime = Math.max(endTime, vertex.getStateTimestamp(state));
}
-
+
long duration;
if (startTime < Long.MAX_VALUE) {
if (allFinished) {
@@ -156,8 +160,8 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
endTime = -1L;
duration = -1L;
}
-
- ExecutionState jobVertexState =
+
+ ExecutionState jobVertexState =
ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, ejv.getParallelism());
jobVerticesPerState[jobVertexState.ordinal()]++;
@@ -170,13 +174,13 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
gen.writeNumberField("start-time", startTime);
gen.writeNumberField("end-time", endTime);
gen.writeNumberField("duration", duration);
-
+
gen.writeObjectFieldStart("tasks");
for (ExecutionState state : ExecutionState.values()) {
gen.writeNumberField(state.name(), tasksPerState[state.ordinal()]);
}
gen.writeEndObject();
-
+
MutableIOMetrics counts = new MutableIOMetrics();
for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
@@ -188,7 +192,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
}
counts.writeIOMetricsAsJson(gen);
-
+
gen.writeEndObject();
}
gen.writeEndArray();
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
index 81cdc83..181b270 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -27,6 +26,8 @@ import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.ExceptionUtils;
+import com.fasterxml.jackson.core.JsonGenerator;
+
import java.io.IOException;
import java.io.StringWriter;
import java.util.Collection;
@@ -41,7 +42,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
private static final String JOB_EXCEPTIONS_REST_PATH = "/jobs/:jobid/exceptions";
static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
-
+
public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) {
super(executionGraphHolder);
}
@@ -56,6 +57,9 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
return createJobExceptionsJson(graph);
}
+ /**
+ * Archivist for the JobExceptionsHandler.
+ */
public static class JobExceptionsJsonArchivist implements JsonArchivist {
@Override
@@ -69,10 +73,10 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
public static String createJobExceptionsJson(AccessExecutionGraph graph) throws IOException {
StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
gen.writeStartObject();
-
+
// most important is the root failure cause
String rootException = graph.getFailureCauseAsString();
if (rootException != null && !rootException.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
@@ -84,7 +88,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
int numExceptionsSoFar = 0;
boolean truncated = false;
-
+
for (AccessExecutionVertex task : graph.getAllExecutionVertices()) {
String t = task.getFailureCauseAsString();
if (t != null && !t.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
index 5fcf010..d1aeea4 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
@@ -18,10 +18,11 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.instance.ActorGateway;
+import com.fasterxml.jackson.core.JsonGenerator;
+
import java.io.StringWriter;
import java.util.Map;
@@ -46,7 +47,7 @@ public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
@Override
public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
gen.writeStartArray();
for (String key : config.keySet()) {
@@ -54,9 +55,9 @@ public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
gen.writeStringField("key", key);
// Mask key values which contain sensitive information
- if(key.toLowerCase().contains("password")) {
+ if (key.toLowerCase().contains("password")) {
String value = config.getString(key, null);
- if(value != null) {
+ if (value != null) {
value = "******";
}
gen.writeStringField("value", value);
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
index 885d04e..d17b6bb 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
@@ -34,7 +34,7 @@ import java.util.Map;
public class JobPlanHandler extends AbstractExecutionGraphRequestHandler {
private static final String JOB_PLAN_REST_PATH = "/jobs/:jobid/plan";
-
+
public JobPlanHandler(ExecutionGraphHolder executionGraphHolder) {
super(executionGraphHolder);
}
@@ -49,6 +49,9 @@ public class JobPlanHandler extends AbstractExecutionGraphRequestHandler {
return graph.getJsonPlan();
}
+ /**
+ * Archivist for the JobPlanHandler.
+ */
public static class JobPlanJsonArchivist implements JsonArchivist {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
index 2532a1e..8e90dfc 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import com.fasterxml.jackson.core.JsonGenerator;
-
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -27,6 +25,8 @@ import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import com.fasterxml.jackson.core.JsonGenerator;
+
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
@@ -34,11 +34,13 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
-
+/**
+ * Request handler that returns the accumulators for a given vertex.
+ */
public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandler {
private static final String JOB_VERTEX_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/accumulators";
-
+
public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
super(executionGraphHolder);
}
@@ -53,6 +55,9 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle
return createVertexAccumulatorsJson(jobVertex);
}
+ /**
+ * Archivist for the JobVertexAccumulatorsHandler.
+ */
public static class JobVertexAccumulatorsJsonArchivist implements JsonArchivist {
@Override
@@ -71,13 +76,13 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle
public static String createVertexAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws IOException {
StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
gen.writeStartObject();
gen.writeStringField("id", jobVertex.getJobVertexId().toString());
-
+
gen.writeArrayFieldStart("user-accumulators");
for (StringifiedAccumulatorResult acc : accs) {
gen.writeStartObject();
@@ -87,7 +92,7 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle
gen.writeEndObject();
}
gen.writeEndArray();
-
+
gen.writeEndObject();
gen.close();
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
index 52167e1..cde8ca9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
@@ -18,18 +18,20 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.webmonitor.BackPressureStatsTracker;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.OperatorBackPressureStats;
-import scala.Option;
+
+import com.fasterxml.jackson.core.JsonGenerator;
import java.io.StringWriter;
import java.util.Map;
+import scala.Option;
+
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -72,7 +74,7 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle
}
ExecutionJobVertex jobVertex = (ExecutionJobVertex) accessJobVertex;
try (StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer)) {
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
gen.writeStartObject();
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index d9a1131..7757fdd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import com.fasterxml.jackson.core.JsonGenerator;
-
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -31,7 +29,10 @@ import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
+import com.fasterxml.jackson.core.JsonGenerator;
+
import javax.annotation.Nullable;
+
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
@@ -45,7 +46,7 @@ import java.util.Map;
*/
public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
- private static String JOB_VERTEX_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid";
+ private static final String JOB_VERTEX_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid";
private final MetricFetcher fetcher;
@@ -64,6 +65,9 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
return createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher);
}
+ /**
+ * Archivist for the JobVertexDetailsHandler.
+ */
public static class JobVertexDetailsJsonArchivist implements JsonArchivist {
@Override
@@ -85,9 +89,9 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
String jobID,
@Nullable MetricFetcher fetcher) throws IOException {
final long now = System.currentTimeMillis();
-
+
StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
gen.writeStartObject();
@@ -100,7 +104,7 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
int num = 0;
for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
final ExecutionState status = vertex.getExecutionState();
-
+
TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
String locationString = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();
@@ -110,7 +114,7 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
}
long endTime = status.isTerminal() ? vertex.getStateTimestamp(status) : -1;
long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1;
-
+
gen.writeStartObject();
gen.writeNumberField("subtask", num);
gen.writeStringField("status", status.name());
@@ -130,13 +134,13 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
);
counts.writeIOMetricsAsJson(gen);
-
+
gen.writeEndObject();
-
+
num++;
}
gen.writeEndArray();
-
+
gen.writeEndObject();
gen.close();
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
index 3878722..a612782 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -31,7 +30,10 @@ import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
+import com.fasterxml.jackson.core.JsonGenerator;
+
import javax.annotation.Nullable;
+
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
@@ -65,6 +67,9 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
return createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher);
}
+ /**
+ * Archivist for JobVertexTaskManagersHandler.
+ */
public static class JobVertexTaskManagersJsonArchivist implements JsonArchivist {
@Override
@@ -86,7 +91,7 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
String jobID,
@Nullable MetricFetcher fetcher) throws IOException {
StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
// Build a map that groups tasks by TaskManager
Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
@@ -108,7 +113,6 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
// Build JSON response
final long now = System.currentTimeMillis();
-
gen.writeStartObject();
gen.writeStringField("id", jobVertex.getJobVertexId().toString());
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java
index e886532..4ce0baf 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java
@@ -26,11 +26,10 @@ package org.apache.flink.runtime.webmonitor.handlers;
public class JsonFactory {
/** The singleton Jackson JSON factory. */
- public static final com.fasterxml.jackson.core.JsonFactory jacksonFactory =
+ public static final com.fasterxml.jackson.core.JsonFactory JACKSON_FACTORY =
new com.fasterxml.jackson.core.JsonFactory();
-
+
// --------------------------------------------------------------------------------------------
-
- /** Don't instantiate */
+
private JsonFactory() {}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
index b6246e6..66c30af 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
@@ -18,9 +18,10 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import io.netty.handler.codec.http.FullHttpResponse;
import org.apache.flink.runtime.instance.ActorGateway;
+import io.netty.handler.codec.http.FullHttpResponse;
+
import java.util.Map;
/**
@@ -36,13 +37,13 @@ public interface RequestHandler {
* respond with a full http response, including content-type, content-length, etc.
*
 * <p>Exceptions may be thrown and will be handled.
- *
+ *
* @param pathParams The map of REST path parameters, decoded by the router.
* @param queryParams The map of query parameters.
* @param jobManager The JobManager actor.
*
* @return The full http response.
- *
+ *
* @throws Exception Handlers may forward exceptions. Exceptions of type
* {@link org.apache.flink.runtime.webmonitor.NotFoundException} will cause a HTTP 404
* response with the exception message, other exceptions will cause a HTTP 500 response
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
index 4cf5f0f..28e9ddf 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
@@ -30,7 +30,7 @@ import java.util.Map;
public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttemptDetailsHandler {
public static final String SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum";
-
+
public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, MetricFetcher fetcher) {
super(executionGraphHolder, fetcher);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index 9026a22..171277f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -28,6 +27,8 @@ import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import com.fasterxml.jackson.core.JsonGenerator;
+
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
@@ -37,12 +38,12 @@ import java.util.Map;
/**
* Base class for request handlers whose response depends on a specific job vertex (defined
- * via the "vertexid" parameter) in a specific job, defined via (defined voa the "jobid" parameter).
+ * via the "vertexid" parameter) in a specific job (defined via the "jobid" parameter).
*/
public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskAttemptRequestHandler {
private static final String SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators";
-
+
public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
super(executionGraphHolder);
}
@@ -56,7 +57,10 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
return createAttemptAccumulatorsJson(execAttempt);
}
-
+
+ /**
+ * Archivist for the SubtaskExecutionAttemptAccumulatorsHandler.
+ */
public static class SubtaskExecutionAttemptAccumulatorsJsonArchivist implements JsonArchivist {
@Override
@@ -91,8 +95,8 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
public static String createAttemptAccumulatorsJson(AccessExecution execAttempt) throws IOException {
StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
-
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified();
gen.writeStartObject();
@@ -100,7 +104,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
gen.writeNumberField("subtask", execAttempt.getParallelSubtaskIndex());
gen.writeNumberField("attempt", execAttempt.getAttemptNumber());
gen.writeStringField("id", execAttempt.getAttemptId().toString());
-
+
gen.writeArrayFieldStart("user-accumulators");
for (StringifiedAccumulatorResult acc : accs) {
gen.writeStartObject();
@@ -110,9 +114,9 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
gen.writeEndObject();
}
gen.writeEndArray();
-
+
gen.writeEndObject();
-
+
gen.close();
return writer.toString();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
index 078f54a..37c0e50 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import com.fasterxml.jackson.core.JsonGenerator;
-
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -32,7 +30,10 @@ import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
+import com.fasterxml.jackson.core.JsonGenerator;
+
import javax.annotation.Nullable;
+
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
@@ -66,6 +67,9 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
return createAttemptDetailsJson(execAttempt, params.get("jobid"), params.get("vertexid"), fetcher);
}
+ /**
+ * Archivist for the SubtaskExecutionAttemptDetailsHandler.
+ */
public static class SubtaskExecutionAttemptDetailsJsonArchivist implements JsonArchivist {
@Override
@@ -83,7 +87,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
.replace(":vertexid", task.getJobVertexId().toString())
.replace(":subtasknum", String.valueOf(subtask.getParallelSubtaskIndex()))
.replace(":attempt", String.valueOf(subtask.getCurrentExecutionAttempt().getAttemptNumber()));
-
+
archive.add(new ArchivedJson(curAttemptPath1, curAttemptJson));
archive.add(new ArchivedJson(curAttemptPath2, curAttemptJson));
@@ -109,7 +113,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
String vertexID,
@Nullable MetricFetcher fetcher) throws IOException {
StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
final ExecutionState status = execAttempt.getState();
final long now = System.currentTimeMillis();
@@ -141,7 +145,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
jobID,
vertexID
);
-
+
counts.writeIOMetricsAsJson(gen);
gen.writeEndObject();
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
index 6c3bc18..64bdfb4 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import com.fasterxml.jackson.core.JsonGenerator;
-
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -29,6 +27,8 @@ import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import com.fasterxml.jackson.core.JsonGenerator;
+
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
@@ -42,7 +42,7 @@ import java.util.Map;
public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHandler {
private static final String SUBTASKS_ALL_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/accumulators";
-
+
public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
super(executionGraphHolder);
}
@@ -57,6 +57,9 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
return createSubtasksAccumulatorsJson(jobVertex);
}
+ /**
+ * Archivist for the SubtasksAllAccumulatorsHandler.
+ */
public static class SubtasksAllAccumulatorsJsonArchivist implements JsonArchivist {
@Override
@@ -75,22 +78,22 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
public static String createSubtasksAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws IOException {
StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
gen.writeStartObject();
gen.writeStringField("id", jobVertex.getJobVertexId().toString());
gen.writeNumberField("parallelism", jobVertex.getParallelism());
gen.writeArrayFieldStart("subtasks");
-
+
int num = 0;
for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
String locationString = location == null ? "(unassigned)" : location.getHostname();
-
+
gen.writeStartObject();
-
+
gen.writeNumberField("subtask", num++);
gen.writeNumberField("attempt", vertex.getCurrentExecutionAttempt().getAttemptNumber());
gen.writeStringField("host", locationString);
@@ -105,7 +108,7 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
gen.writeEndObject();
}
gen.writeEndArray();
-
+
gen.writeEndObject();
}
gen.writeEndArray();
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
index adefa80..ea88587 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import com.fasterxml.jackson.core.JsonGenerator;
-
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -29,6 +27,8 @@ import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import com.fasterxml.jackson.core.JsonGenerator;
+
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
@@ -43,7 +43,7 @@ import java.util.Map;
public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
private static final String SUBTASK_TIMES_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasktimes";
-
+
public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder) {
super(executionGraphHolder);
}
@@ -58,6 +58,9 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
return createSubtaskTimesJson(jobVertex);
}
+ /**
+ * Archivist for the SubtasksTimesHandler.
+ */
public static class SubtasksTimesJsonArchivist implements JsonArchivist {
@Override
@@ -78,28 +81,28 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
final long now = System.currentTimeMillis();
StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
gen.writeStartObject();
gen.writeStringField("id", jobVertex.getJobVertexId().toString());
gen.writeStringField("name", jobVertex.getName());
gen.writeNumberField("now", now);
-
+
gen.writeArrayFieldStart("subtasks");
int num = 0;
for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
-
+
long[] timestamps = vertex.getCurrentExecutionAttempt().getStateTimestamps();
ExecutionState status = vertex.getExecutionState();
long scheduledTime = timestamps[ExecutionState.SCHEDULED.ordinal()];
-
+
long start = scheduledTime > 0 ? scheduledTime : -1;
long end = status.isTerminal() ? timestamps[status.ordinal()] : now;
long duration = start >= 0 ? end - start : -1L;
-
+
gen.writeStartObject();
gen.writeNumberField("subtask", num++);
@@ -108,13 +111,13 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
gen.writeStringField("host", locationString);
gen.writeNumberField("duration", duration);
-
+
gen.writeObjectFieldStart("timestamps");
for (ExecutionState state : ExecutionState.values()) {
gen.writeNumberField(state.name(), timestamps[state.ordinal()]);
}
gen.writeEndObject();
-
+
gen.writeEndObject();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
index 53ee336..1084623 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -26,24 +26,6 @@ package org.apache.flink.runtime.webmonitor.handlers;
* https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
*****************************************************************************/
-import akka.dispatch.Mapper;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.DefaultFileRegion;
-import io.netty.handler.codec.http.DefaultHttpResponse;
-import io.netty.handler.codec.http.HttpChunkedInput;
-import io.netty.handler.codec.http.HttpHeaders;
-import io.netty.handler.codec.http.HttpRequest;
-import io.netty.handler.codec.http.HttpResponse;
-import io.netty.handler.codec.http.LastHttpContent;
-import io.netty.handler.codec.http.router.Routed;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.handler.stream.ChunkedFile;
-import io.netty.util.concurrent.GenericFutureListener;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
@@ -65,12 +47,27 @@ import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
+
+import akka.dispatch.Mapper;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.HttpChunkedInput;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.codec.http.router.Routed;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedFile;
+import io.netty.util.concurrent.GenericFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.concurrent.ExecutionContextExecutor;
-import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
import java.io.File;
import java.io.FileNotFoundException;
@@ -81,6 +78,11 @@ import java.nio.channels.FileChannel;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
+import scala.Option;
+import scala.concurrent.ExecutionContextExecutor;
+import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
+
import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
@@ -100,19 +102,19 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
private static final String TASKMANAGER_LOG_REST_PATH = "/taskmanagers/:taskmanagerid/log";
private static final String TASKMANAGER_OUT_REST_PATH = "/taskmanagers/:taskmanagerid/stdout";
- /** Keep track of last transmitted log, to clean up old ones */
+ /** Keep track of last transmitted log, to clean up old ones. */
private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>();
private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>();
- /** Keep track of request status, prevents multiple log requests for a single TM running concurrently */
+ /** Keep track of request status, prevents multiple log requests for a single TM running concurrently. */
private final ConcurrentHashMap<String, Boolean> lastRequestPending = new ConcurrentHashMap<>();
private final Configuration config;
- /** Future of the blob cache */
+ /** Future of the blob cache. */
private Future<BlobCache> cache;
- /** Indicates which log file should be displayed; true indicates .log, false indicates .out */
- private boolean serveLogFile;
+ /** Indicates which log file should be displayed. */
+ private FileMode fileMode;
private final ExecutionContextExecutor executor;
@@ -120,6 +122,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
private final BlobView blobView;
+ /** Used to control whether this handler serves the .log or .out file. */
public enum FileMode {
LOG,
STDOUT
@@ -138,14 +141,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
this.executor = checkNotNull(executor);
this.config = config;
- switch (fileMode) {
- case LOG:
- serveLogFile = true;
- break;
- case STDOUT:
- serveLogFile = false;
- break;
- }
+ this.fileMode = fileMode;
this.blobView = Preconditions.checkNotNull(blobView, "blobView");
@@ -154,10 +150,12 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
@Override
public String[] getPaths() {
- if (serveLogFile) {
- return new String[]{TASKMANAGER_LOG_REST_PATH};
- } else {
- return new String[]{TASKMANAGER_OUT_REST_PATH};
+ switch (fileMode) {
+ case LOG:
+ return new String[]{TASKMANAGER_LOG_REST_PATH};
+ case STDOUT:
+ default:
+ return new String[]{TASKMANAGER_OUT_REST_PATH};
}
}
@@ -199,10 +197,12 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
public Future<BlobKey> apply(JobManagerMessages.TaskManagerInstance value) {
Instance taskManager = value.instance().get();
- if (serveLogFile) {
- return taskManager.getTaskManagerGateway().requestTaskManagerLog(timeTimeout);
- } else {
- return taskManager.getTaskManagerGateway().requestTaskManagerStdout(timeTimeout);
+ switch (fileMode) {
+ case LOG:
+ return taskManager.getTaskManagerGateway().requestTaskManagerLog(timeTimeout);
+ case STDOUT:
+ default:
+ return taskManager.getTaskManagerGateway().requestTaskManagerStdout(timeTimeout);
}
}
});
@@ -223,7 +223,7 @@ public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
final BlobCache blobCache = value.f1;
//delete previous log file, if it is different than the current one
- HashMap<String, BlobKey> lastSubmittedFile = serveLogFile ? lastSubmittedLog : lastSubmittedStdout;
+ HashMap<String, BlobKey> lastSubmittedFile = fileMode == FileMode.LOG ? lastSubmittedLog : lastSubmittedStdout;
if (lastSubmittedFile.containsKey(taskManagerID)) {
if (!blobKey.equals(lastSubmittedFile.get(taskManagerID))) {
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
index a23e983..6ad490e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID;
@@ -28,28 +27,34 @@ import org.apache.flink.runtime.messages.JobManagerMessages.TaskManagerInstance;
import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
import org.apache.flink.util.StringUtils;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+
+import com.fasterxml.jackson.core.JsonGenerator;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
import static java.util.Objects.requireNonNull;
+/**
+ * A request handler that provides an overview over all taskmanagers or details for a single one.
+ */
public class TaskManagersHandler extends AbstractJsonRequestHandler {
private static final String TASKMANAGERS_REST_PATH = "/taskmanagers";
private static final String TASKMANAGER_DETAILS_REST_PATH = "/taskmanagers/:taskmanagerid";
public static final String TASK_MANAGER_ID_KEY = "taskmanagerid";
-
+
private final FiniteDuration timeout;
private final MetricFetcher fetcher;
-
+
public TaskManagersHandler(FiniteDuration timeout, MetricFetcher fetcher) {
this.timeout = requireNonNull(timeout);
this.fetcher = fetcher;
@@ -88,7 +93,7 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler {
}
StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
gen.writeStartObject();
gen.writeArrayFieldStart("taskmanagers");
@@ -112,17 +117,17 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler {
MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
if (metrics != null) {
gen.writeObjectFieldStart("metrics");
- long heapUsed = Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
- long heapCommitted = Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
- long heapTotal = Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
+ long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
+ long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
+ long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
gen.writeNumberField("heapCommitted", heapCommitted);
gen.writeNumberField("heapUsed", heapUsed);
gen.writeNumberField("heapMax", heapTotal);
- long nonHeapUsed = Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
- long nonHeapCommitted = Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
- long nonHeapTotal = Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
+ long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
+ long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
+ long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
gen.writeNumberField("nonHeapUsed", nonHeapUsed);
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
index 7914c29..d4c9b2a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
-import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
@@ -28,6 +27,8 @@ import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import com.fasterxml.jackson.core.JsonGenerator;
+
import java.io.IOException;
import java.io.StringWriter;
import java.util.Collection;
@@ -55,6 +56,9 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
return createCheckpointConfigJson(graph);
}
+ /**
+ * Archivist for the CheckpointConfigHandler.
+ */
public static class CheckpointConfigJsonArchivist implements JsonArchivist {
@Override
@@ -68,7 +72,7 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
private static String createCheckpointConfigJson(AccessExecutionGraph graph) throws IOException {
StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
JobCheckpointingSettings settings = graph.getJobCheckpointingSettings();
if (settings == null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java
index 35d529a..9bbe8a7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java
@@ -18,9 +18,10 @@
package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
-import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import javax.annotation.Nullable;
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
index 16fd9bd..664744b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
-import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
@@ -32,6 +31,8 @@ import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import com.fasterxml.jackson.core.JsonGenerator;
+
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
@@ -86,6 +87,9 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
return createCheckpointDetailsJson(checkpoint);
}
+ /**
+ * Archivist for the CheckpointStatsDetailsHandler.
+ */
public static class CheckpointStatsDetailsJsonArchivist implements JsonArchivist {
@Override
@@ -109,7 +113,7 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
public static String createCheckpointDetailsJson(AbstractCheckpointStats checkpoint) throws IOException {
StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
gen.writeStartObject();
gen.writeNumberField("id", checkpoint.getCheckpointId());
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
index bb39b2c..f96e0c2 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
-import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
@@ -35,6 +34,8 @@ import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import com.fasterxml.jackson.core.JsonGenerator;
+
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
@@ -107,10 +108,13 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
if (taskStats == null) {
return "{}";
}
-
+
return createSubtaskCheckpointDetailsJson(checkpoint, taskStats);
}
+ /**
+ * Archivist for the CheckpointStatsDetailsSubtasksHandler.
+ */
public static class CheckpointStatsDetailsSubtasksJsonArchivist implements JsonArchivist {
@Override
@@ -137,7 +141,7 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
private static String createSubtaskCheckpointDetailsJson(AbstractCheckpointStats checkpoint, TaskStateStats taskStats) throws IOException {
StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
gen.writeStartObject();
// Overview
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
index f004888..a86c5fd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
-import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
@@ -35,7 +34,10 @@ import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import com.fasterxml.jackson.core.JsonGenerator;
+
import javax.annotation.Nullable;
+
import java.io.IOException;
import java.io.StringWriter;
import java.util.Collection;
@@ -63,6 +65,9 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
return createCheckpointStatsJson(graph);
}
+ /**
+ * Archivist for the CheckpointStatsHandler.
+ */
public static class CheckpointStatsJsonArchivist implements JsonArchivist {
@Override
@@ -76,7 +81,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
private static String createCheckpointStatsJson(AccessExecutionGraph graph) throws IOException {
StringWriter writer = new StringWriter();
- JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+ JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
if (snapshot == null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 3337370..d86bfb2 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -15,9 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.webmonitor.history;
-import io.netty.handler.codec.http.router.Router;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
@@ -34,10 +34,13 @@ import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
+
+import io.netty.handler.codec.http.router.Router;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
+
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
@@ -53,15 +56,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
/**
* The HistoryServer provides a WebInterface and REST API to retrieve information about finished jobs for which
* the JobManager may have already shut down.
- *
- * The HistoryServer regularly checks a set of directories for job archives created by the {@link FsJobArchivist} and
+ *
+ * <p>The HistoryServer regularly checks a set of directories for job archives created by the {@link FsJobArchivist} and
* caches these in a local directory. See {@link HistoryServerArchiveFetcher}.
- *
- * All configuration options are defined in{@link HistoryServerOptions}.
- *
- * The WebInterface only displays the "Completed Jobs" page.
- *
- * The REST API is limited to
+ *
+ * <p>All configuration options are defined in {@link HistoryServerOptions}.
+ *
+ * <p>The WebInterface only displays the "Completed Jobs" page.
+ *
+ * <p>The REST API is limited to
* <ul>
* <li>/config</li>
* <li>/joboverview</li>
@@ -110,7 +113,7 @@ public class HistoryServer {
});
System.exit(0);
} catch (UndeclaredThrowableException ute) {
- Throwable cause = ute. getUndeclaredThrowable();
+ Throwable cause = ute.getUndeclaredThrowable();
LOG.error("Failed to run HistoryServer.", cause);
cause.printStackTrace();
System.exit(1);
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 0ff9e02..0fc4314 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -15,14 +15,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.runtime.webmonitor.history;
-import static org.apache.flink.util.Preconditions.checkNotNull;
+package org.apache.flink.runtime.webmonitor.history;
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.HistoryServerOptions;
import org.apache.flink.core.fs.FileStatus;
@@ -32,6 +27,11 @@ import org.apache.flink.runtime.history.FsJobArchivist;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
import org.apache.flink.util.FileUtils;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,12 +49,14 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* This class is used by the {@link HistoryServer} to fetch the job archives that are located at
* {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS}. The directories are polled in regular intervals, defined
* by {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL}.
- *
- * The archives are downloaded and expanded into a file structure analog to the REST API defined in the WebRuntimeMonitor.
+ *
+ * <p>The archives are downloaded and expanded into a file structure analogous to the REST API defined in the WebRuntimeMonitor.
*/
class HistoryServerArchiveFetcher {
@@ -174,7 +176,7 @@ class HistoryServerArchiveFetcher {
}
java.nio.file.Path targetPath = target.toPath();
-
+
// We overwrite existing files since this may be another attempt at fetching this archive.
// Existing files may be incomplete/corrupt.
if (Files.exists(targetPath)) {
@@ -224,10 +226,10 @@ class HistoryServerArchiveFetcher {
* This method replicates the JSON response that would be given by the {@link CurrentJobsOverviewHandler} when
* listing both running and finished jobs.
*
- * Every job archive contains a joboverview.json file containing the same structure. Since jobs are archived on
+ * <p>Every job archive contains a joboverview.json file containing the same structure. Since jobs are archived on
* their own however the list of finished jobs only contains a single job.
*
- * For the display in the HistoryServer WebFrontend we have to combine these overviews.
+ * <p>For the display in the HistoryServer WebFrontend we have to combine these overviews.
*/
private static void updateJobOverview(File webDir) {
File webOverviewDir = new File(webDir, "overviews");
http://git-wip-us.apache.org/repos/asf/flink/blob/d481f295/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
index ba0e2d2..c14f3d8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
@@ -26,6 +26,8 @@ package org.apache.flink.runtime.webmonitor.history;
* https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
*****************************************************************************/
+import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
+
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
@@ -41,7 +43,6 @@ import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.router.Routed;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedFile;
-import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,10 +71,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* Simple file server handler used by the {@link HistoryServer} that serves requests to web frontend's static files,
* such as HTML, CSS, JS or JSON files.
*
- * This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
+ * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
* example.
- *
- * This class is a copy of the {@link StaticFileServerHandler}. The differences are that the request path is
+ *
+ * <p>This class is a copy of the {@link StaticFileServerHandler}. The differences are that the request path is
* modified to end on ".json" if it does not have a filename extension; when "index.html" is requested we load
* "index_hs.html" instead to inject the modified HistoryServer WebInterface and that the caching of the "/joboverview"
* page is prevented.
@@ -81,12 +82,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@ChannelHandler.Sharable
public class HistoryServerStaticFileServerHandler extends SimpleChannelInboundHandler<Routed> {
- /** Default logger, if none is specified */
+ /** Default logger, if none is specified. */
private static final Logger LOG = LoggerFactory.getLogger(HistoryServerStaticFileServerHandler.class);
// ------------------------------------------------------------------------
- /** The path in which the static documents are */
+ /** The path in which the static documents are. */
private final File rootPath;
public HistoryServerStaticFileServerHandler(File rootPath) throws IOException {