You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/29 13:56:11 UTC

[2/9] flink git commit: [FLINK-8150] [flip6] Add TaskManagerIdPathParameterTest

[FLINK-8150] [flip6] Add TaskManagerIdPathParameterTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3bf67b46
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3bf67b46
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3bf67b46

Branch: refs/heads/master
Commit: 3bf67b46567baa84c8c1a1e0b5ec282227bcb0bc
Parents: dc7ab13
Author: gyao <ga...@data-artisans.com>
Authored: Tue Nov 28 13:53:51 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 29 14:52:53 2017 +0100

----------------------------------------------------------------------
 .../resourcemanager/ResourceManager.java        | 10 ++--
 .../taskmanager/TaskManagerDetailsHandler.java  | 37 +++++++------
 .../messages/json/ResourceIDDeserializer.java   |  1 +
 .../messages/json/ResourceIDSerializer.java     |  3 +-
 .../taskmanager/TaskManagerDetailsInfo.java     |  2 +-
 .../taskmanager/TaskManagerIdPathParameter.java |  3 +-
 .../messages/taskmanager/TaskManagerInfo.java   |  6 +--
 .../ResourceManagerTaskExecutorTest.java        | 13 ++++-
 .../TaskManagerIdPathParameterTest.java         | 56 ++++++++++++++++++++
 9 files changed, 100 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 4220ebf..a0ff5f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -503,14 +503,15 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		final ArrayList<TaskManagerInfo> taskManagerInfos = new ArrayList<>(taskExecutors.size());
 
 		for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> taskExecutorEntry : taskExecutors.entrySet()) {
+			final ResourceID resourceId = taskExecutorEntry.getKey();
 			final WorkerRegistration<WorkerType> taskExecutor = taskExecutorEntry.getValue();
 
 			taskManagerInfos.add(
 				new TaskManagerInfo(
-					taskExecutorEntry.getKey(),
+					resourceId,
 					taskExecutor.getTaskExecutorGateway().getAddress(),
 					taskExecutor.getDataPort(),
-					taskManagerHeartbeatManager.getLastHeartbeatFrom(taskExecutorEntry.getKey()),
+					taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
 					slotManager.getNumberRegisteredSlotsOf(taskExecutor.getInstanceID()),
 					slotManager.getNumberFreeSlotsOf(taskExecutor.getInstanceID()),
 					taskExecutor.getHardwareDescription()));
@@ -527,13 +528,14 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		if (taskExecutor == null) {
 			return FutureUtils.completedExceptionally(new FlinkException("Requested TaskManager " + resourceId + " is not registered."));
 		} else {
+			final InstanceID instanceId = taskExecutor.getInstanceID();
 			final TaskManagerInfo taskManagerInfo = new TaskManagerInfo(
 				resourceId,
 				taskExecutor.getTaskExecutorGateway().getAddress(),
 				taskExecutor.getDataPort(),
 				taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
-				slotManager.getNumberRegisteredSlotsOf(taskExecutor.getInstanceID()),
-				slotManager.getNumberFreeSlotsOf(taskExecutor.getInstanceID()),
+				slotManager.getNumberRegisteredSlotsOf(instanceId),
+				slotManager.getNumberFreeSlotsOf(instanceId),
 				taskExecutor.getHardwareDescription());
 
 			return CompletableFuture.completedFuture(taskManagerInfo);

http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
index b8c1a60..e66a61a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerDetailsHandler.java
@@ -71,32 +71,31 @@ public class TaskManagerDetailsHandler<T extends RestfulGateway> extends Abstrac
 	protected CompletableFuture<TaskManagerDetailsInfo> handleRequest(
 			@Nonnull HandlerRequest<EmptyRequestBody, TaskManagerMessageParameters> request,
 			@Nonnull ResourceManagerGateway gateway) throws RestHandlerException {
-		final ResourceID taskManagerInstanceId = request.getPathParameter(TaskManagerIdPathParameter.class);
+		final ResourceID taskManagerResourceId = request.getPathParameter(TaskManagerIdPathParameter
+			.class);
 
-		CompletableFuture<TaskManagerInfo> taskManagerInfoFuture = gateway.requestTaskManagerInfo(taskManagerInstanceId, timeout);
+		CompletableFuture<TaskManagerInfo> taskManagerInfoFuture = gateway.requestTaskManagerInfo(taskManagerResourceId, timeout);
 
 		metricFetcher.update();
 
 		return taskManagerInfoFuture.thenApply(
 			(TaskManagerInfo taskManagerInfo) -> {
-				// the MetricStore is not yet thread safe, therefore we still have to synchronize it
-				synchronized (metricStore) {
-					final MetricStore.TaskManagerMetricStore tmMetrics = metricStore.getTaskManagerMetricStore(taskManagerInstanceId.toString());
-
-					final TaskManagerMetricsInfo taskManagerMetricsInfo;
-
-					if (tmMetrics != null) {
-						log.debug("Create metrics info for TaskManager {}.", taskManagerInstanceId);
-						taskManagerMetricsInfo = createTaskManagerMetricsInfo(tmMetrics);
-					} else {
-						log.debug("No metrics for TaskManager {}.", taskManagerInstanceId);
-						taskManagerMetricsInfo = TaskManagerMetricsInfo.empty();
-					}
-
-					return new TaskManagerDetailsInfo(
-						taskManagerInfo,
-						taskManagerMetricsInfo);
+				final MetricStore.TaskManagerMetricStore tmMetrics =
+					metricStore.getTaskManagerMetricStore(taskManagerResourceId.getResourceIdString());
+
+				final TaskManagerMetricsInfo taskManagerMetricsInfo;
+
+				if (tmMetrics != null) {
+					log.debug("Create metrics info for TaskManager {}.", taskManagerResourceId);
+					taskManagerMetricsInfo = createTaskManagerMetricsInfo(tmMetrics);
+				} else {
+					log.debug("No metrics for TaskManager {}.", taskManagerResourceId);
+					taskManagerMetricsInfo = TaskManagerMetricsInfo.empty();
 				}
+
+				return new TaskManagerDetailsInfo(
+					taskManagerInfo,
+					taskManagerMetricsInfo);
 			});
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDDeserializer.java
index cc0d086..0e0feb4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDDeserializer.java
@@ -41,4 +41,5 @@ public class ResourceIDDeserializer extends StdDeserializer<ResourceID> {
 	public ResourceID deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
 		return new ResourceID(p.getValueAsString());
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java
index fa95451..970d95f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/ResourceIDSerializer.java
@@ -40,6 +40,7 @@ public class ResourceIDSerializer extends StdSerializer<ResourceID> {
 
 	@Override
 	public void serialize(ResourceID value, JsonGenerator gen, SerializerProvider provider) throws IOException {
-		gen.writeString(value.toString());
+		gen.writeString(value.getResourceIdString());
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
index 3b0a68f..f830ac8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerDetailsInfo.java
@@ -43,7 +43,7 @@ public class TaskManagerDetailsInfo extends TaskManagerInfo {
 
 	@JsonCreator
 	public TaskManagerDetailsInfo(
-			@JsonDeserialize(using = ResourceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) ResourceID resourceId,
+			@JsonDeserialize(using = ResourceIDDeserializer.class) @JsonProperty(FIELD_NAME_RESOURCE_ID) ResourceID resourceId,
 			@JsonProperty(FIELD_NAME_ADDRESS) String address,
 			@JsonProperty(FIELD_NAME_DATA_PORT) int dataPort,
 			@JsonProperty(FIELD_NAME_LAST_HEARTBEAT) long lastHeartbeat,

http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
index b0cd2b5..7456fee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameter.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.rest.messages.taskmanager;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.rest.messages.ConversionException;
 import org.apache.flink.runtime.rest.messages.MessagePathParameter;
 
 /**
@@ -34,7 +33,7 @@ public class TaskManagerIdPathParameter extends MessagePathParameter<ResourceID>
 	}
 
 	@Override
-	protected ResourceID convertFromString(String value) throws ConversionException {
+	protected ResourceID convertFromString(String value) {
 		return new ResourceID(value);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
index 1ad7e7d..e1452e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerInfo.java
@@ -38,7 +38,7 @@ import java.util.Objects;
  */
 public class TaskManagerInfo implements ResponseBody {
 
-	public static final String FIELD_NAME_INSTANCE_ID = "id";
+	public static final String FIELD_NAME_RESOURCE_ID = "id";
 
 	public static final String FIELD_NAME_ADDRESS = "path";
 
@@ -52,7 +52,7 @@ public class TaskManagerInfo implements ResponseBody {
 
 	public static final String FIELD_NAME_HARDWARE = "hardware";
 
-	@JsonProperty(FIELD_NAME_INSTANCE_ID)
+	@JsonProperty(FIELD_NAME_RESOURCE_ID)
 	@JsonSerialize(using = ResourceIDSerializer.class)
 	private final ResourceID resourceId;
 
@@ -76,7 +76,7 @@ public class TaskManagerInfo implements ResponseBody {
 
 	@JsonCreator
 	public TaskManagerInfo(
-			@JsonDeserialize(using = ResourceIDDeserializer.class) @JsonProperty(FIELD_NAME_INSTANCE_ID) ResourceID resourceId,
+			@JsonDeserialize(using = ResourceIDDeserializer.class) @JsonProperty(FIELD_NAME_RESOURCE_ID) ResourceID resourceId,
 			@JsonProperty(FIELD_NAME_ADDRESS) String address,
 			@JsonProperty(FIELD_NAME_DATA_PORT) int dataPort,
 			@JsonProperty(FIELD_NAME_LAST_HEARTBEAT) long lastHeartbeat,

http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 524c503..66d3ad6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -48,10 +49,13 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class ResourceManagerTaskExecutorTest extends TestLogger {
 
@@ -101,7 +105,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 	}
 
 	/**
-	 * Test receive normal registration from task executor and receive duplicate registration from task executor
+	 * Test receive normal registration from task executor and receive duplicate registration
+	 * from task executor.
 	 */
 	@Test
 	public void testRegisterTaskExecutor() throws Exception {
@@ -111,6 +116,10 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, slotReport, dataPort, hardwareDescription, timeout);
 			RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 			assertTrue(response instanceof TaskExecutorRegistrationSuccess);
+			final TaskManagerInfo taskManagerInfo = rmGateway.requestTaskManagerInfo(
+				taskExecutorResourceID,
+				timeout).get();
+			assertThat(taskManagerInfo.getResourceId(), equalTo(taskExecutorResourceID));
 
 			// test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor
 			CompletableFuture<RegistrationResponse> duplicateFuture =
@@ -168,6 +177,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 
 	private ResourceID mockTaskExecutor(String taskExecutorAddress) {
 		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		when(taskExecutorGateway.getAddress()).thenReturn(taskExecutorAddress);
+
 		ResourceID taskExecutorResourceID = ResourceID.generate();
 		rpcService.registerGateway(taskExecutorAddress, taskExecutorGateway);
 		return taskExecutorResourceID;

http://git-wip-us.apache.org/repos/asf/flink/blob/3bf67b46/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameterTest.java
new file mode 100644
index 0000000..f9c234f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameterTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.taskmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link TaskManagerIdPathParameter}.
+ */
+public class TaskManagerIdPathParameterTest {
+
+	private TaskManagerIdPathParameter taskManagerIdPathParameter;
+
+	@Before
+	public void setUp() {
+		taskManagerIdPathParameter = new TaskManagerIdPathParameter();
+	}
+
+	@Test
+	public void testConversions() {
+		final String resourceIdString = "foo";
+		final ResourceID resourceId = taskManagerIdPathParameter.convertFromString(resourceIdString);
+		assertThat(resourceId.getResourceIdString(), equalTo(resourceIdString));
+
+		assertThat(taskManagerIdPathParameter.convertToString(resourceId), equalTo(resourceIdString));
+	}
+
+	@Test
+	public void testIsMandatory() {
+		assertTrue(taskManagerIdPathParameter.isMandatory());
+	}
+
+}