You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/04 11:21:08 UTC
[2/2] flink git commit: [hotfix] [runtime] Minor cleanups around
JobMasterId, ResourceManagerId, DispatcherId.
[hotfix] [runtime] Minor cleanups around JobMasterId, ResourceManagerId, DispatcherId.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3fdec156
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3fdec156
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3fdec156
Branch: refs/heads/release-1.5
Commit: 3fdec156467e40ac662b28a41d642f0531d8c55b
Parents: 9435cd4
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 3 21:13:36 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri May 4 13:20:45 2018 +0200
----------------------------------------------------------------------
.../flink/runtime/dispatcher/Dispatcher.java | 2 +-
.../flink/runtime/dispatcher/DispatcherId.java | 28 ++++++------
.../runtime/entrypoint/ClusterEntrypoint.java | 4 +-
.../flink/runtime/jobmaster/JobMaster.java | 2 +-
.../flink/runtime/jobmaster/JobMasterId.java | 41 +++++++++++------
.../flink/runtime/minicluster/MiniCluster.java | 4 +-
.../resourcemanager/JobLeaderIdService.java | 2 +-
.../resourcemanager/ResourceManager.java | 2 +-
.../resourcemanager/ResourceManagerId.java | 48 +++++++++++++-------
.../runtime/taskexecutor/JobLeaderService.java | 5 +-
.../runtime/taskexecutor/TaskExecutor.java | 2 +-
.../slotpool/SlotPoolSchedulingTestBase.java | 2 +-
12 files changed, 84 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index c05255b..58ffda3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -698,7 +698,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
*/
@Override
public void grantLeadership(final UUID newLeaderSessionID) {
- final DispatcherId dispatcherId = new DispatcherId(newLeaderSessionID);
+ final DispatcherId dispatcherId = DispatcherId.fromUuid(newLeaderSessionID);
log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), dispatcherId);
final CompletableFuture<Collection<JobGraph>> recoveredJobsFuture = recoverJobs();
http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java
index e563090..403f72f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java
@@ -29,29 +29,27 @@ public class DispatcherId extends AbstractID {
private static final long serialVersionUID = -1654056277003743966L;
- public DispatcherId(byte[] bytes) {
- super(bytes);
- }
-
- public DispatcherId(long lowerPart, long upperPart) {
- super(lowerPart, upperPart);
- }
-
- public DispatcherId(AbstractID id) {
- super(id);
- }
-
- public DispatcherId() {}
+ private DispatcherId() {}
- public DispatcherId(UUID uuid) {
- this(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
+ private DispatcherId(UUID uuid) {
+ super(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
}
public UUID toUUID() {
return new UUID(getUpperPart(), getLowerPart());
}
+ /**
+ * Generates a new random DispatcherId.
+ */
public static DispatcherId generate() {
return new DispatcherId();
}
+
+ /**
+ * Creates a new DispatcherId that corresponds to the UUID.
+ */
+ public static DispatcherId fromUuid(UUID uuid) {
+ return new DispatcherId(uuid);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index f7a5858..f823ea7 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -306,14 +306,14 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
rpcService,
DispatcherGateway.class,
- DispatcherId::new,
+ DispatcherId::fromUuid,
10,
Time.milliseconds(50L));
LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
rpcService,
ResourceManagerGateway.class,
- ResourceManagerId::new,
+ ResourceManagerId::fromUuid,
10,
Time.milliseconds(50L));
http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index f5606fb..f30c119 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1452,7 +1452,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
runAsync(
() -> notifyOfNewResourceManagerLeader(
leaderAddress,
- leaderSessionID != null ? new ResourceManagerId(leaderSessionID) : null));
+ ResourceManagerId.fromUuidOrNull(leaderSessionID)));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
index 39f7ded..0ebf913 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.jobmaster;
import org.apache.flink.util.AbstractID;
+import javax.annotation.Nullable;
+
import java.util.UUID;
/**
@@ -29,30 +31,39 @@ public class JobMasterId extends AbstractID {
private static final long serialVersionUID = -933276753644003754L;
- public JobMasterId(byte[] bytes) {
- super(bytes);
- }
-
- public JobMasterId(long lowerPart, long upperPart) {
- super(lowerPart, upperPart);
- }
-
- public JobMasterId(AbstractID id) {
- super(id);
- }
-
- public JobMasterId() {
+ /**
+ * Creates a JobMasterId that takes the bits from the given UUID.
+ */
+ public JobMasterId(UUID uuid) {
+ super(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
}
- public JobMasterId(UUID uuid) {
- this(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
+ /**
+ * Generates a new random JobMasterId.
+ */
+ private JobMasterId() {
+ super();
}
+ /**
+ * Creates a UUID with the bits from this JobMasterId.
+ */
public UUID toUUID() {
return new UUID(getUpperPart(), getLowerPart());
}
+ /**
+ * Generates a new random JobMasterId.
+ */
public static JobMasterId generate() {
return new JobMasterId();
}
+
+ /**
+ * If the given uuid is null, this returns null, otherwise a JobMasterId that
+ * corresponds to the UUID, via {@link #JobMasterId(UUID)}.
+ */
+ public static JobMasterId fromUuidOrNull(@Nullable UUID uuid) {
+ return uuid == null ? null : new JobMasterId(uuid);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 64d46c6..0c1644f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -324,13 +324,13 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
jobManagerRpcService,
DispatcherGateway.class,
- DispatcherId::new,
+ DispatcherId::fromUuid,
20,
Time.milliseconds(20L));
final RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
jobManagerRpcService,
ResourceManagerGateway.class,
- ResourceManagerId::new,
+ ResourceManagerId::fromUuid,
20,
Time.milliseconds(20L));
http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
index da0a7fd..994db34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -188,7 +188,7 @@ public class JobLeaderIdService {
JobLeaderIdListener listener = jobLeaderIdListeners.get(jobId);
- return listener.getLeaderIdFuture().thenApply((UUID id) -> id != null ? new JobMasterId(id) : null);
+ return listener.getLeaderIdFuture().thenApply(JobMasterId::fromUuidOrNull);
}
public boolean isValidTimeout(JobID jobId, UUID timeoutId) {
http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index c753469..af736b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -889,7 +889,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
public void grantLeadership(final UUID newLeaderSessionID) {
runAsyncWithoutFencing(
() -> {
- final ResourceManagerId newResourceManagerId = new ResourceManagerId(newLeaderSessionID);
+ final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID);
log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId);
http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java
index 3594e88..3405bb2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.util.AbstractID;
+import javax.annotation.Nullable;
+
import java.util.UUID;
/**
@@ -29,30 +31,44 @@ public class ResourceManagerId extends AbstractID {
private static final long serialVersionUID = -6042820142662137374L;
- public ResourceManagerId(byte[] bytes) {
- super(bytes);
- }
-
- public ResourceManagerId(long lowerPart, long upperPart) {
- super(lowerPart, upperPart);
- }
-
- public ResourceManagerId(AbstractID id) {
- super(id);
- }
-
- public ResourceManagerId() {
- }
+ /**
+ * Generates a new random ResourceManagerId.
+ */
+ private ResourceManagerId() {}
- public ResourceManagerId(UUID uuid) {
- this(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
+ /**
+ * Creates a ResourceManagerId that takes the bits from the given UUID.
+ */
+ private ResourceManagerId(UUID uuid) {
+ super(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
}
+ /**
+ * Creates a UUID with the bits from this ResourceManagerId.
+ */
public UUID toUUID() {
return new UUID(getUpperPart(), getLowerPart());
}
+ /**
+ * Generates a new random ResourceManagerId.
+ */
public static ResourceManagerId generate() {
return new ResourceManagerId();
}
+
+ /**
+ * Creates a ResourceManagerId that corresponds to the given UUID.
+ */
+ public static ResourceManagerId fromUuid(UUID uuid) {
+ return new ResourceManagerId(uuid);
+ }
+
+ /**
+ * If the given uuid is null, this returns null, otherwise a ResourceManagerId that
+ * corresponds to the UUID, via {@link #ResourceManagerId(UUID)}.
+ */
+ public static ResourceManagerId fromUuidOrNull(@Nullable UUID uuid) {
+ return uuid == null ? null : new ResourceManagerId(uuid);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 500d7e4..09ffd99 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -38,6 +38,7 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
@@ -277,12 +278,12 @@ public class JobLeaderService {
}
@Override
- public void notifyLeaderAddress(final String leaderAddress, final UUID leaderId) {
+ public void notifyLeaderAddress(final @Nullable String leaderAddress, final @Nullable UUID leaderId) {
if (stopped) {
LOG.debug("{}'s leader retrieval listener reported a new leader for job {}. " +
"However, the service is no longer running.", JobLeaderService.class.getSimpleName(), jobId);
} else {
- final JobMasterId jobMasterId = leaderId != null ? new JobMasterId(leaderId) : null;
+ final JobMasterId jobMasterId = JobMasterId.fromUuidOrNull(leaderId);
LOG.debug("New leader information for job {}. Address: {}, leader id: {}.",
jobId, leaderAddress, jobMasterId);
http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index f665426..000d0c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -1387,7 +1387,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
runAsync(
() -> notifyOfNewResourceManagerLeader(
leaderAddress,
- leaderSessionID != null ? new ResourceManagerId(leaderSessionID) : null));
+ ResourceManagerId.fromUuidOrNull(leaderSessionID)));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
index 75b0d05..a457599 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
@@ -44,7 +44,7 @@ public class SlotPoolSchedulingTestBase extends TestLogger {
private static final JobID jobId = new JobID();
- private static final JobMasterId jobMasterId = new JobMasterId();
+ private static final JobMasterId jobMasterId = JobMasterId.generate();
private static final String jobMasterAddress = "foobar";