You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2019/07/18 08:02:21 UTC

[flink] branch release-1.9 updated: [FLINK-13256] Ensure periodical checkpointing continues when regional failover aborts pending checkpoints

This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new b7bfafc  [FLINK-13256] Ensure periodical checkpointing continues when regional failover aborts pending checkpoints
b7bfafc is described below

commit b7bfafca14cd6995a6a59d68d14b4bc3cf91cfb5
Author: Yun Tang <my...@live.com>
AuthorDate: Thu Jul 18 09:58:21 2019 +0200

    [FLINK-13256] Ensure periodical checkpointing continues when regional failover aborts pending checkpoints
    
    This closes #9128.
    
    (cherry picked from commit 1ec34249a0303ae64d049d177057ef9b6c413ab5)
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 139 ++++++++-------
 .../executiongraph/failover/FailoverRegion.java    |   2 +-
 .../FailoverStrategyCheckpointCoordinatorTest.java | 186 +++++++++++++++++++++
 3 files changed, 264 insertions(+), 63 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 3dc5c1d..682685c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -478,32 +478,9 @@ public class CheckpointCoordinator {
 					throw new CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
 				}
 
-				// if too many checkpoints are currently in progress, we need to mark that a request is queued
-				if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
-					triggerRequestQueued = true;
-					if (currentPeriodicTrigger != null) {
-						currentPeriodicTrigger.cancel(false);
-						currentPeriodicTrigger = null;
-					}
-					throw new CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
-				}
+				checkConcurrentCheckpoints();
 
-				// make sure the minimum interval between checkpoints has passed
-				final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
-				final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
-
-				if (durationTillNextMillis > 0) {
-					if (currentPeriodicTrigger != null) {
-						currentPeriodicTrigger.cancel(false);
-						currentPeriodicTrigger = null;
-					}
-					// Reassign the new trigger to the currentPeriodicTrigger
-					currentPeriodicTrigger = timer.scheduleAtFixedRate(
-							new ScheduledTrigger(),
-							durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
-
-					throw new CheckpointException(CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
-				}
+				checkMinPauseBetweenCheckpoints();
 			}
 		}
 
@@ -623,32 +600,9 @@ public class CheckpointCoordinator {
 							throw new CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
 						}
 
