You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/04 11:21:08 UTC

[2/2] flink git commit: [hotfix] [runtime] Minor cleanups around JobMasterId, ResourceManagerId, DispatcherId.

[hotfix] [runtime] Minor cleanups around JobMasterId, ResourceManagerId, DispatcherId.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3fdec156
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3fdec156
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3fdec156

Branch: refs/heads/release-1.5
Commit: 3fdec156467e40ac662b28a41d642f0531d8c55b
Parents: 9435cd4
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 3 21:13:36 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri May 4 13:20:45 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    |  2 +-
 .../flink/runtime/dispatcher/DispatcherId.java  | 28 ++++++------
 .../runtime/entrypoint/ClusterEntrypoint.java   |  4 +-
 .../flink/runtime/jobmaster/JobMaster.java      |  2 +-
 .../flink/runtime/jobmaster/JobMasterId.java    | 41 +++++++++++------
 .../flink/runtime/minicluster/MiniCluster.java  |  4 +-
 .../resourcemanager/JobLeaderIdService.java     |  2 +-
 .../resourcemanager/ResourceManager.java        |  2 +-
 .../resourcemanager/ResourceManagerId.java      | 48 +++++++++++++-------
 .../runtime/taskexecutor/JobLeaderService.java  |  5 +-
 .../runtime/taskexecutor/TaskExecutor.java      |  2 +-
 .../slotpool/SlotPoolSchedulingTestBase.java    |  2 +-
 12 files changed, 84 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index c05255b..58ffda3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -698,7 +698,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	 */
 	@Override
 	public void grantLeadership(final UUID newLeaderSessionID) {
-		final DispatcherId dispatcherId = new DispatcherId(newLeaderSessionID);
+		final DispatcherId dispatcherId = DispatcherId.fromUuid(newLeaderSessionID);
 		log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), dispatcherId);
 
 		final CompletableFuture<Collection<JobGraph>> recoveredJobsFuture = recoverJobs();

