You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/06/16 09:14:37 UTC

[flink] 02/07: [hotfix][runtime] Move shared static test methods of slot allocator into ExecutionSlotAllocatorTestUtils

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a1c9a30e322b0b3adfeadc3d01a2cbfced9fa74e
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Thu Jun 11 10:37:44 2020 +0800

    [hotfix][runtime] Move shared static test methods of slot allocator into ExecutionSlotAllocatorTestUtils
---
 .../DefaultExecutionSlotAllocatorTest.java         | 23 +-------
 .../scheduler/ExecutionSlotAllocatorTestUtils.java | 61 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 21 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
index a6a559e..ad74357 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java
@@ -54,6 +54,8 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorTestUtils.createSchedulingRequirements;
+import static org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorTestUtils.findSlotAssignmentByExecutionVertexId;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
@@ -280,27 +282,6 @@ public class DefaultExecutionSlotAllocatorTest extends TestLogger {
 			new DefaultPreferredLocationsRetriever(stateLocationRetriever, inputsLocationsRetriever));
 	}
 
-	private List<ExecutionVertexSchedulingRequirements> createSchedulingRequirements(ExecutionVertexID... executionVertexIds) {
-		List<ExecutionVertexSchedulingRequirements> schedulingRequirements = new ArrayList<>(executionVertexIds.length);
-
-		for (ExecutionVertexID executionVertexId : executionVertexIds) {
-			schedulingRequirements.add(new ExecutionVertexSchedulingRequirements.Builder()
-					.withExecutionVertexId(executionVertexId).build());
-		}
-		return schedulingRequirements;
-	}
-
-	private SlotExecutionVertexAssignment findSlotAssignmentByExecutionVertexId(
-			ExecutionVertexID executionVertexId,
-			Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignments) {
-		return slotExecutionVertexAssignments.stream()
-				.filter(slotExecutionVertexAssignment -> slotExecutionVertexAssignment.getExecutionVertexId().equals(executionVertexId))
-				.findFirst()
-				.orElseThrow(() -> new IllegalArgumentException(String.format(
-						"SlotExecutionVertexAssignment with execution vertex id %s not found",
-						executionVertexId)));
-	}
-
 	private static class AllocationToggableSlotProvider implements SlotProvider {
 
 		private final List<Tuple3<SlotRequestId, ScheduledUnit, SlotProfile>> slotAllocationRequests = new ArrayList<>();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorTestUtils.java
new file mode 100644
index 0000000..426d5f7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorTestUtils.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Static test helpers for {@link ExecutionSlotAllocator} tests; package-private and not instantiable.
+ */
+class ExecutionSlotAllocatorTestUtils {
+	// Builds one requirements object per given id, in argument order; only the vertex id is set on the builder.
+	static List<ExecutionVertexSchedulingRequirements> createSchedulingRequirements(
+			final ExecutionVertexID... executionVertexIds) {
+		// The result holds exactly one entry per id, so the list is pre-sized to the varargs length.
+		final List<ExecutionVertexSchedulingRequirements> schedulingRequirements =
+			new ArrayList<>(executionVertexIds.length);
+
+		for (ExecutionVertexID executionVertexId : executionVertexIds) {
+			schedulingRequirements.add(new ExecutionVertexSchedulingRequirements.Builder()
+					.withExecutionVertexId(executionVertexId).build());
+		}
+		return schedulingRequirements;
+	}
+	// Returns the first assignment whose vertex id equals the given id; throws IllegalArgumentException if none.
+	static SlotExecutionVertexAssignment findSlotAssignmentByExecutionVertexId(
+			final ExecutionVertexID executionVertexId,
+			final Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignments) {
+
+		return slotExecutionVertexAssignments.stream()
+				.filter(assignment -> assignment.getExecutionVertexId().equals(executionVertexId))
+				.findFirst()
+				.orElseThrow(
+					() ->
+						new IllegalArgumentException(String.format(
+							"SlotExecutionVertexAssignment with execution vertex id %s not found",
+							executionVertexId)));
+	}
+	// Private constructor: this class only hosts static helpers and is never instantiated.
+	private ExecutionSlotAllocatorTestUtils() {
+	}
+}