You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/23 16:13:20 UTC
[3/3] flink git commit: [FLINK-2357] [web dashboard] New dashboard
backend server supports requests from old web server as well.
[FLINK-2357] [web dashboard] New dashboard backend server supports requests from old web server as well.
Also moves TestRunner to test scope.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c52e753a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c52e753a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c52e753a
Branch: refs/heads/master
Commit: c52e753a8d3fc15ed48df7bfa24a327a90df9a0f
Parents: 4473db6
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jul 23 12:01:13 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jul 23 16:11:35 2015 +0200
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 11 -
flink-runtime-web/pom.xml | 30 +-
.../flink/runtime/webmonitor/TestRunner.java | 197 ------
.../runtime/webmonitor/WebRuntimeMonitor.java | 10 +-
.../handlers/RequestJobIdsHandler.java | 2 +-
.../legacy/JobManagerInfoHandler.java | 705 +++++++++++++++++++
.../runtime/webmonitor/legacy/JsonFactory.java | 112 +++
.../runtime/webmonitor/runner/TestRunner.java | 198 ++++++
.../web-dashboard/app/scripts/index.coffee | 6 +-
flink-runtime-web/web-dashboard/server.js | 2 +-
flink-runtime-web/web-dashboard/web/js/index.js | 5 +-
.../flink/runtime/jobmanager/JobManager.scala | 3 -
.../minicluster/LocalFlinkMiniCluster.scala | 9 +-
.../test/util/ForkableFlinkMiniCluster.scala | 9 +-
14 files changed, 1058 insertions(+), 241 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 9690f41..c76741b 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -303,11 +303,6 @@ public final class ConfigConstants {
* The option that specifies whether to use the new web frontend
*/
public static final String JOB_MANAGER_NEW_WEB_FRONTEND_KEY = "jobmanager.new-web-frontend";
-
- /**
- * The port for the runtime monitor web-frontend server.
- */
- public static final String JOB_MANAGER_NEW_WEB_PORT_KEY = "jobmanager.new-web.port";
/**
* The config parameter defining the number of archived jobs for the jobmanager
@@ -612,12 +607,6 @@ public final class ConfigConstants {
* Setting this value to {@code -1} disables the web frontend.
*/
public static final int DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT = 8081;
-
- /**
- * The config key for the port of the JobManager new web frontend.
- * Setting this value to {@code -1} disables the web frontend.
- */
- public static final int DEFAULT_JOB_MANAGER_NEW_WEB_FRONTEND_PORT = 8082;
/**
* The default number of archived jobs for the jobmanager
http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index ffe15af..0a05111 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -46,18 +46,6 @@ under the License.
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java-examples</artifactId>
- <version>${project.version}</version>
- </dependency>
-
<!-- ===================================================
Dependencies for the Web Server
=================================================== -->
@@ -102,6 +90,24 @@ under the License.
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
+
+ <!-- ===================================================
+ Testing
+ =================================================== -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java-examples</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/TestRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/TestRunner.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/TestRunner.java
deleted file mode 100644
index eecc81a..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/TestRunner.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.FunctionAnnotation;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.examples.java.relational.util.WebLogData;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.util.Collector;
-
-/**
- * Simple runner that brings up a local cluster with the web server and executes two
- * jobs to expose their data in the archive
- */
-@SuppressWarnings("serial")
-public class TestRunner {
-
- public static void main(String[] args) throws Exception {
-
- // start the cluster with the runtime monitor
- Configuration configuration = new Configuration();
- configuration.setBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, true);
- configuration.setBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, true);
- configuration.setString(ConfigConstants.JOB_MANAGER_WEB_DOC_ROOT_KEY,
- "/data/repositories/flink/flink-dist/target/flink-0.10-SNAPSHOT-bin/flink-0.10-SNAPSHOT/resources/web-runtime-monitor");
-
- LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(configuration, false);
-
- final int port = cluster.getJobManagerRPCPort();
- runWordCount(port);
- runWebLogAnalysisExample(port);
- runWordCount(port);
-
- Object o = new Object();
- synchronized (o) {
- o.wait();
- }
-
- cluster.shutdown();
- }
-
- private static void runWordCount(int port) throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", port);
-
- DataSet<String> text = env.fromElements(WordCountData.TEXT.split("\n"));
-
- DataSet<Tuple2<String, Integer>> counts =
- // split up the lines in pairs (2-tuples) containing: (word,1)
- text.flatMap(new Tokenizer())
- // group by the tuple field "0" and sum up tuple field "1"
- .groupBy(0)
- .sum(1);
-
- counts.print();
- }
-
- private static void runWebLogAnalysisExample(int port) throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", port);
-
- // get input data
- DataSet<Tuple2<String, String>> documents = WebLogData.getDocumentDataSet(env);
- DataSet<Tuple3<Integer, String, Integer>> ranks = WebLogData.getRankDataSet(env);
- DataSet<Tuple2<String, String>> visits = WebLogData.getVisitDataSet(env);
-
- // Retain documents with keywords
- DataSet<Tuple1<String>> filterDocs = documents
- .filter(new FilterDocByKeyWords())
- .project(0);
-
- // Filter ranks by minimum rank
- DataSet<Tuple3<Integer, String, Integer>> filterRanks = ranks
- .filter(new FilterByRank());
-
- // Filter visits by visit date
- DataSet<Tuple1<String>> filterVisits = visits
- .filter(new FilterVisitsByDate())
- .project(0);
-
- // Join the filtered documents and ranks, i.e., get all URLs with min rank and keywords
- DataSet<Tuple3<Integer, String, Integer>> joinDocsRanks =
- filterDocs.join(filterRanks)
- .where(0).equalTo(1)
- .projectSecond(0,1,2);
-
- // Anti-join urls with visits, i.e., retain all URLs which have NOT been visited in a certain time
- DataSet<Tuple3<Integer, String, Integer>> result =
- joinDocsRanks.coGroup(filterVisits)
- .where(1).equalTo(0)
- .with(new AntiJoinVisits());
-
- result.print();
- }
-
- // *************************************************************************
- // USER FUNCTIONS
- // *************************************************************************
-
- public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
-
- @Override
- public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
- // normalize and split the line
- String[] tokens = value.toLowerCase().split("\\W+");
-
- // emit the pairs
- for (String token : tokens) {
- if (token.length() > 0) {
- out.collect(new Tuple2<String, Integer>(token, 1));
- }
- }
- }
- }
-
- public static class FilterDocByKeyWords implements FilterFunction<Tuple2<String, String>> {
-
- private static final String[] KEYWORDS = { " editors ", " oscillations " };
-
- @Override
- public boolean filter(Tuple2<String, String> value) throws Exception {
- // FILTER
- // Only collect the document if all keywords are contained
- String docText = value.f1;
- for (String kw : KEYWORDS) {
- if (!docText.contains(kw)) {
- return false;
- }
- }
- return true;
- }
- }
-
- public static class FilterByRank implements FilterFunction<Tuple3<Integer, String, Integer>> {
-
- private static final int RANKFILTER = 40;
-
- @Override
- public boolean filter(Tuple3<Integer, String, Integer> value) throws Exception {
- return (value.f0 > RANKFILTER);
- }
- }
-
-
- public static class FilterVisitsByDate implements FilterFunction<Tuple2<String, String>> {
-
- private static final int YEARFILTER = 2007;
-
- @Override
- public boolean filter(Tuple2<String, String> value) throws Exception {
- // Parse date string with the format YYYY-MM-DD and extract the year
- String dateString = value.f1;
- int year = Integer.parseInt(dateString.substring(0,4));
- return (year == YEARFILTER);
- }
- }
-
-
- @FunctionAnnotation.ForwardedFieldsFirst("*")
- public static class AntiJoinVisits implements CoGroupFunction<Tuple3<Integer, String, Integer>, Tuple1<String>, Tuple3<Integer, String, Integer>> {
-
- @Override
- public void coGroup(Iterable<Tuple3<Integer, String, Integer>> ranks, Iterable<Tuple1<String>> visits, Collector<Tuple3<Integer, String, Integer>> out) {
- // Check if there is a entry in the visits relation
- if (!visits.iterator().hasNext()) {
- for (Tuple3<Integer, String, Integer> next : ranks) {
- // Emit all rank pairs
- out.collect(next);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 3a8dd83..0aa6b07 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.webmonitor.handlers.RequestConfigHandler;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
import org.apache.flink.runtime.webmonitor.handlers.RequestJobIdsHandler;
import org.apache.flink.runtime.webmonitor.handlers.RequestOverviewHandler;
+import org.apache.flink.runtime.webmonitor.legacy.JobManagerInfoHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -111,8 +112,8 @@ public class WebRuntimeMonitor implements WebMonitor {
}
// port configuration
- this.configuredPort = config.getInteger(ConfigConstants.JOB_MANAGER_NEW_WEB_PORT_KEY,
- ConfigConstants.DEFAULT_JOB_MANAGER_NEW_WEB_FRONTEND_PORT);
+ this.configuredPort = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
+ ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
if (this.configuredPort < 0) {
throw new IllegalArgumentException("Web frontend port is invalid: " + this.configuredPort);
}
@@ -133,7 +134,10 @@ public class WebRuntimeMonitor implements WebMonitor {
.GET("/jobs/:jobid/plan", handler(new ExecutionPlanHandler(currentGraphs)))
// .GET("/running/:jobid/:jobvertex", handler(new ExecutionPlanHandler(currentGraphs)))
-
+
+ // the handler for the legacy requests
+ .GET("/jobsInfo", new JobManagerInfoHandler(jobManager, archive, DEFAULT_REQUEST_TIMEOUT))
+
// this handler serves all the static contents
.GET("/:*", new StaticFileServerHandler(webRootDir));
http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
index a09bc1a..1f28a01 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestJobIdsHandler.java
@@ -38,7 +38,7 @@ import java.util.Map;
* May serve the IDs of current jobs, or past jobs, depending on whether this handler is
* given the JobManager or Archive Actor Reference.
*/
-public class RequestJobIdsHandler implements RequestHandler, RequestHandler.JsonResponse {
+public class RequestJobIdsHandler implements RequestHandler, RequestHandler.JsonResponse {
private final ActorRef target;
http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
new file mode 100644
index 0000000..0a1e08c
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JobManagerInfoHandler.java
@@ -0,0 +1,705 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.legacy;
+
+import akka.actor.ActorRef;
+
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.router.KeepAliveWrite;
+import io.netty.handler.codec.http.router.Routed;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.ArchiveMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.accumulators.AccumulatorResultStringsFound;
+import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
+import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsNotFound;
+import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResultsStringified;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Tuple3;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+@ChannelHandler.Sharable
+public class JobManagerInfoHandler extends SimpleChannelInboundHandler<Routed> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JobManagerInfoHandler.class);
+
+ private static final Charset ENCODING = Charset.forName("UTF-8");
+
+ /** Underlying JobManager */
+ private final ActorRef jobmanager;
+ private final ActorRef archive;
+ private final FiniteDuration timeout;
+
+
+ public JobManagerInfoHandler(ActorRef jobmanager, ActorRef archive, FiniteDuration timeout) {
+ this.jobmanager = jobmanager;
+ this.archive = archive;
+ this.timeout = timeout;
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
+ DefaultFullHttpResponse response;
+ try {
+ String result = handleRequest(routed);
+ byte[] bytes = result.getBytes(ENCODING);
+
+ response = new DefaultFullHttpResponse(
+ HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));
+
+ response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json");
+ }
+ catch (Exception e) {
+ byte[] bytes = ExceptionUtils.stringifyException(e).getBytes(ENCODING);
+ response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
+ HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes));
+ response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
+ }
+
+ response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8");
+ response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+
+ KeepAliveWrite.flush(ctx, routed.request(), response);
+ }
+
+
+ @SuppressWarnings("unchecked")
+ private String handleRequest(Routed routed) throws Exception {
+ if ("archive".equals(routed.queryParam("get"))) {
+ Future<Object> response = Patterns.ask(archive, ArchiveMessages.getRequestArchivedJobs(),
+ new Timeout(timeout));
+
+ Object result = Await.result(response, timeout);
+
+ if(!(result instanceof ArchiveMessages.ArchivedJobs)) {
+ throw new RuntimeException("RequestArchiveJobs requires a response of type " +
+ "ArchivedJobs. Instead the response is of type " + result.getClass() +".");
+ }
+ else {
+ final List<ExecutionGraph> archivedJobs = new ArrayList<ExecutionGraph>(
+ ((ArchiveMessages.ArchivedJobs) result).asJavaCollection());
+
+ return writeJsonForArchive(archivedJobs);
+ }
+ }
+ else if ("jobcounts".equals(routed.queryParam("get"))) {
+ Future<Object> response = Patterns.ask(archive, ArchiveMessages.getRequestJobCounts(),
+ new Timeout(timeout));
+
+ Object result = Await.result(response, timeout);
+
+ if (!(result instanceof Tuple3)) {
+ throw new RuntimeException("RequestJobCounts requires a response of type " +
+ "Tuple3. Instead the response is of type " + result.getClass() +
+ ".");
+ }
+ else {
+ return writeJsonForJobCounts((Tuple3<Integer, Integer, Integer>) result);
+ }
+ }
+ else if ("job".equals(routed.queryParam("get"))) {
+ String jobId = routed.queryParam("job");
+
+ Future<Object> response = Patterns.ask(archive, new JobManagerMessages.RequestJob(JobID.fromHexString(jobId)),
+ new Timeout(timeout));
+
+ Object result = Await.result(response, timeout);
+
+ if (!(result instanceof JobManagerMessages.JobResponse)){
+ throw new RuntimeException("RequestJob requires a response of type JobResponse. " +
+ "Instead the response is of type " + result.getClass());
+ }
+ else {
+ final JobManagerMessages.JobResponse jobResponse = (JobManagerMessages.JobResponse) result;
+
+ if (jobResponse instanceof JobManagerMessages.JobFound){
+ ExecutionGraph archivedJob = ((JobManagerMessages.JobFound)result).executionGraph();
+ return writeJsonForArchivedJob(archivedJob);
+ }
+ else {
+ throw new Exception("DoGet:job: Could not find job for job ID " + jobId);
+ }
+ }
+ }
+ else if ("groupvertex".equals(routed.queryParam("get"))) {
+ String jobId = routed.queryParam("job");
+ String groupVertexId = routed.queryParam("groupvertex");
+
+ // No group vertex specified
+ if (groupVertexId.equals("null")) {
+ throw new Exception("Found null groupVertexId");
+ }
+
+ Future<Object> response = Patterns.ask(archive, new JobManagerMessages.RequestJob(JobID.fromHexString(jobId)),
+ new Timeout(timeout));
+
+ Object result = Await.result(response, timeout);
+
+ if (!(result instanceof JobManagerMessages.JobResponse)){
+ throw new RuntimeException("RequestJob requires a response of type JobResponse. " +
+ "Instead the response is of type " + result.getClass());
+ }
+ else {
+ final JobManagerMessages.JobResponse jobResponse = (JobManagerMessages.JobResponse) result;
+
+ if (jobResponse instanceof JobManagerMessages.JobFound) {
+ ExecutionGraph archivedJob = ((JobManagerMessages.JobFound)jobResponse).executionGraph();
+
+ return writeJsonForArchivedJobGroupvertex(archivedJob, JobVertexID.fromHexString(groupVertexId));
+ }
+ else {
+ throw new Exception("DoGet:groupvertex: Could not find job for job ID " + jobId);
+ }
+ }
+ }
+ else if ("taskmanagers".equals(routed.queryParam("get"))) {
+ Future<Object> response = Patterns.ask(jobmanager,
+ JobManagerMessages.getRequestNumberRegisteredTaskManager(),
+ new Timeout(timeout));
+
+ Object result = Await.result(response, timeout);
+
+ if (!(result instanceof Integer)) {
+ throw new RuntimeException("RequestNumberRegisteredTaskManager requires a " +
+ "response of type Integer. Instead the response is of type " +
+ result.getClass() + ".");
+ }
+ else {
+ final int numberOfTaskManagers = (Integer)result;
+
+ final Future<Object> responseRegisteredSlots = Patterns.ask(jobmanager,
+ JobManagerMessages.getRequestTotalNumberOfSlots(),
+ new Timeout(timeout));
+
+ final Object resultRegisteredSlots = Await.result(responseRegisteredSlots,
+ timeout);
+
+ if (!(resultRegisteredSlots instanceof Integer)) {
+ throw new RuntimeException("RequestTotalNumberOfSlots requires a response of " +
+ "type Integer. Instaed the response of type " +
+ resultRegisteredSlots.getClass() + ".");
+ }
+ else {
+ final int numberOfRegisteredSlots = (Integer) resultRegisteredSlots;
+
+ return "{\"taskmanagers\": " + numberOfTaskManagers + ", " +
+ "\"slots\": " + numberOfRegisteredSlots + "}";
+ }
+ }
+ }
+ else if ("cancel".equals(routed.queryParam("get"))) {
+ String jobId = routed.queryParam("job");
+
+ Future<Object> response = Patterns.ask(jobmanager, new JobManagerMessages.CancelJob(JobID.fromHexString(jobId)),
+ new Timeout(timeout));
+
+ Await.ready(response, timeout);
+ return "{}";
+ }
+ else if ("updates".equals(routed.queryParam("get"))) {
+ String jobId = routed.queryParam("job");
+ return writeJsonUpdatesForJob(JobID.fromHexString(jobId));
+ }
+ else if ("version".equals(routed.queryParam("get"))) {
+ return writeJsonForVersion();
+ }
+ else{
+ Future<Object> response = Patterns.ask(jobmanager, JobManagerMessages.getRequestRunningJobs(),
+ new Timeout(timeout));
+
+ Object result = Await.result(response, timeout);
+
+ if(!(result instanceof JobManagerMessages.RunningJobs)){
+ throw new RuntimeException("RequestRunningJobs requires a response of type " +
+ "RunningJobs. Instead the response of type " + result.getClass() + ".");
+ }
+ else {
+ final Iterable<ExecutionGraph> runningJobs =
+ ((JobManagerMessages.RunningJobs) result).asJavaIterable();
+
+ return writeJsonForJobs(runningJobs);
+ }
+ }
+ }
+
+ private String writeJsonForJobs(Iterable<ExecutionGraph> graphs) {
+ StringBuilder bld = new StringBuilder();
+ bld.append("[");
+
+ Iterator<ExecutionGraph> it = graphs.iterator();
+ // Loop Jobs
+ while(it.hasNext()){
+ ExecutionGraph graph = it.next();
+
+ writeJsonForJob(bld, graph);
+
+ //Write separator between json objects
+ if(it.hasNext()) {
+ bld.append(",");
+ }
+ }
+ bld.append("]");
+
+ return bld.toString();
+ }
+
+ private void writeJsonForJob(StringBuilder bld, ExecutionGraph graph) {
+ //Serialize job to json
+ bld.append("{");
+ bld.append("\"jobid\": \"").append(graph.getJobID()).append("\",");
+ bld.append("\"jobname\": \"").append(graph.getJobName()).append("\",");
+ bld.append("\"status\": \"").append(graph.getState()).append("\",");
+ bld.append("\"time\": ").append(graph.getStatusTimestamp(graph.getState())).append(",");
+
+ // Serialize ManagementGraph to json
+ bld.append("\"groupvertices\": [");
+ boolean first = true;
+
+ for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
+ //Write separator between json objects
+ if (first) {
+ first = false;
+ } else {
+ bld.append(",");
+ }
+ bld.append(JsonFactory.toJson(groupVertex));
+ }
+ bld.append("]");
+ bld.append("}");
+ }
+
+ private String writeJsonForArchive(List<ExecutionGraph> graphs) {
+ StringBuilder bld = new StringBuilder();
+ bld.append("[");
+
+ // sort jobs by time
+ Collections.sort(graphs, new Comparator<ExecutionGraph>() {
+ @Override
+ public int compare(ExecutionGraph o1, ExecutionGraph o2) {
+ if (o1.getStatusTimestamp(o1.getState()) < o2.getStatusTimestamp(o2.getState())) {
+ return 1;
+ } else {
+ return -1;
+ }
+ }
+
+ });
+
+ // Loop Jobs
+ for (int i = 0; i < graphs.size(); i++) {
+ ExecutionGraph graph = graphs.get(i);
+
+ //Serialize job to json
+ bld.append("{");
+ bld.append("\"jobid\": \"").append(graph.getJobID()).append("\",");
+ bld.append("\"jobname\": \"").append(graph.getJobName()).append("\",");
+ bld.append("\"status\": \"").append(graph.getState()).append("\",");
+ bld.append("\"time\": ").append(graph.getStatusTimestamp(graph.getState()));
+
+ bld.append("}");
+
+ //Write separator between json objects
+ if(i != graphs.size() - 1) {
+ bld.append(",");
+ }
+ }
+ bld.append("]");
+ return bld.toString();
+ }
+
+ private String writeJsonForJobCounts(Tuple3<Integer, Integer, Integer> jobCounts) {
+ return "{\"finished\": " + jobCounts._1() + ",\"canceled\": " + jobCounts._2() + ",\"failed\": "
+ + jobCounts._3() + "}";
+ }
+
+
+ private String writeJsonForArchivedJob(ExecutionGraph graph) {
+ StringBuilder bld = new StringBuilder();
+
+ bld.append("[");
+ bld.append("{");
+ bld.append("\"jobid\": \"").append(graph.getJobID()).append("\",");
+ bld.append("\"jobname\": \"").append(graph.getJobName()).append("\",");
+ bld.append("\"status\": \"").append(graph.getState()).append("\",");
+ bld.append("\"SCHEDULED\": ").append(graph.getStatusTimestamp(JobStatus.CREATED)).append(",");
+ bld.append("\"RUNNING\": ").append(graph.getStatusTimestamp(JobStatus.RUNNING)).append(",");
+ bld.append("\"FINISHED\": ").append(graph.getStatusTimestamp(JobStatus.FINISHED)).append(",");
+ bld.append("\"FAILED\": ").append(graph.getStatusTimestamp(JobStatus.FAILED)).append(",");
+ bld.append("\"CANCELED\": ").append(graph.getStatusTimestamp(JobStatus.CANCELED)).append(",");
+
+ if (graph.getState() == JobStatus.FAILED) {
+ bld.append("\"failednodes\": [");
+ boolean first = true;
+ for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
+ if (vertex.getExecutionState() == ExecutionState.FAILED) {
+ InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
+ Throwable failureCause = vertex.getFailureCause();
+ if (location != null || failureCause != null) {
+ if (first) {
+ first = false;
+ } else {
+ bld.append(",");
+ }
+ bld.append("{");
+ bld.append("\"node\": \"").append(location == null ? "(none)" : location.getFQDNHostname()).append("\",");
+ bld.append("\"message\": \"").append(failureCause == null ? "" : StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause))).append("\"");
+ bld.append("}");
+ }
+ }
+ }
+ bld.append("],");
+ }
+
+ // Serialize ManagementGraph to json
+ bld.append("\"groupvertices\": [");
+ boolean first = true;
+ for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
+ //Write separator between json objects
+ if (first) {
+ first = false;
+ } else {
+ bld.append(",");
+ }
+
+ bld.append(JsonFactory.toJson(groupVertex));
+
+ }
+ bld.append("],");
+
+ // write user config
+ ExecutionConfig ec = graph.getExecutionConfig();
+ if(ec != null) {
+ bld.append("\"executionConfig\": {");
+ bld.append("\"Execution Mode\": \"").append(ec.getExecutionMode()).append("\",");
+ bld.append("\"Number of execution retries\": \"").append(ec.getNumberOfExecutionRetries()).append("\",");
+ bld.append("\"Job parallelism\": \"").append(ec.getParallelism()).append("\",");
+ bld.append("\"Object reuse mode\": \"").append(ec.isObjectReuseEnabled()).append("\"");
+ ExecutionConfig.GlobalJobParameters uc = ec.getGlobalJobParameters();
+ if(uc != null) {
+ Map<String, String> ucVals = uc.toMap();
+ if (ucVals != null) {
+ String ucString = "{";
+ int i = 0;
+ for (Map.Entry<String, String> ucVal : ucVals.entrySet()) {
+ ucString += "\"" + ucVal.getKey() + "\":\"" + ucVal.getValue() + "\"";
+ if (++i < ucVals.size()) {
+ ucString += ",\n";
+ }
+ }
+ bld.append(", \"userConfig\": ").append(ucString).append("}");
+ }
+ else {
+ LOG.debug("GlobalJobParameters.toMap() did not return anything");
+ }
+ }
+ else {
+ LOG.debug("No GlobalJobParameters were set in the execution config");
+ }
+ bld.append("},");
+ }
+ else {
+ LOG.warn("Unable to retrieve execution config from execution graph");
+ }
+
+ // write accumulators
+ final Future<Object> response = Patterns.ask(jobmanager,
+ new RequestAccumulatorResultsStringified(graph.getJobID()), new Timeout(timeout));
+
+ Object result;
+ try {
+ result = Await.result(response, timeout);
+ }
+ catch (Exception ex) {
+ throw new RuntimeException("Could not retrieve the accumulator results from the job manager.", ex);
+ }
+
+ if (result instanceof AccumulatorResultStringsFound) {
+ StringifiedAccumulatorResult[] accumulators = ((AccumulatorResultStringsFound) result).result();
+
+ bld.append("\n\"accumulators\": [");
+ int i = 0;
+ for (StringifiedAccumulatorResult accumulator : accumulators) {
+ bld.append("{ \"name\": \"").append(accumulator.getName()).append(" (").append(accumulator.getType()).append(")\",").append(" \"value\": \"").append(accumulator.getValue()).append("\"}\n");
+ if (++i < accumulators.length) {
+ bld.append(",");
+ }
+ }
+ bld.append("],\n");
+ }
+ else if (result instanceof AccumulatorResultsNotFound) {
+ bld.append("\n\"accumulators\": [],");
+ }
+ else if (result instanceof AccumulatorResultsErroneous) {
+ LOG.error("Could not obtain accumulators for job " + graph.getJobID(),
+ ((AccumulatorResultsErroneous) result).cause());
+ }
+ else {
+ throw new RuntimeException("RequestAccumulatorResults requires a response of type " +
+ "AccumulatorResultStringsFound. Instead the response is of type " +
+ result.getClass() + ".");
+ }
+
+ bld.append("\"groupverticetimes\": {");
+ first = true;
+
+ for (ExecutionJobVertex groupVertex : graph.getVerticesTopologically()) {
+ if (first) {
+ first = false;
+ } else {
+ bld.append(",");
+ }
+
+ // Calculate start and end time for groupvertex
+ long started = Long.MAX_VALUE;
+ long ended = 0;
+
+ // Take earliest running state and latest endstate of groupmembers
+ for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
+
+ long running = vertex.getStateTimestamp(ExecutionState.RUNNING);
+ if (running != 0 && running < started) {
+ started = running;
+ }
+
+ long finished = vertex.getStateTimestamp(ExecutionState.FINISHED);
+ long canceled = vertex.getStateTimestamp(ExecutionState.CANCELED);
+ long failed = vertex.getStateTimestamp(ExecutionState.FAILED);
+
+ if (finished != 0 && finished > ended) {
+ ended = finished;
+ }
+
+ if (canceled != 0 && canceled > ended) {
+ ended = canceled;
+ }
+
+ if (failed != 0 && failed > ended) {
+ ended = failed;
+ }
+
+ }
+
+ bld.append("\"").append(groupVertex.getJobVertexId()).append("\": {");
+ bld.append("\"groupvertexid\": \"").append(groupVertex.getJobVertexId()).append("\",");
+ bld.append("\"groupvertexname\": \"").append(groupVertex).append("\",");
+ bld.append("\"STARTED\": ").append(started).append(",");
+ bld.append("\"ENDED\": ").append(ended);
+ bld.append("}");
+
+ }
+
+ bld.append("}");
+ bld.append("}");
+ bld.append("]");
+
+ return bld.toString();
+ }
+
+
+ /**
+  * Writes the JSON update document for the given job.
+  *
+  * <p>The document has the keys {@code jobid}, {@code timestamp} (current system time),
+  * {@code recentjobs} (ids of all jobs the job manager reports as running),
+  * {@code vertexevents} (current state and state timestamp of every execution vertex) and
+  * {@code jobevents} (current job status). If the job manager's answer is not a
+  * {@code JobFound} message, {@code vertexevents} is empty and the job is reported as
+  * {@code FINISHED} with the current system time as timestamp.
+  *
+  * @param jobId the id of the job to report on
+  * @return the JSON document as a string
+  * @throws RuntimeException if waiting for an answer of the job manager fails, or if an
+  *         answer has an unexpected type
+  */
+ private String writeJsonUpdatesForJob(JobID jobId) {
+     // ask for the running jobs; their ids are listed under "recentjobs"
+     final Future<Object> responseRunningJobs = Patterns.ask(jobmanager,
+             JobManagerMessages.getRequestRunningJobs(),
+             new Timeout(timeout));
+
+     Object resultRunningJobs;
+     try {
+         resultRunningJobs = Await.result(responseRunningJobs, timeout);
+     }
+     catch (Exception ex) {
+         throw new RuntimeException("Could not retrieve the running jobs from the job manager.", ex);
+     }
+
+     if (!(resultRunningJobs instanceof JobManagerMessages.RunningJobs)) {
+         throw new RuntimeException("RequestRunningJobs requires a response of type " +
+                 "RunningJobs. Instead the response is of type " +
+                 resultRunningJobs.getClass() + ".");
+     }
+
+     final Iterable<ExecutionGraph> graphs =
+             ((JobManagerMessages.RunningJobs) resultRunningJobs).asJavaIterable();
+
+     // serialize the job updates to JSON
+     final StringBuilder bld = new StringBuilder();
+
+     bld.append("{");
+     bld.append("\"jobid\": \"").append(jobId).append("\",");
+     bld.append("\"timestamp\": \"").append(System.currentTimeMillis()).append("\",");
+     bld.append("\"recentjobs\": [");
+
+     boolean first = true;
+
+     for (ExecutionGraph g : graphs) {
+         // write separator between the job ids
+         if (first) {
+             first = false;
+         } else {
+             bld.append(",");
+         }
+
+         bld.append("\"").append(g.getJobID()).append("\"");
+     }
+     bld.append("],");
+
+     // ask for the job itself to report the state of its vertices
+     final Future<Object> responseJob = Patterns.ask(jobmanager,
+             new JobManagerMessages.RequestJob(jobId), new Timeout(timeout));
+
+     Object resultJob;
+     try {
+         resultJob = Await.result(responseJob, timeout);
+     }
+     catch (Exception ex) {
+         throw new RuntimeException("Could not retrieve the job with jobID " + jobId +
+                 " from the job manager.", ex);
+     }
+
+     if (!(resultJob instanceof JobManagerMessages.JobResponse)) {
+         throw new RuntimeException("RequestJob requires a response of type JobResponse. " +
+                 "Instead the response is of type " + resultJob.getClass() + ".");
+     }
+
+     final JobManagerMessages.JobResponse response = (JobManagerMessages.JobResponse) resultJob;
+
+     if (response instanceof JobManagerMessages.JobFound) {
+         ExecutionGraph graph = ((JobManagerMessages.JobFound) response).executionGraph();
+
+         bld.append("\"vertexevents\": [");
+
+         first = true;
+         for (ExecutionVertex ev : graph.getAllExecutionVertices()) {
+             if (first) {
+                 first = false;
+             } else {
+                 bld.append(",");
+             }
+
+             bld.append("{");
+             bld.append("\"vertexid\": \"").append(ev.getCurrentExecutionAttempt().getAttemptId()).append("\",");
+             bld.append("\"newstate\": \"").append(ev.getExecutionState()).append("\",");
+             bld.append("\"timestamp\": \"").append(ev.getStateTimestamp(ev.getExecutionState())).append("\"");
+             bld.append("}");
+         }
+
+         bld.append("],");
+
+         bld.append("\"jobevents\": [");
+
+         bld.append("{");
+         bld.append("\"newstate\": \"").append(graph.getState()).append("\",");
+         bld.append("\"timestamp\": \"").append(graph.getStatusTimestamp(graph.getState())).append("\"");
+         bld.append("}");
+
+         bld.append("]");
+
+         bld.append("}");
+     }
+     else {
+         // no JobFound answer: report the job as finished as of now
+         bld.append("\"vertexevents\": [],");
+         bld.append("\"jobevents\": [");
+         bld.append("{");
+         bld.append("\"newstate\": \"").append(JobStatus.FINISHED.toString()).append("\",");
+         bld.append("\"timestamp\": \"").append(System.currentTimeMillis()).append("\"");
+         bld.append("}");
+         bld.append("]");
+         bld.append("}");
+     }
+
+     return bld.toString();
+ }
+
+ private String writeJsonForArchivedJobGroupvertex(ExecutionGraph graph, JobVertexID vertexId) {
+ ExecutionJobVertex jobVertex = graph.getJobVertex(vertexId);
+ StringBuilder bld = new StringBuilder();
+
+ bld.append("{\"groupvertex\": ").append(JsonFactory.toJson(jobVertex)).append(",");
+
+ bld.append("\"verticetimes\": {");
+ boolean first = true;
+ for (ExecutionJobVertex groupVertex : graph.getAllVertices().values()) {
+
+ for (ExecutionVertex vertex : groupVertex.getTaskVertices()) {
+
+ Execution exec = vertex.getCurrentExecutionAttempt();
+
+ if(first) {
+ first = false;
+ } else {
+ bld.append(","); }
+
+ bld.append("\"").append(exec.getAttemptId()).append("\": {");
+ bld.append("\"vertexid\": \"").append(exec.getAttemptId()).append("\",");
+ bld.append("\"vertexname\": \"").append(vertex).append("\",");
+ bld.append("\"CREATED\": ").append(vertex.getStateTimestamp(ExecutionState.CREATED)).append(",");
+ bld.append("\"SCHEDULED\": ").append(vertex.getStateTimestamp(ExecutionState.SCHEDULED)).append(",");
+ bld.append("\"DEPLOYING\": ").append(vertex.getStateTimestamp(ExecutionState.DEPLOYING)).append(",");
+ bld.append("\"RUNNING\": ").append(vertex.getStateTimestamp(ExecutionState.RUNNING)).append(",");
+ bld.append("\"FINISHED\": ").append(vertex.getStateTimestamp(ExecutionState.FINISHED)).append(",");
+ bld.append("\"CANCELING\": ").append(vertex.getStateTimestamp(ExecutionState.CANCELING)).append(",");
+ bld.append("\"CANCELED\": ").append(vertex.getStateTimestamp(ExecutionState.CANCELED)).append(",");
+ bld.append("\"FAILED\": ").append(vertex.getStateTimestamp(ExecutionState.FAILED)).append("");
+ bld.append("}");
+ }
+
+ }
+ bld.append("}}");
+ return bld.toString();
+ }
+
+
+ private String writeJsonForVersion() {
+ return "{\"version\": \"" + EnvironmentInformation.getVersion() + "\",\"revision\": \"" +
+ EnvironmentInformation.getRevisionInformation().commitId + "\"}";
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JsonFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JsonFactory.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JsonFactory.java
new file mode 100644
index 0000000..fe18d3f
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/legacy/JsonFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.legacy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.util.StringUtils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Static helpers that render parts of the execution graph as JSON strings.
+ */
+public class JsonFactory {
+
+    /**
+     * Renders a single execution vertex as a JSON object.
+     *
+     * <p>The object has the keys {@code vertexid} (id of the current execution attempt),
+     * {@code vertexname} (HTML-escaped simple name), {@code vertexstatus} (current execution
+     * state) and {@code vertexinstancename} (host name of the assigned resource, or
+     * {@code (null)} if no location is assigned).
+     *
+     * @param vertex the execution vertex to render
+     * @return the JSON object as a string
+     */
+    public static String toJson(ExecutionVertex vertex) {
+        StringBuilder json = new StringBuilder("");
+        json.append("{");
+        json.append("\"vertexid\": \"").append(vertex.getCurrentExecutionAttempt().getAttemptId()).append("\",");
+        json.append("\"vertexname\": \"").append(StringUtils.escapeHtml(vertex.getSimpleName())).append("\",");
+        json.append("\"vertexstatus\": \"").append(vertex.getExecutionState()).append("\",");
+
+        InstanceConnectionInfo location = vertex.getCurrentAssignedResourceLocation();
+        String instanceName = location == null ? "(null)" : location.getFQDNHostname();
+
+        json.append("\"vertexinstancename\": \"").append(instanceName).append("\"");
+        json.append("}");
+        return json.toString();
+    }
+
+    /**
+     * Renders a job vertex (group vertex) as a JSON object.
+     *
+     * <p>The object has the keys {@code groupvertexid}, {@code groupvertexname},
+     * {@code numberofgroupmembers} (the parallelism), {@code groupmembers} (all task
+     * vertices, rendered via {@link #toJson(ExecutionVertex)}), {@code backwardEdges} (id and
+     * name of the producer of every input) and one key per {@link ExecutionState} holding the
+     * number of task vertices currently in that state.
+     *
+     * @param jobVertex the job vertex to render
+     * @return the JSON object as a string
+     */
+    public static String toJson(ExecutionJobVertex jobVertex) {
+        StringBuilder json = new StringBuilder("");
+
+        json.append("{");
+        json.append("\"groupvertexid\": \"").append(jobVertex.getJobVertexId()).append("\",");
+        json.append("\"groupvertexname\": \"").append(StringUtils.escapeHtml(jobVertex.getJobVertex().getName())).append("\",");
+        json.append("\"numberofgroupmembers\": ").append(jobVertex.getParallelism()).append(",");
+        json.append("\"groupmembers\": [");
+
+        // Count state status of group members
+        Map<ExecutionState, Integer> stateCounts = new HashMap<ExecutionState, Integer>();
+
+        // initialize with 0, so that every state shows up in the output
+        for (ExecutionState state : ExecutionState.values()) {
+            stateCounts.put(state, 0);
+        }
+
+        ExecutionVertex[] vertices = jobVertex.getTaskVertices();
+
+        for (int j = 0; j < vertices.length; j++) {
+            ExecutionVertex vertex = vertices[j];
+
+            json.append(toJson(vertex));
+
+            // print delimiter
+            if (j != vertices.length - 1) {
+                json.append(",");
+            }
+
+            // Increment state status count
+            int count = stateCounts.get(vertex.getExecutionState()) + 1;
+            stateCounts.put(vertex.getExecutionState(), count);
+        }
+
+        json.append("],");
+        json.append("\"backwardEdges\": [");
+
+        List<IntermediateResult> inputs = jobVertex.getInputs();
+
+        for (int inputNumber = 0; inputNumber < inputs.size(); inputNumber++) {
+            ExecutionJobVertex input = inputs.get(inputNumber).getProducer();
+
+            // id and name both describe the producing vertex of this input
+            json.append("{");
+            json.append("\"groupvertexid\": \"").append(input.getJobVertexId()).append("\",");
+            json.append("\"groupvertexname\": \"").append(StringUtils.escapeHtml(input.getJobVertex().getName())).append("\"");
+            json.append("}");
+
+            // print delimiter
+            if (inputNumber != inputs.size() - 1) {
+                json.append(",");
+            }
+        }
+        json.append("]");
+
+        // list number of members for each status
+        for (Map.Entry<ExecutionState, Integer> stateCount : stateCounts.entrySet()) {
+            json.append(",\"").append(stateCount.getKey()).append("\": ").append(stateCount.getValue());
+        }
+
+        json.append("}");
+
+        return json.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java
new file mode 100644
index 0000000..9a9b6ba
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/runner/TestRunner.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.runner;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.examples.java.relational.util.WebLogData;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.util.Collector;
+
+/**
+ * Simple runner that brings up a local cluster with the web server and executes a few
+ * jobs (word count, web log analysis, word count again) to expose their data in the archive.
+ *
+ * <p>The web document root may be passed as the first program argument. Without an
+ * argument, {@link #DEFAULT_WEB_DOC_ROOT} is used.
+ */
+@SuppressWarnings("serial")
+public class TestRunner {
+
+    /** Web document root used when no program argument is given. */
+    private static final String DEFAULT_WEB_DOC_ROOT =
+            "/data/repositories/flink/flink-dist/target/flink-0.10-SNAPSHOT-bin/flink-0.10-SNAPSHOT/resources/web-runtime-monitor";
+
+    /**
+     * Starts the cluster, runs the jobs, and then blocks until the thread is interrupted.
+     * The cluster is shut down when the method exits, also when a job fails.
+     *
+     * @param args optional; {@code args[0]} overrides the web document root
+     * @throws Exception if a job fails or the blocked thread is interrupted
+     */
+    public static void main(String[] args) throws Exception {
+
+        final String webDocRoot = (args != null && args.length > 0) ? args[0] : DEFAULT_WEB_DOC_ROOT;
+
+        // start the cluster with the runtime monitor
+        Configuration configuration = new Configuration();
+        configuration.setBoolean(ConfigConstants.LOCAL_INSTANCE_MANAGER_START_WEBSERVER, true);
+        configuration.setBoolean(ConfigConstants.JOB_MANAGER_NEW_WEB_FRONTEND_KEY, true);
+        configuration.setString(ConfigConstants.JOB_MANAGER_WEB_DOC_ROOT_KEY, webDocRoot);
+
+        LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(configuration, false);
+
+        try {
+            final int port = cluster.getJobManagerRPCPort();
+            runWordCount(port);
+            runWebLogAnalysisExample(port);
+            runWordCount(port);
+
+            // block the thread; the loop guards against spurious wake-ups, so the only
+            // way out is the InterruptedException thrown by wait()
+            final Object lock = new Object();
+            synchronized (lock) {
+                while (true) {
+                    lock.wait();
+                }
+            }
+        }
+        finally {
+            cluster.shutdown();
+        }
+    }
+
+    /**
+     * Runs a word count over {@code WordCountData.TEXT} against the job manager at
+     * {@code localhost:port} and prints the result.
+     */
+    private static void runWordCount(int port) throws Exception {
+        final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", port);
+
+        DataSet<String> text = env.fromElements(WordCountData.TEXT.split("\n"));
+
+        DataSet<Tuple2<String, Integer>> counts =
+                // split up the lines in pairs (2-tuples) containing: (word,1)
+                text.flatMap(new Tokenizer())
+                // group by the tuple field "0" and sum up tuple field "1"
+                .groupBy(0)
+                .sum(1);
+
+        counts.print();
+    }
+
+    /**
+     * Runs the web log analysis over the {@code WebLogData} data sets against the job
+     * manager at {@code localhost:port} and prints the result.
+     */
+    private static void runWebLogAnalysisExample(int port) throws Exception {
+        final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("localhost", port);
+
+        // get input data
+        DataSet<Tuple2<String, String>> documents = WebLogData.getDocumentDataSet(env);
+        DataSet<Tuple3<Integer, String, Integer>> ranks = WebLogData.getRankDataSet(env);
+        DataSet<Tuple2<String, String>> visits = WebLogData.getVisitDataSet(env);
+
+        // Retain documents with keywords
+        DataSet<Tuple1<String>> filterDocs = documents
+                .filter(new FilterDocByKeyWords())
+                .project(0);
+
+        // Filter ranks by minimum rank
+        DataSet<Tuple3<Integer, String, Integer>> filterRanks = ranks
+                .filter(new FilterByRank());
+
+        // Filter visits by visit date
+        DataSet<Tuple1<String>> filterVisits = visits
+                .filter(new FilterVisitsByDate())
+                .project(0);
+
+        // Join the filtered documents and ranks, i.e., get all URLs with min rank and keywords
+        DataSet<Tuple3<Integer, String, Integer>> joinDocsRanks =
+                filterDocs.join(filterRanks)
+                        .where(0).equalTo(1)
+                        .projectSecond(0,1,2);
+
+        // Anti-join urls with visits, i.e., retain all URLs which have NOT been visited in a certain time
+        DataSet<Tuple3<Integer, String, Integer>> result =
+                joinDocsRanks.coGroup(filterVisits)
+                        .where(1).equalTo(0)
+                        .with(new AntiJoinVisits());
+
+        result.print();
+    }
+
+    // *************************************************************************
+    //     USER FUNCTIONS
+    // *************************************************************************
+
+    /**
+     * Splits a line into lower-cased tokens at non-word characters and emits a
+     * {@code (token, 1)} pair for every non-empty token.
+     */
+    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+        @Override
+        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+            // normalize and split the line
+            String[] tokens = value.toLowerCase().split("\\W+");
+
+            // emit the pairs
+            for (String token : tokens) {
+                if (token.length() > 0) {
+                    out.collect(new Tuple2<String, Integer>(token, 1));
+                }
+            }
+        }
+    }
+
+    /**
+     * Keeps a document only if its text (field {@code f1}) contains all keywords.
+     */
+    public static class FilterDocByKeyWords implements FilterFunction<Tuple2<String, String>> {
+
+        private static final String[] KEYWORDS = { " editors ", " oscillations " };
+
+        @Override
+        public boolean filter(Tuple2<String, String> value) throws Exception {
+            // FILTER
+            // Only collect the document if all keywords are contained
+            String docText = value.f1;
+            for (String kw : KEYWORDS) {
+                if (!docText.contains(kw)) {
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+
+    /**
+     * Keeps an entry only if its field {@code f0} is greater than {@code RANKFILTER}.
+     */
+    public static class FilterByRank implements FilterFunction<Tuple3<Integer, String, Integer>> {
+
+        private static final int RANKFILTER = 40;
+
+        @Override
+        public boolean filter(Tuple3<Integer, String, Integer> value) throws Exception {
+            return (value.f0 > RANKFILTER);
+        }
+    }
+
+
+    /**
+     * Keeps a visit only if the year of its date string (field {@code f1}) equals
+     * {@code YEARFILTER}.
+     */
+    public static class FilterVisitsByDate implements FilterFunction<Tuple2<String, String>> {
+
+        private static final int YEARFILTER = 2007;
+
+        @Override
+        public boolean filter(Tuple2<String, String> value) throws Exception {
+            // Parse date string with the format YYYY-MM-DD and extract the year
+            String dateString = value.f1;
+            int year = Integer.parseInt(dateString.substring(0,4));
+            return (year == YEARFILTER);
+        }
+    }
+
+
+    /**
+     * Emits all rank entries of a group if the group has no entry on the visits side, and
+     * nothing otherwise.
+     */
+    @FunctionAnnotation.ForwardedFieldsFirst("*")
+    public static class AntiJoinVisits implements CoGroupFunction<Tuple3<Integer, String, Integer>, Tuple1<String>, Tuple3<Integer, String, Integer>> {
+
+        @Override
+        public void coGroup(Iterable<Tuple3<Integer, String, Integer>> ranks, Iterable<Tuple1<String>> visits, Collector<Tuple3<Integer, String, Integer>> out) {
+            // Check if there is a entry in the visits relation
+            if (!visits.iterator().hasNext()) {
+                for (Tuple3<Integer, String, Integer> next : ranks) {
+                    // Emit all rank pairs
+                    out.collect(next);
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/web-dashboard/app/scripts/index.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/index.coffee b/flink-runtime-web/web-dashboard/app/scripts/index.coffee
index b85d7e0..385b3d6 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/index.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/index.coffee
@@ -29,11 +29,9 @@ angular.module('flinkApp', ['ui.router', 'angularMoment'])
# --------------------------------------
.constant 'flinkConfig', {
- webServer: 'http://localhost:8080'
jobServer: 'http://localhost:8081'
- newServer: 'http://localhost:8082'
-# webServer: 'http://localhost:3000/web-server'
-# jobServer: 'http://localhost:3000/job-server'
+ newServer: 'http://localhost:8081'
+# jobServer: 'http://localhost:3000/new-server'
# newServer: 'http://localhost:3000/new-server'
refreshInterval: 10000
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c52e753a/flink-runtime-web/web-dashboard/server.js
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/server.js b/flink-runtime-web/web-dashboard/server.js
index 797cfa0..453e7a4 100644
--- a/flink-runtime-web/web-dashboard/server.js
+++ b/flink-runtime-web/web-dashboard/server.js
@@ -29,7 +29,7 @@ var server = new Hapi.Server();
var remotes = [
{ port: 8080, path: 'web-server' },
{ port: 8081, path: 'job-server' },
- { port: 8082, path: 'new-server' }
+ { port: 8081, path: 'new-server' }
]
server.connection({ port: 3000 });