Posted to issues@flink.apache.org by StephanEwen <gi...@git.apache.org> on 2017/07/19 08:33:46 UTC

[GitHub] flink pull request #4364: [FLINK-7216] [distr. coordination] Guard against c...

GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/4364

    [FLINK-7216] [distr. coordination] Guard against concurrent global failover

    **This is one of the blocker issues for the 1.3.2 release.**
    
    ## What is the purpose of the change
    
    This fixes the bug [FLINK-7216](https://issues.apache.org/jira/browse/FLINK-7216), where race conditions can trigger concurrent failovers and cause a restart storm.
    
    The heart of the bug is the fact that we allow initiating another restart while already being in state `RESTARTING`. That was introduced as a safety net to catch exceptions (implementation bugs) that are reported in that state and need a full recovery to ensure consistency.
    
    However, this means that multiple restarts may accidentally be triggered/queued and then execute one after another. While one attempt is executing the failover, the next one will interfere or abort (having detected a conflict) and schedule another recovery, leading to the restart storm mentioned above. The restart storm subsides once one restart attempt makes enough progress (before another interferes) to actually finish the scheduling phase.
    
    ## Brief change log
    
    This contains three issues, because the first two were needed to prepare the fix.
      - [FLINK-6665](https://issues.apache.org/jira/browse/FLINK-6665) and [FLINK-6667](https://issues.apache.org/jira/browse/FLINK-6667) introduce an indirection where the `RestartStrategy` no longer calls `restart()` on the `ExecutionGraph` directly. Instead, it invokes a callback to initiate the restart.
      - The actual fix makes sure that the `globalModVersion` (which tracks global changes such as full restarts in the ExecutionGraph) is unchanged between triggering the restart and executing it. When scheduling multiple restart requests, only one will actually take effect, while the others detect being subsumed.
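    
    As a rough illustration of the version check described above (a minimal sketch, not the code in this PR; `VersionGuardedGraph` and its methods are hypothetical stand-ins for the `ExecutionGraph`), each global failure bumps a version counter, and a queued restart only proceeds if the counter still matches the value it was bound to:
    
    ```java
    import java.util.concurrent.atomic.AtomicLong;

    /** Hypothetical stand-in for the ExecutionGraph; names are illustrative only. */
    final class VersionGuardedGraph {

        /** Incremented on every global change; in this sketch, on every global failure. */
        private final AtomicLong globalModVersion = new AtomicLong();

        private int restartsExecuted;

        /** Registers a global failure and returns the version a queued restart is bound to. */
        long failGlobal() {
            return globalModVersion.incrementAndGet();
        }

        /** Runs the restart only if no other global change happened since it was triggered. */
        synchronized boolean restart(long expectedVersion) {
            if (globalModVersion.get() != expectedVersion) {
                return false; // subsumed by a later failover
            }
            restartsExecuted++;
            return true;
        }

        synchronized int restartsExecuted() {
            return restartsExecuted;
        }
    }
    ```
    
    With two failures queued back to back, the restart bound to the first version returns `false`, and only the restart bound to the latest version takes effect.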
    
    ## Verifying this change
    
    This change added the following tests:
      - `ExecutionGraphRestartTest#testConcurrentGlobalFailAndRestarts()` explicitly tests that scenario
      - `ExecutionGraphRestartTest#testConcurrentLocalFailAndRestart()` tests a similar setup 
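    
    The concurrent-failure setup used by these tests can be sketched with plain JDK latches. This is only an approximation under stated assumptions: the real tests use Flink's `OneShotLatch` and fail actual `Execution` attempts, while `ConcurrentTriggerSketch` and `runTogether` are hypothetical names.
    
    ```java
    import java.util.concurrent.CountDownLatch;

    /** Hypothetical helper that releases two actions at (nearly) the same moment. */
    final class ConcurrentTriggerSketch {

        static void runTogether(Runnable a, Runnable b) {
            final CountDownLatch ready = new CountDownLatch(2);
            final CountDownLatch go = new CountDownLatch(1);

            Thread t1 = new Thread(() -> awaitAndRun(ready, go, a));
            Thread t2 = new Thread(() -> awaitAndRun(ready, go, b));
            t1.start();
            t2.start();

            try {
                ready.await();   // both threads have started and are about to wait on 'go'
                go.countDown();  // release them together
                t1.join();
                t2.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while coordinating threads", e);
            }
        }

        private static void awaitAndRun(CountDownLatch ready, CountDownLatch go, Runnable action) {
            ready.countDown();
            try {
                go.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            action.run();
        }
    }
    ```
    
    The two-latch arrangement narrows the window between the two actions but cannot guarantee a true interleaving, so a test built on it checks the outcome (the job finishes) rather than a specific ordering.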
    
    The general working of that mechanism is also covered by various existing tests in `org.apache.flink.runtime.executiongraph.restart`.
    
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): **no**
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
      - The serializers: **no**
      - The runtime per-record code paths (performance sensitive): **no**
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **yes**:
    
    The change affects the restart logic on the `JobManager`.
    
    ## Documentation
    
      - Does this pull request introduce a new feature? **no**
      - If yes, how is the feature documented? **not applicable**
    
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink concurrent_restarts_13

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4364.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4364
    
----
commit 1abb816d664bdac9d8b9af438769b9f685e768ce
Author: zjureel <zj...@gmail.com>
Date:   2017-07-18T17:27:56Z

    [FLINK-6665] [FLINK-6667] [distributed coordination] Use a callback and a ScheduledExecutor for ExecutionGraph restarts
    
    Initial work by zjureel@gmail.com , improved by sewen@apache.org.

commit ef88524c808766e08d990f3bb69c45b04807c7c2
Author: Stephan Ewen <se...@apache.org>
Date:   2017-07-18T17:49:56Z

    [FLINK-7216] [distr. coordination] Guard against concurrent global failover

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4364: [FLINK-7216] [distr. coordination] Guard against c...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4364#discussion_r128515593
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---
    @@ -581,6 +565,106 @@ public void testSuspendWhileRestarting() throws Exception {
     		assertEquals(JobStatus.SUSPENDED, eg.getState());
     	}
     
    +	@Test
    +	public void testConcurrentLocalFailAndRestart() throws Exception {
    +		final ExecutionGraph eg = createSimpleTestGraph(new FixedDelayRestartStrategy(10, 0L));
    +		eg.setScheduleMode(ScheduleMode.EAGER);
    +		eg.scheduleForExecution();
    +
    +		waitUntilDeployedAndSwitchToRunning(eg, 1000);
    +
    +		final ExecutionJobVertex vertex = eg.getVerticesTopologically().iterator().next();
    +		final Execution first = vertex.getTaskVertices()[0].getCurrentExecutionAttempt();
    +		final Execution last = vertex.getTaskVertices()[vertex.getParallelism() - 1].getCurrentExecutionAttempt();
    +
    +		final OneShotLatch failTrigger = new OneShotLatch();
    +		final CountDownLatch readyLatch = new CountDownLatch(2);
    +
    +		Thread failure1 = new Thread() {
    +			@Override
    +			public void run() {
    +				readyLatch.countDown();
    +				try {
    +					failTrigger.await();
    +				} catch (InterruptedException ignored) {}
    +
    +				first.fail(new Exception("intended test failure 1"));
    +			}
    +		};
    +
    +		Thread failure2 = new Thread() {
    +			@Override
    +			public void run() {
    +				readyLatch.countDown();
    +				try {
    +					failTrigger.await();
    +				} catch (InterruptedException ignored) {}
    +
    +				last.fail(new Exception("intended test failure 2"));
    +			}
    +		};
    +
    +		// make sure both threads start simultaneously
    +		failure1.start();
    +		failure2.start();
    +		readyLatch.await();
    +		failTrigger.trigger();
    +
    +		waitUntilJobStatus(eg, JobStatus.FAILING, 1000);
    +		completeCancellingForAllVertices(eg);
    +
    +		waitUntilJobStatus(eg, JobStatus.RUNNING, 1000);
    +		waitUntilDeployedAndSwitchToRunning(eg, 1000);
    +		finishAllVertices(eg);
    +
    +		eg.waitUntilTerminal();
    +		assertEquals(JobStatus.FINISHED, eg.getState());
    +	}
    +
    +	@Test
    +	public void testConcurrentGlobalFailAndRestarts() throws Exception {
    --- End diff --
    
    Jip, I think so too.



[GitHub] flink pull request #4364: [FLINK-7216] [distr. coordination] Guard against c...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4364#discussion_r128509863
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---
    @@ -581,6 +565,106 @@ public void testSuspendWhileRestarting() throws Exception {
     		assertEquals(JobStatus.SUSPENDED, eg.getState());
     	}
     
    +	@Test
    +	public void testConcurrentLocalFailAndRestart() throws Exception {
    --- End diff --
    
    Right, this one was a test that should have been there in the first place and I took this chance to add it.



[GitHub] flink pull request #4364: [FLINK-7216] [distr. coordination] Guard against c...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4364#discussion_r128515844
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategyTest.java ---
    @@ -0,0 +1,128 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.executiongraph.restart;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.runtime.concurrent.ScheduledExecutor;
    +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
    +
    +import org.junit.After;
    +import org.junit.Test;
    +
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ScheduledExecutorService;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Unit test for the {@link FailureRateRestartStrategy}.
    + */
    +public class FailureRateRestartStrategyTest {
    +
    +	public final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
    +
    +	public final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService);
    +
    +	@After
    +	public void shutdownExecutor() {
    +		executorService.shutdownNow();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	@Test
    +	public void testManyFailuresWithinRate() throws Exception {
    +		final int numAttempts = 10;
    +		final int intervalMillis = 1;
    +
    +		final FailureRateRestartStrategy restartStrategy =
    +				new FailureRateRestartStrategy(1, Time.milliseconds(intervalMillis), Time.milliseconds(0));
    +
    +		for (int attempsLeft = numAttempts; attempsLeft > 0; --attempsLeft) {
    +			assertTrue(restartStrategy.canRestart());
    +			restartStrategy.restart(new NoOpRestarter(), executor);
    +			sleepGuaranteed(2 * intervalMillis);
    +		}
    +
    +		assertTrue(restartStrategy.canRestart());
    +	}
    +
    +	@Test
    +	public void testFailuresExceedingRate() throws Exception {
    +		final int numFailures = 3;
    +		final int intervalMillis = 10_000;
    +
    +		final FailureRateRestartStrategy restartStrategy =
    +				new FailureRateRestartStrategy(numFailures, Time.milliseconds(intervalMillis), Time.milliseconds(0));
    +
    +		for (int failuresLeft = numFailures; failuresLeft > 0; --failuresLeft) {
    +			assertTrue(restartStrategy.canRestart());
    +			restartStrategy.restart(new NoOpRestarter(), executor);
    +		}
    +
    +		// now the rate should be exceeded
    +		assertFalse(restartStrategy.canRestart());
    +	}
    +
    +	@Test
    +	public void testDelay() throws Exception {
    +		final long restartDelay = 2;
    +		final int numberRestarts = 10;
    +
    +		final FailureRateRestartStrategy strategy =
    +			new FailureRateRestartStrategy(numberRestarts + 1, Time.milliseconds(1), Time.milliseconds(restartDelay));
    +
    +		for (int restartsLeft = numberRestarts; restartsLeft > 0; --restartsLeft) {
    +			assertTrue(strategy.canRestart());
    +
    +			final OneShotLatch sync = new OneShotLatch();
    +			final RestartCallback restarter = new LatchedRestarter(sync);
    +
    +			final long time = System.nanoTime();
    +			strategy.restart(restarter, executor);
    +			sync.await();
    +
    +			final long elapsed = System.nanoTime() - time;
    +			assertTrue("Not enough delay", elapsed >= restartDelay * 1_000_000);
    +		}
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * This method makes sure that the actual interval and is not spuriously waking up.
    --- End diff --
    
    Perfect



[GitHub] flink pull request #4364: [FLINK-7216] [distr. coordination] Guard against c...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4364#discussion_r128509787
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---
    @@ -159,34 +161,6 @@ public void testRestartAutomatically() throws Exception {
     	}
     
     	@Test
    -	public void taskShouldFailWhenFailureRateLimitExceeded() throws Exception {
    --- End diff --
    
    Yes, as part of introducing the "callback" indirection, we can now also test the restart strategies much better, without always setting up a full ExecutionGraph. I added it to the refactoring.
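    
    To illustrate why the indirection helps testing (a sketch only; `RestartCallbackSketch`, `FixedDelaySketch` and their methods are hypothetical and not Flink's actual interfaces), a strategy that receives just a callback and an executor can be exercised with a latch instead of a full `ExecutionGraph`:
    
    ```java
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    /** Hypothetical callback the strategy invokes instead of calling the graph directly. */
    interface RestartCallbackSketch {
        void triggerRestart();
    }

    /** Hypothetical fixed-delay strategy that only knows the callback and an executor. */
    final class FixedDelaySketch {
        private final long delayMillis;

        FixedDelaySketch(long delayMillis) {
            this.delayMillis = delayMillis;
        }

        void restart(RestartCallbackSketch callback, ScheduledExecutorService executor) {
            // Schedule the callback after the delay; no thread is blocked in the meantime.
            executor.schedule(() -> callback.triggerRestart(), delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Exercises the strategy with a latch-backed callback, no graph required. */
    final class FixedDelaySketchDemo {
        static boolean restartArrives(long delayMillis, long timeoutMillis) {
            ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
            CountDownLatch restarted = new CountDownLatch(1);
            try {
                new FixedDelaySketch(delayMillis).restart(restarted::countDown, executor);
                return restarted.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            } finally {
                executor.shutdownNow();
            }
        }
    }
    ```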



[GitHub] flink pull request #4364: [FLINK-7216] [distr. coordination] Guard against c...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4364#discussion_r128491763
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---
    @@ -581,6 +565,106 @@ public void testSuspendWhileRestarting() throws Exception {
     		assertEquals(JobStatus.SUSPENDED, eg.getState());
     	}
     
    +	@Test
    +	public void testConcurrentLocalFailAndRestart() throws Exception {
    +		final ExecutionGraph eg = createSimpleTestGraph(new FixedDelayRestartStrategy(10, 0L));
    +		eg.setScheduleMode(ScheduleMode.EAGER);
    +		eg.scheduleForExecution();
    +
    +		waitUntilDeployedAndSwitchToRunning(eg, 1000);
    +
    +		final ExecutionJobVertex vertex = eg.getVerticesTopologically().iterator().next();
    +		final Execution first = vertex.getTaskVertices()[0].getCurrentExecutionAttempt();
    +		final Execution last = vertex.getTaskVertices()[vertex.getParallelism() - 1].getCurrentExecutionAttempt();
    +
    +		final OneShotLatch failTrigger = new OneShotLatch();
    +		final CountDownLatch readyLatch = new CountDownLatch(2);
    +
    +		Thread failure1 = new Thread() {
    +			@Override
    +			public void run() {
    +				readyLatch.countDown();
    +				try {
    +					failTrigger.await();
    +				} catch (InterruptedException ignored) {}
    +
    +				first.fail(new Exception("intended test failure 1"));
    +			}
    +		};
    +
    +		Thread failure2 = new Thread() {
    +			@Override
    +			public void run() {
    +				readyLatch.countDown();
    +				try {
    +					failTrigger.await();
    +				} catch (InterruptedException ignored) {}
    +
    +				last.fail(new Exception("intended test failure 2"));
    +			}
    +		};
    +
    +		// make sure both threads start simultaneously
    +		failure1.start();
    +		failure2.start();
    +		readyLatch.await();
    +		failTrigger.trigger();
    +
    +		waitUntilJobStatus(eg, JobStatus.FAILING, 1000);
    +		completeCancellingForAllVertices(eg);
    --- End diff --
    
    By the way, I noticed that `completeCancellingForAllVertices()` and `finishAllVertices()` have slightly misleading Javadoc. That threw me off a bit when reviewing.



[GitHub] flink issue #4364: [FLINK-7216] [distr. coordination] Guard against concurre...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4364
  
    Concerning the 'period' check style rule:
    
    I think that the common language rules (not JavaDoc specific) are to add a period after complete sentences. That would mean that parameter descriptions, when not complete sentences, are not terminated by a period.
    
    Are we rolling out a rule that every text line has to be terminated with a period/full stop?



[GitHub] flink pull request #4364: [FLINK-7216] [distr. coordination] Guard against c...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4364#discussion_r128512192
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategyTest.java ---
    @@ -0,0 +1,128 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.executiongraph.restart;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.runtime.concurrent.ScheduledExecutor;
    +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
    +
    +import org.junit.After;
    +import org.junit.Test;
    +
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ScheduledExecutorService;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Unit test for the {@link FailureRateRestartStrategy}.
    + */
    +public class FailureRateRestartStrategyTest {
    +
    +	public final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
    +
    +	public final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService);
    +
    +	@After
    +	public void shutdownExecutor() {
    +		executorService.shutdownNow();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	@Test
    +	public void testManyFailuresWithinRate() throws Exception {
    +		final int numAttempts = 10;
    +		final int intervalMillis = 1;
    +
    +		final FailureRateRestartStrategy restartStrategy =
    +				new FailureRateRestartStrategy(1, Time.milliseconds(intervalMillis), Time.milliseconds(0));
    +
    +		for (int attempsLeft = numAttempts; attempsLeft > 0; --attempsLeft) {
    +			assertTrue(restartStrategy.canRestart());
    +			restartStrategy.restart(new NoOpRestarter(), executor);
    +			sleepGuaranteed(2 * intervalMillis);
    +		}
    +
    +		assertTrue(restartStrategy.canRestart());
    +	}
    +
    +	@Test
    +	public void testFailuresExceedingRate() throws Exception {
    +		final int numFailures = 3;
    +		final int intervalMillis = 10_000;
    +
    +		final FailureRateRestartStrategy restartStrategy =
    +				new FailureRateRestartStrategy(numFailures, Time.milliseconds(intervalMillis), Time.milliseconds(0));
    +
    +		for (int failuresLeft = numFailures; failuresLeft > 0; --failuresLeft) {
    +			assertTrue(restartStrategy.canRestart());
    +			restartStrategy.restart(new NoOpRestarter(), executor);
    +		}
    +
    +		// now the rate should be exceeded
    +		assertFalse(restartStrategy.canRestart());
    +	}
    +
    +	@Test
    +	public void testDelay() throws Exception {
    +		final long restartDelay = 2;
    +		final int numberRestarts = 10;
    +
    +		final FailureRateRestartStrategy strategy =
    +			new FailureRateRestartStrategy(numberRestarts + 1, Time.milliseconds(1), Time.milliseconds(restartDelay));
    +
    +		for (int restartsLeft = numberRestarts; restartsLeft > 0; --restartsLeft) {
    +			assertTrue(strategy.canRestart());
    +
    +			final OneShotLatch sync = new OneShotLatch();
    +			final RestartCallback restarter = new LatchedRestarter(sync);
    +
    +			final long time = System.nanoTime();
    +			strategy.restart(restarter, executor);
    +			sync.await();
    +
    +			final long elapsed = System.nanoTime() - time;
    +			assertTrue("Not enough delay", elapsed >= restartDelay * 1_000_000);
    +		}
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * This method makes sure that the actual interval and is not spuriously waking up.
    --- End diff --
    
    In that case the whole method, and with it the test, aborts with an exception anyway.



[GitHub] flink pull request #4364: [FLINK-7216] [distr. coordination] Guard against c...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4364#discussion_r128196537
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java ---
    @@ -19,27 +19,33 @@
     package org.apache.flink.runtime.executiongraph.restart;
     
     import org.apache.flink.runtime.executiongraph.ExecutionGraph;
    -import org.slf4j.Logger;
    -import org.slf4j.LoggerFactory;
    -
    -import java.util.concurrent.Callable;
    -
    -class ExecutionGraphRestarter {
    -	private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphRestarter.class);
    -	public static Callable<Object> restartWithDelay(final ExecutionGraph executionGraph, final long delayBetweenRestartAttemptsInMillis) {
    -		return new Callable<Object>() {
    -			@Override
    -			public Object call() throws Exception {
    -				try {
    -					LOG.info("Delaying retry of job execution for {} ms ...", delayBetweenRestartAttemptsInMillis);
    -					// do the delay
    -					Thread.sleep(delayBetweenRestartAttemptsInMillis);
    -				} catch(InterruptedException e) {
    -					// should only happen on shutdown
    -				}
    -				executionGraph.restart();
    -				return null;
    -			}
    -		};
    +
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link RestartCallback} that abstracts restart calls on an {@link ExecutionGraph}. 
    + * 
    + * <p>This callback implementation is one-shot; it can only be used once.
    + */
    +public class ExecutionGraphRestartCallback implements RestartCallback {
    +
    +	/** The ExecutionGraph to restart */
    +	private final ExecutionGraph execGraph;
    +
    +	/** Atomic flag to make sure this is used only once */
    --- End diff --
    
    Please add a period here.
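    
    For readers following the diff: the one-shot behaviour mentioned in the Javadoc above is commonly implemented with an `AtomicBoolean` and `compareAndSet`. A minimal sketch, assuming a plain `Runnable` in place of the `ExecutionGraph` (`OneShotRestartSketch` is a hypothetical name, not the class in this diff):
    
    ```java
    import java.util.concurrent.atomic.AtomicBoolean;

    /** Hypothetical one-shot wrapper: only the first call reaches the delegate. */
    final class OneShotRestartSketch {

        /** The action to run at most once. */
        private final Runnable restartAction;

        /** Atomic flag that flips from false to true on first use. */
        private final AtomicBoolean used = new AtomicBoolean(false);

        OneShotRestartSketch(Runnable restartAction) {
            this.restartAction = restartAction;
        }

        /** Returns true if this call actually triggered the restart. */
        boolean trigger() {
            if (used.compareAndSet(false, true)) {
                restartAction.run();
                return true;
            }
            return false;
        }
    }
    ```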



[GitHub] flink pull request #4364: [FLINK-7216] [distr. coordination] Guard against c...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4364#discussion_r128474733
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---
    @@ -159,34 +161,6 @@ public void testRestartAutomatically() throws Exception {
     	}
     
     	@Test
    -	public void taskShouldFailWhenFailureRateLimitExceeded() throws Exception {
    --- End diff --
    
    These tests are superseded by the newly added tests in `FailureRateRestartStrategyTest`?



[GitHub] flink pull request #4364: [FLINK-7216] [distr. coordination] Guard against c...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4364#discussion_r128196559
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java ---
    @@ -19,27 +19,33 @@
     package org.apache.flink.runtime.executiongraph.restart;
     
     import org.apache.flink.runtime.executiongraph.ExecutionGraph;
    -import org.slf4j.Logger;
    -import org.slf4j.LoggerFactory;
    -
    -import java.util.concurrent.Callable;
    -
    -class ExecutionGraphRestarter {
    -	private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphRestarter.class);
    -	public static Callable<Object> restartWithDelay(final ExecutionGraph executionGraph, final long delayBetweenRestartAttemptsInMillis) {
    -		return new Callable<Object>() {
    -			@Override
    -			public Object call() throws Exception {
    -				try {
    -					LOG.info("Delaying retry of job execution for {} ms ...", delayBetweenRestartAttemptsInMillis);
    -					// do the delay
    -					Thread.sleep(delayBetweenRestartAttemptsInMillis);
    -				} catch(InterruptedException e) {
    -					// should only happen on shutdown
    -				}
    -				executionGraph.restart();
    -				return null;
    -			}
    -		};
    +
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link RestartCallback} that abstracts restart calls on an {@link ExecutionGraph}. 
    + * 
    + * <p>This callback implementation is one-shot; it can only be used once.
    + */
    +public class ExecutionGraphRestartCallback implements RestartCallback {
    +
    +	/** The ExecutionGraph to restart */
    --- End diff --
    
    Please add a period here.



[GitHub] flink pull request #4364: [FLINK-7216] [distr. coordination] Guard against c...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4364#discussion_r128477775
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ---
    @@ -189,6 +189,35 @@ public static void finishAllVertices(ExecutionGraph eg) {
     		}
     	}
     
    +	/**
    +	 * Turns a newly scheduled execution graph into a state where all vertices run.
    +	 * This waits until all executions have reached state 'DEPLOYING' and then switches them to running.
    +	 */
    +	public static void waitUntilDeployedAndSwitchToRunning(ExecutionGraph eg, long timeout) throws TimeoutException {
    +		// wait until everything is running
    +		for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
    +			final Execution exec = ev.getCurrentExecutionAttempt();
    +			waitUntilExecutionState(exec, ExecutionState.DEPLOYING, timeout);
    +		}
    +
    +		// Note: As ugly as it is, we need this minor sleep, because between switching
    +		// to 'DEPLOYED' and when the 'switchToRunning()' may be called lies a race check
    +		// against concurrent modifications (cancel / fail). We can only switch this to running
    +		// once that check is passed. For the actual runtime, this switch is triggered by a callback
    +		// from the TaskManager, which comes strictly after that. For tests, we use mock TaskManagers
    +		// which cannot easily tell us when that condition has happened, unfortunately.
    +		try {
    +			Thread.sleep(2);
    --- End diff --
    
    😢 but it seems there's no way around it. Could this lead to flaky tests?
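    
    Where a condition can be observed from the test, polling against a deadline is usually less timing-sensitive than a fixed sleep. The comment in the diff says the mock TaskManagers cannot expose this particular condition, so the following is only a sketch of the general pattern (`PollingWaitSketch.waitUntil` is a hypothetical helper, not part of `ExecutionGraphTestUtils`):
    
    ```java
    import java.util.function.BooleanSupplier;

    /** Hypothetical test helper: poll a condition until it holds or a deadline passes. */
    final class PollingWaitSketch {

        /** Returns true if the condition became true before the timeout, false otherwise. */
        static boolean waitUntil(BooleanSupplier condition, long timeoutMillis) {
            final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
            while (!condition.getAsBoolean()) {
                if (System.nanoTime() - deadline >= 0) {
                    return false; // timed out
                }
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }
    }
    ```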



[GitHub] flink issue #4364: [FLINK-7216] [distr. coordination] Guard against concurre...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4364
  
    Okay, will update the periods. The linguist in my heart cries a bit, but I guess it makes sense that we cannot expect checkstyle to figure out if a sentence is a complete sentence or not...



[GitHub] flink issue #4364: [FLINK-7216] [distr. coordination] Guard against concurre...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4364
  
    With the current rules, the first sentence of any **javadoc** must end in a period.
    
    So, this is invalid:
    ```
    /** some parameter */
    private final int myParameter;
    ```
    
    But, this is fine:
    ```
    // some parameter
    private final int myParameter;
    ```
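
The rule described above can be illustrated with a compilable sketch. The class and field names are hypothetical, and the exact Checkstyle configuration is not shown in this thread, so treat this as an assumption about how the rule behaves rather than the project's actual setup.

```java
/** Hypothetical holder that shows both comment styles side by side. */
final class CommentStyleExample {

    /** The first Javadoc sentence ends in a period, so the rule is satisfied. */
    private final int myParameter;

    // a plain line comment is not Javadoc, so the period rule does not apply
    private final int otherParameter;

    CommentStyleExample(int myParameter, int otherParameter) {
        this.myParameter = myParameter;
        this.otherParameter = otherParameter;
    }

    /** Returns the sum of both fields, only so that the fields are used. */
    int sum() {
        return myParameter + otherParameter;
    }
}
```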


---

[GitHub] flink pull request #4364: [FLINK-7216] [distr. coordination] Guard against c...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4364#discussion_r128196611
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java ---
    @@ -19,27 +19,33 @@
     package org.apache.flink.runtime.executiongraph.restart;
     
     import org.apache.flink.runtime.executiongraph.ExecutionGraph;
    -import org.slf4j.Logger;
    -import org.slf4j.LoggerFactory;
    -
    -import java.util.concurrent.Callable;
    -
    -class ExecutionGraphRestarter {
    -	private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphRestarter.class);
    -	public static Callable<Object> restartWithDelay(final ExecutionGraph executionGraph, final long delayBetweenRestartAttemptsInMillis) {
    -		return new Callable<Object>() {
    -			@Override
    -			public Object call() throws Exception {
    -				try {
    -					LOG.info("Delaying retry of job execution for {} ms ...", delayBetweenRestartAttemptsInMillis);
    -					// do the delay
    -					Thread.sleep(delayBetweenRestartAttemptsInMillis);
    -				} catch(InterruptedException e) {
    -					// should only happen on shutdown
    -				}
    -				executionGraph.restart();
    -				return null;
    -			}
    -		};
    +
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * A {@link RestartCallback} that abstracts restart calls on an {@link ExecutionGraph}. 
    + * 
    --- End diff --
    
    Please remove the trailing space.


---

[GitHub] flink pull request #4364: [FLINK-7216] [distr. coordination] Guard against c...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4364#discussion_r128511963
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---
    @@ -581,6 +565,106 @@ public void testSuspendWhileRestarting() throws Exception {
     		assertEquals(JobStatus.SUSPENDED, eg.getState());
     	}
     
    +	@Test
    +	public void testConcurrentLocalFailAndRestart() throws Exception {
    +		final ExecutionGraph eg = createSimpleTestGraph(new FixedDelayRestartStrategy(10, 0L));
    +		eg.setScheduleMode(ScheduleMode.EAGER);
    +		eg.scheduleForExecution();
    +
    +		waitUntilDeployedAndSwitchToRunning(eg, 1000);
    +
    +		final ExecutionJobVertex vertex = eg.getVerticesTopologically().iterator().next();
    +		final Execution first = vertex.getTaskVertices()[0].getCurrentExecutionAttempt();
    +		final Execution last = vertex.getTaskVertices()[vertex.getParallelism() - 1].getCurrentExecutionAttempt();
    +
    +		final OneShotLatch failTrigger = new OneShotLatch();
    +		final CountDownLatch readyLatch = new CountDownLatch(2);
    +
    +		Thread failure1 = new Thread() {
    +			@Override
    +			public void run() {
    +				readyLatch.countDown();
    +				try {
    +					failTrigger.await();
    +				} catch (InterruptedException ignored) {}
    +
    +				first.fail(new Exception("intended test failure 1"));
    +			}
    +		};
    +
    +		Thread failure2 = new Thread() {
    +			@Override
    +			public void run() {
    +				readyLatch.countDown();
    +				try {
    +					failTrigger.await();
    +				} catch (InterruptedException ignored) {}
    +
    +				last.fail(new Exception("intended test failure 2"));
    +			}
    +		};
    +
    +		// make sure both threads start simultaneously
    +		failure1.start();
    +		failure2.start();
    +		readyLatch.await();
    +		failTrigger.trigger();
    +
    +		waitUntilJobStatus(eg, JobStatus.FAILING, 1000);
    +		completeCancellingForAllVertices(eg);
    +
    +		waitUntilJobStatus(eg, JobStatus.RUNNING, 1000);
    +		waitUntilDeployedAndSwitchToRunning(eg, 1000);
    +		finishAllVertices(eg);
    +
    +		eg.waitUntilTerminal();
    +		assertEquals(JobStatus.FINISHED, eg.getState());
    +	}
    +
    +	@Test
    +	public void testConcurrentGlobalFailAndRestarts() throws Exception {
    --- End diff --
    
    From the offline chat: I think you are missing the asynchrony in the restart, which leads to a lock in the cherry-picked code.


---

[GitHub] flink pull request #4364: [FLINK-7216] [distr. coordination] Guard against c...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4364#discussion_r128480263
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---
    @@ -581,6 +565,106 @@ public void testSuspendWhileRestarting() throws Exception {
     		assertEquals(JobStatus.SUSPENDED, eg.getState());
     	}
     
    +	@Test
    +	public void testConcurrentLocalFailAndRestart() throws Exception {
    --- End diff --
    
    This only verifies that we don't break the existing, working local failover, right? The test should also succeed on the current master; I checked, and it does.


---

[GitHub] flink pull request #4364: [FLINK-7216] [distr. coordination] Guard against c...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4364#discussion_r128197299
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java ---
    @@ -37,15 +37,26 @@
     	/** Atomic flag to make sure this is used only once */
     	private final AtomicBoolean used;
     
    -	public ExecutionGraphRestartCallback(ExecutionGraph execGraph) {
    +	/** The globalModVersion that the ExecutionGraph needs to have for the restart to go through */
    --- End diff --
    
    Please add a period.


---

[GitHub] flink pull request #4364: [FLINK-7216] [distr. coordination] Guard against c...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen closed the pull request at:

    https://github.com/apache/flink/pull/4364


---

[GitHub] flink issue #4364: [FLINK-7216] [distr. coordination] Guard against concurre...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4364
  
    Thanks for the reviews. Addressing the comments, rerunning tests, and merging...


---

[GitHub] flink pull request #4364: [FLINK-7216] [distr. coordination] Guard against c...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4364#discussion_r128476571
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---
    @@ -1120,10 +1125,16 @@ public void accept(Void value) {
     		}
     	}
     
    -	public void restart() {
    +	public void restart(long expectedGlobalVersion) {
     		try {
     			synchronized (progressLock) {
    -				JobStatus current = state;
    +				// check and increment the global version to move this recovery up
    --- End diff --
    
    "check the current global version to determine whether our recovery attempt is still current"?
    
    It's not incrementing the global version here.
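
The version check under discussion can be sketched in isolation. This is a minimal model under stated assumptions, not the code from the PR: `VersionedGraph` and `VersionedRestartCallback` are hypothetical stand-ins for `ExecutionGraph` and `ExecutionGraphRestartCallback`, and it is assumed that every global failure increments the version while `restart(...)` only compares it.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the ExecutionGraph; only the version bookkeeping is modeled. */
final class VersionedGraph {
    private final Object progressLock = new Object();
    private long globalModVersion = 0L;
    private int restartsExecuted = 0;

    /** Models a global failure: increments the version and returns the new value. */
    long failGlobal() {
        synchronized (progressLock) {
            return ++globalModVersion;
        }
    }

    /** Restarts only if the version is unchanged since the restart was triggered. */
    boolean restart(long expectedGlobalVersion) {
        synchronized (progressLock) {
            // check (but do not increment) the version: is this attempt still current?
            if (globalModVersion != expectedGlobalVersion) {
                return false; // subsumed by a newer global change
            }
            restartsExecuted++;
            return true;
        }
    }

    int getRestartsExecuted() {
        synchronized (progressLock) {
            return restartsExecuted;
        }
    }
}

/** Hypothetical callback that remembers its expected version and fires at most once. */
final class VersionedRestartCallback {
    private final VersionedGraph graph;
    private final long expectedGlobalModVersion;
    private final AtomicBoolean used = new AtomicBoolean(false);

    VersionedRestartCallback(VersionedGraph graph, long expectedGlobalModVersion) {
        this.graph = graph;
        this.expectedGlobalModVersion = expectedGlobalModVersion;
    }

    /** Returns true only if this call actually performed the restart. */
    boolean triggerFullRecovery() {
        return used.compareAndSet(false, true) && graph.restart(expectedGlobalModVersion);
    }
}
```

With two queued callbacks created for versions 1 and 2, only the callback for version 2 restarts the graph; the older one detects that it was subsumed and does nothing.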


---

[GitHub] flink pull request #4364: [FLINK-7216] [distr. coordination] Guard against c...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4364#discussion_r128512830
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java ---
    @@ -189,6 +189,35 @@ public static void finishAllVertices(ExecutionGraph eg) {
     		}
     	}
     
    +	/**
    +	 * Turns a newly scheduled execution graph into a state where all vertices run.
    +	 * This waits until all executions have reached state 'DEPLOYING' and then switches them to running.
    +	 */
    +	public static void waitUntilDeployedAndSwitchToRunning(ExecutionGraph eg, long timeout) throws TimeoutException {
    +		// wait until everything is running
    +		for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
    +			final Execution exec = ev.getCurrentExecutionAttempt();
    +			waitUntilExecutionState(exec, ExecutionState.DEPLOYING, timeout);
    +		}
    +
    +		// Note: As ugly as it is, we need this minor sleep, because between switching
    +		// to 'DEPLOYED' and when the 'switchToRunning()' may be called lies a race check
    +		// against concurrent modifications (cancel / fail). We can only switch this to running
    +		// once that check is passed. For the actual runtime, this switch is triggered by a callback
    +		// from the TaskManager, which comes strictly after that. For tests, we use mock TaskManagers
    +		// which cannot easily tell us when that condition has happened, unfortunately.
    +		try {
    +			Thread.sleep(2);
    --- End diff --
    
    In very rare cases, it might. I want to change the `Execution` a bit on the `master` to make this unnecessary.
    
    However, that is too much surgery in a critical part for a bugfix release, so I decided to be conservative in the runtime code and rather pay this price in the tests.
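
For contrast with the fixed sleep, a deadline-bounded poll is sketched below. The body of `waitUntilExecutionState` is not shown in this diff, so the helper here (`PollingWait.waitUntil`, a hypothetical name) is only an assumption about what such a wait commonly looks like. It works only when the condition is observable, which is exactly what the mock TaskManagers cannot offer for the race check, hence the remaining `Thread.sleep(2)`.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical test helper: polls a condition until it holds or a timeout expires. */
final class PollingWait {

    /** Returns true if the condition held before the timeout, false otherwise. */
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis) {
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out
            }
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                // restore the interrupt flag and give up waiting
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```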



---

[GitHub] flink issue #4364: [FLINK-7216] [distr. coordination] Guard against concurre...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4364
  
    +1 for merging!


---

[GitHub] flink pull request #4364: [FLINK-7216] [distr. coordination] Guard against c...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4364#discussion_r128475216
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategyTest.java ---
    @@ -0,0 +1,128 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.executiongraph.restart;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.runtime.concurrent.ScheduledExecutor;
    +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
    +
    +import org.junit.After;
    +import org.junit.Test;
    +
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ScheduledExecutorService;
    +
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Unit test for the {@link FailureRateRestartStrategy}.
    + */
    +public class FailureRateRestartStrategyTest {
    +
    +	public final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
    +
    +	public final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService);
    +
    +	@After
    +	public void shutdownExecutor() {
    +		executorService.shutdownNow();
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	@Test
    +	public void testManyFailuresWithinRate() throws Exception {
    +		final int numAttempts = 10;
    +		final int intervalMillis = 1;
    +
    +		final FailureRateRestartStrategy restartStrategy =
    +				new FailureRateRestartStrategy(1, Time.milliseconds(intervalMillis), Time.milliseconds(0));
    +
    +		for (int attempsLeft = numAttempts; attempsLeft > 0; --attempsLeft) {
    +			assertTrue(restartStrategy.canRestart());
    +			restartStrategy.restart(new NoOpRestarter(), executor);
    +			sleepGuaranteed(2 * intervalMillis);
    +		}
    +
    +		assertTrue(restartStrategy.canRestart());
    +	}
    +
    +	@Test
    +	public void testFailuresExceedingRate() throws Exception {
    +		final int numFailures = 3;
    +		final int intervalMillis = 10_000;
    +
    +		final FailureRateRestartStrategy restartStrategy =
    +				new FailureRateRestartStrategy(numFailures, Time.milliseconds(intervalMillis), Time.milliseconds(0));
    +
    +		for (int failuresLeft = numFailures; failuresLeft > 0; --failuresLeft) {
    +			assertTrue(restartStrategy.canRestart());
    +			restartStrategy.restart(new NoOpRestarter(), executor);
    +		}
    +
    +		// now the rate should be exceeded
    +		assertFalse(restartStrategy.canRestart());
    +	}
    +
    +	@Test
    +	public void testDelay() throws Exception {
    +		final long restartDelay = 2;
    +		final int numberRestarts = 10;
    +
    +		final FailureRateRestartStrategy strategy =
    +			new FailureRateRestartStrategy(numberRestarts + 1, Time.milliseconds(1), Time.milliseconds(restartDelay));
    +
    +		for (int restartsLeft = numberRestarts; restartsLeft > 0; --restartsLeft) {
    +			assertTrue(strategy.canRestart());
    +
    +			final OneShotLatch sync = new OneShotLatch();
    +			final RestartCallback restarter = new LatchedRestarter(sync);
    +
    +			final long time = System.nanoTime();
    +			strategy.restart(restarter, executor);
    +			sync.await();
    +
    +			final long elapsed = System.nanoTime() - time;
    +			assertTrue("Not enough delay", elapsed >= restartDelay * 1_000_000);
    +		}
    +	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * This method makes sure that the actual interval and is not spuriously waking up.
    --- End diff --
    
    "This method makes sure to sleep for the required interval and that we don't spuriously wake up."?
    
    Also, what happens if `Thread.sleep()` is interrupted?
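
One way to address both points is sketched here. The actual body of `sleepGuaranteed` is cut off in the quoted diff, so this is an assumed implementation under a hypothetical class name: it keeps sleeping until the deadline has passed, and if an interrupt arrives it finishes the interval first and then restores the interrupt flag.

```java
/** Hypothetical helper; not the implementation from the PR. */
final class Sleeps {

    /**
     * Sleeps for at least the given number of milliseconds.
     * Returns true if the thread was interrupted while sleeping; in that case
     * the interrupt flag is set again before returning.
     */
    static boolean sleepGuaranteed(long millis) {
        final long deadline = System.nanoTime() + millis * 1_000_000L;
        boolean interrupted = false;
        long remainingNanos;
        while ((remainingNanos = deadline - System.nanoTime()) > 0) {
            try {
                // round up so we never request a zero-length sleep while time remains
                Thread.sleep(remainingNanos / 1_000_000L + 1);
            } catch (InterruptedException e) {
                interrupted = true; // remember the interrupt, keep sleeping
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return interrupted;
    }
}
```

Whether a test helper should swallow the interrupt like this or propagate `InterruptedException` is a design choice; propagating would end the sleep early, which is what the test wants to rule out.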


---

[GitHub] flink pull request #4364: [FLINK-7216] [distr. coordination] Guard against c...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4364#discussion_r128477515
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---
    @@ -727,4 +837,46 @@ private static void haltExecution(ExecutionGraph eg) {
     
     		assertEquals(JobStatus.FINISHED, eg.getState());
     	}
    +
    +	// ------------------------------------------------------------------------
    +
    +	/**
    +	 * A TaskManager gateway that does not ack cancellations.
    +	 */
    +	private static final class NotCancelAckingTaskGateway extends SimpleAckingTaskManagerGateway {
    +
    +		@Override
    +		public org.apache.flink.runtime.concurrent.Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
    +			return new FlinkCompletableFuture<>();
    +		}
    +	}
    +
    +	private static final class TriggeredRestartStrategy implements RestartStrategy {
    --- End diff --
    
    "A {@link RestartStrategy} that blocks restarting on a given {@link OneShotLatch}."?
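
The behaviour named in the suggested Javadoc can be sketched as follows. The body of `TriggeredRestartStrategy` is not shown in the quoted diff, so this is an illustration only: `LatchTriggeredRestarter` is a hypothetical class, `CountDownLatch` stands in for Flink's `OneShotLatch`, and a plain `Runnable` stands in for `RestartCallback`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

/** Hypothetical restarter that holds back the restart until a latch is released. */
final class LatchTriggeredRestarter {
    private final CountDownLatch trigger;

    LatchTriggeredRestarter(CountDownLatch trigger) {
        this.trigger = trigger;
    }

    /** Schedules the restart; the callback runs only after the latch is released. */
    CompletableFuture<Void> restart(Runnable restartCallback, Executor executor) {
        return CompletableFuture.runAsync(() -> {
            try {
                trigger.await();
            } catch (InterruptedException e) {
                // restore the flag and skip the restart, e.g. on shutdown
                Thread.currentThread().interrupt();
                return;
            }
            restartCallback.run();
        }, executor);
    }
}
```

A test can queue several restarts this way, release the latch once, and then check how many of them actually took effect.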


---

[GitHub] flink pull request #4364: [FLINK-7216] [distr. coordination] Guard against c...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4364#discussion_r128494097
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---
    @@ -581,6 +565,106 @@ public void testSuspendWhileRestarting() throws Exception {
     		assertEquals(JobStatus.SUSPENDED, eg.getState());
     	}
     
    +	@Test
    +	public void testConcurrentLocalFailAndRestart() throws Exception {
    +		final ExecutionGraph eg = createSimpleTestGraph(new FixedDelayRestartStrategy(10, 0L));
    +		eg.setScheduleMode(ScheduleMode.EAGER);
    +		eg.scheduleForExecution();
    +
    +		waitUntilDeployedAndSwitchToRunning(eg, 1000);
    +
    +		final ExecutionJobVertex vertex = eg.getVerticesTopologically().iterator().next();
    +		final Execution first = vertex.getTaskVertices()[0].getCurrentExecutionAttempt();
    +		final Execution last = vertex.getTaskVertices()[vertex.getParallelism() - 1].getCurrentExecutionAttempt();
    +
    +		final OneShotLatch failTrigger = new OneShotLatch();
    +		final CountDownLatch readyLatch = new CountDownLatch(2);
    +
    +		Thread failure1 = new Thread() {
    +			@Override
    +			public void run() {
    +				readyLatch.countDown();
    +				try {
    +					failTrigger.await();
    +				} catch (InterruptedException ignored) {}
    +
    +				first.fail(new Exception("intended test failure 1"));
    +			}
    +		};
    +
    +		Thread failure2 = new Thread() {
    +			@Override
    +			public void run() {
    +				readyLatch.countDown();
    +				try {
    +					failTrigger.await();
    +				} catch (InterruptedException ignored) {}
    +
    +				last.fail(new Exception("intended test failure 2"));
    +			}
    +		};
    +
    +		// make sure both threads start simultaneously
    +		failure1.start();
    +		failure2.start();
    +		readyLatch.await();
    +		failTrigger.trigger();
    +
    +		waitUntilJobStatus(eg, JobStatus.FAILING, 1000);
    +		completeCancellingForAllVertices(eg);
    +
    +		waitUntilJobStatus(eg, JobStatus.RUNNING, 1000);
    +		waitUntilDeployedAndSwitchToRunning(eg, 1000);
    +		finishAllVertices(eg);
    +
    +		eg.waitUntilTerminal();
    +		assertEquals(JobStatus.FINISHED, eg.getState());
    +	}
    +
    +	@Test
    +	public void testConcurrentGlobalFailAndRestarts() throws Exception {
    --- End diff --
    
    I tried running this on the current master: the test failed, but I didn't see a "storm of restarts".


---

[GitHub] flink pull request #4364: [FLINK-7216] [distr. coordination] Guard against c...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4364#discussion_r128196982
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java ---
    @@ -33,9 +34,14 @@
     	boolean canRestart();
     
     	/**
    -	 * Restarts the given {@link ExecutionGraph}.
    +	 * Called by the ExecutionGraph to eventually trigger a full recovery.
    +	 * The recovery must be triggered on the given callback object, and may be delayed
    +	 * with the help of the given scheduled executor.
    +	 * 
    --- End diff --
    
    Please remove the trailing space.


---

[GitHub] flink pull request #4364: [FLINK-7216] [distr. coordination] Guard against c...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4364#discussion_r128510738
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---
    @@ -581,6 +565,106 @@ public void testSuspendWhileRestarting() throws Exception {
     		assertEquals(JobStatus.SUSPENDED, eg.getState());
     	}
     
    +	@Test
    +	public void testConcurrentLocalFailAndRestart() throws Exception {
    +		final ExecutionGraph eg = createSimpleTestGraph(new FixedDelayRestartStrategy(10, 0L));
    +		eg.setScheduleMode(ScheduleMode.EAGER);
    +		eg.scheduleForExecution();
    +
    +		waitUntilDeployedAndSwitchToRunning(eg, 1000);
    +
    +		final ExecutionJobVertex vertex = eg.getVerticesTopologically().iterator().next();
    +		final Execution first = vertex.getTaskVertices()[0].getCurrentExecutionAttempt();
    +		final Execution last = vertex.getTaskVertices()[vertex.getParallelism() - 1].getCurrentExecutionAttempt();
    +
    +		final OneShotLatch failTrigger = new OneShotLatch();
    +		final CountDownLatch readyLatch = new CountDownLatch(2);
    +
    +		Thread failure1 = new Thread() {
    +			@Override
    +			public void run() {
    +				readyLatch.countDown();
    +				try {
    +					failTrigger.await();
    +				} catch (InterruptedException ignored) {}
    +
    +				first.fail(new Exception("intended test failure 1"));
    +			}
    +		};
    +
    +		Thread failure2 = new Thread() {
    +			@Override
    +			public void run() {
    +				readyLatch.countDown();
    +				try {
    +					failTrigger.await();
    +				} catch (InterruptedException ignored) {}
    +
    +				last.fail(new Exception("intended test failure 2"));
    +			}
    +		};
    +
    +		// make sure both threads start simultaneously
    +		failure1.start();
    +		failure2.start();
    +		readyLatch.await();
    +		failTrigger.trigger();
    +
    +		waitUntilJobStatus(eg, JobStatus.FAILING, 1000);
    +		completeCancellingForAllVertices(eg);
    --- End diff --
    
    True, those docs are copy/paste wrong ;-) I fixed them...


---