You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/04 11:21:07 UTC
[1/2] flink git commit: [hotfix] [runtime] Add resourceId to TaskManager registration messages
Repository: flink
Updated Branches:
refs/heads/release-1.5 9435cd4fe -> b13a2bcb7
[hotfix] [runtime] Add resourceId to TaskManager registration messages
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b13a2bcb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b13a2bcb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b13a2bcb
Branch: refs/heads/release-1.5
Commit: b13a2bcb7b72a7535ef68ad8a2e1faa1a36b7f74
Parents: 3fdec15
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 3 21:50:48 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri May 4 13:20:45 2018 +0200
----------------------------------------------------------------------
.../registration/TaskExecutorConnection.java | 10 +++-
.../registration/WorkerRegistration.java | 4 +-
.../slotmanager/SlotManager.java | 2 +-
.../slotmanager/SlotManagerTest.java | 61 +++++++++++---------
.../slotmanager/SlotProtocolTest.java | 9 +--
5 files changed, 52 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b13a2bcb/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
index babe5b9..edd6c24 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.resourcemanager.registration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -29,15 +30,22 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class TaskExecutorConnection {
+ private final ResourceID resourceID;
+
private final InstanceID instanceID;
private final TaskExecutorGateway taskExecutorGateway;
- public TaskExecutorConnection(TaskExecutorGateway taskExecutorGateway) {
+ public TaskExecutorConnection(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway) {
+ this.resourceID = checkNotNull(resourceID);
this.instanceID = new InstanceID();
this.taskExecutorGateway = checkNotNull(taskExecutorGateway);
}
+ public ResourceID getResourceID() {
+ return resourceID;
+ }
+
public InstanceID getInstanceID() {
return instanceID;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b13a2bcb/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
index eaa3c03..67925e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
@@ -39,7 +39,9 @@ public class WorkerRegistration<WorkerType extends ResourceIDRetrievable> extend
WorkerType worker,
int dataPort,
HardwareDescription hardwareDescription) {
- super(taskExecutorGateway);
+
+ super(worker.getResourceID(), taskExecutorGateway);
+
this.worker = Preconditions.checkNotNull(worker);
this.dataPort = dataPort;
this.hardwareDescription = Preconditions.checkNotNull(hardwareDescription);
http://git-wip-us.apache.org/repos/asf/flink/blob/b13a2bcb/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 6cdd997..fe503b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -319,7 +319,7 @@ public class SlotManager implements AutoCloseable {
public void registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
checkInit();
- LOG.info("Register TaskManager {} at the SlotManager.", taskExecutorConnection.getInstanceID());
+ LOG.info("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID(), taskExecutorConnection.getInstanceID());
// we identify task managers by their instance id
if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/b13a2bcb/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index f504d77..59de473 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -86,9 +86,9 @@ public class SlotManagerTest extends TestLogger {
final ResourceActions resourceManagerActions = mock(ResourceActions.class);
final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
- final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final ResourceID resourceId = ResourceID.generate();
+ final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
- ResourceID resourceId = ResourceID.generate();
final SlotID slotId1 = new SlotID(resourceId, 0);
final SlotID slotId2 = new SlotID(resourceId, 1);
final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
@@ -124,9 +124,9 @@ public class SlotManagerTest extends TestLogger {
eq(resourceManagerId),
any(Time.class))).thenReturn(new CompletableFuture<>());
- final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final ResourceID resourceId = ResourceID.generate();
+ final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
- ResourceID resourceId = ResourceID.generate();
final SlotID slotId1 = new SlotID(resourceId, 0);
final SlotID slotId2 = new SlotID(resourceId, 1);
final AllocationID allocationId1 = new AllocationID();
@@ -251,7 +251,7 @@ public class SlotManagerTest extends TestLogger {
eq(resourceManagerId),
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
- final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
final SlotReport slotReport = new SlotReport(slotStatus);
@@ -278,7 +278,8 @@ public class SlotManagerTest extends TestLogger {
public void testUnregisterPendingSlotRequest() throws Exception {
final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
final ResourceActions resourceManagerActions = mock(ResourceActions.class);
- final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+ final ResourceID resourceID = ResourceID.generate();
+ final SlotID slotId = new SlotID(resourceID, 0);
final AllocationID allocationId = new AllocationID();
final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
@@ -296,7 +297,7 @@ public class SlotManagerTest extends TestLogger {
final SlotRequest slotRequest = new SlotRequest(new JobID(), allocationId, resourceProfile, "foobar");
- final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
slotManager.registerTaskManager(taskManagerConnection, slotReport);
@@ -348,7 +349,7 @@ public class SlotManagerTest extends TestLogger {
eq(resourceManagerId),
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
- final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
final SlotReport slotReport = new SlotReport(slotStatus);
@@ -388,7 +389,7 @@ public class SlotManagerTest extends TestLogger {
// accept an incoming slot request
final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
- final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile, jobId, allocationId);
final SlotReport slotReport = new SlotReport(slotStatus);
@@ -451,10 +452,11 @@ public class SlotManagerTest extends TestLogger {
final JobID jobId = new JobID();
final AllocationID allocationId = new AllocationID();
final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
- final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+ final ResourceID resourceID = ResourceID.generate();
+ final SlotID slotId = new SlotID(resourceID, 0);
final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
- final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile, jobId, allocationId);
final SlotReport slotReport = new SlotReport(slotStatus);
@@ -491,9 +493,11 @@ public class SlotManagerTest extends TestLogger {
eq(resourceManagerId),
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
- final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final ResourceID resourceID = ResourceID.generate();
+
+ final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
- final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+ final SlotID slotId = new SlotID(resourceID, 0);
final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile1);
final SlotReport slotReport = new SlotReport(slotStatus);
@@ -536,9 +540,10 @@ public class SlotManagerTest extends TestLogger {
eq(resourceManagerId),
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
- final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final ResourceID resourceID = ResourceID.generate();
+ final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
- final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+ final SlotID slotId = new SlotID(resourceID, 0);
final SlotStatus slotStatus = new SlotStatus(slotId, new ResourceProfile(2.0, 2));
final SlotReport slotReport = new SlotReport(slotStatus);
@@ -619,7 +624,7 @@ public class SlotManagerTest extends TestLogger {
final SlotReport slotReport2 = new SlotReport(Arrays.asList(newSlotStatus2, slotStatus1));
final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
- final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
// check that we don't have any slots registered
@@ -658,11 +663,12 @@ public class SlotManagerTest extends TestLogger {
final ResourceActions resourceManagerActions = mock(ResourceActions.class);
final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+ final ResourceID resourceID = ResourceID.generate();
final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
- final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
- final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+ final SlotID slotId = new SlotID(resourceID, 0);
final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
final SlotReport slotReport = new SlotReport(slotStatus);
@@ -765,9 +771,9 @@ public class SlotManagerTest extends TestLogger {
any(ResourceManagerId.class),
any(Time.class))).thenReturn(slotRequestFuture1, slotRequestFuture2);
- final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
-
final ResourceID resourceId = ResourceID.generate();
+ final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
+
final SlotID slotId1 = new SlotID(resourceId, 0);
final SlotID slotId2 = new SlotID(resourceId, 1);
final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
@@ -843,9 +849,9 @@ public class SlotManagerTest extends TestLogger {
any(ResourceManagerId.class),
any(Time.class))).thenReturn(slotRequestFuture1, CompletableFuture.completedFuture(Acknowledge.get()));
- final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
-
final ResourceID resourceId = ResourceID.generate();
+ final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
+
final SlotID slotId1 = new SlotID(resourceId, 0);
final SlotID slotId2 = new SlotID(resourceId, 1);
final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
@@ -963,7 +969,7 @@ public class SlotManagerTest extends TestLogger {
eq(resourceManagerId),
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
- final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
final SlotID slotId1 = new SlotID(resourceId, 0);
final SlotID slotId2 = new SlotID(resourceId, 1);
@@ -1042,12 +1048,13 @@ public class SlotManagerTest extends TestLogger {
public void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception {
final Time taskManagerTimeout = Time.milliseconds(10L);
final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+ final ResourceID resourceID = ResourceID.generate();
final ResourceActions resourceActions = mock(ResourceActions.class);
final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
- final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
final SlotStatus slotStatus = new SlotStatus(
- new SlotID(ResourceID.generate(), 0),
+ new SlotID(resourceID, 0),
new ResourceProfile(1.0, 1));
final SlotReport initialSlotReport = new SlotReport(slotStatus);
@@ -1082,10 +1089,10 @@ public class SlotManagerTest extends TestLogger {
*/
@Test
public void testReportAllocatedSlot() throws Exception {
+ final ResourceID taskManagerId = ResourceID.generate();
final ResourceActions resourceActions = mock(ResourceActions.class);
final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGateway();
- final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
- final ResourceID taskManagerId = ResourceID.generate();
+ final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskManagerId, taskExecutorGateway);
try (final SlotManager slotManager = new SlotManager(
TestingUtils.defaultScheduledExecutor(),
http://git-wip-us.apache.org/repos/asf/flink/blob/b13a2bcb/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index b3e5e91..eac530d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.api.common.JobID;
@@ -59,11 +60,11 @@ public class SlotProtocolTest extends TestLogger {
private static final long timeout = 10000L;
- private static final ScheduledExecutorService scheduledExecutorService =
+ private static final ScheduledExecutorService scheduledExecutorService =
new ScheduledThreadPoolExecutor(1);
- private static final ScheduledExecutor scheduledExecutor =
+ private static final ScheduledExecutor scheduledExecutor =
new ScheduledExecutorServiceAdapter(scheduledExecutorService);
@AfterClass
@@ -118,7 +119,7 @@ public class SlotProtocolTest extends TestLogger {
final SlotReport slotReport =
new SlotReport(Collections.singletonList(slotStatus));
// register slot at SlotManager
- slotManager.registerTaskManager(new TaskExecutorConnection(taskExecutorGateway), slotReport);
+ slotManager.registerTaskManager(new TaskExecutorConnection(resourceID, taskExecutorGateway), slotReport);
// 4) Slot becomes available and TaskExecutor gets a SlotRequest
verify(taskExecutorGateway, timeout(5000L))
@@ -166,7 +167,7 @@ public class SlotProtocolTest extends TestLogger {
new SlotReport(Collections.singletonList(slotStatus));
// register slot at SlotManager
slotManager.registerTaskManager(
- new TaskExecutorConnection(taskExecutorGateway), slotReport);
+ new TaskExecutorConnection(resourceID, taskExecutorGateway), slotReport);
final String targetAddress = "foobar";
[2/2] flink git commit: [hotfix] [runtime] Minor cleanups around JobMasterId, ResourceManagerId, DispatcherId.
Posted by se...@apache.org.
[hotfix] [runtime] Minor cleanups around JobMasterId, ResourceManagerId, DispatcherId.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3fdec156
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3fdec156
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3fdec156
Branch: refs/heads/release-1.5
Commit: 3fdec156467e40ac662b28a41d642f0531d8c55b
Parents: 9435cd4
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 3 21:13:36 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri May 4 13:20:45 2018 +0200
----------------------------------------------------------------------
.../flink/runtime/dispatcher/Dispatcher.java | 2 +-
.../flink/runtime/dispatcher/DispatcherId.java | 28 ++++++------
.../runtime/entrypoint/ClusterEntrypoint.java | 4 +-
.../flink/runtime/jobmaster/JobMaster.java | 2 +-
.../flink/runtime/jobmaster/JobMasterId.java | 41 +++++++++++------
.../flink/runtime/minicluster/MiniCluster.java | 4 +-
.../resourcemanager/JobLeaderIdService.java | 2 +-
.../resourcemanager/ResourceManager.java | 2 +-
.../resourcemanager/ResourceManagerId.java | 48 +++++++++++++-------
.../runtime/taskexecutor/JobLeaderService.java | 5 +-
.../runtime/taskexecutor/TaskExecutor.java | 2 +-
.../slotpool/SlotPoolSchedulingTestBase.java | 2 +-
12 files changed, 84 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index c05255b..58ffda3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -698,7 +698,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
*/
@Override
public void grantLeadership(final UUID newLeaderSessionID) {
- final DispatcherId dispatcherId = new DispatcherId(newLeaderSessionID);
+ final DispatcherId dispatcherId = DispatcherId.fromUuid(newLeaderSessionID);
log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), dispatcherId);
final CompletableFuture<Collection<JobGraph>> recoveredJobsFuture = recoverJobs();
http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java
index e563090..403f72f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java
@@ -29,29 +29,27 @@ public class DispatcherId extends AbstractID {
private static final long serialVersionUID = -1654056277003743966L;
- public DispatcherId(byte[] bytes) {
- super(bytes);
- }
-
- public DispatcherId(long lowerPart, long upperPart) {
- super(lowerPart, upperPart);
- }
-
- public DispatcherId(AbstractID id) {
- super(id);
- }
-
- public DispatcherId() {}
+ private DispatcherId() {}
- public DispatcherId(UUID uuid) {
- this(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
+ private DispatcherId(UUID uuid) {
+ super(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
}
public UUID toUUID() {
return new UUID(getUpperPart(), getLowerPart());
}
+ /**
+ * Generates a new random DispatcherId.
+ */
public static DispatcherId generate() {
return new DispatcherId();
}
+
+ /**
+ * Creates a new DispatcherId that corresponds to the UUID.
+ */
+ public static DispatcherId fromUuid(UUID uuid) {
+ return new DispatcherId(uuid);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index f7a5858..f823ea7 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -306,14 +306,14 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
rpcService,
DispatcherGateway.class,
- DispatcherId::new,
+ DispatcherId::fromUuid,
10,
Time.milliseconds(50L));
LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
rpcService,
ResourceManagerGateway.class,
- ResourceManagerId::new,
+ ResourceManagerId::fromUuid,
10,
Time.milliseconds(50L));
http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index f5606fb..f30c119 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1452,7 +1452,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
runAsync(
() -> notifyOfNewResourceManagerLeader(
leaderAddress,
- leaderSessionID != null ? new ResourceManagerId(leaderSessionID) : null));
+ ResourceManagerId.fromUuidOrNull(leaderSessionID)));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
index 39f7ded..0ebf913 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.util.AbstractID;
+import javax.annotation.Nullable;
+
import java.util.UUID;
/**
@@ -29,30 +31,39 @@ public class JobMasterId extends AbstractID {
private static final long serialVersionUID = -933276753644003754L;
- public JobMasterId(byte[] bytes) {
- super(bytes);
- }
-
- public JobMasterId(long lowerPart, long upperPart) {
- super(lowerPart, upperPart);
- }
-
- public JobMasterId(AbstractID id) {
- super(id);
- }
-
- public JobMasterId() {
+ /**
+ * Creates a JobMasterId that takes the bits from the given UUID.
+ */
+ public JobMasterId(UUID uuid) {
+ super(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
}
- public JobMasterId(UUID uuid) {
- this(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
+ /**
+ * Generates a new random JobMasterId.
+ */
+ private JobMasterId() {
+ super();
}
+ /**
+ * Creates a UUID with the bits from this JobMasterId.
+ */
public UUID toUUID() {
return new UUID(getUpperPart(), getLowerPart());
}
+ /**
+ * Generates a new random JobMasterId.
+ */
public static JobMasterId generate() {
return new JobMasterId();
}
+
+ /**
+ * If the given uuid is null, this returns null, otherwise a JobMasterId that
+ * corresponds to the UUID, via {@link #JobMasterId(UUID)}.
+ */
+ public static JobMasterId fromUuidOrNull(@Nullable UUID uuid) {
+ return uuid == null ? null : new JobMasterId(uuid);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 64d46c6..0c1644f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -324,13 +324,13 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
jobManagerRpcService,
DispatcherGateway.class,
- DispatcherId::new,
+ DispatcherId::fromUuid,
20,
Time.milliseconds(20L));
final RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
jobManagerRpcService,
ResourceManagerGateway.class,
- ResourceManagerId::new,
+ ResourceManagerId::fromUuid,
20,
Time.milliseconds(20L));
http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
index da0a7fd..994db34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -188,7 +188,7 @@ public class JobLeaderIdService {
JobLeaderIdListener listener = jobLeaderIdListeners.get(jobId);
- return listener.getLeaderIdFuture().thenApply((UUID id) -> id != null ? new JobMasterId(id) : null);
+ return listener.getLeaderIdFuture().thenApply(JobMasterId::fromUuidOrNull);
}
public boolean isValidTimeout(JobID jobId, UUID timeoutId) {
http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index c753469..af736b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -889,7 +889,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
public void grantLeadership(final UUID newLeaderSessionID) {
runAsyncWithoutFencing(
() -> {
- final ResourceManagerId newResourceManagerId = new ResourceManagerId(newLeaderSessionID);
+ final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID);
log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId);
http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java
index 3594e88..3405bb2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.util.AbstractID;
+import javax.annotation.Nullable;
+
import java.util.UUID;
/**
@@ -29,30 +31,44 @@ public class ResourceManagerId extends AbstractID {
private static final long serialVersionUID = -6042820142662137374L;
- public ResourceManagerId(byte[] bytes) {
- super(bytes);
- }
-
- public ResourceManagerId(long lowerPart, long upperPart) {
- super(lowerPart, upperPart);
- }
-
- public ResourceManagerId(AbstractID id) {
- super(id);
- }
-
- public ResourceManagerId() {
- }
+ /**
+ * Generates a new random ResourceManagerId.
+ */
+ private ResourceManagerId() {}
- public ResourceManagerId(UUID uuid) {
- this(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
+ /**
+ * Creates a ResourceManagerId that takes the bits from the given UUID.
+ */
+ private ResourceManagerId(UUID uuid) {
+ super(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
}
+ /**
+ * Creates a UUID with the bits from this ResourceManagerId.
+ */
public UUID toUUID() {
return new UUID(getUpperPart(), getLowerPart());
}
+ /**
+ * Generates a new random ResourceManagerId.
+ */
public static ResourceManagerId generate() {
return new ResourceManagerId();
}
+
+ /**
+ * Creates a ResourceManagerId that corresponds to the given UUID.
+ */
+ public static ResourceManagerId fromUuid(UUID uuid) {
+ return new ResourceManagerId(uuid);
+ }
+
+ /**
+ * If the given uuid is null, this returns null, otherwise a ResourceManagerId that
+ * corresponds to the UUID, via {@link #ResourceManagerId(UUID)}.
+ */
+ public static ResourceManagerId fromUuidOrNull(@Nullable UUID uuid) {
+ return uuid == null ? null : new ResourceManagerId(uuid);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 500d7e4..09ffd99 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -38,6 +38,7 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
@@ -277,12 +278,12 @@ public class JobLeaderService {
}
@Override
- public void notifyLeaderAddress(final String leaderAddress, final UUID leaderId) {
+ public void notifyLeaderAddress(final @Nullable String leaderAddress, final @Nullable UUID leaderId) {
if (stopped) {
LOG.debug("{}'s leader retrieval listener reported a new leader for job {}. " +
"However, the service is no longer running.", JobLeaderService.class.getSimpleName(), jobId);
} else {
- final JobMasterId jobMasterId = leaderId != null ? new JobMasterId(leaderId) : null;
+ final JobMasterId jobMasterId = JobMasterId.fromUuidOrNull(leaderId);
LOG.debug("New leader information for job {}. Address: {}, leader id: {}.",
jobId, leaderAddress, jobMasterId);
http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index f665426..000d0c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -1387,7 +1387,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
runAsync(
() -> notifyOfNewResourceManagerLeader(
leaderAddress,
- leaderSessionID != null ? new ResourceManagerId(leaderSessionID) : null));
+ ResourceManagerId.fromUuidOrNull(leaderSessionID)));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
index 75b0d05..a457599 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
@@ -44,7 +44,7 @@ public class SlotPoolSchedulingTestBase extends TestLogger {
private static final JobID jobId = new JobID();
- private static final JobMasterId jobMasterId = new JobMasterId();
+ private static final JobMasterId jobMasterId = JobMasterId.generate();
private static final String jobMasterAddress = "foobar";