You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/05/04 16:49:15 UTC
[3/4] flink git commit: [hotfix] [runtime] Add resourceId to TaskManager registration messages
[hotfix] [runtime] Add resourceId to TaskManager registration messages
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6ae81d4e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6ae81d4e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6ae81d4e
Branch: refs/heads/master
Commit: 6ae81d4e995ad637f0856367437a8f5d526e7bae
Parents: ae8dc5f
Author: Stephan Ewen <se...@apache.org>
Authored: Thu May 3 21:50:48 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri May 4 18:48:16 2018 +0200
----------------------------------------------------------------------
.../registration/TaskExecutorConnection.java | 10 +++-
.../registration/WorkerRegistration.java | 4 +-
.../slotmanager/SlotManager.java | 2 +-
.../slotmanager/SlotManagerTest.java | 61 +++++++++++---------
.../slotmanager/SlotProtocolTest.java | 9 +--
5 files changed, 52 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6ae81d4e/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
index babe5b9..edd6c24 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorConnection.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.resourcemanager.registration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -29,15 +30,22 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class TaskExecutorConnection {
+ private final ResourceID resourceID;
+
private final InstanceID instanceID;
private final TaskExecutorGateway taskExecutorGateway;
- public TaskExecutorConnection(TaskExecutorGateway taskExecutorGateway) {
+ public TaskExecutorConnection(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway) {
+ this.resourceID = checkNotNull(resourceID);
this.instanceID = new InstanceID();
this.taskExecutorGateway = checkNotNull(taskExecutorGateway);
}
+ public ResourceID getResourceID() {
+ return resourceID;
+ }
+
public InstanceID getInstanceID() {
return instanceID;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6ae81d4e/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
index eaa3c03..67925e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
@@ -39,7 +39,9 @@ public class WorkerRegistration<WorkerType extends ResourceIDRetrievable> extend
WorkerType worker,
int dataPort,
HardwareDescription hardwareDescription) {
- super(taskExecutorGateway);
+
+ super(worker.getResourceID(), taskExecutorGateway);
+
this.worker = Preconditions.checkNotNull(worker);
this.dataPort = dataPort;
this.hardwareDescription = Preconditions.checkNotNull(hardwareDescription);
http://git-wip-us.apache.org/repos/asf/flink/blob/6ae81d4e/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 6cdd997..fe503b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -319,7 +319,7 @@ public class SlotManager implements AutoCloseable {
public void registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
checkInit();
- LOG.info("Register TaskManager {} at the SlotManager.", taskExecutorConnection.getInstanceID());
+ LOG.info("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID(), taskExecutorConnection.getInstanceID());
// we identify task managers by their instance id
if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
http://git-wip-us.apache.org/repos/asf/flink/blob/6ae81d4e/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index f504d77..59de473 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -86,9 +86,9 @@ public class SlotManagerTest extends TestLogger {
final ResourceActions resourceManagerActions = mock(ResourceActions.class);
final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
- final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final ResourceID resourceId = ResourceID.generate();
+ final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
- ResourceID resourceId = ResourceID.generate();
final SlotID slotId1 = new SlotID(resourceId, 0);
final SlotID slotId2 = new SlotID(resourceId, 1);
final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
@@ -124,9 +124,9 @@ public class SlotManagerTest extends TestLogger {
eq(resourceManagerId),
any(Time.class))).thenReturn(new CompletableFuture<>());
- final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final ResourceID resourceId = ResourceID.generate();
+ final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
- ResourceID resourceId = ResourceID.generate();
final SlotID slotId1 = new SlotID(resourceId, 0);
final SlotID slotId2 = new SlotID(resourceId, 1);
final AllocationID allocationId1 = new AllocationID();
@@ -251,7 +251,7 @@ public class SlotManagerTest extends TestLogger {
eq(resourceManagerId),
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
- final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
final SlotReport slotReport = new SlotReport(slotStatus);
@@ -278,7 +278,8 @@ public class SlotManagerTest extends TestLogger {
public void testUnregisterPendingSlotRequest() throws Exception {
final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
final ResourceActions resourceManagerActions = mock(ResourceActions.class);
- final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+ final ResourceID resourceID = ResourceID.generate();
+ final SlotID slotId = new SlotID(resourceID, 0);
final AllocationID allocationId = new AllocationID();
final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
@@ -296,7 +297,7 @@ public class SlotManagerTest extends TestLogger {
final SlotRequest slotRequest = new SlotRequest(new JobID(), allocationId, resourceProfile, "foobar");
- final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
slotManager.registerTaskManager(taskManagerConnection, slotReport);
@@ -348,7 +349,7 @@ public class SlotManagerTest extends TestLogger {
eq(resourceManagerId),
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
- final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
final SlotReport slotReport = new SlotReport(slotStatus);
@@ -388,7 +389,7 @@ public class SlotManagerTest extends TestLogger {
// accept an incoming slot request
final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
- final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile, jobId, allocationId);
final SlotReport slotReport = new SlotReport(slotStatus);
@@ -451,10 +452,11 @@ public class SlotManagerTest extends TestLogger {
final JobID jobId = new JobID();
final AllocationID allocationId = new AllocationID();
final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
- final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+ final ResourceID resourceID = ResourceID.generate();
+ final SlotID slotId = new SlotID(resourceID, 0);
final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
- final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile, jobId, allocationId);
final SlotReport slotReport = new SlotReport(slotStatus);
@@ -491,9 +493,11 @@ public class SlotManagerTest extends TestLogger {
eq(resourceManagerId),
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
- final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final ResourceID resourceID = ResourceID.generate();
+
+ final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
- final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+ final SlotID slotId = new SlotID(resourceID, 0);
final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile1);
final SlotReport slotReport = new SlotReport(slotStatus);
@@ -536,9 +540,10 @@ public class SlotManagerTest extends TestLogger {
eq(resourceManagerId),
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
- final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final ResourceID resourceID = ResourceID.generate();
+ final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
- final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+ final SlotID slotId = new SlotID(resourceID, 0);
final SlotStatus slotStatus = new SlotStatus(slotId, new ResourceProfile(2.0, 2));
final SlotReport slotReport = new SlotReport(slotStatus);
@@ -619,7 +624,7 @@ public class SlotManagerTest extends TestLogger {
final SlotReport slotReport2 = new SlotReport(Arrays.asList(newSlotStatus2, slotStatus1));
final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
- final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
// check that we don't have any slots registered
@@ -658,11 +663,12 @@ public class SlotManagerTest extends TestLogger {
final ResourceActions resourceManagerActions = mock(ResourceActions.class);
final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+ final ResourceID resourceID = ResourceID.generate();
final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
- final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
- final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+ final SlotID slotId = new SlotID(resourceID, 0);
final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
final SlotReport slotReport = new SlotReport(slotStatus);
@@ -765,9 +771,9 @@ public class SlotManagerTest extends TestLogger {
any(ResourceManagerId.class),
any(Time.class))).thenReturn(slotRequestFuture1, slotRequestFuture2);
- final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
-
final ResourceID resourceId = ResourceID.generate();
+ final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
+
final SlotID slotId1 = new SlotID(resourceId, 0);
final SlotID slotId2 = new SlotID(resourceId, 1);
final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
@@ -843,9 +849,9 @@ public class SlotManagerTest extends TestLogger {
any(ResourceManagerId.class),
any(Time.class))).thenReturn(slotRequestFuture1, CompletableFuture.completedFuture(Acknowledge.get()));
- final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
-
final ResourceID resourceId = ResourceID.generate();
+ final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
+
final SlotID slotId1 = new SlotID(resourceId, 0);
final SlotID slotId2 = new SlotID(resourceId, 1);
final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
@@ -963,7 +969,7 @@ public class SlotManagerTest extends TestLogger {
eq(resourceManagerId),
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
- final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway);
final SlotID slotId1 = new SlotID(resourceId, 0);
final SlotID slotId2 = new SlotID(resourceId, 1);
@@ -1042,12 +1048,13 @@ public class SlotManagerTest extends TestLogger {
public void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception {
final Time taskManagerTimeout = Time.milliseconds(10L);
final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
+ final ResourceID resourceID = ResourceID.generate();
final ResourceActions resourceActions = mock(ResourceActions.class);
final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
- final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
+ final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway);
final SlotStatus slotStatus = new SlotStatus(
- new SlotID(ResourceID.generate(), 0),
+ new SlotID(resourceID, 0),
new ResourceProfile(1.0, 1));
final SlotReport initialSlotReport = new SlotReport(slotStatus);
@@ -1082,10 +1089,10 @@ public class SlotManagerTest extends TestLogger {
*/
@Test
public void testReportAllocatedSlot() throws Exception {
+ final ResourceID taskManagerId = ResourceID.generate();
final ResourceActions resourceActions = mock(ResourceActions.class);
final TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGateway();
- final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
- final ResourceID taskManagerId = ResourceID.generate();
+ final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskManagerId, taskExecutorGateway);
try (final SlotManager slotManager = new SlotManager(
TestingUtils.defaultScheduledExecutor(),
http://git-wip-us.apache.org/repos/asf/flink/blob/6ae81d4e/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index b3e5e91..eac530d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.api.common.JobID;
@@ -59,11 +60,11 @@ public class SlotProtocolTest extends TestLogger {
private static final long timeout = 10000L;
- private static final ScheduledExecutorService scheduledExecutorService =
+ private static final ScheduledExecutorService scheduledExecutorService =
new ScheduledThreadPoolExecutor(1);
- private static final ScheduledExecutor scheduledExecutor =
+ private static final ScheduledExecutor scheduledExecutor =
new ScheduledExecutorServiceAdapter(scheduledExecutorService);
@AfterClass
@@ -118,7 +119,7 @@ public class SlotProtocolTest extends TestLogger {
final SlotReport slotReport =
new SlotReport(Collections.singletonList(slotStatus));
// register slot at SlotManager
- slotManager.registerTaskManager(new TaskExecutorConnection(taskExecutorGateway), slotReport);
+ slotManager.registerTaskManager(new TaskExecutorConnection(resourceID, taskExecutorGateway), slotReport);
// 4) Slot becomes available and TaskExecutor gets a SlotRequest
verify(taskExecutorGateway, timeout(5000L))
@@ -166,7 +167,7 @@ public class SlotProtocolTest extends TestLogger {
new SlotReport(Collections.singletonList(slotStatus));
// register slot at SlotManager
slotManager.registerTaskManager(
- new TaskExecutorConnection(taskExecutorGateway), slotReport);
+ new TaskExecutorConnection(resourceID, taskExecutorGateway), slotReport);
final String targetAddress = "foobar";