You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/04 16:49:15 UTC

[3/4] flink git commit: [hotfix] [runtime] Add resourceId to TaskManager registration messages

[hotfix] [runtime] Add resourceId to TaskManager registration messages


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6ae81d4e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6ae81d4e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6ae81d4e

Branch: refs/heads/master
Commit: 6ae81d4e995ad637f0856367437a8f5d526e7bae
Parents: ae8dc5f
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 3 21:50:48 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri May 4 18:48:16 2018 +0200

----------------------------------------------------------------------
 .../registration/TaskExecutorConnection.java    | 10 +++-
 .../registration/WorkerRegistration.java        |  4 +-
 .../slotmanager/SlotManager.java                |  2 +-
 .../slotmanager/SlotManagerTest.java            | 61 +++++++++++---------
 .../slotmanager/SlotProtocolTest.java           |  9 +--
 5 files changed, 52 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6ae81d4e/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
index babe5b9..edd6c24 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.resourcemanager.registration;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 
@@ -29,15 +30,22 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class TaskExecutorConnection {
 
+	private final ResourceID resourceID;
+
 	private final InstanceID instanceID;
 
 	private final TaskExecutorGateway taskExecutorGateway;
 
-	public TaskExecutorConnection(TaskExecutorGateway taskExecutorGateway) {
+	public TaskExecutorConnection(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway) {
+		this.resourceID = checkNotNull(resourceID);
 		this.instanceID = new InstanceID();
 		this.taskExecutorGateway = checkNotNull(taskExecutorGateway);
 	}
 
+	public ResourceID getResourceID() {
+		return resourceID;
+	}
+
 	public InstanceID getInstanceID() {
 		return instanceID;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae81d4e/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
index eaa3c03..67925e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
@@ -39,7 +39,9 @@ public class WorkerRegistration<WorkerType extends ResourceIDRetrievable> extend
 			WorkerType worker,
 			int dataPort,
 			HardwareDescription hardwareDescription) {
-		super(taskExecutorGateway);
+
+		super(worker.getResourceID(), taskExecutorGateway);
+
 		this.worker = Preconditions.checkNotNull(worker);
 		this.dataPort = dataPort;
 		this.hardwareDescription = Preconditions.checkNotNull(hardwareDescription);

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae81d4e/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 6cdd997..fe503b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -319,7 +319,7 @@ public class SlotManager implements AutoCloseable {
 	public void registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
 		checkInit();
 
-		LOG.info("Register TaskManager {} at the SlotManager.", taskExecutorConnection.getInstanceID());
+		LOG.info("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID(), taskExecutorConnection.getInstanceID());
 
 		// we identify task managers by their instance id
 		if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae81d4e/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index f504d77..59de473 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -86,9 +86,9 @@ public class SlotManagerTest extends TestLogger {
 		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
 
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final ResourceID resourceId = ResourceID.generate();
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
 
-		ResourceID resourceId = ResourceID.generate();
 		final SlotID slotId1 = new SlotID(resourceId, 0);
 		final SlotID slotId2 = new SlotID(resourceId, 1);
 		final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
@@ -124,9 +124,9 @@ public class SlotManagerTest extends TestLogger {
 			eq(resourceManagerId),
 			any(Time.class))).thenReturn(new CompletableFuture<>());
 
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final ResourceID resourceId = ResourceID.generate();
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
 
-		ResourceID resourceId = ResourceID.generate();
 		final SlotID slotId1 = new SlotID(resourceId, 0);
 		final SlotID slotId2 = new SlotID(resourceId, 1);
 		final AllocationID allocationId1 = new AllocationID();
@@ -251,7 +251,7 @@ public class SlotManagerTest extends TestLogger {
 				eq(resourceManagerId),
 				any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
-			final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
+			final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
 
 			final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
 			final SlotReport slotReport = new SlotReport(slotStatus);
@@ -278,7 +278,8 @@ public class SlotManagerTest extends TestLogger {
 	public void testUnregisterPendingSlotRequest() throws Exception {
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
-		final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+		final ResourceID resourceID = ResourceID.generate();
+		final SlotID slotId = new SlotID(resourceID, 0);
 		final AllocationID allocationId = new AllocationID();
 
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
@@ -296,7 +297,7 @@ public class SlotManagerTest extends TestLogger {
 
 		final SlotRequest slotRequest = new SlotRequest(new JobID(), allocationId, resourceProfile, "foobar");
 
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
 
 		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 			slotManager.registerTaskManager(taskManagerConnection, slotReport);
@@ -348,7 +349,7 @@ public class SlotManagerTest extends TestLogger {
 			eq(resourceManagerId),
 			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
-		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
 
 		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
 		final SlotReport slotReport = new SlotReport(slotStatus);
@@ -388,7 +389,7 @@ public class SlotManagerTest extends TestLogger {
 		// accept an incoming slot request
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
 
-		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
 
 		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile, jobId, allocationId);
 		final SlotReport slotReport = new SlotReport(slotStatus);
@@ -451,10 +452,11 @@ public class SlotManagerTest extends TestLogger {
 		final JobID jobId = new JobID();
 		final AllocationID allocationId = new AllocationID();
 		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
-		final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+		final ResourceID resourceID = ResourceID.generate();
+		final SlotID slotId = new SlotID(resourceID, 0);
 
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
 
 		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile, jobId, allocationId);
 		final SlotReport slotReport = new SlotReport(slotStatus);
@@ -491,9 +493,11 @@ public class SlotManagerTest extends TestLogger {
 			eq(resourceManagerId),
 			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final ResourceID resourceID = ResourceID.generate();
+
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
 
-		final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+		final SlotID slotId = new SlotID(resourceID, 0);
 		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile1);
 		final SlotReport slotReport = new SlotReport(slotStatus);
 
@@ -536,9 +540,10 @@ public class SlotManagerTest extends TestLogger {
 			eq(resourceManagerId),
 			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final ResourceID resourceID = ResourceID.generate();
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
 
-		final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+		final SlotID slotId = new SlotID(resourceID, 0);
 		final SlotStatus slotStatus = new SlotStatus(slotId, new ResourceProfile(2.0, 2));
 		final SlotReport slotReport = new SlotReport(slotStatus);
 
@@ -619,7 +624,7 @@ public class SlotManagerTest extends TestLogger {
 		final SlotReport slotReport2 = new SlotReport(Arrays.asList(newSlotStatus2, slotStatus1));
 
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
 
 		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 			// check that we don't have any slots registered
@@ -658,11 +663,12 @@ public class SlotManagerTest extends TestLogger {
 
 		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+		final ResourceID resourceID = ResourceID.generate();
 
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
 
-		final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+		final SlotID slotId = new SlotID(resourceID, 0);
 		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
 		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
 		final SlotReport slotReport = new SlotReport(slotStatus);
@@ -765,9 +771,9 @@ public class SlotManagerTest extends TestLogger {
 			any(ResourceManagerId.class),
 			any(Time.class))).thenReturn(slotRequestFuture1, slotRequestFuture2);
 
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
-
 		final ResourceID resourceId = ResourceID.generate();
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
+
 		final SlotID slotId1 = new SlotID(resourceId, 0);
 		final SlotID slotId2 = new SlotID(resourceId, 1);
 		final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
@@ -843,9 +849,9 @@ public class SlotManagerTest extends TestLogger {
 			any(ResourceManagerId.class),
 			any(Time.class))).thenReturn(slotRequestFuture1, CompletableFuture.completedFuture(Acknowledge.get()));
 
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
-
 		final ResourceID resourceId = ResourceID.generate();
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
+
 		final SlotID slotId1 = new SlotID(resourceId, 0);
 		final SlotID slotId2 = new SlotID(resourceId, 1);
 		final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
@@ -963,7 +969,7 @@ public class SlotManagerTest extends TestLogger {
 			eq(resourceManagerId),
 			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
 
 		final SlotID slotId1 = new SlotID(resourceId, 0);
 		final SlotID slotId2 = new SlotID(resourceId, 1);
@@ -1042,12 +1048,13 @@ public class SlotManagerTest extends TestLogger {
 	public void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception {
 		final Time taskManagerTimeout = Time.milliseconds(10L);
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+		final ResourceID resourceID = ResourceID.generate();
 		final ResourceActions resourceActions = mock(ResourceActions.class);
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
 
-		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
 		final SlotStatus slotStatus = new SlotStatus(
-			new SlotID(ResourceID.generate(), 0),
+			new SlotID(resourceID, 0),
 			new ResourceProfile(1.0, 1));
 		final SlotReport initialSlotReport = new SlotReport(slotStatus);
 
@@ -1082,10 +1089,10 @@ public class SlotManagerTest extends TestLogger {
 	 */
 	@Test
 	public void testReportAllocatedSlot() throws Exception {
+		final ResourceID taskManagerId = ResourceID.generate();
 		final ResourceActions resourceActions = mock(ResourceActions.class);
 		final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGateway();
-		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
-		final ResourceID taskManagerId = ResourceID.generate();
+		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskManagerId, taskExecutorGateway);
 
 		try (final SlotManager slotManager = new SlotManager(
 			TestingUtils.defaultScheduledExecutor(),

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae81d4e/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index b3e5e91..eac530d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
@@ -59,11 +60,11 @@ public class SlotProtocolTest extends TestLogger {
 
 	private static final long timeout = 10000L;
 
-	private static final ScheduledExecutorService scheduledExecutorService = 
+	private static final ScheduledExecutorService scheduledExecutorService =
 			new ScheduledThreadPoolExecutor(1);
 
 
-	private static final ScheduledExecutor scheduledExecutor = 
+	private static final ScheduledExecutor scheduledExecutor =
 			new ScheduledExecutorServiceAdapter(scheduledExecutorService);
 
 	@AfterClass
@@ -118,7 +119,7 @@ public class SlotProtocolTest extends TestLogger {
 			final SlotReport slotReport =
 				new SlotReport(Collections.singletonList(slotStatus));
 			// register slot at SlotManager
-			slotManager.registerTaskManager(new TaskExecutorConnection(taskExecutorGateway), slotReport);
+			slotManager.registerTaskManager(new TaskExecutorConnection(resourceID, taskExecutorGateway), slotReport);
 
 			// 4) Slot becomes available and TaskExecutor gets a SlotRequest
 			verify(taskExecutorGateway, timeout(5000L))
@@ -166,7 +167,7 @@ public class SlotProtocolTest extends TestLogger {
 				new SlotReport(Collections.singletonList(slotStatus));
 			// register slot at SlotManager
 			slotManager.registerTaskManager(
-				new TaskExecutorConnection(taskExecutorGateway), slotReport);
+				new TaskExecutorConnection(resourceID, taskExecutorGateway), slotReport);
 
 			final String targetAddress = "foobar";