You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/07 14:08:28 UTC

[30/30] flink git commit: [FLINK-7870] [tests] Add SlotPool test to verify cancellation of failed slot requests

[FLINK-7870] [tests] Add SlotPool test to verify cancellation of failed slot requests

Adds the SlotPoolTest#testSlotRequestCancellationUponFailingRequest.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/755ae519
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/755ae519
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/755ae519

Branch: refs/heads/master
Commit: 755ae519255f146aac49784af7bbe049d2c1fd13
Parents: 902425f
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Nov 6 12:16:04 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Nov 7 15:07:46 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/instance/SlotPoolTest.java    |  65 ++++++
 .../slotmanager/SlotManagerTest.java            |   9 -
 .../utils/TestingResourceManagerGateway.java    | 231 +++++++++++++++++++
 3 files changed, 296 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/755ae519/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index 5993dcb..f38894e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -23,22 +23,31 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -48,6 +57,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.RETURNS_MOCKS;
 import static org.mockito.Mockito.mock;
@@ -56,6 +66,8 @@ import static org.mockito.Mockito.when;
 
 public class SlotPoolTest extends TestLogger {
 
+	private static final Logger LOG = LoggerFactory.getLogger(SlotPoolTest.class);
+
 	private final Time timeout = Time.seconds(10L);
 
 	private RpcService rpcService;
@@ -294,6 +306,59 @@ public class SlotPoolTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that the SlotPool cancels a slot request at the ResourceManager if the request failed with an exception (e.g. TimeoutException).
+	 *
+	 * <p>See FLINK-7870
+	 */
+	@Test
+	public void testSlotRequestCancellationUponFailingRequest() throws Exception {
+		final SlotPool slotPool = new SlotPool(rpcService, jobId);
+		final CompletableFuture<Acknowledge> requestSlotFuture = new CompletableFuture<>();
+		final CompletableFuture<AllocationID> cancelSlotFuture = new CompletableFuture<>();
+		final CompletableFuture<AllocationID> requestSlotFutureAllocationId = new CompletableFuture<>();
+
+		final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+		resourceManagerGateway.setRequestSlotFuture(requestSlotFuture);
+		resourceManagerGateway.setRequestSlotConsumer(slotRequest -> requestSlotFutureAllocationId.complete(slotRequest.getAllocationId()));
+		resourceManagerGateway.setCancelSlotConsumer(allocationID -> cancelSlotFuture.complete(allocationID));
+
+		final ScheduledUnit scheduledUnit = new ScheduledUnit(mock(Execution.class));
+
+		try {
+			slotPool.start(JobMasterId.generate(), "localhost");
+
+			final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
+
+			slotPoolGateway.connectToResourceManager(resourceManagerGateway);
+
+			CompletableFuture<SimpleSlot> slotFuture = slotPoolGateway.allocateSlot(
+				scheduledUnit, ResourceProfile.UNKNOWN, Collections.emptyList(), timeout);
+
+			requestSlotFuture.completeExceptionally(new FlinkException("Testing exception."));
+
+			try {
+				slotFuture.get();
+				fail("The slot future should not have been completed properly.");
+			} catch (Exception ignored) {
+				// expected
+			}
+
+			// check that a failure triggered the slot request cancellation
+			// with the correct allocation id; the waits are bounded so that a
+			// missing cancellation fails the test instead of blocking it forever
+			assertEquals(
+				requestSlotFutureAllocationId.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS),
+				cancelSlotFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
+		} finally {
+			try {
+				RpcUtils.terminateRpcEndpoint(slotPool, timeout);
+			} catch (Exception e) {
+				LOG.warn("Could not properly terminate the SlotPool.", e);
+			}
+		}
+	}
+
 	private static ResourceManagerGateway createResourceManagerGatewayMock() {
 		ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
 		when(resourceManagerGateway

http://git-wip-us.apache.org/repos/asf/flink/blob/755ae519/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 55a9946..cf0aef9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -292,15 +292,6 @@ public class SlotManagerTest extends TestLogger {
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
 
 		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
-			// verify that if the request has not been assigned, should cancel the resource allocation
-			slotManager.registerSlotRequest(slotRequest);
-			PendingSlotRequest pendingSlotRequest = slotManager.getSlotRequest(allocationId);
-			assertFalse(pendingSlotRequest.isAssigned());
-
-			slotManager.unregisterSlotRequest(allocationId);
-			pendingSlotRequest = slotManager.getSlotRequest(allocationId);
-			assertTrue(pendingSlotRequest == null);
-
 			slotManager.registerTaskManager(taskManagerConnection, slotReport);
 
 			TaskManagerSlot slot = slotManager.getSlot(slotId);

http://git-wip-us.apache.org/repos/asf/flink/blob/755ae519/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
new file mode 100644
index 0000000..f11a1eb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.ResourceOverview;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+/**
+ * Test-only {@link ResourceManagerGateway} whose slot request future and request/cancel callbacks can be set by tests.
+ */
+public class TestingResourceManagerGateway implements ResourceManagerGateway {
+
+	private final ResourceManagerId resourceManagerId;
+
+	private final ResourceID resourceId;
+
+	private final long heartbeatInterval;
+
+	private final String address;
+
+	private final String hostname;
+
+	private final AtomicReference<CompletableFuture<Acknowledge>> slotFutureReference;
+
+	private volatile Consumer<AllocationID> cancelSlotConsumer;
+
+	private volatile Consumer<SlotRequest> requestSlotConsumer;
+
+	public TestingResourceManagerGateway() {
+		this(
+			ResourceManagerId.generate(),
+			ResourceID.generate(),
+			10000L,
+			"localhost",
+			"localhost");
+	}
+
+	public TestingResourceManagerGateway(
+			ResourceManagerId resourceManagerId,
+			ResourceID resourceId,
+			long heartbeatInterval,
+			String address,
+			String hostname) {
+		this.resourceManagerId = Preconditions.checkNotNull(resourceManagerId);
+		this.resourceId = Preconditions.checkNotNull(resourceId);
+		this.heartbeatInterval = heartbeatInterval;
+		this.address = Preconditions.checkNotNull(address);
+		this.hostname = Preconditions.checkNotNull(hostname);
+		this.slotFutureReference = new AtomicReference<>();
+		this.cancelSlotConsumer = null;
+		this.requestSlotConsumer = null;
+	}
+
+	public void setRequestSlotFuture(CompletableFuture<Acknowledge> slotFuture) {
+		this.slotFutureReference.set(slotFuture);
+	}
+
+	public void setCancelSlotConsumer(Consumer<AllocationID> cancelSlotConsumer) {
+		this.cancelSlotConsumer = cancelSlotConsumer;
+	}
+
+	public void setRequestSlotConsumer(Consumer<SlotRequest> slotRequestConsumer) {
+		this.requestSlotConsumer = slotRequestConsumer;
+	}
+
+	@Override
+	public CompletableFuture<RegistrationResponse> registerJobManager(JobMasterId jobMasterId, ResourceID jobMasterResourceId, String jobMasterAddress, JobID jobId, Time timeout) {
+		return CompletableFuture.completedFuture(
+			new JobMasterRegistrationSuccess(
+				heartbeatInterval,
+				resourceManagerId,
+				resourceId));
+	}
+
+	@Override
+	public CompletableFuture<Acknowledge> requestSlot(JobMasterId jobMasterId, SlotRequest slotRequest, Time timeout) {
+		Consumer<SlotRequest> currentRequestSlotConsumer = requestSlotConsumer;
+
+		if (currentRequestSlotConsumer != null) {
+			currentRequestSlotConsumer.accept(slotRequest);
+		}
+
+		CompletableFuture<Acknowledge> slotFuture = slotFutureReference.getAndSet(null);
+
+		if (slotFuture != null) {
+			return slotFuture;
+		} else {
+			return CompletableFuture.completedFuture(Acknowledge.get());
+		}
+	}
+
+	@Override
+	public void cancelSlotRequest(AllocationID allocationID) {
+		Consumer<AllocationID> currentCancelSlotConsumer = cancelSlotConsumer;
+
+		if (currentCancelSlotConsumer != null) {
+			currentCancelSlotConsumer.accept(allocationID);
+		}
+	}
+
+	/** Accepts every task executor registration and answers with this gateway's own resource ID. */
+	@Override
+	public CompletableFuture<RegistrationResponse> registerTaskExecutor(String taskExecutorAddress, ResourceID resourceId, SlotReport slotReport, int dataPort, HardwareDescription hardwareDescription, Time timeout) {
+		// Qualify with "this": the ResourceID parameter shadows the field, and the response should
+		// carry the gateway's configured ID just like registerJobManager does.
+		return CompletableFuture.completedFuture(
+			new TaskExecutorRegistrationSuccess(new InstanceID(), this.resourceId, heartbeatInterval));
+	}
+
+	@Override
+	public void notifySlotAvailable(InstanceID instanceId, SlotID slotID, AllocationID oldAllocationId) {
+
+	}
+
+	@Override
+	public void registerInfoMessageListener(String infoMessageListenerAddress) {
+
+	}
+
+	@Override
+	public void unRegisterInfoMessageListener(String infoMessageListenerAddress) {
+
+	}
+
+	@Override
+	public void shutDownCluster(ApplicationStatus finalStatus, String optionalDiagnostics) {
+
+	}
+
+	@Override
+	public CompletableFuture<Integer> getNumberOfRegisteredTaskManagers() {
+		return CompletableFuture.completedFuture(0);
+	}
+
+	@Override
+	public void heartbeatFromTaskManager(ResourceID heartbeatOrigin, SlotReport slotReport) {
+
+	}
+
+	@Override
+	public void heartbeatFromJobManager(ResourceID heartbeatOrigin) {
+
+	}
+
+	@Override
+	public void disconnectTaskManager(ResourceID resourceID, Exception cause) {
+
+	}
+
+	@Override
+	public void disconnectJobManager(JobID jobId, Exception cause) {
+
+	}
+
+	@Override
+	public CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Time timeout) {
+		return CompletableFuture.completedFuture(Collections.emptyList());
+	}
+
+	@Override
+	public CompletableFuture<TaskManagerInfo> requestTaskManagerInfo(InstanceID instanceId, Time timeout) {
+		return FutureUtils.completedExceptionally(new UnsupportedOperationException("Not yet implemented"));
+	}
+
+	@Override
+	public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
+		return FutureUtils.completedExceptionally(new UnsupportedOperationException("Not yet implemented"));
+	}
+
+	@Override
+	public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
+		return CompletableFuture.completedFuture(Collections.emptyList());
+	}
+
+	@Override
+	public ResourceManagerId getFencingToken() {
+		return resourceManagerId;
+	}
+
+	@Override
+	public String getAddress() {
+		return address;
+	}
+
+	@Override
+	public String getHostname() {
+		return hostname;
+	}
+}