You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/29 13:56:13 UTC

[4/9] flink git commit: [FLINK-8143][flip6] Migrate SubtaskMetricsHandler to new RestServerEndpoint

[FLINK-8143][flip6] Migrate SubtaskMetricsHandler to new RestServerEndpoint

Migrate logic from
org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler to
new handler. Add new handler to DispatcherRestEndpoint.

[FLINK-8143][flip6] Assert that SubtaskIndexPathParameter is mandatory

[FLINK-8143][flip6] Use path parameter constants in SubtaskMetricsHandlerTest

This closes #5082.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97f6b63b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97f6b63b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97f6b63b

Branch: refs/heads/master
Commit: 97f6b63b12ba9f3fab119e997c738b38d3adbef9
Parents: 6463685
Author: gyao <ga...@data-artisans.com>
Authored: Tue Nov 28 16:30:27 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 29 14:52:54 2017 +0100

----------------------------------------------------------------------
 .../dispatcher/DispatcherRestEndpoint.java      | 10 +++
 .../job/metrics/JobVertexMetricsHandler.java    |  2 +
 .../job/metrics/SubtaskMetricsHandler.java      | 72 ++++++++++++++++++++
 .../messages/SubtaskIndexPathParameter.java     | 47 +++++++++++++
 .../job/metrics/SubtaskMetricsHeaders.java      | 53 ++++++++++++++
 .../SubtaskMetricsMessageParameters.java        | 60 ++++++++++++++++
 .../job/metrics/SubtaskMetricsHandlerTest.java  | 69 +++++++++++++++++++
 .../messages/SubtaskIndexPathParameterTest.java | 67 ++++++++++++++++++
 .../job/metrics/SubtaskMetricsHeadersTest.java  | 50 ++++++++++++++
 9 files changed, 430 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 9916371..8a26a9fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatis
 import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
 import org.apache.flink.runtime.rest.handler.job.metrics.JobManagerMetricsHandler;
 import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler;
+import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler;
 import org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
@@ -72,6 +73,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisti
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.SubtaskMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
@@ -319,6 +321,13 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			responseHeaders,
 			metricFetcher);
 
