You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/29 13:56:11 UTC
[2/9] flink git commit: [FLINK-8150] [flip6] Add TaskManagerIdPathParameterTest
[FLINK-8150] [flip6] Add TaskManagerIdPathParameterTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3bf67b46
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3bf67b46
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3bf67b46
Branch: refs/heads/master
Commit: 3bf67b46567baa84c8c1a1e0b5ec282227bcb0bc
Parents: dc7ab13
Author: gyao <ga...@data-artisans.com>
Authored: Tue Nov 28 13:53:51 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 29 14:52:53 2017 +0100
----------------------------------------------------------------------
.../resourcemanager/ResourceManager.java | 10 ++--
.../taskmanager/TaskManagerDetailsHandler.java | 37 +++++++------
.../messages/json/ResourceIDDeserializer.java | 1 +
.../messages/json/ResourceIDSerializer.java | 3 +-
.../taskmanager/TaskManagerDetailsInfo.java | 2 +-
.../taskmanager/TaskManagerIdPathParameter.java | 3 +-
.../messages/taskmanager/TaskManagerInfo.java | 6 +--
.../ResourceManagerTaskExecutorTest.java | 13 ++++-
.../TaskManagerIdPathParameterTest.java | 56 ++++++++++++++++++++
9 files changed, 100 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 4220ebf..a0ff5f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -503,14 +503,15 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
final ArrayList<TaskManagerInfo> taskManagerInfos = new ArrayList<>(taskExecutors.size());
for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> taskExecutorEntry : taskExecutors.entrySet()) {
+ final ResourceID resourceId = taskExecutorEntry.getKey();
final WorkerRegistration<WorkerType> taskExecutor = taskExecutorEntry.getValue();
taskManagerInfos.add(
new TaskManagerInfo(
- taskExecutorEntry.getKey(),
+ resourceId,
taskExecutor.getTaskExecutorGateway().getAddress(),
taskExecutor.getDataPort(),
- taskManagerHeartbeatManager.getLastHeartbeatFrom(taskExecutorEntry.getKey()),
+ taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
slotManager.getNumberRegisteredSlotsOf(taskExecutor.getInstanceID()),
slotManager.getNumberFreeSlotsOf(taskExecutor.getInstanceID()),
taskExecutor.getHardwareDescription()));
@@ -527,13 +528,14 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
if (taskExecutor == null) {
return FutureUtils.completedExceptionally(new FlinkException("Requested TaskManager " + resourceId + " is not registered."));
} else {
+ final InstanceID instanceId = taskExecutor.getInstanceID();
final TaskManagerInfo taskManagerInfo = new TaskManagerInfo(
resourceId,
taskExecutor.getTaskExecutorGateway().getAddress(),
taskExecutor.getDataPort(),
taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
- slotManager.getNumberRegisteredSlotsOf(taskExecutor.getInstanceID()),
- slotManager.getNumberFreeSlotsOf(taskExecutor.getInstanceID()),
+ slotManager.getNumberRegisteredSlotsOf(instanceId),
+ slotManager.getNumberFreeSlotsOf(instanceId),
taskExecutor.getHardwareDescription());
return CompletableFuture.completedFuture(taskManagerInfo);
http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
index b8c1a60..e66a61a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
@@ -71,32 +71,31 @@ public class TaskManagerDetailsHandler<T extends RestfulGateway> extends Abstrac
protected CompletableFuture<TaskManagerDetailsInfo> handleRequest(
@Nonnull HandlerRequest<EmptyRequestBody, TaskManagerMessageParameters> request,
@Nonnull ResourceManagerGateway gateway) throws RestHandlerException {
- final ResourceID taskManagerInstanceId = request.getPathParameter(TaskManagerIdPathParameter.class);
+ final ResourceID taskManagerResourceId = request.getPathParameter(TaskManagerIdPathParameter
+ .class);
- CompletableFuture<TaskManagerInfo> taskManagerInfoFuture = gateway.requestTaskManagerInfo(taskManagerInstanceId, timeout);
+ CompletableFuture<TaskManagerInfo> taskManagerInfoFuture = gateway.requestTaskManagerInfo(taskManagerResourceId, timeout);
metricFetcher.update();
return taskManagerInfoFuture.thenApply(
(TaskManagerInfo taskManagerInfo) -> {
- // the MetricStore is not yet thread safe, therefore we still have to synchronize it
- synchronized (metricStore) {
- final MetricStore.TaskManagerMetricStore tmMetrics = metricStore.getTaskManagerMetricStore(taskManagerInstanceId.toString());
-
- final TaskManagerMetricsInfo taskManagerMetricsInfo;
-
- if (tmMetrics != null) {
- log.debug("Create metrics info for TaskManager {}.", taskManagerInstanceId);
- taskManagerMetricsInfo = createTaskManagerMetricsInfo(tmMetrics);
- } else {
- log.debug("No metrics for TaskManager {}.", taskManagerInstanceId);
- taskManagerMetricsInfo = TaskManagerMetricsInfo.empty();
- }
-
- return new TaskManagerDetailsInfo(
- taskManagerInfo,
- taskManagerMetricsInfo);
+ final MetricStore.TaskManagerMetricStore tmMetrics =
+ metricStore.getTaskManagerMetricStore(taskManagerResourceId.getResourceIdString());
+
+ final TaskManagerMetricsInfo taskManagerMetricsInfo;
+
+ if (tmMetrics != null) {
+ log.debug("Create metrics info for TaskManager {}.", taskManagerResourceId);
+ taskManagerMetricsInfo = createTaskManagerMetricsInfo(tmMetrics);
+ } else {
+ log.debug("No metrics for TaskManager {}.", taskManagerResourceId);
+ taskManagerMetricsInfo = TaskManagerMetricsInfo.empty();
}
+
+ return new TaskManagerDetailsInfo(
+ taskManagerInfo,
+ taskManagerMetricsInfo);
});
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDDeserializer.java
index cc0d086..0e0feb4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDDeserializer.java
@@ -41,4 +41,5 @@ public class ResourceIDDeserializer extends StdDeserializer<ResourceID> {
public ResourceID deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
return new ResourceID(p.getValueAsString());
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java
index fa95451..970d95f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java
@@ -40,6 +40,7 @@ public class ResourceIDSerializer extends StdSerializer<ResourceID> {
@Override
public void serialize(ResourceID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
- gen.writeString(value.toString());
+ gen.writeString(value.getResourceIdString());
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
index 3b0a68f..f830ac8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
@@ -43,7 +43,7 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo {
@JsonCreator
public TaskManagerDetailsInfo(
- @JsonDeserialize(using = ResourceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) ResourceID resourceId,
+ @JsonDeserialize(using = ResourceIDDeserializer.class) @JsonProperty(FIELD_NAME_RESOURCE_ID) ResourceID resourceId,
@JsonProperty(FIELD_NAME_ADDRESS) String address,
@JsonProperty(FIELD_NAME_DATA_PORT) int dataPort,
@JsonProperty(FIELD_NAME_LAST_HEARTBEAT) long lastHeartbeat,
http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
index b0cd2b5..7456fee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.rest.messages.taskmanager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.rest.messages.ConversionException;
import org.apache.flink.runtime.rest.messages.MessagePathParameter;
/**
@@ -34,7 +33,7 @@ public class TaskManagerIdPathParameter extends MessagePathParameter<ResourceID>
}
@Override
- protected ResourceID convertFromString(String value) throws ConversionException {
+ protected ResourceID convertFromString(String value) {
return new ResourceID(value);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
index 1ad7e7d..e1452e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
@@ -38,7 +38,7 @@ import java.util.Objects;
*/
public class TaskManagerInfo implements ResponseBody {
- public static final String FIELD_NAME_INSTANCE_ID = "id";
+ public static final String FIELD_NAME_RESOURCE_ID = "id";
public static final String FIELD_NAME_ADDRESS = "path";
@@ -52,7 +52,7 @@ public class TaskManagerInfo implements ResponseBody {
public static final String FIELD_NAME_HARDWARE = "hardware";
- @JsonProperty(FIELD_NAME_INSTANCE_ID)
+ @JsonProperty(FIELD_NAME_RESOURCE_ID)
@JsonSerialize(using = ResourceIDSerializer.class)
private final ResourceID resourceId;
@@ -76,7 +76,7 @@ public class TaskManagerInfo implements ResponseBody {
@JsonCreator
public TaskManagerInfo(
- @JsonDeserialize(using = ResourceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) ResourceID resourceId,
+ @JsonDeserialize(using = ResourceIDDeserializer.class) @JsonProperty(FIELD_NAME_RESOURCE_ID) ResourceID resourceId,
@JsonProperty(FIELD_NAME_ADDRESS) String address,
@JsonProperty(FIELD_NAME_DATA_PORT) int dataPort,
@JsonProperty(FIELD_NAME_LAST_HEARTBEAT) long lastHeartbeat,
http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 524c503..66d3ad6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -48,10 +49,13 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class ResourceManagerTaskExecutorTest extends TestLogger {
@@ -101,7 +105,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
}
/**
- * Test receive normal registration from task executor and receive duplicate registration from task executor
+ * Test receive normal registration from task executor and receive duplicate registration
+ * from task executor.
*/
@Test
public void testRegisterTaskExecutor() throws Exception {
@@ -111,6 +116,10 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, slotReport, dataPort, hardwareDescription, timeout);
RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
assertTrue(response instanceof TaskExecutorRegistrationSuccess);
+ final TaskManagerInfo taskManagerInfo = rmGateway.requestTaskManagerInfo(
+ taskExecutorResourceID,
+ timeout).get();
+ assertThat(taskManagerInfo.getResourceId(), equalTo(taskExecutorResourceID));
// test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor
CompletableFuture<RegistrationResponse> duplicateFuture =
@@ -168,6 +177,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
private ResourceID mockTaskExecutor(String taskExecutorAddress) {
TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+ when(taskExecutorGateway.getAddress()).thenReturn(taskExecutorAddress);
+
ResourceID taskExecutorResourceID = ResourceID.generate();
rpcService.registerGateway(taskExecutorAddress, taskExecutorGateway);
return taskExecutorResourceID;
http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameterTest.java
new file mode 100644
index 0000000..f9c234f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameterTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.taskmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link TaskManagerIdPathParameter}.
+ */
+public class TaskManagerIdPathParameterTest {
+
+ private TaskManagerIdPathParameter taskManagerIdPathParameter;
+
+ @Before
+ public void setUp() {
+ taskManagerIdPathParameter = new TaskManagerIdPathParameter();
+ }
+
+ @Test
+ public void testConversions() {
+ final String resourceIdString = "foo";
+ final ResourceID resourceId = taskManagerIdPathParameter.convertFromString(resourceIdString);
+ assertThat(resourceId.getResourceIdString(), equalTo(resourceIdString));
+
+ assertThat(taskManagerIdPathParameter.convertToString(resourceId), equalTo(resourceIdString));
+ }
+
+ @Test
+ public void testIsMandatory() {
+ assertTrue(taskManagerIdPathParameter.isMandatory());
+ }
+
+}