You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/08/08 12:11:48 UTC

[flink] branch master updated: [FLINK-13581][coordination][tests] Harden BatchFineGrainedRecoveryITCase

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c24b9a3  [FLINK-13581][coordination][tests] Harden BatchFineGrainedRecoveryITCase
c24b9a3 is described below

commit c24b9a371658c2829f0ff1cedb9876592dc15b8d
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Tue Aug 6 17:33:27 2019 +0300

    [FLINK-13581][coordination][tests] Harden BatchFineGrainedRecoveryITCase
    
    If mapper restarts in BatchFineGrainedRecoveryITCase are counted in the open method of the user function,
    whether a restart is observed depends on the internal implementation of the local Task and on whether the open method is eventually called.
    
    Using execution attempt numbers instead makes the test behaviour more stable, because they depend only on coordination.
    The execution attempt numbers can be queried through the REST endpoint of the testing mini cluster using a REST client.
    
    This closes #9374.
---
 .../rest/messages/JobVertexDetailsInfo.java        |  12 ++
 .../recovery/BatchFineGrainedRecoveryITCase.java   | 207 ++++++++++++++++++---
 2 files changed, 189 insertions(+), 30 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfo.java
index 3ad1e41..89203df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfo.java
@@ -26,10 +26,12 @@ import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
 import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
@@ -75,6 +77,11 @@ public class JobVertexDetailsInfo implements ResponseBody {
 		this.subtasks = checkNotNull(subtasks);
 	}
 
