You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by aljoscha <gi...@git.apache.org> on 2018/03/07 14:03:22 UTC

[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/5654

    [FLINK-8487] Verify ZooKeeper checkpoint store behaviour with ITCase

    R: @zentol 

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink jira-8487-zookeeper-it-case-release-13

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5654.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5654
    
----
commit 62c6695c61c628ef842c6a29dbb517d71e50ca59
Author: Aljoscha Krettek <al...@...>
Date:   2018-03-03T08:34:56Z

    [FLINK-8487] Verify ZooKeeper checkpoint store behaviour with ITCase

----


---

[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5654#discussion_r172972454
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ---
    @@ -0,0 +1,387 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.base.StringSerializer;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.runtime.concurrent.ApplyFunction;
    +import org.apache.flink.runtime.concurrent.Future;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.streaming.api.datastream.DataStreamSource;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.test.util.TestBaseUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.curator.test.TestingServer;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.util.UUID;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import scala.concurrent.Await;
    +import scala.concurrent.duration.Deadline;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import static org.hamcrest.core.Is.is;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertThat;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
    + */
    +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
    +
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
    +
    +	private static final int NUM_JMS = 1;
    +	private static final int NUM_TMS = 1;
    +	private static final int NUM_SLOTS_PER_TM = 1;
    +
    +	@ClassRule
    +	public static final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private static File haStorageDir;
    +
    +	private static TestingServer zkServer;
    +
    +	private static LocalFlinkMiniCluster cluster = null;
    +
    +	private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
    +	private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
    +	private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		zkServer = new TestingServer();
    +
    +		Configuration config = new Configuration();
    +		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
    +		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
    +		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
    +
    +		haStorageDir = temporaryFolder.newFolder();
    +
    +		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toString());
    +		config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
    +		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
    +		config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
    +
    +		cluster = TestBaseUtils.startCluster(config, false);
    +	}
    +
    +	@AfterClass
    +	public static void tearDown() throws Exception {
    +		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
    +
    +		zkServer.stop();
    +		zkServer.close();
    +	}
    +
    +	/**
    +	 * Verify that we don't start a job from scratch if we cannot restore any of the
    +	 * CompletedCheckpoints.
    +	 *
    +	 * <p>Synchronization for the different steps and things we want to observe happens via
    +	 * latches in the test method and the methods of {@link CheckpointBlockingFunction}.
    +	 *
    +	 * <p>The test follows these steps:
    +	 * <ol>
    +	 *     <li>Start job and block on a latch until we have done some checkpoints
    +	 *     <li>Block in the special function
    +	 *     <li>Move away the contents of the ZooKeeper HA directory to make restoring from
    +	 *       checkpoints impossible
    +	 *     <li>Unblock the special function, which now induces a failure
    +	 *     <li>Make sure that the job does not recover successfully
    +	 *     <li>Move back the HA directory
    +	 *     <li>Make sure that the job recovers, we use a latch to ensure that the operator
    +	 *       restored successfully
    +	 * </ol>
    +	 */
    +	@Test(timeout = 120_000L)
    +	public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
    +		CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1);
    +		CheckpointBlockingFunction.successfulRestores.set(0);
    +		CheckpointBlockingFunction.illegalRestores.set(0);
    +		CheckpointBlockingFunction.afterMessWithZooKeeper.set(false);
    +		CheckpointBlockingFunction.failedAlready.set(false);
    +
    +		waitForCheckpointLatch = new OneShotLatch();
    +		failInCheckpointLatch = new OneShotLatch();
    +		successfulRestoreLatch = new OneShotLatch();
    +
    +		final Deadline deadline = TEST_TIMEOUT.fromNow();
    +
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.setParallelism(1);
    +		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
    +		env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
    +
    +		File checkpointLocation = temporaryFolder.newFolder();
    +		env.setStateBackend(new FsStateBackend(checkpointLocation.toURI()));
    +
    +		DataStreamSource<String> source = env.addSource(new UnboundedSource());
    +
    +		source
    +			.keyBy(new KeySelector<String, String>() {
    +				@Override
    +				public String getKey(String value) {
    +					return value;
    +				}
    +			})
    +			.map(new CheckpointBlockingFunction());
    +
    +		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    +		final JobID jobID = Preconditions.checkNotNull(jobGraph.getJobID());
    +
    +		// Retrieve the job manager
    +		final ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), deadline.timeLeft());
    +
    +		cluster.submitJobDetached(jobGraph);
    +
    +		// wait until we did some checkpoints
    +		waitForCheckpointLatch.await();
    +
    +		// mess with the HA directory so that the job cannot restore
    +		File movedCheckpointLocation = temporaryFolder.newFolder();
    +		int numCheckpoints = 0;
    +		File[] files = haStorageDir.listFiles();
    +		assertNotNull(files);
    +		for (File file : files) {
    +			if (file.getName().startsWith("completedCheckpoint")) {
    +				assertTrue(file.renameTo(new File(movedCheckpointLocation, file.getName())));
    +				numCheckpoints++;
    +			}
    +		}
    +		assertTrue(numCheckpoints > 0);
    +
    +		failInCheckpointLatch.trigger();
    +
    +		// Ensure that we see at least one cycle where the job tries to restart and fails.
    +		Future<JobStatus> jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call(){
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus){
    +					return jobStatus == JobStatus.RESTARTING;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.RESTARTING, jobStatusFuture.get());
    +
    +		jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call() {
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus) {
    +					return jobStatus == JobStatus.FAILING;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.FAILING, jobStatusFuture.get());
    +
    +		// move back the HA directory so that the job can restore
    +		CheckpointBlockingFunction.afterMessWithZooKeeper.set(true);
    +
    +		files = movedCheckpointLocation.listFiles();
    +		assertNotNull(files);
    +		for (File file : files) {
    +			if (file.getName().startsWith("completedCheckpoint")) {
    +				assertTrue(file.renameTo(new File(haStorageDir, file.getName())));
    +			}
    +		}
    +
    +		// now the job should be able to go to RUNNING again and then eventually to FINISHED
    +		jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call() {
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus) {
    +					return jobStatus == JobStatus.FINISHED;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.FINISHED, jobStatusFuture.get());
    +
    +		// make sure we saw a successful restore
    +		successfulRestoreLatch.await();
    +
    +		assertThat("We saw illegal restores.", CheckpointBlockingFunction.illegalRestores.get(), is(0));
    +	}
    +
    +	/**
    +	 * Requests the {@link JobStatus} of the job with the given {@link JobID}.
    +	 */
    +	private Future<JobStatus> getJobStatus(
    +		final ActorGateway jobManager,
    +		final JobID jobId,
    +		final FiniteDuration timeout) {
    +
    +		scala.concurrent.Future<Object> response =
    +			jobManager.ask(JobManagerMessages.getRequestJobStatus(jobId), timeout);
    +
    +		FlinkFuture<Object> flinkFuture = new FlinkFuture<>(response);
    +
    +		return flinkFuture.thenApply(new ApplyFunction<Object, JobStatus>() {
    +			@Override
    +			public JobStatus apply(Object value) {
    +				if (value instanceof JobManagerMessages.CurrentJobStatus) {
    +					return ((JobManagerMessages.CurrentJobStatus) value).status();
    +				} else if (value instanceof JobManagerMessages.JobNotFound) {
    +					throw new RuntimeException(
    +						new IllegalStateException("Could not find job with JobId " + jobId));
    +				} else {
    +					throw new RuntimeException(
    +						new IllegalStateException("Unknown JobManager response of type " + value.getClass()));
    +				}
    +			}
    +		});
    +	}
    +
    +	private static class UnboundedSource implements SourceFunction<String> {
    +		private boolean running = true;
    +
    +		@Override
    +		public void run(SourceContext<String> ctx) throws Exception {
    +			while (running) {
    +				ctx.collect("hello");
    +				// don't overdo it ... ;-)
    +				Thread.sleep(50);
    +				if (CheckpointBlockingFunction.afterMessWithZooKeeper.get()) {
    +					break;
    +				}
    +			}
    +		}
    +
    +		@Override
    +		public void cancel() {
    +			running = false;
    +		}
    +	}
    +
    +	private static class CheckpointBlockingFunction
    +			extends RichMapFunction<String, String>
    +			implements CheckpointedFunction {
    +
    +		// verify that we only call initializeState()
    +		// once with isRestored() == false. All other invocations must have isRestored() == true. This
    +		// verifies that we don't restart a job from scratch in case the CompletedCheckpoints can't
    +		// be read.
    +		static AtomicInteger allowedInitializeCallsWithoutRestore = new AtomicInteger(1);
    +
    +		// we count when we see restores that are not allowed. We only
    +		// allow restores once we messed with the HA directory and moved it back again
    +		static AtomicInteger illegalRestores = new AtomicInteger(0);
    +		static AtomicInteger successfulRestores = new AtomicInteger(0);
    +
    +		// whether we are after the phase where we messed with the ZooKeeper HA directory, i.e.
    +		// whether it's now ok for a restore to happen
    +		static AtomicBoolean afterMessWithZooKeeper = new AtomicBoolean(false);
    --- End diff --
    
    I was initially confused about why all of these are `AtomicBoolean`/`AtomicInteger` instances. Could they not be simple volatile variables? Not that performance matters here, but I was wondering whether something in the test actually relies on atomicity.


---

[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5654#discussion_r172963487
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ---
    @@ -0,0 +1,387 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.base.StringSerializer;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.runtime.concurrent.ApplyFunction;
    +import org.apache.flink.runtime.concurrent.Future;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.streaming.api.datastream.DataStreamSource;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.test.util.TestBaseUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.curator.test.TestingServer;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.util.UUID;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import scala.concurrent.Await;
    +import scala.concurrent.duration.Deadline;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import static org.hamcrest.core.Is.is;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertThat;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
    + */
    +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
    +
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
    +
    +	private static final int NUM_JMS = 1;
    +	private static final int NUM_TMS = 1;
    +	private static final int NUM_SLOTS_PER_TM = 1;
    +
    +	@ClassRule
    +	public static final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private static File haStorageDir;
    +
    +	private static TestingServer zkServer;
    +
    +	private static LocalFlinkMiniCluster cluster = null;
    +
    +	private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
    +	private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
    +	private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		zkServer = new TestingServer();
    +
    +		Configuration config = new Configuration();
    +		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
    +		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
    +		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
    +
    +		haStorageDir = temporaryFolder.newFolder();
    +
    +		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toString());
    +		config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
    +		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
    +		config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
    +
    +		cluster = TestBaseUtils.startCluster(config, false);
    +	}
    +
    +	@AfterClass
    +	public static void tearDown() throws Exception {
    +		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
    +
    +		zkServer.stop();
    +		zkServer.close();
    +	}
    +
    +	/**
    +	 * Verify that we don't start a job from scratch if we cannot restore any of the
    +	 * CompletedCheckpoints.
    +	 *
    +	 * <p>Synchronization for the different steps and things we want to observe happens via
    +	 * latches in the test method and the methods of {@link CheckpointBlockingFunction}.
    +	 *
    +	 * <p>The test follows these steps:
    +	 * <ol>
    +	 *     <li>Start job and block on a latch until we have done some checkpoints
    +	 *     <li>Block in the special function
    +	 *     <li>Move away the contents of the ZooKeeper HA directory to make restoring from
    +	 *       checkpoints impossible
    +	 *     <li>Unblock the special function, which now induces a failure
    +	 *     <li>Make sure that the job does not recover successfully
    +	 *     <li>Move back the HA directory
    +	 *     <li>Make sure that the job recovers, we use a latch to ensure that the operator
    +	 *       restored successfully
    +	 * </ol>
    +	 */
    +	@Test(timeout = 120_000L)
    +	public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
    +		CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1);
    +		CheckpointBlockingFunction.successfulRestores.set(0);
    +		CheckpointBlockingFunction.illegalRestores.set(0);
    +		CheckpointBlockingFunction.afterMessWithZooKeeper.set(false);
    +		CheckpointBlockingFunction.failedAlready.set(false);
    +
    +		waitForCheckpointLatch = new OneShotLatch();
    +		failInCheckpointLatch = new OneShotLatch();
    +		successfulRestoreLatch = new OneShotLatch();
    +
    +		final Deadline deadline = TEST_TIMEOUT.fromNow();
    +
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.setParallelism(1);
    +		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
    +		env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
    +
    +		File checkpointLocation = temporaryFolder.newFolder();
    +		env.setStateBackend(new FsStateBackend(checkpointLocation.toURI()));
    +
    +		DataStreamSource<String> source = env.addSource(new UnboundedSource());
    +
    +		source
    +			.keyBy(new KeySelector<String, String>() {
    +				@Override
    +				public String getKey(String value) {
    +					return value;
    +				}
    +			})
    +			.map(new CheckpointBlockingFunction());
    +
    +		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    +		final JobID jobID = Preconditions.checkNotNull(jobGraph.getJobID());
    +
    +		// Retrieve the job manager
    +		final ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), deadline.timeLeft());
    +
    +		cluster.submitJobDetached(jobGraph);
    +
    +		// wait until we did some checkpoints
    +		waitForCheckpointLatch.await();
    +
    +		// mess with the HA directory so that the job cannot restore
    +		File movedCheckpointLocation = temporaryFolder.newFolder();
    +		int numCheckpoints = 0;
    +		File[] files = haStorageDir.listFiles();
    +		assertNotNull(files);
    +		for (File file : files) {
    +			if (file.getName().startsWith("completedCheckpoint")) {
    +				assertTrue(file.renameTo(new File(movedCheckpointLocation, file.getName())));
    +				numCheckpoints++;
    +			}
    +		}
    +		assertTrue(numCheckpoints > 0);
    +
    +		failInCheckpointLatch.trigger();
    +
    +		// Ensure that we see at least one cycle where the job tries to restart and fails.
    +		Future<JobStatus> jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call(){
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus){
    +					return jobStatus == JobStatus.RESTARTING;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.RESTARTING, jobStatusFuture.get());
    +
    +		jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call() {
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus) {
    +					return jobStatus == JobStatus.FAILING;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.FAILING, jobStatusFuture.get());
    +
    +		// move back the HA directory so that the job can restore
    +		CheckpointBlockingFunction.afterMessWithZooKeeper.set(true);
    +
    +		files = movedCheckpointLocation.listFiles();
    +		assertNotNull(files);
    +		for (File file : files) {
    +			if (file.getName().startsWith("completedCheckpoint")) {
    +				assertTrue(file.renameTo(new File(haStorageDir, file.getName())));
    +			}
    +		}
    +
    +		// now the job should be able to go to RUNNING again and then eventually to FINISHED
    +		jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call() {
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus) {
    +					return jobStatus == JobStatus.FINISHED;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.FINISHED, jobStatusFuture.get());
    +
    +		// make sure we saw a successful restore
    +		successfulRestoreLatch.await();
    +
    +		assertThat("We saw illegal restores.", CheckpointBlockingFunction.illegalRestores.get(), is(0));
    +	}
    +
    +	/**
    +	 * Requests the {@link JobStatus} of the job with the given {@link JobID}.
    +	 */
    +	private Future<JobStatus> getJobStatus(
    +		final ActorGateway jobManager,
    +		final JobID jobId,
    +		final FiniteDuration timeout) {
    +
    +		scala.concurrent.Future<Object> response =
    +			jobManager.ask(JobManagerMessages.getRequestJobStatus(jobId), timeout);
    +
    +		FlinkFuture<Object> flinkFuture = new FlinkFuture<>(response);
    +
    +		return flinkFuture.thenApply(new ApplyFunction<Object, JobStatus>() {
    +			@Override
    +			public JobStatus apply(Object value) {
    +				if (value instanceof JobManagerMessages.CurrentJobStatus) {
    +					return ((JobManagerMessages.CurrentJobStatus) value).status();
    +				} else if (value instanceof JobManagerMessages.JobNotFound) {
    +					throw new RuntimeException(
    +						new IllegalStateException("Could not find job with JobId " + jobId));
    +				} else {
    +					throw new RuntimeException(
    +						new IllegalStateException("Unknown JobManager response of type " + value.getClass()));
    +				}
    +			}
    +		});
    +	}
    +
    +	private static class UnboundedSource implements SourceFunction<String> {
    +		private boolean running = true;
    --- End diff --
    
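    This field must be volatile, to be on the safe side. Otherwise the JIT may hoist the read of `running` out of the `while (running)` loop, in which case a `cancel()` call from another thread would never be observed by `run()`.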


---

[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5654#discussion_r173134402
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ---
    @@ -0,0 +1,387 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.base.StringSerializer;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.runtime.concurrent.ApplyFunction;
    +import org.apache.flink.runtime.concurrent.Future;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.streaming.api.datastream.DataStreamSource;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.test.util.TestBaseUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.curator.test.TestingServer;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.util.UUID;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import scala.concurrent.Await;
    +import scala.concurrent.duration.Deadline;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import static org.hamcrest.core.Is.is;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertThat;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
    + */
    +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
    +
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
    +
    +	private static final int NUM_JMS = 1;
    +	private static final int NUM_TMS = 1;
    +	private static final int NUM_SLOTS_PER_TM = 1;
    +
    +	@ClassRule
    +	public static final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private static File haStorageDir;
    +
    +	private static TestingServer zkServer;
    +
    +	private static LocalFlinkMiniCluster cluster = null;
    +
    +	private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
    +	private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
    +	private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		zkServer = new TestingServer();
    +
    +		Configuration config = new Configuration();
    +		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
    +		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
    +		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
    +
    +		haStorageDir = temporaryFolder.newFolder();
    +
    +		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toString());
    +		config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
    +		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
    +		config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
    +
    +		cluster = TestBaseUtils.startCluster(config, false);
    +	}
    +
    +	@AfterClass
    +	public static void tearDown() throws Exception {
    +		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
    +
    +		zkServer.stop();
    +		zkServer.close();
    +	}
    +
    +	/**
    +	 * Verify that we don't start a job from scratch if we cannot restore any of the
    +	 * CompletedCheckpoints.
    +	 *
    +	 * <p>Synchronization for the different steps and things we want to observe happens via
    +	 * latches in the test method and the methods of {@link CheckpointBlockingFunction}.
    +	 *
    +	 * <p>The test follows these steps:
    +	 * <ol>
    +	 *     <li>Start job and block on a latch until we have done some checkpoints
    +	 *     <li>Block in the special function
    +	 *     <li>Move away the contents of the ZooKeeper HA directory to make restoring from
    +	 *       checkpoints impossible
    +	 *     <li>Unblock the special function, which now induces a failure
    +	 *     <li>Make sure that the job does not recover successfully
    +	 *     <li>Move back the HA directory
    +	 *     <li>Make sure that the job recovers, we use a latch to ensure that the operator
    +	 *       restored successfully
    +	 * </ol>
    +	 */
    +	@Test(timeout = 120_000L)
    +	public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
    +		CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1);
    +		CheckpointBlockingFunction.successfulRestores.set(0);
    +		CheckpointBlockingFunction.illegalRestores.set(0);
    +		CheckpointBlockingFunction.afterMessWithZooKeeper.set(false);
    +		CheckpointBlockingFunction.failedAlready.set(false);
    +
    +		waitForCheckpointLatch = new OneShotLatch();
    +		failInCheckpointLatch = new OneShotLatch();
    +		successfulRestoreLatch = new OneShotLatch();
    +
    +		final Deadline deadline = TEST_TIMEOUT.fromNow();
    +
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.setParallelism(1);
    +		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
    +		env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
    +
    +		File checkpointLocation = temporaryFolder.newFolder();
    +		env.setStateBackend(new FsStateBackend(checkpointLocation.toURI()));
    +
    +		DataStreamSource<String> source = env.addSource(new UnboundedSource());
    +
    +		source
    +			.keyBy(new KeySelector<String, String>() {
    +				@Override
    +				public String getKey(String value) {
    +					return value;
    +				}
    +			})
    +			.map(new CheckpointBlockingFunction());
    +
    +		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    +		final JobID jobID = Preconditions.checkNotNull(jobGraph.getJobID());
    +
    +		// Retrieve the job manager
    +		final ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), deadline.timeLeft());
    +
    +		cluster.submitJobDetached(jobGraph);
    +
    +		// wait until we did some checkpoints
    +		waitForCheckpointLatch.await();
    +
    +		// mess with the HA directory so that the job cannot restore
    +		File movedCheckpointLocation = temporaryFolder.newFolder();
    +		int numCheckpoints = 0;
    +		File[] files = haStorageDir.listFiles();
    +		assertNotNull(files);
    +		for (File file : files) {
    +			if (file.getName().startsWith("completedCheckpoint")) {
    +				assertTrue(file.renameTo(new File(movedCheckpointLocation, file.getName())));
    +				numCheckpoints++;
    +			}
    +		}
    +		assertTrue(numCheckpoints > 0);
    +
    +		failInCheckpointLatch.trigger();
    +
    +		// Ensure that we see at least one cycle where the job tries to restart and fails.
    +		Future<JobStatus> jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call(){
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus){
    +					return jobStatus == JobStatus.RESTARTING;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.RESTARTING, jobStatusFuture.get());
    +
    +		jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call() {
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus) {
    +					return jobStatus == JobStatus.FAILING;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.FAILING, jobStatusFuture.get());
    +
    +		// move back the HA directory so that the job can restore
    +		CheckpointBlockingFunction.afterMessWithZooKeeper.set(true);
    +
    +		files = movedCheckpointLocation.listFiles();
    +		assertNotNull(files);
    +		for (File file : files) {
    +			if (file.getName().startsWith("completedCheckpoint")) {
    +				assertTrue(file.renameTo(new File(haStorageDir, file.getName())));
    +			}
    +		}
    +
    +		// now the job should be able to go to RUNNING again and then eventually to FINISHED
    +		jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call() {
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus) {
    +					return jobStatus == JobStatus.FINISHED;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.FINISHED, jobStatusFuture.get());
    +
    +		// make sure we saw a successful restore
    +		successfulRestoreLatch.await();
    +
    +		assertThat("We saw illegal restores.", CheckpointBlockingFunction.illegalRestores.get(), is(0));
    +	}
    +
    +	/**
    +	 * Requests the {@link JobStatus} of the job with the given {@link JobID}.
    +	 */
    +	private Future<JobStatus> getJobStatus(
    +		final ActorGateway jobManager,
    +		final JobID jobId,
    +		final FiniteDuration timeout) {
    +
    +		scala.concurrent.Future<Object> response =
    +			jobManager.ask(JobManagerMessages.getRequestJobStatus(jobId), timeout);
    +
    +		FlinkFuture<Object> flinkFuture = new FlinkFuture<>(response);
    +
    +		return flinkFuture.thenApply(new ApplyFunction<Object, JobStatus>() {
    +			@Override
    +			public JobStatus apply(Object value) {
    +				if (value instanceof JobManagerMessages.CurrentJobStatus) {
    +					return ((JobManagerMessages.CurrentJobStatus) value).status();
    +				} else if (value instanceof JobManagerMessages.JobNotFound) {
    +					throw new RuntimeException(
    +						new IllegalStateException("Could not find job with JobId " + jobId));
    +				} else {
    +					throw new RuntimeException(
    +						new IllegalStateException("Unknown JobManager response of type " + value.getClass()));
    +				}
    +			}
    +		});
    +	}
    +
    +	private static class UnboundedSource implements SourceFunction<String> {
    +		private boolean running = true;
    +
    +		@Override
    +		public void run(SourceContext<String> ctx) throws Exception {
    +			while (running) {
    --- End diff --
    
    I had this in an earlier version, but then I did some messing around and lost it. Fixing it.


---

[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5654#discussion_r173134300
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ---
    @@ -0,0 +1,387 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.base.StringSerializer;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.runtime.concurrent.ApplyFunction;
    +import org.apache.flink.runtime.concurrent.Future;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.streaming.api.datastream.DataStreamSource;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.test.util.TestBaseUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.curator.test.TestingServer;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.util.UUID;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import scala.concurrent.Await;
    +import scala.concurrent.duration.Deadline;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import static org.hamcrest.core.Is.is;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertThat;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
    + */
    +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
    +
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
    +
    +	private static final int NUM_JMS = 1;
    +	private static final int NUM_TMS = 1;
    +	private static final int NUM_SLOTS_PER_TM = 1;
    +
    +	@ClassRule
    +	public static final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private static File haStorageDir;
    +
    +	private static TestingServer zkServer;
    +
    +	private static LocalFlinkMiniCluster cluster = null;
    +
    +	private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
    +	private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
    +	private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		zkServer = new TestingServer();
    +
    +		Configuration config = new Configuration();
    +		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
    +		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
    +		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
    +
    +		haStorageDir = temporaryFolder.newFolder();
    +
    +		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toString());
    +		config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
    +		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
    +		config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
    +
    +		cluster = TestBaseUtils.startCluster(config, false);
    +	}
    +
    +	@AfterClass
    +	public static void tearDown() throws Exception {
    +		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
    +
    +		zkServer.stop();
    +		zkServer.close();
    +	}
    +
    +	/**
    +	 * Verify that we don't start a job from scratch if we cannot restore any of the
    +	 * CompletedCheckpoints.
    +	 *
    +	 * <p>Synchronization for the different steps and things we want to observe happens via
    +	 * latches in the test method and the methods of {@link CheckpointBlockingFunction}.
    +	 *
    +	 * <p>The test follows these steps:
    +	 * <ol>
    +	 *     <li>Start job and block on a latch until we have done some checkpoints
    +	 *     <li>Block in the special function
    +	 *     <li>Move away the contents of the ZooKeeper HA directory to make restoring from
    +	 *       checkpoints impossible
    +	 *     <li>Unblock the special function, which now induces a failure
    +	 *     <li>Make sure that the job does not recover successfully
    +	 *     <li>Move back the HA directory
    +	 *     <li>Make sure that the job recovers, we use a latch to ensure that the operator
    +	 *       restored successfully
    +	 * </ol>
    +	 */
    +	@Test(timeout = 120_000L)
    +	public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
    +		CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1);
    +		CheckpointBlockingFunction.successfulRestores.set(0);
    +		CheckpointBlockingFunction.illegalRestores.set(0);
    +		CheckpointBlockingFunction.afterMessWithZooKeeper.set(false);
    +		CheckpointBlockingFunction.failedAlready.set(false);
    +
    +		waitForCheckpointLatch = new OneShotLatch();
    +		failInCheckpointLatch = new OneShotLatch();
    +		successfulRestoreLatch = new OneShotLatch();
    +
    +		final Deadline deadline = TEST_TIMEOUT.fromNow();
    +
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.setParallelism(1);
    +		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
    +		env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
    +
    +		File checkpointLocation = temporaryFolder.newFolder();
    +		env.setStateBackend(new FsStateBackend(checkpointLocation.toURI()));
    +
    +		DataStreamSource<String> source = env.addSource(new UnboundedSource());
    +
    +		source
    +			.keyBy(new KeySelector<String, String>() {
    +				@Override
    +				public String getKey(String value) {
    +					return value;
    +				}
    +			})
    +			.map(new CheckpointBlockingFunction());
    +
    +		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    +		final JobID jobID = Preconditions.checkNotNull(jobGraph.getJobID());
    +
    +		// Retrieve the job manager
    +		final ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), deadline.timeLeft());
    +
    +		cluster.submitJobDetached(jobGraph);
    +
    +		// wait until we did some checkpoints
    +		waitForCheckpointLatch.await();
    +
    +		// mess with the HA directory so that the job cannot restore
    +		File movedCheckpointLocation = temporaryFolder.newFolder();
    +		int numCheckpoints = 0;
    +		File[] files = haStorageDir.listFiles();
    +		assertNotNull(files);
    +		for (File file : files) {
    +			if (file.getName().startsWith("completedCheckpoint")) {
    +				assertTrue(file.renameTo(new File(movedCheckpointLocation, file.getName())));
    +				numCheckpoints++;
    +			}
    +		}
    +		assertTrue(numCheckpoints > 0);
    +
    +		failInCheckpointLatch.trigger();
    +
    +		// Ensure that we see at least one cycle where the job tries to restart and fails.
    +		Future<JobStatus> jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call(){
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus){
    +					return jobStatus == JobStatus.RESTARTING;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.RESTARTING, jobStatusFuture.get());
    +
    +		jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call() {
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus) {
    +					return jobStatus == JobStatus.FAILING;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.FAILING, jobStatusFuture.get());
    +
    +		// move back the HA directory so that the job can restore
    +		CheckpointBlockingFunction.afterMessWithZooKeeper.set(true);
    +
    +		files = movedCheckpointLocation.listFiles();
    +		assertNotNull(files);
    +		for (File file : files) {
    +			if (file.getName().startsWith("completedCheckpoint")) {
    +				assertTrue(file.renameTo(new File(haStorageDir, file.getName())));
    +			}
    +		}
    +
    +		// now the job should be able to go to RUNNING again and then eventually to FINISHED
    +		jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call() {
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus) {
    +					return jobStatus == JobStatus.FINISHED;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.FINISHED, jobStatusFuture.get());
    +
    +		// make sure we saw a successful restore
    +		successfulRestoreLatch.await();
    +
    +		assertThat("We saw illegal restores.", CheckpointBlockingFunction.illegalRestores.get(), is(0));
    +	}
    +
    +	/**
    +	 * Requests the {@link JobStatus} of the job with the given {@link JobID}.
    +	 */
    +	private Future<JobStatus> getJobStatus(
    +		final ActorGateway jobManager,
    +		final JobID jobId,
    +		final FiniteDuration timeout) {
    +
    +		scala.concurrent.Future<Object> response =
    +			jobManager.ask(JobManagerMessages.getRequestJobStatus(jobId), timeout);
    +
    +		FlinkFuture<Object> flinkFuture = new FlinkFuture<>(response);
    +
    +		return flinkFuture.thenApply(new ApplyFunction<Object, JobStatus>() {
    +			@Override
    +			public JobStatus apply(Object value) {
    +				if (value instanceof JobManagerMessages.CurrentJobStatus) {
    +					return ((JobManagerMessages.CurrentJobStatus) value).status();
    +				} else if (value instanceof JobManagerMessages.JobNotFound) {
    +					throw new RuntimeException(
    +						new IllegalStateException("Could not find job with JobId " + jobId));
    +				} else {
    +					throw new RuntimeException(
    +						new IllegalStateException("Unknown JobManager response of type " + value.getClass()));
    +				}
    +			}
    +		});
    +	}
    +
    +	private static class UnboundedSource implements SourceFunction<String> {
    +		private boolean running = true;
    --- End diff --
    
    Fixing it.


---

[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha closed the pull request at:

    https://github.com/apache/flink/pull/5654


---

[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5654#discussion_r173142937
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ---
    @@ -0,0 +1,387 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.base.StringSerializer;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.runtime.concurrent.ApplyFunction;
    +import org.apache.flink.runtime.concurrent.Future;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.streaming.api.datastream.DataStreamSource;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.test.util.TestBaseUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.curator.test.TestingServer;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.util.UUID;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import scala.concurrent.Await;
    +import scala.concurrent.duration.Deadline;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import static org.hamcrest.core.Is.is;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertThat;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
    + */
    +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
    +
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
    +
    +	private static final int NUM_JMS = 1;
    +	private static final int NUM_TMS = 1;
    +	private static final int NUM_SLOTS_PER_TM = 1;
    +
    +	@ClassRule
    +	public static final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private static File haStorageDir;
    +
    +	private static TestingServer zkServer;
    +
    +	private static LocalFlinkMiniCluster cluster = null;
    +
    +	private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
    +	private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
    +	private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		zkServer = new TestingServer();
    +
    +		Configuration config = new Configuration();
    +		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
    +		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
    +		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
    +
    +		haStorageDir = temporaryFolder.newFolder();
    +
    +		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toString());
    +		config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
    +		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
    +		config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
    +
    +		cluster = TestBaseUtils.startCluster(config, false);
    +	}
    +
    +	@AfterClass
    +	public static void tearDown() throws Exception {
    +		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
    +
    +		zkServer.stop();
    +		zkServer.close();
    +	}
    +
    +	/**
    +	 * Verify that we don't start a job from scratch if we cannot restore any of the
    +	 * CompletedCheckpoints.
    +	 *
    +	 * <p>Synchronization for the different steps and things we want to observe happens via
    +	 * latches in the test method and the methods of {@link CheckpointBlockingFunction}.
    +	 *
    +	 * <p>The test follows these steps:
    +	 * <ol>
    +	 *     <li>Start job and block on a latch until we have done some checkpoints
    +	 *     <li>Block in the special function
    +	 *     <li>Move away the contents of the ZooKeeper HA directory to make restoring from
    +	 *       checkpoints impossible
    +	 *     <li>Unblock the special function, which now induces a failure
    +	 *     <li>Make sure that the job does not recover successfully
    +	 *     <li>Move back the HA directory
    +	 *     <li>Make sure that the job recovers, we use a latch to ensure that the operator
    +	 *       restored successfully
    +	 * </ol>
    +	 */
    +	@Test(timeout = 120_000L)
    +	public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
    +		CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1);
    +		CheckpointBlockingFunction.successfulRestores.set(0);
    +		CheckpointBlockingFunction.illegalRestores.set(0);
    +		CheckpointBlockingFunction.afterMessWithZooKeeper.set(false);
    +		CheckpointBlockingFunction.failedAlready.set(false);
    +
    +		waitForCheckpointLatch = new OneShotLatch();
    +		failInCheckpointLatch = new OneShotLatch();
    +		successfulRestoreLatch = new OneShotLatch();
    +
    +		final Deadline deadline = TEST_TIMEOUT.fromNow();
    +
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.setParallelism(1);
    +		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
    +		env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
    +
    +		File checkpointLocation = temporaryFolder.newFolder();
    +		env.setStateBackend(new FsStateBackend(checkpointLocation.toURI()));
    +
    +		DataStreamSource<String> source = env.addSource(new UnboundedSource());
    +
    +		source
    +			.keyBy(new KeySelector<String, String>() {
    +				@Override
    +				public String getKey(String value) {
    +					return value;
    +				}
    +			})
    +			.map(new CheckpointBlockingFunction());
    +
    +		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    +		final JobID jobID = Preconditions.checkNotNull(jobGraph.getJobID());
    +
    +		// Retrieve the job manager
    +		final ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), deadline.timeLeft());
    +
    +		cluster.submitJobDetached(jobGraph);
    +
    +		// wait until we did some checkpoints
    +		waitForCheckpointLatch.await();
    +
    +		// mess with the HA directory so that the job cannot restore
    +		File movedCheckpointLocation = temporaryFolder.newFolder();
    +		int numCheckpoints = 0;
    +		File[] files = haStorageDir.listFiles();
    +		assertNotNull(files);
    +		for (File file : files) {
    +			if (file.getName().startsWith("completedCheckpoint")) {
    +				assertTrue(file.renameTo(new File(movedCheckpointLocation, file.getName())));
    +				numCheckpoints++;
    +			}
    +		}
    +		assertTrue(numCheckpoints > 0);
    +
    +		failInCheckpointLatch.trigger();
    +
    +		// Ensure that we see at least one cycle where the job tries to restart and fails.
    +		Future<JobStatus> jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call(){
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus){
    +					return jobStatus == JobStatus.RESTARTING;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.RESTARTING, jobStatusFuture.get());
    +
    +		jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call() {
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus) {
    +					return jobStatus == JobStatus.FAILING;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.FAILING, jobStatusFuture.get());
    +
    +		// move back the HA directory so that the job can restore
    +		CheckpointBlockingFunction.afterMessWithZooKeeper.set(true);
    +
    +		files = movedCheckpointLocation.listFiles();
    +		assertNotNull(files);
    +		for (File file : files) {
    +			if (file.getName().startsWith("completedCheckpoint")) {
    +				assertTrue(file.renameTo(new File(haStorageDir, file.getName())));
    +			}
    +		}
    +
    +		// now the job should be able to go to RUNNING again and then eventually to FINISHED
    +		jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call() {
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus) {
    +					return jobStatus == JobStatus.FINISHED;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.FINISHED, jobStatusFuture.get());
    +
    +		// make sure we saw a successful restore
    +		successfulRestoreLatch.await();
    --- End diff --
    
    Yes, I'll remove that.


---

[GitHub] flink issue #5654: [FLINK-8487] Verify ZooKeeper checkpoint store behaviour with ITCase

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5654
  
    merged


---

[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store behaviour with ITCase

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5654#discussion_r172972736
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ---
    @@ -0,0 +1,387 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.base.StringSerializer;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.runtime.concurrent.ApplyFunction;
    +import org.apache.flink.runtime.concurrent.Future;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.streaming.api.datastream.DataStreamSource;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.test.util.TestBaseUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.curator.test.TestingServer;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.util.UUID;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import scala.concurrent.Await;
    +import scala.concurrent.duration.Deadline;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import static org.hamcrest.core.Is.is;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertThat;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
    + */
    +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
    +
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
    +
    +	private static final int NUM_JMS = 1;
    +	private static final int NUM_TMS = 1;
    +	private static final int NUM_SLOTS_PER_TM = 1;
    +
    +	@ClassRule
    +	public static final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private static File haStorageDir;
    +
    +	private static TestingServer zkServer;
    +
    +	private static LocalFlinkMiniCluster cluster = null;
    +
    +	private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
    +	private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
    +	private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		zkServer = new TestingServer();
    +
    +		Configuration config = new Configuration();
    +		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
    +		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
    +		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
    +
    +		haStorageDir = temporaryFolder.newFolder();
    +
    +		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toString());
    +		config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
    +		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
    +		config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
    +
    +		cluster = TestBaseUtils.startCluster(config, false);
    +	}
    +
    +	@AfterClass
    +	public static void tearDown() throws Exception {
    +		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
    +
    +		zkServer.stop();
    +		zkServer.close();
    +	}
    +
    +	/**
    +	 * Verify that we don't start a job from scratch if we cannot restore any of the
    +	 * CompletedCheckpoints.
    +	 *
    +	 * <p>Synchronization for the different steps and things we want to observe happens via
    +	 * latches in the test method and the methods of {@link CheckpointBlockingFunction}.
    +	 *
    +	 * <p>The test follows these steps:
    +	 * <ol>
    +	 *     <li>Start job and block on a latch until we have done some checkpoints
    +	 *     <li>Block in the special function
    +	 *     <li>Move away the contents of the ZooKeeper HA directory to make restoring from
    +	 *       checkpoints impossible
    +	 *     <li>Unblock the special function, which now induces a failure
    +	 *     <li>Make sure that the job does not recover successfully
    +	 *     <li>Move back the HA directory
    +	 *     <li>Make sure that the job recovers, we use a latch to ensure that the operator
    +	 *       restored successfully
    +	 * </ol>
    +	 */
    +	@Test(timeout = 120_000L)
    +	public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
    +		CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1);
    +		CheckpointBlockingFunction.successfulRestores.set(0);
    +		CheckpointBlockingFunction.illegalRestores.set(0);
    +		CheckpointBlockingFunction.afterMessWithZooKeeper.set(false);
    +		CheckpointBlockingFunction.failedAlready.set(false);
    +
    +		waitForCheckpointLatch = new OneShotLatch();
    +		failInCheckpointLatch = new OneShotLatch();
    +		successfulRestoreLatch = new OneShotLatch();
    +
    +		final Deadline deadline = TEST_TIMEOUT.fromNow();
    +
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.setParallelism(1);
    +		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
    +		env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
    +
    +		File checkpointLocation = temporaryFolder.newFolder();
    +		env.setStateBackend(new FsStateBackend(checkpointLocation.toURI()));
    +
    +		DataStreamSource<String> source = env.addSource(new UnboundedSource());
    +
    +		source
    +			.keyBy(new KeySelector<String, String>() {
    +				@Override
    +				public String getKey(String value) {
    +					return value;
    +				}
    +			})
    +			.map(new CheckpointBlockingFunction());
    +
    +		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    +		final JobID jobID = Preconditions.checkNotNull(jobGraph.getJobID());
    +
    +		// Retrieve the job manager
    +		final ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), deadline.timeLeft());
    +
    +		cluster.submitJobDetached(jobGraph);
    +
    +		// wait until we did some checkpoints
    +		waitForCheckpointLatch.await();
    +
    +		// mess with the HA directory so that the job cannot restore
    +		File movedCheckpointLocation = temporaryFolder.newFolder();
    +		int numCheckpoints = 0;
    +		File[] files = haStorageDir.listFiles();
    +		assertNotNull(files);
    +		for (File file : files) {
    +			if (file.getName().startsWith("completedCheckpoint")) {
    +				assertTrue(file.renameTo(new File(movedCheckpointLocation, file.getName())));
    +				numCheckpoints++;
    +			}
    +		}
    +		assertTrue(numCheckpoints > 0);
    +
    +		failInCheckpointLatch.trigger();
    +
    +		// Ensure that we see at least one cycle where the job tries to restart and fails.
    +		Future<JobStatus> jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call(){
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus){
    +					return jobStatus == JobStatus.RESTARTING;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.RESTARTING, jobStatusFuture.get());
    +
    +		jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call() {
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus) {
    +					return jobStatus == JobStatus.FAILING;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.FAILING, jobStatusFuture.get());
    +
    +		// move back the HA directory so that the job can restore
    +		CheckpointBlockingFunction.afterMessWithZooKeeper.set(true);
    +
    +		files = movedCheckpointLocation.listFiles();
    +		assertNotNull(files);
    +		for (File file : files) {
    +			if (file.getName().startsWith("completedCheckpoint")) {
    +				assertTrue(file.renameTo(new File(haStorageDir, file.getName())));
    +			}
    +		}
    +
    +		// now the job should be able to go to RUNNING again and then eventually to FINISHED
    +		jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call() {
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus) {
    +					return jobStatus == JobStatus.FINISHED;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.FINISHED, jobStatusFuture.get());
    +
    +		// make sure we saw a successful restore
    +		successfulRestoreLatch.await();
    +
    +		assertThat("We saw illegal restores.", CheckpointBlockingFunction.illegalRestores.get(), is(0));
    --- End diff --
    
    Why not a plain `assertEquals` here instead of `assertThat(..., is(0))`? ;-)


---

[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store behaviour with ITCase

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5654#discussion_r172972120
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ---
    @@ -0,0 +1,387 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.base.StringSerializer;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.runtime.concurrent.ApplyFunction;
    +import org.apache.flink.runtime.concurrent.Future;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.streaming.api.datastream.DataStreamSource;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.test.util.TestBaseUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.curator.test.TestingServer;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.util.UUID;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import scala.concurrent.Await;
    +import scala.concurrent.duration.Deadline;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import static org.hamcrest.core.Is.is;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertThat;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
    + */
    +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
    +
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
    +
    +	private static final int NUM_JMS = 1;
    +	private static final int NUM_TMS = 1;
    +	private static final int NUM_SLOTS_PER_TM = 1;
    +
    +	@ClassRule
    +	public static final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private static File haStorageDir;
    +
    +	private static TestingServer zkServer;
    +
    +	private static LocalFlinkMiniCluster cluster = null;
    +
    +	private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
    +	private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
    +	private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		zkServer = new TestingServer();
    +
    +		Configuration config = new Configuration();
    +		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
    +		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
    +		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
    +
    +		haStorageDir = temporaryFolder.newFolder();
    +
    +		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toString());
    +		config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
    +		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
    +		config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
    +
    +		cluster = TestBaseUtils.startCluster(config, false);
    +	}
    +
    +	@AfterClass
    +	public static void tearDown() throws Exception {
    +		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
    +
    +		zkServer.stop();
    +		zkServer.close();
    +	}
    +
    +	/**
    +	 * Verify that we don't start a job from scratch if we cannot restore any of the
    +	 * CompletedCheckpoints.
    +	 *
    +	 * <p>Synchronization for the different steps and things we want to observe happens via
    +	 * latches in the test method and the methods of {@link CheckpointBlockingFunction}.
    +	 *
    +	 * <p>The test follows these steps:
    +	 * <ol>
    +	 *     <li>Start job and block on a latch until we have done some checkpoints
    +	 *     <li>Block in the special function
    +	 *     <li>Move away the contents of the ZooKeeper HA directory to make restoring from
    +	 *       checkpoints impossible
    +	 *     <li>Unblock the special function, which now induces a failure
    +	 *     <li>Make sure that the job does not recover successfully
    +	 *     <li>Move back the HA directory
    +	 *     <li>Make sure that the job recovers, we use a latch to ensure that the operator
    +	 *       restored successfully
    +	 * </ol>
    +	 */
    +	@Test(timeout = 120_000L)
    +	public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
    +		CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1);
    +		CheckpointBlockingFunction.successfulRestores.set(0);
    +		CheckpointBlockingFunction.illegalRestores.set(0);
    +		CheckpointBlockingFunction.afterMessWithZooKeeper.set(false);
    +		CheckpointBlockingFunction.failedAlready.set(false);
    +
    +		waitForCheckpointLatch = new OneShotLatch();
    +		failInCheckpointLatch = new OneShotLatch();
    +		successfulRestoreLatch = new OneShotLatch();
    +
    +		final Deadline deadline = TEST_TIMEOUT.fromNow();
    +
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.setParallelism(1);
    +		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
    +		env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
    +
    +		File checkpointLocation = temporaryFolder.newFolder();
    +		env.setStateBackend(new FsStateBackend(checkpointLocation.toURI()));
    +
    +		DataStreamSource<String> source = env.addSource(new UnboundedSource());
    +
    +		source
    +			.keyBy(new KeySelector<String, String>() {
    +				@Override
    +				public String getKey(String value) {
    +					return value;
    +				}
    +			})
    +			.map(new CheckpointBlockingFunction());
    +
    +		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    +		final JobID jobID = Preconditions.checkNotNull(jobGraph.getJobID());
    +
    +		// Retrieve the job manager
    +		final ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), deadline.timeLeft());
    +
    +		cluster.submitJobDetached(jobGraph);
    +
    +		// wait until we did some checkpoints
    +		waitForCheckpointLatch.await();
    +
    +		// mess with the HA directory so that the job cannot restore
    +		File movedCheckpointLocation = temporaryFolder.newFolder();
    +		int numCheckpoints = 0;
    +		File[] files = haStorageDir.listFiles();
    +		assertNotNull(files);
    +		for (File file : files) {
    +			if (file.getName().startsWith("completedCheckpoint")) {
    +				assertTrue(file.renameTo(new File(movedCheckpointLocation, file.getName())));
    +				numCheckpoints++;
    +			}
    +		}
    +		assertTrue(numCheckpoints > 0);
    +
    +		failInCheckpointLatch.trigger();
    +
    +		// Ensure that we see at least one cycle where the job tries to restart and fails.
    +		Future<JobStatus> jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call(){
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus){
    +					return jobStatus == JobStatus.RESTARTING;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.RESTARTING, jobStatusFuture.get());
    +
    +		jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call() {
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus) {
    +					return jobStatus == JobStatus.FAILING;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.FAILING, jobStatusFuture.get());
    +
    +		// move back the HA directory so that the job can restore
    +		CheckpointBlockingFunction.afterMessWithZooKeeper.set(true);
    +
    +		files = movedCheckpointLocation.listFiles();
    +		assertNotNull(files);
    +		for (File file : files) {
    +			if (file.getName().startsWith("completedCheckpoint")) {
    +				assertTrue(file.renameTo(new File(haStorageDir, file.getName())));
    +			}
    +		}
    +
    +		// now the job should be able to go to RUNNING again and then eventually to FINISHED
    +		jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call() {
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus) {
    +					return jobStatus == JobStatus.FINISHED;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.FINISHED, jobStatusFuture.get());
    +
    +		// make sure we saw a successful restore
    +		successfulRestoreLatch.await();
    +
    +		assertThat("We saw illegal restores.", CheckpointBlockingFunction.illegalRestores.get(), is(0));
    +	}
    +
    +	/**
    +	 * Requests the {@link JobStatus} of the job with the given {@link JobID}.
    +	 */
    +	private Future<JobStatus> getJobStatus(
    +		final ActorGateway jobManager,
    +		final JobID jobId,
    +		final FiniteDuration timeout) {
    +
    +		scala.concurrent.Future<Object> response =
    +			jobManager.ask(JobManagerMessages.getRequestJobStatus(jobId), timeout);
    +
    +		FlinkFuture<Object> flinkFuture = new FlinkFuture<>(response);
    +
    +		return flinkFuture.thenApply(new ApplyFunction<Object, JobStatus>() {
    +			@Override
    +			public JobStatus apply(Object value) {
    +				if (value instanceof JobManagerMessages.CurrentJobStatus) {
    +					return ((JobManagerMessages.CurrentJobStatus) value).status();
    +				} else if (value instanceof JobManagerMessages.JobNotFound) {
    +					throw new RuntimeException(
    +						new IllegalStateException("Could not find job with JobId " + jobId));
    +				} else {
    +					throw new RuntimeException(
    +						new IllegalStateException("Unknown JobManager response of type " + value.getClass()));
    +				}
    +			}
    +		});
    +	}
    +
    +	private static class UnboundedSource implements SourceFunction<String> {
    +		private boolean running = true;
    +
    +		@Override
    +		public void run(SourceContext<String> ctx) throws Exception {
    +			while (running) {
    +				ctx.collect("hello");
    +				// don't overdo it ... ;-)
    +				Thread.sleep(50);
    +				if (CheckpointBlockingFunction.afterMessWithZooKeeper.get()) {
    +					break;
    +				}
    +			}
    +		}
    +
    +		@Override
    +		public void cancel() {
    +			running = false;
    +		}
    +	}
    +
    +	private static class CheckpointBlockingFunction
    +			extends RichMapFunction<String, String>
    +			implements CheckpointedFunction {
    +
    +		// verify that we only call initializeState()
    +		// once with isRestored() == false. All other invocations must have isRestored() == true. This
    +		// verifies that we don't restart a job from scratch in case the CompletedCheckpoints can't
    +		// be read.
    +		static AtomicInteger allowedInitializeCallsWithoutRestore = new AtomicInteger(1);
    +
    +		// we count when we see restores that are not allowed. We only
    +		// allow restores once we messed with the HA directory and moved it back again
    +		static AtomicInteger illegalRestores = new AtomicInteger(0);
    +		static AtomicInteger successfulRestores = new AtomicInteger(0);
    +
    +		// whether we are after the phase where we messed with the ZooKeeper HA directory, i.e.
    +		// whether it's now ok for a restore to happen
    +		static AtomicBoolean afterMessWithZooKeeper = new AtomicBoolean(false);
    +
    +		static AtomicBoolean failedAlready = new AtomicBoolean(false);
    +
    +		// also have some state to write to the checkpoint
    +		private final ValueStateDescriptor<String> stateDescriptor =
    +			new ValueStateDescriptor<>("state", StringSerializer.INSTANCE);
    +
    +		@Override
    +		public String map(String value) throws Exception {
    +			getRuntimeContext().getState(stateDescriptor).update("42");
    +			return value;
    +		}
    +
    +		@Override
    +		public void snapshotState(FunctionSnapshotContext context) throws Exception {
    +			if (context.getCheckpointId() > 5) {
    +				waitForCheckpointLatch.trigger();
    +				failInCheckpointLatch.await();
    +				if (!failedAlready.getAndSet(true)) {
    +					throw new RuntimeException("Failing on purpose.");
    +				}
    +			}
    +		}
    +
    +		@Override
    +		public void initializeState(FunctionInitializationContext context) {
    +			if (!context.isRestored()) {
    --- End diff --
    
    I am wondering if you can get rid of the `allowedInitializeCallsWithoutRestore` completely and simply make this an `assertEquals(afterMessWithZooKeeper.get(), context.isRestored())`.


---

[GitHub] flink issue #5654: [FLINK-8487] Verify ZooKeeper checkpoint store behaviour ...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5654
  
    @StephanEwen pushed some changes


---

[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5654#discussion_r173142759
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ---
    @@ -0,0 +1,387 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.base.StringSerializer;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.runtime.concurrent.ApplyFunction;
    +import org.apache.flink.runtime.concurrent.Future;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.streaming.api.datastream.DataStreamSource;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.test.util.TestBaseUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.curator.test.TestingServer;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.util.UUID;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import scala.concurrent.Await;
    +import scala.concurrent.duration.Deadline;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import static org.hamcrest.core.Is.is;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertThat;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
    + */
    +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
    +
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
    +
    +	private static final int NUM_JMS = 1;
    +	private static final int NUM_TMS = 1;
    +	private static final int NUM_SLOTS_PER_TM = 1;
    +
    +	@ClassRule
    +	public static final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private static File haStorageDir;
    +
    +	private static TestingServer zkServer;
    +
    +	private static LocalFlinkMiniCluster cluster = null;
    +
    +	private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
    +	private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
    +	private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
    +
    +	/**
    +	 * Starts a ZooKeeper testing server and a local Flink cluster that is configured for
    +	 * ZooKeeper high availability, with a fresh temporary folder as HA storage path.
    +	 */
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		zkServer = new TestingServer();
    +		haStorageDir = temporaryFolder.newFolder();
    +
    +		final Configuration clusterConfig = new Configuration();
    +
    +		// size of the local cluster
    +		clusterConfig.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
    +		clusterConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
    +		clusterConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
    +
    +		// high-availability settings
    +		clusterConfig.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
    +		clusterConfig.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
    +		clusterConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
    +		clusterConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toString());
    +
    +		cluster = TestBaseUtils.startCluster(clusterConfig, false);
    +	}
    +
    +	/**
    +	 * Stops the Flink cluster and then the ZooKeeper testing server.
    +	 *
    +	 * <p>The ZooKeeper server is shut down in a {@code finally} block so that it is released
    +	 * even if stopping the cluster throws.
    +	 */
    +	@AfterClass
    +	public static void tearDown() throws Exception {
    +		try {
    +			stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
    +		} finally {
    +			// zkServer is still null if setup() failed before the server was created
    +			if (zkServer != null) {
    +				zkServer.stop();
    +				zkServer.close();
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Verify that we don't start a job from scratch if we cannot restore any of the
    +	 * CompletedCheckpoints.
    +	 *
    +	 * <p>Synchronization for the different steps and things we want to observe happens via
    +	 * latches in the test method and the methods of {@link CheckpointBlockingFunction}.
    +	 *
    +	 * <p>The test follows these steps:
    +	 * <ol>
    +	 *     <li>Start job and block on a latch until we have done some checkpoints
    +	 *     <li>Block in the special function
    +	 *     <li>Move away the contents of the ZooKeeper HA directory to make restoring from
    +	 *       checkpoints impossible
    +	 *     <li>Unblock the special function, which now induces a failure
    +	 *     <li>Make sure that the job does not recover successfully
    +	 *     <li>Move back the HA directory
    +	 *     <li>Make sure that the job recovers, we use a latch to ensure that the operator
    +	 *       restored successfully
    +	 * </ol>
    +	 */
    +	@Test(timeout = 120_000L)
    +	public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
    +		// reset the static counters, flags and latches that this test shares with the user functions
    +		CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1);
    +		CheckpointBlockingFunction.successfulRestores.set(0);
    +		CheckpointBlockingFunction.illegalRestores.set(0);
    +		CheckpointBlockingFunction.afterMessWithZooKeeper.set(false);
    +		CheckpointBlockingFunction.failedAlready.set(false);
    +
    +		waitForCheckpointLatch = new OneShotLatch();
    +		failInCheckpointLatch = new OneShotLatch();
    +		successfulRestoreLatch = new OneShotLatch();
    +
    +		// NOTE(review): TEST_TIMEOUT is 5 minutes while the JUnit timeout above is 120 s, so the
    +		// JUnit timeout is the shorter of the two limits
    +		final Deadline deadline = TEST_TIMEOUT.fromNow();
    +
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.setParallelism(1);
    +		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
    +		env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
    +
    +		File checkpointLocation = temporaryFolder.newFolder();
    +		env.setStateBackend(new FsStateBackend(checkpointLocation.toURI()));
    +
    +		DataStreamSource<String> source = env.addSource(new UnboundedSource());
    +
    +		// key the stream so that CheckpointBlockingFunction can use keyed state
    +		source
    +			.keyBy(new KeySelector<String, String>() {
    +				@Override
    +				public String getKey(String value) {
    +					return value;
    +				}
    +			})
    +			.map(new CheckpointBlockingFunction());
    +
    +		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    +		final JobID jobID = Preconditions.checkNotNull(jobGraph.getJobID());
    +
    +		// Retrieve the job manager
    +		final ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), deadline.timeLeft());
    +
    +		cluster.submitJobDetached(jobGraph);
    +
    +		// wait until we did some checkpoints
    +		waitForCheckpointLatch.await();
    +
    +		// mess with the HA directory so that the job cannot restore
    +		File movedCheckpointLocation = temporaryFolder.newFolder();
    +		int numCheckpoints = 0;
    +		File[] files = haStorageDir.listFiles();
    +		assertNotNull(files);
    +		for (File file : files) {
    +			if (file.getName().startsWith("completedCheckpoint")) {
    +				assertTrue(file.renameTo(new File(movedCheckpointLocation, file.getName())));
    +				numCheckpoints++;
    +			}
    +		}
    +		// at least one completed checkpoint file must have been moved away
    +		assertTrue(numCheckpoints > 0);
    +
    +		// unblock snapshotState() in CheckpointBlockingFunction, which then fails on purpose
    +		failInCheckpointLatch.trigger();
    +
    +		// Ensure that we see at least one cycle where the job tries to restart and fails.
    +		Future<JobStatus> jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call(){
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus){
    +					return jobStatus == JobStatus.RESTARTING;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.RESTARTING, jobStatusFuture.get());
    +
    +		// second half of the cycle: poll until the job is reported as FAILING
    +		jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call() {
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus) {
    +					return jobStatus == JobStatus.FAILING;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.FAILING, jobStatusFuture.get());
    +
    +		// move back the HA directory so that the job can restore
    +		// (the flag is set first; it also makes UnboundedSource leave its emit loop)
    +		CheckpointBlockingFunction.afterMessWithZooKeeper.set(true);
    +
    +		files = movedCheckpointLocation.listFiles();
    +		assertNotNull(files);
    +		for (File file : files) {
    +			if (file.getName().startsWith("completedCheckpoint")) {
    +				assertTrue(file.renameTo(new File(haStorageDir, file.getName())));
    +			}
    +		}
    +
    +		// now the job should be able to go to RUNNING again and then eventually to FINISHED
    +		jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call() {
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus) {
    +					return jobStatus == JobStatus.FINISHED;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.FINISHED, jobStatusFuture.get());
    +
    +		// make sure we saw a successful restore
    +		successfulRestoreLatch.await();
    +
    +		assertThat("We saw illegal restores.", CheckpointBlockingFunction.illegalRestores.get(), is(0));
    +	}
    +
    +	/**
    +	 * Asks the given JobManager for the current {@link JobStatus} of the job identified by
    +	 * the given {@link JobID}.
    +	 *
    +	 * <p>The function applied to the JobManager's answer throws a {@link RuntimeException}
    +	 * (wrapping an {@link IllegalStateException}) if the answer says that the job is unknown or
    +	 * if the answer is of an unexpected type.
    +	 */
    +	private Future<JobStatus> getJobStatus(
    +		final ActorGateway jobManager,
    +		final JobID jobId,
    +		final FiniteDuration timeout) {
    +
    +		final scala.concurrent.Future<Object> scalaAnswer =
    +			jobManager.ask(JobManagerMessages.getRequestJobStatus(jobId), timeout);
    +		final FlinkFuture<Object> answer = new FlinkFuture<>(scalaAnswer);
    +
    +		return answer.thenApply(new ApplyFunction<Object, JobStatus>() {
    +			@Override
    +			public JobStatus apply(Object message) {
    +				// expected case: the JobManager knows the job and reports its status
    +				if (message instanceof JobManagerMessages.CurrentJobStatus) {
    +					final JobManagerMessages.CurrentJobStatus current =
    +						(JobManagerMessages.CurrentJobStatus) message;
    +					return current.status();
    +				}
    +
    +				// everything else is an error; only the message text differs
    +				final String problem = (message instanceof JobManagerMessages.JobNotFound)
    +					? "Could not find job with JobId " + jobId
    +					: "Unknown JobManager response of type " + message.getClass();
    +				throw new RuntimeException(new IllegalStateException(problem));
    +			}
    +		});
    +	}
    +
    +	/**
    +	 * Source that emits the string {@code "hello"}, sleeping 50 ms after each element, until
    +	 * it is cancelled or until {@code CheckpointBlockingFunction.afterMessWithZooKeeper} is set.
    +	 */
    +	private static class UnboundedSource implements SourceFunction<String> {
    +		// volatile: cancel() writes this flag while run() is looping on it, typically from
    +		// another thread; without volatile the loop is not guaranteed to observe the update
    +		private volatile boolean running = true;
    +
    +		@Override
    +		public void run(SourceContext<String> ctx) throws Exception {
    +			while (running) {
    +				ctx.collect("hello");
    +				// don't overdo it ... ;-)
    +				Thread.sleep(50);
    +				// once the test has set this flag the source ends on its own
    +				if (CheckpointBlockingFunction.afterMessWithZooKeeper.get()) {
    +					break;
    +				}
    +			}
    +		}
    +
    +		@Override
    +		public void cancel() {
    +			running = false;
    +		}
    +	}
    +
    +	private static class CheckpointBlockingFunction
    +			extends RichMapFunction<String, String>
    +			implements CheckpointedFunction {
    +
    +		// verify that we only call initializeState()
    +		// once with isRestored() == false. All other invocations must have isRestored() == true. This
    +		// verifies that we don't restart a job from scratch in case the CompletedCheckpoints can't
    +		// be read.
    +		static AtomicInteger allowedInitializeCallsWithoutRestore = new AtomicInteger(1);
    +
    +		// we count when we see restores that are not allowed. We only
    +		// allow restores once we messed with the HA directory and moved it back again
    +		static AtomicInteger illegalRestores = new AtomicInteger(0);
    +		static AtomicInteger successfulRestores = new AtomicInteger(0);
    +
    +		// whether we are after the phase where we messed with the ZooKeeper HA directory, i.e.
    +		// whether it's now ok for a restore to happen
    +		static AtomicBoolean afterMessWithZooKeeper = new AtomicBoolean(false);
    +
    +		static AtomicBoolean failedAlready = new AtomicBoolean(false);
    +
    +		// also have some state to write to the checkpoint
    +		private final ValueStateDescriptor<String> stateDescriptor =
    +			new ValueStateDescriptor<>("state", StringSerializer.INSTANCE);
    +
    +		@Override
    +		public String map(String value) throws Exception {
    +			getRuntimeContext().getState(stateDescriptor).update("42");
    +			return value;
    +		}
    +
    +		@Override
    +		public void snapshotState(FunctionSnapshotContext context) throws Exception {
    +			if (context.getCheckpointId() > 5) {
    +				waitForCheckpointLatch.trigger();
    +				failInCheckpointLatch.await();
    +				if (!failedAlready.getAndSet(true)) {
    +					throw new RuntimeException("Failing on purpose.");
    +				}
    +			}
    +		}
    +
    +		@Override
    +		public void initializeState(FunctionInitializationContext context) {
    +			if (!context.isRestored()) {
    --- End diff --
    
    No, this is exactly the thing we want to test. If we didn't have this check we would allow the case where ZooKeeper cannot read any of the state handles and will start the job from scratch.
    
    There might be other ways around it but I like this explicit way. What do you think?


---

[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5654#discussion_r172972943
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ---
    @@ -0,0 +1,387 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.base.StringSerializer;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.runtime.concurrent.ApplyFunction;
    +import org.apache.flink.runtime.concurrent.Future;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.streaming.api.datastream.DataStreamSource;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.test.util.TestBaseUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.curator.test.TestingServer;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.util.UUID;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import scala.concurrent.Await;
    +import scala.concurrent.duration.Deadline;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import static org.hamcrest.core.Is.is;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertThat;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
    + */
    +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
    +
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
    +
    +	private static final int NUM_JMS = 1;
    +	private static final int NUM_TMS = 1;
    +	private static final int NUM_SLOTS_PER_TM = 1;
    +
    +	@ClassRule
    +	public static final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private static File haStorageDir;
    +
    +	private static TestingServer zkServer;
    +
    +	private static LocalFlinkMiniCluster cluster = null;
    +
    +	private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
    +	private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
    +	private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
    +
    +	/**
    +	 * Starts a ZooKeeper testing server and a local Flink cluster that is configured for
    +	 * ZooKeeper high availability, with a fresh temporary folder as HA storage path.
    +	 */
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		zkServer = new TestingServer();
    +		haStorageDir = temporaryFolder.newFolder();
    +
    +		final Configuration clusterConfig = new Configuration();
    +
    +		// size of the local cluster
    +		clusterConfig.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
    +		clusterConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
    +		clusterConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
    +
    +		// high-availability settings
    +		clusterConfig.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
    +		clusterConfig.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
    +		clusterConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
    +		clusterConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toString());
    +
    +		cluster = TestBaseUtils.startCluster(clusterConfig, false);
    +	}
    +
    +	/**
    +	 * Stops the Flink cluster and then the ZooKeeper testing server.
    +	 *
    +	 * <p>The ZooKeeper server is shut down in a {@code finally} block so that it is released
    +	 * even if stopping the cluster throws.
    +	 */
    +	@AfterClass
    +	public static void tearDown() throws Exception {
    +		try {
    +			stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
    +		} finally {
    +			// zkServer is still null if setup() failed before the server was created
    +			if (zkServer != null) {
    +				zkServer.stop();
    +				zkServer.close();
    +			}
    +		}
    +	}
    +
    +	/**
    +	 * Verify that we don't start a job from scratch if we cannot restore any of the
    +	 * CompletedCheckpoints.
    +	 *
    +	 * <p>Synchronization for the different steps and things we want to observe happens via
    +	 * latches in the test method and the methods of {@link CheckpointBlockingFunction}.
    +	 *
    +	 * <p>The test follows these steps:
    +	 * <ol>
    +	 *     <li>Start job and block on a latch until we have done some checkpoints
    +	 *     <li>Block in the special function
    +	 *     <li>Move away the contents of the ZooKeeper HA directory to make restoring from
    +	 *       checkpoints impossible
    +	 *     <li>Unblock the special function, which now induces a failure
    +	 *     <li>Make sure that the job does not recover successfully
    +	 *     <li>Move back the HA directory
    +	 *     <li>Make sure that the job recovers, we use a latch to ensure that the operator
    +	 *       restored successfully
    +	 * </ol>
    +	 */
    +	@Test(timeout = 120_000L)
    +	public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
    +		CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1);
    +		CheckpointBlockingFunction.successfulRestores.set(0);
    +		CheckpointBlockingFunction.illegalRestores.set(0);
    +		CheckpointBlockingFunction.afterMessWithZooKeeper.set(false);
    +		CheckpointBlockingFunction.failedAlready.set(false);
    +
    +		waitForCheckpointLatch = new OneShotLatch();
    +		failInCheckpointLatch = new OneShotLatch();
    +		successfulRestoreLatch = new OneShotLatch();
    +
    +		final Deadline deadline = TEST_TIMEOUT.fromNow();
    +
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.setParallelism(1);
    +		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
    +		env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
    +
    +		File checkpointLocation = temporaryFolder.newFolder();
    +		env.setStateBackend(new FsStateBackend(checkpointLocation.toURI()));
    +
    +		DataStreamSource<String> source = env.addSource(new UnboundedSource());
    +
    +		source
    +			.keyBy(new KeySelector<String, String>() {
    +				@Override
    +				public String getKey(String value) {
    +					return value;
    +				}
    +			})
    +			.map(new CheckpointBlockingFunction());
    +
    +		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    +		final JobID jobID = Preconditions.checkNotNull(jobGraph.getJobID());
    +
    +		// Retrieve the job manager
    +		final ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), deadline.timeLeft());
    +
    +		cluster.submitJobDetached(jobGraph);
    +
    +		// wait until we did some checkpoints
    +		waitForCheckpointLatch.await();
    +
    +		// mess with the HA directory so that the job cannot restore
    +		File movedCheckpointLocation = temporaryFolder.newFolder();
    +		int numCheckpoints = 0;
    +		File[] files = haStorageDir.listFiles();
    +		assertNotNull(files);
    +		for (File file : files) {
    +			if (file.getName().startsWith("completedCheckpoint")) {
    +				assertTrue(file.renameTo(new File(movedCheckpointLocation, file.getName())));
    +				numCheckpoints++;
    +			}
    +		}
    +		assertTrue(numCheckpoints > 0);
    +
    +		failInCheckpointLatch.trigger();
    +
    +		// Ensure that we see at least one cycle where the job tries to restart and fails.
    +		Future<JobStatus> jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call(){
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus){
    +					return jobStatus == JobStatus.RESTARTING;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.RESTARTING, jobStatusFuture.get());
    +
    +		jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call() {
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus) {
    +					return jobStatus == JobStatus.FAILING;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.FAILING, jobStatusFuture.get());
    +
    +		// move back the HA directory so that the job can restore
    +		CheckpointBlockingFunction.afterMessWithZooKeeper.set(true);
    +
    +		files = movedCheckpointLocation.listFiles();
    +		assertNotNull(files);
    +		for (File file : files) {
    +			if (file.getName().startsWith("completedCheckpoint")) {
    +				assertTrue(file.renameTo(new File(haStorageDir, file.getName())));
    +			}
    +		}
    +
    +		// now the job should be able to go to RUNNING again and then eventually to FINISHED
    +		jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call() {
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus) {
    +					return jobStatus == JobStatus.FINISHED;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.FINISHED, jobStatusFuture.get());
    +
    +		// make sure we saw a successful restore
    +		successfulRestoreLatch.await();
    +
    +		assertThat("We saw illegal restores.", CheckpointBlockingFunction.illegalRestores.get(), is(0));
    --- End diff --
    
    Or drop the `illegalRestores` - may not be necessary, see above. 


---

[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5654#discussion_r172972678
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ---
    @@ -0,0 +1,387 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.base.StringSerializer;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.runtime.concurrent.ApplyFunction;
    +import org.apache.flink.runtime.concurrent.Future;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.streaming.api.datastream.DataStreamSource;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.test.util.TestBaseUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.curator.test.TestingServer;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.util.UUID;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import scala.concurrent.Await;
    +import scala.concurrent.duration.Deadline;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import static org.hamcrest.core.Is.is;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertThat;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
    + */
    +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
    +
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
    +
    +	private static final int NUM_JMS = 1;
    +	private static final int NUM_TMS = 1;
    +	private static final int NUM_SLOTS_PER_TM = 1;
    +
    +	@ClassRule
    +	public static final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private static File haStorageDir;
    +
    +	private static TestingServer zkServer;
    +
    +	private static LocalFlinkMiniCluster cluster = null;
    +
    +	private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
    +	private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
    +	private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		zkServer = new TestingServer();
    +
    +		Configuration config = new Configuration();
    +		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
    +		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
    +		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
    +
    +		haStorageDir = temporaryFolder.newFolder();
    +
    +		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toString());
    +		config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
    +		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
    +		config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
    +
    +		cluster = TestBaseUtils.startCluster(config, false);
    +	}
    +
    +	@AfterClass
    +	public static void tearDown() throws Exception {
    +		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
    +
    +		zkServer.stop();
    +		zkServer.close();
    +	}
    +
    +	/**
    +	 * Verify that we don't start a job from scratch if we cannot restore any of the
    +	 * CompletedCheckpoints.
    +	 *
    +	 * <p>Synchronization for the different steps and things we want to observe happens via
    +	 * latches in the test method and the methods of {@link CheckpointBlockingFunction}.
    +	 *
    +	 * <p>The test follows these steps:
    +	 * <ol>
    +	 *     <li>Start job and block on a latch until we have done some checkpoints
    +	 *     <li>Block in the special function
    +	 *     <li>Move away the contents of the ZooKeeper HA directory to make restoring from
    +	 *       checkpoints impossible
    +	 *     <li>Unblock the special function, which now induces a failure
    +	 *     <li>Make sure that the job does not recover successfully
    +	 *     <li>Move back the HA directory
    +	 *     <li>Make sure that the job recovers, we use a latch to ensure that the operator
    +	 *       restored successfully
    +	 * </ol>
    +	 */
    +	@Test(timeout = 120_000L)
    +	public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
    +		CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1);
    +		CheckpointBlockingFunction.successfulRestores.set(0);
    +		CheckpointBlockingFunction.illegalRestores.set(0);
    +		CheckpointBlockingFunction.afterMessWithZooKeeper.set(false);
    +		CheckpointBlockingFunction.failedAlready.set(false);
    +
    +		waitForCheckpointLatch = new OneShotLatch();
    +		failInCheckpointLatch = new OneShotLatch();
    +		successfulRestoreLatch = new OneShotLatch();
    +
    +		final Deadline deadline = TEST_TIMEOUT.fromNow();
    +
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.setParallelism(1);
    +		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
    +		env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
    +
    +		File checkpointLocation = temporaryFolder.newFolder();
    +		env.setStateBackend(new FsStateBackend(checkpointLocation.toURI()));
    +
    +		DataStreamSource<String> source = env.addSource(new UnboundedSource());
    +
    +		source
    +			.keyBy(new KeySelector<String, String>() {
    +				@Override
    +				public String getKey(String value) {
    +					return value;
    +				}
    +			})
    +			.map(new CheckpointBlockingFunction());
    +
    +		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    +		final JobID jobID = Preconditions.checkNotNull(jobGraph.getJobID());
    +
    +		// Retrieve the job manager
    +		final ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), deadline.timeLeft());
    +
    +		cluster.submitJobDetached(jobGraph);
    +
    +		// wait until we did some checkpoints
    +		waitForCheckpointLatch.await();
    +
    +		// mess with the HA directory so that the job cannot restore
    +		File movedCheckpointLocation = temporaryFolder.newFolder();
    +		int numCheckpoints = 0;
    +		File[] files = haStorageDir.listFiles();
    +		assertNotNull(files);
    +		for (File file : files) {
    +			if (file.getName().startsWith("completedCheckpoint")) {
    +				assertTrue(file.renameTo(new File(movedCheckpointLocation, file.getName())));
    +				numCheckpoints++;
    +			}
    +		}
    +		assertTrue(numCheckpoints > 0);
    +
    +		failInCheckpointLatch.trigger();
    +
    +		// Ensure that we see at least one cycle where the job tries to restart and fails.
    +		Future<JobStatus> jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call(){
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus){
    +					return jobStatus == JobStatus.RESTARTING;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.RESTARTING, jobStatusFuture.get());
    +
    +		jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call() {
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus) {
    +					return jobStatus == JobStatus.FAILING;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.FAILING, jobStatusFuture.get());
    +
    +		// move back the HA directory so that the job can restore
    +		CheckpointBlockingFunction.afterMessWithZooKeeper.set(true);
    +
    +		files = movedCheckpointLocation.listFiles();
    +		assertNotNull(files);
    +		for (File file : files) {
    +			if (file.getName().startsWith("completedCheckpoint")) {
    +				assertTrue(file.renameTo(new File(haStorageDir, file.getName())));
    +			}
    +		}
    +
    +		// now the job should be able to go to RUNNING again and then eventually to FINISHED
    +		jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call() {
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus) {
    +					return jobStatus == JobStatus.FINISHED;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.FINISHED, jobStatusFuture.get());
    +
    +		// make sure we saw a successful restore
    +		successfulRestoreLatch.await();
    --- End diff --
    
    If you are in state `FINISHED` here, you must have successfully restored. If you did not successfully restore, this latch will block forever and the test will time out.


---

[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5654#discussion_r173143151
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ---
    @@ -0,0 +1,387 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.base.StringSerializer;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.runtime.concurrent.ApplyFunction;
    +import org.apache.flink.runtime.concurrent.Future;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.streaming.api.datastream.DataStreamSource;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.test.util.TestBaseUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.curator.test.TestingServer;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.util.UUID;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import scala.concurrent.Await;
    +import scala.concurrent.duration.Deadline;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import static org.hamcrest.core.Is.is;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertThat;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
    + */
    +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
    +
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
    +
    +	private static final int NUM_JMS = 1;
    +	private static final int NUM_TMS = 1;
    +	private static final int NUM_SLOTS_PER_TM = 1;
    +
    +	@ClassRule
    +	public static final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private static File haStorageDir;
    +
    +	private static TestingServer zkServer;
    +
    +	private static LocalFlinkMiniCluster cluster = null;
    +
    +	private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
    +	private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
    +	private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		zkServer = new TestingServer();
    +
    +		Configuration config = new Configuration();
    +		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
    +		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
    +		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
    +
    +		haStorageDir = temporaryFolder.newFolder();
    +
    +		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toString());
    +		config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
    +		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
    +		config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
    +
    +		cluster = TestBaseUtils.startCluster(config, false);
    +	}
    +
    +	@AfterClass
    +	public static void tearDown() throws Exception {
    +		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
    +
    +		zkServer.stop();
    +		zkServer.close();
    +	}
    +
    +	/**
    +	 * Verify that we don't start a job from scratch if we cannot restore any of the
    +	 * CompletedCheckpoints.
    +	 *
    +	 * <p>Synchronization for the different steps and things we want to observe happens via
    +	 * latches in the test method and the methods of {@link CheckpointBlockingFunction}.
    +	 *
    +	 * <p>The test follows these steps:
    +	 * <ol>
    +	 *     <li>Start job and block on a latch until we have done some checkpoints
    +	 *     <li>Block in the special function
    +	 *     <li>Move away the contents of the ZooKeeper HA directory to make restoring from
    +	 *       checkpoints impossible
    +	 *     <li>Unblock the special function, which now induces a failure
    +	 *     <li>Make sure that the job does not recover successfully
    +	 *     <li>Move back the HA directory
    +	 *     <li>Make sure that the job recovers, we use a latch to ensure that the operator
    +	 *       restored successfully
    +	 * </ol>
    +	 */
    +	@Test(timeout = 120_000L)
    +	public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
    +		CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1);
    +		CheckpointBlockingFunction.successfulRestores.set(0);
    +		CheckpointBlockingFunction.illegalRestores.set(0);
    +		CheckpointBlockingFunction.afterMessWithZooKeeper.set(false);
    +		CheckpointBlockingFunction.failedAlready.set(false);
    +
    +		waitForCheckpointLatch = new OneShotLatch();
    +		failInCheckpointLatch = new OneShotLatch();
    +		successfulRestoreLatch = new OneShotLatch();
    +
    +		final Deadline deadline = TEST_TIMEOUT.fromNow();
    +
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.setParallelism(1);
    +		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
    +		env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
    +
    +		File checkpointLocation = temporaryFolder.newFolder();
    +		env.setStateBackend(new FsStateBackend(checkpointLocation.toURI()));
    +
    +		DataStreamSource<String> source = env.addSource(new UnboundedSource());
    +
    +		source
    +			.keyBy(new KeySelector<String, String>() {
    +				@Override
    +				public String getKey(String value) {
    +					return value;
    +				}
    +			})
    +			.map(new CheckpointBlockingFunction());
    +
    +		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    +		final JobID jobID = Preconditions.checkNotNull(jobGraph.getJobID());
    +
    +		// Retrieve the job manager
    +		final ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), deadline.timeLeft());
    +
    +		cluster.submitJobDetached(jobGraph);
    +
    +		// wait until we did some checkpoints
    +		waitForCheckpointLatch.await();
    +
    +		// mess with the HA directory so that the job cannot restore
    +		File movedCheckpointLocation = temporaryFolder.newFolder();
    +		int numCheckpoints = 0;
    +		File[] files = haStorageDir.listFiles();
    +		assertNotNull(files);
    +		for (File file : files) {
    +			if (file.getName().startsWith("completedCheckpoint")) {
    +				assertTrue(file.renameTo(new File(movedCheckpointLocation, file.getName())));
    +				numCheckpoints++;
    +			}
    +		}
    +		assertTrue(numCheckpoints > 0);
    +
    +		failInCheckpointLatch.trigger();
    +
    +		// Ensure that we see at least one cycle where the job tries to restart and fails.
    +		Future<JobStatus> jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call(){
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus){
    +					return jobStatus == JobStatus.RESTARTING;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.RESTARTING, jobStatusFuture.get());
    +
    +		jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call() {
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus) {
    +					return jobStatus == JobStatus.FAILING;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.FAILING, jobStatusFuture.get());
    +
    +		// move back the HA directory so that the job can restore
    +		CheckpointBlockingFunction.afterMessWithZooKeeper.set(true);
    +
    +		files = movedCheckpointLocation.listFiles();
    +		assertNotNull(files);
    +		for (File file : files) {
    +			if (file.getName().startsWith("completedCheckpoint")) {
    +				assertTrue(file.renameTo(new File(haStorageDir, file.getName())));
    +			}
    +		}
    +
    +		// now the job should be able to go to RUNNING again and then eventually to FINISHED
    +		jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call() {
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus) {
    +					return jobStatus == JobStatus.FINISHED;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.FINISHED, jobStatusFuture.get());
    +
    +		// make sure we saw a successful restore
    +		successfulRestoreLatch.await();
    +
    +		assertThat("We saw illegal restores.", CheckpointBlockingFunction.illegalRestores.get(), is(0));
    --- End diff --
    
    I think it is necessary unless we rewrite the test more thoroughly; see the comment above.
    
    Regarding `assertThat`, I like it because it reads more like a sentence, but I can also change it.


---

[GitHub] flink pull request #5654: [FLINK-8487] Verify ZooKeeper checkpoint store beh...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5654#discussion_r172977111
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java ---
    @@ -0,0 +1,387 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.test.checkpointing;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    +import org.apache.flink.api.common.state.ValueStateDescriptor;
    +import org.apache.flink.api.common.typeutils.base.StringSerializer;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.runtime.concurrent.ApplyFunction;
    +import org.apache.flink.runtime.concurrent.Future;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobgraph.JobStatus;
    +import org.apache.flink.runtime.messages.JobManagerMessages;
    +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    +import org.apache.flink.runtime.testingUtils.TestingUtils;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.streaming.api.datastream.DataStreamSource;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.source.SourceFunction;
    +import org.apache.flink.test.util.TestBaseUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.curator.test.TestingServer;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.File;
    +import java.util.UUID;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import scala.concurrent.Await;
    +import scala.concurrent.duration.Deadline;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import static org.hamcrest.core.Is.is;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertThat;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Integration tests for {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
    + */
    +public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
    +
    +	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
    +
    +	private static final int NUM_JMS = 1;
    +	private static final int NUM_TMS = 1;
    +	private static final int NUM_SLOTS_PER_TM = 1;
    +
    +	@ClassRule
    +	public static final TemporaryFolder temporaryFolder = new TemporaryFolder();
    +
    +	private static File haStorageDir;
    +
    +	private static TestingServer zkServer;
    +
    +	private static LocalFlinkMiniCluster cluster = null;
    +
    +	private static OneShotLatch waitForCheckpointLatch = new OneShotLatch();
    +	private static OneShotLatch failInCheckpointLatch = new OneShotLatch();
    +	private static OneShotLatch successfulRestoreLatch = new OneShotLatch();
    +
    +	@BeforeClass
    +	public static void setup() throws Exception {
    +		zkServer = new TestingServer();
    +
    +		Configuration config = new Configuration();
    +		config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
    +		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
    +		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
    +
    +		haStorageDir = temporaryFolder.newFolder();
    +
    +		config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haStorageDir.toString());
    +		config.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
    +		config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
    +		config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
    +
    +		cluster = TestBaseUtils.startCluster(config, false);
    +	}
    +
    +	@AfterClass
    +	public static void tearDown() throws Exception {
    +		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
    +
    +		zkServer.stop();
    +		zkServer.close();
    +	}
    +
    +	/**
    +	 * Verify that we don't start a job from scratch if we cannot restore any of the
    +	 * CompletedCheckpoints.
    +	 *
    +	 * <p>Synchronization for the different steps and things we want to observe happens via
    +	 * latches in the test method and the methods of {@link CheckpointBlockingFunction}.
    +	 *
    +	 * <p>The test follows these steps:
    +	 * <ol>
    +	 *     <li>Start job and block on a latch until we have done some checkpoints
    +	 *     <li>Block in the special function
    +	 *     <li>Move away the contents of the ZooKeeper HA directory to make restoring from
    +	 *       checkpoints impossible
    +	 *     <li>Unblock the special function, which now induces a failure
    +	 *     <li>Make sure that the job does not recover successfully
    +	 *     <li>Move back the HA directory
    +	 *     <li>Make sure that the job recovers, we use a latch to ensure that the operator
    +	 *       restored successfully
    +	 * </ol>
    +	 */
    +	@Test(timeout = 120_000L)
    +	public void testRestoreBehaviourWithFaultyStateHandles() throws Exception {
    +		CheckpointBlockingFunction.allowedInitializeCallsWithoutRestore.set(1);
    +		CheckpointBlockingFunction.successfulRestores.set(0);
    +		CheckpointBlockingFunction.illegalRestores.set(0);
    +		CheckpointBlockingFunction.afterMessWithZooKeeper.set(false);
    +		CheckpointBlockingFunction.failedAlready.set(false);
    +
    +		waitForCheckpointLatch = new OneShotLatch();
    +		failInCheckpointLatch = new OneShotLatch();
    +		successfulRestoreLatch = new OneShotLatch();
    +
    +		final Deadline deadline = TEST_TIMEOUT.fromNow();
    +
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.setParallelism(1);
    +		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
    +		env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
    +
    +		File checkpointLocation = temporaryFolder.newFolder();
    +		env.setStateBackend(new FsStateBackend(checkpointLocation.toURI()));
    +
    +		DataStreamSource<String> source = env.addSource(new UnboundedSource());
    +
    +		source
    +			.keyBy(new KeySelector<String, String>() {
    +				@Override
    +				public String getKey(String value) {
    +					return value;
    +				}
    +			})
    +			.map(new CheckpointBlockingFunction());
    +
    +		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    +		final JobID jobID = Preconditions.checkNotNull(jobGraph.getJobID());
    +
    +		// Retrieve the job manager
    +		final ActorGateway jobManager = Await.result(cluster.leaderGateway().future(), deadline.timeLeft());
    +
    +		cluster.submitJobDetached(jobGraph);
    +
    +		// wait until we did some checkpoints
    +		waitForCheckpointLatch.await();
    +
    +		// mess with the HA directory so that the job cannot restore
    +		File movedCheckpointLocation = temporaryFolder.newFolder();
    +		int numCheckpoints = 0;
    +		File[] files = haStorageDir.listFiles();
    +		assertNotNull(files);
    +		for (File file : files) {
    +			if (file.getName().startsWith("completedCheckpoint")) {
    +				assertTrue(file.renameTo(new File(movedCheckpointLocation, file.getName())));
    +				numCheckpoints++;
    +			}
    +		}
    +		assertTrue(numCheckpoints > 0);
    +
    +		failInCheckpointLatch.trigger();
    +
    +		// Ensure that we see at least one cycle where the job tries to restart and fails.
    +		Future<JobStatus> jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call(){
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus){
    +					return jobStatus == JobStatus.RESTARTING;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.RESTARTING, jobStatusFuture.get());
    +
    +		jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call() {
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus) {
    +					return jobStatus == JobStatus.FAILING;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.FAILING, jobStatusFuture.get());
    +
    +		// move back the HA directory so that the job can restore
    +		CheckpointBlockingFunction.afterMessWithZooKeeper.set(true);
    +
    +		files = movedCheckpointLocation.listFiles();
    +		assertNotNull(files);
    +		for (File file : files) {
    +			if (file.getName().startsWith("completedCheckpoint")) {
    +				assertTrue(file.renameTo(new File(haStorageDir, file.getName())));
    +			}
    +		}
    +
    +		// now the job should be able to go to RUNNING again and then eventually to FINISHED
    +		jobStatusFuture = FutureUtils.retrySuccessful(
    +			new Callable<Future<JobStatus>>() {
    +				@Override
    +				public Future<JobStatus> call() {
    +					return getJobStatus(jobManager, jobID, TEST_TIMEOUT);
    +				}
    +			},
    +			new FilterFunction<JobStatus>() {
    +				@Override
    +				public boolean filter(JobStatus jobStatus) {
    +					return jobStatus == JobStatus.FINISHED;
    +				}
    +			},
    +			deadline,
    +			TestingUtils.defaultExecutor());
    +		assertEquals(JobStatus.FINISHED, jobStatusFuture.get());
    +
    +		// make sure we saw a successful restore
    +		successfulRestoreLatch.await();
    +
    +		assertThat("We saw illegal restores.", CheckpointBlockingFunction.illegalRestores.get(), is(0));
    +	}
    +
    +	/**
    +	 * Requests the {@link JobStatus} of the job with the given {@link JobID}.
    +	 */
    +	private Future<JobStatus> getJobStatus(
    +		final ActorGateway jobManager,
    +		final JobID jobId,
    +		final FiniteDuration timeout) {
    +
    +		scala.concurrent.Future<Object> response =
    +			jobManager.ask(JobManagerMessages.getRequestJobStatus(jobId), timeout);
    +
    +		FlinkFuture<Object> flinkFuture = new FlinkFuture<>(response);
    +
    +		return flinkFuture.thenApply(new ApplyFunction<Object, JobStatus>() {
    +			@Override
    +			public JobStatus apply(Object value) {
    +				if (value instanceof JobManagerMessages.CurrentJobStatus) {
    +					return ((JobManagerMessages.CurrentJobStatus) value).status();
    +				} else if (value instanceof JobManagerMessages.JobNotFound) {
    +					throw new RuntimeException(
    +						new IllegalStateException("Could not find job with JobId " + jobId));
    +				} else {
    +					throw new RuntimeException(
    +						new IllegalStateException("Unknown JobManager response of type " + value.getClass()));
    +				}
    +			}
    +		});
    +	}
    +
    +	private static class UnboundedSource implements SourceFunction<String> {
    +		private boolean running = true;
    +
    +		@Override
    +		public void run(SourceContext<String> ctx) throws Exception {
    +			while (running) {
    --- End diff --
    
    `while (running && !CheckpointBlockingFunction.afterMessWithZooKeeper.get())`?


---