You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/02/24 17:24:28 UTC

[flink] 01/02: [FLINK-21399][coordination][tests] Refactor registerSlotsRequiredForJobExecution

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 87c3933b44e38242e80717758261fd34487d185c
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Feb 22 08:30:24 2021 +0100

    [FLINK-21399][coordination][tests] Refactor registerSlotsRequiredForJobExecution
---
 .../jobmaster/JobMasterQueryableStateTest.java     | 41 +-----------
 .../runtime/jobmaster/JobMasterTestUtils.java      | 74 ++++++++++++++++++++++
 2 files changed, 77 insertions(+), 38 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.java
index fa53ebb..0de4ec9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.queryablestate.KvStateID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -37,11 +35,6 @@ import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
-import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
-import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
-import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
-import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -53,10 +46,7 @@ import org.junit.Test;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
-import java.util.Collection;
 import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
 import static org.hamcrest.CoreMatchers.either;
@@ -318,35 +308,10 @@ public class JobMasterQueryableStateTest extends TestLogger {
         }
     }
 
-    private void registerSlotsRequiredForJobExecution(JobMasterGateway jobMasterGateway)
+    private static void registerSlotsRequiredForJobExecution(JobMasterGateway jobMasterGateway)
             throws ExecutionException, InterruptedException {
-
-        final TaskExecutorGateway taskExecutorGateway =
-                new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
-        final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation =
-                new LocalUnresolvedTaskManagerLocation();
-
-        rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
-
-        jobMasterGateway
-                .registerTaskManager(
-                        taskExecutorGateway.getAddress(),
-                        unresolvedTaskManagerLocation,
-                        testingTimeout)
-                .get();
-
-        Collection<SlotOffer> slotOffers =
-                IntStream.range(0, PARALLELISM)
-                        .mapToObj(
-                                index ->
-                                        new SlotOffer(
-                                                new AllocationID(), index, ResourceProfile.ANY))
-                        .collect(Collectors.toList());
-
-        jobMasterGateway
-                .offerSlots(
-                        unresolvedTaskManagerLocation.getResourceID(), slotOffers, testingTimeout)
-                .get();
+        JobMasterTestUtils.registerTaskExecutorAndOfferSlots(
+                rpcService, jobMasterGateway, PARALLELISM, testingTimeout);
     }
 
     private static void registerKvState(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTestUtils.java
new file mode 100644
index 0000000..cd9609f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTestUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
+
+import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/** JobMaster-related test utils. */
+public class JobMasterTestUtils {
+
+    public static void registerTaskExecutorAndOfferSlots(
+            TestingRpcService rpcService,
+            JobMasterGateway jobMasterGateway,
+            int numSlots,
+            Time testingTimeout)
+            throws ExecutionException, InterruptedException {
+
+        final TaskExecutorGateway taskExecutorGateway =
+                new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
+        final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation =
+                new LocalUnresolvedTaskManagerLocation();
+
+        rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
+
+        jobMasterGateway
+                .registerTaskManager(
+                        taskExecutorGateway.getAddress(),
+                        unresolvedTaskManagerLocation,
+                        testingTimeout)
+                .get();
+
+        Collection<SlotOffer> slotOffers =
+                IntStream.range(0, numSlots)
+                        .mapToObj(
+                                index ->
+                                        new SlotOffer(
+                                                new AllocationID(), index, ResourceProfile.ANY))
+                        .collect(Collectors.toList());
+
+        jobMasterGateway
+                .offerSlots(
+                        unresolvedTaskManagerLocation.getResourceID(), slotOffers, testingTimeout)
+                .get();
+    }
+
+    private JobMasterTestUtils() {}
+}