You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/02 14:08:17 UTC

[1/2] flink git commit: [FLINK-7960] [tests] Fix race conditions in ExecutionGraphRestartTest#completeCancellingForAllVertices

Repository: flink
Updated Branches:
  refs/heads/master 1ac3e05f9 -> 8198967ea


[FLINK-7960] [tests] Fix race conditions in ExecutionGraphRestartTest#completeCancellingForAllVertices

One race condition is between waitUntilJobStatus(eg, JobStatus.FAILING, 1000) and the
subsequent completeCancellingForAllVertices, where not all executions are in state
CANCELLING yet.

The other race condition is between completeCancellingForAllVertices and the fixed
delay restart strategy configured with zero delay. The problem is that the 10th task
could be the one that failed. In that case, completing the cancellation of the first 9
tasks is enough for the restart strategy to restart the job. If this happens before
completeCancellingForAllVertices has also cancelled the execution of the 10th task,
we could end up cancelling a fresh execution created by the restart.

[hotfix] Make WaitForTasks use an AtomicInteger

[hotfix] Set optCancelCondition to Optional.empty() in SimpleAckingTaskManagerGateway

Add assertion message to ExecutionGraphTestUtils#switchToRunning

This closes #4933.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f9b475f5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f9b475f5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f9b475f5

