You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:15 UTC

[08/50] [abbrv] flink git commit: [FLINK-5810] [flip-6] Multiple small cleanups across Resource Manager related code

[FLINK-5810] [flip-6] Multiple small cleanups across Resource Manager related code


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/759f46ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/759f46ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/759f46ea

Branch: refs/heads/table-retraction
Commit: 759f46ea6716e2f9da98002ed881be2fe6d7ab39
Parents: 59aefb5
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Mar 16 18:53:21 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Apr 28 15:28:25 2017 +0200

----------------------------------------------------------------------
 .../types/ResourceIDRetrievable.java            |  4 +++-
 .../runtime/clusterframework/types/SlotID.java  | 20 ++++++--------------
 .../runtime/concurrent/ScheduledExecutor.java   |  2 +-
 .../flink/runtime/messages/Acknowledge.java     |  3 ++-
 .../registration/TaskExecutorConnection.java    |  7 ++++---
 .../slotmanager/PendingSlotRequest.java         | 10 +++++++++-
 .../taskexecutor/TaskExecutorGateway.java       |  2 --
 .../slotmanager/SlotProtocolTest.java           | 16 ++++++++--------
 8 files changed, 33 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceIDRetrievable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceIDRetrievable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceIDRetrievable.java
index b45d53c..e65840d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceIDRetrievable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceIDRetrievable.java
@@ -22,6 +22,8 @@ package org.apache.flink.runtime.clusterframework.types;
  */
 public interface ResourceIDRetrievable {
 
+	/**
+	 * Gets the ResourceID of the object.
+	 */
 	ResourceID getResourceID();
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
index d6409b6..83c28b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
@@ -24,7 +24,11 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Unique identifier for a slot which located in TaskManager.
+ * Unique identifier for a slot on a TaskManager. This ID is constant across the
+ * life time of the TaskManager.
+ * 
+ * <p>In contrast, the {@link AllocationID} represents a slot allocation and changes
+ * every time the slot is allocated by a JobManager.
  */
 public class SlotID implements ResourceIDRetrievable, Serializable {
 
@@ -66,10 +70,7 @@ public class SlotID implements ResourceIDRetrievable, Serializable {
 
 		SlotID slotID = (SlotID) o;
 
-		if (slotNumber != slotID.slotNumber) {
-			return false;
-		}
-		return resourceId.equals(slotID.resourceId);
+		return slotNumber == slotID.slotNumber && resourceId.equals(slotID.resourceId);
 	}
 
 	@Override
@@ -83,13 +84,4 @@ public class SlotID implements ResourceIDRetrievable, Serializable {
 	public String toString() {
 		return resourceId + "_" + slotNumber;
 	}
-
-	/**
-	 * Generate a random slot id.
-	 *
-	 * @return A random slot id.
-	 */
-	public static SlotID generate() {
-		return new SlotID(ResourceID.generate(), 0);
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
index c1b47e2..d09cfc3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
@@ -82,7 +82,7 @@ public interface ScheduledExecutor extends Executor {
 	 * @param delay the time between the end of the current and the start of the next execution
 	 * @param unit the time unit of the initial delay and the delay parameter
 	 * @return a ScheduledFuture representing the repeatedly executed task. This future never
-	 * completes unless th exectuion of the given task fails or if the future is cancelled
+	 * completes unless the execution of the given task fails or if the future is cancelled
 	 */
 	ScheduledFuture<?> scheduleWithFixedDelay(
 		Runnable command,

http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java
index 4bbc50a..2c08ec9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java
@@ -62,8 +62,9 @@ public class Acknowledge implements Serializable {
 
 	/**
 	 * Read resolve to preserve the singleton object property.
+	 * (per best practices, this should have visibility 'protected')
 	 */
-	private Object readResolve() throws java.io.ObjectStreamException {
+	protected Object readResolve() throws java.io.ObjectStreamException {
 		return INSTANCE;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
index e4522f2..babe5b9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.resourcemanager.registration;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * This class is responsible for grouping the TaskExecutorGateway and the InstanceID
  * of a registered task executor.
@@ -29,11 +31,11 @@ public class TaskExecutorConnection {
 
 	private final InstanceID instanceID;
 
-	private TaskExecutorGateway taskExecutorGateway;
+	private final TaskExecutorGateway taskExecutorGateway;
 
 	public TaskExecutorConnection(TaskExecutorGateway taskExecutorGateway) {
 		this.instanceID = new InstanceID();
-		this.taskExecutorGateway = taskExecutorGateway;
+		this.taskExecutorGateway = checkNotNull(taskExecutorGateway);
 	}
 
 	public InstanceID getInstanceID() {
@@ -43,5 +45,4 @@ public class TaskExecutorConnection {
 	public TaskExecutorGateway getTaskExecutorGateway() {
 		return taskExecutorGateway;
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
index 894f146..1195791 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
 import java.util.UUID;
 import java.util.concurrent.ScheduledFuture;
 
@@ -33,16 +34,21 @@ public class PendingSlotRequest {
 
 	private final SlotRequest slotRequest;
 
+	@Nullable
 	private CompletableFuture<Acknowledge> requestFuture;
 
+	@Nullable
 	private UUID timeoutIdentifier;
 
+	@Nullable
 	private ScheduledFuture<?> timeoutFuture;
 
 	public PendingSlotRequest(SlotRequest slotRequest) {
 		this.slotRequest = Preconditions.checkNotNull(slotRequest);
 	}
 
+	// ------------------------------------------------------------------------
+
 	public AllocationID getAllocationId() {
 		return slotRequest.getAllocationId();
 	}
@@ -51,6 +57,7 @@ public class PendingSlotRequest {
 		return slotRequest.getResourceProfile();
 	}
 
+	@Nullable
 	public UUID getTimeoutIdentifier() {
 		return timeoutIdentifier;
 	}
@@ -67,10 +74,11 @@ public class PendingSlotRequest {
 		return null != requestFuture;
 	}
 
-	public void setRequestFuture(CompletableFuture<Acknowledge> requestFuture) {
+	public void setRequestFuture(@Nullable CompletableFuture<Acknowledge> requestFuture) {
 		this.requestFuture = requestFuture;
 	}
 
+	@Nullable
 	public CompletableFuture<Acknowledge> getRequestFuture() {
 		return requestFuture;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index bedf8ec..d4afdbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.executiongraph.PartitionInfo;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
-import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.taskmanager.Task;
 
 import java.util.UUID;
@@ -47,7 +46,6 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * @param slotId slot id for the request
 	 * @param allocationId id for the request
 	 * @param resourceManagerLeaderId current leader id of the ResourceManager
-	 * @throws SlotAllocationException if the slot allocation fails
 	 * @return answer to the slot request
 	 */
 	Future<Acknowledge> requestSlot(

http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index a72969e..c09316c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
@@ -33,8 +34,8 @@ import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -54,13 +55,12 @@ public class SlotProtocolTest extends TestLogger {
 
 	private static final long timeout = 10000L;
 
+	private static final ScheduledExecutorService scheduledExecutorService = 
+			new ScheduledThreadPoolExecutor(1);
 
-	private static ScheduledExecutorService scheduledExecutorService;
 
-	@BeforeClass
-	public static void beforeClass() {
-		scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
-	}
+	private static final ScheduledExecutor scheduledExecutor = 
+			new ScheduledExecutorServiceAdapter(scheduledExecutorService);
 
 	@AfterClass
 	public static void afterClass() {
@@ -81,7 +81,7 @@ public class SlotProtocolTest extends TestLogger {
 		final UUID rmLeaderID = UUID.randomUUID();
 
 		try (SlotManager slotManager = new SlotManager(
-			new ScheduledExecutorServiceAdapter(scheduledExecutorService),
+			scheduledExecutor,
 			TestingUtils.infiniteTime(),
 			TestingUtils.infiniteTime(),
 			TestingUtils.infiniteTime())) {
@@ -144,7 +144,7 @@ public class SlotProtocolTest extends TestLogger {
 			.thenReturn(mock(FlinkFuture.class));
 
 		try (SlotManager slotManager = new SlotManager(
-			new ScheduledExecutorServiceAdapter(scheduledExecutorService),
+			scheduledExecutor,
 			TestingUtils.infiniteTime(),
 			TestingUtils.infiniteTime(),
 			TestingUtils.infiniteTime())) {