You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/16 15:09:00 UTC

[GitHub] asfgit closed pull request #6856: [FLINK-10565] [tests] Refactor SchedulerTestBase

asfgit closed pull request #6856: [FLINK-10565] [tests] Refactor SchedulerTestBase
URL: https://github.com/apache/flink/pull/6856
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a forked repository, the diff is
included below (GitHub would otherwise not display it after the merge):

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
index 6c5a4468cfc..f4cd0357416 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java
@@ -32,8 +32,6 @@
 import org.apache.flink.util.FlinkException;
 
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import java.util.function.Predicate;
 
@@ -47,15 +45,10 @@
  * Additional {@link ExecutionGraph} restart tests {@link ExecutionGraphRestartTest} which
  * require the usage of a {@link SlotProvider}.
  */
-@RunWith(Parameterized.class)
 public class ExecutionGraphCoLocationRestartTest extends SchedulerTestBase {
 
 	private static final int NUM_TASKS = 31;
 
-	public ExecutionGraphCoLocationRestartTest(SchedulerType schedulerType) {
-		super(schedulerType);
-	}
-
 	@Test
 	public void testConstraintsAfterRestart() throws Exception {
 		final long timeout = 5000L;
@@ -79,10 +72,10 @@ public void testConstraintsAfterRestart() throws Exception {
 			groupVertex,
 			groupVertex2);
 
-		if (schedulerType == SchedulerType.SLOT_POOL) {
-			// enable the queued scheduling for the slot pool
-			eg.setQueuedSchedulingAllowed(true);
-		}
+
+		// enable the queued scheduling for the slot pool
+		eg.setQueuedSchedulingAllowed(true);
+
 
 		assertEquals(JobStatus.CREATED, eg.getState());
 
@@ -122,7 +115,7 @@ private void validateConstraints(ExecutionGraph eg) {
 
 		ExecutionJobVertex[] tasks = eg.getAllVertices().values().toArray(new ExecutionJobVertex[2]);
 
-		for(int i = 0; i < NUM_TASKS; i++){
+		for (int i = 0; i < NUM_TASKS; i++) {
 			CoLocationConstraint constr1 = tasks[0].getTaskVertices()[i].getLocationConstraint();
 			CoLocationConstraint constr2 = tasks[1].getTaskVertices()[i].getLocationConstraint();
 			assertThat(constr1.isAssigned(), is(true));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index ed3c361fd56..eaa85e3a3d6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -27,8 +27,6 @@
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import java.util.Collections;
 import java.util.concurrent.ExecutionException;
@@ -40,13 +38,8 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-@RunWith(Parameterized.class)
 public class ScheduleWithCoLocationHintTest extends SchedulerTestBase {
 
-	public ScheduleWithCoLocationHintTest(SchedulerType schedulerType) {
-		super(schedulerType);
-	}
-
 	@Test
 	public void scheduleAllSharedAndCoLocated() throws Exception {
 		JobVertexID jid1 = new JobVertexID();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index 2abf9fbddad..03da3b14d21 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -27,8 +27,6 @@
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -53,12 +51,7 @@
 /**
  * Tests for the {@link Scheduler} when scheduling individual tasks.
  */
-@RunWith(Parameterized.class)
 public class SchedulerIsolatedTasksTest extends SchedulerTestBase {
-
-	public SchedulerIsolatedTasksTest(SchedulerType schedulerType) {
-		super(schedulerType);
-	}
 	
 	@Test
 	public void testScheduleImmediately() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
index 067001dd67a..20e221d5bc2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -27,8 +27,6 @@
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import java.util.Collections;
 import java.util.Random;
@@ -52,13 +50,8 @@
 /**
  * Tests for the scheduler when scheduling tasks in slot sharing groups.
  */
-@RunWith(Parameterized.class)
 public class SchedulerSlotSharingTest extends SchedulerTestBase {
 
-	public SchedulerSlotSharingTest(SchedulerType schedulerType) {
-		super(schedulerType);
-	}
-
 	@Test
 	public void scheduleSingleVertexType() {
 		try {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
index 8888064c254..683b0cde52f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java
@@ -25,7 +25,6 @@
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
-import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -52,20 +51,14 @@
 
 import org.junit.After;
 import org.junit.Before;
-import org.junit.runners.Parameterized;
 
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestBase.SchedulerType.SCHEDULER;
-import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestBase.SchedulerType.SLOT_POOL;
-import static org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
-
 /**
  * Test base for scheduler related test cases. The test are
  * executed with the {@link Scheduler} and the {@link SlotPool}.
@@ -74,49 +67,21 @@
 
 	protected TestingSlotProvider testingSlotProvider;
 
-	protected SchedulerType schedulerType;
-
 	private RpcService rpcService;
 
-	public enum SchedulerType {
-		SCHEDULER,
-		SLOT_POOL
-	}
-
-	@Parameterized.Parameters(name = "Scheduler type = {0}")
-	public static Collection<Object[]> schedulerTypes() {
-		return Arrays.asList(
-			new Object[]{SCHEDULER},
-			new Object[]{SLOT_POOL});
-	}
-
-	protected SchedulerTestBase(SchedulerType schedulerType) {
-		this.schedulerType = Preconditions.checkNotNull(schedulerType);
-		rpcService = null;
-	}
-
 	@Before
 	public void setup() throws Exception {
-		switch (schedulerType) {
-			case SCHEDULER:
-				testingSlotProvider = new TestingSchedulerSlotProvider(
-					new Scheduler(
-						TestingUtils.defaultExecutionContext()));
-				break;
-			case SLOT_POOL:
-				rpcService = new TestingRpcService();
-				final JobID jobId = new JobID();
-				final TestingSlotPool slotPool = new TestingSlotPool(
-					rpcService,
-					jobId,
-					LocationPreferenceSchedulingStrategy.getInstance());
-				testingSlotProvider = new TestingSlotPoolSlotProvider(slotPool);
-
-				final JobMasterId jobMasterId = JobMasterId.generate();
-				final String jobManagerAddress = "localhost";
-				slotPool.start(jobMasterId, jobManagerAddress);
-				break;
-		}
+		rpcService = new TestingRpcService();
+		final JobID jobId = new JobID();
+		final TestingSlotPool slotPool = new TestingSlotPool(
+			rpcService,
+			jobId,
+			LocationPreferenceSchedulingStrategy.getInstance());
+		testingSlotProvider = new TestingSlotPoolSlotProvider(slotPool);
+
+		final JobMasterId jobMasterId = JobMasterId.generate();
+		final String jobManagerAddress = "localhost";
+		slotPool.start(jobMasterId, jobManagerAddress);
 	}
 
 	@After
@@ -154,86 +119,6 @@ public void teardown() throws Exception {
 		void shutdown() throws Exception;
 	}
 
-	private static final class TestingSchedulerSlotProvider implements TestingSlotProvider {
-		private final Scheduler scheduler;
-
-		private TestingSchedulerSlotProvider(Scheduler scheduler) {
-			this.scheduler = Preconditions.checkNotNull(scheduler);
-		}
-
-		@Override
-		public CompletableFuture<LogicalSlot> allocateSlot(
-			SlotRequestId slotRequestId,
-			ScheduledUnit task,
-			boolean allowQueued,
-			SlotProfile slotProfile,
-			Time allocationTimeout) {
-			return scheduler.allocateSlot(task, allowQueued, slotProfile, allocationTimeout);
-		}
-
-		@Override
-		public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
-			return CompletableFuture.completedFuture(Acknowledge.get());
-		}
-
-		@Override
-		public TaskManagerLocation addTaskManager(int numberSlots) {
-			final Instance instance = getRandomInstance(numberSlots);
-			scheduler.newInstanceAvailable(instance);
-
-			return instance.getTaskManagerLocation();
-		}
-
-		@Override
-		public void releaseTaskManager(ResourceID resourceId) {
-			final Instance instance = scheduler.getInstance(resourceId);
-
-			if (instance != null) {
-				scheduler.instanceDied(instance);
-			}
-		}
-
-		@Override
-		public int getNumberOfAvailableSlots() {
-			return scheduler.getNumberOfAvailableSlots();
-		}
-
-		@Override
-		public int getNumberOfLocalizedAssignments() {
-			return scheduler.getNumberOfLocalizedAssignments();
-		}
-
-		@Override
-		public int getNumberOfNonLocalizedAssignments() {
-			return scheduler.getNumberOfNonLocalizedAssignments();
-		}
-
-		@Override
-		public int getNumberOfUnconstrainedAssignments() {
-			return scheduler.getNumberOfUnconstrainedAssignments();
-		}
-
-		@Override
-		public int getNumberOfHostLocalizedAssignments() {
-			return 0;
-		}
-
-		@Override
-		public int getNumberOfSlots(SlotSharingGroup slotSharingGroup) {
-			return slotSharingGroup.getTaskAssignment().getNumberOfSlots();
-		}
-
-		@Override
-		public int getNumberOfAvailableSlotsForGroup(SlotSharingGroup slotSharingGroup, JobVertexID jobVertexId) {
-			return slotSharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jobVertexId);
-		}
-
-		@Override
-		public void shutdown() {
-			scheduler.shutdown();
-		}
-	}
-
 	private static final class TestingSlotPoolSlotProvider implements TestingSlotProvider {
 
 		private final TestingSlotPool slotPool;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services