You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text copy.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/04 11:21:07 UTC

[1/2] flink git commit: [hotfix] [runtime] Add resourceId to TaskManager registration messages

Repository: flink
Updated Branches:
  refs/heads/release-1.5 9435cd4fe -> b13a2bcb7


[hotfix] [runtime] Add resourceId to TaskManager registration messages


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b13a2bcb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b13a2bcb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b13a2bcb

Branch: refs/heads/release-1.5
Commit: b13a2bcb7b72a7535ef68ad8a2e1faa1a36b7f74
Parents: 3fdec15
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 3 21:50:48 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri May 4 13:20:45 2018 +0200

----------------------------------------------------------------------
 .../registration/TaskExecutorConnection.java    | 10 +++-
 .../registration/WorkerRegistration.java        |  4 +-
 .../slotmanager/SlotManager.java                |  2 +-
 .../slotmanager/SlotManagerTest.java            | 61 +++++++++++---------
 .../slotmanager/SlotProtocolTest.java           |  9 +--
 5 files changed, 52 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b13a2bcb/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
index babe5b9..edd6c24 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.resourcemanager.registration;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 
@@ -29,15 +30,22 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class TaskExecutorConnection {
 
+	private final ResourceID resourceID;
+
 	private final InstanceID instanceID;
 
 	private final TaskExecutorGateway taskExecutorGateway;
 
-	public TaskExecutorConnection(TaskExecutorGateway taskExecutorGateway) {
+	public TaskExecutorConnection(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway) {
+		this.resourceID = checkNotNull(resourceID);
 		this.instanceID = new InstanceID();
 		this.taskExecutorGateway = checkNotNull(taskExecutorGateway);
 	}
 
+	public ResourceID getResourceID() {
+		return resourceID;
+	}
+
 	public InstanceID getInstanceID() {
 		return instanceID;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b13a2bcb/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
index eaa3c03..67925e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
@@ -39,7 +39,9 @@ public class WorkerRegistration<WorkerType extends ResourceIDRetrievable> extend
 			WorkerType worker,
 			int dataPort,
 			HardwareDescription hardwareDescription) {
-		super(taskExecutorGateway);
+
+		super(worker.getResourceID(), taskExecutorGateway);
+
 		this.worker = Preconditions.checkNotNull(worker);
 		this.dataPort = dataPort;
 		this.hardwareDescription = Preconditions.checkNotNull(hardwareDescription);

http://git-wip-us.apache.org/repos/asf/flink/blob/b13a2bcb/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 6cdd997..fe503b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -319,7 +319,7 @@ public class SlotManager implements AutoCloseable {
 	public void registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
 		checkInit();
 
-		LOG.info("Register TaskManager {} at the SlotManager.", taskExecutorConnection.getInstanceID());
+		LOG.info("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID(), taskExecutorConnection.getInstanceID());
 
 		// we identify task managers by their instance id
 		if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b13a2bcb/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index f504d77..59de473 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -86,9 +86,9 @@ public class SlotManagerTest extends TestLogger {
 		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
 
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final ResourceID resourceId = ResourceID.generate();
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
 
-		ResourceID resourceId = ResourceID.generate();
 		final SlotID slotId1 = new SlotID(resourceId, 0);
 		final SlotID slotId2 = new SlotID(resourceId, 1);
 		final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
@@ -124,9 +124,9 @@ public class SlotManagerTest extends TestLogger {
 			eq(resourceManagerId),
 			any(Time.class))).thenReturn(new CompletableFuture<>());
 
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final ResourceID resourceId = ResourceID.generate();
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
 
-		ResourceID resourceId = ResourceID.generate();
 		final SlotID slotId1 = new SlotID(resourceId, 0);
 		final SlotID slotId2 = new SlotID(resourceId, 1);
 		final AllocationID allocationId1 = new AllocationID();
@@ -251,7 +251,7 @@ public class SlotManagerTest extends TestLogger {
 				eq(resourceManagerId),
 				any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
-			final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
+			final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
 
 			final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
 			final SlotReport slotReport = new SlotReport(slotStatus);
@@ -278,7 +278,8 @@ public class SlotManagerTest extends TestLogger {
 	public void testUnregisterPendingSlotRequest() throws Exception {
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
-		final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+		final ResourceID resourceID = ResourceID.generate();
+		final SlotID slotId = new SlotID(resourceID, 0);
 		final AllocationID allocationId = new AllocationID();
 
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
@@ -296,7 +297,7 @@ public class SlotManagerTest extends TestLogger {
 
 		final SlotRequest slotRequest = new SlotRequest(new JobID(), allocationId, resourceProfile, "foobar");
 
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
 
 		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 			slotManager.registerTaskManager(taskManagerConnection, slotReport);
@@ -348,7 +349,7 @@ public class SlotManagerTest extends TestLogger {
 			eq(resourceManagerId),
 			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
-		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
 
 		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
 		final SlotReport slotReport = new SlotReport(slotStatus);
@@ -388,7 +389,7 @@ public class SlotManagerTest extends TestLogger {
 		// accept an incoming slot request
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
 
-		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
 
 		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile, jobId, allocationId);
 		final SlotReport slotReport = new SlotReport(slotStatus);
@@ -451,10 +452,11 @@ public class SlotManagerTest extends TestLogger {
 		final JobID jobId = new JobID();
 		final AllocationID allocationId = new AllocationID();
 		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
-		final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+		final ResourceID resourceID = ResourceID.generate();
+		final SlotID slotId = new SlotID(resourceID, 0);
 
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
 
 		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile, jobId, allocationId);
 		final SlotReport slotReport = new SlotReport(slotStatus);
@@ -491,9 +493,11 @@ public class SlotManagerTest extends TestLogger {
 			eq(resourceManagerId),
 			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final ResourceID resourceID = ResourceID.generate();
+
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
 
-		final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+		final SlotID slotId = new SlotID(resourceID, 0);
 		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile1);
 		final SlotReport slotReport = new SlotReport(slotStatus);
 
@@ -536,9 +540,10 @@ public class SlotManagerTest extends TestLogger {
 			eq(resourceManagerId),
 			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final ResourceID resourceID = ResourceID.generate();
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
 
-		final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+		final SlotID slotId = new SlotID(resourceID, 0);
 		final SlotStatus slotStatus = new SlotStatus(slotId, new ResourceProfile(2.0, 2));
 		final SlotReport slotReport = new SlotReport(slotStatus);
 
@@ -619,7 +624,7 @@ public class SlotManagerTest extends TestLogger {
 		final SlotReport slotReport2 = new SlotReport(Arrays.asList(newSlotStatus2, slotStatus1));
 
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
 
 		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 			// check that we don't have any slots registered
@@ -658,11 +663,12 @@ public class SlotManagerTest extends TestLogger {
 
 		final ResourceActions resourceManagerActions = mock(ResourceActions.class);
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+		final ResourceID resourceID = ResourceID.generate();
 
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
 
-		final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+		final SlotID slotId = new SlotID(resourceID, 0);
 		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
 		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
 		final SlotReport slotReport = new SlotReport(slotStatus);
@@ -765,9 +771,9 @@ public class SlotManagerTest extends TestLogger {
 			any(ResourceManagerId.class),
 			any(Time.class))).thenReturn(slotRequestFuture1, slotRequestFuture2);
 
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
-
 		final ResourceID resourceId = ResourceID.generate();
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
+
 		final SlotID slotId1 = new SlotID(resourceId, 0);
 		final SlotID slotId2 = new SlotID(resourceId, 1);
 		final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
@@ -843,9 +849,9 @@ public class SlotManagerTest extends TestLogger {
 			any(ResourceManagerId.class),
 			any(Time.class))).thenReturn(slotRequestFuture1, CompletableFuture.completedFuture(Acknowledge.get()));
 
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
-
 		final ResourceID resourceId = ResourceID.generate();
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
+
 		final SlotID slotId1 = new SlotID(resourceId, 0);
 		final SlotID slotId2 = new SlotID(resourceId, 1);
 		final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
@@ -963,7 +969,7 @@ public class SlotManagerTest extends TestLogger {
 			eq(resourceManagerId),
 			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
-		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
 
 		final SlotID slotId1 = new SlotID(resourceId, 0);
 		final SlotID slotId2 = new SlotID(resourceId, 1);
@@ -1042,12 +1048,13 @@ public class SlotManagerTest extends TestLogger {
 	public void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception {
 		final Time taskManagerTimeout = Time.milliseconds(10L);
 		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+		final ResourceID resourceID = ResourceID.generate();
 		final ResourceActions resourceActions = mock(ResourceActions.class);
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
 
-		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
+		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
 		final SlotStatus slotStatus = new SlotStatus(
-			new SlotID(ResourceID.generate(), 0),
+			new SlotID(resourceID, 0),
 			new ResourceProfile(1.0, 1));
 		final SlotReport initialSlotReport = new SlotReport(slotStatus);
 
@@ -1082,10 +1089,10 @@ public class SlotManagerTest extends TestLogger {
 	 */
 	@Test
 	public void testReportAllocatedSlot() throws Exception {
+		final ResourceID taskManagerId = ResourceID.generate();
 		final ResourceActions resourceActions = mock(ResourceActions.class);
 		final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGateway();
-		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
-		final ResourceID taskManagerId = ResourceID.generate();
+		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskManagerId, taskExecutorGateway);
 
 		try (final SlotManager slotManager = new SlotManager(
 			TestingUtils.defaultScheduledExecutor(),

http://git-wip-us.apache.org/repos/asf/flink/blob/b13a2bcb/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index b3e5e91..eac530d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
@@ -59,11 +60,11 @@ public class SlotProtocolTest extends TestLogger {
 
 	private static final long timeout = 10000L;
 
-	private static final ScheduledExecutorService scheduledExecutorService = 
+	private static final ScheduledExecutorService scheduledExecutorService =
 			new ScheduledThreadPoolExecutor(1);
 
 
-	private static final ScheduledExecutor scheduledExecutor = 
+	private static final ScheduledExecutor scheduledExecutor =
 			new ScheduledExecutorServiceAdapter(scheduledExecutorService);
 
 	@AfterClass
@@ -118,7 +119,7 @@ public class SlotProtocolTest extends TestLogger {
 			final SlotReport slotReport =
 				new SlotReport(Collections.singletonList(slotStatus));
 			// register slot at SlotManager
-			slotManager.registerTaskManager(new TaskExecutorConnection(taskExecutorGateway), slotReport);
+			slotManager.registerTaskManager(new TaskExecutorConnection(resourceID, taskExecutorGateway), slotReport);
 
 			// 4) Slot becomes available and TaskExecutor gets a SlotRequest
 			verify(taskExecutorGateway, timeout(5000L))
@@ -166,7 +167,7 @@ public class SlotProtocolTest extends TestLogger {
 				new SlotReport(Collections.singletonList(slotStatus));
 			// register slot at SlotManager
 			slotManager.registerTaskManager(
-				new TaskExecutorConnection(taskExecutorGateway), slotReport);
+				new TaskExecutorConnection(resourceID, taskExecutorGateway), slotReport);
 
 			final String targetAddress = "foobar";
 


[2/2] flink git commit: [hotfix] [runtime] Minor cleanups around JobMasterId, ResourceManagerId, DispatcherId.

Posted by se...@apache.org.
[hotfix] [runtime] Minor cleanups around JobMasterId, ResourceManagerId, DispatcherId.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3fdec156
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3fdec156
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3fdec156

Branch: refs/heads/release-1.5
Commit: 3fdec156467e40ac662b28a41d642f0531d8c55b
Parents: 9435cd4
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 3 21:13:36 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri May 4 13:20:45 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    |  2 +-
 .../flink/runtime/dispatcher/DispatcherId.java  | 28 ++++++------
 .../runtime/entrypoint/ClusterEntrypoint.java   |  4 +-
 .../flink/runtime/jobmaster/JobMaster.java      |  2 +-
 .../flink/runtime/jobmaster/JobMasterId.java    | 41 +++++++++++------
 .../flink/runtime/minicluster/MiniCluster.java  |  4 +-
 .../resourcemanager/JobLeaderIdService.java     |  2 +-
 .../resourcemanager/ResourceManager.java        |  2 +-
 .../resourcemanager/ResourceManagerId.java      | 48 +++++++++++++-------
 .../runtime/taskexecutor/JobLeaderService.java  |  5 +-
 .../runtime/taskexecutor/TaskExecutor.java      |  2 +-
 .../slotpool/SlotPoolSchedulingTestBase.java    |  2 +-
 12 files changed, 84 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index c05255b..58ffda3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -698,7 +698,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	 */
 	@Override
 	public void grantLeadership(final UUID newLeaderSessionID) {
-		final DispatcherId dispatcherId = new DispatcherId(newLeaderSessionID);
+		final DispatcherId dispatcherId = DispatcherId.fromUuid(newLeaderSessionID);
 		log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), dispatcherId);
 
 		final CompletableFuture<Collection<JobGraph>> recoveredJobsFuture = recoverJobs();

http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java
index e563090..403f72f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java
@@ -29,29 +29,27 @@ public class DispatcherId extends AbstractID {
 
 	private static final long serialVersionUID = -1654056277003743966L;
 
-	public DispatcherId(byte[] bytes) {
-		super(bytes);
-	}
-
-	public DispatcherId(long lowerPart, long upperPart) {
-		super(lowerPart, upperPart);
-	}
-
-	public DispatcherId(AbstractID id) {
-		super(id);
-	}
-
-	public DispatcherId() {}
+	private DispatcherId() {}
 
-	public DispatcherId(UUID uuid) {
-		this(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
+	private DispatcherId(UUID uuid) {
+		super(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
 	}
 
 	public UUID toUUID() {
 		return new UUID(getUpperPart(), getLowerPart());
 	}
 
+	/**
+	 * Generates a new random DispatcherId.
+	 */
 	public static DispatcherId generate() {
 		return new DispatcherId();
 	}
+
+	/**
+	 * Creates a new DispatcherId that corresponds to the UUID.
+	 */
+	public static DispatcherId fromUuid(UUID uuid) {
+		return new DispatcherId(uuid);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index f7a5858..f823ea7 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -306,14 +306,14 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
 				rpcService,
 				DispatcherGateway.class,
-				DispatcherId::new,
+				DispatcherId::fromUuid,
 				10,
 				Time.milliseconds(50L));
 
 			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
 				rpcService,
 				ResourceManagerGateway.class,
-				ResourceManagerId::new,
+				ResourceManagerId::fromUuid,
 				10,
 				Time.milliseconds(50L));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index f5606fb..f30c119 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1452,7 +1452,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			runAsync(
 				() -> notifyOfNewResourceManagerLeader(
 					leaderAddress,
-					leaderSessionID != null ? new ResourceManagerId(leaderSessionID) : null));
+					ResourceManagerId.fromUuidOrNull(leaderSessionID)));
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
index 39f7ded..0ebf913 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.util.AbstractID;
 
+import javax.annotation.Nullable;
+
 import java.util.UUID;
 
 /**
@@ -29,30 +31,39 @@ public class JobMasterId extends AbstractID {
 
 	private static final long serialVersionUID = -933276753644003754L;
 
-	public JobMasterId(byte[] bytes) {
-		super(bytes);
-	}
-
-	public JobMasterId(long lowerPart, long upperPart) {
-		super(lowerPart, upperPart);
-	}
-
-	public JobMasterId(AbstractID id) {
-		super(id);
-	}
-
-	public JobMasterId() {
+	/**
+	 * Creates a JobMasterId that takes the bits from the given UUID.
+	 */
+	public JobMasterId(UUID uuid) {
+		super(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
 	}
 
-	public JobMasterId(UUID uuid) {
-		this(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
+	/**
+	 * Generates a new random JobMasterId.
+	 */
+	private JobMasterId() {
+		super();
 	}
 
+	/**
+	 * Creates a UUID with the bits from this JobMasterId.
+	 */
 	public UUID toUUID() {
 		return new UUID(getUpperPart(), getLowerPart());
 	}
 
+	/**
+	 * Generates a new random JobMasterId.
+	 */
 	public static JobMasterId generate() {
 		return new JobMasterId();
 	}
+
+	/**
+	 * If the given uuid is null, this returns null, otherwise a JobMasterId that
+	 * corresponds to the UUID, via {@link #JobMasterId(UUID)}.
+	 */
+	public static JobMasterId fromUuidOrNull(@Nullable UUID uuid) {
+		return  uuid == null ? null : new JobMasterId(uuid);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 64d46c6..0c1644f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -324,13 +324,13 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 				dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
 					jobManagerRpcService,
 					DispatcherGateway.class,
-					DispatcherId::new,
+					DispatcherId::fromUuid,
 					20,
 					Time.milliseconds(20L));
 				final RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
 					jobManagerRpcService,
 					ResourceManagerGateway.class,
-					ResourceManagerId::new,
+					ResourceManagerId::fromUuid,
 					20,
 					Time.milliseconds(20L));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
index da0a7fd..994db34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -188,7 +188,7 @@ public class JobLeaderIdService {
 
 		JobLeaderIdListener listener = jobLeaderIdListeners.get(jobId);
 
-		return listener.getLeaderIdFuture().thenApply((UUID id) -> id != null ? new JobMasterId(id) : null);
+		return listener.getLeaderIdFuture().thenApply(JobMasterId::fromUuidOrNull);
 	}
 
 	public boolean isValidTimeout(JobID jobId, UUID timeoutId) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index c753469..af736b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -889,7 +889,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	public void grantLeadership(final UUID newLeaderSessionID) {
 		runAsyncWithoutFencing(
 			() -> {
-				final ResourceManagerId newResourceManagerId = new ResourceManagerId(newLeaderSessionID);
+				final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID);
 
 				log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java
index 3594e88..3405bb2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.util.AbstractID;
 
+import javax.annotation.Nullable;
+
 import java.util.UUID;
 
 /**
@@ -29,30 +31,44 @@ public class ResourceManagerId extends AbstractID {
 
 	private static final long serialVersionUID = -6042820142662137374L;
 
-	public ResourceManagerId(byte[] bytes) {
-		super(bytes);
-	}
-
-	public ResourceManagerId(long lowerPart, long upperPart) {
-		super(lowerPart, upperPart);
-	}
-
-	public ResourceManagerId(AbstractID id) {
-		super(id);
-	}
-
-	public ResourceManagerId() {
-	}
+	/**
+	 * Generates a new random ResourceManagerId.
+	 */
+	private ResourceManagerId() {}
 
-	public ResourceManagerId(UUID uuid) {
-		this(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
+	/**
+	 * Creates a ResourceManagerId that takes the bits from the given UUID.
+	 */
+	private ResourceManagerId(UUID uuid) {
+		super(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
 	}
 
+	/**
+	 * Creates a UUID with the bits from this ResourceManagerId.
+	 */
 	public UUID toUUID() {
 		return new UUID(getUpperPart(), getLowerPart());
 	}
 
+	/**
+	 * Generates a new random ResourceManagerId.
+	 */
 	public static ResourceManagerId generate() {
 		return new ResourceManagerId();
 	}
+
+	/**
+	 * Creates a ResourceManagerId that corresponds to the given UUID.
+	 */
+	public static ResourceManagerId fromUuid(UUID uuid) {
+		return new ResourceManagerId(uuid);
+	}
+
+	/**
+	 * If the given uuid is null, this returns null, otherwise a ResourceManagerId that
+	 * corresponds to the UUID, as created by {@link #fromUuid(UUID)}.
+	 */
+	public static ResourceManagerId fromUuidOrNull(@Nullable UUID uuid) {
+		return  uuid == null ? null : new ResourceManagerId(uuid);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 500d7e4..09ffd99 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -38,6 +38,7 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
@@ -277,12 +278,12 @@ public class JobLeaderService {
 		}
 
 		@Override
-		public void notifyLeaderAddress(final String leaderAddress, final UUID leaderId) {
+		public void notifyLeaderAddress(final @Nullable String leaderAddress, final @Nullable UUID leaderId) {
 			if (stopped) {
 				LOG.debug("{}'s leader retrieval listener reported a new leader for job {}. " +
 					"However, the service is no longer running.", JobLeaderService.class.getSimpleName(), jobId);
 			} else {
-				final JobMasterId jobMasterId = leaderId != null ? new JobMasterId(leaderId) : null;
+				final JobMasterId jobMasterId = JobMasterId.fromUuidOrNull(leaderId);
 
 				LOG.debug("New leader information for job {}. Address: {}, leader id: {}.",
 					jobId, leaderAddress, jobMasterId);

http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index f665426..000d0c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -1387,7 +1387,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 			runAsync(
 				() -> notifyOfNewResourceManagerLeader(
 					leaderAddress,
-					leaderSessionID != null ? new ResourceManagerId(leaderSessionID) : null));
+					ResourceManagerId.fromUuidOrNull(leaderSessionID)));
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
index 75b0d05..a457599 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
@@ -44,7 +44,7 @@ public class SlotPoolSchedulingTestBase extends TestLogger {
 
 	private static final JobID jobId = new JobID();
 
-	private static final JobMasterId jobMasterId = new JobMasterId();
+	private static final JobMasterId jobMasterId = JobMasterId.generate();
 
 	private static final String jobMasterAddress = "foobar";