Branch: refs/heads/master
Commit: f9b475f5528ed6a85e27ad167086f98c1121ae20
Parents: 1ac3e05
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Nov 1 16:53:14 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Nov 2 15:01:37 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java |  2 +-
 .../ExecutionGraphRestartTest.java              | 21 +++++++++++++++-----
 .../ExecutionGraphSuspendTest.java              |  3 ++-
 .../executiongraph/ExecutionGraphTestUtils.java |  3 ++-
 .../ExecutionGraphVariousFailuesTest.java       |  3 ++-
 .../ExecutionVertexCancelTest.java              |  3 ++-
 .../executiongraph/GlobalModVersionTest.java    |  3 ++-
 .../PipelinedRegionFailoverConcurrencyTest.java |  3 ++-
 .../utils/SimpleAckingTaskManagerGateway.java   |  8 ++++++++
 9 files changed, 37 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f9b475f5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 939c290..c1f423b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -844,7 +844,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				// failing in the meantime may happen and is no problem.
 				// anything else is a serious problem !!!
 				if (current != FAILED) {
-					String message = String.format("Asynchronous race: Found state %s after successful cancel call.", state);
+					String message = String.format("Asynchronous race: Found %s in state %s after successful cancel call.", vertex.getTaskNameWithSubtaskIndex(), state);
 					LOG.error(message);
 					vertex.getExecutionGraph().failGlobal(new Exception(message));
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b475f5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index acf854f..8770b06 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -76,6 +76,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
@@ -589,15 +590,19 @@ public class ExecutionGraphRestartTest extends TestLogger {
 	public void testConcurrentLocalFailAndRestart() throws Exception {
 		final int parallelism = 10;
 		SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
+		final OneShotLatch restartLatch = new OneShotLatch();
+		final TriggeredRestartStrategy triggeredRestartStrategy = new TriggeredRestartStrategy(restartLatch);
 
 		final ExecutionGraph eg = createSimpleTestGraph(
 			new JobID(),
 			taskManagerGateway,
-			new FixedDelayRestartStrategy(10, 0L),
+			triggeredRestartStrategy,
 			createNoOpVertex(parallelism));
 
 		WaitForTasks waitForTasks = new WaitForTasks(parallelism);
+		WaitForTasks waitForTasksCancelled = new WaitForTasks(parallelism);
 		taskManagerGateway.setCondition(waitForTasks);
+		taskManagerGateway.setCancelCondition(waitForTasksCancelled);
 
 		eg.setScheduleMode(ScheduleMode.EAGER);
 		eg.scheduleForExecution();
@@ -648,8 +653,15 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		WaitForTasks waitForTasksAfterRestart = new WaitForTasks(parallelism);
 		taskManagerGateway.setCondition(waitForTasksAfterRestart);
 
+		waitForTasksCancelled.getFuture().get(1000L, TimeUnit.MILLISECONDS);
+
 		completeCancellingForAllVertices(eg);
 
+		// block the restart until we have completed for all vertices the cancellation
+		// otherwise it might happen that the last vertex which failed will have a new
+		// execution set due to restart which is wrongly canceled
+		restartLatch.trigger();
+
 		waitUntilJobStatus(eg, JobStatus.RUNNING, 1000);
 
 		waitForTasksAfterRestart.getFuture().get(1000, TimeUnit.MILLISECONDS);
@@ -1048,11 +1060,12 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		private final int tasksToWaitFor;
 		private final CompletableFuture<Boolean> allTasksReceived;
-		private int counter;
+		private final AtomicInteger counter;
 
 		public WaitForTasks(int tasksToWaitFor) {
 			this.tasksToWaitFor = tasksToWaitFor;
 			this.allTasksReceived = new CompletableFuture<>();
+			this.counter = new AtomicInteger();
 		}
 
 		public CompletableFuture<Boolean> getFuture() {
@@ -1061,9 +1074,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		@Override
 		public void accept(ExecutionAttemptID executionAttemptID) {
-			counter++;
-
-			if (counter >= tasksToWaitFor) {
+			if (counter.incrementAndGet() >= tasksToWaitFor) {
 				allTasksReceived.complete(true);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b475f5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
index b3a8c33..f0adc32 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
@@ -45,7 +46,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 /**
  * Validates that suspending out of various states works correctly.
  */
-public class ExecutionGraphSuspendTest {
+public class ExecutionGraphSuspendTest extends TestLogger {
 
 	/**
 	 * Going into SUSPENDED out of CREATED should immediately cancel everything and

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b475f5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 017e85f..b534ade 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -202,7 +202,8 @@ public class ExecutionGraphTestUtils {
 		// check that all execution are in state DEPLOYING
 		for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
 			final Execution exec = ev.getCurrentExecutionAttempt();
-			assert(exec.getState() == ExecutionState.DEPLOYING);
+			final ExecutionState executionState = exec.getState();
+			assert executionState == ExecutionState.DEPLOYING : "Expected executionState to be DEPLOYING, was: " + executionState;
 		}
 
 		// switch executions to RUNNING

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b475f5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
index 0797ef9..e23d495 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrat
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
@@ -30,7 +31,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 
-public class ExecutionGraphVariousFailuesTest {
+public class ExecutionGraphVariousFailuesTest extends TestLogger {
 
 	/**
 	 * Test that failing in state restarting will retrigger the restarting logic. This means that

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b475f5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index fe6c3cd..44e1794 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -41,13 +41,14 @@ import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
 import scala.concurrent.ExecutionContext;
 
 @SuppressWarnings("serial")
-public class ExecutionVertexCancelTest {
+public class ExecutionVertexCancelTest extends TestLogger {
 
 	// --------------------------------------------------------------------------------------------
 	//  Canceling in different states

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b475f5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
index d8f0309..534c33d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/GlobalModVersionTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
@@ -42,7 +43,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
-public class GlobalModVersionTest {
+public class GlobalModVersionTest extends TestLogger {
 
 	/**
 	 * Tests that failures during a global cancellation are not handed to the local

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b475f5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
index ac34c62..124a5b7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/PipelinedRegionFailoverConcurrencyTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
@@ -53,7 +54,7 @@ import static org.junit.Assert.assertTrue;
  * <p>This test must be in the package it resides in, because it uses package-private methods
  * from the ExecutionGraph classes.
  */
-public class PipelinedRegionFailoverConcurrencyTest {
+public class PipelinedRegionFailoverConcurrencyTest extends TestLogger {
 
 	/**
 	 * Tests that a cancellation concurrent to a local failover leads to a properly

http://git-wip-us.apache.org/repos/asf/flink/blob/f9b475f5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index 501bedd..682705a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -48,14 +48,21 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
 
 	private Optional<Consumer<ExecutionAttemptID>> optSubmitCondition;
 
+	private Optional<Consumer<ExecutionAttemptID>> optCancelCondition;
+
 	public SimpleAckingTaskManagerGateway() {
 		optSubmitCondition = Optional.empty();
+		optCancelCondition = Optional.empty();
 	}
 
 	public void setCondition(Consumer<ExecutionAttemptID> predicate) {
 		optSubmitCondition = Optional.of(predicate);
 	}
 
+	public void setCancelCondition(Consumer<ExecutionAttemptID> predicate) {
+		optCancelCondition = Optional.of(predicate);
+	}
+
 	@Override
 	public String getAddress() {
 		return address;
@@ -96,6 +103,7 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
 
 	@Override
 	public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+		optCancelCondition.ifPresent(condition -> condition.accept(executionAttemptID));
 		return CompletableFuture.completedFuture(Acknowledge.get());
 	}
 


[2/2] flink git commit: [hotfix] Instantiate a HdfsConfiguration in HadoopUtils#getHadoopConfiguration()

Posted by tr...@apache.org.
[hotfix] Instantiate a HdfsConfiguration in HadoopUtils#getHadoopConfiguration()

Instantiate a HdfsConfiguration as the base configuration when calling
HadoopUtils#getHadoopConfiguration() to load the hdfs-site.xml and
hdfs-default.xml from the classpath before trying to load an explicitly
specified configuration.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8198967e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8198967e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8198967e

Branch: refs/heads/master
Commit: 8198967eaa6cd8e2a690dd1be31721b25e116d9b
Parents: f9b475f
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Nov 2 15:04:26 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Nov 2 15:04:26 2017 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/runtime/util/HadoopUtils.java   | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8198967e/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
index 8bfcb5c..5b14f43 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
@@ -21,11 +21,11 @@ package org.apache.flink.runtime.util;
 import org.apache.flink.configuration.ConfigConstants;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,7 +44,9 @@ public class HadoopUtils {
 
 	public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration) {
 
-		Configuration result = new Configuration();
+		// Instantiate a HdfsConfiguration to load the hdfs-site.xml and hdfs-default.xml
+		// from the classpath
+		Configuration result = new HdfsConfiguration();
 		boolean foundHadoopConfiguration = false;
 
 		// We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and