You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:15 UTC
[08/50] [abbrv] flink git commit: [FLINK-5810] [flip-6] Multiple small cleanups across Resource Manager related code
[FLINK-5810] [flip-6] Multiple small cleanups across Resource Manager related code
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/759f46ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/759f46ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/759f46ea
Branch: refs/heads/table-retraction
Commit: 759f46ea6716e2f9da98002ed881be2fe6d7ab39
Parents: 59aefb5
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Mar 16 18:53:21 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Apr 28 15:28:25 2017 +0200
----------------------------------------------------------------------
.../types/ResourceIDRetrievable.java | 4 +++-
.../runtime/clusterframework/types/SlotID.java | 20 ++++++--------------
.../runtime/concurrent/ScheduledExecutor.java | 2 +-
.../flink/runtime/messages/Acknowledge.java | 3 ++-
.../registration/TaskExecutorConnection.java | 7 ++++---
.../slotmanager/PendingSlotRequest.java | 10 +++++++++-
.../taskexecutor/TaskExecutorGateway.java | 2 --
.../slotmanager/SlotProtocolTest.java | 16 ++++++++--------
8 files changed, 33 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceIDRetrievable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceIDRetrievable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceIDRetrievable.java
index b45d53c..e65840d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceIDRetrievable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceIDRetrievable.java
@@ -22,6 +22,8 @@ package org.apache.flink.runtime.clusterframework.types;
*/
public interface ResourceIDRetrievable {
+ /**
+ * Gets the ResourceID of the object.
+ */
ResourceID getResourceID();
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
index d6409b6..83c28b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
@@ -24,7 +24,11 @@ import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * Unique identifier for a slot which located in TaskManager.
+ * Unique identifier for a slot on a TaskManager. This ID is constant across the
+ * lifetime of the TaskManager.
+ *
+ * <p>In contrast, the {@link AllocationID} represents a slot allocation and changes
+ * every time the slot is allocated by a JobManager.
*/
public class SlotID implements ResourceIDRetrievable, Serializable {
@@ -66,10 +70,7 @@ public class SlotID implements ResourceIDRetrievable, Serializable {
SlotID slotID = (SlotID) o;
- if (slotNumber != slotID.slotNumber) {
- return false;
- }
- return resourceId.equals(slotID.resourceId);
+ return slotNumber == slotID.slotNumber && resourceId.equals(slotID.resourceId);
}
@Override
@@ -83,13 +84,4 @@ public class SlotID implements ResourceIDRetrievable, Serializable {
public String toString() {
return resourceId + "_" + slotNumber;
}
-
- /**
- * Generate a random slot id.
- *
- * @return A random slot id.
- */
- public static SlotID generate() {
- return new SlotID(ResourceID.generate(), 0);
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
index c1b47e2..d09cfc3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
@@ -82,7 +82,7 @@ public interface ScheduledExecutor extends Executor {
* @param delay the time between the end of the current and the start of the next execution
* @param unit the time unit of the initial delay and the delay parameter
* @return a ScheduledFuture representing the repeatedly executed task. This future never
- * completes unless th exectuion of the given task fails or if the future is cancelled
+ * completes unless the execution of the given task fails or if the future is cancelled
*/
ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command,
http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java
index 4bbc50a..2c08ec9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/Acknowledge.java
@@ -62,8 +62,9 @@ public class Acknowledge implements Serializable {
/**
* Read resolve to preserve the singleton object property.
+ * (per best practices, this should have visibility 'protected')
*/
- private Object readResolve() throws java.io.ObjectStreamException {
+ protected Object readResolve() throws java.io.ObjectStreamException {
return INSTANCE;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
index e4522f2..babe5b9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.resourcemanager.registration;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* This class is responsible for grouping the TaskExecutorGateway and the InstanceID
* of a registered task executor.
@@ -29,11 +31,11 @@ public class TaskExecutorConnection {
private final InstanceID instanceID;
- private TaskExecutorGateway taskExecutorGateway;
+ private final TaskExecutorGateway taskExecutorGateway;
public TaskExecutorConnection(TaskExecutorGateway taskExecutorGateway) {
this.instanceID = new InstanceID();
- this.taskExecutorGateway = taskExecutorGateway;
+ this.taskExecutorGateway = checkNotNull(taskExecutorGateway);
}
public InstanceID getInstanceID() {
@@ -43,5 +45,4 @@ public class TaskExecutorConnection {
public TaskExecutorGateway getTaskExecutorGateway() {
return taskExecutorGateway;
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
index 894f146..1195791 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;
import java.util.UUID;
import java.util.concurrent.ScheduledFuture;
@@ -33,16 +34,21 @@ public class PendingSlotRequest {
private final SlotRequest slotRequest;
+ @Nullable
private CompletableFuture<Acknowledge> requestFuture;
+ @Nullable
private UUID timeoutIdentifier;
+ @Nullable
private ScheduledFuture<?> timeoutFuture;
public PendingSlotRequest(SlotRequest slotRequest) {
this.slotRequest = Preconditions.checkNotNull(slotRequest);
}
+ // ------------------------------------------------------------------------
+
public AllocationID getAllocationId() {
return slotRequest.getAllocationId();
}
@@ -51,6 +57,7 @@ public class PendingSlotRequest {
return slotRequest.getResourceProfile();
}
+ @Nullable
public UUID getTimeoutIdentifier() {
return timeoutIdentifier;
}
@@ -67,10 +74,11 @@ public class PendingSlotRequest {
return null != requestFuture;
}
- public void setRequestFuture(CompletableFuture<Acknowledge> requestFuture) {
+ public void setRequestFuture(@Nullable CompletableFuture<Acknowledge> requestFuture) {
this.requestFuture = requestFuture;
}
+ @Nullable
public CompletableFuture<Acknowledge> getRequestFuture() {
return requestFuture;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index bedf8ec..d4afdbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
-import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.taskmanager.Task;
import java.util.UUID;
@@ -47,7 +46,6 @@ public interface TaskExecutorGateway extends RpcGateway {
* @param slotId slot id for the request
* @param allocationId id for the request
* @param resourceManagerLeaderId current leader id of the ResourceManager
- * @throws SlotAllocationException if the slot allocation fails
* @return answer to the slot request
*/
Future<Acknowledge> requestSlot(
http://git-wip-us.apache.org/repos/asf/flink/blob/759f46ea/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index a72969e..c09316c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
@@ -33,8 +34,8 @@ import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
+
import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
@@ -54,13 +55,12 @@ public class SlotProtocolTest extends TestLogger {
private static final long timeout = 10000L;
+ private static final ScheduledExecutorService scheduledExecutorService =
+ new ScheduledThreadPoolExecutor(1);
- private static ScheduledExecutorService scheduledExecutorService;
- @BeforeClass
- public static void beforeClass() {
- scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
- }
+ private static final ScheduledExecutor scheduledExecutor =
+ new ScheduledExecutorServiceAdapter(scheduledExecutorService);
@AfterClass
public static void afterClass() {
@@ -81,7 +81,7 @@ public class SlotProtocolTest extends TestLogger {
final UUID rmLeaderID = UUID.randomUUID();
try (SlotManager slotManager = new SlotManager(
- new ScheduledExecutorServiceAdapter(scheduledExecutorService),
+ scheduledExecutor,
TestingUtils.infiniteTime(),
TestingUtils.infiniteTime(),
TestingUtils.infiniteTime())) {
@@ -144,7 +144,7 @@ public class SlotProtocolTest extends TestLogger {
.thenReturn(mock(FlinkFuture.class));
try (SlotManager slotManager = new SlotManager(
- new ScheduledExecutorServiceAdapter(scheduledExecutorService),
+ scheduledExecutor,
TestingUtils.infiniteTime(),
TestingUtils.infiniteTime(),
TestingUtils.infiniteTime())) {