Posted to issues@flink.apache.org by fijolekProjects <gi...@git.apache.org> on 2016/05/01 19:50:33 UTC

[GitHub] flink pull request: [FLINK-3190] failure rate restart strategy

GitHub user fijolekProjects opened a pull request:

    https://github.com/apache/flink/pull/1954

    [FLINK-3190] failure rate restart strategy

    Failure rate restart strategy - job should only die, if it fails too often in a given time frame

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fijolekProjects/flink FLINK-3190

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1954.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1954
    
----
commit 7b409b3de71b15fd927cf0c46b11c1d342d5d03d
Author: Michal Fijolek <mi...@gmail.com>
Date:   2016-03-13T00:40:15Z

    [FLINK-3190] failure rate restart strategy

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #1954: [FLINK-3190] failure rate restart strategy

Posted by fijolekProjects <gi...@git.apache.org>.
Github user fijolekProjects commented on the issue:

    https://github.com/apache/flink/pull/1954
  
    @tillrohrmann I've pushed fixes and commented on your suggestions. Should I squash and rebase it onto master as well?


---

[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1954#discussion_r67505960
  
    --- Diff: docs/setup/config.md ---
    @@ -139,6 +140,15 @@ Default value is 1.
     - `restart-strategy.fixed-delay.delay`: Delay between restart attempts, used if the default restart strategy is set to "fixed-delay".
     Default value is the `akka.ask.timeout`.
     
    +- `restart-strategy.failure-rate.max-failures-per-unit`: Maximum number of restarts in given time unit before failing a job in "failure-rate" strategy. 
    +Default value is 1.
    +
    +- `restart-strategy.failure-rate.failure-rate-unit`: Time unit for measuring failure rate in "failure-rate" strategy. One of java.util.concurrent.TimeUnit values .
    +Default value is `MINUTES`.
    --- End diff --
    
    I think it's better to specify an arbitrary interval.


---

[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1954#discussion_r67506020
  
    --- Diff: docs/setup/config.md ---
    @@ -139,6 +140,15 @@ Default value is 1.
     - `restart-strategy.fixed-delay.delay`: Delay between restart attempts, used if the default restart strategy is set to "fixed-delay".
     Default value is the `akka.ask.timeout`.
     
    +- `restart-strategy.failure-rate.max-failures-per-unit`: Maximum number of restarts in given time unit before failing a job in "failure-rate" strategy. 
    +Default value is 1.
    +
    +- `restart-strategy.failure-rate.failure-rate-unit`: Time unit for measuring failure rate in "failure-rate" strategy. One of java.util.concurrent.TimeUnit values .
    +Default value is `MINUTES`.
    +
    +- `restart-strategy.failure-rate.failure-rate-unit`: Delay between restart attempts, used if the default restart strategy is set to "failure-rate".
    --- End diff --
    
    Shouldn't this be `restart-strategy.failure-rate.delay`?


---

[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1954#discussion_r67505611
  
    --- Diff: docs/apis/streaming/fault_tolerance.md ---
    @@ -338,6 +342,77 @@ The default value is the value of *akka.ask.timeout*.
     
     {% top %}
     
    +### Failure Rate Restart Strategy
    +
    +The failure rate restart strategy restarts job after failure, but when `failure rate` (failures per time unit) is exceeded, the job eventually fails.
    +In-between two consecutive restart attempts, the restart strategy waits a fixed amount of time.
    +
    +This strategy is enabled as default by setting the following configuration parameter in `flink-conf.yaml`.
    +
    +~~~
    +restart-strategy: failure-rate
    +~~~
    +
    +<table class="table table-bordered">
    +  <thead>
    +    <tr>
    +      <th class="text-left" style="width: 40%">Configuration Parameter</th>
    +      <th class="text-left" style="width: 40%">Description</th>
    +      <th class="text-left">Default Value</th>
    +    </tr>
    +  </thead>
    +  <tbody>
    +    <tr>
    +        <td><it>restart-strategy.failure-rate.max-failures-per-unit</it></td>
    +        <td>Maximum number of restarts in given time unit before failing a job</td>
    +        <td>1</td>
    +    </tr>
    +    <tr>
    +        <td><it>restart-strategy.failure-rate.failure-rate-unit</it></td>
    +        <td>Time unit for measuring failure rate. One of java.util.concurrent.TimeUnit values</td>
    +        <td>MINUTES</td>
    +    </tr>
    +    <tr>
    +        <td><it>restart-strategy.failure-rate.delay</it></td>
    +        <td>Delay between two consecutive restart attempts</td>
    +        <td><it>akka.ask.timeout</it></td>
    +    </tr>
    +  </tbody>
    +</table>
    +
    +~~~
    +restart-strategy.failure-rate.max-failures-per-unit: 3
    +restart-strategy.failure-rate.failure-rate-unit: MINUTES
    +restart-strategy.failure-rate.delay: 10 s
    +~~~
    +
    +The failure rate restart strategy can also be set programmatically:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +env.setRestartStrategy(RestartStrategies.failureRateRestart(
    +  3, // max failures per unit
    +  java.util.concurrent.TimeUnit.MINUTES, //time unit for measuring failure rate
    +  10000 // delay in milliseconds
    --- End diff --
    
    Maybe we should support a more flexible delay specification. Something like `"10 seconds"` or `TimeUnit.seconds(10)`. 
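
A minimal sketch of what a string-based delay specification such as `"10 seconds"` could look like. `DelaySpec` and `parseMillis` are hypothetical names used only for illustration; they are not Flink classes, and the set of accepted unit spellings is an assumption.

```java
import java.util.Locale;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not part of Flink): parses delays such as "10 s" or "10 seconds". */
final class DelaySpec {
    private DelaySpec() {}

    /** Returns the delay in milliseconds; throws IllegalArgumentException on malformed input. */
    static long parseMillis(String text) {
        String trimmed = text.trim().toLowerCase(Locale.ROOT);
        // Split the leading digits (the amount) from the rest (the unit).
        int split = 0;
        while (split < trimmed.length() && Character.isDigit(trimmed.charAt(split))) {
            split++;
        }
        if (split == 0) {
            throw new IllegalArgumentException("Delay must start with a number: '" + text + "'");
        }
        long amount = Long.parseLong(trimmed.substring(0, split));
        String unit = trimmed.substring(split).trim();
        return unitFor(unit, text).toMillis(amount);
    }

    // The accepted spellings are an assumption made for this sketch.
    private static TimeUnit unitFor(String unit, String original) {
        switch (unit) {
            case "ms": case "milli": case "millis": case "milliseconds":
                return TimeUnit.MILLISECONDS;
            case "s": case "sec": case "second": case "seconds":
                return TimeUnit.SECONDS;
            case "min": case "minute": case "minutes":
                return TimeUnit.MINUTES;
            case "h": case "hour": case "hours":
                return TimeUnit.HOURS;
            default:
                throw new IllegalArgumentException("Unknown time unit in '" + original + "'");
        }
    }
}
```

With such a helper, the programmatic example in the diff could take `DelaySpec.parseMillis("10 seconds")` in place of the bare `10000`, which keeps the unit visible at the call site.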


---

[GitHub] flink pull request: [FLINK-3190] failure rate restart strategy

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1954#issuecomment-217126222
  
    Thanks a lot for opening a pull request!
    I'm sorry that nobody from the community has responded to it yet. It's just that we are a bit overloaded currently.


---

[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1954#discussion_r67507706
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java ---
    @@ -108,22 +87,7 @@ public static FixedDelayRestartStrategyFactory createFactory(Configuration confi
     			timeoutString
     		);
     
    -		long delay;
    -
    -		try {
    -			delay = Duration.apply(delayString).toMillis();
    -		} catch (NumberFormatException nfe) {
    -			if (delayString.equals(timeoutString)) {
    -				throw new Exception("Invalid config value for " +
    -					ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE + ": " + timeoutString +
    -					". Value must be a valid duration (such as '10 s' or '1 min')");
    -			} else {
    -				throw new Exception("Invalid config value for " +
    -					ConfigConstants.EXECUTION_RETRY_DELAY_KEY + ": " + delayString +
    -					". Value must be a valid duration (such as '100 milli' or '10 s')");
    -			}
    -		}
    -
    --- End diff --
    
    Why did you remove the exception handling here?


---

[GitHub] flink issue #1954: [FLINK-3190] failure rate restart strategy

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/1954
  
    Great to hear :-)


---

[GitHub] flink pull request: [FLINK-3190] failure rate restart strategy

Posted by fijolekProjects <gi...@git.apache.org>.
GitHub user fijolekProjects reopened a pull request:

    https://github.com/apache/flink/pull/1954

    [FLINK-3190] failure rate restart strategy

    Failure rate restart strategy - job should only die, if it fails too often in a given time frame

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fijolekProjects/flink FLINK-3190

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1954.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1954
    
----
commit 1c78e8cbee8cc842daac3bd72891dbf7a515bf21
Author: Michal Fijolek <mi...@gmail.com>
Date:   2016-03-13T00:40:15Z

    [FLINK-3190] failure rate restart strategy

----


---

[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

Posted by fijolekProjects <gi...@git.apache.org>.
Github user fijolekProjects commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1954#discussion_r70004152
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java ---
    @@ -108,22 +87,7 @@ public static FixedDelayRestartStrategyFactory createFactory(Configuration confi
     			timeoutString
     		);
     
    -		long delay;
    -
    -		try {
    -			delay = Duration.apply(delayString).toMillis();
    -		} catch (NumberFormatException nfe) {
    -			if (delayString.equals(timeoutString)) {
    -				throw new Exception("Invalid config value for " +
    -					ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE + ": " + timeoutString +
    -					". Value must be a valid duration (such as '10 s' or '1 min')");
    -			} else {
    -				throw new Exception("Invalid config value for " +
    -					ConfigConstants.EXECUTION_RETRY_DELAY_KEY + ": " + delayString +
    -					". Value must be a valid duration (such as '100 milli' or '10 s')");
    -			}
    -		}
    -
    --- End diff --
    
    Sorry for not being precise.
    What I meant was that the `ConfigConstants` used in the exception handling in `RestartStrategyFactory` and in `FixedDelayRestartStrategy` are the same, even though a couple of lines higher, where the configuration is read, the two classes use different `ConfigConstants`. It looks like the removed exception handling was referring to the wrong `ConfigConstants`? Or maybe the wrong `ConfigConstants` entry is used in one case (`AKKA_WATCH_HEARTBEAT_INTERVAL` vs `AKKA_WATCH_HEARTBEAT_PAUSE`)? And in `FixedDelayRestartStrategy` the constant `RESTART_STRATEGY_FIXED_DELAY_DELAY` should probably be used instead of `EXECUTION_RETRY_DELAY_KEY`? I'm not quite sure.


---

[GitHub] flink pull request: [FLINK-3190] failure rate restart strategy

Posted by fijolekProjects <gi...@git.apache.org>.
Github user fijolekProjects commented on the pull request:

    https://github.com/apache/flink/pull/1954#issuecomment-217138008
  
    That's fine @rmetzger :)


---

[GitHub] flink issue #1954: [FLINK-3190] failure rate restart strategy

Posted by fijolekProjects <gi...@git.apache.org>.
Github user fijolekProjects commented on the issue:

    https://github.com/apache/flink/pull/1954
  
    Hi @tillrohrmann , that's nice. I'll do my best to push fixes today/tomorrow


---

[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

Posted by fijolekProjects <gi...@git.apache.org>.
Github user fijolekProjects commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1954#discussion_r69491844
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java ---
    @@ -0,0 +1,108 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.executiongraph.restart;
    +
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.executiongraph.ExecutionGraph;
    +import scala.concurrent.duration.Duration;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.concurrent.TimeUnit;
    +
    +import static akka.dispatch.Futures.future;
    +
    +/**
    + * Restart strategy which tries to restart the given {@link ExecutionGraph} when failure rate exceeded
    + * with a fixed time delay in between.
    + */
    +public class FailureRateRestartStrategy implements RestartStrategy {
    +	private final int maxFailuresPerUnit;
    +	private final TimeUnit failureRateUnit;
    +	private final long delayBetweenRestartAttempts;
    +	private List<Long> restartTimestamps = new ArrayList<>();
    +	private boolean disabled = false;
    +
    +	public FailureRateRestartStrategy(int maxFailuresPerUnit, TimeUnit failureRateUnit, long delayBetweenRestartAttempts) {
    +		Preconditions.checkArgument(maxFailuresPerUnit > 0, "Maximum number of restart attempts per time unit must be greater than 0.");
    +		Preconditions.checkArgument(delayBetweenRestartAttempts >= 0, "Delay between restart attempts must be positive");
    +
    +		this.maxFailuresPerUnit = maxFailuresPerUnit;
    +		this.failureRateUnit = failureRateUnit;
    +		this.delayBetweenRestartAttempts = delayBetweenRestartAttempts;
    +	}
    +
    +	@Override
    +	public boolean canRestart() {
    +		return !disabled && canRestartJob();
    +	}
    +
    +	private boolean canRestartJob() {
    +		int restartsInWindowSoFar = restartTimestamps.size();
    +		if (restartsInWindowSoFar >= maxFailuresPerUnit) {
    +			List<Long> lastFailures = restartTimestamps.subList(restartsInWindowSoFar - maxFailuresPerUnit, restartsInWindowSoFar);
    +			restartTimestamps = lastFailures; //deallocating not needed timestamps
    --- End diff --
    
    Great catch. In the end I used `EvictingQueue<Long>`; it looks like a good fit for this task.


---

[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1954#discussion_r70045487
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java ---
    @@ -108,22 +87,7 @@ public static FixedDelayRestartStrategyFactory createFactory(Configuration confi
     			timeoutString
     		);
     
    -		long delay;
    -
    -		try {
    -			delay = Duration.apply(delayString).toMillis();
    -		} catch (NumberFormatException nfe) {
    -			if (delayString.equals(timeoutString)) {
    -				throw new Exception("Invalid config value for " +
    -					ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE + ": " + timeoutString +
    -					". Value must be a valid duration (such as '10 s' or '1 min')");
    -			} else {
    -				throw new Exception("Invalid config value for " +
    -					ConfigConstants.EXECUTION_RETRY_DELAY_KEY + ": " + delayString +
    -					". Value must be a valid duration (such as '100 milli' or '10 s')");
    -			}
    -		}
    -
    --- End diff --
    
    Oh yes, you're right @fijolekProjects. We use the wrong `ConfigConstants` entry in the exception message. It should be `ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY` instead of `ConfigConstants.EXECUTION_RETRY_DELAY_KEY`. Could you fix that?
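
To illustrate the point about naming the right key in the error message, here is a rough sketch. It is not the code from the PR: a plain `Map` stands in for Flink's `Configuration`, `DelayConfigCheck` is a hypothetical class, values are assumed to be plain milliseconds rather than duration strings, and the fallback default of `10000` is made up. The two key strings are the ones shown in the `docs/setup/config.md` diff.

```java
import java.util.Map;

/** Illustrative only: a Map stands in for Flink's Configuration. */
final class DelayConfigCheck {
    static final String DELAY_KEY = "restart-strategy.fixed-delay.delay";
    static final String FALLBACK_KEY = "akka.ask.timeout";
    static final String DEFAULT_FALLBACK_MILLIS = "10000"; // made-up default for this sketch

    private DelayConfigCheck() {}

    /** Reads the delay in milliseconds and blames the key the value actually came from. */
    static long readDelayMillis(Map<String, String> config) {
        // Remember which key supplied the value instead of comparing strings afterwards.
        boolean usedFallback = !config.containsKey(DELAY_KEY);
        String key = usedFallback ? FALLBACK_KEY : DELAY_KEY;
        String raw = usedFallback
            ? config.getOrDefault(FALLBACK_KEY, DEFAULT_FALLBACK_MILLIS)
            : config.get(DELAY_KEY);
        try {
            long millis = Long.parseLong(raw == null ? "" : raw.trim());
            if (millis < 0) {
                throw new NumberFormatException("negative delay");
            }
            return millis;
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                "Invalid config value for " + key + ": " + raw
                    + ". Value must be a non-negative number of milliseconds.", e);
        }
    }
}
```

Tracking which key was read, rather than checking whether the delay string equals the timeout string as the removed code did, also avoids blaming the fallback key when a user happens to set both options to the same value.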


---

[GitHub] flink issue #1954: [FLINK-3190] failure rate restart strategy

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/1954
  
    The changes look good to me. Thanks a lot for your work @fijolekProjects :-) Will be merging your PR.


---

[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1954#discussion_r67505399
  
    --- Diff: docs/apis/streaming/fault_tolerance.md ---
    @@ -338,6 +342,77 @@ The default value is the value of *akka.ask.timeout*.
     
     {% top %}
     
    +### Failure Rate Restart Strategy
    +
    +The failure rate restart strategy restarts job after failure, but when `failure rate` (failures per time unit) is exceeded, the job eventually fails.
    +In-between two consecutive restart attempts, the restart strategy waits a fixed amount of time.
    +
    +This strategy is enabled as default by setting the following configuration parameter in `flink-conf.yaml`.
    +
    +~~~
    +restart-strategy: failure-rate
    +~~~
    +
    +<table class="table table-bordered">
    +  <thead>
    +    <tr>
    +      <th class="text-left" style="width: 40%">Configuration Parameter</th>
    +      <th class="text-left" style="width: 40%">Description</th>
    +      <th class="text-left">Default Value</th>
    +    </tr>
    +  </thead>
    +  <tbody>
    +    <tr>
    +        <td><it>restart-strategy.failure-rate.max-failures-per-unit</it></td>
    +        <td>Maximum number of restarts in given time unit before failing a job</td>
    +        <td>1</td>
    +    </tr>
    +    <tr>
    +        <td><it>restart-strategy.failure-rate.failure-rate-unit</it></td>
    +        <td>Time unit for measuring failure rate. One of java.util.concurrent.TimeUnit values</td>
    +        <td>MINUTES</td>
    +    </tr>
    +    <tr>
    +        <td><it>restart-strategy.failure-rate.delay</it></td>
    +        <td>Delay between two consecutive restart attempts</td>
    +        <td><it>akka.ask.timeout</it></td>
    +    </tr>
    +  </tbody>
    +</table>
    +
    +~~~
    +restart-strategy.failure-rate.max-failures-per-unit: 3
    +restart-strategy.failure-rate.failure-rate-unit: MINUTES
    --- End diff --
    
    Wouldn't it also make sense to let the user specify an interval for the maximum number of failures? I think this would be more flexible.


---

[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1954#discussion_r69697772
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java ---
    @@ -35,19 +34,21 @@
      * with a fixed time delay in between.
      */
     public class FailureRateRestartStrategy implements RestartStrategy {
    -	private final int maxFailuresPerUnit;
    -	private final TimeUnit failureRateUnit;
    -	private final long delayBetweenRestartAttempts;
    -	private List<Long> restartTimestamps = new ArrayList<>();
    +	private final Duration failuresInterval;
    +	private final Duration delayInterval;
    +	private EvictingQueue<Long> restartTimestampsQueue;
    --- End diff --
    
    Can we replace `EvictingQueue` with an `ArrayDeque`? It gives you almost the same features as the `EvictingQueue` with similar performance, and we would not introduce a further dependency on Guava. The reason is that we are gradually trying to get rid of Guava because different versions of Guava are not fully compatible.
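
A minimal sketch of how a bounded window of restart timestamps could be kept with `java.util.ArrayDeque` instead of Guava's `EvictingQueue`. This is not the code that was merged; `FailureRateWindow` and its method names are hypothetical, and the interval is assumed to be given in milliseconds.

```java
import java.util.ArrayDeque;

/** Sketch only: tracks the newest restart timestamps without a Guava dependency. */
final class FailureRateWindow {
    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    private final ArrayDeque<Long> restartTimestamps;

    FailureRateWindow(int maxFailuresPerInterval, long intervalMillis) {
        if (maxFailuresPerInterval <= 0 || intervalMillis <= 0) {
            throw new IllegalArgumentException("Both arguments must be greater than 0.");
        }
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
        this.restartTimestamps = new ArrayDeque<>(maxFailuresPerInterval);
    }

    /** Call on each failure, before recordRestart: true if another restart is allowed. */
    boolean canRestart(long nowMillis) {
        if (restartTimestamps.size() < maxFailuresPerInterval) {
            return true;
        }
        // The deque is full, so its head is the oldest of the last N restarts.
        long oldest = restartTimestamps.peekFirst();
        return nowMillis - oldest > intervalMillis;
    }

    /** Records a restart, evicting the oldest timestamp by hand once the deque is full. */
    void recordRestart(long nowMillis) {
        if (restartTimestamps.size() == maxFailuresPerInterval) {
            restartTimestamps.removeFirst(); // manual eviction replaces EvictingQueue
        }
        restartTimestamps.addLast(nowMillis);
    }
}
```

The only behaviour `EvictingQueue` adds over `ArrayDeque` here is the automatic eviction, which costs one `removeFirst()` call to reproduce. Passing the current time in as a parameter is a choice made for this sketch so that the logic can be checked without sleeping.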


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

Posted by fijolekProjects <gi...@git.apache.org>.
Github user fijolekProjects commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1954#discussion_r69491638
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---
    @@ -174,71 +140,54 @@ private void validateConstraints(ExecutionGraph eg) {
     
     	@Test
     	public void testRestartAutomatically() throws Exception {
    -		Instance instance = ExecutionGraphTestUtils.getInstance(
    -				new SimpleActorGateway(TestingUtils.directExecutionContext()),
    -				NUM_TASKS);
    +		RestartStrategy restartStrategy = new FixedDelayRestartStrategy(1, 1000);
    +		Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
    +		ExecutionGraph eg = executionGraphInstanceTuple.f0;
     
    -		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
    -		scheduler.newInstanceAvailable(instance);
    -
    -		JobVertex sender = new JobVertex("Task");
    -		sender.setInvokableClass(Tasks.NoOpInvokable.class);
    -		sender.setParallelism(NUM_TASKS);
    -
    -		JobGraph jobGraph = new JobGraph("Pointwise job", sender);
    -
    -		ExecutionGraph eg = new ExecutionGraph(
    -				TestingUtils.defaultExecutionContext(),
    -				new JobID(),
    -				"Test job",
    -				new Configuration(),
    -				ExecutionConfigTest.getSerializedConfig(),
    -				AkkaUtils.getDefaultTimeout(),
    -				new FixedDelayRestartStrategy(1, 1000));
    -		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
    +		restartAfterFailure(eg, new FiniteDuration(2, TimeUnit.MINUTES), true);
    +	}
     
    -		assertEquals(JobStatus.CREATED, eg.getState());
    +	@Test
    +	public void taskShouldFailWhenFailureRateLimitExceeded() throws Exception {
    +		FailureRateRestartStrategy restartStrategy = new FailureRateRestartStrategy(2, TimeUnit.SECONDS, 0);
    +		FiniteDuration timeout = new FiniteDuration(50, TimeUnit.MILLISECONDS);
    +		Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
    +		ExecutionGraph eg = executionGraphInstanceTuple.f0;
    +
    +		restartAfterFailure(eg, timeout, false);
    +		restartAfterFailure(eg, timeout, false);
    +		//failure rate limit not exceeded yet, so task is running
    +		assertEquals(JobStatus.RUNNING, eg.getState());
    +		Thread.sleep(1000); //wait for a second to restart limit rate
     
    -		eg.scheduleForExecution(scheduler);
    +		restartAfterFailure(eg, timeout, false);
    +		restartAfterFailure(eg, timeout, false);
    +		makeAFailureAndWait(eg, timeout);
    --- End diff --
    
    I deleted the first half of the test (it's already covered by the test below) and increased the interval to 2 seconds.


---

[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1954#discussion_r67513578
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---
    @@ -174,71 +140,54 @@ private void validateConstraints(ExecutionGraph eg) {
     
     	@Test
     	public void testRestartAutomatically() throws Exception {
    -		Instance instance = ExecutionGraphTestUtils.getInstance(
    -				new SimpleActorGateway(TestingUtils.directExecutionContext()),
    -				NUM_TASKS);
    +		RestartStrategy restartStrategy = new FixedDelayRestartStrategy(1, 1000);
    +		Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
    +		ExecutionGraph eg = executionGraphInstanceTuple.f0;
     
    -		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
    -		scheduler.newInstanceAvailable(instance);
    -
    -		JobVertex sender = new JobVertex("Task");
    -		sender.setInvokableClass(Tasks.NoOpInvokable.class);
    -		sender.setParallelism(NUM_TASKS);
    -
    -		JobGraph jobGraph = new JobGraph("Pointwise job", sender);
    -
    -		ExecutionGraph eg = new ExecutionGraph(
    -				TestingUtils.defaultExecutionContext(),
    -				new JobID(),
    -				"Test job",
    -				new Configuration(),
    -				ExecutionConfigTest.getSerializedConfig(),
    -				AkkaUtils.getDefaultTimeout(),
    -				new FixedDelayRestartStrategy(1, 1000));
    -		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
    +		restartAfterFailure(eg, new FiniteDuration(2, TimeUnit.MINUTES), true);
    +	}
     
    -		assertEquals(JobStatus.CREATED, eg.getState());
    +	@Test
    +	public void taskShouldFailWhenFailureRateLimitExceeded() throws Exception {
    +		FailureRateRestartStrategy restartStrategy = new FailureRateRestartStrategy(2, TimeUnit.SECONDS, 0);
    +		FiniteDuration timeout = new FiniteDuration(50, TimeUnit.MILLISECONDS);
    +		Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
    +		ExecutionGraph eg = executionGraphInstanceTuple.f0;
    +
    +		restartAfterFailure(eg, timeout, false);
    +		restartAfterFailure(eg, timeout, false);
    +		//failure rate limit not exceeded yet, so task is running
    +		assertEquals(JobStatus.RUNNING, eg.getState());
    +		Thread.sleep(1000); //wait for a second to restart limit rate
     
    -		eg.scheduleForExecution(scheduler);
    +		restartAfterFailure(eg, timeout, false);
    +		restartAfterFailure(eg, timeout, false);
    +		makeAFailureAndWait(eg, timeout);
    --- End diff --
    
    Can we try to harden this test a little bit? The problem is that on Travis, concurrent executions (e.g. the restart future) can take quite some time. Thus, it might easily happen that we run into the 50 millisecond timeout or that the three failures don't occur within one second, even though the test passes without problems on your local machine.
    
    I think it would be better to split the test so that you treat the first half and the second half in separate test cases. In the second test case, we should increase the failure interval to make sure that we can produce 3 failures within that time interval.



[GitHub] flink issue #1954: [FLINK-3190] failure rate restart strategy

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/1954
  
    Hi @fijolekProjects, what's the status on your side? I would like to include your PR in the upcoming release :-)



[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

Posted by fijolekProjects <gi...@git.apache.org>.
Github user fijolekProjects commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1954#discussion_r69491950
  
    --- Diff: docs/setup/config.md ---
    @@ -139,6 +140,15 @@ Default value is 1.
     - `restart-strategy.fixed-delay.delay`: Delay between restart attempts, used if the default restart strategy is set to "fixed-delay".
     Default value is the `akka.ask.timeout`.
     
    +- `restart-strategy.failure-rate.max-failures-per-unit`: Maximum number of restarts in given time unit before failing a job in "failure-rate" strategy. 
    +Default value is 1.
    +
    +- `restart-strategy.failure-rate.failure-rate-unit`: Time unit for measuring failure rate in "failure-rate" strategy. One of java.util.concurrent.TimeUnit values .
    +Default value is `MINUTES`.
    +
    +- `restart-strategy.failure-rate.failure-rate-unit`: Delay between restart attempts, used if the default restart strategy is set to "failure-rate".
    --- End diff --
    
    you're right :)



[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1954#discussion_r69698427
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/TimeInterval.java ---
    @@ -0,0 +1,80 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.util;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +
    +import java.io.Serializable;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Value object that represents time interval
    + * */
    +@PublicEvolving
    +public final class TimeInterval implements Serializable {
    --- End diff --
    
    I like your solution to have a `TimeInterval` class. However, we already have a nearly identical class `Time` in Flink. It is currently contained in the flink-streaming-java module. Thus, in order to not replicate code, I think it would be better to move the `Time` class to flink-core and use it instead of `TimeInterval`. We can move the `org.apache.flink.streaming.api.windowing.time` package to `org.apache.flink.api.common.time`. But then we have to make sure that we update possible references in the documentation to the new location.



[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1954#discussion_r69696777
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java ---
    @@ -108,22 +87,7 @@ public static FixedDelayRestartStrategyFactory createFactory(Configuration confi
     			timeoutString
     		);
     
    -		long delay;
    -
    -		try {
    -			delay = Duration.apply(delayString).toMillis();
    -		} catch (NumberFormatException nfe) {
    -			if (delayString.equals(timeoutString)) {
    -				throw new Exception("Invalid config value for " +
    -					ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE + ": " + timeoutString +
    -					". Value must be a valid duration (such as '10 s' or '1 min')");
    -			} else {
    -				throw new Exception("Invalid config value for " +
    -					ConfigConstants.EXECUTION_RETRY_DELAY_KEY + ": " + delayString +
    -					". Value must be a valid duration (such as '100 milli' or '10 s')");
    -			}
    -		}
    -
    --- End diff --
    
    No, we cannot remove this exception handling here, because this method and the code in `RestartStrategyFactory` are two different code paths. So please reintroduce the exception handling.



[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1954#discussion_r69694326
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java ---
    @@ -35,19 +34,21 @@
      * with a fixed time delay in between.
      */
     public class FailureRateRestartStrategy implements RestartStrategy {
    -	private final int maxFailuresPerUnit;
    -	private final TimeUnit failureRateUnit;
    -	private final long delayBetweenRestartAttempts;
    -	private List<Long> restartTimestamps = new ArrayList<>();
    +	private final Duration failuresInterval;
    +	private final Duration delayInterval;
    +	private EvictingQueue<Long> restartTimestampsQueue;
     	private boolean disabled = false;
     
    -	public FailureRateRestartStrategy(int maxFailuresPerUnit, TimeUnit failureRateUnit, long delayBetweenRestartAttempts) {
    -		Preconditions.checkArgument(maxFailuresPerUnit > 0, "Maximum number of restart attempts per time unit must be greater than 0.");
    -		Preconditions.checkArgument(delayBetweenRestartAttempts >= 0, "Delay between restart attempts must be positive");
    +	public FailureRateRestartStrategy(int maxFailuresPerInterval, Duration failuresInterval, Duration delayInterval) {
    +		Preconditions.checkArgument(maxFailuresPerInterval > 0, "Maximum number of restart attempts per time unit must be greater than 0.");
    +		Preconditions.checkNotNull(failuresInterval, "Failures interval cannot be null.");
    +		Preconditions.checkNotNull(failuresInterval.length() > 0, "Failures interval must be greater than 0 ms.");
    --- End diff --
    
    `checkArgument`



[GitHub] flink issue #1954: [FLINK-3190] failure rate restart strategy

Posted by fijolekProjects <gi...@git.apache.org>.
Github user fijolekProjects commented on the issue:

    https://github.com/apache/flink/pull/1954
  
    Thanks for the support @tillrohrmann
    1. Done
    2. Done
    3. `EvictingQueue` replaced with our own `FixedSizeFifoQueue`. I couldn't figure out how to make `ArrayDeque` fixed in size.
    
    Let me know if I should squash these commits and rebase them onto master - there are some conflicts.



[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1954#discussion_r67509184
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java ---
    @@ -0,0 +1,108 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.executiongraph.restart;
    +
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.executiongraph.ExecutionGraph;
    +import scala.concurrent.duration.Duration;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.concurrent.TimeUnit;
    +
    +import static akka.dispatch.Futures.future;
    +
    +/**
    + * Restart strategy which tries to restart the given {@link ExecutionGraph} when failure rate exceeded
    + * with a fixed time delay in between.
    + */
    +public class FailureRateRestartStrategy implements RestartStrategy {
    +	private final int maxFailuresPerUnit;
    +	private final TimeUnit failureRateUnit;
    +	private final long delayBetweenRestartAttempts;
    +	private List<Long> restartTimestamps = new ArrayList<>();
    +	private boolean disabled = false;
    +
    +	public FailureRateRestartStrategy(int maxFailuresPerUnit, TimeUnit failureRateUnit, long delayBetweenRestartAttempts) {
    +		Preconditions.checkArgument(maxFailuresPerUnit > 0, "Maximum number of restart attempts per time unit must be greater than 0.");
    +		Preconditions.checkArgument(delayBetweenRestartAttempts >= 0, "Delay between restart attempts must be positive");
    +
    +		this.maxFailuresPerUnit = maxFailuresPerUnit;
    +		this.failureRateUnit = failureRateUnit;
    +		this.delayBetweenRestartAttempts = delayBetweenRestartAttempts;
    +	}
    +
    +	@Override
    +	public boolean canRestart() {
    +		return !disabled && canRestartJob();
    +	}
    +
    +	private boolean canRestartJob() {
    +		int restartsInWindowSoFar = restartTimestamps.size();
    +		if (restartsInWindowSoFar >= maxFailuresPerUnit) {
    +			List<Long> lastFailures = restartTimestamps.subList(restartsInWindowSoFar - maxFailuresPerUnit, restartsInWindowSoFar);
    +			restartTimestamps = lastFailures; //deallocating not needed timestamps
    --- End diff --
    
    This does not deallocate the unneeded timestamps, because `subList` only returns a view on the `restartTimestamps` list. So internally, the original list will always be referenced. Furthermore, each add might cause the whole list to be copied to a new list which is one element larger.
    
    I think we should use a fixed size queue (size = maxFailuresPerUnit). Whenever we call `canRestartJob` we first pop all timestamps which fall out of the current failure interval. If the resulting size of the queue is `< maxFailuresPerUnit`, then we can restart. Otherwise we've seen more than `maxFailuresPerUnit` failures in the specified time interval.
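
A minimal sketch of the queue-based check described above, assuming failures are tracked as millisecond timestamps. The class name `FailureRateWindow` and the method `recordFailureAndCheck` are hypothetical and are not taken from the PR; the caller passes in the current time so that the logic can be exercised without sleeping.

```java
import java.util.ArrayDeque;

/** Hypothetical sketch of a sliding failure window; not the class from the PR. */
final class FailureRateWindow {
    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    private final ArrayDeque<Long> failureTimestamps;

    FailureRateWindow(int maxFailuresPerInterval, long intervalMillis) {
        if (maxFailuresPerInterval <= 0) {
            throw new IllegalArgumentException("maxFailuresPerInterval must be greater than 0");
        }
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("intervalMillis must be greater than 0");
        }
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
        this.failureTimestamps = new ArrayDeque<>(maxFailuresPerInterval);
    }

    /**
     * Called when a failure happens at {@code nowMillis}.
     * Returns true if a restart is still allowed, false if the rate limit is exceeded.
     */
    boolean recordFailureAndCheck(long nowMillis) {
        // First pop all timestamps that have fallen out of the current interval.
        while (!failureTimestamps.isEmpty()
                && nowMillis - failureTimestamps.peekFirst() >= intervalMillis) {
            failureTimestamps.pollFirst();
        }
        // The window already holds the maximum number of failures: no restart.
        if (failureTimestamps.size() >= maxFailuresPerInterval) {
            return false;
        }
        failureTimestamps.addLast(nowMillis);
        return true;
    }

    public static void main(String[] args) {
        FailureRateWindow window = new FailureRateWindow(2, 1000L);
        System.out.println(window.recordFailureAndCheck(0L));
        System.out.println(window.recordFailureAndCheck(100L));
        System.out.println(window.recordFailureAndCheck(200L));
    }
}
```

Because expired timestamps are removed before the size check, the queue never holds more than `maxFailuresPerInterval` entries, so no list copying or `subList` view is involved.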



[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1954#discussion_r67506447
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java ---
    @@ -54,6 +55,18 @@ public static RestartStrategyConfiguration fixedDelayRestart(
     		return new FixedDelayRestartStrategyConfiguration(restartAttempts, delayBetweenAttempts);
     	}
     
    +	/**
    +	 * Generates a FailureRateRestartStrategyConfiguration.
    +	 *
    +	 * @param maxFailureRate Maximum number of restarts in given time unit {@code failureRateUnit} before failing a job
    +	 * @param failureRateUnit Time unit for measuring failure rate
    +	 * @param delayBetweenRestartAttempts Delay in-between restart attempts
    +	 */
    +	public static FailureRateRestartStrategyConfiguration failureRateRestart(
    +			int maxFailureRate, TimeUnit failureRateUnit, long delayBetweenRestartAttempts) {
    --- End diff --
    
    Maybe we could change the parameter list to `failureRateRestart(int rate, long interval, TimeUnit intervalUnit, long delay, TimeUnit delayUnit)`



[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

Posted by fijolekProjects <gi...@git.apache.org>.
Github user fijolekProjects commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1954#discussion_r69491935
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java ---
    @@ -54,6 +55,18 @@ public static RestartStrategyConfiguration fixedDelayRestart(
     		return new FixedDelayRestartStrategyConfiguration(restartAttempts, delayBetweenAttempts);
     	}
     
    +	/**
    +	 * Generates a FailureRateRestartStrategyConfiguration.
    +	 *
    +	 * @param maxFailureRate Maximum number of restarts in given time unit {@code failureRateUnit} before failing a job
    +	 * @param failureRateUnit Time unit for measuring failure rate
    +	 * @param delayBetweenRestartAttempts Delay in-between restart attempts
    +	 */
    +	public static FailureRateRestartStrategyConfiguration failureRateRestart(
    +			int maxFailureRate, TimeUnit failureRateUnit, long delayBetweenRestartAttempts) {
    --- End diff --
    
    I've added `TimeInterval` to group the (long, TimeUnit) pair. I guess it will be more convenient for the user, what do you think?



[GitHub] flink issue #1954: [FLINK-3190] failure rate restart strategy

Posted by fijolekProjects <gi...@git.apache.org>.
Github user fijolekProjects commented on the issue:

    https://github.com/apache/flink/pull/1954
  
    Thanks for your patience @tillrohrmann :) 
    I've replaced `FixedSizeFifoQueue` with `ArrayDeque`



[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1954#discussion_r70094658
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/util/FixedSizeFifoQueue.java ---
    @@ -0,0 +1,55 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.util;
    +
    +import java.util.concurrent.ArrayBlockingQueue;
    +
    +/**
    + * Adding element to full queue removes its head and then adds new element. It's why size of this queue is fixed.
    + * Example:
    + * <pre>
    + * {@code
    + * Queue q = new FixedSizeFifoQueue<Long>(2);
    + * q.add(1); // q = [1]
    + * q.add(2); // q = [1, 2]
    + * q.add(3); // q = [2, 3]
    + * q.peek(); // 2
    + * }
    + * </pre>
    + */
    +public class FixedSizeFifoQueue<E> extends ArrayBlockingQueue<E> {
    --- End diff --
    
    I think it's not very efficient to use an `ArrayBlockingQueue` as the basis for the implementation, because we don't need protection against concurrent access. I think it would be great if we could use `ArrayDeque` instead.



[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

Posted by fijolekProjects <gi...@git.apache.org>.
Github user fijolekProjects commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1954#discussion_r69490778
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java ---
    @@ -108,22 +87,7 @@ public static FixedDelayRestartStrategyFactory createFactory(Configuration confi
     			timeoutString
     		);
     
    -		long delay;
    -
    -		try {
    -			delay = Duration.apply(delayString).toMillis();
    -		} catch (NumberFormatException nfe) {
    -			if (delayString.equals(timeoutString)) {
    -				throw new Exception("Invalid config value for " +
    -					ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE + ": " + timeoutString +
    -					". Value must be a valid duration (such as '10 s' or '1 min')");
    -			} else {
    -				throw new Exception("Invalid config value for " +
    -					ConfigConstants.EXECUTION_RETRY_DELAY_KEY + ": " + delayString +
    -					". Value must be a valid duration (such as '100 milli' or '10 s')");
    -			}
    -		}
    -
    --- End diff --
    
    It looks like a copy-paste from `RestartStrategyFactory` and it doesn't make sense to me.



[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1954#discussion_r69694368
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java ---
    @@ -35,19 +34,21 @@
      * with a fixed time delay in between.
      */
     public class FailureRateRestartStrategy implements RestartStrategy {
    -	private final int maxFailuresPerUnit;
    -	private final TimeUnit failureRateUnit;
    -	private final long delayBetweenRestartAttempts;
    -	private List<Long> restartTimestamps = new ArrayList<>();
    +	private final Duration failuresInterval;
    +	private final Duration delayInterval;
    +	private EvictingQueue<Long> restartTimestampsQueue;
     	private boolean disabled = false;
     
    -	public FailureRateRestartStrategy(int maxFailuresPerUnit, TimeUnit failureRateUnit, long delayBetweenRestartAttempts) {
    -		Preconditions.checkArgument(maxFailuresPerUnit > 0, "Maximum number of restart attempts per time unit must be greater than 0.");
    -		Preconditions.checkArgument(delayBetweenRestartAttempts >= 0, "Delay between restart attempts must be positive");
    +	public FailureRateRestartStrategy(int maxFailuresPerInterval, Duration failuresInterval, Duration delayInterval) {
    +		Preconditions.checkArgument(maxFailuresPerInterval > 0, "Maximum number of restart attempts per time unit must be greater than 0.");
    +		Preconditions.checkNotNull(failuresInterval, "Failures interval cannot be null.");
    +		Preconditions.checkNotNull(failuresInterval.length() > 0, "Failures interval must be greater than 0 ms.");
    +		Preconditions.checkNotNull(delayInterval, "Delay interval cannot be null.");
    +		Preconditions.checkNotNull(delayInterval.length() >= 0, "Delay interval must be at least 0 ms.");
    --- End diff --
    
    `checkArgument`
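
To illustrate why `checkArgument` is the right call here, the sketch below uses two small stand-in methods that mirror the documented behaviour of Guava's `Preconditions.checkNotNull` and `Preconditions.checkArgument`. They are stand-ins so that the example has no dependencies; the class name `PreconditionDemo` and the helper `rejects` are hypothetical. A boolean expression is autoboxed to a non-null `Boolean`, so a null check on it can never fail.

```java
/** Hypothetical demo; the two check methods are stand-ins for the Guava calls. */
final class PreconditionDemo {

    // Stand-in for Preconditions.checkNotNull: fails only on a null reference.
    static <T> T checkNotNull(T reference, String message) {
        if (reference == null) {
            throw new NullPointerException(message);
        }
        return reference;
    }

    // Stand-in for Preconditions.checkArgument: fails when the condition is false.
    static void checkArgument(boolean expression, String message) {
        if (!expression) {
            throw new IllegalArgumentException(message);
        }
    }

    // Helper: returns true if the given check threw a runtime exception.
    static boolean rejects(Runnable check) {
        try {
            check.run();
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        long invalidLength = -5L;
        // The comparison is boxed to Boolean.FALSE, which is not null, so this passes.
        System.out.println(rejects(() -> checkNotNull(invalidLength > 0, "must be > 0")));
        // Here the false condition is actually evaluated and rejected.
        System.out.println(rejects(() -> checkArgument(invalidLength > 0, "must be > 0")));
    }
}
```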



[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1954#discussion_r70094058
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java ---
    @@ -36,19 +36,19 @@
     public class FailureRateRestartStrategy implements RestartStrategy {
     	private final Duration failuresInterval;
     	private final Duration delayInterval;
    -	private EvictingQueue<Long> restartTimestampsQueue;
    +	private FixedSizeFifoQueue<Long> restartTimestampsQueue;
     	private boolean disabled = false;
     
     	public FailureRateRestartStrategy(int maxFailuresPerInterval, Duration failuresInterval, Duration delayInterval) {
    -		Preconditions.checkArgument(maxFailuresPerInterval > 0, "Maximum number of restart attempts per time unit must be greater than 0.");
     		Preconditions.checkNotNull(failuresInterval, "Failures interval cannot be null.");
    -		Preconditions.checkNotNull(failuresInterval.length() > 0, "Failures interval must be greater than 0 ms.");
     		Preconditions.checkNotNull(delayInterval, "Delay interval cannot be null.");
    -		Preconditions.checkNotNull(delayInterval.length() >= 0, "Delay interval must be at least 0 ms.");
    +		Preconditions.checkArgument(maxFailuresPerInterval > 0, "Maximum number of restart attempts per time unit must be greater than 0.");
    +		Preconditions.checkArgument(failuresInterval.length() > 0, "Failures interval must be greater than 0 ms.");
    +		Preconditions.checkArgument(delayInterval.length() >= 0, "Delay interval must be at least 0 ms.");
     
     		this.failuresInterval = failuresInterval;
     		this.delayInterval = delayInterval;
    -		this.restartTimestampsQueue = EvictingQueue.create(maxFailuresPerInterval);
    +		this.restartTimestampsQueue = new FixedSizeFifoQueue<>(maxFailuresPerInterval);
    --- End diff --
    
    Can't we simply use `new ArrayDeque(maxFailuresPerInterval)`? Of course, we would then allocate 2^ceil(log(maxFailuresPerInterval)/log(2)) elements, but this should be ok. We could then check in the `restart` method via the `size` method whether the queue is full or not.



[GitHub] flink issue #1954: [FLINK-3190] failure rate restart strategy

Posted by fijolekProjects <gi...@git.apache.org>.
Github user fijolekProjects commented on the issue:

    https://github.com/apache/flink/pull/1954
  
    Thanks for the great review :) I'm going to fix these issues soon.



[GitHub] flink issue #1954: [FLINK-3190] failure rate restart strategy

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/1954
  
    Hi @fijolekProjects, sorry for the delayed review. Your work looks really good :-) I only had some minor comments which we should address before your PR can be merged. It's a cool feature which you've contributed 👍



[GitHub] flink issue #1954: [FLINK-3190] failure rate restart strategy

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/1954
  
    The changes look good to me. Thanks for your work @fijolekProjects :-) I had one final remark concerning the `FixedSizeFifoQueue`. Maybe we could try to replace it with the `ArrayDeque`. 
    
    I think you cannot make `ArrayDeque` fixed in size, but this is not a problem if you check the size before appending elements. If the maximum size is reached, then we have to remove elements from the queue.
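
The size check described above can be sketched as a small helper around a plain `ArrayDeque`. The class `BoundedAppend` and the method `addBounded` are hypothetical names used for illustration only; the behaviour follows the example in the `FixedSizeFifoQueue` Javadoc, where adding to a full queue drops the head first.

```java
import java.util.ArrayDeque;

/** Hypothetical helper; emulates a fixed-size FIFO on top of ArrayDeque. */
final class BoundedAppend {

    /** Appends {@code element}, removing the oldest entries if the queue is already full. */
    static <E> void addBounded(ArrayDeque<E> queue, int maxSize, E element) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be greater than 0");
        }
        // ArrayDeque grows on demand, so the bound is enforced before appending.
        while (queue.size() >= maxSize) {
            queue.pollFirst();
        }
        queue.addLast(element);
    }

    public static void main(String[] args) {
        ArrayDeque<Long> queue = new ArrayDeque<>(2);
        addBounded(queue, 2, 1L);
        addBounded(queue, 2, 2L);
        addBounded(queue, 2, 3L);
        System.out.println(queue);
    }
}
```

Since no synchronization is involved, this avoids the locking overhead of `ArrayBlockingQueue`, at the cost of not being safe for concurrent use.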



[GitHub] flink pull request #1954: [FLINK-3190] failure rate restart strategy

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1954



[GitHub] flink pull request: [FLINK-3190] failure rate restart strategy

Posted by fijolekProjects <gi...@git.apache.org>.
Github user fijolekProjects closed the pull request at:

    https://github.com/apache/flink/pull/1954



[GitHub] flink issue #1954: [FLINK-3190] failure rate restart strategy

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/1954
  
    Good changes and thanks for your work @fijolekProjects. I think the PR is almost ready to get merged :-) I had only three more comments left to address: 
    
    1. We should move `Time` to flink-core. Then we could use `Time` instead of `TimeInterval`. 
    2. We have to reintroduce the exception handling in `FixedDelayRestartStrategy`.
    3. If possible, we should replace `EvictingQueue` with `ArrayDeque`.
    
    What do you think?

