You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/02/24 17:24:28 UTC
[flink] 01/02: [FLINK-21399][coordination][tests] Refactor registerSlotsRequiredForJobExecution
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 87c3933b44e38242e80717758261fd34487d185c
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Feb 22 08:30:24 2021 +0100
[FLINK-21399][coordination][tests] Refactor registerSlotsRequiredForJobExecution
---
.../jobmaster/JobMasterQueryableStateTest.java | 41 +-----------
.../runtime/jobmaster/JobMasterTestUtils.java | 74 ++++++++++++++++++++++
2 files changed, 77 insertions(+), 38 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.java
index fa53ebb..0de4ec9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterQueryableStateTest.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.queryablestate.KvStateID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -37,11 +35,6 @@ import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
-import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
-import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
-import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
-import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
@@ -53,10 +46,7 @@ import org.junit.Test;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
-import java.util.Collection;
import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
import static org.hamcrest.CoreMatchers.either;
@@ -318,35 +308,10 @@ public class JobMasterQueryableStateTest extends TestLogger {
}
}
- private void registerSlotsRequiredForJobExecution(JobMasterGateway jobMasterGateway)
+ private static void registerSlotsRequiredForJobExecution(JobMasterGateway jobMasterGateway)
throws ExecutionException, InterruptedException {
-
- final TaskExecutorGateway taskExecutorGateway =
- new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
- final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation =
- new LocalUnresolvedTaskManagerLocation();
-
- rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
-
- jobMasterGateway
- .registerTaskManager(
- taskExecutorGateway.getAddress(),
- unresolvedTaskManagerLocation,
- testingTimeout)
- .get();
-
- Collection<SlotOffer> slotOffers =
- IntStream.range(0, PARALLELISM)
- .mapToObj(
- index ->
- new SlotOffer(
- new AllocationID(), index, ResourceProfile.ANY))
- .collect(Collectors.toList());
-
- jobMasterGateway
- .offerSlots(
- unresolvedTaskManagerLocation.getResourceID(), slotOffers, testingTimeout)
- .get();
+ JobMasterTestUtils.registerTaskExecutorAndOfferSlots(
+ rpcService, jobMasterGateway, PARALLELISM, testingTimeout);
}
private static void registerKvState(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTestUtils.java
new file mode 100644
index 0000000..cd9609f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTestUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
+
+import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/** JobMaster-related test utils. */
+public class JobMasterTestUtils {
+
+ public static void registerTaskExecutorAndOfferSlots(
+ TestingRpcService rpcService,
+ JobMasterGateway jobMasterGateway,
+ int numSlots,
+ Time testingTimeout)
+ throws ExecutionException, InterruptedException {
+
+ final TaskExecutorGateway taskExecutorGateway =
+ new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
+ final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation =
+ new LocalUnresolvedTaskManagerLocation();
+
+ rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);
+
+ jobMasterGateway
+ .registerTaskManager(
+ taskExecutorGateway.getAddress(),
+ unresolvedTaskManagerLocation,
+ testingTimeout)
+ .get();
+
+ Collection<SlotOffer> slotOffers =
+ IntStream.range(0, numSlots)
+ .mapToObj(
+ index ->
+ new SlotOffer(
+ new AllocationID(), index, ResourceProfile.ANY))
+ .collect(Collectors.toList());
+
+ jobMasterGateway
+ .offerSlots(
+ unresolvedTaskManagerLocation.getResourceID(), slotOffers, testingTimeout)
+ .get();
+ }
+
+ private JobMasterTestUtils() {}
+}