You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/10/21 11:42:53 UTC

[1/2] flink git commit: [FLINK-4775] [metrics] Simplify MetricStore access

Repository: flink
Updated Branches:
  refs/heads/master 3137bf774 -> 0d2903541


[FLINK-4775] [metrics] Simplify MetricStore access


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0d290354
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0d290354
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0d290354

Branch: refs/heads/master
Commit: 0d290354179a5ea3a11040a2ed7e218263bc474b
Parents: e30e7a6
Author: zentol <ch...@apache.org>
Authored: Fri Oct 7 10:16:49 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Oct 21 13:42:18 2016 +0200

----------------------------------------------------------------------
 .../metrics/JobManagerMetricsHandler.java       |   2 +-
 .../webmonitor/metrics/JobMetricsHandler.java   |  10 +-
 .../metrics/JobVertexMetricsHandler.java        |  17 +--
 .../runtime/webmonitor/metrics/MetricStore.java | 125 +++++++++++++++++--
 .../metrics/TaskManagerMetricsHandler.java      |   2 +-
 .../webmonitor/metrics/MetricFetcherTest.java   |   8 +-
 .../webmonitor/metrics/MetricStoreTest.java     |  10 +-
 7 files changed, 134 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
index 54d6aea..7452c71 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
@@ -37,7 +37,7 @@ public class JobManagerMetricsHandler extends AbstractMetricsHandler {
 
 	@Override
 	protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
-		MetricStore.JobManagerMetricStore jobManager = metrics.jobManager;
+		MetricStore.JobManagerMetricStore jobManager = metrics.getJobManagerMetricStore();
 		if (jobManager == null) {
 			return null;
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
index cdaae2c..d66c954 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
@@ -39,11 +39,9 @@ public class JobMetricsHandler extends AbstractMetricsHandler {
 
 	@Override
 	protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
-		MetricStore.JobMetricStore job = metrics.jobs.get(pathParams.get(PARAMETER_JOB_ID));
-		if (job == null) {
-			return null;
-		} else {
-			return job.metrics;
-		}
+		MetricStore.JobMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID));
+		return job != null
+			? job.metrics
+			: null;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
index 1b92b47..6fca771 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
@@ -39,16 +39,11 @@ public class JobVertexMetricsHandler extends AbstractMetricsHandler {
 
 	@Override
 	protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
-		MetricStore.JobMetricStore job = metrics.jobs.get(pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID));
-		if (job == null) {
-			return null;
-		} else {
-			MetricStore.TaskMetricStore task = job.tasks.get(pathParams.get(PARAMETER_VERTEX_ID));
-			if (task == null) {
-				return null;
-			} else {
-				return task.metrics;
-			}
-		}
+		MetricStore.TaskMetricStore task = metrics.getTaskMetricStore(
+			pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID),
+			pathParams.get(PARAMETER_VERTEX_ID));
+		return task != null
+			? task.metrics
+			: null;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
index c1b2bec..989145b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
@@ -47,17 +47,21 @@ public class MetricStore {
 	final Map<String, TaskManagerMetricStore> taskManagers = new HashMap<>();
 	final Map<String, JobMetricStore> jobs = new HashMap<>();
 
+	// -----------------------------------------------------------------------------------------------------------------
+	// Adding metrics
+	// -----------------------------------------------------------------------------------------------------------------
 	public void add(MetricDump metric) {
 		try {
 			QueryScopeInfo info = metric.scopeInfo;
 			TaskManagerMetricStore tm;
 			JobMetricStore job;
 			TaskMetricStore task;
+			SubtaskMetricStore subtask;
 
 			String name = info.scope.isEmpty()
 				? metric.name
 				: info.scope + "." + metric.name;
-			
+
 			if (name.isEmpty()) { // malformed transmission
 				return;
 			}
@@ -96,10 +100,18 @@ public class MetricStore {
 						task = new TaskMetricStore();
 						job.tasks.put(taskInfo.vertexID, task);
 					}
+					subtask = task.subtasks.get(taskInfo.subtaskIndex);
+					if (subtask == null) {
+						subtask = new SubtaskMetricStore();
+						task.subtasks.put(taskInfo.subtaskIndex, subtask);
+					}
 					/**
-					 * As the WebInterface task metric queries currently do not account for subtasks we don't 
-					 * divide by subtask and instead use the concatenation of subtask index and metric name as the name. 
+					 * The duplication is intended. Metrics scoped by subtask are useful for several job/task handlers,
+					 * while the WebInterface task metric queries currently do not account for subtasks, so we don't 
+					 * divide by subtask and instead use the concatenation of subtask index and metric name as the name
+					 * for those.
 					 */
+					addMetric(subtask.metrics, name, metric);
 					addMetric(task.metrics, taskInfo.subtaskIndex + "." + name, metric);
 					break;
 				case INFO_CATEGORY_OPERATOR:
@@ -160,32 +172,121 @@ public class MetricStore {
 		}
 	}
 
+	// -----------------------------------------------------------------------------------------------------------------
+	// Accessors for sub MetricStores
+	// -----------------------------------------------------------------------------------------------------------------
+
 	/**
-	 * Sub-structure containing metrics of the JobManager.
+	 * Returns the {@link JobManagerMetricStore}.
+	 *
+	 * @return JobManagerMetricStore
+	 */
+	public JobManagerMetricStore getJobManagerMetricStore() {
+		return jobManager;
+	}
+
+	/**
+	 * Returns the {@link TaskManagerMetricStore} for the given taskmanager ID.
+	 *
+	 * @param tmID taskmanager ID
+	 * @return TaskManagerMetricStore for the given ID, or null if no store for the given argument exists
+	 */
+	public TaskManagerMetricStore getTaskManagerMetricStore(String tmID) {
+		return taskManagers.get(tmID);
+	}
+
+	/**
+	 * Returns the {@link JobMetricStore} for the given job ID.
+	 *
+	 * @param jobID job ID
+	 * @return JobMetricStore for the given ID, or null if no store for the given argument exists
+	 */
+	public JobMetricStore getJobMetricStore(String jobID) {
+		return jobs.get(jobID);
+	}
+
+	/**
+	 * Returns the {@link TaskMetricStore} for the given job/task ID.
+	 *
+	 * @param jobID  job ID
+	 * @param taskID task ID
+	 * @return TaskMetricStore for given IDs, or null if no store for the given arguments exists
+	 */
+	public TaskMetricStore getTaskMetricStore(String jobID, String taskID) {
+		JobMetricStore job = getJobMetricStore(jobID);
+		if (job == null) {
+			return null;
+		}
+		return job.getTaskMetricStore(taskID);
+	}
+
+	/**
+	 * Returns the {@link SubtaskMetricStore} for the given job/task ID and subtask index.
+	 *
+	 * @param jobID        job ID
+	 * @param taskID       task ID
+	 * @param subtaskIndex subtask index
+	 * @return SubtaskMetricStore for the given IDs and index, or null if no store for the given arguments exists
 	 */
-	static class JobManagerMetricStore {
+	public SubtaskMetricStore getSubtaskMetricStore(String jobID, String taskID, int subtaskIndex) {
+		TaskMetricStore task = getTaskMetricStore(jobID, taskID);
+		if (task == null) {
+			return null;
+		}
+		return task.getSubtaskMetricStore(subtaskIndex);
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+	// sub MetricStore classes
+	// -----------------------------------------------------------------------------------------------------------------
+	private static abstract class ComponentMetricStore {
 		public final Map<String, String> metrics = new HashMap<>();
+
+		public String getMetric(String name, String defaultValue) {
+			String value = this.metrics.get(name);
+			return value != null
+				? value
+				: defaultValue;
+		}
+	}
+
+	/**
+	 * Sub-structure containing metrics of the JobManager.
+	 */
+	public static class JobManagerMetricStore extends ComponentMetricStore {
 	}
 
 	/**
 	 * Sub-structure containing metrics of a single TaskManager.
 	 */
-	static class TaskManagerMetricStore {
-		public final Map<String, String> metrics = new HashMap<>();
+	public static class TaskManagerMetricStore extends ComponentMetricStore {
 	}
 
 	/**
 	 * Sub-structure containing metrics of a single Job.
 	 */
-	static class JobMetricStore {
-		public final Map<String, String> metrics = new HashMap<>();
-		public final Map<String, TaskMetricStore> tasks = new HashMap<>();
+	public static class JobMetricStore extends ComponentMetricStore {
+		private final Map<String, TaskMetricStore> tasks = new HashMap<>();
+
+		public TaskMetricStore getTaskMetricStore(String taskID) {
+			return tasks.get(taskID);
+		}
 	}
 
 	/**
 	 * Sub-structure containing metrics of a single Task.
 	 */
-	static class TaskMetricStore {
-		public final Map<String, String> metrics = new HashMap<>();
+	public static class TaskMetricStore extends ComponentMetricStore {
+		private final Map<Integer, SubtaskMetricStore> subtasks = new HashMap<>();
+
+		public SubtaskMetricStore getSubtaskMetricStore(int subtaskIndex) {
+			return subtasks.get(subtaskIndex);
+		}
+	}
+
+	/**
+	 * Sub-structure containing metrics of a single Subtask.
+	 */
+	public static class SubtaskMetricStore extends ComponentMetricStore {
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
index e4e8b00..a69b676 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
@@ -39,7 +39,7 @@ public class TaskManagerMetricsHandler extends AbstractMetricsHandler {
 
 	@Override
 	protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
-		MetricStore.TaskManagerMetricStore taskManager = metrics.taskManagers.get(pathParams.get(PARAMETER_TM_ID));
+		MetricStore.TaskManagerMetricStore taskManager = metrics.getTaskManagerMetricStore(pathParams.get(PARAMETER_TM_ID));
 		if (taskManager == null) {
 			return null;
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
index 14cbeac..3061346 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
@@ -156,10 +156,10 @@ public class MetricFetcherTest extends TestLogger {
 			assertEquals("0.99", store.jobManager.metrics.get("abc.hist_p99"));
 			assertEquals("0.999", store.jobManager.metrics.get("abc.hist_p999"));
 
-			assertEquals("x", store.taskManagers.get(tmID.toString()).metrics.get("abc.gauge"));
-			assertEquals("5.0", store.jobs.get(jobID.toString()).metrics.get("abc.jc"));
-			assertEquals("2", store.jobs.get(jobID.toString()).tasks.get("taskid").metrics.get("2.abc.tc"));
-			assertEquals("1", store.jobs.get(jobID.toString()).tasks.get("taskid").metrics.get("2.opname.abc.oc"));
+			assertEquals("x", store.getTaskManagerMetricStore(tmID.toString()).metrics.get("abc.gauge"));
+			assertEquals("5.0", store.getJobMetricStore(jobID.toString()).metrics.get("abc.jc"));
+			assertEquals("2", store.getTaskMetricStore(jobID.toString(), "taskid").metrics.get("2.abc.tc"));
+			assertEquals("1", store.getTaskMetricStore(jobID.toString(), "taskid").metrics.get("2.opname.abc.oc"));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0d290354/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
index ee46494..c71f015 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
@@ -31,11 +31,11 @@ public class MetricStoreTest extends TestLogger {
 	public void testAdd() throws IOException {
 		MetricStore store = setupStore(new MetricStore());
 
-		assertEquals("0", store.jobManager.metrics.get("abc.metric1"));
-		assertEquals("1", store.taskManagers.get("tmid").metrics.get("abc.metric2"));
-		assertEquals("2", store.jobs.get("jobid").metrics.get("abc.metric3"));
-		assertEquals("3", store.jobs.get("jobid").tasks.get("taskid").metrics.get("8.abc.metric4"));
-		assertEquals("4", store.jobs.get("jobid").tasks.get("taskid").metrics.get("8.opname.abc.metric5"));
+		assertEquals("0", store.getJobManagerMetricStore().getMetric("abc.metric1", "-1"));
+		assertEquals("1", store.getTaskManagerMetricStore("tmid").getMetric("abc.metric2", "-1"));
+		assertEquals("2", store.getJobMetricStore("jobid").getMetric("abc.metric3", "-1"));
+		assertEquals("3", store.getTaskMetricStore("jobid", "taskid").getMetric("8.abc.metric4", "-1"));
+		assertEquals("4", store.getTaskMetricStore("jobid", "taskid").getMetric("8.opname.abc.metric5", "-1"));
 	}
 
 	@Test


[2/2] flink git commit: [FLINK-4772] [metrics] Store metrics as strings in MetricStore

Posted by ch...@apache.org.
[FLINK-4772] [metrics] Store metrics as strings in MetricStore


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e30e7a61
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e30e7a61
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e30e7a61

Branch: refs/heads/master
Commit: e30e7a6192d921d7a3c94beb178eb2c7b9ba74c0
Parents: 3137bf7
Author: zentol <ch...@apache.org>
Authored: Fri Oct 7 10:11:31 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Fri Oct 21 13:42:18 2016 +0200

----------------------------------------------------------------------
 .../metrics/AbstractMetricsHandler.java         |  6 ++--
 .../metrics/JobManagerMetricsHandler.java       |  2 +-
 .../webmonitor/metrics/JobMetricsHandler.java   |  2 +-
 .../metrics/JobVertexMetricsHandler.java        |  2 +-
 .../runtime/webmonitor/metrics/MetricStore.java | 36 ++++++++++----------
 .../metrics/TaskManagerMetricsHandler.java      |  2 +-
 .../metrics/JobManagerMetricsHandlerTest.java   |  6 ++--
 .../metrics/JobMetricsHandlerTest.java          |  6 ++--
 .../metrics/JobVertexMetricsHandlerTest.java    |  8 ++---
 .../webmonitor/metrics/MetricFetcherTest.java   | 28 +++++++--------
 .../webmonitor/metrics/MetricStoreTest.java     | 10 +++---
 .../metrics/TaskManagerMetricsHandlerTest.java  |  6 ++--
 12 files changed, 57 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e30e7a61/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
index 54e4b6f..8374523 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
@@ -61,7 +61,7 @@ public abstract class AbstractMetricsHandler implements RequestHandler {
 	 * @param metrics MetricStore containing all metrics
 	 * @return Map containing metrics, or null if no metric exists
 	 */
-	protected abstract Map<String, Object> getMapFor(Map<String, String> pathParams, MetricStore metrics);
+	protected abstract Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics);
 
 	private String getMetricsValues(Map<String, String> pathParams, String requestedMetricsList) throws IOException {
 		if (requestedMetricsList.isEmpty()) {
@@ -73,7 +73,7 @@ public abstract class AbstractMetricsHandler implements RequestHandler {
 		}
 		MetricStore metricStore = fetcher.getMetricStore();
 		synchronized (metricStore) {
-			Map<String, Object> metrics = getMapFor(pathParams, metricStore);
+			Map<String, String> metrics = getMapFor(pathParams, metricStore);
 			if (metrics == null) {
 				return "";
 			}
@@ -102,7 +102,7 @@ public abstract class AbstractMetricsHandler implements RequestHandler {
 	private String getAvailableMetricsList(Map<String, String> pathParams) throws IOException {
 		MetricStore metricStore = fetcher.getMetricStore();
 		synchronized (metricStore) {
-			Map<String, Object> metrics = getMapFor(pathParams, metricStore);
+			Map<String, String> metrics = getMapFor(pathParams, metricStore);
 			if (metrics == null) {
 				return "";
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/e30e7a61/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
index 7435643..54d6aea 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
@@ -36,7 +36,7 @@ public class JobManagerMetricsHandler extends AbstractMetricsHandler {
 	}
 
 	@Override
-	protected Map<String, Object> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
+	protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
 		MetricStore.JobManagerMetricStore jobManager = metrics.jobManager;
 		if (jobManager == null) {
 			return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/e30e7a61/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
index b54799d..cdaae2c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
@@ -38,7 +38,7 @@ public class JobMetricsHandler extends AbstractMetricsHandler {
 	}
 
 	@Override
-	protected Map<String, Object> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
+	protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
 		MetricStore.JobMetricStore job = metrics.jobs.get(pathParams.get(PARAMETER_JOB_ID));
 		if (job == null) {
 			return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/e30e7a61/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
index 73b8bb0..1b92b47 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
@@ -38,7 +38,7 @@ public class JobVertexMetricsHandler extends AbstractMetricsHandler {
 	}
 
 	@Override
-	protected Map<String, Object> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
+	protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
 		MetricStore.JobMetricStore job = metrics.jobs.get(pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID));
 		if (job == null) {
 			return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/e30e7a61/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
index 5df63c6..c1b2bec 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
@@ -129,11 +129,11 @@ public class MetricStore {
 		}
 	}
 
-	private void addMetric(Map<String, Object> target, String name, MetricDump metric) {
+	private void addMetric(Map<String, String> target, String name, MetricDump metric) {
 		switch (metric.getCategory()) {
 			case METRIC_CATEGORY_COUNTER:
 				MetricDump.CounterDump counter = (MetricDump.CounterDump) metric;
-				target.put(name, counter.count);
+				target.put(name, String.valueOf(counter.count));
 				break;
 			case METRIC_CATEGORY_GAUGE:
 				MetricDump.GaugeDump gauge = (MetricDump.GaugeDump) metric;
@@ -141,21 +141,21 @@ public class MetricStore {
 				break;
 			case METRIC_CATEGORY_HISTOGRAM:
 				MetricDump.HistogramDump histogram = (MetricDump.HistogramDump) metric;
-				target.put(name + "_min", histogram.min);
-				target.put(name + "_max", histogram.max);
-				target.put(name + "_mean", histogram.mean);
-				target.put(name + "_median", histogram.median);
-				target.put(name + "_stddev", histogram.stddev);
-				target.put(name + "_p75", histogram.p75);
-				target.put(name + "_p90", histogram.p90);
-				target.put(name + "_p95", histogram.p95);
-				target.put(name + "_p98", histogram.p98);
-				target.put(name + "_p99", histogram.p99);
-				target.put(name + "_p999", histogram.p999);
+				target.put(name + "_min", String.valueOf(histogram.min));
+				target.put(name + "_max", String.valueOf(histogram.max));
+				target.put(name + "_mean", String.valueOf(histogram.mean));
+				target.put(name + "_median", String.valueOf(histogram.median));
+				target.put(name + "_stddev", String.valueOf(histogram.stddev));
+				target.put(name + "_p75", String.valueOf(histogram.p75));
+				target.put(name + "_p90", String.valueOf(histogram.p90));
+				target.put(name + "_p95", String.valueOf(histogram.p95));
+				target.put(name + "_p98", String.valueOf(histogram.p98));
+				target.put(name + "_p99", String.valueOf(histogram.p99));
+				target.put(name + "_p999", String.valueOf(histogram.p999));
 				break;
 			case METRIC_CATEGORY_METER:
 				MetricDump.MeterDump meter = (MetricDump.MeterDump) metric;
-				target.put(name, meter.rate);
+				target.put(name, String.valueOf(meter.rate));
 				break;
 		}
 	}
@@ -164,21 +164,21 @@ public class MetricStore {
 	 * Sub-structure containing metrics of the JobManager.
 	 */
 	static class JobManagerMetricStore {
-		public final Map<String, Object> metrics = new HashMap<>();
+		public final Map<String, String> metrics = new HashMap<>();
 	}
 
 	/**
 	 * Sub-structure containing metrics of a single TaskManager.
 	 */
 	static class TaskManagerMetricStore {
-		public final Map<String, Object> metrics = new HashMap<>();
+		public final Map<String, String> metrics = new HashMap<>();
 	}
 
 	/**
 	 * Sub-structure containing metrics of a single Job.
 	 */
 	static class JobMetricStore {
-		public final Map<String, Object> metrics = new HashMap<>();
+		public final Map<String, String> metrics = new HashMap<>();
 		public final Map<String, TaskMetricStore> tasks = new HashMap<>();
 	}
 
@@ -186,6 +186,6 @@ public class MetricStore {
 	 * Sub-structure containing metrics of a single Task.
 	 */
 	static class TaskMetricStore {
-		public final Map<String, Object> metrics = new HashMap<>();
+		public final Map<String, String> metrics = new HashMap<>();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e30e7a61/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
index fea3d07..e4e8b00 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
@@ -38,7 +38,7 @@ public class TaskManagerMetricsHandler extends AbstractMetricsHandler {
 	}
 
 	@Override
-	protected Map<String, Object> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
+	protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
 		MetricStore.TaskManagerMetricStore taskManager = metrics.taskManagers.get(pathParams.get(PARAMETER_TM_ID));
 		if (taskManager == null) {
 			return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/e30e7a61/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
index 9757574..d0ffc81 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
@@ -40,9 +40,9 @@ public class JobManagerMetricsHandlerTest extends TestLogger {
 
 		Map<String, String> pathParams = new HashMap<>();
 
-		Map<String, Object> metrics = handler.getMapFor(pathParams, store);
+		Map<String, String> metrics = handler.getMapFor(pathParams, store);
 
-		assertEquals(0L, metrics.get("abc.metric1"));
+		assertEquals("0", metrics.get("abc.metric1"));
 	}
 
 	@Test
@@ -54,7 +54,7 @@ public class JobManagerMetricsHandlerTest extends TestLogger {
 
 		Map<String, String> pathParams = new HashMap<>();
 
-		Map<String, Object> metrics = handler.getMapFor(pathParams, store);
+		Map<String, String> metrics = handler.getMapFor(pathParams, store);
 
 		assertNotNull(metrics);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e30e7a61/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
index c0cc345..9391dc0 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
@@ -42,9 +42,9 @@ public class JobMetricsHandlerTest extends TestLogger {
 		Map<String, String> pathParams = new HashMap<>();
 		pathParams.put(PARAMETER_JOB_ID, "jobid");
 
-		Map<String, Object> metrics = handler.getMapFor(pathParams, store);
+		Map<String, String> metrics = handler.getMapFor(pathParams, store);
 
-		assertEquals(2L, metrics.get("abc.metric3"));
+		assertEquals("2", metrics.get("abc.metric3"));
 	}
 
 	@Test
@@ -56,7 +56,7 @@ public class JobMetricsHandlerTest extends TestLogger {
 
 		Map<String, String> pathParams = new HashMap<>();
 
-		Map<String, Object> metrics = handler.getMapFor(pathParams, store);
+		Map<String, String> metrics = handler.getMapFor(pathParams, store);
 
 		assertNull(metrics);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e30e7a61/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
index d6e5ca7..a7f9084 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
@@ -44,11 +44,11 @@ public class JobVertexMetricsHandlerTest extends TestLogger {
 		pathParams.put(PARAMETER_JOB_ID, "jobid");
 		pathParams.put(PARAMETER_VERTEX_ID, "taskid");
 
-		Map<String, Object> metrics = handler.getMapFor(pathParams, store);
+		Map<String, String> metrics = handler.getMapFor(pathParams, store);
 
-		assertEquals(3L, metrics.get("8.abc.metric4"));
+		assertEquals("3", metrics.get("8.abc.metric4"));
 
-		assertEquals(4L, metrics.get("8.opname.abc.metric5"));
+		assertEquals("4", metrics.get("8.opname.abc.metric5"));
 	}
 
 	@Test
@@ -60,7 +60,7 @@ public class JobVertexMetricsHandlerTest extends TestLogger {
 
 		Map<String, String> pathParams = new HashMap<>();
 
-		Map<String, Object> metrics = handler.getMapFor(pathParams, store);
+		Map<String, String> metrics = handler.getMapFor(pathParams, store);
 
 		assertNull(metrics);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e30e7a61/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
index 356ce67..14cbeac 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
@@ -144,22 +144,22 @@ public class MetricFetcherTest extends TestLogger {
 		fetcher.update();
 		MetricStore store = fetcher.getMetricStore();
 		synchronized (store) {
-			assertEquals(7L, store.jobManager.metrics.get("abc.hist_min"));
-			assertEquals(6L, store.jobManager.metrics.get("abc.hist_max"));
-			assertEquals(4.0, store.jobManager.metrics.get("abc.hist_mean"));
-			assertEquals(0.5, store.jobManager.metrics.get("abc.hist_median"));
-			assertEquals(5.0, store.jobManager.metrics.get("abc.hist_stddev"));
-			assertEquals(0.75, store.jobManager.metrics.get("abc.hist_p75"));
-			assertEquals(0.9, store.jobManager.metrics.get("abc.hist_p90"));
-			assertEquals(0.95, store.jobManager.metrics.get("abc.hist_p95"));
-			assertEquals(0.98, store.jobManager.metrics.get("abc.hist_p98"));
-			assertEquals(0.99, store.jobManager.metrics.get("abc.hist_p99"));
-			assertEquals(0.999, store.jobManager.metrics.get("abc.hist_p999"));
+			assertEquals("7", store.jobManager.metrics.get("abc.hist_min"));
+			assertEquals("6", store.jobManager.metrics.get("abc.hist_max"));
+			assertEquals("4.0", store.jobManager.metrics.get("abc.hist_mean"));
+			assertEquals("0.5", store.jobManager.metrics.get("abc.hist_median"));
+			assertEquals("5.0", store.jobManager.metrics.get("abc.hist_stddev"));
+			assertEquals("0.75", store.jobManager.metrics.get("abc.hist_p75"));
+			assertEquals("0.9", store.jobManager.metrics.get("abc.hist_p90"));
+			assertEquals("0.95", store.jobManager.metrics.get("abc.hist_p95"));
+			assertEquals("0.98", store.jobManager.metrics.get("abc.hist_p98"));
+			assertEquals("0.99", store.jobManager.metrics.get("abc.hist_p99"));
+			assertEquals("0.999", store.jobManager.metrics.get("abc.hist_p999"));
 
 			assertEquals("x", store.taskManagers.get(tmID.toString()).metrics.get("abc.gauge"));
-			assertEquals(5.0, store.jobs.get(jobID.toString()).metrics.get("abc.jc"));
-			assertEquals(2L, store.jobs.get(jobID.toString()).tasks.get("taskid").metrics.get("2.abc.tc"));
-			assertEquals(1L, store.jobs.get(jobID.toString()).tasks.get("taskid").metrics.get("2.opname.abc.oc"));
+			assertEquals("5.0", store.jobs.get(jobID.toString()).metrics.get("abc.jc"));
+			assertEquals("2", store.jobs.get(jobID.toString()).tasks.get("taskid").metrics.get("2.abc.tc"));
+			assertEquals("1", store.jobs.get(jobID.toString()).tasks.get("taskid").metrics.get("2.opname.abc.oc"));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e30e7a61/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
index 9dc2929..ee46494 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
@@ -31,11 +31,11 @@ public class MetricStoreTest extends TestLogger {
 	public void testAdd() throws IOException {
 		MetricStore store = setupStore(new MetricStore());
 
-		assertEquals(0L, store.jobManager.metrics.get("abc.metric1"));
-		assertEquals(1L, store.taskManagers.get("tmid").metrics.get("abc.metric2"));
-		assertEquals(2L, store.jobs.get("jobid").metrics.get("abc.metric3"));
-		assertEquals(3L, store.jobs.get("jobid").tasks.get("taskid").metrics.get("8.abc.metric4"));
-		assertEquals(4L, store.jobs.get("jobid").tasks.get("taskid").metrics.get("8.opname.abc.metric5"));
+		assertEquals("0", store.jobManager.metrics.get("abc.metric1"));
+		assertEquals("1", store.taskManagers.get("tmid").metrics.get("abc.metric2"));
+		assertEquals("2", store.jobs.get("jobid").metrics.get("abc.metric3"));
+		assertEquals("3", store.jobs.get("jobid").tasks.get("taskid").metrics.get("8.abc.metric4"));
+		assertEquals("4", store.jobs.get("jobid").tasks.get("taskid").metrics.get("8.opname.abc.metric5"));
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/e30e7a61/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
index 6299a56..a410404 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
@@ -42,9 +42,9 @@ public class TaskManagerMetricsHandlerTest extends TestLogger {
 		Map<String, String> pathParams = new HashMap<>();
 		pathParams.put(PARAMETER_TM_ID, "tmid");
 
-		Map<String, Object> metrics = handler.getMapFor(pathParams, store);
+		Map<String, String> metrics = handler.getMapFor(pathParams, store);
 
-		assertEquals(1L, metrics.get("abc.metric2"));
+		assertEquals("1", metrics.get("abc.metric2"));
 	}
 
 	@Test
@@ -56,7 +56,7 @@ public class TaskManagerMetricsHandlerTest extends TestLogger {
 
 		Map<String, String> pathParams = new HashMap<>();
 
-		Map<String, Object> metrics = handler.getMapFor(pathParams, store);
+		Map<String, String> metrics = handler.getMapFor(pathParams, store);
 
 		assertNull(metrics);
 	}