You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/06/16 09:14:37 UTC
[flink] 02/07: [hotfix][runtime] Move shared static test methods of
slot allocator into ExecutionSlotAllocatorTestUtils
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a1c9a30e322b0b3adfeadc3d01a2cbfced9fa74e
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Thu Jun 11 10:37:44 2020 +0800
[hotfix][runtime] Move shared static test methods of slot allocator into ExecutionSlotAllocatorTestUtils
---
.../DefaultExecutionSlotAllocatorTest.java | 23 +-------
.../scheduler/ExecutionSlotAllocatorTestUtils.java | 61 ++++++++++++++++++++++
2 files changed, 63 insertions(+), 21 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
index a6a559e..ad74357 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
@@ -54,6 +54,8 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
+import static org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorTestUtils.createSchedulingRequirements;
+import static org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorTestUtils.findSlotAssignmentByExecutionVertexId;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
@@ -280,27 +282,6 @@ public class DefaultExecutionSlotAllocatorTest extends TestLogger {
new DefaultPreferredLocationsRetriever(stateLocationRetriever, inputsLocationsRetriever));
}
- private List<ExecutionVertexSchedulingRequirements> createSchedulingRequirements(ExecutionVertexID... executionVertexIds) {
- List<ExecutionVertexSchedulingRequirements> schedulingRequirements = new ArrayList<>(executionVertexIds.length);
-
- for (ExecutionVertexID executionVertexId : executionVertexIds) {
- schedulingRequirements.add(new ExecutionVertexSchedulingRequirements.Builder()
- .withExecutionVertexId(executionVertexId).build());
- }
- return schedulingRequirements;
- }
-
- private SlotExecutionVertexAssignment findSlotAssignmentByExecutionVertexId(
- ExecutionVertexID executionVertexId,
- Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignments) {
- return slotExecutionVertexAssignments.stream()
- .filter(slotExecutionVertexAssignment -> slotExecutionVertexAssignment.getExecutionVertexId().equals(executionVertexId))
- .findFirst()
- .orElseThrow(() -> new IllegalArgumentException(String.format(
- "SlotExecutionVertexAssignment with execution vertex id %s not found",
- executionVertexId)));
- }
-
private static class AllocationToggableSlotProvider implements SlotProvider {
private final List<Tuple3<SlotRequestId, ScheduledUnit, SlotProfile>> slotAllocationRequests = new ArrayList<>();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorTestUtils.java
new file mode 100644
index 0000000..426d5f7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorTestUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Static test helpers for {@link ExecutionSlotAllocator} tests; package-private and not instantiable from outside.
+ */
+class ExecutionSlotAllocatorTestUtils {
+ /** Builds one {@link ExecutionVertexSchedulingRequirements} per given id (only the id is set), in argument order. */
+ static List<ExecutionVertexSchedulingRequirements> createSchedulingRequirements(
+ final ExecutionVertexID... executionVertexIds) {
+ // Pre-size the list: exactly one entry is added per id.
+ final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
+ new ArrayList<>(executionVertexIds.length);
+
+ for (ExecutionVertexID executionVertexId : executionVertexIds) {
+ schedulingRequirements.add(new ExecutionVertexSchedulingRequirements.Builder()
+ .withExecutionVertexId(executionVertexId).build());
+ }
+ return schedulingRequirements;
+ }
+ /** Returns the first assignment whose execution vertex id equals the given id; throws IllegalArgumentException if none. */
+ static SlotExecutionVertexAssignment findSlotAssignmentByExecutionVertexId(
+ final ExecutionVertexID executionVertexId,
+ final Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignments) {
+
+ return slotExecutionVertexAssignments.stream()
+ .filter(assignment -> assignment.getExecutionVertexId().equals(executionVertexId))
+ .findFirst()
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(String.format(
+ "SlotExecutionVertexAssignment with execution vertex id %s not found",
+ executionVertexId)));
+ }
+ // Private constructor: this class only exposes static helpers.
+ private ExecutionSlotAllocatorTestUtils() {
+ }
+}