You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:43 UTC
[24/52] [abbrv] flink git commit: [FLINK-4958] [tm] Send slot report to RM when registering
[FLINK-4958] [tm] Send slot report to RM when registering
Fix failing test cases
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87341001
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87341001
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87341001
Branch: refs/heads/master
Commit: 873410010df4be494f3573c4adfc2cbbc3ad5d0b
Parents: 5776235
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Oct 28 15:04:00 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:25 2016 +0100
----------------------------------------------------------------------
.../runtime/taskexecutor/TaskExecutor.java | 16 +-
...TaskExecutorToResourceManagerConnection.java | 48 +++--
.../taskexecutor/slot/TaskSlotTable.java | 31 ++++
.../taskexecutor/TaskExecutorITCase.java | 183 +++++++++++++++++++
.../runtime/taskexecutor/TaskExecutorTest.java | 16 +-
5 files changed, 277 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/87341001/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index c94113c..f11cb98 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -626,10 +626,14 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
resourceManagerConnection =
new TaskExecutorToResourceManagerConnection(
log,
- this,
+ getRpcService(),
+ getAddress(),
+ getResourceID(),
+ taskSlotTable.createSlotReport(getResourceID()),
newLeaderAddress,
newLeaderId,
- getMainThreadExecutor());
+ getMainThreadExecutor(),
+ new ForwardingFatalErrorHandler());
resourceManagerConnection.start();
}
}
@@ -1054,6 +1058,14 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
}
}
+ private final class ForwardingFatalErrorHandler implements FatalErrorHandler {
+
+ @Override
+ public void onFatalError(Throwable exception) {
+ onFatalErrorAsync(exception);
+ }
+ }
+
private final class TaskManagerActionsImpl implements TaskManagerActions {
private final UUID jobMasterLeaderId;
private final JobMasterGateway jobMasterGateway;
http://git-wip-us.apache.org/repos/asf/flink/blob/87341001/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index 53f030e..6e3e39b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -22,12 +22,14 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.registration.RegisteredRpcConnection;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import java.util.UUID;
@@ -41,29 +43,49 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public class TaskExecutorToResourceManagerConnection
extends RegisteredRpcConnection<ResourceManagerGateway, TaskExecutorRegistrationSuccess> {
- /** the TaskExecutor whose connection to the ResourceManager this represents */
- private final TaskExecutor taskExecutor;
+ private final RpcService rpcService;
+
+ private final String taskManagerAddress;
+
+ private final ResourceID taskManagerResourceId;
+
+ private final SlotReport slotReport;
+
+ private final FatalErrorHandler fatalErrorHandler;
private InstanceID registrationId;
public TaskExecutorToResourceManagerConnection(
Logger log,
- TaskExecutor taskExecutor,
+ RpcService rpcService,
+ String taskManagerAddress,
+ ResourceID taskManagerResourceId,
+ SlotReport slotReport,
String resourceManagerAddress,
UUID resourceManagerLeaderId,
- Executor executor) {
+ Executor executor,
+ FatalErrorHandler fatalErrorHandler) {
super(log, resourceManagerAddress, resourceManagerLeaderId, executor);
- this.taskExecutor = checkNotNull(taskExecutor);
+
+ this.rpcService = Preconditions.checkNotNull(rpcService);
+ this.taskManagerAddress = Preconditions.checkNotNull(taskManagerAddress);
+ this.taskManagerResourceId = Preconditions.checkNotNull(taskManagerResourceId);
+ this.slotReport = Preconditions.checkNotNull(slotReport);
+ this.fatalErrorHandler = Preconditions.checkNotNull(fatalErrorHandler);
}
@Override
protected RetryingRegistration<ResourceManagerGateway, TaskExecutorRegistrationSuccess> generateRegistration() {
return new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration(
- log, taskExecutor.getRpcService(),
- getTargetAddress(), getTargetLeaderId(),
- taskExecutor.getAddress(),taskExecutor.getResourceID());
+ log,
+ rpcService,
+ getTargetAddress(),
+ getTargetLeaderId(),
+ taskManagerAddress,
+ taskManagerResourceId,
+ slotReport);
}
@Override
@@ -78,7 +100,7 @@ public class TaskExecutorToResourceManagerConnection
protected void onRegistrationFailure(Throwable failure) {
log.info("Failed to register at resource manager {}.", getTargetAddress(), failure);
- taskExecutor.onFatalErrorAsync(failure);
+ fatalErrorHandler.onFatalError(failure);
}
/**
@@ -100,17 +122,21 @@ public class TaskExecutorToResourceManagerConnection
private final ResourceID resourceID;
+ private final SlotReport slotReport;
+
ResourceManagerRegistration(
Logger log,
RpcService rpcService,
String targetAddress,
UUID leaderId,
String taskExecutorAddress,
- ResourceID resourceID) {
+ ResourceID resourceID,
+ SlotReport slotReport) {
super(log, rpcService, "ResourceManager", ResourceManagerGateway.class, targetAddress, leaderId);
this.taskExecutorAddress = checkNotNull(taskExecutorAddress);
this.resourceID = checkNotNull(resourceID);
+ this.slotReport = checkNotNull(slotReport);
}
@Override
@@ -118,7 +144,7 @@ public class TaskExecutorToResourceManagerConnection
ResourceManagerGateway resourceManager, UUID leaderId, long timeoutMillis) throws Exception {
Time timeout = Time.milliseconds(timeoutMillis);
- return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, new SlotReport(), timeout);
+ return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, slotReport, timeout);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/87341001/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 88b83a0..081d8f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -21,8 +21,12 @@ package org.apache.flink.runtime.taskexecutor.slot;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
@@ -127,6 +131,33 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> {
}
// ---------------------------------------------------------------------
+ // Slot report methods
+ // ---------------------------------------------------------------------
+
+ public SlotReport createSlotReport(ResourceID resourceId) {
+ final int numberSlots = taskSlots.size();
+
+ List<SlotStatus> slotStatuses = Arrays.asList(new SlotStatus[numberSlots]);
+
+ for (int i = 0; i < numberSlots; i++) {
+ TaskSlot taskSlot = taskSlots.get(i);
+ SlotID slotId = new SlotID(resourceId, taskSlot.getIndex());
+
+ SlotStatus slotStatus = new SlotStatus(
+ slotId,
+ taskSlot.getResourceProfile(),
+ taskSlot.getJobId(),
+ taskSlot.getAllocationId());
+
+ slotStatuses.set(i, slotStatus);
+ }
+
+ final SlotReport slotReport = new SlotReport(slotStatuses);
+
+ return slotReport;
+ }
+
+ // ---------------------------------------------------------------------
// Slot methods
// ---------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/87341001/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
new file mode 100644
index 0000000..050db44
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
+import org.apache.flink.runtime.taskexecutor.slot.TimerService;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TaskExecutorITCase {
+
+ @Test
+ public void testSlotAllocation() throws Exception {
+ TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+ TestingHighAvailabilityServices testingHAServices = new TestingHighAvailabilityServices();
+ final Configuration configuration = new Configuration();
+ final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+ final ResourceID taskManagerResourceId = new ResourceID("foobar");
+ final UUID rmLeaderId = UUID.randomUUID();
+ final TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
+ final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
+ final String rmAddress = "rm";
+ final String jmAddress = "jm";
+ final UUID jmLeaderId = UUID.randomUUID();
+ final JobID jobId = new JobID();
+ final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1L);
+
+ testingHAServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
+ testingHAServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
+ testingHAServices.setJobMasterLeaderRetriever(jobId, new TestingLeaderRetrievalService(jmAddress, jmLeaderId));
+
+ TestingSerialRpcService rpcService = new TestingSerialRpcService();
+ ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.milliseconds(500L), Time.milliseconds(500L));
+ SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory();
+ JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(testingHAServices);
+ MetricRegistry metricRegistry = mock(MetricRegistry.class);
+
+ final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
+ final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(taskManagerResourceId, InetAddress.getLocalHost(), 1234);
+ final MemoryManager memoryManager = mock(MemoryManager.class);
+ final IOManager ioManager = mock(IOManager.class);
+ final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
+ final TaskManagerMetricGroup taskManagerMetricGroup = mock(TaskManagerMetricGroup.class);
+ final BroadcastVariableManager broadcastVariableManager = mock(BroadcastVariableManager.class);
+ final FileCache fileCache = mock(FileCache.class);
+ final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(resourceProfile), new TimerService<AllocationID>(scheduledExecutorService));
+ final JobManagerTable jobManagerTable = new JobManagerTable();
+ final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
+
+ ResourceManager<ResourceID> resourceManager = new StandaloneResourceManager(
+ rpcService,
+ resourceManagerConfiguration,
+ testingHAServices,
+ slotManagerFactory,
+ metricRegistry,
+ jobLeaderIdService,
+ testingFatalErrorHandler);
+
+ TaskExecutor taskExecutor = new TaskExecutor(
+ taskManagerConfiguration,
+ taskManagerLocation,
+ rpcService,
+ memoryManager,
+ ioManager,
+ networkEnvironment,
+ testingHAServices,
+ metricRegistry,
+ taskManagerMetricGroup,
+ broadcastVariableManager,
+ fileCache,
+ taskSlotTable,
+ jobManagerTable,
+ jobLeaderService,
+ testingFatalErrorHandler);
+
+ JobMasterGateway jmGateway = mock(JobMasterGateway.class);
+
+ when(jmGateway.registerTaskManager(any(String.class), any(TaskManagerLocation.class), eq(jmLeaderId), any(Time.class)))
+ .thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(taskManagerResourceId, 1234)));
+ when(jmGateway.getAddress()).thenReturn(jmAddress);
+
+
+ rpcService.registerGateway(rmAddress, resourceManager.getSelf());
+ rpcService.registerGateway(jmAddress, jmGateway);
+
+ final AllocationID allocationId = new AllocationID();
+ final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile);
+ final SlotOffer slotOffer = new SlotOffer(allocationId, 0, resourceProfile);
+
+ try {
+ resourceManager.start();
+ taskExecutor.start();
+
+ // notify the RM that it is the leader
+ rmLeaderElectionService.isLeader(rmLeaderId);
+
+ // notify the TM about the new RM leader
+ rmLeaderRetrievalService.notifyListener(rmAddress, rmLeaderId);
+
+ Future<RegistrationResponse> registrationResponseFuture = resourceManager.registerJobManager(rmLeaderId, jmLeaderId, jmAddress, jobId);
+
+ RegistrationResponse registrationResponse = registrationResponseFuture.get();
+
+ assertTrue(registrationResponse instanceof JobMasterRegistrationSuccess);
+
+ resourceManager.requestSlot(jmLeaderId, rmLeaderId, slotRequest);
+
+ verify(jmGateway).offerSlots(
+ eq(taskManagerResourceId),
+ (Iterable<SlotOffer>)argThat(Matchers.contains(slotOffer)),
+ eq(jmLeaderId), any(Time.class));
+ } finally {
+ if (testingFatalErrorHandler.hasExceptionOccurred()) {
+ testingFatalErrorHandler.rethrowError();
+ }
+ }
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/87341001/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 1ef7140..2af97b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -119,6 +119,10 @@ public class TaskExecutorTest extends TestLogger {
NonHaServices haServices = new NonHaServices(resourceManagerAddress);
+ final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
+ final SlotReport slotReport = new SlotReport();
+ when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
+
TaskExecutor taskManager = new TaskExecutor(
taskManagerServicesConfiguration,
taskManagerLocation,
@@ -131,7 +135,7 @@ public class TaskExecutorTest extends TestLogger {
mock(TaskManagerMetricGroup.class),
mock(BroadcastVariableManager.class),
mock(FileCache.class),
- mock(TaskSlotTable.class),
+ taskSlotTable,
mock(JobManagerTable.class),
mock(JobLeaderService.class),
mock(FatalErrorHandler.class));
@@ -140,7 +144,7 @@ public class TaskExecutorTest extends TestLogger {
String taskManagerAddress = taskManager.getAddress();
verify(rmGateway).registerTaskExecutor(
- any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class));
+ any(UUID.class), eq(taskManagerAddress), eq(resourceID), eq(slotReport), any(Time.class));
}
finally {
rpc.stopService();
@@ -178,6 +182,10 @@ public class TaskExecutorTest extends TestLogger {
when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
when(taskManagerLocation.getHostname()).thenReturn("foobar");
+ final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
+ final SlotReport slotReport = new SlotReport();
+ when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
+
TaskExecutor taskManager = new TaskExecutor(
taskManagerServicesConfiguration,
taskManagerLocation,
@@ -190,7 +198,7 @@ public class TaskExecutorTest extends TestLogger {
mock(TaskManagerMetricGroup.class),
mock(BroadcastVariableManager.class),
mock(FileCache.class),
- mock(TaskSlotTable.class),
+ taskSlotTable,
mock(JobManagerTable.class),
mock(JobLeaderService.class),
mock(FatalErrorHandler.class));
@@ -215,7 +223,7 @@ public class TaskExecutorTest extends TestLogger {
testLeaderService.notifyListener(address2, leaderId2);
verify(rmGateway2).registerTaskExecutor(
- eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class));
+ eq(leaderId2), eq(taskManagerAddress), eq(resourceID), eq(slotReport), any(Time.class));
assertNotNull(taskManager.getResourceManagerConnection());
}
finally {