You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/10 09:36:14 UTC
[flink] 13/16: [FLINK-12763][runtime] Use a timeout of zero to
indicate no startup period
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit faacb556544eb8a8e39ca75422efc1547effc6da
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Jul 9 16:46:18 2019 +0200
[FLINK-12763][runtime] Use a timeout of zero to indicate no startup period
This allows users to configure a behavior similar to previous versions.
---
.../flink/configuration/ConfigurationUtils.java | 2 +-
.../resourcemanager/StandaloneResourceManager.java | 17 +--
.../resourcemanager/slotmanager/SlotManager.java | 4 +
.../StandaloneResourceManagerTest.java | 119 +++++++++++++++++++++
4 files changed, 135 insertions(+), 7 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index d130b80..2fe2f37 100755
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -116,7 +116,7 @@ public class ConfigurationUtils {
public static Time getStandaloneClusterStartupPeriodTime(Configuration configuration) {
final Time timeout;
long standaloneClusterStartupPeriodTime = configuration.getLong(ResourceManagerOptions.STANDALONE_CLUSTER_STARTUP_PERIOD_TIME);
- if (standaloneClusterStartupPeriodTime > 0) {
+ if (standaloneClusterStartupPeriodTime >= 0) {
timeout = Time.milliseconds(standaloneClusterStartupPeriodTime);
} else {
timeout = Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT));
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
old mode 100644
new mode 100755
index dfbfec1..bc0e8df
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -47,6 +47,7 @@ import java.util.concurrent.TimeUnit;
*/
public class StandaloneResourceManager extends ResourceManager<ResourceID> {
+ /** The duration of the startup period. A duration of zero (or any non-positive value) means there is no startup period. */
private final Time startupPeriodTime;
public StandaloneResourceManager(
@@ -79,12 +80,16 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
@Override
protected void initialize() throws ResourceManagerException {
- getRpcService().getScheduledExecutor().schedule(
- () -> getMainThreadExecutor().execute(
- () -> setFailUnfulfillableRequest(true)),
- startupPeriodTime.toMilliseconds(),
- TimeUnit.MILLISECONDS
- );
+ final long startupPeriodMillis = startupPeriodTime.toMilliseconds();
+
+ if (startupPeriodMillis > 0) {
+ getRpcService().getScheduledExecutor().schedule(
+ () -> getMainThreadExecutor().execute(
+ () -> setFailUnfulfillableRequest(true)),
+ startupPeriodMillis,
+ TimeUnit.MILLISECONDS
+ );
+ }
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
old mode 100644
new mode 100755
index 320612a..72a77f1
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -192,6 +192,10 @@ public class SlotManager implements AutoCloseable {
return pendingSlots.size();
}
+ public boolean isFailingUnfulfillableRequest() {
+ return failUnfulfillableRequest;
+ }
+
@VisibleForTesting
int getNumberAssignedPendingTaskManagerSlots() {
return (int) pendingSlots.values().stream().filter(slot -> slot.getAssignedPendingSlotRequest() != null).count();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java
new file mode 100755
index 0000000..4c78fca
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
+import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.UUID;
+import java.util.function.Supplier;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the startup period handling of the {@link StandaloneResourceManager}.
+ */
+public class StandaloneResourceManagerTest {
+
+ @ClassRule
+ public static final TestingRpcServiceResource RPC_SERVICE = new TestingRpcServiceResource();
+
+ private static final Time TIMEOUT = Time.seconds(10);
+
+ private final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
+
+ @Test
+ public void testStartupPeriod() throws Exception {
+ final Tuple2<StandaloneResourceManager, SlotManager> managers = createResourceManagerAndSlotManager(Time.milliseconds(1));
+ final StandaloneResourceManager rm = managers.f0;
+ final SlotManager sm = managers.f1;
+
+ assertHappensUntil(sm::isFailingUnfulfillableRequest, Deadline.fromNow(Duration.ofSeconds(10)));
+
+ rm.close();
+ }
+
+ @Test
+ public void testNoStartupPeriod() throws Exception {
+ final Tuple2<StandaloneResourceManager, SlotManager> managers = createResourceManagerAndSlotManager(Time.milliseconds(-1));
+ final StandaloneResourceManager rm = managers.f0;
+ final SlotManager sm = managers.f1;
+
+ // a non-positive startup period (here -1 ms) means that initialize() schedules nothing. startup includes
+ // initialization and granting leadership, so by the time we are here, initialize() will have been executed.
+
+ assertFalse(fatalErrorHandler.hasExceptionOccurred());
+ assertFalse(sm.isFailingUnfulfillableRequest());
+
+ rm.close();
+ }
+
+ private StandaloneResourceManager createResourceManager(Time startupPeriod) throws Exception {
+ return createResourceManagerAndSlotManager(startupPeriod).f0;
+ }
+
+ private Tuple2<StandaloneResourceManager, SlotManager> createResourceManagerAndSlotManager(
+ Time startupPeriod) throws Exception {
+
+ final MockResourceManagerRuntimeServices rmServices = new MockResourceManagerRuntimeServices(
+ RPC_SERVICE.getTestingRpcService(),
+ TIMEOUT);
+
+ final StandaloneResourceManager rm = new StandaloneResourceManager(
+ rmServices.rpcService,
+ UUID.randomUUID().toString(),
+ ResourceID.generate(),
+ rmServices.highAvailabilityServices,
+ rmServices.heartbeatServices,
+ rmServices.slotManager,
+ rmServices.metricRegistry,
+ rmServices.jobLeaderIdService,
+ new ClusterInformation("localhost", 1234),
+ fatalErrorHandler,
+ UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
+ startupPeriod);
+
+ rm.start();
+ rmServices.grantLeadership();
+
+ return new Tuple2<>(rm, rmServices.slotManager);
+ }
+
+ private static void assertHappensUntil(Supplier<Boolean> condition, Deadline until) throws InterruptedException {
+ while (!condition.get()) {
+ if (!until.hasTimeLeft()) {
+ fail("condition was not fulfilled before the deadline");
+ }
+ Thread.sleep(2);
+ }
+ }
+}