+	@JsonIgnore
+	public List<VertexTaskDetail> getSubtasks() {
+		return Collections.unmodifiableList(subtasks);
+	}
+
 	@Override
 	public boolean equals(Object o) {
 		if (this == o) {
@@ -164,6 +171,11 @@ public class JobVertexDetailsInfo implements ResponseBody {
 			this.metrics = checkNotNull(metrics);
 		}
 
+		@JsonIgnore
+		public int getAttempt() {
+			return attempt;
+		}
+
 		@Override
 		public boolean equals(Object o) {
 			if (this == o) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
index 9e90c20..358629f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java
@@ -19,21 +19,46 @@
 package org.apache.flink.test.recovery;
 
 import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview;
+import org.apache.flink.runtime.messages.webmonitor.JobIdsWithStatusOverview.JobIdWithStatus;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
 import org.apache.flink.runtime.minicluster.TestingMiniCluster;
 import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration.Builder;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.RestClientConfiguration;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobVertexDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.JobVertexDetailsInfo;
+import org.apache.flink.runtime.rest.messages.JobVertexDetailsInfo.VertexTaskDetail;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
@@ -44,21 +69,28 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.GuardedBy;
 
+import java.io.IOException;
 import java.io.Serializable;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 
-import static org.apache.flink.configuration.NettyShuffleEnvironmentOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION;
 import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
@@ -94,6 +126,10 @@ public class BatchFineGrainedRecoveryITCase extends TestLogger {
 	private static final int EMITTED_RECORD_NUMBER = 1000;
 	private static final int MAP_NUMBER = 3;
 
+	private static final String MAP_PARTITION_TEST_PARTITION_MAPPER = "MapPartition (Test partition mapper ";
+	private static final Pattern MAPPER_NUMBER_IN_TASK_NAME_PATTERN =
+		Pattern.compile("MapPartition \\(Test partition mapper (\\d+)\\)");
+
 	/**
 	 * Number of job failures for all mappers due to backtracking when the produced partitions get lost.
 	 *
@@ -110,13 +146,20 @@ public class BatchFineGrainedRecoveryITCase extends TestLogger {
 	private static final int MAX_JOB_RESTART_ATTEMPTS = ALL_MAPPERS_BACKTRACK_FAILURES + 2 * MAP_NUMBER;
 
 	/**
-	 * Expected restart number for each mapper.
-	 *
-	 * <p>Initial start plus exception failure plus restarts from this and each subsequent mapper's TM failure.
+	 * Expected attempt number for each mapper.
 	 */
-	private static final int[] EXPECTED_MAP_RESTARTS = IntStream
+	private static final int[] EXPECTED_MAP_ATTEMPT_NUMBERS = IntStream
 		.range(0, MAP_NUMBER)
-		.map(i -> 1 + 1 + MAP_NUMBER - i)
+		.map(i ->
+			// exception failure:
+				1 + // this mapper
+				i + // previous mappers
+			// TM failure:
+				(MAP_NUMBER - i - 1) + // subsequent mappers after PartitionNotFoundException
+				1 + // this mapper
+				1 + // this mapper after PartitionNotFoundException
+				i + // previous mappers
+				i) // previous mappers after PartitionNotFoundException
 		.toArray();
 
 	private static final String TASK_NAME_PREFIX = "Test partition mapper ";
@@ -126,16 +169,18 @@ public class BatchFineGrainedRecoveryITCase extends TestLogger {
 
 	private static TestingMiniCluster miniCluster;
 
+	private static MiniClusterClient client;
+
 	private static AtomicInteger lastTaskManagerIndexInMiniCluster;
 
 	private static final Random rnd = new Random();
 
 	private static GlobalMapFailureTracker failureTracker;
 
+	@SuppressWarnings("OverlyBroadThrowsClause")
 	@Before
 	public void setup() throws Exception {
 		Configuration configuration = new Configuration();
-		configuration.setBoolean(FORCE_PARTITION_RELEASE_ON_CONSUMPTION, false);
 		configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, PIPELINED_REGION_RESTART_STRATEGY_NAME);
 
 		miniCluster = new TestingMiniCluster(
@@ -148,6 +193,8 @@ public class BatchFineGrainedRecoveryITCase extends TestLogger {
 			null);
 		miniCluster.start();
 
+		client = new MiniClusterClient(miniCluster);
+
 		lastTaskManagerIndexInMiniCluster = new AtomicInteger(0);
 
 		failureTracker = new GlobalMapFailureTracker(MAP_NUMBER);
@@ -158,6 +205,10 @@ public class BatchFineGrainedRecoveryITCase extends TestLogger {
 		if (miniCluster != null) {
 			miniCluster.close();
 		}
+
+		if (client != null) {
+			client.close();
+		}
 	}
 
 	@Test
@@ -172,7 +223,7 @@ public class BatchFineGrainedRecoveryITCase extends TestLogger {
 		}
 
 		assertThat(input.collect(), is(EXPECTED_JOB_OUTPUT));
-		failureTracker.verify();
+		failureTracker.verify(getMapperAttempts());
 	}
 
 	private static FailureStrategy createFailureStrategy(int trackingIndex) {
@@ -207,6 +258,26 @@ public class BatchFineGrainedRecoveryITCase extends TestLogger {
 		}
 	}
 
+	private static int[] getMapperAttempts() {
+		int[] attempts = new int[MAP_NUMBER];
+		//noinspection StaticVariableUsedBeforeInitialization
+		client
+			.getInternalTaskInfos()
+			.stream()
+			.filter(t -> t.name.startsWith(MAP_PARTITION_TEST_PARTITION_MAPPER))
+			.forEach(t -> attempts[parseMapperNumberFromTaskName(t.name)] = t.attempt);
+		return attempts;
+	}
+
+	private static int parseMapperNumberFromTaskName(String name) {
+		Matcher m = MAPPER_NUMBER_IN_TASK_NAME_PATTERN.matcher(name);
+		if (m.matches()) {
+			return Integer.parseInt(m.group(1));
+		} else {
+			throw new FlinkRuntimeException("Failed to find mapper number in its task name: " + name);
+		}
+	}
+
 	@FunctionalInterface
 	private interface FailureStrategy extends Serializable {
 		/**
@@ -381,7 +452,6 @@ public class BatchFineGrainedRecoveryITCase extends TestLogger {
 	}
 
 	private static class GlobalMapFailureTracker {
-		private final List<AtomicInteger> mapRestarts;
 		private final List<Set<FailureStrategy>> mapFailures;
 
 		private final Object classLock = new Object();
@@ -389,15 +459,13 @@ public class BatchFineGrainedRecoveryITCase extends TestLogger {
 		private Throwable unexpectedFailure;
 
 		private GlobalMapFailureTracker(int numberOfMappers) {
-			mapRestarts = new ArrayList<>(numberOfMappers);
 			mapFailures = new ArrayList<>(numberOfMappers);
 			IntStream.range(0, numberOfMappers).forEach(i -> addNewMapper());
 		}
 
 		private int addNewMapper() {
-			mapRestarts.add(new AtomicInteger(0));
 			mapFailures.add(new HashSet<>(2));
-			return mapRestarts.size() - 1;
+			return mapFailures.size() - 1;
 		}
 
 		private boolean failOrNot(int index, FailureStrategy failureStrategy) throws Exception {
@@ -416,23 +484,19 @@ public class BatchFineGrainedRecoveryITCase extends TestLogger {
 			return failedNow;
 		}
 
-		private void mapRestart(int index) {
-			mapRestarts.get(index).incrementAndGet();
-		}
-
 		private void unrelatedFailure(Throwable failure) {
 			synchronized (classLock) {
 				unexpectedFailure = ExceptionUtils.firstOrSuppressed(failure, unexpectedFailure);
 			}
 		}
 
-		private void verify() {
+		private void verify(int[] mapAttemptNumbers) {
 			synchronized (classLock) {
 				if (unexpectedFailure != null) {
 					throw new AssertionError("Test failed due to unexpected exception.", unexpectedFailure);
 				}
 			}
-			assertThat(mapRestarts.stream().mapToInt(AtomicInteger::get).toArray(), is(EXPECTED_MAP_RESTARTS));
+			assertThat(mapAttemptNumbers, is(EXPECTED_MAP_ATTEMPT_NUMBERS));
 		}
 	}
 
@@ -448,18 +512,6 @@ public class BatchFineGrainedRecoveryITCase extends TestLogger {
 		}
 
 		@Override
-		public void open(Configuration parameters) throws Exception {
-			//noinspection OverlyBroadCatchBlock
-			try {
-				super.open(parameters);
-				failureTracker.mapRestart(trackingIndex);
-			} catch (Throwable t) {
-				failureTracker.unrelatedFailure(t);
-				throw t;
-			}
-		}
-
-		@Override
 		public void mapPartition(Iterable<Long> values, Collector<Long> out) throws Exception {
 			for (Long value : values) {
 				failureStrategy.failOrNot(trackingIndex);
@@ -467,4 +519,99 @@ public class BatchFineGrainedRecoveryITCase extends TestLogger {
 			}
 		}
 	}
+
+	private static class MiniClusterClient implements AutoCloseable {
+		private final RestClient restClient;
+		private final ExecutorService executorService;
+		private final URI restAddress;
+
+		private MiniClusterClient(TestingMiniCluster miniCluster) throws ConfigurationException {
+			restAddress = miniCluster.getRestAddress().join();
+			executorService = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-RestClient-IO"));
+			restClient = createRestClient();
+		}
+
+		private RestClient createRestClient() throws ConfigurationException {
+			return new RestClient(
+				RestClientConfiguration.fromConfiguration(new UnmodifiableConfiguration(new Configuration())),
+				executorService);
+		}
+
+		private List<InternalTaskInfo> getInternalTaskInfos() {
+			return getJobs()
+				.stream()
+				.flatMap(jobId -> getJobDetails(jobId).join()
+					.getJobVertexInfos()
+					.stream()
+					.map(info -> Tuple2.of(jobId, info)))
+				.flatMap(vertexInfoWithJobId ->
+					getJobVertexDetailsInfo(vertexInfoWithJobId.f0, vertexInfoWithJobId.f1.getJobVertexID())
+						.getSubtasks()
+						.stream()
+						.map(subtask -> new InternalTaskInfo(vertexInfoWithJobId.f1.getName(), subtask)))
+				.collect(Collectors.toList());
+		}
+
+		private Collection<JobID> getJobs() {
+			JobIdsWithStatusOverview jobIds =
+				sendRequest(
+					JobIdsWithStatusesOverviewHeaders.getInstance(),
+					EmptyMessageParameters.getInstance())
+				.join();
+			return jobIds
+				.getJobsWithStatus()
+				.stream()
+				.map(JobIdWithStatus::getJobId)
+				.collect(Collectors.toList());
+		}
+
+		private CompletableFuture<JobDetailsInfo> getJobDetails(JobID jobId) {
+			JobMessageParameters params = new JobMessageParameters();
+			params.jobPathParameter.resolve(jobId);
+			return sendRequest(JobDetailsHeaders.getInstance(), params);
+		}
+
+		private JobVertexDetailsInfo getJobVertexDetailsInfo(JobID jobId, JobVertexID jobVertexID) {
+			JobVertexDetailsHeaders detailsHeaders = JobVertexDetailsHeaders.getInstance();
+			JobVertexMessageParameters params = new JobVertexMessageParameters();
+			params.jobPathParameter.resolve(jobId);
+			params.jobVertexIdPathParameter.resolve(jobVertexID);
+			return sendRequest(detailsHeaders, params).join();
+		}
+
+		private <M extends MessageHeaders<EmptyRequestBody, P, U>, U extends MessageParameters, P extends ResponseBody> CompletableFuture<P>
+				sendRequest(M messageHeaders, U messageParameters) {
+			try {
+				return restClient.sendRequest(
+					restAddress.getHost(),
+					restAddress.getPort(),
+					messageHeaders,
+					messageParameters,
+					EmptyRequestBody.getInstance());
+			} catch (IOException e) {
+				return FutureUtils.completedExceptionally(e);
+			}
+		}
+
+		@Override
+		public void close() throws Exception {
+			restClient.close();
+			executorService.shutdownNow();
+		}
+	}
+
+	private static class InternalTaskInfo {
+		private final String name;
+		private final int attempt;
+
+		private InternalTaskInfo(String name, VertexTaskDetail vertexTaskDetail) {
+			this.name = name;
+			this.attempt = vertexTaskDetail.getAttempt();
+		}
+
+		@Override
+		public String toString() {
+			return name + " (Attempt #" + attempt + ')';
+		}
+	}
 }