You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/20 06:59:41 UTC

[flink] 03/03: [FLINK-8871][checkpoint][tests] Add ITCase for NotifyCheckpointAborted mechanism

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bbf731869adeaf7d7183d6d7299a0d1406afd1bc
Author: Yun Tang <my...@live.com>
AuthorDate: Sun May 17 21:30:30 2020 +0800

    [FLINK-8871][checkpoint][tests] Add ITCase for NotifyCheckpointAborted mechanism
---
 .../runtime/tasks/ExceptionallyDoneFuture.java     |   2 +-
 .../NotifyCheckpointAbortedITCase.java             | 449 +++++++++++++++++++++
 2 files changed, 450 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/ExceptionallyDoneFuture.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/ExceptionallyDoneFuture.java
index 55bfc18..f95ec5c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/ExceptionallyDoneFuture.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/ExceptionallyDoneFuture.java
@@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit;
  *
  * @param <V> type of the RunnableFuture
  */
-class ExceptionallyDoneFuture<V> implements RunnableFuture<V> {
+public class ExceptionallyDoneFuture<V> implements RunnableFuture<V> {
 
 	private final Throwable throwable;
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
new file mode 100644
index 0000000..e7e6e5c
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.state.AbstractSnapshotStrategy;
+import org.apache.flink.runtime.state.BackendBuildingException;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
+import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.runtime.tasks.ExceptionallyDoneFuture;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nonnull;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration tests verifying the logic that notifies operators of an aborted checkpoint via RPC message.
+ */
+@RunWith(Parameterized.class)
+public class NotifyCheckpointAbortedITCase extends TestLogger {
+	// The checkpoint that DeclineSink's snapshot strategy fails, causing the coordinator to
+	// abort it and notify all operators. Checkpoint 1 is failed earlier, on the coordinator
+	// side, by TestingCompletedCheckpointStore#addCheckpoint.
+	private static final long DECLINE_CHECKPOINT_ID = 2L;
+	private static final long TEST_TIMEOUT = 60000;
+	// Operator name used by DeclineSinkFailingStateBackend to pick the failing backend.
+	private static final String DECLINE_SINK_NAME = "DeclineSink";
+	// Static (not a @Rule) so setup() can build the configuration — including the
+	// TestingHAFactory class name — before the mini cluster starts.
+	private static MiniClusterWithClientResource cluster;
+
+	// Static so DeclineSinkFailingStateBackend#configure can reach it after deserialization
+	// on the cluster side (the backend instance itself is serialized into the job graph).
+	private static Path checkpointPath;
+
+	@Parameterized.Parameter
+	public boolean unalignedCheckpointEnabled;
+
+	// NOTE(review): stray space in the display name ("unalignedCheckpointEnabled ={0}");
+	// cosmetic only, does not affect the test.
+	@Parameterized.Parameters(name = "unalignedCheckpointEnabled ={0}")
+	public static Collection<Boolean> parameter() {
+		return Arrays.asList(true, false);
+	}
+
+	@ClassRule
+	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+	@Before
+	public void setup() throws Exception {
+		Configuration configuration = new Configuration();
+		configuration.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true);
+		// Point HA at our factory so the cluster uses TestingCompletedCheckpointStore,
+		// which lets the test fail the first addCheckpoint() on the coordinator side.
+		configuration.setString(HighAvailabilityOptions.HA_MODE, TestingHAFactory.class.getName());
+
+		checkpointPath = new Path(TEMPORARY_FOLDER.newFolder().toURI());
+		cluster = new MiniClusterWithClientResource(
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(configuration)
+				.setNumberTaskManagers(1)
+				.setNumberSlotsPerTaskManager(1).build());
+		cluster.before();
+
+		// Reset all static latches/counters shared between parameterized runs.
+		NormalMap.reset();
+		DeclineSink.reset();
+		TestingCompletedCheckpointStore.reset();
+	}
+
+	@After
+	public void shutdown() {
+		if (cluster != null) {
+			cluster.after();
+			cluster = null;
+		}
+
+	}
+
+	/**
+	 * Verify operators would be notified as checkpoint aborted.
+	 *
+	 * <p>The job would run with at least two checkpoints. The 1st checkpoint would fail due to add checkpoint to store,
+	 * and the 2nd checkpoint would decline by async checkpoint phase of 'DeclineSink'.
+	 *
+	 * <p>The job graph looks like:
+	 * NormalSource --> keyBy --> NormalMap --> DeclineSink
+	 */
+	@Test(timeout = TEST_TIMEOUT)
+	public void testNotifyCheckpointAborted() throws Exception {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.enableCheckpointing(200, CheckpointingMode.EXACTLY_ONCE);
+		env.getCheckpointConfig().enableUnalignedCheckpoints(unalignedCheckpointEnabled);
+		// Tolerate the single deliberately-failed checkpoint without failing the job.
+		env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
+		env.disableOperatorChaining();
+		env.setParallelism(1);
+
+		final StateBackend failingStateBackend = new DeclineSinkFailingStateBackend(checkpointPath);
+		env.setStateBackend(failingStateBackend);
+
+		env.addSource(new NormalSource()).name("NormalSource")
+			.keyBy((KeySelector<Tuple2<Integer, Integer>, Integer>) value -> value.f0)
+			.transform("NormalMap", TypeInformation.of(Integer.class), new NormalMap())
+			.transform(DECLINE_SINK_NAME, TypeInformation.of(Object.class), new DeclineSink());
+
+		final ClusterClient<?> clusterClient = cluster.getClusterClient();
+		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+		JobID jobID = jobGraph.getJobID();
+
+		ClientUtils.submitJob(clusterClient, jobGraph);
+
+		// Checkpoint 1: wait until the task side completed it, then let the store throw,
+		// which aborts the checkpoint on the coordinator side.
+		TestingCompletedCheckpointStore.addCheckpointLatch.await();
+		TestingCompletedCheckpointStore.abortCheckpointLatch.trigger();
+
+		// Both operators must receive exactly one notifyCheckpointAborted for checkpoint 1.
+		verifyAllOperatorsNotifyAborted();
+		resetAllOperatorsNotifyAbortedLatches();
+		verifyAllOperatorsNotifyAbortedTimes(1);
+
+		// Checkpoint 2: release DeclineSink's blocked snapshot; its async phase then fails
+		// (see DeclineSinkFailingSnapshotStrategy), so the task declines the checkpoint.
+		DeclineSink.waitLatch.trigger();
+		verifyAllOperatorsNotifyAborted();
+		verifyAllOperatorsNotifyAbortedTimes(2);
+
+		clusterClient.cancel(jobID).get();
+	}
+
+	// Blocks until both operators have observed a notifyCheckpointAborted call.
+	private void verifyAllOperatorsNotifyAborted() throws InterruptedException {
+		NormalMap.notifiedAbortedLatch.await();
+		DeclineSink.notifiedAbortedLatch.await();
+	}
+
+	// Re-arms the per-operator latches so the next aborted checkpoint can be awaited.
+	private void resetAllOperatorsNotifyAbortedLatches() {
+		NormalMap.notifiedAbortedLatch.reset();
+		DeclineSink.notifiedAbortedLatch.reset();
+	}
+
+	// Asserts the cumulative number of abort notifications seen by each operator.
+	private void verifyAllOperatorsNotifyAbortedTimes(int expectedTimes) {
+		assertEquals(expectedTimes, NormalMap.notifiedAbortedTimes.get());
+		assertEquals(expectedTimes, DeclineSink.notifiedAbortedTimes.get());
+	}
+
+	/**
+	 * Normal source function.
+	 */
+	private static class NormalSource implements SourceFunction<Tuple2<Integer, Integer>> {
+		private static final long serialVersionUID = 1L;
+		protected volatile boolean running;
+
+		NormalSource() {
+			this.running = true;
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+			while (running) {
+				// Emit under the checkpoint lock so records and checkpoints do not interleave.
+				synchronized (ctx.getCheckpointLock()) {
+					ctx.collect(Tuple2.of(ThreadLocalRandom.current().nextInt(), ThreadLocalRandom.current().nextInt()));
+				}
+				Thread.sleep(10);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			this.running = false;
+		}
+	}
+
+	/**
+	 * Map operator that records every notifyCheckpointAborted callback it receives.
+	 */
+	private static class NormalMap extends StreamMap<Tuple2<Integer, Integer>, Integer> {
+		private static final long serialVersionUID = 1L;
+		// Static because the operator instance lives in the task while the test thread
+		// observes from outside; parallelism is 1, so a single latch/counter suffices.
+		private static final OneShotLatch notifiedAbortedLatch = new OneShotLatch();
+		private static final AtomicInteger notifiedAbortedTimes = new AtomicInteger(0);
+
+		public NormalMap() {
+			super(new NormalMapFunction());
+		}
+
+		// NOTE(review): intentionally does not call super.notifyCheckpointAborted — the
+		// test only counts the notification arriving at the operator.
+		@Override
+		public void notifyCheckpointAborted(long checkpointId) {
+			notifiedAbortedTimes.incrementAndGet();
+			notifiedAbortedLatch.trigger();
+		}
+
+		static void reset() {
+			notifiedAbortedLatch.reset();
+			notifiedAbortedTimes.set(0);
+		}
+	}
+
+	/**
+	 * Normal map function.
+	 */
+	private static class NormalMapFunction implements MapFunction<Tuple2<Integer, Integer>, Integer>, CheckpointedFunction {
+		private static final long serialVersionUID = 1L;
+		private ValueState<Integer> valueState;
+
+		@Override
+		public Integer map(Tuple2<Integer, Integer> value) throws Exception {
+			valueState.update(value.f1);
+			return value.f1;
+		}
+
+		@Override
+		public void snapshotState(FunctionSnapshotContext context) {
+		}
+
+		@Override
+		public void initializeState(FunctionInitializationContext context) throws Exception {
+			valueState = context.getKeyedStateStore().getState(new ValueStateDescriptor<>("value", Integer.class));
+		}
+	}
+
+	/**
+	 * A decline sink.
+	 */
+	private static class DeclineSink extends StreamSink<Integer> {
+		private static final long serialVersionUID = 1L;
+		private static final OneShotLatch notifiedAbortedLatch = new OneShotLatch();
+		// Held by the test thread to delay this sink's snapshot of DECLINE_CHECKPOINT_ID
+		// until checkpoint 1's abort has been fully verified.
+		private static final OneShotLatch waitLatch = new OneShotLatch();
+		private static final AtomicInteger notifiedAbortedTimes = new AtomicInteger(0);
+
+		public DeclineSink() {
+			// A no-op sink function; only the operator-level checkpoint hooks matter here.
+			super(new SinkFunction<Integer>() {
+				private static final long serialVersionUID = 1L;
+			});
+		}
+
+		@Override
+		public void snapshotState(StateSnapshotContext context) throws Exception {
+			// Block the to-be-declined checkpoint until the test thread releases it.
+			if (context.getCheckpointId() == DECLINE_CHECKPOINT_ID) {
+				DeclineSink.waitLatch.await();
+			}
+			super.snapshotState(context);
+		}
+
+		@Override
+		public void notifyCheckpointAborted(long checkpointId) {
+			notifiedAbortedTimes.incrementAndGet();
+			notifiedAbortedLatch.trigger();
+		}
+
+		static void reset() {
+			notifiedAbortedLatch.reset();
+			waitLatch.reset();
+			notifiedAbortedTimes.set(0);
+		}
+
+	}
+
+	/**
+	 * The snapshot strategy to create failing runnable future at the checkpoint to decline.
+	 */
+	private static class DeclineSinkFailingSnapshotStrategy extends AbstractSnapshotStrategy<OperatorStateHandle> {
+
+		protected DeclineSinkFailingSnapshotStrategy() {
+			super("StuckAsyncSnapshotStrategy");
+		}
+
+		@Override
+		public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
+			long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) {
+			// Fail the async snapshot phase of the chosen checkpoint so the task declines it;
+			// all other checkpoints report an empty (successful) snapshot.
+			if (checkpointId == DECLINE_CHECKPOINT_ID) {
+				return ExceptionallyDoneFuture.of(new ExpectedTestException());
+			} else {
+				return DoneFuture.of(SnapshotResult.empty());
+			}
+		}
+	}
+
+	/**
+	 * The operator statebackend to create {@link DeclineSinkFailingSnapshotStrategy} at {@link DeclineSink}.
+	 */
+	private static class DeclineSinkFailingOperatorStateBackend extends DefaultOperatorStateBackend {
+
+		public DeclineSinkFailingOperatorStateBackend(
+			ExecutionConfig executionConfig,
+			CloseableRegistry closeStreamOnCancelRegistry,
+			AbstractSnapshotStrategy<OperatorStateHandle> snapshotStrategy) {
+			// Empty state maps: this backend exists only to inject the failing snapshot strategy.
+			super(executionConfig,
+				closeStreamOnCancelRegistry,
+				new HashMap<>(),
+				new HashMap<>(),
+				new HashMap<>(),
+				new HashMap<>(),
+				snapshotStrategy);
+		}
+	}
+
+	/**
+	 * The state backend to create {@link DeclineSinkFailingOperatorStateBackend} at {@link DeclineSink}.
+	 */
+	private static class DeclineSinkFailingStateBackend extends FsStateBackend {
+		private static final long serialVersionUID = 1L;
+
+		public DeclineSinkFailingStateBackend(Path checkpointDataUri) {
+			super(checkpointDataUri);
+		}
+
+		@Override
+		public DeclineSinkFailingStateBackend configure(ReadableConfig config, ClassLoader classLoader) {
+			// Ignores the given config; rebuilds from the static checkpointPath so the
+			// custom createOperatorStateBackend below survives cluster-side configuration.
+			return new DeclineSinkFailingStateBackend(checkpointPath);
+		}
+
+		@Override
+		public OperatorStateBackend createOperatorStateBackend(
+			Environment env,
+			String operatorIdentifier,
+			@Nonnull Collection<OperatorStateHandle> stateHandles,
+			CloseableRegistry cancelStreamRegistry) throws BackendBuildingException {
+			// Only the sink named DECLINE_SINK_NAME gets the failing backend; every other
+			// operator uses a regular default operator state backend.
+			if (operatorIdentifier.contains(DECLINE_SINK_NAME)) {
+				return new DeclineSinkFailingOperatorStateBackend(
+					env.getExecutionConfig(),
+					cancelStreamRegistry,
+					new DeclineSinkFailingSnapshotStrategy());
+			} else {
+				return new DefaultOperatorStateBackendBuilder(
+					env.getUserClassLoader(),
+					env.getExecutionConfig(),
+					false,
+					stateHandles,
+					cancelStreamRegistry).build();
+			}
+		}
+	}
+
+	/**
+	 * Embedded HA services that hand out the test's checkpoint recovery factory.
+	 */
+	private static class TestingHaServices extends EmbeddedHaServices {
+		private final CheckpointRecoveryFactory checkpointRecoveryFactory;
+
+		TestingHaServices(CheckpointRecoveryFactory checkpointRecoveryFactory, Executor executor) {
+			super(executor);
+			this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+		}
+
+		@Override
+		public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
+			return checkpointRecoveryFactory;
+		}
+	}
+
+	/**
+	 * An extension of {@link StandaloneCompletedCheckpointStore}.
+	 */
+	private static class TestingCompletedCheckpointStore extends StandaloneCompletedCheckpointStore {
+		// Triggered once the first completed checkpoint reaches the store (task side done).
+		private static final OneShotLatch addCheckpointLatch = new OneShotLatch();
+		// Triggered by the test thread to let addCheckpoint throw, aborting checkpoint 1.
+		private static final OneShotLatch abortCheckpointLatch = new OneShotLatch();
+
+		TestingCompletedCheckpointStore() {
+			super(1);
+		}
+
+		@Override
+		public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
+			// After the deliberate failure below, behave like a normal store.
+			if (abortCheckpointLatch.isTriggered()) {
+				super.addCheckpoint(checkpoint);
+			} else {
+				// tell main thread that all checkpoints on task side have been finished.
+				addCheckpointLatch.trigger();
+				// wait for the main thread to throw exception so that the checkpoint would be notified as aborted.
+				abortCheckpointLatch.await();
+				throw new ExpectedTestException();
+			}
+		}
+
+		static void reset() {
+			addCheckpointLatch.reset();
+			abortCheckpointLatch.reset();
+		}
+	}
+
+	/**
+	 * Testing HA factory which needs to be public in order to be instantiatable.
+	 */
+	public static class TestingHAFactory implements HighAvailabilityServicesFactory {
+
+		@Override
+		public HighAvailabilityServices createHAServices(Configuration configuration, Executor executor) {
+			return new TestingHaServices(
+				new TestingCheckpointRecoveryFactory(new TestingCompletedCheckpointStore(), new StandaloneCheckpointIDCounter()),
+				executor);
+		}
+	}
+
+}