You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/07 14:08:28 UTC
[30/30] flink git commit: [FLINK-7870] [tests] Add SlotPool test to
verify cancellation of failed slot requests
[FLINK-7870] [tests] Add SlotPool test to verify cancellation of failed slot requests
Adds the test SlotPoolTest#testSlotRequestCancellationUponFailingRequest, together with the TestingResourceManagerGateway test utility it uses.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/755ae519
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/755ae519
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/755ae519
Branch: refs/heads/master
Commit: 755ae519255f146aac49784af7bbe049d2c1fd13
Parents: 902425f
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Nov 6 12:16:04 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 7 15:07:46 2017 +0100
----------------------------------------------------------------------
.../flink/runtime/instance/SlotPoolTest.java | 65 ++++++
.../slotmanager/SlotManagerTest.java | 9 -
.../utils/TestingResourceManagerGateway.java | 231 +++++++++++++++++++
3 files changed, 296 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/755ae519/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index 5993dcb..f38894e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -23,22 +23,31 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -48,6 +57,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.RETURNS_MOCKS;
import static org.mockito.Mockito.mock;
@@ -56,6 +66,8 @@ import static org.mockito.Mockito.when;
public class SlotPoolTest extends TestLogger {
+ private static final Logger LOG = LoggerFactory.getLogger(SlotPoolTest.class);
+
private final Time timeout = Time.seconds(10L);
private RpcService rpcService;
@@ -294,6 +306,59 @@ public class SlotPoolTest extends TestLogger {
}
}
+ /**
+ * Tests that the SlotPool cancels a slot request at the ResourceManager when that request fails with an exception (e.g. TimeoutException).
+ *
+ * <p>See FLINK-7870
+ */
+ @Test
+ public void testSlotRequestCancellationUponFailingRequest() throws Exception {
+ final SlotPool slotPool = new SlotPool(rpcService, jobId);
+ final CompletableFuture<Acknowledge> requestSlotFuture = new CompletableFuture<>(); // handed out by the gateway's requestSlot; failed below to simulate a failing request
+ final CompletableFuture<AllocationID> cancelSlotFuture = new CompletableFuture<>(); // completed with the allocation id passed to cancelSlotRequest
+ final CompletableFuture<AllocationID> requestSlotFutureAllocationId = new CompletableFuture<>(); // completed with the allocation id of the issued slot request
+
+ final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+ resourceManagerGateway.setRequestSlotFuture(requestSlotFuture);
+ resourceManagerGateway.setRequestSlotConsumer(slotRequest -> requestSlotFutureAllocationId.complete(slotRequest.getAllocationId()));
+ resourceManagerGateway.setCancelSlotConsumer(allocationID -> cancelSlotFuture.complete(allocationID));
+
+ final ScheduledUnit scheduledUnit = new ScheduledUnit(mock(Execution.class));
+
+ try {
+ slotPool.start(JobMasterId.generate(), "localhost");
+
+ final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
+
+ slotPoolGateway.connectToResourceManager(resourceManagerGateway);
+
+ CompletableFuture<SimpleSlot> slotFuture = slotPoolGateway.allocateSlot(
+ scheduledUnit,
+ ResourceProfile.UNKNOWN,
+ Collections.emptyList(),
+ timeout);
+
+ requestSlotFuture.completeExceptionally(new FlinkException("Testing exception.")); // fail the future that the gateway returns from requestSlot
+
+ try {
+ slotFuture.get(); // NOTE(review): unbounded wait; a get with a timeout would make a regression fail instead of hang
+ fail("The slot future should not have been completed properly.");
+ } catch (Exception ignored) {
+ // expected: the slot future completes exceptionally; fail() raises an AssertionError, which is not an Exception and so is not swallowed here
+ }
+
+ // check that a failure triggered the slot request cancellation
+ // with the correct allocation id
+ assertEquals(requestSlotFutureAllocationId.get(), cancelSlotFuture.get()); // NOTE(review): both get() calls also block without a timeout
+ } finally {
+ try {
+ RpcUtils.terminateRpcEndpoint(slotPool, timeout);
+ } catch (Exception e) {
+ LOG.warn("Could not properly terminate the SlotPool.", e); // a shutdown failure is only logged so that it does not replace the test outcome
+ }
+ }
+ }
+
private static ResourceManagerGateway createResourceManagerGatewayMock() {
ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
when(resourceManagerGateway
http://git-wip-us.apache.org/repos/asf/flink/blob/755ae519/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 55a9946..cf0aef9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -292,15 +292,6 @@ public class SlotManagerTest extends TestLogger {
final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
- // verify that if the request has not been assigned, should cancel the resource allocation
- slotManager.registerSlotRequest(slotRequest);
- PendingSlotRequest pendingSlotRequest = slotManager.getSlotRequest(allocationId);
- assertFalse(pendingSlotRequest.isAssigned());
-
- slotManager.unregisterSlotRequest(allocationId);
- pendingSlotRequest = slotManager.getSlotRequest(allocationId);
- assertTrue(pendingSlotRequest == null);
-
slotManager.registerTaskManager(taskManagerConnection, slotReport);
TaskManagerSlot slot = slotManager.getSlot(slotId);
http://git-wip-us.apache.org/repos/asf/flink/blob/755ae519/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
new file mode 100644
index 0000000..f11a1eb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.ResourceOverview;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+/**
+ * Testing-only implementation of the {@link ResourceManagerGateway}: answers calls with fixed responses and lets tests observe slot requests and their cancellation.
+ */
+public class TestingResourceManagerGateway implements ResourceManagerGateway {
+
+ private final ResourceManagerId resourceManagerId; // returned by getFencingToken() and sent in the JobMaster registration response
+
+ private final ResourceID resourceId; // sent in the JobMaster registration response
+
+ private final long heartbeatInterval; // sent in both registration responses
+
+ private final String address; // returned by getAddress()
+
+ private final String hostname; // returned by getHostname()
+
+ private final AtomicReference<CompletableFuture<Acknowledge>> slotFutureReference; // future handed out by the next requestSlot call; cleared once it has been handed out
+
+ private volatile Consumer<AllocationID> cancelSlotConsumer; // optional callback run by cancelSlotRequest; null means none
+
+ private volatile Consumer<SlotRequest> requestSlotConsumer; // optional callback run by requestSlot; null means none
+
+ public TestingResourceManagerGateway() { // creates a gateway with generated ids and "localhost" as both address and hostname
+ this(
+ ResourceManagerId.generate(),
+ ResourceID.generate(),
+ 10000L, // heartbeat interval; unit is not visible here, presumably milliseconds (TODO confirm)
+ "localhost",
+ "localhost");
+ }
+
+ public TestingResourceManagerGateway( // all reference arguments must be non-null (enforced by checkNotNull below)
+ ResourceManagerId resourceManagerId,
+ ResourceID resourceId,
+ long heartbeatInterval,
+ String address,
+ String hostname) {
+ this.resourceManagerId = Preconditions.checkNotNull(resourceManagerId);
+ this.resourceId = Preconditions.checkNotNull(resourceId);
+ this.heartbeatInterval = heartbeatInterval;
+ this.address = Preconditions.checkNotNull(address);
+ this.hostname = Preconditions.checkNotNull(hostname);
+ this.slotFutureReference = new AtomicReference<>(); // starts empty: requestSlot acknowledges immediately until a future is set
+ this.cancelSlotConsumer = null;
+ this.requestSlotConsumer = null;
+ }
+
+ public void setRequestSlotFuture(CompletableFuture<Acknowledge> slotFuture) { // sets the future that the next requestSlot call returns
+ this.slotFutureReference.set(slotFuture);
+ }
+
+ public void setCancelSlotConsumer(Consumer<AllocationID> cancelSlotConsumer) { // sets the callback that receives the allocation id of every cancelSlotRequest call
+ this.cancelSlotConsumer = cancelSlotConsumer;
+ }
+
+ public void setRequestSlotConsumer(Consumer<SlotRequest> slotRequestConsumer) { // sets the callback that receives the SlotRequest of every requestSlot call
+ this.requestSlotConsumer = slotRequestConsumer;
+ }
+
+ @Override // always succeeds; none of the arguments are inspected
+ public CompletableFuture<RegistrationResponse> registerJobManager(JobMasterId jobMasterId, ResourceID jobMasterResourceId, String jobMasterAddress, JobID jobId, Time timeout) {
+ return CompletableFuture.completedFuture(
+ new JobMasterRegistrationSuccess(
+ heartbeatInterval,
+ resourceManagerId,
+ resourceId));
+ }
+
+ @Override // notifies the request consumer (if set), then returns the configured future once, otherwise an already completed acknowledgement
+ public CompletableFuture<Acknowledge> requestSlot(JobMasterId jobMasterId, SlotRequest slotRequest, Time timeout) {
+ Consumer<SlotRequest> currentRequestSlotConsumer = requestSlotConsumer; // read the volatile field once so the null check and the call use the same value
+
+ if (currentRequestSlotConsumer != null) {
+ currentRequestSlotConsumer.accept(slotRequest);
+ }
+
+ CompletableFuture<Acknowledge> slotFuture = slotFutureReference.getAndSet(null); // take the configured future and clear it, so only one call receives it
+
+ if (slotFuture != null) {
+ return slotFuture;
+ } else {
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ }
+ }
+
+ @Override // forwards the allocation id to the cancel consumer, if one is set
+ public void cancelSlotRequest(AllocationID allocationID) {
+ Consumer<AllocationID> currentCancelSlotConsumer = cancelSlotConsumer; // read the volatile field once so the null check and the call use the same value
+
+ if (currentCancelSlotConsumer != null) {
+ currentCancelSlotConsumer.accept(allocationID);
+ }
+ }
+
+ @Override // always succeeds and assigns a fresh InstanceID on every call
+ public CompletableFuture<RegistrationResponse> registerTaskExecutor(String taskExecutorAddress, ResourceID resourceId, SlotReport slotReport, int dataPort, HardwareDescription hardwareDescription, Time timeout) {
+ return CompletableFuture.completedFuture(
+ new TaskExecutorRegistrationSuccess(
+ new InstanceID(),
+ resourceId, // NOTE(review): this is the method parameter, which shadows the resourceId field; confirm the caller's id rather than this gateway's own id is intended
+ heartbeatInterval));
+ }
+
+ @Override
+ public void notifySlotAvailable(InstanceID instanceId, SlotID slotID, AllocationID oldAllocationId) {
+ // no-op in this testing gateway
+ }
+
+ @Override
+ public void registerInfoMessageListener(String infoMessageListenerAddress) {
+ // no-op in this testing gateway
+ }
+
+ @Override
+ public void unRegisterInfoMessageListener(String infoMessageListenerAddress) {
+ // no-op in this testing gateway
+ }
+
+ @Override
+ public void shutDownCluster(ApplicationStatus finalStatus, String optionalDiagnostics) {
+ // no-op in this testing gateway
+ }
+
+ @Override // always reports zero registered task managers
+ public CompletableFuture<Integer> getNumberOfRegisteredTaskManagers() {
+ return CompletableFuture.completedFuture(0);
+ }
+
+ @Override
+ public void heartbeatFromTaskManager(ResourceID heartbeatOrigin, SlotReport slotReport) {
+ // no-op in this testing gateway
+ }
+
+ @Override
+ public void heartbeatFromJobManager(ResourceID heartbeatOrigin) {
+ // no-op in this testing gateway
+ }
+
+ @Override
+ public void disconnectTaskManager(ResourceID resourceID, Exception cause) {
+ // no-op in this testing gateway
+ }
+
+ @Override
+ public void disconnectJobManager(JobID jobId, Exception cause) {
+ // no-op in this testing gateway
+ }
+
+ @Override // always returns an empty collection
+ public CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Time timeout) {
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+
+ @Override // not supported: the returned future is already failed with an UnsupportedOperationException
+ public CompletableFuture<TaskManagerInfo> requestTaskManagerInfo(InstanceID instanceId, Time timeout) {
+ return FutureUtils.completedExceptionally(new UnsupportedOperationException("Not yet implemented"));
+ }
+
+ @Override // not supported: the returned future is already failed with an UnsupportedOperationException
+ public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
+ return FutureUtils.completedExceptionally(new UnsupportedOperationException("Not yet implemented"));
+ }
+
+ @Override // always returns an empty collection
+ public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+
+ @Override // the fencing token is the ResourceManagerId given at construction
+ public ResourceManagerId getFencingToken() {
+ return resourceManagerId;
+ }
+
+ @Override
+ public String getAddress() {
+ return address;
+ }
+
+ @Override
+ public String getHostname() {
+ return hostname;
+ }
+}