http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java
index e563090..403f72f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherId.java
@@ -29,29 +29,27 @@ public class DispatcherId extends AbstractID {
 
 	private static final long serialVersionUID = -1654056277003743966L;
 
-	public DispatcherId(byte[] bytes) {
-		super(bytes);
-	}
-
-	public DispatcherId(long lowerPart, long upperPart) {
-		super(lowerPart, upperPart);
-	}
-
-	public DispatcherId(AbstractID id) {
-		super(id);
-	}
-
-	public DispatcherId() {}
+	private DispatcherId() {}
 
-	public DispatcherId(UUID uuid) {
-		this(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
+	private DispatcherId(UUID uuid) {
+		super(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
 	}
 
 	public UUID toUUID() {
 		return new UUID(getUpperPart(), getLowerPart());
 	}
 
+	/**
+	 * Generates a new random DispatcherId.
+	 */
 	public static DispatcherId generate() {
 		return new DispatcherId();
 	}
+
+	/**
+	 * Creates a new DispatcherId that corresponds to the UUID.
+	 */
+	public static DispatcherId fromUuid(UUID uuid) {
+		return new DispatcherId(uuid);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index f7a5858..f823ea7 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -306,14 +306,14 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
 				rpcService,
 				DispatcherGateway.class,
-				DispatcherId::new,
+				DispatcherId::fromUuid,
 				10,
 				Time.milliseconds(50L));
 
 			LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
 				rpcService,
 				ResourceManagerGateway.class,
-				ResourceManagerId::new,
+				ResourceManagerId::fromUuid,
 				10,
 				Time.milliseconds(50L));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index f5606fb..f30c119 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1452,7 +1452,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			runAsync(
 				() -> notifyOfNewResourceManagerLeader(
 					leaderAddress,
-					leaderSessionID != null ? new ResourceManagerId(leaderSessionID) : null));
+					ResourceManagerId.fromUuidOrNull(leaderSessionID)));
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
index 39f7ded..0ebf913 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.util.AbstractID;
 
+import javax.annotation.Nullable;
+
 import java.util.UUID;
 
 /**
@@ -29,30 +31,39 @@ public class JobMasterId extends AbstractID {
 
 	private static final long serialVersionUID = -933276753644003754L;
 
-	public JobMasterId(byte[] bytes) {
-		super(bytes);
-	}
-
-	public JobMasterId(long lowerPart, long upperPart) {
-		super(lowerPart, upperPart);
-	}
-
-	public JobMasterId(AbstractID id) {
-		super(id);
-	}
-
-	public JobMasterId() {
+	/**
+	 * Creates a JobMasterId that takes the bits from the given UUID.
+	 */
+	public JobMasterId(UUID uuid) {
+		super(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
 	}
 
-	public JobMasterId(UUID uuid) {
-		this(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
+	/**
+	 * Generates a new random JobMasterId.
+	 */
+	private JobMasterId() {
+		super();
 	}
 
+	/**
+	 * Creates a UUID with the bits from this JobMasterId.
+	 */
 	public UUID toUUID() {
 		return new UUID(getUpperPart(), getLowerPart());
 	}
 
+	/**
+	 * Generates a new random JobMasterId.
+	 */
 	public static JobMasterId generate() {
 		return new JobMasterId();
 	}
+
+	/**
+	 * Returns null if the given UUID is null; otherwise returns a JobMasterId that
+	 * corresponds to the UUID, created via {@link #JobMasterId(UUID)}.
+	 */
+	public static JobMasterId fromUuidOrNull(@Nullable UUID uuid) {
+		return  uuid == null ? null : new JobMasterId(uuid);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 64d46c6..0c1644f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -324,13 +324,13 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
 				dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
 					jobManagerRpcService,
 					DispatcherGateway.class,
-					DispatcherId::new,
+					DispatcherId::fromUuid,
 					20,
 					Time.milliseconds(20L));
 				final RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
 					jobManagerRpcService,
 					ResourceManagerGateway.class,
-					ResourceManagerId::new,
+					ResourceManagerId::fromUuid,
 					20,
 					Time.milliseconds(20L));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
index da0a7fd..994db34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -188,7 +188,7 @@ public class JobLeaderIdService {
 
 		JobLeaderIdListener listener = jobLeaderIdListeners.get(jobId);
 
-		return listener.getLeaderIdFuture().thenApply((UUID id) -> id != null ? new JobMasterId(id) : null);
+		return listener.getLeaderIdFuture().thenApply(JobMasterId::fromUuidOrNull);
 	}
 
 	public boolean isValidTimeout(JobID jobId, UUID timeoutId) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index c753469..af736b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -889,7 +889,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	public void grantLeadership(final UUID newLeaderSessionID) {
 		runAsyncWithoutFencing(
 			() -> {
-				final ResourceManagerId newResourceManagerId = new ResourceManagerId(newLeaderSessionID);
+				final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID);
 
 				log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java
index 3594e88..3405bb2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.util.AbstractID;
 
+import javax.annotation.Nullable;
+
 import java.util.UUID;
 
 /**
@@ -29,30 +31,44 @@ public class ResourceManagerId extends AbstractID {
 
 	private static final long serialVersionUID = -6042820142662137374L;
 
-	public ResourceManagerId(byte[] bytes) {
-		super(bytes);
-	}
-
-	public ResourceManagerId(long lowerPart, long upperPart) {
-		super(lowerPart, upperPart);
-	}
-
-	public ResourceManagerId(AbstractID id) {
-		super(id);
-	}
-
-	public ResourceManagerId() {
-	}
+	/**
+	 * Generates a new random ResourceManagerId.
+	 */
+	private ResourceManagerId() {}
 
-	public ResourceManagerId(UUID uuid) {
-		this(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
+	/**
+	 * Creates a ResourceManagerId that takes the bits from the given UUID.
+	 */
+	private ResourceManagerId(UUID uuid) {
+		super(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
 	}
 
+	/**
+	 * Creates a UUID with the bits from this ResourceManagerId.
+	 */
 	public UUID toUUID() {
 		return new UUID(getUpperPart(), getLowerPart());
 	}
 
+	/**
+	 * Generates a new random ResourceManagerId.
+	 */
 	public static ResourceManagerId generate() {
 		return new ResourceManagerId();
 	}
+
+	/**
+	 * Creates a ResourceManagerId that corresponds to the given UUID.
+	 */
+	public static ResourceManagerId fromUuid(UUID uuid) {
+		return new ResourceManagerId(uuid);
+	}
+
+	/**
+	 * Returns null if the given UUID is null; otherwise returns a ResourceManagerId that
+	 * corresponds to the UUID, created via {@link #ResourceManagerId(UUID)}.
+	 */
+	public static ResourceManagerId fromUuidOrNull(@Nullable UUID uuid) {
+		return  uuid == null ? null : new ResourceManagerId(uuid);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 500d7e4..09ffd99 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -38,6 +38,7 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
@@ -277,12 +278,12 @@ public class JobLeaderService {
 		}
 
 		@Override
-		public void notifyLeaderAddress(final String leaderAddress, final UUID leaderId) {
+		public void notifyLeaderAddress(final @Nullable String leaderAddress, final @Nullable UUID leaderId) {
 			if (stopped) {
 				LOG.debug("{}'s leader retrieval listener reported a new leader for job {}. " +
 					"However, the service is no longer running.", JobLeaderService.class.getSimpleName(), jobId);
 			} else {
-				final JobMasterId jobMasterId = leaderId != null ? new JobMasterId(leaderId) : null;
+				final JobMasterId jobMasterId = JobMasterId.fromUuidOrNull(leaderId);
 
 				LOG.debug("New leader information for job {}. Address: {}, leader id: {}.",
 					jobId, leaderAddress, jobMasterId);

http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index f665426..000d0c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -1387,7 +1387,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 			runAsync(
 				() -> notifyOfNewResourceManagerLeader(
 					leaderAddress,
-					leaderSessionID != null ? new ResourceManagerId(leaderSessionID) : null));
+					ResourceManagerId.fromUuidOrNull(leaderSessionID)));
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3fdec156/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
index 75b0d05..a457599 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
@@ -44,7 +44,7 @@ public class SlotPoolSchedulingTestBase extends TestLogger {
 
 	private static final JobID jobId = new JobID();
 
-	private static final JobMasterId jobMasterId = new JobMasterId();
+	private static final JobMasterId jobMasterId = JobMasterId.generate();
 
 	private static final String jobMasterAddress = "foobar";