You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:43 UTC

[24/52] [abbrv] flink git commit: [FLINK-4958] [tm] Send slot report to RM when registering

[FLINK-4958] [tm] Send slot report to RM when registering

Fix failing test cases


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87341001
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87341001
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87341001

Branch: refs/heads/master
Commit: 873410010df4be494f3573c4adfc2cbbc3ad5d0b
Parents: 5776235
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Oct 28 15:04:00 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:25 2016 +0100

----------------------------------------------------------------------
 .../runtime/taskexecutor/TaskExecutor.java      |  16 +-
 ...TaskExecutorToResourceManagerConnection.java |  48 +++--
 .../taskexecutor/slot/TaskSlotTable.java        |  31 ++++
 .../taskexecutor/TaskExecutorITCase.java        | 183 +++++++++++++++++++
 .../runtime/taskexecutor/TaskExecutorTest.java  |  16 +-
 5 files changed, 277 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/87341001/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index c94113c..f11cb98 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -626,10 +626,14 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			resourceManagerConnection =
 				new TaskExecutorToResourceManagerConnection(
 					log,
-					this,
+					getRpcService(),
+					getAddress(),
+					getResourceID(),
+					taskSlotTable.createSlotReport(getResourceID()),
 					newLeaderAddress,
 					newLeaderId,
-					getMainThreadExecutor());
+					getMainThreadExecutor(),
+					new ForwardingFatalErrorHandler());
 			resourceManagerConnection.start();
 		}
 	}
@@ -1054,6 +1058,14 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		}
 	}
 
