You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2021/02/28 10:53:17 UTC
[flink] 01/04: [hotfix][coordination] Move utility profiles to FineGrainedSlotManagerTestBase
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e6cf590838a950672e81e526b71656e4056ce37b
Author: Yangze Guo <ka...@gmail.com>
AuthorDate: Wed Feb 24 18:15:15 2021 +0800
[hotfix][coordination] Move utility profiles to FineGrainedSlotManagerTestBase
---
.../AbstractFineGrainedSlotManagerITCase.java | 34 +++---
...gerDefaultResourceAllocationStrategyITCase.java | 57 +---------
.../slotmanager/FineGrainedSlotManagerTest.java | 119 ++++++---------------
.../FineGrainedSlotManagerTestBase.java | 36 +++----
4 files changed, 70 insertions(+), 176 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java
index 296e406..2a8c9d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java
@@ -122,7 +122,7 @@ public abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSl
jobId,
targetAddress,
Collections.singleton(
- ResourceRequirement.create(getDefaultSlotResourceProfile(), 1)));
+ ResourceRequirement.create(DEFAULT_SLOT_RESOURCE_PROFILE, 1)));
final CompletableFuture<
Tuple6<
@@ -157,8 +157,8 @@ public abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSl
.registerTaskManager(
taskExecutorConnection,
new SlotReport(),
- getDefaultTaskManagerResourceProfile(),
- getDefaultSlotResourceProfile());
+ DEFAULT_TOTAL_RESOURCE_PROFILE,
+ DEFAULT_SLOT_RESOURCE_PROFILE);
}
getSlotManager().processResourceRequirements(requirements);
@@ -170,8 +170,8 @@ public abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSl
.registerTaskManager(
taskExecutorConnection,
new SlotReport(),
- getDefaultTaskManagerResourceProfile(),
- getDefaultSlotResourceProfile());
+ DEFAULT_TOTAL_RESOURCE_PROFILE,
+ DEFAULT_SLOT_RESOURCE_PROFILE);
}
assertThat(
@@ -185,7 +185,7 @@ public abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSl
FUTURE_TIMEOUT_SECOND,
TimeUnit.SECONDS)
.f2,
- getDefaultSlotResourceProfile(),
+ DEFAULT_SLOT_RESOURCE_PROFILE,
targetAddress,
getResourceManagerId()))));
@@ -291,8 +291,8 @@ public abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSl
.registerTaskManager(
taskManagerConnection,
slotReport,
- getDefaultTaskManagerResourceProfile(),
- getDefaultSlotResourceProfile());
+ DEFAULT_TOTAL_RESOURCE_PROFILE,
+ DEFAULT_SLOT_RESOURCE_PROFILE);
getSlotManager().processResourceRequirements(resourceRequirements1);
@@ -372,13 +372,13 @@ public abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSl
getSlotManager()
.processResourceRequirements(
createResourceRequirements(
- jobId, getDefaultNumberSlotsPerWorker()));
+ jobId, DEFAULT_NUM_SLOTS_PER_WORKER));
assertThat(resourceRequests.get(), is(1));
getSlotManager()
.processResourceRequirements(
createResourceRequirements(
- jobId, getDefaultNumberSlotsPerWorker() + 1));
+ jobId, DEFAULT_NUM_SLOTS_PER_WORKER + 1));
assertThat(resourceRequests.get(), is(2));
});
}
@@ -427,8 +427,8 @@ public abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSl
.registerTaskManager(
taskManagerConnection,
slotReport,
- getDefaultTaskManagerResourceProfile(),
- getDefaultSlotResourceProfile());
+ DEFAULT_TOTAL_RESOURCE_PROFILE,
+ DEFAULT_SLOT_RESOURCE_PROFILE);
getSlotManager().processResourceRequirements(resourceRequirements);
@@ -505,8 +505,8 @@ public abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSl
.registerTaskManager(
taskExecutionConnection,
slotReport,
- getDefaultTaskManagerResourceProfile(),
- getDefaultSlotResourceProfile());
+ DEFAULT_TOTAL_RESOURCE_PROFILE,
+ DEFAULT_SLOT_RESOURCE_PROFILE);
getSlotManager()
.unregisterTaskManager(
taskExecutionConnection.getInstanceID(),
@@ -561,8 +561,8 @@ public abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSl
.registerTaskManager(
taskExecutionConnection,
slotReport,
- getDefaultTaskManagerResourceProfile(),
- getDefaultSlotResourceProfile());
+ DEFAULT_TOTAL_RESOURCE_PROFILE,
+ DEFAULT_SLOT_RESOURCE_PROFILE);
getSlotManager()
.reportSlotStatus(
taskExecutionConnection.getInstanceID(),
@@ -571,7 +571,7 @@ public abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSl
allocationId.get(
FUTURE_TIMEOUT_SECOND,
TimeUnit.SECONDS),
- getDefaultSlotResourceProfile())));
+ DEFAULT_SLOT_RESOURCE_PROFILE)));
assertThat(
trackingSecurityManager.getSystemExitFuture().isDone(),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
index bb976cf1..ef2e529 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase.java
@@ -19,7 +19,6 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.junit.Test;
@@ -34,58 +33,8 @@ import static org.junit.Assert.assertThat;
*/
public class FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase
extends AbstractFineGrainedSlotManagerITCase {
- private static final WorkerResourceSpec DEFAULT_WORKER_RESOURCE_SPEC =
- new WorkerResourceSpec.Builder()
- .setCpuCores(10.0)
- .setTaskHeapMemoryMB(1000)
- .setTaskOffHeapMemoryMB(1000)
- .setNetworkMemoryMB(1000)
- .setManagedMemoryMB(1000)
- .build();
- private static final WorkerResourceSpec LARGE_WORKER_RESOURCE_SPEC =
- new WorkerResourceSpec.Builder()
- .setCpuCores(100.0)
- .setTaskHeapMemoryMB(10000)
- .setTaskOffHeapMemoryMB(10000)
- .setNetworkMemoryMB(10000)
- .setManagedMemoryMB(10000)
- .build();
- private static final int DEFAULT_NUM_SLOTS_PER_WORKER = 2;
- private static final ResourceProfile DEFAULT_TOTAL_RESOURCE_PROFILE =
- SlotManagerUtils.generateTaskManagerTotalResourceProfile(DEFAULT_WORKER_RESOURCE_SPEC);
- private static final ResourceProfile DEFAULT_SLOT_RESOURCE_PROFILE =
- SlotManagerUtils.generateDefaultSlotResourceProfile(
- DEFAULT_WORKER_RESOURCE_SPEC, DEFAULT_NUM_SLOTS_PER_WORKER);
- private static final ResourceProfile LARGE_TOTAL_RESOURCE_PROFILE =
- SlotManagerUtils.generateTaskManagerTotalResourceProfile(LARGE_WORKER_RESOURCE_SPEC);
- private static final ResourceProfile LARGE_SLOT_RESOURCE_PROFILE =
- SlotManagerUtils.generateDefaultSlotResourceProfile(
- LARGE_WORKER_RESOURCE_SPEC, DEFAULT_NUM_SLOTS_PER_WORKER);
-
- @Override
- protected ResourceProfile getDefaultTaskManagerResourceProfile() {
- return DEFAULT_TOTAL_RESOURCE_PROFILE;
- }
-
- @Override
- protected ResourceProfile getDefaultSlotResourceProfile() {
- return DEFAULT_SLOT_RESOURCE_PROFILE;
- }
-
- @Override
- protected int getDefaultNumberSlotsPerWorker() {
- return DEFAULT_NUM_SLOTS_PER_WORKER;
- }
-
- @Override
- protected ResourceProfile getLargeTaskManagerResourceProfile() {
- return LARGE_TOTAL_RESOURCE_PROFILE;
- }
-
- @Override
- protected ResourceProfile getLargeSlotResourceProfile() {
- return LARGE_SLOT_RESOURCE_PROFILE;
- }
+ private static final ResourceProfile OTHER_SLOT_RESOURCE_PROFILE =
+ DEFAULT_TOTAL_RESOURCE_PROFILE.multiply(2);
@Override
protected Optional<ResourceAllocationStrategy> getResourceAllocationStrategy() {
@@ -114,7 +63,7 @@ public class FineGrainedSlotManagerDefaultResourceAllocationStrategyITCase
getSlotManager()
.processResourceRequirements(
createResourceRequirements(
- new JobID(), 1, getLargeSlotResourceProfile()));
+ new JobID(), 1, OTHER_SLOT_RESOURCE_PROFILE));
assertThat(resourceRequests.get(), is(0));
});
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
index 4d01336..dbb0e42 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
-import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.slots.ResourceRequirements;
@@ -59,58 +58,10 @@ import static org.junit.Assert.assertTrue;
/** Tests of {@link FineGrainedSlotManager}. */
public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
- private static final WorkerResourceSpec DEFAULT_WORKER_RESOURCE_SPEC =
- new WorkerResourceSpec.Builder()
- .setCpuCores(10.0)
- .setTaskHeapMemoryMB(1000)
- .setTaskOffHeapMemoryMB(1000)
- .setNetworkMemoryMB(1000)
- .setManagedMemoryMB(1000)
- .build();
- private static final WorkerResourceSpec LARGE_WORKER_RESOURCE_SPEC =
- new WorkerResourceSpec.Builder()
- .setCpuCores(100.0)
- .setTaskHeapMemoryMB(10000)
- .setTaskOffHeapMemoryMB(10000)
- .setNetworkMemoryMB(10000)
- .setManagedMemoryMB(10000)
- .build();
- private static final int DEFAULT_NUM_SLOTS_PER_WORKER = 2;
- private static final ResourceProfile DEFAULT_TOTAL_RESOURCE_PROFILE =
- SlotManagerUtils.generateTaskManagerTotalResourceProfile(DEFAULT_WORKER_RESOURCE_SPEC);
- private static final ResourceProfile DEFAULT_SLOT_RESOURCE_PROFILE =
- SlotManagerUtils.generateDefaultSlotResourceProfile(
- DEFAULT_WORKER_RESOURCE_SPEC, DEFAULT_NUM_SLOTS_PER_WORKER);
- private static final ResourceProfile LARGE_TOTAL_RESOURCE_PROFILE =
- SlotManagerUtils.generateTaskManagerTotalResourceProfile(LARGE_WORKER_RESOURCE_SPEC);
private static final ResourceProfile LARGE_SLOT_RESOURCE_PROFILE =
- SlotManagerUtils.generateDefaultSlotResourceProfile(
- LARGE_WORKER_RESOURCE_SPEC, DEFAULT_NUM_SLOTS_PER_WORKER);
-
- @Override
- protected ResourceProfile getDefaultTaskManagerResourceProfile() {
- return DEFAULT_TOTAL_RESOURCE_PROFILE;
- }
-
- @Override
- protected ResourceProfile getDefaultSlotResourceProfile() {
- return DEFAULT_SLOT_RESOURCE_PROFILE;
- }
-
- @Override
- protected int getDefaultNumberSlotsPerWorker() {
- return DEFAULT_NUM_SLOTS_PER_WORKER;
- }
-
- @Override
- protected ResourceProfile getLargeTaskManagerResourceProfile() {
- return LARGE_TOTAL_RESOURCE_PROFILE;
- }
-
- @Override
- protected ResourceProfile getLargeSlotResourceProfile() {
- return LARGE_SLOT_RESOURCE_PROFILE;
- }
+ DEFAULT_TOTAL_RESOURCE_PROFILE.multiply(2);
+ private static final ResourceProfile LARGE_TOTAL_RESOURCE_PROFILE =
+ LARGE_SLOT_RESOURCE_PROFILE.multiply(2);
@Override
protected Optional<ResourceAllocationStrategy> getResourceAllocationStrategy() {
@@ -146,12 +97,12 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
.registerTaskManager(
taskManagerConnection,
new SlotReport(),
- getDefaultTaskManagerResourceProfile(),
- getDefaultSlotResourceProfile());
+ DEFAULT_TOTAL_RESOURCE_PROFILE,
+ DEFAULT_SLOT_RESOURCE_PROFILE);
assertThat(
getSlotManager().getNumberRegisteredSlots(),
- equalTo(getDefaultNumberSlotsPerWorker()));
+ equalTo(DEFAULT_NUM_SLOTS_PER_WORKER));
assertThat(
getTaskManagerTracker().getRegisteredTaskManagers().size(),
equalTo(1));
@@ -166,14 +117,14 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
taskManagerConnection.getInstanceID())
.get()
.getAvailableResource(),
- equalTo(getDefaultTaskManagerResourceProfile()));
+ equalTo(DEFAULT_TOTAL_RESOURCE_PROFILE));
assertThat(
getTaskManagerTracker()
.getRegisteredTaskManager(
taskManagerConnection.getInstanceID())
.get()
.getTotalResource(),
- equalTo(getDefaultTaskManagerResourceProfile()));
+ equalTo(DEFAULT_TOTAL_RESOURCE_PROFILE));
});
}
};
@@ -191,7 +142,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
final AllocationID allocationId = new AllocationID();
final SlotReport slotReport =
new SlotReport(
- createAllocatedSlotStatus(allocationId, getDefaultSlotResourceProfile()));
+ createAllocatedSlotStatus(allocationId, DEFAULT_SLOT_RESOURCE_PROFILE));
new Context() {
{
runTest(
@@ -200,8 +151,8 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
.registerTaskManager(
taskManagerConnection,
slotReport,
- getDefaultTaskManagerResourceProfile(),
- getDefaultSlotResourceProfile());
+ DEFAULT_TOTAL_RESOURCE_PROFILE,
+ DEFAULT_SLOT_RESOURCE_PROFILE);
assertThat(
getTaskManagerTracker().getRegisteredTaskManagers().size(),
is(1));
@@ -235,7 +186,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
final SlotReport slotReportWithAllocatedSlot =
new SlotReport(
createAllocatedSlotStatus(
- new AllocationID(), getDefaultSlotResourceProfile()));
+ new AllocationID(), DEFAULT_SLOT_RESOURCE_PROFILE));
new Context() {
{
runTest(
@@ -243,15 +194,15 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
getTaskManagerTracker()
.addPendingTaskManager(
new PendingTaskManager(
- getDefaultTaskManagerResourceProfile(),
- getDefaultNumberSlotsPerWorker()));
+ DEFAULT_TOTAL_RESOURCE_PROFILE,
+ DEFAULT_NUM_SLOTS_PER_WORKER));
// task manager with allocated slot cannot deduct pending task manager
getSlotManager()
.registerTaskManager(
taskExecutionConnection1,
slotReportWithAllocatedSlot,
- getDefaultTaskManagerResourceProfile(),
- getDefaultSlotResourceProfile());
+ DEFAULT_TOTAL_RESOURCE_PROFILE,
+ DEFAULT_SLOT_RESOURCE_PROFILE);
assertThat(
getTaskManagerTracker().getPendingTaskManagers().size(), is(1));
// task manager with mismatched resource cannot deduct pending task
@@ -260,16 +211,16 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
.registerTaskManager(
taskExecutionConnection2,
new SlotReport(),
- getLargeTaskManagerResourceProfile(),
- getLargeSlotResourceProfile());
+ LARGE_TOTAL_RESOURCE_PROFILE,
+ LARGE_SLOT_RESOURCE_PROFILE);
assertThat(
getTaskManagerTracker().getPendingTaskManagers().size(), is(1));
getSlotManager()
.registerTaskManager(
taskExecutionConnection3,
new SlotReport(),
- getDefaultTaskManagerResourceProfile(),
- getDefaultSlotResourceProfile());
+ DEFAULT_TOTAL_RESOURCE_PROFILE,
+ DEFAULT_SLOT_RESOURCE_PROFILE);
assertThat(
getTaskManagerTracker().getPendingTaskManagers().size(), is(0));
});
@@ -340,7 +291,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
.addAllocationOnRegisteredResource(
jobId,
taskManagerConnection.getInstanceID(),
- getDefaultSlotResourceProfile())
+ DEFAULT_SLOT_RESOURCE_PROFILE)
.build()));
runTest(
() -> {
@@ -348,8 +299,8 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
.registerTaskManager(
taskManagerConnection,
slotReport,
- getDefaultTaskManagerResourceProfile(),
- getDefaultSlotResourceProfile());
+ DEFAULT_TOTAL_RESOURCE_PROFILE,
+ DEFAULT_SLOT_RESOURCE_PROFILE);
getSlotManager()
.processResourceRequirements(
createResourceRequirements(jobId, 1));
@@ -364,7 +315,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
requestSlotFuture.get(
FUTURE_TIMEOUT_SECOND, TimeUnit.SECONDS);
assertEquals(jobId, requestSlot.f1);
- assertEquals(getDefaultSlotResourceProfile(), requestSlot.f3);
+ assertEquals(DEFAULT_SLOT_RESOURCE_PROFILE, requestSlot.f3);
});
}
};
@@ -386,8 +337,8 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
ResourceAllocationResult.builder()
.addPendingTaskManagerAllocate(
new PendingTaskManager(
- getDefaultTaskManagerResourceProfile(),
- getDefaultNumberSlotsPerWorker()))
+ DEFAULT_TOTAL_RESOURCE_PROFILE,
+ DEFAULT_NUM_SLOTS_PER_WORKER))
.build()));
runTest(
() -> {
@@ -405,7 +356,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
final JobID jobId = new JobID();
final PendingTaskManager pendingTaskManager =
new PendingTaskManager(
- getDefaultTaskManagerResourceProfile(), getDefaultNumberSlotsPerWorker());
+ DEFAULT_TOTAL_RESOURCE_PROFILE, DEFAULT_NUM_SLOTS_PER_WORKER);
final CompletableFuture<
Tuple6<
SlotID,
@@ -434,7 +385,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
.addAllocationOnPendingResource(
jobId,
pendingTaskManager.getPendingTaskManagerId(),
- getDefaultSlotResourceProfile())
+ DEFAULT_SLOT_RESOURCE_PROFILE)
.build()));
runTest(
() -> {
@@ -445,8 +396,8 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
.registerTaskManager(
taskManagerConnection,
new SlotReport(),
- getDefaultTaskManagerResourceProfile(),
- getDefaultSlotResourceProfile());
+ DEFAULT_TOTAL_RESOURCE_PROFILE,
+ DEFAULT_SLOT_RESOURCE_PROFILE);
final Tuple6<
SlotID,
JobID,
@@ -458,7 +409,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
requestSlotFuture.get(
FUTURE_TIMEOUT_SECOND, TimeUnit.SECONDS);
assertEquals(jobId, requestSlot.f1);
- assertEquals(getDefaultSlotResourceProfile(), requestSlot.f3);
+ assertEquals(DEFAULT_SLOT_RESOURCE_PROFILE, requestSlot.f3);
});
}
};
@@ -547,9 +498,9 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
new SlotReport(
createAllocatedSlotStatus(
allocationId,
- getDefaultSlotResourceProfile())),
- getDefaultTaskManagerResourceProfile(),
- getDefaultSlotResourceProfile());
+ DEFAULT_SLOT_RESOURCE_PROFILE)),
+ DEFAULT_TOTAL_RESOURCE_PROFILE,
+ DEFAULT_SLOT_RESOURCE_PROFILE);
assertEquals(
getSlotManager().getTaskManagerIdleSince(instanceId),
Long.MAX_VALUE);
@@ -570,7 +521,7 @@ public class FineGrainedSlotManagerTest extends FineGrainedSlotManagerTestBase {
// SlotManager. The receiver of the callback can then decide what to do
// with the TaskManager.
assertEquals(
- getDefaultNumberSlotsPerWorker(),
+ DEFAULT_NUM_SLOTS_PER_WORKER,
getSlotManager().getNumberRegisteredSlots());
getSlotManager()
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
index 4929b2c5..efc3003 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManagerTestBase.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.metrics.groups.SlotManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.slots.ResourceRequirements;
@@ -48,27 +49,20 @@ public abstract class FineGrainedSlotManagerTestBase extends TestLogger {
private static final Executor MAIN_THREAD_EXECUTOR = Executors.directExecutor();
static final FlinkException TEST_EXCEPTION = new FlinkException("Test exception");
static final long FUTURE_TIMEOUT_SECOND = 5;
-
- /** Resource profile for the default task manager. */
- protected abstract ResourceProfile getDefaultTaskManagerResourceProfile();
-
- /** Resource profile for the default slot and requirement. */
- protected abstract ResourceProfile getDefaultSlotResourceProfile();
-
- /** The number of slot for the default task manager. */
- protected abstract int getDefaultNumberSlotsPerWorker();
-
- /**
- * Resource profile for a larger task manager, which can fulfill both the larger and the default
- * slots.
- */
- protected abstract ResourceProfile getLargeTaskManagerResourceProfile();
-
- /**
- * Resource profile for a larger slot or requirement, which can be fulfilled by the task manager
- * and cannot be fulfilled by the default task manager.
- */
- protected abstract ResourceProfile getLargeSlotResourceProfile();
+ static final WorkerResourceSpec DEFAULT_WORKER_RESOURCE_SPEC =
+ new WorkerResourceSpec.Builder()
+ .setCpuCores(10.0)
+ .setTaskHeapMemoryMB(1000)
+ .setTaskOffHeapMemoryMB(1000)
+ .setNetworkMemoryMB(1000)
+ .setManagedMemoryMB(1000)
+ .build();
+ static final int DEFAULT_NUM_SLOTS_PER_WORKER = 2;
+ static final ResourceProfile DEFAULT_TOTAL_RESOURCE_PROFILE =
+ SlotManagerUtils.generateTaskManagerTotalResourceProfile(DEFAULT_WORKER_RESOURCE_SPEC);
+ static final ResourceProfile DEFAULT_SLOT_RESOURCE_PROFILE =
+ SlotManagerUtils.generateDefaultSlotResourceProfile(
+ DEFAULT_WORKER_RESOURCE_SPEC, DEFAULT_NUM_SLOTS_PER_WORKER);
protected abstract Optional<ResourceAllocationStrategy> getResourceAllocationStrategy();