You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/29 13:56:13 UTC
[4/9] flink git commit: [FLINK-8143][flip6] Migrate
SubtaskMetricsHandler to new RestServerEndpoint
[FLINK-8143][flip6] Migrate SubtaskMetricsHandler to new RestServerEndpoint
Migrate logic from
org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler to
new handler. Add new handler to DispatcherRestEndpoint.
[FLINK-8143][flip6] Assert that SubtaskIndexPathParameter is mandatory
[FLINK-8143][flip6] Use path parameter constants in SubtaskMetricsHandlerTest
This closes #5082.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97f6b63b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97f6b63b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97f6b63b
Branch: refs/heads/master
Commit: 97f6b63b12ba9f3fab119e997c738b38d3adbef9
Parents: 6463685
Author: gyao <ga...@data-artisans.com>
Authored: Tue Nov 28 16:30:27 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 29 14:52:54 2017 +0100
----------------------------------------------------------------------
.../dispatcher/DispatcherRestEndpoint.java | 10 +++
.../job/metrics/JobVertexMetricsHandler.java | 2 +
.../job/metrics/SubtaskMetricsHandler.java | 72 ++++++++++++++++++++
.../messages/SubtaskIndexPathParameter.java | 47 +++++++++++++
.../job/metrics/SubtaskMetricsHeaders.java | 53 ++++++++++++++
.../SubtaskMetricsMessageParameters.java | 60 ++++++++++++++++
.../job/metrics/SubtaskMetricsHandlerTest.java | 69 +++++++++++++++++++
.../messages/SubtaskIndexPathParameterTest.java | 67 ++++++++++++++++++
.../job/metrics/SubtaskMetricsHeadersTest.java | 50 ++++++++++++++
9 files changed, 430 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 9916371..8a26a9fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatis
import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
import org.apache.flink.runtime.rest.handler.job.metrics.JobManagerMetricsHandler;
import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler;
+import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler;
import org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
@@ -72,6 +73,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisti
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.SubtaskMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.TaskManagerMetricsHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
@@ -319,6 +321,13 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
responseHeaders,
metricFetcher);
+ final SubtaskMetricsHandler subtaskMetricsHandler = new SubtaskMetricsHandler(
+ restAddressFuture,
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ metricFetcher);
+
final TaskManagerMetricsHandler taskManagerMetricsHandler = new TaskManagerMetricsHandler(
restAddressFuture,
leaderRetriever,
@@ -369,6 +378,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
handlers.add(Tuple2.of(TaskManagerDetailsHeaders.getInstance(), taskManagerDetailsHandler));
handlers.add(Tuple2.of(SubtasksTimesHeaders.getInstance(), subtasksTimesHandler));
handlers.add(Tuple2.of(JobVertexMetricsHeaders.getInstance(), jobVertexMetricsHandler));
+ handlers.add(Tuple2.of(SubtaskMetricsHeaders.getInstance(), subtaskMetricsHandler));
handlers.add(Tuple2.of(TaskManagerMetricsHeaders.getInstance(), taskManagerMetricsHandler));
handlers.add(Tuple2.of(JobManagerMetricsHeaders.getInstance(), jobManagerMetricsHandler));
http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java
index 4f83db2..f8f4702 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/JobVertexMetricsHandler.java
@@ -39,6 +39,8 @@ import java.util.concurrent.CompletableFuture;
* Handler that returns metrics given a {@link JobID} and {@link JobVertexID}.
*
* @see MetricStore#getTaskMetricStore(String, String)
+ * @deprecated This class is subsumed by {@link SubtaskMetricsHandler} and is only kept for
+ * backwards-compatibility.
*/
public class JobVertexMetricsHandler extends AbstractMetricsHandler<JobVertexMetricsMessageParameters> {
http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandler.java
new file mode 100644
index 0000000..cb3d864
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+import org.apache.flink.runtime.rest.messages.job.metrics.SubtaskMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.SubtaskMetricsMessageParameters;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Handler that returns subtask metrics.
+ *
+ * @see MetricStore#getSubtaskMetricStore(String, String, int)
+ */
+public class SubtaskMetricsHandler extends AbstractMetricsHandler<SubtaskMetricsMessageParameters> {
+
+ public SubtaskMetricsHandler(
+ CompletableFuture<String> localRestAddress,
+ GatewayRetriever<DispatcherGateway> leaderRetriever,
+ Time timeout,
+ Map<String, String> headers,
+ MetricFetcher metricFetcher) {
+
+ super(localRestAddress, leaderRetriever, timeout, headers, SubtaskMetricsHeaders.getInstance(),
+ metricFetcher);
+ }
+
+ @Nullable
+ @Override
+ protected MetricStore.ComponentMetricStore getComponentMetricStore(
+ HandlerRequest<EmptyRequestBody, SubtaskMetricsMessageParameters> request,
+ MetricStore metricStore) {
+
+ final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+ final JobVertexID vertexId = request.getPathParameter(JobVertexIdPathParameter.class);
+ final int subtaskIndex = request.getPathParameter(SubtaskIndexPathParameter.class);
+
+ return metricStore.getSubtaskMetricStore(jobId.toString(), vertexId.toString(), subtaskIndex);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameter.java
new file mode 100644
index 0000000..e8f2268
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * Path parameter specifying the index of a subtask.
+ */
+public class SubtaskIndexPathParameter extends MessagePathParameter<Integer> {
+
+ public static final String KEY = "subtaskindex";
+
+ public SubtaskIndexPathParameter() {
+ super(KEY);
+ }
+
+ @Override
+ protected Integer convertFromString(final String value) throws ConversionException {
+ final int subtaskIndex = Integer.parseInt(value);
+ if (subtaskIndex >= 0) {
+ return subtaskIndex;
+ } else {
+ throw new ConversionException("subtaskindex must be positive, was: " + subtaskIndex);
+ }
+ }
+
+ @Override
+ protected String convertToString(final Integer value) {
+ return value.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeaders.java
new file mode 100644
index 0000000..5471020
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeaders.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+
+/**
+ * {@link MessageHeaders} for {@link SubtaskMetricsHandler}.
+ */
+public final class SubtaskMetricsHeaders extends
+ AbstractMetricsHeaders<SubtaskMetricsMessageParameters> {
+
+ private static final SubtaskMetricsHeaders INSTANCE = new SubtaskMetricsHeaders();
+
+ private SubtaskMetricsHeaders() {
+ }
+
+ @Override
+ public SubtaskMetricsMessageParameters getUnresolvedMessageParameters() {
+ return new SubtaskMetricsMessageParameters();
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return "/jobs/:" + JobIDPathParameter.KEY + "/vertices/:" + JobVertexIdPathParameter.KEY +
+ "/subtasks/:" + SubtaskIndexPathParameter.KEY + "/metrics";
+ }
+
+ public static SubtaskMetricsHeaders getInstance() {
+ return INSTANCE;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsMessageParameters.java
new file mode 100644
index 0000000..bdfa003
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsMessageParameters.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * {@link MessageParameters} for {@link SubtaskMetricsHandler}.
+ */
+public class SubtaskMetricsMessageParameters extends MessageParameters {
+
+ private final JobIDPathParameter jobIDPathParameter = new JobIDPathParameter();
+
+ private final JobVertexIdPathParameter jobVertexIdPathParameter = new JobVertexIdPathParameter();
+
+ private final SubtaskIndexPathParameter subtaskIndexPathParameter = new SubtaskIndexPathParameter();
+
+ private final MetricsFilterParameter metricsFilterParameter = new MetricsFilterParameter();
+
+ @Override
+ public Collection<MessagePathParameter<?>> getPathParameters() {
+ return Collections.unmodifiableCollection(Arrays.asList(
+ jobIDPathParameter,
+ jobVertexIdPathParameter,
+ subtaskIndexPathParameter
+ ));
+ }
+
+ @Override
+ public Collection<MessageQueryParameter<?>> getQueryParameters() {
+ return Collections.singletonList(metricsFilterParameter);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java
new file mode 100644
index 0000000..e38ff8a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/SubtaskMetricsHandlerTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Tests for {@link SubtaskMetricsHandler}.
+ */
+public class SubtaskMetricsHandlerTest extends MetricsHandlerTestBase<SubtaskMetricsHandler> {
+
+ private static final String TEST_JOB_ID = new JobID().toString();
+
+ private static final String TEST_VERTEX_ID = new JobVertexID().toString();
+
+ private static final int TEST_SUBTASK_INDEX = 0;
+
+ @Override
+ SubtaskMetricsHandler getMetricsHandler() {
+ return new SubtaskMetricsHandler(
+ CompletableFuture.completedFuture("localhost:12345"),
+ leaderRetriever,
+ TIMEOUT,
+ TEST_HEADERS,
+ mockMetricFetcher
+ );
+ }
+
+ @Override
+ QueryScopeInfo getQueryScopeInfo() {
+ return new QueryScopeInfo.TaskQueryScopeInfo(TEST_JOB_ID, TEST_VERTEX_ID,
+ TEST_SUBTASK_INDEX);
+ }
+
+ @Override
+ Map<String, String> getPathParameters() {
+ final Map<String, String> pathParameters = new HashMap<>();
+ pathParameters.put(JobIDPathParameter.KEY, TEST_JOB_ID);
+ pathParameters.put(JobVertexIdPathParameter.KEY, TEST_VERTEX_ID);
+ pathParameters.put(SubtaskIndexPathParameter.KEY, Integer.toString(TEST_SUBTASK_INDEX));
+ return pathParameters;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameterTest.java
new file mode 100644
index 0000000..f018fd4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtaskIndexPathParameterTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link SubtaskIndexPathParameter}.
+ */
+public class SubtaskIndexPathParameterTest {
+
+ private SubtaskIndexPathParameter subtaskIndexPathParameter;
+
+ @Before
+ public void setUp() {
+ subtaskIndexPathParameter = new SubtaskIndexPathParameter();
+ }
+
+ @Test
+ public void testConversionFromString() throws Exception {
+ assertThat(subtaskIndexPathParameter.convertFromString("2147483647"), equalTo(Integer.MAX_VALUE));
+ }
+
+ @Test
+ public void testConversionFromStringNegativeNumber() throws Exception {
+ try {
+ subtaskIndexPathParameter.convertFromString("-2147483648");
+ fail("Expected exception not thrown");
+ } catch (final ConversionException e) {
+ assertThat(e.getMessage(), equalTo("subtaskindex must be positive, was: " + Integer
+ .MIN_VALUE));
+ }
+ }
+
+ @Test
+ public void testConvertToString() throws Exception {
+ assertThat(subtaskIndexPathParameter.convertToString(Integer.MAX_VALUE), equalTo("2147483647"));
+ }
+
+ @Test
+ public void testIsMandatoryParameter() {
+ assertTrue(subtaskIndexPathParameter.isMandatory());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/97f6b63b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeadersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeadersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeadersTest.java
new file mode 100644
index 0000000..345ad74
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeadersTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link SubtaskMetricsHeaders}.
+ */
+public class SubtaskMetricsHeadersTest {
+
+ private final SubtaskMetricsHeaders subtaskMetricsHeaders = SubtaskMetricsHeaders.getInstance();
+
+ @Test
+ public void testUrl() {
+ assertThat(subtaskMetricsHeaders.getTargetRestEndpointURL(),
+ equalTo("/jobs/:" + JobIDPathParameter.KEY + "/vertices/:" + JobVertexIdPathParameter.KEY +
+ "/subtasks/:" + SubtaskIndexPathParameter.KEY + "/metrics"));
+ }
+
+ @Test
+ public void testMessageParameters() {
+ assertThat(subtaskMetricsHeaders.getUnresolvedMessageParameters(),
+ instanceOf(SubtaskMetricsMessageParameters.class));
+ }
+}