+	private final class ForwardingFatalErrorHandler implements FatalErrorHandler {
+		// Forwards every reported fatal error to onFatalErrorAsync of the enclosing class.
+		@Override
+		public void onFatalError(Throwable exception) {
+			onFatalErrorAsync(exception);
+		}
+	}
+
 	private final class TaskManagerActionsImpl implements TaskManagerActions {
 		private final UUID jobMasterLeaderId;
 		private final JobMasterGateway jobMasterGateway;

http://git-wip-us.apache.org/repos/asf/flink/blob/87341001/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index 53f030e..6e3e39b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -22,12 +22,14 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.registration.RegisteredRpcConnection;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.registration.RetryingRegistration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.concurrent.Future;
 
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 
 import java.util.UUID;
@@ -41,29 +43,49 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public class TaskExecutorToResourceManagerConnection
 		extends RegisteredRpcConnection<ResourceManagerGateway, TaskExecutorRegistrationSuccess> {
 
-	/** the TaskExecutor whose connection to the ResourceManager this represents */
-	private final TaskExecutor taskExecutor;
+	private final RpcService rpcService;
+
+	private final String taskManagerAddress;
+
+	private final ResourceID taskManagerResourceId;
+
+	private final SlotReport slotReport;
+
+	private final FatalErrorHandler fatalErrorHandler;
 
 	private InstanceID registrationId;
 
 	public TaskExecutorToResourceManagerConnection(
 			Logger log,
-			TaskExecutor taskExecutor,
+			RpcService rpcService,
+			String taskManagerAddress,
+			ResourceID taskManagerResourceId,
+			SlotReport slotReport,
 			String resourceManagerAddress,
 			UUID resourceManagerLeaderId,
-			Executor executor) {
+			Executor executor,
+			FatalErrorHandler fatalErrorHandler) {
 
 		super(log, resourceManagerAddress, resourceManagerLeaderId, executor);
-		this.taskExecutor = checkNotNull(taskExecutor);
+
+		this.rpcService = Preconditions.checkNotNull(rpcService);
+		this.taskManagerAddress = Preconditions.checkNotNull(taskManagerAddress);
+		this.taskManagerResourceId = Preconditions.checkNotNull(taskManagerResourceId);
+		this.slotReport = Preconditions.checkNotNull(slotReport);
+		this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
 	}
 
 
 	@Override
 	protected RetryingRegistration<ResourceManagerGateway, TaskExecutorRegistrationSuccess> generateRegistration() {
 		return new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration(
-			log, taskExecutor.getRpcService(),
-			getTargetAddress(), getTargetLeaderId(),
-			taskExecutor.getAddress(),taskExecutor.getResourceID());
+			log,
+			rpcService,
+			getTargetAddress(),
+			getTargetLeaderId(),
+			taskManagerAddress,
+			taskManagerResourceId,
+			slotReport);
 	}
 
 	@Override
@@ -78,7 +100,7 @@ public class TaskExecutorToResourceManagerConnection
 	protected void onRegistrationFailure(Throwable failure) {
 		log.info("Failed to register at resource manager {}.", getTargetAddress(), failure);
 
-		taskExecutor.onFatalErrorAsync(failure);
+		fatalErrorHandler.onFatalError(failure);
 	}
 
 	/**
@@ -100,17 +122,21 @@ public class TaskExecutorToResourceManagerConnection
 		
 		private final ResourceID resourceID;
 
+		private final SlotReport slotReport;
+
 		ResourceManagerRegistration(
 				Logger log,
 				RpcService rpcService,
 				String targetAddress,
 				UUID leaderId,
 				String taskExecutorAddress,
-				ResourceID resourceID) {
+				ResourceID resourceID,
+				SlotReport slotReport) {
 
 			super(log, rpcService, "ResourceManager", ResourceManagerGateway.class, targetAddress, leaderId);
 			this.taskExecutorAddress = checkNotNull(taskExecutorAddress);
 			this.resourceID = checkNotNull(resourceID);
+			this.slotReport = checkNotNull(slotReport);
 		}
 
 		@Override
@@ -118,7 +144,7 @@ public class TaskExecutorToResourceManagerConnection
 				ResourceManagerGateway resourceManager, UUID leaderId, long timeoutMillis) throws Exception {
 
 			Time timeout = Time.milliseconds(timeoutMillis);
-			return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, new SlotReport(), timeout);
+			return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, slotReport, timeout);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/87341001/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 88b83a0..081d8f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -21,8 +21,12 @@ package org.apache.flink.runtime.taskexecutor.slot;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
@@ -127,6 +131,33 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
 	}
 
 	// ---------------------------------------------------------------------
+	// Slot report methods
+	// ---------------------------------------------------------------------
+	/** Creates a report with one {@link SlotStatus} per entry of taskSlots; each slot id combines {@code resourceId} with the slot's index. */
+	public SlotReport createSlotReport(ResourceID resourceId) {
+		final int numberSlots = taskSlots.size();
+		// Arrays.asList over a fresh array gives a fixed-size list of nulls, so entries are written with set() rather than add().
+		List<SlotStatus> slotStatuses = Arrays.asList(new SlotStatus[numberSlots]);
+		// Fill position i of the report with the status of taskSlots.get(i).
+		for (int i = 0; i < numberSlots; i++) {
+			TaskSlot taskSlot = taskSlots.get(i);
+			SlotID slotId = new SlotID(resourceId, taskSlot.getIndex());
+			// The status carries the slot's resource profile, job id and allocation id as returned by its getters at this moment.
+			SlotStatus slotStatus = new SlotStatus(
+				slotId,
+				taskSlot.getResourceProfile(),
+				taskSlot.getJobId(),
+				taskSlot.getAllocationId());
+
+			slotStatuses.set(i, slotStatus);
+		}
+
+		final SlotReport slotReport = new SlotReport(slotStatuses);
+
+		return slotReport;
+	}
+
+	// ---------------------------------------------------------------------
 	// Slot methods
 	// ---------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/87341001/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
new file mode 100644
index 0000000..050db44
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
+import org.apache.flink.runtime.taskexecutor.slot.TimerService;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TaskExecutorITCase {
+	/** Starts a ResourceManager and a TaskExecutor on one TestingSerialRpcService, requests a slot at the RM and verifies that the slot is offered to the mocked JobMaster gateway. */
+	@Test
+	public void testSlotAllocation() throws Exception {
+		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+		TestingHighAvailabilityServices testingHAServices = new TestingHighAvailabilityServices();
+		final Configuration configuration = new Configuration();
+		final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+		final ResourceID taskManagerResourceId = new ResourceID("foobar");
+		final UUID rmLeaderId = UUID.randomUUID();
+		final TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
+		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
+		final String rmAddress = "rm";
+		final String jmAddress = "jm";
+		final UUID jmLeaderId = UUID.randomUUID();
+		final JobID jobId = new JobID();
+		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1L);
+		// Wire the HA services: RM leader election and retrieval, plus a JM leader retriever constructed with jmAddress and jmLeaderId.
+		testingHAServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
+		testingHAServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
+		testingHAServices.setJobMasterLeaderRetriever(jobId, new TestingLeaderRetrievalService(jmAddress, jmLeaderId));
+
+		TestingSerialRpcService rpcService = new TestingSerialRpcService();
+		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.milliseconds(500L), Time.milliseconds(500L));
+		SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory();
+		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(testingHAServices);
+		MetricRegistry metricRegistry = mock(MetricRegistry.class);
+		// Components handed to the TaskExecutor: most are Mockito mocks, while the slot table is a real one built from the single resourceProfile.
+		final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
+		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(taskManagerResourceId, InetAddress.getLocalHost(), 1234);
+		final MemoryManager memoryManager = mock(MemoryManager.class);
+		final IOManager ioManager = mock(IOManager.class);
+		final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
+		final TaskManagerMetricGroup taskManagerMetricGroup = mock(TaskManagerMetricGroup.class);
+		final BroadcastVariableManager broadcastVariableManager = mock(BroadcastVariableManager.class);
+		final FileCache fileCache = mock(FileCache.class);
+		final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(resourceProfile), new TimerService<AllocationID>(scheduledExecutorService));
+		final JobManagerTable jobManagerTable = new JobManagerTable();
+		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
+
+		ResourceManager<ResourceID> resourceManager = new StandaloneResourceManager(
+			rpcService,
+			resourceManagerConfiguration,
+			testingHAServices,
+			slotManagerFactory,
+			metricRegistry,
+			jobLeaderIdService,
+			testingFatalErrorHandler);
+
+		TaskExecutor taskExecutor = new TaskExecutor(
+			taskManagerConfiguration,
+			taskManagerLocation,
+			rpcService,
+			memoryManager,
+			ioManager,
+			networkEnvironment,
+			testingHAServices,
+			metricRegistry,
+			taskManagerMetricGroup,
+			broadcastVariableManager,
+			fileCache,
+			taskSlotTable,
+			jobManagerTable,
+			jobLeaderService,
+			testingFatalErrorHandler);
+
+		JobMasterGateway jmGateway = mock(JobMasterGateway.class);
+		// The mocked JobMaster gateway answers registerTaskManager calls that carry jmLeaderId with a completed future holding a JMTMRegistrationSuccess.
+		when(jmGateway.registerTaskManager(any(String.class), any(TaskManagerLocation.class), eq(jmLeaderId), any(Time.class)))
+			.thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(taskManagerResourceId, 1234)));
+		when(jmGateway.getAddress()).thenReturn(jmAddress);
+
+		// Register both gateways under their addresses with the shared rpc service.
+		rpcService.registerGateway(rmAddress, resourceManager.getSelf());
+		rpcService.registerGateway(jmAddress, jmGateway);
+		// The slot request sent to the RM, and the slot offer that the verification at the end expects the JM gateway to receive.
+		final AllocationID allocationId = new AllocationID();
+		final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile);
+		final SlotOffer slotOffer = new SlotOffer(allocationId, 0, resourceProfile);
+
+		try {
+			resourceManager.start();
+			taskExecutor.start();
+
+			// notify the RM that it is the leader
+			rmLeaderElectionService.isLeader(rmLeaderId);
+
+			// notify the TM about the new RM leader
+			rmLeaderRetrievalService.notifyListener(rmAddress, rmLeaderId);
+
+			Future<RegistrationResponse> registrationResponseFuture = resourceManager.registerJobManager(rmLeaderId, jmLeaderId, jmAddress, jobId);
+
+			RegistrationResponse registrationResponse = registrationResponseFuture.get();
+
+			assertTrue(registrationResponse instanceof JobMasterRegistrationSuccess);
+
+			resourceManager.requestSlot(jmLeaderId, rmLeaderId, slotRequest);
+			// verify(...) without a verification mode expects exactly one offerSlots call; contains(...) requires the iterable to hold exactly slotOffer.
+			verify(jmGateway).offerSlots(
+				eq(taskManagerResourceId),
+				(Iterable<SlotOffer>)argThat(Matchers.contains(slotOffer)),
+				eq(jmLeaderId), any(Time.class));
+		} finally {
+			if (testingFatalErrorHandler.hasExceptionOccurred()) {
+				testingFatalErrorHandler.rethrowError();
+			}
+		}
+		// NOTE(review): scheduledExecutorService, rpcService, resourceManager and taskExecutor are never shut down in this method — confirm whether cleanup is needed.
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/87341001/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 1ef7140..2af97b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -119,6 +119,10 @@ public class TaskExecutorTest extends TestLogger {
 
 			NonHaServices haServices = new NonHaServices(resourceManagerAddress);
 
+			final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
+			final SlotReport slotReport = new SlotReport();
+			when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
+
 			TaskExecutor taskManager = new TaskExecutor(
 				taskManagerServicesConfiguration,
 				taskManagerLocation,
@@ -131,7 +135,7 @@ public class TaskExecutorTest extends TestLogger {
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
-				mock(TaskSlotTable.class),
+				taskSlotTable,
 				mock(JobManagerTable.class),
 				mock(JobLeaderService.class),
 				mock(FatalErrorHandler.class));
@@ -140,7 +144,7 @@ public class TaskExecutorTest extends TestLogger {
 			String taskManagerAddress = taskManager.getAddress();
 
 			verify(rmGateway).registerTaskExecutor(
-					any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class));
+					any(UUID.class), eq(taskManagerAddress), eq(resourceID), eq(slotReport), any(Time.class));
 		}
 		finally {
 			rpc.stopService();
@@ -178,6 +182,10 @@ public class TaskExecutorTest extends TestLogger {
 			when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
 			when(taskManagerLocation.getHostname()).thenReturn("foobar");
 
+			final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
+			final SlotReport slotReport = new SlotReport();
+			when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
+
 			TaskExecutor taskManager = new TaskExecutor(
 				taskManagerServicesConfiguration,
 				taskManagerLocation,
@@ -190,7 +198,7 @@ public class TaskExecutorTest extends TestLogger {
 				mock(TaskManagerMetricGroup.class),
 				mock(BroadcastVariableManager.class),
 				mock(FileCache.class),
-				mock(TaskSlotTable.class),
+				taskSlotTable,
 				mock(JobManagerTable.class),
 				mock(JobLeaderService.class),
 				mock(FatalErrorHandler.class));
@@ -215,7 +223,7 @@ public class TaskExecutorTest extends TestLogger {
 			testLeaderService.notifyListener(address2, leaderId2);
 
 			verify(rmGateway2).registerTaskExecutor(
-					eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class));
+					eq(leaderId2), eq(taskManagerAddress), eq(resourceID), eq(slotReport), any(Time.class));
 			assertNotNull(taskManager.getResourceManagerConnection());
 		}
 		finally {