-						if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
-							triggerRequestQueued = true;
-							if (currentPeriodicTrigger != null) {
-								currentPeriodicTrigger.cancel(false);
-								currentPeriodicTrigger = null;
-							}
-							throw new CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
-						}
-
-						// make sure the minimum interval between checkpoints has passed
-						final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
-						final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
-
-						if (durationTillNextMillis > 0) {
-							if (currentPeriodicTrigger != null) {
-								currentPeriodicTrigger.cancel(false);
-								currentPeriodicTrigger = null;
-							}
+						checkConcurrentCheckpoints();
 
-							// Reassign the new trigger to the currentPeriodicTrigger
-							currentPeriodicTrigger = timer.scheduleAtFixedRate(
-									new ScheduledTrigger(),
-									durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
-
-							throw new CheckpointException(CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
-						}
+						checkMinPauseBetweenCheckpoints();
 					}
 
 					LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
@@ -1025,9 +979,7 @@ public class CheckpointCoordinator {
 				if (currentPeriodicTrigger != null) {
 					currentPeriodicTrigger.cancel(false);
 				}
-				currentPeriodicTrigger = timer.scheduleAtFixedRate(
-						new ScheduledTrigger(),
-						0L, baseInterval, TimeUnit.MILLISECONDS);
+				currentPeriodicTrigger = scheduleTriggerWithDelay(0L);
 			}
 			else {
 				timer.execute(new ScheduledTrigger());
@@ -1224,6 +1176,11 @@ public class CheckpointCoordinator {
 		return checkpointTimeout;
 	}
 
+	@VisibleForTesting
+	boolean isCurrentPeriodicTriggerAvailable() {
+		return currentPeriodicTrigger != null;
+	}
+
 	/**
 	 * Returns whether periodic checkpointing has been configured.
 	 *
@@ -1247,10 +1204,7 @@ public class CheckpointCoordinator {
 			stopCheckpointScheduler();
 
 			periodicScheduling = true;
-			long initialDelay = ThreadLocalRandom.current().nextLong(
-				minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
-			currentPeriodicTrigger = timer.scheduleAtFixedRate(
-					new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS);
+			currentPeriodicTrigger = scheduleTriggerWithDelay(getRandomInitDelay());
 		}
 	}
 
@@ -1284,6 +1238,54 @@ public class CheckpointCoordinator {
 		}
 	}
 
+	/**
+	 * If too many checkpoints are currently in progress, marks a trigger request as queued and cancels the periodic trigger.
+	 *
+	 * @throws CheckpointException If too many checkpoints are currently in progress.
+	 */
+	private void checkConcurrentCheckpoints() throws CheckpointException {
+		if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
+			triggerRequestQueued = true;
+			if (currentPeriodicTrigger != null) {
+				currentPeriodicTrigger.cancel(false);
+				currentPeriodicTrigger = null;
+			}
+			throw new CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
+		}
+	}
+
+	/**
+	 * Makes sure the minimum pause between checkpoints has passed; otherwise reschedules the periodic trigger for the remaining time.
+	 *
+	 * @throws CheckpointException If the minimum interval between checkpoints has not passed.
+	 */
+	private void checkMinPauseBetweenCheckpoints() throws CheckpointException {
+		final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
+		final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
+
+		if (durationTillNextMillis > 0) {
+			if (currentPeriodicTrigger != null) {
+				currentPeriodicTrigger.cancel(false);
+				currentPeriodicTrigger = null;
+			}
+			// Reassign the new trigger to the currentPeriodicTrigger
+			currentPeriodicTrigger = scheduleTriggerWithDelay(durationTillNextMillis);
+
+			throw new CheckpointException(CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
+		}
+	}
+
+	private long getRandomInitDelay() {
+		return ThreadLocalRandom.current().nextLong(
+			minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
+	}
+
+	private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
+		return timer.scheduleAtFixedRate(
+			new ScheduledTrigger(),
+			initDelay, baseInterval, TimeUnit.MILLISECONDS);
+	}
+
 	// ------------------------------------------------------------------------
 	//  job status listener that schedules / cancels periodic checkpoints
 	// ------------------------------------------------------------------------
@@ -1391,6 +1393,13 @@ public class CheckpointCoordinator {
 	}
 
 	private void failPendingCheckpoint(
+		final PendingCheckpoint pendingCheckpoint,
+		final CheckpointFailureReason reason) {
+
+		failPendingCheckpoint(pendingCheckpoint, reason, null);
+	}
+
+	private void failPendingCheckpoint(
 			final PendingCheckpoint pendingCheckpoint,
 			final CheckpointFailureReason reason,
 			final Throwable cause) {
@@ -1398,12 +1407,18 @@ public class CheckpointCoordinator {
 		CheckpointException exception = new CheckpointException(reason, cause);
 		pendingCheckpoint.abort(reason, cause);
 		failureManager.handleCheckpointException(exception, pendingCheckpoint.getCheckpointId());
-	}
 
-	private void failPendingCheckpoint(
-			final PendingCheckpoint pendingCheckpoint,
-			final CheckpointFailureReason reason) {
+		if (!shutdown && periodicScheduling && currentPeriodicTrigger == null) {
+			synchronized (lock) {
+				if (pendingCheckpoints.isEmpty() || allPendingCheckpointsDiscarded()) {
+					triggerRequestQueued = false;
+					currentPeriodicTrigger = scheduleTriggerWithDelay(getRandomInitDelay());
+				}
+			}
+		}
+	}
 
-		failPendingCheckpoint(pendingCheckpoint, reason, null);
+	private boolean allPendingCheckpointsDiscarded() {
+		return pendingCheckpoints.values().stream().allMatch(PendingCheckpoint::isDiscarded);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
index 2c0c148..9a93bb8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
@@ -209,7 +209,7 @@ public class FailoverRegion {
 			if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
 				// if we have checkpointed state, reload it into the executions
 				if (executionGraph.getCheckpointCoordinator() != null) {
-					// we restart the checkpoint scheduler for
+					// we abort pending checkpoints for
 					// i) enable new checkpoint could be triggered without waiting for last checkpoint expired.
 					// ii) ensure the EXACTLY_ONCE semantics if needed.
 					executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java
new file mode 100644
index 0000000..1d77716
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.mock.Whitebox;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.NeverCompleteFuture;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the interaction between the {@link FailoverStrategy} and the {@link CheckpointCoordinator}.
+ */
+public class FailoverStrategyCheckpointCoordinatorTest extends TestLogger {
+	private ManuallyTriggeredScheduledExecutor manualThreadExecutor;
+
+	@Before
+	public void setUp() {
+		manualThreadExecutor = new ManuallyTriggeredScheduledExecutor();
+	}
+
+	/**
+	 * Tests that {@link CheckpointCoordinator#abortPendingCheckpoints(CheckpointException)},
+	 * as called by {@link AdaptedRestartPipelinedRegionStrategyNG} or {@link FailoverRegion}, makes the
+	 * periodic trigger available again when {@code currentPeriodicTrigger} is null.
+	 */
+	@Test
+	public void testAbortPendingCheckpointsWithTriggerValidation() {
+		final int maxConcurrentCheckpoints = ThreadLocalRandom.current().nextInt(10) + 1;
+		ExecutionVertex executionVertex = mockExecutionVertex();
+		CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = new CheckpointCoordinatorConfiguration(
+			Integer.MAX_VALUE,
+			Integer.MAX_VALUE,
+			0,
+			maxConcurrentCheckpoints,
+			CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
+			true,
+			false,
+			0);
+		CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinator(
+			new JobID(),
+			checkpointCoordinatorConfiguration,
+			new ExecutionVertex[] { executionVertex },
+			new ExecutionVertex[] { executionVertex },
+			new ExecutionVertex[] { executionVertex },
+			new StandaloneCheckpointIDCounter(),
+			new StandaloneCompletedCheckpointStore(1),
+			new MemoryStateBackend(),
+			Executors.directExecutor(),
+			SharedStateRegistry.DEFAULT_FACTORY,
+			mock(CheckpointFailureManager.class));
+
+		// switch the current execution's state to RUNNING so that checkpoints can be triggered.
+		mockExecutionRunning(executionVertex);
+
+		// use a manual checkpoint timer so that periodic checkpoints are triggered exactly when we expect.
+		ManualCheckpointTimer manualCheckpointTimer = new ManualCheckpointTimer(manualThreadExecutor);
+		// set the initial delay to 0 so that the first checkpoint is triggered as soon as we trigger the manual executor;
+		// this avoids the randomness of when the first checkpoint is triggered (introduced via FLINK-9352)
+		manualCheckpointTimer.setManualDelay(0L);
+		Whitebox.setInternalState(checkpointCoordinator, "timer", manualCheckpointTimer);
+
+		checkpointCoordinator.startCheckpointScheduler();
+		assertTrue(checkpointCoordinator.isCurrentPeriodicTriggerAvailable());
+		manualThreadExecutor.triggerAll();
+		manualThreadExecutor.triggerScheduledTasks();
+		assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+
+		for (int i = 1; i < maxConcurrentCheckpoints; i++) {
+			checkpointCoordinator.triggerCheckpoint(System.currentTimeMillis(), false);
+			assertEquals(i + 1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+			assertTrue(checkpointCoordinator.isCurrentPeriodicTriggerAvailable());
+		}
+
+		// as only a limited number of concurrent checkpoints is supported, triggering one more checkpoint
+		// beyond that limit causes currentPeriodicTrigger to be set to null.
+		checkpointCoordinator.triggerCheckpoint(System.currentTimeMillis(), false);
+		assertFalse(checkpointCoordinator.isCurrentPeriodicTriggerAvailable());
+		assertEquals(maxConcurrentCheckpoints, checkpointCoordinator.getNumberOfPendingCheckpoints());
+
+		checkpointCoordinator.abortPendingCheckpoints(
+			new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));
+		// after aborting the pending checkpoints, currentPeriodicTrigger must be available again.
+		assertTrue(checkpointCoordinator.isCurrentPeriodicTriggerAvailable());
+		assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
+	}
+
+	private ExecutionVertex mockExecutionVertex() {
+		ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
+		ExecutionVertex executionVertex = mock(ExecutionVertex.class);
+		Execution execution = Mockito.mock(Execution.class);
+		when(execution.getAttemptId()).thenReturn(executionAttemptID);
+		when(executionVertex.getCurrentExecutionAttempt()).thenReturn(execution);
+		return executionVertex;
+	}
+
+	private void mockExecutionRunning(ExecutionVertex executionVertex) {
+		when(executionVertex.getCurrentExecutionAttempt().getState()).thenReturn(ExecutionState.RUNNING);
+	}
+
+	public static class ManualCheckpointTimer extends ScheduledThreadPoolExecutor {
+		private final ScheduledExecutor scheduledExecutor;
+		private long manualDelay = 0;
+
+		ManualCheckpointTimer(final ScheduledExecutor scheduledExecutor) {
+			super(0);
+			this.scheduledExecutor = scheduledExecutor;
+		}
+
+		void setManualDelay(long manualDelay) {
+			this.manualDelay = manualDelay;
+		}
+
+		@Override
+		public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+			// used for the checkpoint canceller; since we don't want pending checkpoints to be cancelled, the command is never actually scheduled.
+			return new NeverCompleteFuture(delay);
+		}
+
+		@Override
+		public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+			// used to schedule periodic checkpoints.
+			// the configured 'manualDelay' replaces the requested initial delay so that the task runs with the delay we want.
+			return scheduledExecutor.scheduleWithFixedDelay(command, manualDelay, period, unit);
+		}
+
+		@Override
+		public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void execute(Runnable command) {
+			scheduledExecutor.execute(command);
+		}
+	}
+}