Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/12/22 10:03:21 UTC

[GitHub] [flink] xintongsong commented on a change in pull request #8952: [FLINK-10868][flink-runtime] Add failure rater for resource manager

xintongsong commented on a change in pull request #8952:
URL: https://github.com/apache/flink/pull/8952#discussion_r547112777



##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
##########
@@ -67,6 +67,33 @@
 			"for streaming workloads, which may fail if there are not enough slots. Note that this configuration option does not take " +
 			"effect for standalone clusters, where how many slots are allocated is not controlled by Flink.");
 
+	/**
+	 * Defines the maximum number of worker (YARN / Mesos) failures per minute before rejecting subsequent worker
+	 * requests until the failure rate falls below the maximum. It is to quickly catch external dependency caused
+	 * workers failure and wait for retry interval before sending new request. Be default, -1.0 is set to disable the feature.
+	 */
+	public static final ConfigOption<Double> MAXIMUM_WORKERS_FAILURE_RATE = ConfigOptions
+		.key("resourcemanager.maximum-workers-failure-rate")
+		.doubleType()
+		.defaultValue(-1.0)
+		.withDescription("Defines the maximum number of worker (YARN / Mesos) failures per minute before rejecting" +
+			" subsequent worker requests until the failure rate falls below the maximum. It is to quickly catch" +
+			" external dependency caused workers failure and terminate job accordingly." +
+			" Be default, -1.0 is set to disable the feature.");
+
+	/**
+	 * Defines the worker creation interval in milliseconds. In case of worker creation failures, we should wait for an interval before
+	 * trying to create new workers when the failure rate exceeds. Otherwise, ActiveResourceManager will always re-requesting
+	 * the worker, which keeps the main thread busy.
+	 */
+	public static final ConfigOption<Long> WORKER_CREATION_RETRY_INTERVAL = ConfigOptions
+		.key("resourcemanager.workers-creation-retry-interval")
+		.longType()

Review comment:
       `durationType()` is preferred for intervals.

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
##########
@@ -67,6 +67,33 @@
 			"for streaming workloads, which may fail if there are not enough slots. Note that this configuration option does not take " +
 			"effect for standalone clusters, where how many slots are allocated is not controlled by Flink.");
 
+	/**
+	 * Defines the maximum number of worker (YARN / Mesos) failures per minute before rejecting subsequent worker
+	 * requests until the failure rate falls below the maximum. It is to quickly catch external dependency caused
+	 * workers failure and wait for retry interval before sending new request. Be default, -1.0 is set to disable the feature.
+	 */
+	public static final ConfigOption<Double> MAXIMUM_WORKERS_FAILURE_RATE = ConfigOptions
+		.key("resourcemanager.maximum-workers-failure-rate")
+		.doubleType()
+		.defaultValue(-1.0)
+		.withDescription("Defines the maximum number of worker (YARN / Mesos) failures per minute before rejecting" +
+			" subsequent worker requests until the failure rate falls below the maximum. It is to quickly catch" +
+			" external dependency caused workers failure and terminate job accordingly." +
+			" Be default, -1.0 is set to disable the feature.");
+
+	/**
+	 * Defines the worker creation interval in milliseconds. In case of worker creation failures, we should wait for an interval before
+	 * trying to create new workers when the failure rate exceeds. Otherwise, ActiveResourceManager will always re-requesting
+	 * the worker, which keeps the main thread busy.
+	 */
+	public static final ConfigOption<Long> WORKER_CREATION_RETRY_INTERVAL = ConfigOptions
+		.key("resourcemanager.workers-creation-retry-interval")
+		.longType()
+		.defaultValue(3000L)
+		.withDescription("Defines the worker creation interval in milliseconds. In case of worker creation failures,"
+			+ " we should wait for an interval before trying to create new workers when the failure rate exceeds."
+			+ " Otherwise, ActiveResourceManager will always re-requesting the worker, which keeps the main thread busy.");
+

Review comment:
       I would suggest the following keys for these configuration options.
   * `resourcemanager.start-worker.max-failure-rate`
   * `resourcemanager.start-worker.retry-interval`
   The common prefix suggests these two options are related. Also, "start worker failure" is more descriptive, since "worker failure" can also mean workers that failed after registration.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/failurerate/TimestampBasedFailureRater.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failurerate;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.clock.Clock;
+
+import java.util.ArrayDeque;
+
+/**
+ * A timestamp queue based failure rater implementation.
+ */
+public class TimestampBasedFailureRater implements FailureRater {
+	private static final int DEFAULT_TIMESTAMP_SIZE = 300;
+	private static final Double MILLISECONDS_PER_SECOND = 1000.0;
+	private final int maximumFailureRate;
+	private final Time failureInterval;
+	private final ArrayDeque<Long> failureTimestamps;
+	private long failureCounter = 0;
+
+	public TimestampBasedFailureRater(int maximumFailureRate, Time failureInterval) {
+		this.maximumFailureRate = maximumFailureRate;
+		this.failureInterval = failureInterval;
+		this.failureTimestamps = new ArrayDeque<>(maximumFailureRate > 0 ? maximumFailureRate : DEFAULT_TIMESTAMP_SIZE);
+	}
+
+	@Override
+	public void markEvent() {
+		markEvent(System.currentTimeMillis());

Review comment:
       Same here. I think we are not using `markEvent` correctly.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/failurerate/TimestampBasedFailureRater.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failurerate;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.clock.Clock;
+
+import java.util.ArrayDeque;
+
+/**
+ * A timestamp queue based failure rater implementation.
+ */
+public class TimestampBasedFailureRater implements FailureRater {
+	private static final int DEFAULT_TIMESTAMP_SIZE = 300;
+	private static final Double MILLISECONDS_PER_SECOND = 1000.0;
+	private final int maximumFailureRate;
+	private final Time failureInterval;
+	private final ArrayDeque<Long> failureTimestamps;
+	private long failureCounter = 0;
+
+	public TimestampBasedFailureRater(int maximumFailureRate, Time failureInterval) {
+		this.maximumFailureRate = maximumFailureRate;
+		this.failureInterval = failureInterval;
+		this.failureTimestamps = new ArrayDeque<>(maximumFailureRate > 0 ? maximumFailureRate : DEFAULT_TIMESTAMP_SIZE);
+	}
+
+	@Override
+	public void markEvent() {
+		markEvent(System.currentTimeMillis());
+	}
+
+	@Override
+	public void markEvent(long n) {
+		failureTimestamps.add(n);
+		failureCounter++;
+	}
+
+	@Override
+	public double getRate() {
+		return getCurrentFailureRate() / (failureInterval.toMilliseconds() / MILLISECONDS_PER_SECOND);
+	}
+
+	@Override
+	public long getCount() {
+		return failureCounter;
+	}
+
+	@Override
+	public boolean exceedsFailureRate() {
+		return getCurrentFailureRate() >= maximumFailureRate;
+	}
+
+	@Override
+	public void markFailure(Clock clock) {
+		failureTimestamps.add(clock.absoluteTimeMillis());
+		failureCounter++;
+	}
+
+	@Override
+	public double getCurrentFailureRate() {
+		Long currentTimeStamp = System.currentTimeMillis();
+		while (!failureTimestamps.isEmpty() &&
+			currentTimeStamp - failureTimestamps.peek() > failureInterval.toMilliseconds()) {
+			failureTimestamps.remove();
+		}
+
+		return failureTimestamps.size();

Review comment:
       The calculation logic relies on the failures in the queue being strictly ordered by time.
   To ensure that, we should not expose the argument `clock` in `markFailure`.
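
One way to keep the queue ordered is to inject the clock once, at construction time, so callers never supply timestamps themselves. The following is a minimal sketch, not the PR's code: `OrderedFailureWindow` is a hypothetical class, and `java.util.function.LongSupplier` stands in for Flink's `Clock`. It also assumes the supplier never goes backwards; a wall clock that jumps back would still break the ordering.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.LongSupplier;

// Hypothetical sketch: the clock is fixed at construction, so every
// timestamp in the queue comes from the same source.
final class OrderedFailureWindow {
    private final long windowMillis;
    private final LongSupplier clockMillis; // stand-in for Flink's Clock (assumption)
    private final Queue<Long> timestamps = new ArrayDeque<>();

    OrderedFailureWindow(long windowMillis, LongSupplier clockMillis) {
        this.windowMillis = windowMillis;
        this.clockMillis = clockMillis;
    }

    // No clock or timestamp argument: callers cannot insert out-of-order entries.
    void markFailure() {
        timestamps.add(clockMillis.getAsLong());
    }

    // Evicts entries older than the window, reading the same clock used for recording,
    // then returns how many failures remain inside the window.
    int failuresInWindow() {
        long now = clockMillis.getAsLong();
        while (!timestamps.isEmpty() && now - timestamps.peek() > windowMillis) {
            timestamps.remove();
        }
        return timestamps.size();
    }
}
```

Recording and eviction read the same clock here, whereas the quoted `getCurrentFailureRate` reads `System.currentTimeMillis()` while `markFailure` takes a `Clock`. A test can pass a controllable supplier instead of sleeping.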

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
##########
@@ -860,6 +875,24 @@ private void clearStateInternal() {
 		clearStateFuture = clearStateAsync();
 	}
 
+	/**
+	 * Record failure number of worker in ResourceManagers. If maximum failure rate is detected,
+	 * then cancel all pending requests.
+	 *
+	 * @return whether should acquire new container/worker after the a stop interval
+	 */
+	@VisibleForTesting
+	public boolean recordWorkerFailure() {

Review comment:
       This should also belong to `ActiveResourceManager`.
   Also, `@VisibleForTesting` should be removed.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
##########
@@ -259,6 +286,22 @@ private void requestNewWorker(WorkerResourceSpec workerResourceSpec) {
 				}));
 	}
 
+	private void tryResetWorkerCreationCoolDown() {
+		if (workerCreationCoolDown.isDone()) {
+			log.info("Worker creation failed. Will not retry creating worker in {}.", workerCreationRetryInterval);

Review comment:
       ```suggestion
   			log.info("Reaching max start worker failure rate. Will not retry creating worker in {}.", workerCreationRetryInterval);
   ```

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
##########
@@ -67,6 +67,33 @@
 			"for streaming workloads, which may fail if there are not enough slots. Note that this configuration option does not take " +
 			"effect for standalone clusters, where how many slots are allocated is not controlled by Flink.");
 
+	/**
+	 * Defines the maximum number of worker (YARN / Mesos) failures per minute before rejecting subsequent worker
+	 * requests until the failure rate falls below the maximum. It is to quickly catch external dependency caused
+	 * workers failure and wait for retry interval before sending new request. Be default, -1.0 is set to disable the feature.
+	 */
+	public static final ConfigOption<Double> MAXIMUM_WORKERS_FAILURE_RATE = ConfigOptions
+		.key("resourcemanager.maximum-workers-failure-rate")
+		.doubleType()
+		.defaultValue(-1.0)

Review comment:
       I'm not sure about disabling this feature by default. I suspect not many people would be aware of the feature if it's disabled by default. It might make sense to enable the feature with a reasonable default rate, e.g., 10/min.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/failurerate/NoFailureRater.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failurerate;
+
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.util.clock.Clock;
+
+/**
+ * No failure rater only records to the per second rate for metrics reporting. Thus, its functionality falls back to
+ * a MeterView.
+ */
+public class NoFailureRater extends MeterView implements FailureRater  {
+
+	public NoFailureRater(int timeSpanInSeconds) {
+		super(timeSpanInSeconds);
+	}
+
+	@Override
+	public boolean exceedsFailureRate() {
+		return false;
+	}
+
+	@Override
+	public void markFailure(Clock clock) {
+		markEvent(clock.absoluteTimeMillis());

Review comment:
       I don't think this works.
   
   According to the JavaDoc of `Meter#markEvent`, the argument represents the number of events that occurred, not the timestamp at which the event occurred.
   
   I think we don't really need the argument `clock` in the interface. Shouldn't the event always be recorded with the current timestamp?
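
To illustrate the mismatch, here is a minimal sketch with a hypothetical `CountingMeter` that follows the documented meaning of `markEvent(long n)`, where `n` is the number of events. Both class names are made up for the example. Passing a timestamp inflates the count by the epoch time in milliseconds:

```java
// Hypothetical stand-in for a meter; only the counting semantics matter here.
final class CountingMeter {
    private long count;

    // Records a single event.
    void markEvent() {
        markEvent(1);
    }

    // Records n events; n is a number of events, not a point in time.
    void markEvent(long n) {
        count += n;
    }

    long getCount() {
        return count;
    }
}

final class MarkEventMisuse {
    // Mimics the quoted markFailure: a timestamp is passed where a count is expected.
    static long countAfterMisuse(long timestampMillis) {
        CountingMeter meter = new CountingMeter();
        meter.markEvent(timestampMillis);
        return meter.getCount();
    }

    // Intended behaviour: one failure is one event.
    static long countAfterCorrectUse() {
        CountingMeter meter = new CountingMeter();
        meter.markEvent();
        return meter.getCount();
    }
}
```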

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/failurerate/TimestampBasedFailureRater.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failurerate;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.clock.Clock;
+
+import java.util.ArrayDeque;
+
+/**
+ * A timestamp queue based failure rater implementation.
+ */
+public class TimestampBasedFailureRater implements FailureRater {
+	private static final int DEFAULT_TIMESTAMP_SIZE = 300;
+	private static final Double MILLISECONDS_PER_SECOND = 1000.0;
+	private final int maximumFailureRate;
+	private final Time failureInterval;
+	private final ArrayDeque<Long> failureTimestamps;
+	private long failureCounter = 0;
+
+	public TimestampBasedFailureRater(int maximumFailureRate, Time failureInterval) {
+		this.maximumFailureRate = maximumFailureRate;
+		this.failureInterval = failureInterval;
+		this.failureTimestamps = new ArrayDeque<>(maximumFailureRate > 0 ? maximumFailureRate : DEFAULT_TIMESTAMP_SIZE);
+	}
+
+	@Override
+	public void markEvent() {
+		markEvent(System.currentTimeMillis());
+	}
+
+	@Override
+	public void markEvent(long n) {
+		failureTimestamps.add(n);
+		failureCounter++;
+	}
+
+	@Override
+	public double getRate() {
+		return getCurrentFailureRate() / (failureInterval.toMilliseconds() / MILLISECONDS_PER_SECOND);
+	}
+
+	@Override
+	public long getCount() {
+		return failureCounter;
+	}
+
+	@Override
+	public boolean exceedsFailureRate() {
+		return getCurrentFailureRate() >= maximumFailureRate;
+	}
+
+	@Override
+	public void markFailure(Clock clock) {
+		failureTimestamps.add(clock.absoluteTimeMillis());
+		failureCounter++;
+	}
+
+	@Override
+	public double getCurrentFailureRate() {
+		Long currentTimeStamp = System.currentTimeMillis();
+		while (!failureTimestamps.isEmpty() &&
+			currentTimeStamp - failureTimestamps.peek() > failureInterval.toMilliseconds()) {
+			failureTimestamps.remove();
+		}
+
+		return failureTimestamps.size();
+	}
+
+	public Time getFailureInterval() {
+		return failureInterval;
+	}

Review comment:
       Unused

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/failurerate/FailureRater.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failurerate;
+
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.util.clock.Clock;
+
+/**
+ * Failure rate interface.
+ */
+public interface FailureRater extends Meter {

Review comment:
       This interface has nothing specific to *failures*.
   We might consider making it more general and reusable, something like `ThresholdMeter`.
   We can reuse `Meter#markEvent` for `markFailure` and `Meter#getRate` for `getCurrentFailureRate`. Only `exceedsFailureRate` needs to be added.
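
A rough sketch of what such an interface could look like. `Meter` below is a local stand-in that mirrors the four methods of Flink's `Meter` so the snippet compiles on its own. `ThresholdMeter`, `exceedsThreshold` and `SlidingWindowThresholdMeter` are hypothetical names, and both the per-second rate over a sliding window and the strict `>` comparison are assumptions rather than anything this PR defines.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.LongSupplier;

// Local stand-in mirroring org.apache.flink.metrics.Meter.
interface Meter {
    void markEvent();
    void markEvent(long n);
    double getRate();
    long getCount();
}

// Hypothetical: the only addition on top of Meter.
interface ThresholdMeter extends Meter {
    boolean exceedsThreshold();
}

final class SlidingWindowThresholdMeter implements ThresholdMeter {
    private final double maxEventsPerSecond;
    private final long windowMillis;
    private final LongSupplier clockMillis; // stand-in for a clock (assumption)
    private final Queue<Long> timestamps = new ArrayDeque<>();
    private long total;

    SlidingWindowThresholdMeter(double maxEventsPerSecond, long windowMillis, LongSupplier clockMillis) {
        this.maxEventsPerSecond = maxEventsPerSecond;
        this.windowMillis = windowMillis;
        this.clockMillis = clockMillis;
    }

    @Override
    public void markEvent() {
        markEvent(1);
    }

    // n is the number of events; all of them are stamped with the current time.
    @Override
    public void markEvent(long n) {
        long now = clockMillis.getAsLong();
        for (long i = 0; i < n; i++) {
            timestamps.add(now);
        }
        total += n;
    }

    // Events per second over the sliding window, after evicting expired entries.
    @Override
    public double getRate() {
        long now = clockMillis.getAsLong();
        while (!timestamps.isEmpty() && now - timestamps.peek() > windowMillis) {
            timestamps.remove();
        }
        return timestamps.size() / (windowMillis / 1000.0);
    }

    // Total number of events ever recorded; never decreases.
    @Override
    public long getCount() {
        return total;
    }

    @Override
    public boolean exceedsThreshold() {
        return getRate() > maxEventsPerSecond;
    }
}
```

With this shape, a resource manager would call `markEvent()` for each start-worker failure and consult `exceedsThreshold()` before requesting again, without any failure-specific method names.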

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/failurerate/FailureRaterUtil.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failurerate;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ResourceManagerOptions;
+
+/**
+ * Failure rate util.
+ */
+public class FailureRaterUtil {
+
+	public static FailureRater createFailureRater(
+		Configuration configuration) {
+		double rate = configuration.getDouble(ResourceManagerOptions.MAXIMUM_WORKERS_FAILURE_RATE);
+		if (rate < 0) {
+			return new NoFailureRater(60);
+		} else {
+			int maximumRate = Double.valueOf(rate).intValue();
+			return new TimestampBasedFailureRater(maximumRate < rate ? maximumRate + 1 : maximumRate, Time.minutes(1));
+		}

Review comment:
       I don't really think we need this util class.
   
   I'm not sure we need two different raters. `NoFailureRater` might be a bit more lightweight than `TimestampBasedFailureRater`, but I wonder how significant the difference could be. Would it be worth complicating the code and maintaining two different raters?
   
   Also, I think we can convert the max rate from double to integer inside `TimestampBasedFailureRater`. It's a bit confusing to see an integer type *rate* in the constructor argument list.
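
For the last point, a small sketch of doing the rounding inside the rater. `RateLimitSketch` and `toMaxFailuresPerInterval` are hypothetical names. For non-negative rates, `Math.ceil` reproduces the rounding-up that the quoted `maximumRate < rate ? maximumRate + 1 : maximumRate` performs.

```java
final class RateLimitSketch {
    private final int maxFailuresPerInterval;

    // Accepts the configured rate as a double; callers no longer pre-round it.
    RateLimitSketch(double maximumFailureRate) {
        if (maximumFailureRate < 0) {
            throw new IllegalArgumentException("maximumFailureRate must be non-negative");
        }
        this.maxFailuresPerInterval = toMaxFailuresPerInterval(maximumFailureRate);
    }

    // Rounds up, so a fractional rate such as 2.5 allows 3 failures per interval.
    static int toMaxFailuresPerInterval(double rate) {
        return (int) Math.ceil(rate);
    }

    int getMaxFailuresPerInterval() {
        return maxFailuresPerInterval;
    }
}
```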

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/failurerate/TimestampBasedFailureRater.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failurerate;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.clock.Clock;
+
+import java.util.ArrayDeque;
+
+/**
+ * A timestamp queue based failure rater implementation.
+ */
+public class TimestampBasedFailureRater implements FailureRater {
+	private static final int DEFAULT_TIMESTAMP_SIZE = 300;
+	private static final Double MILLISECONDS_PER_SECOND = 1000.0;
+	private final int maximumFailureRate;
+	private final Time failureInterval;
+	private final ArrayDeque<Long> failureTimestamps;
+	private long failureCounter = 0;
+
+	public TimestampBasedFailureRater(int maximumFailureRate, Time failureInterval) {
+		this.maximumFailureRate = maximumFailureRate;
+		this.failureInterval = failureInterval;
+		this.failureTimestamps = new ArrayDeque<>(maximumFailureRate > 0 ? maximumFailureRate : DEFAULT_TIMESTAMP_SIZE);

Review comment:
       Why do we make the initial capacity `maximumFailureRate`?
   There's no guarantee that the queue size won't exceed `maximumFailureRate`. New failures can still be recorded when the max rate is reached.
   On the other hand, in most cases where failures do not happen a lot, this queue should be almost empty.
   
   The community's code style guidelines suggest "Set the initial capacity for a collection only if there is a good proven reason".
   https://flink.apache.org/contributing/code-style-and-quality-java.html#collections

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
##########
@@ -143,6 +150,8 @@
 
 	private final ResourceManagerMetricGroup resourceManagerMetricGroup;
 
+	protected final FailureRater failureRater;

Review comment:
       I think we should only introduce this for the active resource managers.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/failurerate/TimestampBasedFailureRater.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failurerate;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.clock.Clock;
+
+import java.util.ArrayDeque;
+
+/**
+ * A timestamp queue based failure rater implementation.
+ */
+public class TimestampBasedFailureRater implements FailureRater {
+	private static final int DEFAULT_TIMESTAMP_SIZE = 300;
+	private static final Double MILLISECONDS_PER_SECOND = 1000.0;
+	private final int maximumFailureRate;
+	private final Time failureInterval;
+	private final ArrayDeque<Long> failureTimestamps;

Review comment:
       Why do we need a `Deque`? Wouldn't a `Queue` be enough?
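
A sketch of the narrower declaration, assuming only FIFO access is needed, which is all the quoted code uses (`add`, `peek`, `remove`, `isEmpty`, `size`). `ArrayDeque` can stay as the implementation behind a `Queue`-typed field. `FifoTimestamps` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class FifoTimestamps {
    // Declared as Queue: only FIFO operations are visible to the rest of the class.
    private final Queue<Long> failureTimestamps = new ArrayDeque<>();

    void add(long timestampMillis) {
        failureTimestamps.add(timestampMillis);
    }

    // Returns the oldest timestamp, or null if the queue is empty.
    Long oldest() {
        return failureTimestamps.peek();
    }

    // Drops entries strictly older than the cutoff and returns the remaining size.
    int evictOlderThan(long cutoffMillis) {
        while (!failureTimestamps.isEmpty() && failureTimestamps.peek() < cutoffMillis) {
            failureTimestamps.remove();
        }
        return failureTimestamps.size();
    }
}
```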

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
##########
@@ -378,15 +382,69 @@ public void testOnError() throws Exception {
 		}};
 	}
 
-	private static class Context {
+	@Test
+	public void testWorkerCreationInterval() throws Exception {
+		Configuration configuration = new Configuration();
+		configuration.setDouble(ResourceManagerOptions.MAXIMUM_WORKERS_FAILURE_RATE, 1);
+		configuration.setLong(ResourceManagerOptions.WORKER_CREATION_RETRY_INTERVAL,
+			WORKER_CREATION_INTERVAL.toMilliseconds());
+		new Context(configuration) {{
+			final AtomicInteger requestCount = new AtomicInteger(0);
+
+			final List<ResourceID> tmResourceIds = new ArrayList<>();
+			tmResourceIds.add(ResourceID.generate());
+			tmResourceIds.add(ResourceID.generate());
+
+			final List<CompletableFuture<Long>> requestWorkerFromDriverFutures = new ArrayList<>();
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+			requestWorkerFromDriverFutures.add(new CompletableFuture<>());
+
+			driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+				int idx = requestCount.getAndIncrement();
+				assertThat(idx, lessThan(2));
+
+				requestWorkerFromDriverFutures.get(idx).complete(System.currentTimeMillis());
+				return CompletableFuture.completedFuture(tmResourceIds.get(idx));
+			});
+
+			slotManagerBuilder.setGetRequiredResourcesSupplier(() -> Collections.singletonMap(WORKER_RESOURCE_SPEC, 1));
 
-		final Configuration flinkConfig = new Configuration();
+			runTest(() -> {
+				// received worker request, verify requesting from driver
+				CompletableFuture<Boolean> startNewWorkerFuture = runInMainThread(() ->
+					getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+				long t1 = requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+				assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+
+				// first worker failed before register, verify requesting another worker from driver
+				runInMainThread(() -> getResourceManager().onWorkerTerminated(tmResourceIds.get(0), "terminate for testing"));

Review comment:
       This only covers recording failures for workers that fail after being successfully requested.
   I think we should also cover the case where the `requestResource` future completes exceptionally.
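
The missing scenario can be sketched outside of Flink as follows. `StartWorkerFailureCounter` is a hypothetical class, and the `Supplier<CompletableFuture<String>>` stands in for the driver's request function. The point is only that a future completing exceptionally is counted as a failure, which is what an additional test case would assert.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class StartWorkerFailureCounter {
    private final AtomicInteger failures = new AtomicInteger();

    // Issues one request through the (hypothetical) driver function and counts
    // the request as a failure if its future completes exceptionally.
    CompletableFuture<String> request(Supplier<CompletableFuture<String>> requestResource) {
        CompletableFuture<String> future = requestResource.get();
        future.whenComplete((workerId, error) -> {
            if (error != null) {
                failures.incrementAndGet();
            }
        });
        return future;
    }

    int getFailures() {
        return failures.get();
    }
}
```

A test would hand in a future it controls, call `completeExceptionally` on it, and then check that the failure count increased and that the retry cool-down was triggered.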

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
##########
@@ -259,6 +286,22 @@ private void requestNewWorker(WorkerResourceSpec workerResourceSpec) {
 				}));
 	}
 
+	private void tryResetWorkerCreationCoolDown() {
+		if (workerCreationCoolDown.isDone()) {
+			log.info("Worker creation failed. Will not retry creating worker in {}.", workerCreationRetryInterval);
+			workerCreationCoolDown = new CompletableFuture<>();
+			getMainThreadExecutor().schedule(

Review comment:
       We can use `scheduleRunAsync` directly.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
##########
@@ -212,6 +227,9 @@ public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> recoveredWo
 
 	@Override
 	public void onWorkerTerminated(ResourceID resourceId, String diagnostics) {
+		if (recordWorkerFailure()) {

Review comment:
       We should only record failure events for workers that are not yet registered. They can be identified by checking against `currentAttemptUnregisteredWorkers`.
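
A minimal sketch of that check, assuming string stand-ins for `ResourceID` and `WorkerResourceSpec`. `UnregisteredWorkerTracker` and its methods are hypothetical and only mirror the idea of consulting `currentAttemptUnregisteredWorkers` before recording a failure.

```java
import java.util.HashMap;
import java.util.Map;

final class UnregisteredWorkerTracker {
    // Workers requested in the current attempt that have not registered yet.
    private final Map<String, String> currentAttemptUnregisteredWorkers = new HashMap<>();
    private int startWorkerFailures;

    void onWorkerRequested(String resourceId, String workerResourceSpec) {
        currentAttemptUnregisteredWorkers.put(resourceId, workerResourceSpec);
    }

    void onWorkerRegistered(String resourceId) {
        currentAttemptUnregisteredWorkers.remove(resourceId);
    }

    // Returns true if the termination was counted as a start-worker failure,
    // i.e. the worker terminated before it ever registered.
    boolean onWorkerTerminated(String resourceId) {
        boolean wasUnregistered = currentAttemptUnregisteredWorkers.containsKey(resourceId);
        if (wasUnregistered) {
            currentAttemptUnregisteredWorkers.remove(resourceId);
            startWorkerFailures++;
        }
        return wasUnregistered;
    }

    int getStartWorkerFailures() {
        return startWorkerFailures;
    }
}
```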

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
##########
@@ -845,6 +856,10 @@ private void registerTaskExecutorMetrics() {
 		resourceManagerMetricGroup.gauge(
 			MetricNames.NUM_REGISTERED_TASK_MANAGERS,
 			() -> (long) taskExecutors.size());
+
+		resourceManagerMetricGroup.meter(

Review comment:
       This metric does not make any sense for the standalone resource manager.
   We can make `ActiveResourceManager` override this method and register this additional metric.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
##########
@@ -109,6 +121,9 @@ public ActiveResourceManager(
 		this.workerNodeMap = new HashMap<>();
 		this.pendingWorkerCounter = new PendingWorkerCounter();
 		this.currentAttemptUnregisteredWorkers = new HashMap<>();
+		long retryInterval = flinkConfig.getLong(ResourceManagerOptions.WORKER_CREATION_RETRY_INTERVAL);

Review comment:
       It's preferred to parse the configuration option outside `ActiveResourceManager` and pass it in as an argument, per the dependency injection principle.
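
A sketch of the separation, under stated assumptions: a plain `Map<String, String>` stands in for Flink's `Configuration`, the key `example.retry-interval-ms` is made up for the example, and `RetryingWorkerManager` and its factory are hypothetical. The 3000 ms default matches the default in the quoted option.

```java
import java.time.Duration;
import java.util.Map;

final class RetryingWorkerManager {
    private final Duration startWorkerRetryInterval;

    // Receives the already-parsed value; has no knowledge of configuration keys.
    RetryingWorkerManager(Duration startWorkerRetryInterval) {
        this.startWorkerRetryInterval = startWorkerRetryInterval;
    }

    Duration getStartWorkerRetryInterval() {
        return startWorkerRetryInterval;
    }
}

final class RetryingWorkerManagerFactory {
    // Hypothetical key, used only in this sketch.
    static final String RETRY_INTERVAL_KEY = "example.retry-interval-ms";
    static final Duration DEFAULT_RETRY_INTERVAL = Duration.ofMillis(3000);

    // Configuration parsing happens here, outside the manager.
    static RetryingWorkerManager create(Map<String, String> config) {
        String raw = config.get(RETRY_INTERVAL_KEY);
        Duration interval = raw == null
            ? DEFAULT_RETRY_INTERVAL
            : Duration.ofMillis(Long.parseLong(raw));
        return new RetryingWorkerManager(interval);
    }
}
```

Tests can then construct the manager with any interval directly, without building a configuration object first.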

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/failurerate/TimestampBasedFailureRaterTest.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failurerate;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.clock.ManualClock;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test time stamp based failure rater.
+ */
+public class TimestampBasedFailureRaterTest {
+
+	@Test
+	public void testMaximumFailureCheck() {
+		TimestampBasedFailureRater rater = new TimestampBasedFailureRater(5, Time.of(10, TimeUnit.SECONDS));
+
+		for (int i = 0; i < 6; i++) {
+			rater.markEvent();
+		}
+
+		Assert.assertEquals(6.0, rater.getCurrentFailureRate(), 0.01);
+		Assert.assertTrue(rater.exceedsFailureRate());
+	}
+
+	@Test
+	public void testMovingRate() throws InterruptedException {
+		FailureRater rater = new TimestampBasedFailureRater(5, Time.of(500, TimeUnit.MILLISECONDS));
+
+		ManualClock manualClock = new ManualClock();
+		manualClock.advanceTime(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+		for (int i = 0; i < 6; i++) {
+			rater.markFailure(manualClock);
+			manualClock.advanceTime(150, TimeUnit.MILLISECONDS);
+			Thread.sleep(150);
+		}
+
+		Assert.assertEquals(3.0, rater.getCurrentFailureRate(), 0.01);
+		Assert.assertFalse(rater.exceedsFailureRate());
+	}
+}

Review comment:
       I think we should also test against the interfaces in `Meter`.
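
   A minimal sketch of what exercising the `Meter`-style methods could look like. `MeterLike` is only a stand-in that mirrors the four methods of Flink's `Meter` (`markEvent()`, `markEvent(long)`, `getRate()`, `getCount()`), and `WindowedRater` is a hypothetical toy implementation, not the `TimestampBasedFailureRater` from this PR. Here `getRate()` is assumed to mean "events inside the current window", which may differ from the PR's semantics.

```java
import java.util.ArrayDeque;
import java.util.function.LongSupplier;

// Stand-in mirroring the method set of Flink's Meter interface (assumption: same four methods).
interface MeterLike {
    void markEvent();
    void markEvent(long n);
    double getRate();
    long getCount();
}

// Hypothetical sliding-window rater used only for illustration.
class WindowedRater implements MeterLike {
    private final long windowMillis;
    private final LongSupplier clock; // injected so a test never has to sleep
    private final ArrayDeque<Long> timestamps = new ArrayDeque<>();
    private long count;

    WindowedRater(long windowMillis, LongSupplier clock) {
        this.windowMillis = windowMillis;
        this.clock = clock;
    }

    @Override
    public void markEvent() {
        markEvent(1);
    }

    @Override
    public void markEvent(long n) {
        long now = clock.getAsLong();
        for (long i = 0; i < n; i++) {
            timestamps.add(now);
        }
        count += n;
    }

    // Assumed semantics: number of events recorded within the last window.
    @Override
    public double getRate() {
        long now = clock.getAsLong();
        while (!timestamps.isEmpty() && now - timestamps.peek() >= windowMillis) {
            timestamps.remove();
        }
        return timestamps.size();
    }

    // Total number of events ever recorded; unaffected by the window.
    @Override
    public long getCount() {
        return count;
    }
}
```

   A test along these lines would call `markEvent()` and `markEvent(n)`, then check both `getCount()` and `getRate()` before and after the controlled clock moves past the window.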

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/failurerate/TimestampBasedFailureRaterTest.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failurerate;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.clock.ManualClock;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test time stamp based failure rater.
+ */
+public class TimestampBasedFailureRaterTest {

Review comment:
       ```suggestion
   public class TimestampBasedFailureRaterTest extends TestLogger {
   ```

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
##########
@@ -378,15 +382,69 @@ public void testOnError() throws Exception {
 		}};
 	}
 
-	private static class Context {
+	@Test
+	public void testWorkerCreationInterval() throws Exception {
+		Configuration configuration = new Configuration();
+		configuration.setDouble(ResourceManagerOptions.MAXIMUM_WORKERS_FAILURE_RATE, 1);
+		configuration.setLong(ResourceManagerOptions.WORKER_CREATION_RETRY_INTERVAL,
+			WORKER_CREATION_INTERVAL.toMilliseconds());
+		new Context(configuration) {{

Review comment:
       No need to introduce a new constructor for `Context`.
   You can write into `flinkConfig` from inside `Context`, just before calling `runTest`.
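
   Roughly, the pattern could look like the sketch below. `SketchContext` is a simplified, hypothetical stand-in for the test's `Context` helper (it uses a plain `Map` instead of Flink's `Configuration`), so only the shape of the initializer block carries over.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the test's Context helper (assumption: the real one
// exposes a flinkConfig field and a runTest method, as the comment above implies).
class SketchContext {
    final Map<String, String> flinkConfig = new HashMap<>();
    String rateSeenAtStartup; // what the component under test read from the config

    void runTest(Runnable testBody) {
        // The real runTest would build the resource manager from flinkConfig here.
        rateSeenAtStartup = flinkConfig.get("resourcemanager.maximum-workers-failure-rate");
        testBody.run();
    }
}

class SketchContextUsage {
    static String run() {
        SketchContext context = new SketchContext() {{
            // Write the option inside the initializer block, just before runTest,
            // so no extra constructor taking a configuration is needed.
            flinkConfig.put("resourcemanager.maximum-workers-failure-rate", "1.0");
            runTest(() -> { /* test assertions would go here */ });
        }};
        return context.rateSeenAtStartup;
    }
}
```

   Because the initializer block runs after the field initializers of the superclass, `flinkConfig` already exists when the option is written.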

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
##########
@@ -246,6 +269,10 @@ private void requestNewWorker(WorkerResourceSpec workerResourceSpec) {
 								workerResourceSpec,
 								count,
 								exception);
+						if (recordWorkerFailure()) {
+							// if exceed failure rate try to slow down
+							tryResetWorkerCreationCoolDown();
+						}

Review comment:
       Looks like `tryResetWorkerCreationCoolDown` is always called in the same if-block.
   We could combine them into something like `recordWorkerFailureAndPauseWorkerCreationIfNeeded`.
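
   A sketch of the combined method, under simplifying assumptions: `WorkerFailureTracker`, its counter, and the `creationPaused` flag are hypothetical and replace the PR's failure rater and cool-down future; only the method name comes from the suggestion above.

```java
// Hypothetical helper; everything except the combined method's name is an assumption.
class WorkerFailureTracker {
    private final int maxFailures;
    private int failures;
    private boolean creationPaused;

    WorkerFailureTracker(int maxFailures) {
        this.maxFailures = maxFailures;
    }

    // Records one failure and, if the limit is now exceeded, pauses worker creation.
    // Callers no longer need their own if-block around the cool-down call.
    void recordWorkerFailureAndPauseWorkerCreationIfNeeded() {
        failures++;
        if (failures > maxFailures) {
            creationPaused = true;
        }
    }

    boolean isCreationPaused() {
        return creationPaused;
    }
}
```

   Each call site then shrinks to a single call, which also removes the risk of recording a failure without applying the cool-down.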

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
##########
@@ -67,6 +67,33 @@
 			"for streaming workloads, which may fail if there are not enough slots. Note that this configuration option does not take " +
 			"effect for standalone clusters, where how many slots are allocated is not controlled by Flink.");
 
+	/**
+	 * Defines the maximum number of worker (YARN / Mesos) failures per minute before rejecting subsequent worker
+	 * requests until the failure rate falls below the maximum. It is to quickly catch external dependency caused
+	 * workers failure and wait for retry interval before sending new request. Be default, -1.0 is set to disable the feature.
+	 */
+	public static final ConfigOption<Double> MAXIMUM_WORKERS_FAILURE_RATE = ConfigOptions
+		.key("resourcemanager.maximum-workers-failure-rate")
+		.doubleType()
+		.defaultValue(-1.0)

Review comment:
       Moreover, do we really need an invalid value for disabling the feature?
   In which situations would a user want to completely disable the feature? In those cases, wouldn't it be enough to set a very large rate?
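
   To illustrate the point, a small sketch assuming the check is a plain comparison. `FailureRateLimit` and `EFFECTIVELY_UNLIMITED` are hypothetical names, and `Double.MAX_VALUE` is just one possible "very large rate".

```java
// Hypothetical sketch: a very large threshold needs no special-casing,
// whereas a -1.0 sentinel would require an extra "is the feature enabled" branch.
class FailureRateLimit {
    // Assumed default: a threshold so large it is never exceeded in practice.
    static final double EFFECTIVELY_UNLIMITED = Double.MAX_VALUE;

    static boolean exceeds(double currentRate, double maxRate) {
        return currentRate > maxRate;
    }
}
```

   With such a default, `exceeds` is the only code path, and users who want the limit simply configure a smaller value.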




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org