+		final SubtaskMetricsHandler subtaskMetricsHandler = new SubtaskMetricsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			metricFetcher);
+
 		final TaskManagerMetricsHandler taskManagerMetricsHandler = new TaskManagerMetricsHandler(
 			restAddressFuture,
 			leaderRetriever,
@@ -369,6 +378,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		handlers.add(Tuple2.of(TaskManagerDetailsHeaders.getInstance(), taskManagerDetailsHandler));
 		handlers.add(Tuple2.of(SubtasksTimesHeaders.getInstance(), subtasksTimesHandler));
 		handlers.add(Tuple2.of(JobVertexMetricsHeaders.getInstance(), jobVertexMetricsHandler));
+		handlers.add(Tuple2.of(SubtaskMetricsHeaders.getInstance(), subtaskMetricsHandler));
 		handlers.add(Tuple2.of(TaskManagerMetricsHeaders.getInstance(), taskManagerMetricsHandler));
 		handlers.add(Tuple2.of(JobManagerMetricsHeaders.getInstance(), jobManagerMetricsHandler));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java
index 4f83db2..f8f4702 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java
@@ -39,6 +39,8 @@ import java.util.concurrent.CompletableFuture;
  * Handler that returns metrics given a {@link JobID} and {@link JobVertexID}.
  *
  * @see MetricStore#getTaskMetricStore(String, String)
+ * @deprecated This class is subsumed by {@link SubtaskMetricsHandler} and is only kept for
+ * backwards-compatibility.
  */
 public class JobVertexMetricsHandler extends AbstractMetricsHandler<JobVertexMetricsMessageParameters> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandler.java
new file mode 100644
index 0000000..cb3d864
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.SubtaskMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.SubtaskMetricsMessageParameters;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Handler that returns subtask metrics.
+ *
+ * @see MetricStore#getSubtaskMetricStore(String, String, int)
+ */
+public class SubtaskMetricsHandler extends AbstractMetricsHandler<SubtaskMetricsMessageParameters> {
+
+	public SubtaskMetricsHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<DispatcherGateway> leaderRetriever,
+			Time timeout,
+			Map<String, String> headers,
+			MetricFetcher metricFetcher) {
+
+		super(localRestAddress, leaderRetriever, timeout, headers, SubtaskMetricsHeaders.getInstance(),
+			metricFetcher);
+	}
+
+	@Nullable
+	@Override
+	protected MetricStore.ComponentMetricStore getComponentMetricStore(
+			HandlerRequest<EmptyRequestBody, SubtaskMetricsMessageParameters> request,
+			MetricStore metricStore) {
+
+		final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+		final JobVertexID vertexId = request.getPathParameter(JobVertexIdPathParameter.class);
+		final int subtaskIndex = request.getPathParameter(SubtaskIndexPathParameter.class);
+
+		return metricStore.getSubtaskMetricStore(jobId.toString(), vertexId.toString(), subtaskIndex);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameter.java
new file mode 100644
index 0000000..e8f2268
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * Path parameter specifying the index of a subtask.
+ */
+public class SubtaskIndexPathParameter extends MessagePathParameter<Integer> {
+
+	public static final String KEY = "subtaskindex";
+
+	public SubtaskIndexPathParameter() {
+		super(KEY);
+	}
+
+	@Override
+	protected Integer convertFromString(final String value) throws ConversionException {
+		final int subtaskIndex = Integer.parseInt(value);
+		if (subtaskIndex >= 0) {
+			return subtaskIndex;
+		} else {
+			throw new ConversionException("subtaskindex must be positive, was: " + subtaskIndex);
+		}
+	}
+
+	@Override
+	protected String convertToString(final Integer value) {
+		return value.toString();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeaders.java
new file mode 100644
index 0000000..5471020
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeaders.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+
+/**
+ * {@link MessageHeaders} for {@link SubtaskMetricsHandler}.
+ */
+public final class SubtaskMetricsHeaders extends
+	AbstractMetricsHeaders<SubtaskMetricsMessageParameters> {
+
+	private static final SubtaskMetricsHeaders INSTANCE = new SubtaskMetricsHeaders();
+
+	private SubtaskMetricsHeaders() {
+	}
+
+	@Override
+	public SubtaskMetricsMessageParameters getUnresolvedMessageParameters() {
+		return new SubtaskMetricsMessageParameters();
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return "/jobs/:" + JobIDPathParameter.KEY + "/vertices/:" + JobVertexIdPathParameter.KEY +
+			"/subtasks/:" + SubtaskIndexPathParameter.KEY + "/metrics";
+	}
+
+	public static SubtaskMetricsHeaders getInstance() {
+		return INSTANCE;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsMessageParameters.java
new file mode 100644
index 0000000..bdfa003
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsMessageParameters.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * {@link MessageParameters} for {@link SubtaskMetricsHandler}.
+ */
+public class SubtaskMetricsMessageParameters extends MessageParameters {
+
+	private final JobIDPathParameter jobIDPathParameter = new JobIDPathParameter();
+
+	private final JobVertexIdPathParameter jobVertexIdPathParameter = new JobVertexIdPathParameter();
+
+	private final SubtaskIndexPathParameter subtaskIndexPathParameter = new SubtaskIndexPathParameter();
+
+	private final MetricsFilterParameter metricsFilterParameter = new MetricsFilterParameter();
+
+	@Override
+	public Collection<MessagePathParameter<?>> getPathParameters() {
+		return Collections.unmodifiableCollection(Arrays.asList(
+			jobIDPathParameter,
+			jobVertexIdPathParameter,
+			subtaskIndexPathParameter
+		));
+	}
+
+	@Override
+	public Collection<MessageQueryParameter<?>> getQueryParameters() {
+		return Collections.singletonList(metricsFilterParameter);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java
new file mode 100644
index 0000000..e38ff8a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Tests for {@link SubtaskMetricsHandler}.
+ */
+public class SubtaskMetricsHandlerTest extends MetricsHandlerTestBase<SubtaskMetricsHandler> {
+
+	private static final String TEST_JOB_ID = new JobID().toString();
+
+	private static final String TEST_VERTEX_ID = new JobVertexID().toString();
+
+	private static final int TEST_SUBTASK_INDEX = 0;
+
+	@Override
+	SubtaskMetricsHandler getMetricsHandler() {
+		return new SubtaskMetricsHandler(
+			CompletableFuture.completedFuture("localhost:12345"),
+			leaderRetriever,
+			TIMEOUT,
+			TEST_HEADERS,
+			mockMetricFetcher
+		);
+	}
+
+	@Override
+	QueryScopeInfo getQueryScopeInfo() {
+		return new QueryScopeInfo.TaskQueryScopeInfo(TEST_JOB_ID, TEST_VERTEX_ID,
+			TEST_SUBTASK_INDEX);
+	}
+
+	@Override
+	Map<String, String> getPathParameters() {
+		final Map<String, String> pathParameters = new HashMap<>();
+		pathParameters.put(JobIDPathParameter.KEY, TEST_JOB_ID);
+		pathParameters.put(JobVertexIdPathParameter.KEY, TEST_VERTEX_ID);
+		pathParameters.put(SubtaskIndexPathParameter.KEY, Integer.toString(TEST_SUBTASK_INDEX));
+		return pathParameters;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameterTest.java
new file mode 100644
index 0000000..f018fd4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameterTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link SubtaskIndexPathParameter}.
+ */
+public class SubtaskIndexPathParameterTest {
+
+	private SubtaskIndexPathParameter subtaskIndexPathParameter;
+
+	@Before
+	public void setUp() {
+		subtaskIndexPathParameter = new SubtaskIndexPathParameter();
+	}
+
+	@Test
+	public void testConversionFromString() throws Exception {
+		assertThat(subtaskIndexPathParameter.convertFromString("2147483647"), equalTo(Integer.MAX_VALUE));
+	}
+
+	@Test
+	public void testConversionFromStringNegativeNumber() throws Exception {
+		try {
+			subtaskIndexPathParameter.convertFromString("-2147483648");
+			fail("Expected exception not thrown");
+		} catch (final ConversionException e) {
+			assertThat(e.getMessage(), equalTo("subtaskindex must be positive, was: " + Integer
+				.MIN_VALUE));
+		}
+	}
+
+	@Test
+	public void testConvertToString() throws Exception {
+		assertThat(subtaskIndexPathParameter.convertToString(Integer.MAX_VALUE), equalTo("2147483647"));
+	}
+
+	@Test
+	public void testIsMandatoryParameter() {
+		assertTrue(subtaskIndexPathParameter.isMandatory());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeadersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeadersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeadersTest.java
new file mode 100644
index 0000000..345ad74
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeadersTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link SubtaskMetricsHeaders}.
+ */
+public class SubtaskMetricsHeadersTest {
+
+	private final SubtaskMetricsHeaders subtaskMetricsHeaders = SubtaskMetricsHeaders.getInstance();
+
+	@Test
+	public void testUrl() {
+		assertThat(subtaskMetricsHeaders.getTargetRestEndpointURL(),
+			equalTo("/jobs/:" + JobIDPathParameter.KEY + "/vertices/:" + JobVertexIdPathParameter.KEY +
+				"/subtasks/:" + SubtaskIndexPathParameter.KEY + "/metrics"));
+	}
+
+	@Test
+	public void testMessageParameters() {
+		assertThat(subtaskMetricsHeaders.getUnresolvedMessageParameters(),
+			instanceOf(SubtaskMetricsMessageParameters.class));
+	}
+}