You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/10 09:36:14 UTC

[flink] 13/16: [FLINK-12763][runtime] Use a timeout of zero to indicate no startup period

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit faacb556544eb8a8e39ca75422efc1547effc6da
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Jul 9 16:46:18 2019 +0200

    [FLINK-12763][runtime] Use a timeout of zero to indicate no startup period
    
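    This allows users to configure a behavior similar to previous versions: with a startup period of zero, the resource manager never switches to failing unfulfillable slot requests.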
---
 .../flink/configuration/ConfigurationUtils.java    |   2 +-
 .../resourcemanager/StandaloneResourceManager.java |  17 +--
 .../resourcemanager/slotmanager/SlotManager.java   |   4 +
 .../StandaloneResourceManagerTest.java             | 119 +++++++++++++++++++++
 4 files changed, 135 insertions(+), 7 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index d130b80..2fe2f37 100755
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -116,7 +116,7 @@ public class ConfigurationUtils {
 	public static Time getStandaloneClusterStartupPeriodTime(Configuration configuration) {
 		final Time timeout;
 		long standaloneClusterStartupPeriodTime = configuration.getLong(ResourceManagerOptions.STANDALONE_CLUSTER_STARTUP_PERIOD_TIME);
-		if (standaloneClusterStartupPeriodTime > 0) {
+		if (standaloneClusterStartupPeriodTime >= 0) {
 			timeout = Time.milliseconds(standaloneClusterStartupPeriodTime);
 		} else {
 			timeout = Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT));
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
old mode 100644
new mode 100755
index dfbfec1..bc0e8df
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -47,6 +47,7 @@ import java.util.concurrent.TimeUnit;
  */
 public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 
+	/** The duration of the startup period. A duration of zero means there is no startup period. */
 	private final Time startupPeriodTime;
 
 	public StandaloneResourceManager(
@@ -79,12 +80,16 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 
 	@Override
 	protected void initialize() throws ResourceManagerException {
-		getRpcService().getScheduledExecutor().schedule(
-			() -> getMainThreadExecutor().execute(
-				() -> setFailUnfulfillableRequest(true)),
-			startupPeriodTime.toMilliseconds(),
-			TimeUnit.MILLISECONDS
-		);
+		final long startupPeriodMillis = startupPeriodTime.toMilliseconds();
+
+		if (startupPeriodMillis > 0) {
+			getRpcService().getScheduledExecutor().schedule(
+				() -> getMainThreadExecutor().execute(
+					() -> setFailUnfulfillableRequest(true)),
+				startupPeriodMillis,
+				TimeUnit.MILLISECONDS
+			);
+		}
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
old mode 100644
new mode 100755
index 320612a..72a77f1
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -192,6 +192,10 @@ public class SlotManager implements AutoCloseable {
 		return pendingSlots.size();
 	}
 
+	public boolean isFailingUnfulfillableRequest() {
+		return failUnfulfillableRequest;
+	}
+
 	@VisibleForTesting
 	int getNumberAssignedPendingTaskManagerSlots() {
 		return (int) pendingSlots.values().stream().filter(slot -> slot.getAssignedPendingSlotRequest() != null).count();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java
new file mode 100755
index 0000000..4c78fca
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
+import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.UUID;
+import java.util.function.Supplier;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the Standalone Resource Manager.
+ */
+public class StandaloneResourceManagerTest {
+
+	@ClassRule
+	public static final TestingRpcServiceResource RPC_SERVICE = new TestingRpcServiceResource();
+
+	private static final Time TIMEOUT = Time.seconds(10);
+
+	private final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
+
+	@Test
+	public void testStartupPeriod() throws Exception {
+		final Tuple2<StandaloneResourceManager, SlotManager> managers = createResourceManagerAndSlotManager(Time.milliseconds(1));
+		final StandaloneResourceManager rm = managers.f0;
+		final SlotManager sm = managers.f1;
+		try {
+			assertHappensUntil(sm::isFailingUnfulfillableRequest, Deadline.fromNow(Duration.ofSeconds(10)));
+		} finally {
+			rm.close();
+		}
+	}
+
+	@Test
+	public void testNoStartupPeriod() throws Exception {
+		// zero is the documented way to disable the startup period, so test exactly that value
+		final Tuple2<StandaloneResourceManager, SlotManager> managers = createResourceManagerAndSlotManager(Time.milliseconds(0));
+		final StandaloneResourceManager rm = managers.f0;
+		final SlotManager sm = managers.f1;
+
+		// startup includes initialization and granting leadership, so by the time we are
+		// here, the initialization method scheduling the startup period will have been executed.
+		try {
+			assertFalse(fatalErrorHandler.hasExceptionOccurred());
+			assertFalse(sm.isFailingUnfulfillableRequest());
+		} finally {
+			rm.close();
+		}
+	}
+
+	private Tuple2<StandaloneResourceManager, SlotManager> createResourceManagerAndSlotManager(
+			Time startupPeriod) throws Exception {
+
+		final MockResourceManagerRuntimeServices rmServices = new MockResourceManagerRuntimeServices(
+			RPC_SERVICE.getTestingRpcService(),
+			TIMEOUT);
+
+		final StandaloneResourceManager rm = new StandaloneResourceManager(
+			rmServices.rpcService,
+			UUID.randomUUID().toString(),
+			ResourceID.generate(),
+			rmServices.highAvailabilityServices,
+			rmServices.heartbeatServices,
+			rmServices.slotManager,
+			rmServices.metricRegistry,
+			rmServices.jobLeaderIdService,
+			new ClusterInformation("localhost", 1234),
+			fatalErrorHandler,
+			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
+			startupPeriod);
+
+		rm.start();
+		rmServices.grantLeadership();
+
+		return new Tuple2<>(rm, rmServices.slotManager);
+	}
+
+	/** Polls {@code condition} every few milliseconds, failing the test if {@code until} expires first. */
+	private static void assertHappensUntil(Supplier<Boolean> condition, Deadline until) throws InterruptedException {
+		while (!condition.get()) {
+			if (!until.hasTimeLeft()) {
+				fail("condition was not fulfilled before the deadline");
+			}
+			Thread.sleep(2);
+		}
+	}
+}