Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/13 12:39:45 UTC

[1/6] flink git commit: [FLINK-9703] Allow TM ports to be exposed through Mesos

Repository: flink
Updated Branches:
  refs/heads/master 37abf46f6 -> b74594c48


[FLINK-9703] Allow TM ports to be exposed through Mesos

Maintain a deterministic port ordering, so that it is predictable which endpoint is behind which port index.

This closes #6288.
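
A minimal sketch of the resulting guarantee (not part of the commit; the key
name `metrics.reporter.prom.port` is an illustrative assumption): the new
`extractPortKeys` helper below is backed by a LinkedHashSet, so the mandatory
TM keys always occupy the first port indices, and any keys supplied via
`mesos.resourcemanager.tasks.port-assignments` follow in configuration order.

    import java.util.Set;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.mesos.runtime.clusterframework.LaunchableMesosWorker;

    import static org.apache.flink.mesos.configuration.MesosOptions.PORT_ASSIGNMENTS;

    // Sketch: extractPortKeys is package-private, so this would live in the
    // same package, as the new test in this commit does.
    public class PortKeyOrderingSketch {
        public static void main(String[] args) {
            Configuration config = new Configuration();
            // Illustrative key name; any configurable port key can be listed here.
            config.setString(PORT_ASSIGNMENTS, "metrics.reporter.prom.port");

            // Deterministic order: index 0 -> taskmanager.rpc.port,
            // index 1 -> taskmanager.data.port, then user-supplied keys.
            Set<String> portKeys = LaunchableMesosWorker.extractPortKeys(config);
            System.out.println(portKeys);
        }
    }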


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b74594c4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b74594c4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b74594c4

Branch: refs/heads/master
Commit: b74594c488e4c5428c5391b89c32ae6fd7d98a79
Parents: d58c8c0
Author: Rune Skou Larsen <rs...@trifork.com>
Authored: Mon Jul 9 13:54:51 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jul 13 14:39:31 2018 +0200

----------------------------------------------------------------------
 .../generated/mesos_configuration.html          |  5 ++
 .../flink/mesos/configuration/MesosOptions.java |  8 +++
 .../clusterframework/LaunchableMesosWorker.java | 34 +++++++++---
 .../LaunchableMesosWorkerTest.java              | 55 ++++++++++++++++++++
 4 files changed, 96 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b74594c4/docs/_includes/generated/mesos_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/mesos_configuration.html b/docs/_includes/generated/mesos_configuration.html
index 16a2388..cd0ae24 100644
--- a/docs/_includes/generated/mesos_configuration.html
+++ b/docs/_includes/generated/mesos_configuration.html
@@ -62,5 +62,10 @@
             <td style="word-wrap: break-word;">(none)</td>
             <td>Mesos framework user</td>
         </tr>
+        <tr>
+            <td><h5>mesos.resourcemanager.tasks.port-assignments</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Comma-separated list of configuration keys which represent a configurable port. All port keys will dynamically get a port assigned through Mesos.</td>
+        </tr>
     </tbody>
 </table>

http://git-wip-us.apache.org/repos/asf/flink/blob/b74594c4/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
index 6c802fa..753923f 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
@@ -120,4 +120,12 @@ public class MesosOptions {
 			.withDescription("Enables SSL for the Flink artifact server. Note that security.ssl.enabled also needs to" +
 				" be set to true to enable encryption.");
 
+	/**
+	 * Config parameter to configure which configuration keys will dynamically get a port assigned through Mesos.
+	 */
+	public static final ConfigOption<String> PORT_ASSIGNMENTS = key("mesos.resourcemanager.tasks.port-assignments")
+		.defaultValue("")
+		.withDescription("Comma-separated list of configuration keys which represent a configurable port." +
+			"All port keys will dynamically get a port assigned through Mesos.");
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b74594c4/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 93c90b7..bb15aee 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -30,8 +30,6 @@ import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.util.Preconditions;
 
-import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
-
 import com.netflix.fenzo.ConstraintEvaluator;
 import com.netflix.fenzo.TaskRequest;
 import com.netflix.fenzo.VMTaskFitnessCalculator;
@@ -41,8 +39,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -53,6 +53,7 @@ import scala.Option;
 
 import static org.apache.flink.mesos.Utils.rangeValues;
 import static org.apache.flink.mesos.Utils.variable;
+import static org.apache.flink.mesos.configuration.MesosOptions.PORT_ASSIGNMENTS;
 
 /**
  * Implements the launch of a Mesos worker.
@@ -66,7 +67,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
 	/**
 	 * The set of configuration keys to be dynamically configured with a port allocated from Mesos.
 	 */
-	private static final String[] TM_PORT_KEYS = {
+	static final String[] TM_PORT_KEYS = {
 		"taskmanager.rpc.port",
 		"taskmanager.data.port"};
 
@@ -147,7 +148,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
 
 		@Override
 		public int getPorts() {
-			return TM_PORT_KEYS.length;
+			return extractPortKeys(containerSpec.getDynamicConfiguration()).size();
 		}
 
 		@Override
@@ -235,9 +236,10 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		}
 
 		// take needed ports for the TM
-		List<Protos.Resource> portResources = allocation.takeRanges("ports", TM_PORT_KEYS.length, roles);
+		Set<String> tmPortKeys = extractPortKeys(containerSpec.getDynamicConfiguration());
+		List<Protos.Resource> portResources = allocation.takeRanges("ports", tmPortKeys.size(), roles);
 		taskInfo.addAllResources(portResources);
-		Iterator<String> portsToAssign = Iterators.forArray(TM_PORT_KEYS);
+		Iterator<String> portsToAssign = tmPortKeys.iterator();
 		rangeValues(portResources).forEach(port -> dynamicProperties.setLong(portsToAssign.next(), port));
 		if (portsToAssign.hasNext()) {
 			throw new IllegalArgumentException("insufficient # of ports assigned");
@@ -332,6 +334,26 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		return taskInfo.build();
 	}
 
+	/**
+	 * Get the port keys representing the TM's configured endpoints. This includes mandatory TM endpoints such as
+	 * data and rpc, as well as optionally configured endpoints for services such as the Prometheus reporter.
+	 *
+	 * @param config to extract the port keys from
+	 * @return A deterministically ordered Set of port keys to expose from the TM container
+	 */
+	static Set<String> extractPortKeys(Configuration config) {
+		final LinkedHashSet<String> tmPortKeys = new LinkedHashSet<>(Arrays.asList(TM_PORT_KEYS));
+
+		final String portKeys = config.getString(PORT_ASSIGNMENTS);
+
+		Arrays.stream(portKeys.split(","))
+			.map(String::trim)
+			.filter(key -> !key.isEmpty())
+			.peek(key -> LOG.debug("Adding port key " + key + " to mesos request"))
+			.forEach(tmPortKeys::add);
+
+		return tmPortKeys;
+	}
+
 	@Override
 	public String toString() {
 		return "LaunchableMesosWorker{" +

http://git-wip-us.apache.org/repos/asf/flink/blob/b74594c4/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
new file mode 100644
index 0000000..6784e427
--- /dev/null
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.apache.flink.mesos.configuration.MesosOptions.PORT_ASSIGNMENTS;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests that Mesos port configuration keys are extracted correctly from the configuration.
+ */
+public class LaunchableMesosWorkerTest extends TestLogger {
+
+	@Test
+	public void canGetPortKeys() {
+		// Setup
+		Configuration config = new Configuration();
+		config.setString(PORT_ASSIGNMENTS, "someport.here,anotherport");
+
+		// Act
+		Set<String> portKeys = LaunchableMesosWorker.extractPortKeys(config);
+
+		// Assert
+		assertEquals("Must get right number of port keys", 4, portKeys.size());
+		Iterator<String> iterator = portKeys.iterator();
+		assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[0], iterator.next());
+		assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[1], iterator.next());
+		assertEquals("port key must be correct", "someport.here", iterator.next());
+		assertEquals("port key must be correct", "anotherport", iterator.next());
+	}
+
+}


[2/6] flink git commit: [FLINK-9503] Migrate integration tests for iterative aggregators

Posted by tr...@apache.org.
[FLINK-9503] Migrate integration tests for iterative aggregators

This closes #6129.
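
The recurring change is mechanical; a minimal standalone sketch of the
pattern (the pipeline below is illustrative, not taken from the test):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    import org.apache.flink.api.java.ExecutionEnvironment;

    import static org.junit.Assert.assertEquals;

    // Instead of writeAsText(resultPath) + env.execute() and comparing files
    // in an @After hook, collect the result in memory and assert directly.
    public class CollectMigrationSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            List<Long> result = env.generateSequence(1, 5).collect();
            Collections.sort(result);
            assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L), result);
        }
    }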


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d783c628
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d783c628
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d783c628

Branch: refs/heads/master
Commit: d783c6282e7362fae29b35e61b1522912eab2c36
Parents: fc49801
Author: yanghua <ya...@gmail.com>
Authored: Thu Jun 7 00:23:25 2018 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jul 13 14:39:31 2018 +0200

----------------------------------------------------------------------
 .../aggregators/AggregatorsITCase.java          | 95 ++++++++------------
 1 file changed, 39 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d783c628/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
index bd42ac2..e449ab8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
@@ -35,10 +35,8 @@ import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
 
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
@@ -48,6 +46,9 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
 import java.io.FileWriter;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Random;
 import java.util.UUID;
 
@@ -63,35 +64,24 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 	private static final int parallelism = 2;
 	private static final String NEGATIVE_ELEMENTS_AGGR = "count.negative.elements";
 
-	private static String testString = "Et tu, Brute?";
-	private static String testName = "testing_caesar";
-	private static String testPath;
-
 	public AggregatorsITCase(TestExecutionMode mode){
 		super(mode);
 	}
 
-	private String resultPath;
-	private String expected;
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
 
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
+	@Test
+	public void testDistributedCacheWithIterations() throws Exception{
+		final String testString = "Et tu, Brute?";
+		final String testName = "testing_caesar";
 
-	@Before
-	public void before() throws Exception{
 		final File folder = tempFolder.newFolder();
 		final File resultFile = new File(folder, UUID.randomUUID().toString());
-		testPath = resultFile.toString();
-		resultPath = resultFile.toURI().toString();
-	}
 
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
+		String testPath = resultFile.toString();
+		String resultPath = resultFile.toURI().toString();
 
-	@Test
-	public void testDistributedCacheWithIterations() throws Exception{
 		File tempFile = new File(testPath);
 		try (FileWriter writer = new FileWriter(tempFile)) {
 			writer.write(testString);
@@ -117,7 +107,6 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 			}
 		}).withBroadcastSet(solution, "SOLUTION")).output(new DiscardingOutputFormat<Long>());
 		env.execute();
-		expected = testString; // this will be a useless verification now.
 	}
 
 	@Test
@@ -141,12 +130,12 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 				new NegativeElementsConvergenceCriterion());
 
 		DataSet<Integer> updatedDs = iteration.map(new SubtractOneMap());
-		iteration.closeWith(updatedDs).writeAsText(resultPath);
-		env.execute();
+		List<Integer> result = iteration.closeWith(updatedDs).collect();
+		Collections.sort(result);
 
-		expected =  "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
-				+ "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
-				+ "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
+		List<Integer> expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1);
+
+		assertEquals(expected, result);
 	}
 
 	@Test
@@ -170,12 +159,12 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 				new NegativeElementsConvergenceCriterion());
 
 		DataSet<Integer> updatedDs = iteration.map(new SubtractOneMapWithParam());
-		iteration.closeWith(updatedDs).writeAsText(resultPath);
-		env.execute();
+		List<Integer> result = iteration.closeWith(updatedDs).collect();
+		Collections.sort(result);
 
-		expected =  "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
-				+ "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
-				+ "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
+		List<Integer> expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1);
+
+		assertEquals(expected, result);
 	}
 
 	@Test
@@ -199,12 +188,12 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 				new NegativeElementsConvergenceCriterionWithParam(3));
 
 		DataSet<Integer> updatedDs = iteration.map(new SubtractOneMap());
-		iteration.closeWith(updatedDs).writeAsText(resultPath);
-		env.execute();
+		List<Integer> result = iteration.closeWith(updatedDs).collect();
+		Collections.sort(result);
 
-		expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
-				+ "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
-				+ "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
+		List<Integer> expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1);
+
+		assertEquals(expected, result);
 	}
 
 	@Test
@@ -231,14 +220,12 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 				.where(0).equalTo(0).flatMap(new UpdateFilter());
 
 		DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements);
-		DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper());
-		result.writeAsText(resultPath);
+		List<Integer> result = iterationRes.map(new ProjectSecondMapper()).collect();
+		Collections.sort(result);
 
-		env.execute();
+		List<Integer> expected = Arrays.asList(1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5);
 
-		expected = "1\n" + "2\n" + "2\n" + "3\n" + "3\n"
-				+ "3\n" + "4\n" + "4\n" + "4\n" + "4\n"
-				+ "5\n" + "5\n" + "5\n" + "5\n" + "5\n";
+		assertEquals(expected, result);
 	}
 
 	@Test
@@ -265,14 +252,12 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 				.where(0).equalTo(0).flatMap(new UpdateFilter());
 
 		DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements);
-		DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper());
-		result.writeAsText(resultPath);
+		List<Integer> result = iterationRes.map(new ProjectSecondMapper()).collect();
+		Collections.sort(result);
 
-		env.execute();
+		List<Integer> expected = Arrays.asList(1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5);
 
-		expected = "1\n" + "2\n" + "2\n" + "3\n" + "3\n"
-				+ "3\n" + "4\n" + "4\n" + "4\n" + "4\n"
-				+ "5\n" + "5\n" + "5\n" + "5\n" + "5\n";
+		assertEquals(expected, result);
 	}
 
 	@Test
@@ -303,14 +288,12 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
 				.where(0).equalTo(0).projectFirst(0, 1);
 
 		DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements);
-		DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper());
-		result.writeAsText(resultPath);
+		List<Integer> result = iterationRes.map(new ProjectSecondMapper()).collect();
+		Collections.sort(result);
 
-		env.execute();
+		List<Integer> expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1);
 
-		expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
-				+ "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
-				+ "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
+		assertEquals(expected, result);
 	}
 
 	@SuppressWarnings("serial")


[4/6] flink git commit: [FLINK-9004][tests] Implement Jepsen tests to test job availability.

Posted by tr...@apache.org.
[FLINK-9004][tests] Implement Jepsen tests to test job availability.

Use the Jepsen framework (https://github.com/jepsen-io/jepsen) to implement
tests that verify Flink's HA capabilities under real-world faults, such as
sudden TaskManager/JobManager termination, HDFS NameNode unavailability, network
partitions, etc. The Flink cluster under test is automatically deployed on YARN
(session & job mode) and Mesos.

Provide Dockerfiles for local test development.

This closes #6240.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d58c8c05
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d58c8c05
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d58c8c05

Branch: refs/heads/master
Commit: d58c8c05f0b86bdb74cb6e450848690e309011d6
Parents: d783c62
Author: gyao <ga...@data-artisans.com>
Authored: Mon Mar 5 22:23:33 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jul 13 14:39:31 2018 +0200

----------------------------------------------------------------------
 flink-jepsen/.gitignore                         |  18 ++
 flink-jepsen/README.md                          |  70 ++++++
 flink-jepsen/bin/.gitkeep                       |   0
 flink-jepsen/docker/.gitignore                  |   3 +
 flink-jepsen/docker/Dockerfile-control          |  45 ++++
 flink-jepsen/docker/Dockerfile-db               |  39 ++++
 flink-jepsen/docker/docker-compose.yml          |  52 +++++
 flink-jepsen/docker/run-tests.sh                |  31 +++
 flink-jepsen/docker/up.sh                       |  31 +++
 flink-jepsen/project.clj                        |  28 +++
 flink-jepsen/scripts/run-tests.sh               |  43 ++++
 flink-jepsen/src/jepsen/flink/checker.clj       | 128 ++++++++++
 flink-jepsen/src/jepsen/flink/client.clj        | 150 ++++++++++++
 flink-jepsen/src/jepsen/flink/db.clj            | 232 +++++++++++++++++++
 flink-jepsen/src/jepsen/flink/flink.clj         | 110 +++++++++
 flink-jepsen/src/jepsen/flink/generator.clj     |  39 ++++
 flink-jepsen/src/jepsen/flink/hadoop.clj        | 139 +++++++++++
 flink-jepsen/src/jepsen/flink/mesos.clj         | 165 +++++++++++++
 flink-jepsen/src/jepsen/flink/nemesis.clj       | 163 +++++++++++++
 flink-jepsen/src/jepsen/flink/utils.clj         |  48 ++++
 flink-jepsen/src/jepsen/flink/zookeeper.clj     |  29 +++
 flink-jepsen/test/jepsen/flink/checker_test.clj |  82 +++++++
 flink-jepsen/test/jepsen/flink/client_test.clj  |  37 +++
 flink-jepsen/test/jepsen/flink/utils_test.clj   |  39 ++++
 .../test/jepsen/flink/zookeeper_test.clj        |  28 +++
 25 files changed, 1749 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/.gitignore
----------------------------------------------------------------------
diff --git a/flink-jepsen/.gitignore b/flink-jepsen/.gitignore
new file mode 100644
index 0000000..ed5eca5
--- /dev/null
+++ b/flink-jepsen/.gitignore
@@ -0,0 +1,18 @@
+*.class
+*.iml
+*.jar
+*.retry
+.DS_Store
+.hg/
+.hgignore
+.idea/
+/.lein-*
+/.nrepl-port
+/checkouts
+/classes
+/target
+pom.xml
+pom.xml.asc
+store
+bin/*
+!bin/.gitkeep

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/README.md
----------------------------------------------------------------------
diff --git a/flink-jepsen/README.md b/flink-jepsen/README.md
new file mode 100644
index 0000000..9343246
--- /dev/null
+++ b/flink-jepsen/README.md
@@ -0,0 +1,70 @@
+# flink-jepsen
+
+A Clojure project based on the [Jepsen](https://github.com/jepsen-io/jepsen) framework to find bugs in the
+distributed coordination of Apache Flink®.
+
+## Test Coverage
+Jepsen is a framework built to test the behavior of distributed systems
+under faults. The tests in this particular project deploy Flink on either YARN or Mesos, submit a
+job, and examine the availability of the job after injecting faults.
+A job is said to be available if all the tasks of the job are running.
+The faults that can currently be introduced to the Flink cluster include:
+* Killing of TaskManager/JobManager processes
+* Stopping HDFS NameNode
+* Network partitions
+
+There are many properties besides job availability that could be
+verified but are not yet covered by this test suite, e.g., end-to-end exactly-once processing
+semantics.
+
+## Usage
+See the [Jepsen documentation](https://github.com/jepsen-io/jepsen#setting-up-a-jepsen-environment)
+for how to set up the environment to run tests. The script under `scripts/run-tests.sh` documents how to invoke
+tests. The Flink job used for testing is located under
+`flink-end-to-end-tests/flink-datastream-allround-test`. You have to build the job first and copy
+the resulting jar (`DataStreamAllroundTestProgram.jar`) to the `./bin` directory of this project's
+root.
+
+### Docker
+
+To simplify development, we have prepared Dockerfiles and a Docker Compose template
+so that you can run the tests locally in containers. To build the images
+and start the containers, simply run:
+
+    $ cd docker
+    $ ./up.sh
+
+After the containers have started, open a new terminal window and run `docker exec -it jepsen-control bash`.
+This will allow you to run arbitrary commands on the control node.
+To start the tests, you can use the `run-tests.sh` script in the `docker` directory,
+which expects the number of test iterations and a URI to a Flink distribution, e.g.,
+
+    ./docker/run-tests.sh 1 https://example.com/flink-dist.tgz
+
+The project's root is mounted as a volume to all containers under the path `/jepsen`.
+This means that changes to the test sources are immediately reflected in the control node container.
+Moreover, this allows you to test locally built Flink distributions by copying the tarball to the
+project's root and passing a URI with the `file://` scheme to the `run-tests.sh` script, e.g.,
+`file:///jepsen/flink-dist.tgz`.
+
+#### Memory Requirements
+
+The tests have high memory demands due to the many processes that are started by the control node.
+For example, to test Flink on YARN in a HA setup, we require ZooKeeper, HDFS NameNode,
+HDFS DataNode, YARN NodeManager, and YARN ResourceManager, in addition to the Flink processes.
+We found that the tests can be run comfortably in Docker containers on a machine with 32 GiB RAM. 
+
+### Checking the Output of Tests
+
+Consult the `jepsen.log` file for the particular test run in the `store` folder. The final output of every test will be either
+
+    Everything looks good! ヽ('ー`)ノ
+
+or
+
+    Analysis invalid! (ノಥ益ಥ)ノ ┻━┻
+
+depending on whether the test passed or not. If neither output is generated, the test did not finish
+properly due to problems with the environment, bugs in Jepsen or in the test suite, etc.
+
+In addition, the test directories contain all relevant log files aggregated from all hosts.

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/bin/.gitkeep
----------------------------------------------------------------------
diff --git a/flink-jepsen/bin/.gitkeep b/flink-jepsen/bin/.gitkeep
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/docker/.gitignore
----------------------------------------------------------------------
diff --git a/flink-jepsen/docker/.gitignore b/flink-jepsen/docker/.gitignore
new file mode 100644
index 0000000..6ff5a8e
--- /dev/null
+++ b/flink-jepsen/docker/.gitignore
@@ -0,0 +1,3 @@
+id_rsa
+id_rsa.pub
+nodes

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/docker/Dockerfile-control
----------------------------------------------------------------------
diff --git a/flink-jepsen/docker/Dockerfile-control b/flink-jepsen/docker/Dockerfile-control
new file mode 100644
index 0000000..96198e3
--- /dev/null
+++ b/flink-jepsen/docker/Dockerfile-control
@@ -0,0 +1,45 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+FROM debian:jessie
+
+RUN echo "deb http://http.debian.net/debian jessie-backports main" >> /etc/apt/sources.list && \
+    apt-get update && \
+    apt-get install -y -t jessie-backports openjdk-8-jdk && \
+    apt-get install -qqy \
+            less \
+            libjna-java \
+            gnuplot \
+            openjdk-8-jdk \
+            openssh-client \
+            vim \
+            wget
+
+ENV LEIN_ROOT true
+RUN wget https://raw.githubusercontent.com/technomancy/leiningen/stable/bin/lein && \
+    mv lein /usr/bin && \
+    chmod +x /usr/bin/lein && \
+    lein self-install
+
+ADD id_rsa /root/.ssh/
+ADD id_rsa.pub /root/.ssh/
+RUN touch ~/.ssh/known_hosts
+
+WORKDIR /jepsen
+
+CMD tail -f /dev/null

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/docker/Dockerfile-db
----------------------------------------------------------------------
diff --git a/flink-jepsen/docker/Dockerfile-db b/flink-jepsen/docker/Dockerfile-db
new file mode 100644
index 0000000..1555329
--- /dev/null
+++ b/flink-jepsen/docker/Dockerfile-db
@@ -0,0 +1,39 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+FROM debian:jessie
+
+RUN echo "deb http://http.debian.net/debian jessie-backports main" >> /etc/apt/sources.list && \
+    apt-get update && \
+    apt-get install -y -t jessie-backports openjdk-8-jdk && \
+    apt-get install -y apt-utils bzip2 curl faketime iproute iptables iputils-ping less libzip2 logrotate man man-db net-tools ntpdate psmisc python rsyslog sudo sysvinit sysvinit-core sysvinit-utils tar unzip vim wget
+
+RUN apt-get update && \
+    apt-get -y install openssh-server && \
+    mkdir -p /var/run/sshd && \
+    sed -i "s/UsePrivilegeSeparation.*/UsePrivilegeSeparation no/g" /etc/ssh/sshd_config && \
+    sed -i "s/PermitRootLogin without-password/PermitRootLogin yes/g" /etc/ssh/sshd_config
+
+ADD id_rsa.pub /root
+RUN mkdir -p /root/.ssh/ && \
+    touch /root/.ssh/authorized_keys && \
+    chmod 600 /root/.ssh/authorized_keys && \
+    cat /root/id_rsa.pub >> /root/.ssh/authorized_keys
+
+EXPOSE 22
+CMD exec /usr/sbin/sshd -D

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/docker/docker-compose.yml
----------------------------------------------------------------------
diff --git a/flink-jepsen/docker/docker-compose.yml b/flink-jepsen/docker/docker-compose.yml
new file mode 100644
index 0000000..3d2bdbd
--- /dev/null
+++ b/flink-jepsen/docker/docker-compose.yml
@@ -0,0 +1,52 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+version: '2'
+services:
+  control:
+    volumes:
+      - ${JEPSEN_ROOT}:/jepsen
+
+    container_name: jepsen-control
+    hostname: control
+    build:
+      context: ./
+      dockerfile: Dockerfile-control
+    privileged: true
+    links:
+      - n1
+      - n2
+      - n3
+
+  n1:
+    build:
+      context: ./
+      dockerfile: Dockerfile-db
+    privileged: true
+    container_name: jepsen-n1
+    hostname: n1
+    volumes:
+      - ${JEPSEN_ROOT}:/jepsen
+  n2:
+    extends: n1
+    container_name: jepsen-n2
+    hostname: n2
+  n3:
+    extends: n1
+    container_name: jepsen-n3
+    hostname: n3

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/docker/run-tests.sh
----------------------------------------------------------------------
diff --git a/flink-jepsen/docker/run-tests.sh b/flink-jepsen/docker/run-tests.sh
new file mode 100755
index 0000000..8b2b1e6
--- /dev/null
+++ b/flink-jepsen/docker/run-tests.sh
@@ -0,0 +1,31 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+dockerdir=$(dirname $0)
+dockerdir=$(cd ${dockerdir}; pwd)
+
+cat <<EOF > ${dockerdir}/nodes
+n1
+n2
+n3
+EOF
+
+common_jepsen_args+=(--nodes-file ${dockerdir}/nodes)
+
+. ${dockerdir}/../scripts/run-tests.sh ${1} ${2} 1

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/docker/up.sh
----------------------------------------------------------------------
diff --git a/flink-jepsen/docker/up.sh b/flink-jepsen/docker/up.sh
new file mode 100755
index 0000000..5479b3b
--- /dev/null
+++ b/flink-jepsen/docker/up.sh
@@ -0,0 +1,31 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -e
+
+dockerdir=$(dirname $0)
+dockerdir=$(cd ${dockerdir}; pwd)
+
+if [ ! -f ./id_rsa ]; then
+    ssh-keygen -t rsa -N "" -f ./id_rsa
+fi
+
+export JEPSEN_ROOT=${dockerdir}/../
+docker-compose build
+docker-compose -f docker-compose.yml up --force-recreate

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/project.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/project.clj b/flink-jepsen/project.clj
new file mode 100644
index 0000000..78935d7
--- /dev/null
+++ b/flink-jepsen/project.clj
@@ -0,0 +1,28 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(defproject jepsen.flink "0.1.0-SNAPSHOT"
+  :license {:name "Apache License"
+            :url  "http://www.apache.org/licenses/LICENSE-2.0"}
+  :main jepsen.flink.flink
+  :dependencies [[org.clojure/clojure "1.9.0"],
+                 [cheshire "5.8.0"]
+                 [clj-http "3.8.0"]
+                 [jepsen "0.1.10"],
+                 [jepsen.zookeeper "0.1.0"]
+                 [org.clojure/data.xml "0.0.8"]
+                 [zookeeper-clj "0.9.4" :exclusions [org.slf4j/slf4j-log4j12]]]
+  :profiles {:test {:dependencies [[clj-http-fake "1.0.3"]]}})

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/scripts/run-tests.sh
----------------------------------------------------------------------
diff --git a/flink-jepsen/scripts/run-tests.sh b/flink-jepsen/scripts/run-tests.sh
new file mode 100755
index 0000000..e448124
--- /dev/null
+++ b/flink-jepsen/scripts/run-tests.sh
@@ -0,0 +1,43 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -euo pipefail
+
+scripts=$(dirname $0)
+scripts=$(cd ${scripts}; pwd)
+
+parallelism=${3}
+
+common_jepsen_args+=(--ha-storage-dir hdfs:///flink
+--job-jar ${scripts}/../bin/DataStreamAllroundTestProgram.jar
+--tarball ${2}
+--job-args "--environment.parallelism ${parallelism} --state_backend.checkpoint_directory hdfs:///checkpoints --state_backend rocks --state_backend.rocks.incremental true"
+--ssh-private-key ~/.ssh/id_rsa)
+
+for i in $(seq 1 ${1})
+do
+  echo "Executing run #${i} of ${1}"
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode yarn-session
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode yarn-session
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --deployment-mode yarn-session
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode yarn-job
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode yarn-job
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --deployment-mode yarn-job
+  echo
+done

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/checker.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/checker.clj b/flink-jepsen/src/jepsen/flink/checker.clj
new file mode 100644
index 0000000..02cc863
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/checker.clj
@@ -0,0 +1,128 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.checker
+  (:require [jepsen
+             [checker :as checker]
+             [util :as ju]]
+            [knossos.model :as model])
+  (:import (knossos.model Model)))
+
+(defn stoppable-op? [op]
+  (clojure.string/includes? (name (:f op)) "-start"))
+
+(defn stop-op? [op]
+  (clojure.string/includes? (name (:f op)) "-stop"))
+
+(defn strip-op-suffix [op]
+  (clojure.string/replace (name (:f op)) #"-start|-stop" ""))
+
+(def safe-inc
+  (fnil inc 0))
+
+(defn nemeses-active?
+  [active-nemeses]
+  (->> (vals active-nemeses)
+       (reduce +)
+       pos?))
+
+(defn dissoc-if
+  [f m]
+  (->> (remove f m)
+       (into {})))
+
+(defn zero-value?
+  [[_ v]]
+  (zero? v))
+
+(defrecord
+  JobRunningWithinGracePeriod
+  ^{:doc "A Model which is consistent iff. the Flink job became available within
+  `job-recovery-grace-period` seconds after the last fault injected by the nemesis.
+  Note that some faults happen at a single point in time (e.g., killing of processes). Other faults,
+  such as network splits, happen during a period of time, and can thus be interleaving. As long as
+  there are active faults, the job is allowed not to be available."}
+  [active-nemeses                                           ; stores active failures
+   healthy-count                                            ; how many consecutive times was the job running?
+   last-failure                                             ; timestamp when the last failure was injected/ended
+   healthy-threshold                                        ; after how many times is the job considered healthy
+   job-recovery-grace-period]                               ; after how many seconds should the job be recovered
+  Model
+  (step [this op]
+    (case (:process op)
+      :nemesis (cond
+                 (nil? (:value op)) this
+                 (stoppable-op? op) (assoc
+                                      this
+                                      :active-nemeses (update active-nemeses
+                                                              (strip-op-suffix op)
+                                                              safe-inc))
+                 (stop-op? op) (assoc
+                                 this
+                                 :active-nemeses (dissoc-if zero-value?
+                                                            (update active-nemeses (strip-op-suffix op) dec))
+                                 :last-failure (:time op))
+                 :else (assoc this :last-failure (:time op)))
+      (case (:f op)
+        :job-running? (case (:type op)
+                        :info this                          ; ignore :info operations
+                        :fail this                          ; ignore :fail operations
+                        :invoke this                        ; ignore :invoke operations
+                        :ok (if (:value op)                 ; check if job is running
+                              (assoc                        ; job is running
+                                this
+                                :healthy-count
+                                (inc healthy-count))
+                              (if (and                      ; job is not running
+                                    (not (nemeses-active? active-nemeses))
+                                    (< healthy-count healthy-threshold)
+                                    (> (ju/nanos->secs (- (:time op) last-failure)) job-recovery-grace-period))
+                                ; job is not running but it should be running
+                                ; because grace period passed
+                                (model/inconsistent "Job is not running.")
+                                (conj this
+                                      [:healthy-count 0]))))
+        ; ignore other client operations
+        this))))
+
+(defn job-running-within-grace-period
+  [job-running-healthy-threshold job-recovery-grace-period]
+  (JobRunningWithinGracePeriod. {} 0 nil job-running-healthy-threshold job-recovery-grace-period))
+
+(defn job-running-checker
+  []
+  (reify
+    checker/Checker
+    (check [_ test model history _]
+      (let [final (reduce model/step (assoc model :last-failure (:time (first history))) history)
+            result-map (conj {}
+                             (find test :nemesis-gen)
+                             (find test :deployment-mode))]
+        (if (or (model/inconsistent? final) (zero? (:healthy-count final 0)))
+          (into result-map {:valid? false
+                            :error  (:msg final)})
+          (into result-map {:valid?      true
+                            :final-model final}))))))
+
+(defn get-job-running-history
+  [history]
+  (->>
+    history
+    (remove #(= (:process %) :nemesis))
+    (remove #(= (:type %) :invoke))
+    (map :value)
+    (map boolean)
+    (remove nil?)))

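The inconsistency condition encoded in JobRunningWithinGracePeriod's step
function above can be restated compactly (a Java sketch mirroring the record's
fields; op times are Jepsen's nanosecond timestamps):

    // The model becomes inconsistent only when the job is down, no fault is
    // active, the job has not yet been healthy often enough, and the grace
    // period since the last fault has elapsed.
    public class GracePeriodSketch {
        static boolean inconsistent(boolean jobRunning, boolean nemesesActive,
                int healthyCount, int healthyThreshold,
                long opTimeNanos, long lastFailureNanos, long gracePeriodSecs) {
            long secsSinceLastFailure = (opTimeNanos - lastFailureNanos) / 1_000_000_000L;
            return !jobRunning
                && !nemesesActive
                && healthyCount < healthyThreshold
                && secsSinceLastFailure > gracePeriodSecs;
        }
    }
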
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/client.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/client.clj b/flink-jepsen/src/jepsen/flink/client.clj
new file mode 100644
index 0000000..905dc48
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/client.clj
@@ -0,0 +1,150 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.client
+  (:require [clj-http.client :as http]
+            [clojure.tools.logging :refer :all]
+            [jepsen.client :as client]
+            [jepsen.flink.zookeeper :as fz]
+            [jepsen.flink.utils :as fu]
+            [zookeeper :as zk])
+  (:import (java.io ByteArrayInputStream ObjectInputStream)))
+
+(defn connect-zk-client!
+  [connection-string]
+  (zk/connect connection-string :timeout-msec 60000))
+
+(defn read-url
+  [bytes]
+  (with-open [object-input-stream (ObjectInputStream. (ByteArrayInputStream. bytes))]
+    (.readUTF object-input-stream)))
+
+(defn wait-for-zk-operation
+  [zk-client operation path]
+  (let [p (promise)]
+    (letfn [(iter [_]
+              (when-let [res (operation zk-client path :watcher iter)]
+                (deliver p res)))
+            ]
+      (iter nil)
+      p)))
+
+(defn wait-for-path-to-exist
+  [zk-client path]
+  (info "Waiting for path" path "in ZK.")
+  (wait-for-zk-operation zk-client zk/exists path))
+
+(defn wait-for-children-to-exist
+  [zk-client path]
+  (wait-for-zk-operation zk-client zk/children path))
+
+(defn find-application-id
+  [zk-client]
+  (do
+    (->
+      (wait-for-path-to-exist zk-client "/flink")
+      (deref))
+    (->
+      (wait-for-children-to-exist zk-client "/flink")
+      (deref)
+      (first))))
+
+(defn watch-node-bytes
+  [zk-client path callback]
+  (when (zk/exists zk-client path :watcher (fn [_] (watch-node-bytes zk-client path callback)))
+    (->>
+      (zk/data zk-client path :watcher (fn [_] (watch-node-bytes zk-client path callback)))
+      :data
+      (callback))))
+
+(defn make-job-manager-url [test]
+  (let [rest-url-atom (atom nil)
+        zk-client (connect-zk-client! (fz/zookeeper-quorum test))
+        init-future (future
+                      (let [application-id (find-application-id zk-client)
+                            path (str "/flink/" application-id "/leader/rest_server_lock")
+                            _ (->
+                                (wait-for-path-to-exist zk-client path)
+                                (deref))]
+                        (info "Determined application id to be" application-id)
+                        (watch-node-bytes zk-client path
+                                          (fn [bytes]
+                                            (let [url (read-url bytes)]
+                                              (info "Leading REST url changed to" url)
+                                              (reset! rest-url-atom url))))))]
+    {:rest-url-atom rest-url-atom
+     :closer        (fn [] (zk/close zk-client))
+     :init-future   init-future}))
+
+(defn list-jobs!
+  [base-url]
+  (->>
+    (http/get (str base-url "/jobs") {:as :json})
+    :body
+    :jobs
+    (map :id)))
+
+(defn get-job-details!
+  [base-url job-id]
+  (assert base-url)
+  (assert job-id)
+  (let [job-details (->
+                      (http/get (str base-url "/jobs/" job-id) {:as :json})
+                      :body)]
+    (assert (:vertices job-details) "Job does not have vertices")
+    job-details))
+
+(defn job-running?
+  [base-url job-id]
+  (->>
+    (get-job-details! base-url job-id)
+    :vertices
+    (map :status)
+    (every? #(= "RUNNING" %))))
+
+(defrecord Client
+  [deploy-cluster! closer rest-url init-future job-id]
+  client/Client
+  (open! [this test node]
+    (let [{:keys [rest-url-atom closer init-future]} (make-job-manager-url test)]
+      (assoc this :closer closer :rest-url rest-url-atom :init-future init-future :job-id (atom nil))))
+
+  (setup! [this test] this)
+
+  (invoke! [this test op]
+    (case (:f op)
+      :submit (do
+                (deploy-cluster! test)
+                (deref init-future)
+                (let [jobs (fu/retry (fn [] (list-jobs! @rest-url))
+                                     :fallback (fn [e] (do
+                                                         (fatal e "Could not get running jobs.")
+                                                         (System/exit 1))))
+                      num-jobs (count jobs)]
+                  (assert (= 1 num-jobs) (str "Expected 1 job, was " num-jobs))
+                  (reset! job-id (first jobs)))
+                (assoc op :type :ok))
+      :job-running? (let [base-url @rest-url]
+                      (if base-url
+                        (try
+                          (assoc op :type :ok :value (job-running? base-url @job-id))
+                          (catch Exception e (do
+                                               (warn e "Get job details from" base-url "failed.")
+                                               (assoc op :type :fail))))
+                        (assoc op :type :fail :value "Cluster not deployed yet.")))))
+
+  (teardown! [this test])
+  (close! [this test] (closer)))

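For readers less familiar with Clojure, the availability predicate in
job-running? above translates directly (a Java sketch; VertexStatus is a
hypothetical stand-in for the parsed /jobs/<job-id> REST response):

    import java.util.List;

    // A job counts as available iff every vertex reported by the JobManager's
    // /jobs/<job-id> endpoint is in status RUNNING. (The client asserts that
    // the vertices list is non-empty before running this check.)
    public class JobAvailabilitySketch {
        record VertexStatus(String name, String status) {}

        static boolean jobRunning(List<VertexStatus> vertices) {
            return vertices.stream().allMatch(v -> "RUNNING".equals(v.status()));
        }
    }
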
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/db.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/db.clj b/flink-jepsen/src/jepsen/flink/db.clj
new file mode 100644
index 0000000..ff934e1
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/db.clj
@@ -0,0 +1,232 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.db
+  (:require [clj-http.client :as http]
+            [clojure.java.io]
+            [clojure.string :as str]
+            [clojure.tools.logging :refer :all]
+            [jepsen
+             [control :as c]
+             [db :as db]
+             [util :refer [meh]]
+             [zookeeper :as zk]]
+            [jepsen.control.util :as cu]
+            [jepsen.flink.hadoop :as hadoop]
+            [jepsen.flink.mesos :as mesos]
+            [jepsen.flink.utils :as fu]
+            [jepsen.flink.zookeeper :refer :all]))
+
+(def install-dir "/opt/flink")
+(def upload-dir "/tmp")
+(def log-dir (str install-dir "/log"))
+(def conf-file (str install-dir "/conf/flink-conf.yaml"))
+(def masters-file (str install-dir "/conf/masters"))
+
+(def default-flink-dist-url "https://archive.apache.org/dist/flink/flink-1.5.0/flink-1.5.0-bin-hadoop28-scala_2.11.tgz")
+(def hadoop-dist-url "https://archive.apache.org/dist/hadoop/common/hadoop-2.8.3/hadoop-2.8.3.tar.gz")
+(def deb-zookeeper-package "3.4.9-3+deb8u1")
+(def deb-mesos-package "1.5.0-2.0.2")
+(def deb-marathon-package "1.6.322")
+
+(def taskmanager-slots 1)
+(def master-count 1)
+
+(defn flink-configuration
+  [test]
+  {:high-availability                  "zookeeper"
+   :high-availability.zookeeper.quorum (zookeeper-quorum test)
+   :high-availability.storageDir       (str (:ha-storage-dir test) "/ha")
+   :state.savepoints.dir               (str (:ha-storage-dir test) "/savepoints")
+   :web.port                           8081
+   :rest.bind-address                  "0.0.0.0"
+   :taskmanager.numberOfTaskSlots      taskmanager-slots
+   :yarn.application-attempts          99999
+   :slotmanager.taskmanager-timeout    10000
+   :state.backend.local-recovery       "true"
+   :taskmanager.registration.timeout   "30 s"})
+
+(defn master-nodes
+  [test]
+  (take master-count (sort (:nodes test))))
+
+(defn write-configuration!
+  "Writes the flink-conf.yaml and masters file to the flink conf directory"
+  [test]
+  (let [c (clojure.string/join "\n" (map (fn [[k v]] (str (name k) ": " v))
+                                         (seq (flink-configuration test))))
+        m (clojure.string/join "\n" (master-nodes test))]
+    (c/exec :echo c :> conf-file)
+    (c/exec :echo m :> masters-file)
+    ;; TODO: write log4j.properties properly
+    (c/exec (c/lit (str "sed -i'.bak' -e '/log4j.rootLogger=/ s/=.*/=DEBUG, file/' " install-dir "/conf/log4j.properties")))))
+
+(defn install-flink!
+  [test]
+  (let [url (:tarball test)]
+    (info "Installing Flink from" url)
+    (cu/install-archive! url install-dir)
+    (info "Enable S3 FS")
+    (c/exec (c/lit (str "ls " install-dir "/opt/flink-s3-fs-hadoop* | xargs -I {} mv {} " install-dir "/lib")))
+    (c/upload (:job-jar test) upload-dir)
+    (c/exec :mv (str upload-dir "/" (.getName (clojure.java.io/file (:job-jar test)))) install-dir)
+    (write-configuration! test)))
+
+(defn teardown-flink!
+  []
+  (info "Tearing down Flink")
+  (meh (c/exec :rm :-rf install-dir))
+  (meh (c/exec :rm :-rf (c/lit "/tmp/.yarn-properties*"))))
+
+(defn get-log-files!
+  []
+  (if (cu/exists? log-dir) (cu/ls-full log-dir) []))
+
+(defn flink-db
+  []
+  (reify db/DB
+    (setup! [_ test node]
+      (c/su
+        (install-flink! test)))
+
+    (teardown! [_ test node]
+      (c/su
+        (teardown-flink!)))
+
+    db/LogFiles
+    (log-files [_ test node]
+      (get-log-files!))))
+
+(defn combined-db
+  [dbs]
+  (reify db/DB
+    (setup! [_ test node]
+      (c/su
+        (doall (map #(db/setup! % test node) dbs))))
+    (teardown! [_ test node]
+      (c/su
+        (doall (map #(db/teardown! % test node) dbs))))
+    db/LogFiles
+    (log-files [_ test node]
+      (flatten (map #(db/log-files % test node) dbs)))))
+
+;;; YARN
+
+(defn flink-yarn-db
+  []
+  (let [zk (zk/db deb-zookeeper-package)
+        hadoop (hadoop/db hadoop-dist-url)
+        flink (flink-db)]
+    (combined-db [hadoop zk flink])))
+
+(defn exec-flink!
+  [test cmd args]
+  (c/su
+    (c/exec (c/lit (str
+                     "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
+                     "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir " "
+                     install-dir "/bin/flink " cmd " " args)))))
+
+(defn flink-run-cli-args
+  "Returns the CLI args that should be passed to 'flink run'"
+  [test]
+  (concat
+    ["-d"]
+    (if (:main-class test)
+      [(str "-c " (:main-class test))]
+      [])
+    (if (= :yarn-job (:deployment-mode test))
+      ["-m yarn-cluster" "-yjm 2048m" "-ytm 2048m"]
+      [])))
+
+(defn submit-job!
+  ([test] (submit-job! test []))
+  ([test cli-args]
+   (exec-flink! test "run" (clojure.string/join
+                             " "
+                             (concat cli-args
+                                     (flink-run-cli-args test)
+                                     [(str install-dir "/" (last (str/split (:job-jar test) #"/")))
+                                      (:job-args test)])))))
+
+(defn first-node
+  [test]
+  (-> test :nodes sort first))
+
+(defn start-yarn-session!
+  [test]
+  (let [node (first-node test)]
+    (c/on node
+          (info "Starting YARN session from" node)
+          (c/su
+            (c/exec (c/lit (str "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
+                                "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir
+                                " " install-dir "/bin/yarn-session.sh -d -jm 2048m -tm 2048m")))
+            (submit-job! test)))))
+
+(defn start-yarn-job!
+  [test]
+  (c/on (first-node test)
+        (c/su
+          (submit-job! test))))
+
+;;; Mesos
+
+(defn flink-mesos-db
+  []
+  (let [zk (zk/db deb-zookeeper-package)
+        hadoop (hadoop/db hadoop-dist-url)
+        mesos (mesos/db deb-mesos-package deb-marathon-package)
+        flink (flink-db)]
+    (combined-db [hadoop zk mesos flink])))
+
+(defn submit-job-with-retry!
+  [test]
+  (fu/retry
+    (partial submit-job! test)
+    :fallback (fn [e]
+                (fatal e "Could not submit job.")
+                (System/exit 1))))
+
+(defn start-mesos-session!
+  [test]
+  (c/su
+    (let [r (fu/retry (fn []
+                        (http/post
+                          (str (mesos/marathon-base-url test) "/v2/apps")
+                          {:form-params  {:id   "flink"
+                                          :cmd  (str "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
+                                                     "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir " "
+                                                     install-dir "/bin/mesos-appmaster.sh "
+                                                     "-Dmesos.master=" (zookeeper-uri
+                                                                         test
+                                                                         mesos/zk-namespace) " "
+                                                     "-Djobmanager.rpc.address=$(hostname -f) "
+                                                     "-Djobmanager.heap.mb=2048 "
+                                                     "-Djobmanager.rpc.port=6123 "
+                                                     "-Djobmanager.web.port=8081 "
+                                                     "-Dmesos.resourcemanager.tasks.mem=2048 "
+                                                     "-Dtaskmanager.heap.mb=2048 "
+                                                     "-Dtaskmanager.numberOfTaskSlots=2 "
+                                                     "-Dmesos.resourcemanager.tasks.cpus=1 "
+                                                     "-Drest.bind-address=$(hostname -f) ")
+                                          :cpus 1.0
+                                          :mem  2048}
+                           :content-type :json})))]
+      (info "Submitted Flink Application via Marathon" r)
+      (c/on (-> test :nodes sort first)
+            (submit-job-with-retry! test)))))
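
A minimal usage sketch of combined-db, mirroring flink-yarn-db above:

  (def example-yarn-db
    ;; ZooKeeper and Hadoop are set up before Flink, and torn down in the same order.
    (combined-db [(hadoop/db hadoop-dist-url)
                  (zk/db deb-zookeeper-package)
                  (flink-db)]))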

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/flink.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/flink.clj b/flink-jepsen/src/jepsen/flink/flink.clj
new file mode 100644
index 0000000..d5d4157
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/flink.clj
@@ -0,0 +1,110 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.flink
+  (:require [clojure.tools.logging :refer :all]
+            [jepsen
+             [cli :as cli]
+             [generator :as gen]
+             [tests :as tests]]
+            [jepsen.os.debian :as debian]
+            [jepsen.flink.client :refer :all]
+            [jepsen.flink.checker :as flink-checker]
+            [jepsen.flink.db :as fdb]
+            [jepsen.flink.nemesis :as fn])
+  (:import (jepsen.flink.client Client)))
+
+(def flink-test-config
+  {:yarn-session  {:db                  (fdb/flink-yarn-db)
+                   :deployment-strategy fdb/start-yarn-session!}
+   :yarn-job      {:db                  (fdb/flink-yarn-db)
+                   :deployment-strategy fdb/start-yarn-job!}
+   :mesos-session {:db                  (fdb/flink-mesos-db)
+                   :deployment-strategy fdb/start-mesos-session!}})
+
+(defn client-gen
+  []
+  (->
+    (cons {:type :invoke, :f :submit, :value nil}
+          (cycle [{:type :invoke, :f :job-running?, :value nil}
+                  (gen/sleep 5)]))
+    (gen/seq)
+    (gen/singlethreaded)))
+
+(defn flink-test
+  [opts]
+  (merge tests/noop-test
+         (let [{:keys [db deployment-strategy]} (-> opts :deployment-mode flink-test-config)
+               {:keys [job-running-healthy-threshold job-recovery-grace-period]} opts]
+           {:name      "Apache Flink"
+            :os        debian/os
+            :db        db
+            :nemesis   (fn/nemesis)
+            :model     (flink-checker/job-running-within-grace-period
+                         job-running-healthy-threshold
+                         job-recovery-grace-period)
+            :generator (let [stop (atom nil)]
+                         (->> (fn/stoppable-generator stop (client-gen))
+                              (gen/nemesis
+                                (fn/stop-generator stop
+                                                   ((fn/nemesis-generator-factories (:nemesis-gen opts)) opts)
+                                                   job-running-healthy-threshold
+                                                   job-recovery-grace-period))))
+            :client    (Client. deployment-strategy nil nil nil nil)
+            :checker   (flink-checker/job-running-checker)})
+         (assoc opts :concurrency 1)))
+
+(defn keys-as-allowed-values-help-text
+  "Takes a map and returns a string explaining which values are allowed.
+  This is a CLI helper function."
+  [m]
+  (->> (keys m)
+       (map name)
+       (clojure.string/join ", ")
+       (str "Must be one of: ")))
+
+(defn -main
+  [& args]
+  (cli/run!
+    (merge
+      (cli/single-test-cmd
+        {:test-fn  flink-test
+         :tarball  fdb/default-flink-dist-url
+         :opt-spec [[nil "--ha-storage-dir DIR" "high-availability.storageDir"]
+                    [nil "--job-jar JAR" "Path to the job jar"]
+                    [nil "--job-args ARGS" "CLI arguments for the flink job"]
+                    [nil "--main-class CLASS" "Job main class"]
+                    [nil "--nemesis-gen GEN" (str "Which nemesis should be used?"
+                                                  (keys-as-allowed-values-help-text fn/nemesis-generator-factories))
+                     :parse-fn keyword
+                     :default :kill-task-managers
+                     :validate [#(fn/nemesis-generator-factories (keyword %))
+                                (keys-as-allowed-values-help-text fn/nemesis-generator-factories)]]
+                    [nil "--deployment-mode MODE" (keys-as-allowed-values-help-text flink-test-config)
+                     :parse-fn keyword
+                     :default :yarn-session
+                     :validate [#(flink-test-config (keyword %))
+                                (keys-as-allowed-values-help-text flink-test-config)]]
+                    [nil "--job-running-healthy-threshold TIMES" "Number of consecutive times the job must be running to be considered healthy."
+                     :default 5
+                     :parse-fn #(Long/parseLong %)
+                     :validate [pos? "Must be positive"]]
+                    [nil "--job-recovery-grace-period SECONDS" "Time period in which the job must become healthy."
+                     :default 180
+                     :parse-fn #(Long/parseLong %)
+                     :validate [pos? "Must be positive" (fn [v] (<= 60 v)) "Should be greater than 60"]]]})
+      (cli/serve-cmd))
+    args))
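
A quick sketch of keys-as-allowed-values-help-text (the map below is a stand-in
for flink-test-config):

  (keys-as-allowed-values-help-text {:yarn-session nil :yarn-job nil :mesos-session nil})
  ;; => "Must be one of: yarn-session, yarn-job, mesos-session"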

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/generator.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/generator.clj b/flink-jepsen/src/jepsen/flink/generator.clj
new file mode 100644
index 0000000..af928c4
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/generator.clj
@@ -0,0 +1,39 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.generator
+  (:require [jepsen.util :as util]
+            [jepsen.generator :as gen]))
+
+(gen/defgenerator TimeLimitGen
+                  [dt source deadline-atom]
+                  [dt (when-let [deadline @deadline-atom]
+                        (util/nanos->secs deadline)) source]
+                  (gen/op [_ test process]
+                          (compare-and-set! deadline-atom nil (+ (util/linear-time-nanos)
+                                                                 (util/secs->nanos dt)))
+                          (when (<= (util/linear-time-nanos) @deadline-atom)
+                            (gen/op source test process))))
+
+;; In Jepsen 0.1.9 jepsen.generator/time-limit was re-written to interrupt Threads.
+;; Unfortunately the logic has race conditions which can cause spurious failures
+;; (https://github.com/jepsen-io/jepsen/issues/268).
+;;
+;; In our tests we do not need interrupts. Therefore, we use a time-limit implementation that is
+;; similar to the one shipped with Jepsen 0.1.8.
+(defn time-limit
+  [dt source]
+  (TimeLimitGen. dt source (atom nil)))
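
A minimal usage sketch (the wrapped generator is hypothetical):

  ;; Emits :job-running? operations for at most 60 seconds, measured from the
  ;; first time an operation is requested.
  (time-limit 60 (gen/seq (cycle [{:type :invoke, :f :job-running?, :value nil}])))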

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/hadoop.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/hadoop.clj b/flink-jepsen/src/jepsen/flink/hadoop.clj
new file mode 100644
index 0000000..f633d07
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/hadoop.clj
@@ -0,0 +1,139 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.hadoop
+  (:require [clojure.data.xml :as xml]
+            [clojure.tools.logging :refer :all]
+            [jepsen
+             [control :as c]
+             [db :as db]]
+            [jepsen.control.util :as cu]))
+
+(def install-dir "/opt/hadoop")
+(def hadoop-conf-dir (str install-dir "/etc/hadoop"))
+(def yarn-log-dir "/tmp/logs/yarn")
+
+(defn name-node
+  [nodes]
+  (first (sort nodes)))
+
+(defn resource-manager
+  [nodes]
+  (second (sort nodes)))
+
+(defn data-nodes
+  [nodes]
+  (drop 2 (sort nodes)))
+
+(defn yarn-site-config
+  [test]
+  {:yarn.resourcemanager.hostname        (resource-manager (:nodes test))
+   :yarn.log-aggregation-enable          "true"
+   :yarn.nodemanager.resource.cpu-vcores "8"
+   :yarn.resourcemanager.am.max-attempts "99999"
+   :yarn.nodemanager.log-dirs            yarn-log-dir})
+
+(defn core-site-config
+  [test]
+  {:fs.defaultFS (str "hdfs://" (name-node (:nodes test)) ":9000")})
+
+(defn property-value
+  [property value]
+  (xml/element :property {}
+               [(xml/element :name {} (name property))
+                (xml/element :value {} value)]))
+
+(defn write-config!
+  [^String config-file config]
+  (info "Writing config" config-file)
+  (let [config-xml (xml/indent-str
+                     (xml/element :configuration
+                                  {}
+                                  (map (fn [[k v]] (property-value k v)) (seq config))))]
+    (c/exec :echo config-xml :> config-file)))
+
+(defn start-name-node!
+  [test node]
+  (when (= node (name-node (:nodes test)))
+    (info "Start NameNode daemon.")
+    (c/exec (str install-dir "/sbin/hadoop-daemon.sh") :--config hadoop-conf-dir :--script :hdfs :start :namenode)))
+
+(defn start-name-node-formatted!
+  [test node]
+  (when (= node (name-node (:nodes test)))
+    (info "Format HDFS")
+    (c/exec (str install-dir "/bin/hdfs") :namenode :-format :-force :-clusterId "0000000")
+    (start-name-node! test node)))
+
+(defn stop-name-node!
+  []
+  (c/exec (str install-dir "/sbin/hadoop-daemon.sh") :--config hadoop-conf-dir :--script :hdfs :stop :namenode))
+
+(defn start-data-node!
+  [test node]
+  (when (some #{node} (data-nodes (:nodes test)))
+    (info "Start DataNode")
+    (c/exec (str install-dir "/sbin/hadoop-daemon.sh") :--config hadoop-conf-dir :--script :hdfs :start :datanode)))
+
+(defn start-resource-manager!
+  [test node]
+  (when (= node (resource-manager (:nodes test)))
+    (info "Start ResourceManager")
+    (c/exec (str install-dir "/sbin/yarn-daemon.sh") :--config hadoop-conf-dir :start :resourcemanager)))
+
+(defn start-node-manager!
+  [test node]
+  (when (some #{node} (data-nodes (:nodes test)))
+    (info "Start NodeManager")
+    (c/exec (str install-dir "/sbin/yarn-daemon.sh") :--config hadoop-conf-dir :start :nodemanager)))
+
+(defn find-files!
+  [dir]
+  (->>
+    (clojure.string/split (c/exec :find dir :-type :f) #"\n")
+    (remove clojure.string/blank?)))
+
+(defn db
+  [url]
+  (reify db/DB
+    (setup! [_ test node]
+      (info "Install Hadoop from" url)
+      (c/su
+        (cu/install-archive! url install-dir)
+        (write-config! (str install-dir "/etc/hadoop/yarn-site.xml") (yarn-site-config test))
+        (write-config! (str install-dir "/etc/hadoop/core-site.xml") (core-site-config test))
+        (c/exec :echo (c/lit "export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64") :>> (str install-dir "/etc/hadoop/hadoop-env.sh"))
+        (start-name-node-formatted! test node)
+        (start-data-node! test node)
+        (start-resource-manager! test node)
+        (start-node-manager! test node)))
+
+    (teardown! [_ test node]
+      (info "Teardown Hadoop")
+      (c/su
+        (cu/grepkill! "hadoop")
+        (c/exec (c/lit (str "rm -rf /tmp/hadoop-* ||:")))))
+
+    db/LogFiles
+    (log-files [_ _ _]
+      (c/su
+        (concat (find-files! (str install-dir "/logs"))
+                (if (cu/exists? yarn-log-dir)
+                  (do
+                    (c/exec :chmod :-R :777 yarn-log-dir)
+                    (find-files! yarn-log-dir))
+                  []))))))
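
For reference, write-config! serializes a config map into Hadoop's XML format;
a sketch for a single entry (the host name is hypothetical):

  (xml/indent-str
    (xml/element :configuration {}
                 (map (fn [[k v]] (property-value k v))
                      (seq {:fs.defaultFS "hdfs://n1:9000"}))))
  ;; => an XML declaration followed by:
  ;; <configuration>
  ;;   <property>
  ;;     <name>fs.defaultFS</name>
  ;;     <value>hdfs://n1:9000</value>
  ;;   </property>
  ;; </configuration>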

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/mesos.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/mesos.clj b/flink-jepsen/src/jepsen/flink/mesos.clj
new file mode 100644
index 0000000..74b2c0d
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/mesos.clj
@@ -0,0 +1,165 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.mesos
+  (:require [clojure.tools.logging :refer :all]
+            [jepsen
+             [control :as c]
+             [db :as db]
+             [util :as util :refer [meh]]]
+            [jepsen.control.util :as cu]
+            [jepsen.os.debian :as debian]
+            [jepsen.flink.zookeeper :refer [zookeeper-uri]]))
+
+;;; Mesos
+
+(def master-count 1)
+(def master-pidfile "/var/run/mesos/master.pid")
+(def slave-pidfile "/var/run/mesos/slave.pid")
+(def master-dir "/var/lib/mesos/master")
+(def slave-dir "/var/lib/mesos/slave")
+(def log-dir "/var/log/mesos")
+(def master-bin "/usr/sbin/mesos-master")
+(def slave-bin "/usr/sbin/mesos-slave")
+(def zk-namespace :mesos)
+
+;;; Marathon
+
+(def marathon-bin "/usr/bin/marathon")
+(def zk-marathon-namespace "marathon")
+(def marathon-pidfile "/var/run/mesos/marathon.pid")
+(def marathon-rest-port 8080)
+
+(defn install!
+  [test node mesos-version marathon-version]
+  (c/su
+    (debian/add-repo! :mesosphere
+                      "deb http://repos.mesosphere.com/debian jessie main"
+                      "keyserver.ubuntu.com"
+                      "E56151BF")
+    (debian/install {:mesos    mesos-version
+                     :marathon marathon-version})
+    (c/exec :mkdir :-p "/var/run/mesos")
+    (c/exec :mkdir :-p master-dir)
+    (c/exec :mkdir :-p slave-dir)))
+
+;;; Mesos functions
+
+(defn start-master!
+  [test node]
+  (when (some #{node} (take master-count (sort (:nodes test))))
+    (info node "Starting mesos master")
+    (c/su
+      (c/exec :start-stop-daemon
+              :--background
+              :--chdir master-dir
+              :--exec "/usr/bin/env"
+              :--make-pidfile
+              :--no-close
+              :--oknodo
+              :--pidfile master-pidfile
+              :--start
+              :--
+              "GLOG_v=1"
+              master-bin
+              (str "--hostname=" (name node))
+              (str "--log_dir=" log-dir)
+              (str "--offer_timeout=30secs")
+              (str "--quorum=" (util/majority master-count))
+              (str "--registry_fetch_timeout=120secs")
+              (str "--registry_store_timeout=5secs")
+              (str "--work_dir=" master-dir)
+              (str "--zk=" (zookeeper-uri test zk-namespace))
+              :>> (str log-dir "/master.stdout")
+              (c/lit "2>&1")))))
+
+(defn start-slave!
+  [test node]
+  (when-not (some #{node} (take master-count (sort (:nodes test))))
+    (info node "Starting mesos slave")
+    (c/su
+      (c/exec :start-stop-daemon :--start
+              :--background
+              :--chdir slave-dir
+              :--exec slave-bin
+              :--make-pidfile
+              :--no-close
+              :--pidfile slave-pidfile
+              :--oknodo
+              :--
+              (str "--hostname=" (name node))
+              (str "--log_dir=" log-dir)
+              (str "--master=" (zookeeper-uri test zk-namespace))
+              (str "--recovery_timeout=30secs")
+              (str "--work_dir=" slave-dir)
+              :>> (str log-dir "/slave.stdout")
+              (c/lit "2>&1")))))
+
+(defn stop-master!
+  [node]
+  (info node "Stopping mesos master")
+  (meh (c/exec :killall :-9 :mesos-master))
+  (meh (c/exec :rm :-rf master-pidfile)))
+
+(defn stop-slave!
+  [node]
+  (info node "Stopping mesos slave")
+  (meh (c/exec :killall :-9 :mesos-slave))
+  (meh (c/exec :rm :-rf slave-pidfile)))
+
+;;; Marathon functions
+
+(defn start-marathon!
+  [test node]
+  (when (= node (first (sort (:nodes test))))
+    (info "Start marathon")
+    (c/su
+      (c/exec :start-stop-daemon :--start
+              :--background
+              :--exec marathon-bin
+              :--make-pidfile
+              :--no-close
+              :--pidfile marathon-pidfile
+              :--
+              (c/lit (str "--hostname " node))
+              (c/lit (str "--master " (zookeeper-uri test zk-namespace)))
+              (c/lit (str "--zk " (zookeeper-uri test zk-marathon-namespace)))
+              :>> (str log-dir "/marathon.stdout")
+              (c/lit "2>&1")))))
+
+(defn stop-marathon!
+  []
+  (cu/grepkill! "marathon"))
+
+(defn marathon-base-url
+  [test]
+  (str "http://" (first (sort (:nodes test))) ":" marathon-rest-port))
+
+(defn db
+  [mesos-version marathon-version]
+  (reify db/DB
+    (setup! [this test node]
+      (install! test node mesos-version marathon-version)
+      (start-master! test node)
+      (start-slave! test node)
+      (start-marathon! test node))
+    (teardown! [this test node]
+      (stop-slave! node)
+      (stop-master! node)
+      (stop-marathon!))
+    db/LogFiles
+    (log-files [_ test node]
+      (if (cu/exists? log-dir) (cu/ls-full log-dir) []))))
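
For illustration, the Marathon endpoint and the ZooKeeper URIs are derived from
the sorted node list (node names hypothetical):

  (marathon-base-url {:nodes ["n2" "n1" "n3"]})
  ;; => "http://n1:8080"

  (zookeeper-uri {:nodes ["n1" "n2"]} zk-namespace)
  ;; => "zk://n1:2181,n2:2181/mesos"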

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/nemesis.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/nemesis.clj b/flink-jepsen/src/jepsen/flink/nemesis.clj
new file mode 100644
index 0000000..3047eeb
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/nemesis.clj
@@ -0,0 +1,163 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.nemesis
+  (:require [clojure.tools.logging :refer :all]
+            [jepsen
+             [control :as c]
+             [generator :as gen]
+             [nemesis :as nemesis]
+             [util :as ju]]
+            [jepsen.control.util :as cu]
+            [jepsen.flink.client :refer :all]
+            [jepsen.flink.checker :as flink-checker]
+            [jepsen.flink.generator :as fgen]
+            [jepsen.flink.hadoop :as fh]
+            [jepsen.flink.zookeeper :refer :all]))
+
+(def job-submit-grace-period
+  "Period after job submission in which job managers must not fail."
+  60)
+
+(defn kill-processes
+  ([pattern] (kill-processes rand-nth pattern))
+  ([targeter pattern]
+   (reify nemesis/Nemesis
+     (setup! [this test] this)
+     (invoke! [this test op]
+       (let [nodes (-> test :nodes targeter ju/coll)]
+         (c/on-many nodes
+                    (c/su (cu/grepkill! pattern)))
+         (assoc op :value nodes)))
+     (teardown! [this test]))))
+
+(defn- non-empty-random-sample
+  [coll]
+  (let [sample (random-sample 0.5 coll)]
+    (if (empty? sample)
+      (first (shuffle coll))
+      sample)))
+
+(defn kill-taskmanager
+  ([] (kill-taskmanager identity))
+  ([targeter]
+   (kill-processes targeter "TaskExecutorRunner")))
+
+(defn kill-jobmanager
+  []
+  (kill-processes identity "ClusterEntrypoint"))
+
+(defn start-stop-name-node
+  "Nemesis stopping and starting the HDFS NameNode."
+  []
+  (nemesis/node-start-stopper
+    fh/name-node
+    (fn [test node] (c/su (fh/stop-name-node!)))
+    (fn [test node] (c/su (fh/start-name-node! test node)))))
+
+;;; Generators
+
+(defn stoppable-generator
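+  "Wraps source, emitting its operations until the stop atom holds a truthy value."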
+  [stop source]
+  (reify gen/Generator
+    (op [gen test process]
+      (if @stop
+        nil
+        (gen/op source test process)))))
+
+(defn take-last-with-default
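+  "Returns the last n elements of coll; if coll has fewer than n elements, the result is left-padded with default."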
+  [n default coll]
+  (->>
+    (cycle [default])
+    (concat (reverse coll))
+    (take n)
+    (reverse)))
+
+(defn stop-generator
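+  "After source is exhausted, polls the test history until the job has been running for the
+  last job-running-healthy-threshold checks, or until job-recovery-grace-period seconds have
+  passed, and then sets the stop atom to true."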
+  [stop source job-running-healthy-threshold job-recovery-grace-period]
+  (gen/concat source
+              (let [t (atom nil)]
+                (reify gen/Generator
+                  (op [_ test process]
+                    (when (nil? @t)
+                      (compare-and-set! t nil (ju/relative-time-nanos)))
+                    (let [history (->>
+                                    (:active-histories test)
+                                    deref
+                                    first
+                                    deref)
+                          job-running-history (->>
+                                                history
+                                                (filter (fn [op] (>= (- (:time op) @t) 0)))
+                                                (flink-checker/get-job-running-history)
+                                                (take-last-with-default job-running-healthy-threshold false))]
+                      (if (or
+                            (every? true? job-running-history)
+                            (> (ju/relative-time-nanos) (+ @t (ju/secs->nanos job-recovery-grace-period))))
+                        (do
+                          (reset! stop true)
+                          nil)
+                        (do
+                          (Thread/sleep 1000)
+                          (recur test process)))))))))
+
+(defn kill-taskmanagers-gen
+  [time-limit dt op]
+  (fgen/time-limit time-limit (gen/stagger dt (gen/seq (cycle [{:type :info, :f op}])))))
+
+(defn kill-taskmanagers-bursts-gen
+  [time-limit]
+  (fgen/time-limit time-limit
+                   (gen/seq (cycle (concat (repeat 20 {:type :info, :f :kill-task-managers})
+                                           [(gen/sleep 300)])))))
+
+(defn kill-jobmanagers-gen
+  [time-limit]
+  (fgen/time-limit (+ time-limit job-submit-grace-period)
+                   (gen/seq (cons (gen/sleep job-submit-grace-period)
+                                  (cycle [{:type :info, :f :kill-job-manager}])))))
+
+(defn fail-name-node-during-recovery
+  []
+  (gen/seq [(gen/sleep job-submit-grace-period)
+            {:type :info, :f :partition-start}
+            {:type :info, :f :fail-name-node-start}
+            (gen/sleep 20)
+            {:type :info, :f :partition-stop}
+            (gen/sleep 60)
+            {:type :info, :f :fail-name-node-stop}]))
+
+(def nemesis-generator-factories
+  {:kill-task-managers             (fn [opts] (kill-taskmanagers-gen (:time-limit opts) 3 :kill-task-managers))
+   :kill-single-task-manager       (fn [opts] (kill-taskmanagers-gen (:time-limit opts) 3 :kill-single-task-manager))
+   :kill-random-task-managers      (fn [opts] (kill-taskmanagers-gen (:time-limit opts) 3 :kill-random-task-managers))
+   :kill-task-managers-bursts      (fn [opts] (kill-taskmanagers-bursts-gen (:time-limit opts)))
+   :kill-job-managers              (fn [opts] (kill-jobmanagers-gen (:time-limit opts)))
+   :fail-name-node-during-recovery (fn [_] (fail-name-node-during-recovery))
+   :utopia                         (fn [_] (gen/sleep 60))})
+
+(defn nemesis
+  []
+  (nemesis/compose
+    {{:partition-start :start
+      :partition-stop  :stop}            (nemesis/partition-random-halves)
+     {:fail-name-node-start :start
+      :fail-name-node-stop  :stop}       (start-stop-name-node)
+     {:kill-task-managers :start}        (kill-taskmanager)
+     {:kill-single-task-manager :start}  (kill-taskmanager (fn [coll] (rand-nth coll)))
+     {:kill-random-task-managers :start} (kill-taskmanager non-empty-random-sample)
+     {:kill-job-manager :start}          (kill-jobmanager)}))
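
A quick sketch of how take-last-with-default pads a short job-running history:

  (take-last-with-default 3 false [true])
  ;; => (false false true)
  (take-last-with-default 3 false [true true true true])
  ;; => (true true true)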

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/utils.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/utils.clj b/flink-jepsen/src/jepsen/flink/utils.clj
new file mode 100644
index 0000000..3fd9f96
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/utils.clj
@@ -0,0 +1,48 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.utils
+  (:require [clojure.tools.logging :refer :all]))
+
+(defn retry
+  "Runs a function op and retries on exception.
+
+  The following options are supported:
+
+  :on-retry - A function called before every retry, with the exception and the number of remaining retries as arguments.
+  :success - A function called with the result of op.
+  :fallback - A function called with the exception as its argument once all retries are exhausted.
+  :retries - Total number of retries.
+  :delay - The time between retries in milliseconds."
+  ([op & {:keys [on-retry success fallback retries delay]
+          :or   {on-retry (fn [exception _]
+                            (warn "Retryable operation failed:" (.getMessage exception)))
+                 success  identity
+                 fallback :default
+                 retries  10
+                 delay    2000}
+          :as   opts}]
+   (let [r (try
+             (op)
+             (catch Exception e (if (< 0 retries)
+                                  {:exception e}
+                                  (fallback e))))]
+     (if (:exception r)
+       (do
+         (on-retry (:exception r) retries)
+         (Thread/sleep delay)
+         (recur op (assoc opts :retries (dec retries)))
+       (success r)))))
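
A minimal usage sketch (the URL is hypothetical; warn comes from clojure.tools.logging):

  (retry (fn [] (slurp "http://localhost:8081/jobs/overview"))
         :retries 5
         :delay 1000
         :fallback (fn [e] (warn e "Giving up.") nil))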

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/zookeeper.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/zookeeper.clj b/flink-jepsen/src/jepsen/flink/zookeeper.clj
new file mode 100644
index 0000000..8b4c319
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/zookeeper.clj
@@ -0,0 +1,29 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.zookeeper)
+
+(defn zookeeper-quorum
+  "Returns the zk quorum string, e.g., host1:2181,host2:2181"
+  [test]
+  (->> test
+       :nodes
+       (map #(str % ":2181"))
+       (clojure.string/join ",")))
+
+(defn zookeeper-uri
+  ([test] (zookeeper-uri test ""))
+  ([test namespace] (str "zk://" (zookeeper-quorum test) "/" (name namespace))))

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/test/jepsen/flink/checker_test.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/test/jepsen/flink/checker_test.clj b/flink-jepsen/test/jepsen/flink/checker_test.clj
new file mode 100644
index 0000000..7389bbc
--- /dev/null
+++ b/flink-jepsen/test/jepsen/flink/checker_test.clj
@@ -0,0 +1,82 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.checker-test
+  (:require [clojure.test :refer :all]
+            [jepsen
+             [checker :as checker]]
+            [jepsen.flink.checker :refer :all]))
+
+(deftest get-job-running-history-test
+  (let [history [{:type :info, :f :kill-random-subset-task-managers, :process :nemesis, :time 121898381144, :value '("172.31.33.170")}
+                 {:type :invoke, :f :job-running?, :value nil, :process 0, :time 127443701575}
+                 {:type :ok, :f :job-running?, :value false, :process 0, :time 127453553462}
+                 {:type :invoke, :f :job-running?, :value nil, :process 0, :time 127453553463}
+                 {:type :ok, :f :job-running?, :value true, :process 0, :time 127453553464}]]
+    (is (= (get-job-running-history history) [false true]))))
+
+(deftest job-running-checker-test
+  (let [checker (job-running-checker)
+        test {}
+        model (job-running-within-grace-period 3 60)
+        opts {}
+        check (fn [history] (checker/check checker test model history opts))]
+    (testing "Job is not running after grace period."
+      (is (= (:valid? (check
+                        [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
+                         {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}])) false)))
+    (testing "Job is running after grace period."
+      (is (= (:valid? (check
+                        [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
+                         {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000001}])) true)))
+    (testing "Should tolerate non-running job during failures."
+      (is (= (:valid? (check
+                        [{:type :info, :f :partition-start, :process :nemesis, :time -1}
+                         {:type :info, :f :partition-start, :process :nemesis, :time 0, :value "Cut off [...]"}
+                         {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}
+                         {:type :info, :f :partition-stop, :process :nemesis, :time 60000000002}
+                         {:type :info, :f :partition-stop, :process :nemesis, :time 60000000003, :value "fully connected"}
+                         {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}])) true)))
+    (testing "Should respect healthy threshold."
+      (is (= (:valid? (check
+                        [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
+                         {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
+                         {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
+                         {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000003}
+                         {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}])) true))
+      (is (= (:valid? (check
+                        [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
+                         {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
+                         {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000002}
+                         {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}])) false)))
+    (testing "Job was not deployed successfully."
+      (is (= (:valid? (check [{:type :invoke, :f :job-running?, :value nil, :process 45, :time 239150413307}
+                              {:type :info, :f :job-running?, :value nil, :process 45, :time 239150751938, :error "indeterminate: Assert failed: job-id"}])) false)))))
+
+(deftest safe-inc-test
+  (is (= (safe-inc nil) 1))
+  (is (= (safe-inc 1) 2)))
+
+(deftest nemeses-active?-test
+  (is (= (nemeses-active? {:partition-start 2 :fail-name-node-start 0}) true))
+  (is (= (nemeses-active? {:partition-start 0}) false)))
+
+(deftest dissoc-if-test
+  (is (= (:a (dissoc-if #(-> (first %) (= :b)) {:a 1 :b 2})) 1)))
+
+(deftest zero-value?-test
+  (is (= (zero-value? [:test 0]) true))
+  (is (= (zero-value? [:test 1]) false)))

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/test/jepsen/flink/client_test.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/test/jepsen/flink/client_test.clj b/flink-jepsen/test/jepsen/flink/client_test.clj
new file mode 100644
index 0000000..b4373bf
--- /dev/null
+++ b/flink-jepsen/test/jepsen/flink/client_test.clj
@@ -0,0 +1,37 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.client-test
+  (:require [clojure.test :refer :all]
+            [clj-http.fake :as fake]
+            [jepsen.flink.client :refer :all]))
+
+(deftest read-url-test
+  (is (= "https://www.asdf.de" (read-url (byte-array [0xAC 0xED 0x00 0x05 0x77 0x15 0x00 0x13 0x68 0x74 0x74 0x70 0x73 0x3A 0x2F 0x2F 0x77 0x77 0x77 0x2E 0x61 0x73 0x64 0x66 0x2E 0x64 0x65])))))
+
+(deftest job-running?-test
+  (fake/with-fake-routes
+    {"http://localhost:8081/jobs/a718f168ec6be8eff1345a17bf64196c"
+     (fn [request] {:status  200
+                    :headers {}
+                    :body    "{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1522059578198,\"end-time\":-1,\"duration\":19505,\"now\":1522059597703,\"timestamps\":{\"RUNNING\":1522059578244,\"RESTARTING\":0,\"RECONCILING\":0,\"CREATED\":1522059578198,\"FAILING\":0,\"FINISHED\":0,\"CANCELLING\":0,\"SUSPENDING\":0,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0},\"vertices\":[{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"name\":\"Source: Socket Stream -> Flat Map\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578369,\"end-time\":-1,\"duration\":19334,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"w
 rite-records-complete\":false}},{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"name\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578381,\"end-time\":-1,\"duration\":19322,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}}],\"status-counts\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":2,\"CREATED\":0,\"FINISHED\":0},\"plan\":{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"nodes\":[{\"id\":\"90bea66de1c231edf33913ecd544
 06c1\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -&gt; Sink: Print to Std. Out\",\"inputs\":[{\"num\":0,\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Socket Stream -&gt; Flat Map\",\"optimizer_properties\":{}}]}}"})
+     "http://localhost:8081/jobs/a718f168ec6be8eff1345a17bf64196d"
+     (fn [request] {:status  200
+                    :headers {}
+                    :body    "{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1522059578198,\"end-time\":-1,\"duration\":19505,\"now\":1522059597703,\"timestamps\":{\"RUNNING\":1522059578244,\"RESTARTING\":0,\"RECONCILING\":0,\"CREATED\":1522059578198,\"FAILING\":0,\"FINISHED\":0,\"CANCELLING\":0,\"SUSPENDING\":0,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0},\"vertices\":[{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"name\":\"Source: Socket Stream -> Flat Map\",\"parallelism\":1,\"status\":\"CREATED\",\"start-time\":1522059578369,\"end-time\":-1,\"duration\":19334,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"w
 rite-records-complete\":false}},{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"name\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578381,\"end-time\":-1,\"duration\":19322,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}}],\"status-counts\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":2,\"CREATED\":0,\"FINISHED\":0},\"plan\":{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"nodes\":[{\"id\":\"90bea66de1c231edf33913ecd544
 06c1\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -&gt; Sink: Print to Std. Out\",\"inputs\":[{\"num\":0,\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Socket Stream -&gt; Flat Map\",\"optimizer_properties\":{}}]}}"})}
+
+    (is (= (job-running? "http://localhost:8081" "a718f168ec6be8eff1345a17bf64196c") true))
+    (is (= (job-running? "http://localhost:8081" "a718f168ec6be8eff1345a17bf64196d") false))))

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/test/jepsen/flink/utils_test.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/test/jepsen/flink/utils_test.clj b/flink-jepsen/test/jepsen/flink/utils_test.clj
new file mode 100644
index 0000000..607f90d
--- /dev/null
+++ b/flink-jepsen/test/jepsen/flink/utils_test.clj
@@ -0,0 +1,39 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.utils-test
+  (:require [clojure.test :refer :all])
+  (:require [jepsen.flink.utils :refer [retry]]))
+
+(deftest retry-test
+  (testing "Single failure then result."
+    (let [counter (atom 0)
+          failing-once (fn [] (if (= @counter 0)
+                                (do (swap! counter inc)
+                                    (throw (Exception. "Expected")))
+                                "result"))]
+      (is (= "result" (retry failing-once :delay 0)))))
+
+  (testing "Exhaust all attempts."
+    (let [failing-always (fn [] (throw (Exception. "Expected")))]
+      (is (nil? (retry failing-always :retries 1 :delay 0)))))
+
+  (testing "Propagate exception."
+    (let [failing-always (fn [] (throw (Exception. "Expected")))]
+      (is (thrown-with-msg? Exception #"Expected" (retry failing-always
+                                                         :retries 1
+                                                         :delay 0
+                                                         :fallback (fn [e] (throw e))))))))


[3/6] flink git commit: [FLINK-9004][tests] Implement Jepsen tests to test job availability.

Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/test/jepsen/flink/zookeeper_test.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/test/jepsen/flink/zookeeper_test.clj b/flink-jepsen/test/jepsen/flink/zookeeper_test.clj
new file mode 100644
index 0000000..b5cc3d5
--- /dev/null
+++ b/flink-jepsen/test/jepsen/flink/zookeeper_test.clj
@@ -0,0 +1,28 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.zookeeper-test
+  (:require [clojure.test :refer :all])
+  (:require [jepsen.flink.zookeeper :refer :all]))
+
+(deftest zookeeper-quorum-test
+  (is (= (zookeeper-quorum {:nodes ["n1" "n2" "n3"]}) "n1:2181,n2:2181,n3:2181")))
+
+(deftest zookeeper-uri-test
+  (is (= (zookeeper-uri {:nodes ["n1" "n2" "n3"]}) "zk://n1:2181,n2:2181,n3:2181/"))
+  (is (= (zookeeper-uri {:nodes ["n1"]}) "zk://n1:2181/"))
+  (is (= (zookeeper-uri {:nodes ["n1" "n2" "n3"]} :mesos) "zk://n1:2181,n2:2181,n3:2181/mesos"))
+  (is (= (zookeeper-uri {:nodes ["n1" "n2" "n3"]} "mesos") "zk://n1:2181,n2:2181,n3:2181/mesos")))


[5/6] flink git commit: [FLINK-9807][tests] Parameterize EventTimeWindowCheckpointITCase & LocalRecoveryITCase

Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
deleted file mode 100644
index e153b4b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-/**
- * Integration tests for memory backend.
- */
-public class MemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
-
-	@Override
-	protected StateBackendEnum getStateBackend() {
-		return StateBackendEnum.MEM;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
deleted file mode 100644
index e6d5b9e..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-/**
- * Integration tests for fully synchronous RocksDB backend.
- */
-public class RocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
-
-	@Override
-	protected StateBackendEnum getStateBackend() {
-		return StateBackendEnum.ROCKSDB_FULLY_ASYNC;
-	}
-
-	@Override
-	protected int numElementsPerKey() {
-		return 3000;
-	}
-
-	@Override
-	protected int windowSize() {
-		return 1000;
-	}
-
-	@Override
-	protected int windowSlide() {
-		return 100;
-	}
-
-	@Override
-	protected int numKeys() {
-		return 100;
-	}
-}


[6/6] flink git commit: [FLINK-9807][tests] Parameterize EventTimeWindowCheckpointITCase & LocalRecoveryITCase

Posted by tr...@apache.org.
[FLINK-9807][tests] Parameterize EventTimeWindowCheckpointITCase & LocalRecoveryITCase

This closes #6305.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc49801d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc49801d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc49801d

Branch: refs/heads/master
Commit: fc49801d4723413d3a09ecb85d60d39078056c68
Parents: 37abf46
Author: klion26 <qc...@gmail.com>
Authored: Sun Jul 8 11:52:41 2018 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jul 13 14:39:31 2018 +0200

----------------------------------------------------------------------
 ...tractEventTimeWindowCheckpointingITCase.java | 889 ------------------
 .../AbstractLocalRecoveryITCase.java            |  99 --
 ...ckendEventTimeWindowCheckpointingITCase.java |  30 -
 ...ckendEventTimeWindowCheckpointingITCase.java |  29 -
 .../EventTimeAllWindowCheckpointingITCase.java  |   2 +-
 .../EventTimeWindowCheckpointingITCase.java     | 931 +++++++++++++++++++
 ...ckendEventTimeWindowCheckpointingITCase.java |  29 -
 ...ckendEventTimeWindowCheckpointingITCase.java |  50 -
 ...ckendEventTimeWindowCheckpointingITCase.java |  50 -
 .../checkpointing/LocalRecoveryHeapITCase.java  |  30 -
 .../test/checkpointing/LocalRecoveryITCase.java | 110 +++
 .../LocalRecoveryRocksDBFullITCase.java         |  30 -
 .../LocalRecoveryRocksDBIncrementalITCase.java  |  30 -
 ...ckendEventTimeWindowCheckpointingITCase.java |  30 -
 ...ckendEventTimeWindowCheckpointingITCase.java |  50 -
 15 files changed, 1042 insertions(+), 1347 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
deleted file mode 100644
index df74450..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ /dev/null
@@ -1,889 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.SuccessException;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.curator.test.TestingServer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.TestName;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
-import static org.apache.flink.test.util.TestUtils.tryExecute;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * This verifies that checkpointing works correctly with event time windows. This is more
- * strict than {@link WindowCheckpointingITCase} because for event-time the contents
- * of the emitted windows are deterministic.
- *
- * <p>Split into multiple test classes in order to decrease the runtime per backend
- * and not run into CI infrastructure limits like no std output being emitted for
- * I/O heavy variants.
- */
-@SuppressWarnings("serial")
-public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLogger {
-
-	private static final int MAX_MEM_STATE_SIZE = 20 * 1024 * 1024;
-	private static final int PARALLELISM = 4;
-
-	private TestingServer zkServer;
-
-	public MiniClusterResource miniClusterResource;
-
-	@ClassRule
-	public static TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Rule
-	public TestName name = new TestName();
-
-	private AbstractStateBackend stateBackend;
-
-	enum StateBackendEnum {
-		MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC
-	}
-
-	protected abstract StateBackendEnum getStateBackend();
-
-	protected final MiniClusterResource getMiniClusterResource() {
-		return new MiniClusterResource(
-			new MiniClusterResourceConfiguration.Builder()
-				.setConfiguration(getConfigurationSafe())
-				.setNumberTaskManagers(2)
-				.setNumberSlotsPerTaskManager(PARALLELISM / 2)
-				.build());
-	}
-
-	private Configuration getConfigurationSafe() {
-		try {
-			return getConfiguration();
-		} catch (Exception e) {
-			throw new AssertionError("Could not initialize test.", e);
-		}
-	}
-
-	private Configuration getConfiguration() throws Exception {
-
-		// print a message when starting a test method to avoid Travis' <tt>"Maven produced no
-		// output for xxx seconds."</tt> messages
-		System.out.println(
-			"Starting " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
-
-		// Testing HA Scenario / ZKCompletedCheckpointStore with incremental checkpoints
-		StateBackendEnum stateBackendEnum = getStateBackend();
-		if (ROCKSDB_INCREMENTAL_ZK.equals(stateBackendEnum)) {
-			zkServer = new TestingServer();
-			zkServer.start();
-		}
-
-		Configuration config = createClusterConfig();
-
-		switch (stateBackendEnum) {
-			case MEM:
-				this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
-				break;
-			case FILE: {
-				String backups = tempFolder.newFolder().getAbsolutePath();
-				this.stateBackend = new FsStateBackend("file://" + backups, false);
-				break;
-			}
-			case MEM_ASYNC:
-				this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, true);
-				break;
-			case FILE_ASYNC: {
-				String backups = tempFolder.newFolder().getAbsolutePath();
-				this.stateBackend = new FsStateBackend("file://" + backups, true);
-				break;
-			}
-			case ROCKSDB_FULLY_ASYNC: {
-				String rocksDb = tempFolder.newFolder().getAbsolutePath();
-				String backups = tempFolder.newFolder().getAbsolutePath();
-				RocksDBStateBackend rdb = new RocksDBStateBackend(new FsStateBackend("file://" + backups));
-				rdb.setDbStoragePath(rocksDb);
-				this.stateBackend = rdb;
-				break;
-			}
-			case ROCKSDB_INCREMENTAL:
-			case ROCKSDB_INCREMENTAL_ZK: {
-				String rocksDb = tempFolder.newFolder().getAbsolutePath();
-				String backups = tempFolder.newFolder().getAbsolutePath();
-				// we use the fs backend with small threshold here to test the behaviour with file
-				// references, not self contained byte handles
-				RocksDBStateBackend rdb =
-					new RocksDBStateBackend(
-						new FsStateBackend(
-							new Path("file://" + backups).toUri(), 16),
-						true);
-				rdb.setDbStoragePath(rocksDb);
-				this.stateBackend = rdb;
-				break;
-			}
-			default:
-				throw new IllegalStateException("No backend selected.");
-		}
-		return config;
-	}
-
-	protected Configuration createClusterConfig() throws IOException {
-		TemporaryFolder temporaryFolder = new TemporaryFolder();
-		temporaryFolder.create();
-		final File haDir = temporaryFolder.newFolder();
-
-		Configuration config = new Configuration();
-		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
-		// the default network buffers size (10% of heap max =~ 150MB) seems too much for this test case
-		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
-		config.setString(AkkaOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) + "b");
-
-		if (zkServer != null) {
-			config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
-			config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
-			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
-		}
-		return config;
-	}
-
-	@Before
-	public void setupTestCluster() throws Exception {
-		miniClusterResource = getMiniClusterResource();
-		miniClusterResource.before();
-	}
-
-	@After
-	public void stopTestCluster() throws IOException {
-		if (miniClusterResource != null) {
-			miniClusterResource.after();
-			miniClusterResource = null;
-		}
-
-		if (zkServer != null) {
-			zkServer.stop();
-			zkServer = null;
-		}
-
-		// Prints a message when finishing a test method to avoid Travis' <tt>"Maven produced no output
-		// for xxx seconds."</tt> messages.
-		System.out.println(
-			"Finished " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Test
-	public void testTumblingTimeWindow() {
-		final int numElementsPerKey = numElementsPerKey();
-		final int windowSize = windowSize();
-		final int numKeys = numKeys();
-		FailingSource.reset();
-
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setParallelism(PARALLELISM);
-			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-			env.enableCheckpointing(100);
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
-			env.getConfig().disableSysoutLogging();
-			env.setStateBackend(this.stateBackend);
-			env.getConfig().setUseSnapshotCompression(true);
-
-			env
-					.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
-					.rebalance()
-					.keyBy(0)
-					.timeWindow(Time.of(windowSize, MILLISECONDS))
-					.apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
-
-						private boolean open = false;
-
-						@Override
-						public void open(Configuration parameters) {
-							assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
-							open = true;
-						}
-
-						@Override
-						public void apply(
-								Tuple tuple,
-								TimeWindow window,
-								Iterable<Tuple2<Long, IntType>> values,
-								Collector<Tuple4<Long, Long, Long, IntType>> out) {
-
-							// validate that the function has been opened properly
-							assertTrue(open);
-
-							int sum = 0;
-							long key = -1;
-
-							for (Tuple2<Long, IntType> value : values) {
-								sum += value.f1.value;
-								key = value.f0;
-							}
-							out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
-						}
-					})
-					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
-
-			tryExecute(env, "Tumbling Window Test");
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testTumblingTimeWindowWithKVStateMinMaxParallelism() {
-		doTestTumblingTimeWindowWithKVState(PARALLELISM);
-	}
-
-	@Test
-	public void testTumblingTimeWindowWithKVStateMaxMaxParallelism() {
-		doTestTumblingTimeWindowWithKVState(1 << 15);
-	}
-
-	public void doTestTumblingTimeWindowWithKVState(int maxParallelism) {
-		final int numElementsPerKey = numElementsPerKey();
-		final int windowSize = windowSize();
-		final int numKeys = numKeys();
-		FailingSource.reset();
-
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setParallelism(PARALLELISM);
-			env.setMaxParallelism(maxParallelism);
-			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-			env.enableCheckpointing(100);
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
-			env.getConfig().disableSysoutLogging();
-			env.setStateBackend(this.stateBackend);
-			env.getConfig().setUseSnapshotCompression(true);
-
-			env
-					.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
-					.rebalance()
-					.keyBy(0)
-					.timeWindow(Time.of(windowSize, MILLISECONDS))
-					.apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
-
-						private boolean open = false;
-
-						private ValueState<Integer> count;
-
-						@Override
-						public void open(Configuration parameters) {
-							assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
-							open = true;
-							count = getRuntimeContext().getState(
-									new ValueStateDescriptor<>("count", Integer.class, 0));
-						}
-
-						@Override
-						public void apply(
-								Tuple tuple,
-								TimeWindow window,
-								Iterable<Tuple2<Long, IntType>> values,
-								Collector<Tuple4<Long, Long, Long, IntType>> out) throws Exception {
-
-							// the window count state starts with the key, so that we get
-							// different count results for each key
-							if (count.value() == 0) {
-								count.update(tuple.<Long>getField(0).intValue());
-							}
-
-							// validate that the function has been opened properly
-							assertTrue(open);
-
-							count.update(count.value() + 1);
-							out.collect(new Tuple4<>(tuple.<Long>getField(0), window.getStart(), window.getEnd(), new IntType(count.value())));
-						}
-					})
-					.addSink(new CountValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
-
-			tryExecute(env, "Tumbling Window Test");
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testSlidingTimeWindow() {
-		final int numElementsPerKey = numElementsPerKey();
-		final int windowSize = windowSize();
-		final int windowSlide = windowSlide();
-		final int numKeys = numKeys();
-		FailingSource.reset();
-
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setMaxParallelism(2 * PARALLELISM);
-			env.setParallelism(PARALLELISM);
-			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-			env.enableCheckpointing(100);
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
-			env.getConfig().disableSysoutLogging();
-			env.setStateBackend(this.stateBackend);
-			env.getConfig().setUseSnapshotCompression(true);
-
-			env
-					.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
-					.rebalance()
-					.keyBy(0)
-					.timeWindow(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
-					.apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
-
-						private boolean open = false;
-
-						@Override
-						public void open(Configuration parameters) {
-							assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
-							open = true;
-						}
-
-						@Override
-						public void apply(
-								Tuple tuple,
-								TimeWindow window,
-								Iterable<Tuple2<Long, IntType>> values,
-								Collector<Tuple4<Long, Long, Long, IntType>> out) {
-
-							// validate that the function has been opened properly
-							assertTrue(open);
-
-							int sum = 0;
-							long key = -1;
-
-							for (Tuple2<Long, IntType> value : values) {
-								sum += value.f1.value;
-								key = value.f0;
-							}
-							out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
-						}
-					})
-					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
-
-			tryExecute(env, "Tumbling Window Test");
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testPreAggregatedTumblingTimeWindow() {
-		final int numElementsPerKey = numElementsPerKey();
-		final int windowSize = windowSize();
-		final int numKeys = numKeys();
-		FailingSource.reset();
-
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setParallelism(PARALLELISM);
-			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-			env.enableCheckpointing(100);
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
-			env.getConfig().disableSysoutLogging();
-			env.setStateBackend(this.stateBackend);
-			env.getConfig().setUseSnapshotCompression(true);
-
-			env
-					.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
-					.rebalance()
-					.keyBy(0)
-					.timeWindow(Time.of(windowSize, MILLISECONDS))
-					.reduce(
-							new ReduceFunction<Tuple2<Long, IntType>>() {
-
-								@Override
-								public Tuple2<Long, IntType> reduce(
-										Tuple2<Long, IntType> a,
-										Tuple2<Long, IntType> b) {
-									return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
-								}
-							},
-							new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
-
-						private boolean open = false;
-
-						@Override
-						public void open(Configuration parameters) {
-							assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
-							open = true;
-						}
-
-						@Override
-						public void apply(
-								Tuple tuple,
-								TimeWindow window,
-								Iterable<Tuple2<Long, IntType>> input,
-								Collector<Tuple4<Long, Long, Long, IntType>> out) {
-
-							// validate that the function has been opened properly
-							assertTrue(open);
-
-							for (Tuple2<Long, IntType> in: input) {
-								out.collect(new Tuple4<>(in.f0,
-										window.getStart(),
-										window.getEnd(),
-										in.f1));
-							}
-						}
-					})
-					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
-
-			tryExecute(env, "Tumbling Window Test");
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testPreAggregatedSlidingTimeWindow() {
-		final int numElementsPerKey = numElementsPerKey();
-		final int windowSize = windowSize();
-		final int windowSlide = windowSlide();
-		final int numKeys = numKeys();
-		FailingSource.reset();
-
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setParallelism(PARALLELISM);
-			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-			env.enableCheckpointing(100);
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
-			env.getConfig().disableSysoutLogging();
-			env.setStateBackend(this.stateBackend);
-			env.getConfig().setUseSnapshotCompression(true);
-
-			env
-					.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
-					.rebalance()
-					.keyBy(0)
-					.timeWindow(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
-					.reduce(
-							new ReduceFunction<Tuple2<Long, IntType>>() {
-
-								@Override
-								public Tuple2<Long, IntType> reduce(
-										Tuple2<Long, IntType> a,
-										Tuple2<Long, IntType> b) {
-
-									// validate that the function has been opened properly
-									return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
-								}
-							},
-							new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
-
-						private boolean open = false;
-
-						@Override
-						public void open(Configuration parameters) {
-							assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
-							open = true;
-						}
-
-						@Override
-						public void apply(
-								Tuple tuple,
-								TimeWindow window,
-								Iterable<Tuple2<Long, IntType>> input,
-								Collector<Tuple4<Long, Long, Long, IntType>> out) {
-
-							// validate that the function has been opened properly
-							assertTrue(open);
-
-							for (Tuple2<Long, IntType> in: input) {
-								out.collect(new Tuple4<>(in.f0,
-										window.getStart(),
-										window.getEnd(),
-										in.f1));
-							}
-						}
-					})
-					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
-
-			tryExecute(env, "Tumbling Window Test");
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
-			implements ListCheckpointed<Integer>, CheckpointListener {
-		private static volatile boolean failedBefore = false;
-
-		private final int numKeys;
-		private final int numElementsToEmit;
-		private final int failureAfterNumElements;
-
-		private volatile int numElementsEmitted;
-		private volatile int numSuccessfulCheckpoints;
-		private volatile boolean running = true;
-
-		private FailingSource(int numKeys, int numElementsToEmitPerKey, int failureAfterNumElements) {
-			this.numKeys = numKeys;
-			this.numElementsToEmit = numElementsToEmitPerKey;
-			this.failureAfterNumElements = failureAfterNumElements;
-		}
-
-		@Override
-		public void open(Configuration parameters) {
-			// non-parallel source
-			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
-		}
-
-		@Override
-		public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception {
-			// we loop longer than we have elements, to permit delayed checkpoints
-			// to still cause a failure
-			while (running) {
-
-				if (!failedBefore) {
-					// delay a bit, if we have not failed before
-					Thread.sleep(1);
-					if (numSuccessfulCheckpoints >= 2 && numElementsEmitted >= failureAfterNumElements) {
-						// cause a failure if we have not failed before and have reached
-						// enough completed checkpoints and elements
-						failedBefore = true;
-						throw new Exception("Artificial Failure");
-					}
-				}
-
-				if (numElementsEmitted < numElementsToEmit &&
-						(failedBefore || numElementsEmitted <= failureAfterNumElements)) {
-					// the function failed before, or we are in the elements before the failure
-					synchronized (ctx.getCheckpointLock()) {
-						int next = numElementsEmitted++;
-						for (long i = 0; i < numKeys; i++) {
-							ctx.collectWithTimestamp(new Tuple2<Long, IntType>(i, new IntType(next)), next);
-						}
-						ctx.emitWatermark(new Watermark(next));
-					}
-				}
-				else {
-
-					// if our work is done, delay a bit to prevent busy waiting
-					Thread.sleep(1);
-				}
-			}
-		}
-
-		@Override
-		public void cancel() {
-			running = false;
-		}
-
-		@Override
-		public void notifyCheckpointComplete(long checkpointId) {
-			numSuccessfulCheckpoints++;
-		}
-
-		@Override
-		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
-			return Collections.singletonList(this.numElementsEmitted);
-		}
-
-		@Override
-		public void restoreState(List<Integer> state) throws Exception {
-			if (state.isEmpty() || state.size() > 1) {
-				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
-			}
-			this.numElementsEmitted = state.get(0);
-		}
-
-		public static void reset() {
-			failedBefore = false;
-		}
-	}
-
-	private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
-			implements ListCheckpointed<HashMap<Long, Integer>> {
-
-		private final HashMap<Long, Integer> windowCounts = new HashMap<>();
-
-		private final int numKeys;
-		private final int numWindowsExpected;
-
-		private ValidatingSink(int numKeys, int numWindowsExpected) {
-			this.numKeys = numKeys;
-			this.numWindowsExpected = numWindowsExpected;
-		}
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			// this sink can only work with DOP 1
-			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
-
-			// it can happen that a checkpoint happens when the complete success state is
-			// already set. In that case we restart with the final state and would never
-			// finish because no more elements arrive.
-			if (windowCounts.size() == numKeys) {
-				boolean seenAll = true;
-				for (Integer windowCount: windowCounts.values()) {
-					if (windowCount != numWindowsExpected) {
-						seenAll = false;
-						break;
-					}
-				}
-				if (seenAll) {
-					throw new SuccessException();
-				}
-			}
-		}
-
-		@Override
-		public void close() throws Exception {
-			boolean seenAll = true;
-			if (windowCounts.size() == numKeys) {
-				for (Integer windowCount: windowCounts.values()) {
-					if (windowCount < numWindowsExpected) {
-						seenAll = false;
-						break;
-					}
-				}
-			}
-			assertTrue("The sink must see all expected windows.", seenAll);
-		}
-
-		@Override
-		public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
-
-			// verify the contents of that window, Tuple4.f1 and .f2 are the window start/end
-			// the sum should be "sum (start .. end-1)"
-
-			int expectedSum = 0;
-			for (long i = value.f1; i < value.f2; i++) {
-				// only sum up positive vals, to filter out the negative start of the
-				// first sliding windows
-				if (i > 0) {
-					expectedSum += i;
-				}
-			}
-
-			assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value);
-
-			Integer curr = windowCounts.get(value.f0);
-			if (curr != null) {
-				windowCounts.put(value.f0, curr + 1);
-			}
-			else {
-				windowCounts.put(value.f0, 1);
-			}
-
-			if (windowCounts.size() == numKeys) {
-				boolean seenAll = true;
-				for (Integer windowCount: windowCounts.values()) {
-					if (windowCount < numWindowsExpected) {
-						seenAll = false;
-						break;
-					} else if (windowCount > numWindowsExpected) {
-						fail("Window count to high: " + windowCount);
-					}
-				}
-
-				if (seenAll) {
-					// exit
-					throw new SuccessException();
-				}
-
-			}
-		}
-
-		@Override
-		public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
-			return Collections.singletonList(this.windowCounts);
-		}
-
-		@Override
-		public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
-			if (state.isEmpty() || state.size() > 1) {
-				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
-			}
-			windowCounts.putAll(state.get(0));
-		}
-	}
-
-	// Sink for validating the stateful window counts
-	private static class CountValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
-			implements ListCheckpointed<HashMap<Long, Integer>> {
-
-		private final HashMap<Long, Integer> windowCounts = new HashMap<>();
-
-		private final int numKeys;
-		private final int numWindowsExpected;
-
-		private CountValidatingSink(int numKeys, int numWindowsExpected) {
-			this.numKeys = numKeys;
-			this.numWindowsExpected = numWindowsExpected;
-		}
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			// this sink can only work with DOP 1
-			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
-		}
-
-		@Override
-		public void close() throws Exception {
-			boolean seenAll = true;
-			if (windowCounts.size() == numKeys) {
-				for (Integer windowCount: windowCounts.values()) {
-					if (windowCount < numWindowsExpected) {
-						seenAll = false;
-						break;
-					}
-				}
-			}
-			assertTrue("The source must see all expected windows.", seenAll);
-		}
-
-		@Override
-		public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
-
-			Integer curr = windowCounts.get(value.f0);
-			if (curr != null) {
-				windowCounts.put(value.f0, curr + 1);
-			}
-			else {
-				windowCounts.put(value.f0, 1);
-			}
-
-			// verify the contents of that window, the contents should be:
-			// (key + num windows so far)
-
-			assertEquals("Window counts don't match for key " + value.f0 + ".", value.f0.intValue() + windowCounts.get(value.f0), value.f3.value);
-
-			boolean seenAll = true;
-			if (windowCounts.size() == numKeys) {
-				for (Integer windowCount: windowCounts.values()) {
-					if (windowCount < numWindowsExpected) {
-						seenAll = false;
-						break;
-					} else if (windowCount > numWindowsExpected) {
-						fail("Window count to high: " + windowCount);
-					}
-				}
-
-				if (seenAll) {
-					// exit
-					throw new SuccessException();
-				}
-
-			}
-		}
-
-		@Override
-		public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
-			return Collections.singletonList(this.windowCounts);
-		}
-
-		@Override
-		public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
-			if (state.isEmpty() || state.size() > 1) {
-				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
-			}
-			this.windowCounts.putAll(state.get(0));
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	private static class IntType {
-
-		public int value;
-
-		public IntType() {}
-
-		public IntType(int value) {
-			this.value = value;
-		}
-	}
-
-	protected int numElementsPerKey() {
-		return 300;
-	}
-
-	protected int windowSize() {
-		return 100;
-	}
-
-	protected int windowSlide() {
-		return 100;
-	}
-
-	protected int numKeys() {
-		return 20;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
deleted file mode 100644
index 4e454d7..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import java.io.IOException;
-
-import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum;
-
-/**
- * This test delegates to instances of {@link AbstractEventTimeWindowCheckpointingITCase} that have been reconfigured
- * to use local recovery.
- *
- * <p>TODO: This class must be refactored to properly extend {@link AbstractEventTimeWindowCheckpointingITCase}.
- */
-public abstract class AbstractLocalRecoveryITCase extends TestLogger {
-
-	private final StateBackendEnum backendEnum;
-	private final boolean localRecoveryEnabled;
-
-	@Rule
-	public TestName testName = new TestName();
-
-	AbstractLocalRecoveryITCase(StateBackendEnum backendEnum, boolean localRecoveryEnabled) {
-		this.backendEnum = backendEnum;
-		this.localRecoveryEnabled = localRecoveryEnabled;
-	}
-
-	@Test
-	public final void executeTest() throws Exception {
-		AbstractEventTimeWindowCheckpointingITCase.tempFolder.create();
-		AbstractEventTimeWindowCheckpointingITCase windowChkITCase =
-			new AbstractEventTimeWindowCheckpointingITCase() {
-				@Override
-				protected StateBackendEnum getStateBackend() {
-					return backendEnum;
-				}
-
-				@Override
-				protected Configuration createClusterConfig() throws IOException {
-					Configuration config = super.createClusterConfig();
-
-					config.setBoolean(
-						CheckpointingOptions.LOCAL_RECOVERY,
-						localRecoveryEnabled);
-
-					return config;
-				}
-			};
-
-		executeTest(windowChkITCase);
-	}
-
-	private void executeTest(AbstractEventTimeWindowCheckpointingITCase delegate) throws Exception {
-		delegate.name = testName;
-		try {
-			delegate.setupTestCluster();
-			try {
-				delegate.testTumblingTimeWindow();
-				delegate.stopTestCluster();
-			} catch (Exception e) {
-				delegate.stopTestCluster();
-			}
-
-			delegate.setupTestCluster();
-			try {
-				delegate.testSlidingTimeWindow();
-				delegate.stopTestCluster();
-			} catch (Exception e) {
-				delegate.stopTestCluster();
-			}
-		} finally {
-			delegate.tempFolder.delete();
-		}
-	}
-}
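
The deleted abstract class above received its (state backend, local recovery flag) pair through the constructor; its replacement, LocalRecoveryITCase, appears in the diffstat but its body is not part of this excerpt. A hypothetical sketch of how such a two-parameter matrix can be expressed with JUnit4 Parameterized (all names and combinations below are assumptions, not the committed code):

import java.util.Arrays;
import java.util.Collection;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/** Hypothetical two-parameter run matrix; not the actual LocalRecoveryITCase. */
@RunWith(Parameterized.class)
public class LocalRecoverySketch {

	enum Backend { FILE, ROCKSDB_FULL, ROCKSDB_INCREMENTAL }

	/** Bound to the first entry of each Object[] below. */
	@Parameterized.Parameter(0)
	public Backend backend;

	/** Bound to the second entry of each Object[] below. */
	@Parameterized.Parameter(1)
	public boolean localRecoveryEnabled;

	/** Each Object[] becomes one named test run. */
	@Parameterized.Parameters(name = "backend = {0}, localRecovery = {1}")
	public static Collection<Object[]> parameters() {
		return Arrays.asList(new Object[][] {
			{ Backend.FILE, true },
			{ Backend.ROCKSDB_FULL, true },
			{ Backend.ROCKSDB_INCREMENTAL, true },
		});
	}

	@Test
	public void runsOncePerCombination() {
		// A real test would set CheckpointingOptions.LOCAL_RECOVERY, as in the deleted code above.
		System.out.println(backend + ", localRecovery=" + localRecoveryEnabled);
	}
}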

http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
deleted file mode 100644
index c4b06d4..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-/**
- * Integration tests for asynchronous file backend.
- */
-public class AsyncFileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
-
-	@Override
-	protected StateBackendEnum getStateBackend() {
-		return StateBackendEnum.FILE_ASYNC;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
deleted file mode 100644
index 2cc5b01..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-/**
- * Integration tests for asynchronous memory backend.
- */
-public class AsyncMemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
-	@Override
-	protected StateBackendEnum getStateBackend() {
-		return StateBackendEnum.MEM_ASYNC;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index d05bafb..9e14b26 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -58,7 +58,7 @@ import static org.junit.Assert.fail;
 /**
  * This verifies that checkpointing works correctly with event time windows.
  *
- * <p>This is a version of {@link AbstractEventTimeWindowCheckpointingITCase} for All-Windows.
+ * <p>This is a version of {@link EventTimeWindowCheckpointingITCase} for All-Windows.
  */
 @SuppressWarnings("serial")
 public class EventTimeAllWindowCheckpointingITCase extends TestLogger {

http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..c3d93d7
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,931 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.FILE;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.MEM;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.MEM_ASYNC;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * This verifies that checkpointing works correctly with event time windows. This is more
+ * strict than {@link WindowCheckpointingITCase} because for event-time the contents
+ * of the emitted windows are deterministic.
+ *
+ * <p>Split into multiple test classes in order to decrease the runtime per backend
+ * and not run into CI infrastructure limits like no std output being emitted for
+ * I/O heavy variants.
+ */
+@SuppressWarnings("serial")
+@RunWith(Parameterized.class)
+public class EventTimeWindowCheckpointingITCase extends TestLogger {
+
+	private static final int MAX_MEM_STATE_SIZE = 20 * 1024 * 1024;
+	private static final int PARALLELISM = 4;
+
+	private TestingServer zkServer;
+
+	public MiniClusterResource miniClusterResource;
+
+	@ClassRule
+	public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Rule
+	public TestName name = new TestName();
+
+	private AbstractStateBackend stateBackend;
+
+	@Parameterized.Parameter
+	public StateBackendEnum stateBackendEnum;
+
+	enum StateBackendEnum {
+		MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC
+	}
+
+	@Parameterized.Parameters(name = "statebackend type ={0}")
+	public static Collection<StateBackendEnum> parameter() {
+		return Arrays.asList(StateBackendEnum.values());
+	}
+
+	protected StateBackendEnum getStateBackend() {
+		return this.stateBackendEnum;
+	}
+
+	protected final MiniClusterResource getMiniClusterResource() {
+		return new MiniClusterResource(
+			new MiniClusterResourceConfiguration.Builder()
+				.setConfiguration(getConfigurationSafe())
+				.setNumberTaskManagers(2)
+				.setNumberSlotsPerTaskManager(PARALLELISM / 2)
+				.build());
+	}
+
+	private Configuration getConfigurationSafe() {
+		try {
+			return getConfiguration();
+		} catch (Exception e) {
+			throw new AssertionError("Could not initialize test.", e);
+		}
+	}
+
+	private Configuration getConfiguration() throws Exception {
+
+		// print a message when starting a test method to avoid Travis' <tt>"Maven produced no
+		// output for xxx seconds."</tt> messages
+		System.out.println(
+			"Starting " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
+
+		// Testing HA Scenario / ZKCompletedCheckpointStore with incremental checkpoints
+		StateBackendEnum stateBackendEnum = getStateBackend();
+		if (ROCKSDB_INCREMENTAL_ZK.equals(stateBackendEnum)) {
+			zkServer = new TestingServer();
+			zkServer.start();
+		}
+
+		Configuration config = createClusterConfig();
+
+		switch (stateBackendEnum) {
+			case MEM:
+				this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
+				break;
+			case FILE: {
+				String backups = tempFolder.newFolder().getAbsolutePath();
+				this.stateBackend = new FsStateBackend("file://" + backups, false);
+				break;
+			}
+			case MEM_ASYNC:
+				this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, true);
+				break;
+			case FILE_ASYNC: {
+				String backups = tempFolder.newFolder().getAbsolutePath();
+				this.stateBackend = new FsStateBackend("file://" + backups, true);
+				break;
+			}
+			case ROCKSDB_FULLY_ASYNC: {
+				String rocksDb = tempFolder.newFolder().getAbsolutePath();
+				String backups = tempFolder.newFolder().getAbsolutePath();
+				RocksDBStateBackend rdb = new RocksDBStateBackend(new FsStateBackend("file://" + backups));
+				rdb.setDbStoragePath(rocksDb);
+				this.stateBackend = rdb;
+				break;
+			}
+			case ROCKSDB_INCREMENTAL:
+			case ROCKSDB_INCREMENTAL_ZK: {
+				String rocksDb = tempFolder.newFolder().getAbsolutePath();
+				String backups = tempFolder.newFolder().getAbsolutePath();
+				// we use the fs backend with small threshold here to test the behaviour with file
+				// references, not self contained byte handles
+				RocksDBStateBackend rdb =
+					new RocksDBStateBackend(
+						new FsStateBackend(
+							new Path("file://" + backups).toUri(), 16),
+						true);
+				rdb.setDbStoragePath(rocksDb);
+				this.stateBackend = rdb;
+				break;
+			}
+			default:
+				throw new IllegalStateException("No backend selected.");
+		}
+		return config;
+	}
+
+	protected Configuration createClusterConfig() throws IOException {
+		TemporaryFolder temporaryFolder = new TemporaryFolder();
+		temporaryFolder.create();
+		final File haDir = temporaryFolder.newFolder();
+
+		Configuration config = new Configuration();
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
+		// the default network buffers size (10% of heap max =~ 150MB) seems too much for this test case
+		config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
+		config.setString(AkkaOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) + "b");
+
+		if (zkServer != null) {
+			config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+			config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
+			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
+		}
+		return config;
+	}
+
+	@Before
+	public void setupTestCluster() throws Exception {
+		miniClusterResource = getMiniClusterResource();
+		miniClusterResource.before();
+	}
+
+	@After
+	public void stopTestCluster() throws IOException {
+		if (miniClusterResource != null) {
+			miniClusterResource.after();
+			miniClusterResource = null;
+		}
+
+		if (zkServer != null) {
+			zkServer.stop();
+			zkServer = null;
+		}
+
+		// Prints a message when finishing a test method to avoid Travis' <tt>"Maven produced no output
+		// for xxx seconds."</tt> messages.
+		System.out.println(
+			"Finished " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testTumblingTimeWindow() {
+		final int numElementsPerKey = numElementsPerKey();
+		final int windowSize = windowSize();
+		final int numKeys = numKeys();
+		FailingSource.reset();
+
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(PARALLELISM);
+			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+			env.enableCheckpointing(100);
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+			env.getConfig().disableSysoutLogging();
+			env.setStateBackend(this.stateBackend);
+			env.getConfig().setUseSnapshotCompression(true);
+
+			env
+					.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+					.rebalance()
+					.keyBy(0)
+					.timeWindow(Time.of(windowSize, MILLISECONDS))
+					.apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+						private boolean open = false;
+
+						@Override
+						public void open(Configuration parameters) {
+							assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+							open = true;
+						}
+
+						@Override
+						public void apply(
+								Tuple tuple,
+								TimeWindow window,
+								Iterable<Tuple2<Long, IntType>> values,
+								Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+							// validate that the function has been opened properly
+							assertTrue(open);
+
+							int sum = 0;
+							long key = -1;
+
+							for (Tuple2<Long, IntType> value : values) {
+								sum += value.f1.value;
+								key = value.f0;
+							}
+							out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+						}
+					})
+					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+
+			tryExecute(env, "Tumbling Window Test");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testTumblingTimeWindowWithKVStateMinMaxParallelism() {
+		doTestTumblingTimeWindowWithKVState(PARALLELISM);
+	}
+
+	@Test
+	public void testTumblingTimeWindowWithKVStateMaxMaxParallelism() {
+		doTestTumblingTimeWindowWithKVState(1 << 15);
+	}
+
+	public void doTestTumblingTimeWindowWithKVState(int maxParallelism) {
+		final int numElementsPerKey = numElementsPerKey();
+		final int windowSize = windowSize();
+		final int numKeys = numKeys();
+		FailingSource.reset();
+
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(PARALLELISM);
+			env.setMaxParallelism(maxParallelism);
+			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+			env.enableCheckpointing(100);
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+			env.getConfig().disableSysoutLogging();
+			env.setStateBackend(this.stateBackend);
+			env.getConfig().setUseSnapshotCompression(true);
+
+			env
+					.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+					.rebalance()
+					.keyBy(0)
+					.timeWindow(Time.of(windowSize, MILLISECONDS))
+					.apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+						private boolean open = false;
+
+						private ValueState<Integer> count;
+
+						@Override
+						public void open(Configuration parameters) {
+							assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+							open = true;
+							count = getRuntimeContext().getState(
+									new ValueStateDescriptor<>("count", Integer.class, 0));
+						}
+
+						@Override
+						public void apply(
+								Tuple tuple,
+								TimeWindow window,
+								Iterable<Tuple2<Long, IntType>> values,
+								Collector<Tuple4<Long, Long, Long, IntType>> out) throws Exception {
+
+							// the window count state starts with the key, so that we get
+							// different count results for each key
+							if (count.value() == 0) {
+								count.update(tuple.<Long>getField(0).intValue());
+							}
+
+							// validate that the function has been opened properly
+							assertTrue(open);
+
+							count.update(count.value() + 1);
+							out.collect(new Tuple4<>(tuple.<Long>getField(0), window.getStart(), window.getEnd(), new IntType(count.value())));
+						}
+					})
+					.addSink(new CountValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+
+			tryExecute(env, "Tumbling Window Test");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testSlidingTimeWindow() {
+		final int numElementsPerKey = numElementsPerKey();
+		final int windowSize = windowSize();
+		final int windowSlide = windowSlide();
+		final int numKeys = numKeys();
+		FailingSource.reset();
+
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setMaxParallelism(2 * PARALLELISM);
+			env.setParallelism(PARALLELISM);
+			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+			env.enableCheckpointing(100);
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+			env.getConfig().disableSysoutLogging();
+			env.setStateBackend(this.stateBackend);
+			env.getConfig().setUseSnapshotCompression(true);
+
+			env
+					.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+					.rebalance()
+					.keyBy(0)
+					.timeWindow(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
+					.apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+						private boolean open = false;
+
+						@Override
+						public void open(Configuration parameters) {
+							assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+							open = true;
+						}
+
+						@Override
+						public void apply(
+								Tuple tuple,
+								TimeWindow window,
+								Iterable<Tuple2<Long, IntType>> values,
+								Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+							// validate that the function has been opened properly
+							assertTrue(open);
+
+							int sum = 0;
+							long key = -1;
+
+							for (Tuple2<Long, IntType> value : values) {
+								sum += value.f1.value;
+								key = value.f0;
+							}
+							out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+						}
+					})
+					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
+
+			tryExecute(env, "Sliding Window Test");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testPreAggregatedTumblingTimeWindow() {
+		final int numElementsPerKey = numElementsPerKey();
+		final int windowSize = windowSize();
+		final int numKeys = numKeys();
+		FailingSource.reset();
+
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(PARALLELISM);
+			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+			env.enableCheckpointing(100);
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+			env.getConfig().disableSysoutLogging();
+			env.setStateBackend(this.stateBackend);
+			env.getConfig().setUseSnapshotCompression(true);
+
+			env
+					.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+					.rebalance()
+					.keyBy(0)
+					.timeWindow(Time.of(windowSize, MILLISECONDS))
+					.reduce(
+							new ReduceFunction<Tuple2<Long, IntType>>() {
+
+								@Override
+								public Tuple2<Long, IntType> reduce(
+										Tuple2<Long, IntType> a,
+										Tuple2<Long, IntType> b) {
+									return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
+								}
+							},
+							new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+						private boolean open = false;
+
+						@Override
+						public void open(Configuration parameters) {
+							assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+							open = true;
+						}
+
+						@Override
+						public void apply(
+								Tuple tuple,
+								TimeWindow window,
+								Iterable<Tuple2<Long, IntType>> input,
+								Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+							// validate that the function has been opened properly
+							assertTrue(open);
+
+							for (Tuple2<Long, IntType> in: input) {
+								out.collect(new Tuple4<>(in.f0,
+										window.getStart(),
+										window.getEnd(),
+										in.f1));
+							}
+						}
+					})
+					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+
+			tryExecute(env, "Tumbling Window Test");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testPreAggregatedSlidingTimeWindow() {
+		final int numElementsPerKey = numElementsPerKey();
+		final int windowSize = windowSize();
+		final int windowSlide = windowSlide();
+		final int numKeys = numKeys();
+		FailingSource.reset();
+
+		try {
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(PARALLELISM);
+			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+			env.enableCheckpointing(100);
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+			env.getConfig().disableSysoutLogging();
+			env.setStateBackend(this.stateBackend);
+			env.getConfig().setUseSnapshotCompression(true);
+
+			env
+					.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+					.rebalance()
+					.keyBy(0)
+					.timeWindow(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
+					.reduce(
+							new ReduceFunction<Tuple2<Long, IntType>>() {
+
+								@Override
+								public Tuple2<Long, IntType> reduce(
+										Tuple2<Long, IntType> a,
+										Tuple2<Long, IntType> b) {
+
+									// pre-aggregate by summing up the values per key
+									return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
+								}
+							},
+							new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+						private boolean open = false;
+
+						@Override
+						public void open(Configuration parameters) {
+							assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+							open = true;
+						}
+
+						@Override
+						public void apply(
+								Tuple tuple,
+								TimeWindow window,
+								Iterable<Tuple2<Long, IntType>> input,
+								Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+							// validate that the function has been opened properly
+							assertTrue(open);
+
+							for (Tuple2<Long, IntType> in: input) {
+								out.collect(new Tuple4<>(in.f0,
+										window.getStart(),
+										window.getEnd(),
+										in.f1));
+							}
+						}
+					})
+					.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
+
+			tryExecute(env, "Sliding Window Test");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
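+	/**
+	 * Source that, per emission round, emits one element for every key and uses the round
+	 * index as value, timestamp, and watermark. It throws an artificial failure exactly once
+	 * per test run, after at least two successful checkpoints and after
+	 * {@code failureAfterNumElements} rounds have been emitted.
+	 */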
+	private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
+			implements ListCheckpointed<Integer>, CheckpointListener {
+		private static volatile boolean failedBefore = false;
+
+		private final int numKeys;
+		private final int numElementsToEmit;
+		private final int failureAfterNumElements;
+
+		private volatile int numElementsEmitted;
+		private volatile int numSuccessfulCheckpoints;
+		private volatile boolean running = true;
+
+		private FailingSource(int numKeys, int numElementsToEmitPerKey, int failureAfterNumElements) {
+			this.numKeys = numKeys;
+			this.numElementsToEmit = numElementsToEmitPerKey;
+			this.failureAfterNumElements = failureAfterNumElements;
+		}
+
+		@Override
+		public void open(Configuration parameters) {
+			// non-parallel source
+			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception {
+			// we loop longer than we have elements, to permit delayed checkpoints
+			// to still cause a failure
+			while (running) {
+
+				if (!failedBefore) {
+					// delay a bit, if we have not failed before
+					Thread.sleep(1);
+					if (numSuccessfulCheckpoints >= 2 && numElementsEmitted >= failureAfterNumElements) {
+						// cause a failure if we have not failed before and have reached
+						// enough completed checkpoints and elements
+						failedBefore = true;
+						throw new Exception("Artificial Failure");
+					}
+				}
+
+				if (numElementsEmitted < numElementsToEmit &&
+						(failedBefore || numElementsEmitted <= failureAfterNumElements)) {
+					// the function failed before, or we are in the elements before the failure
+					synchronized (ctx.getCheckpointLock()) {
+						int next = numElementsEmitted++;
+						for (long i = 0; i < numKeys; i++) {
+							ctx.collectWithTimestamp(new Tuple2<Long, IntType>(i, new IntType(next)), next);
+						}
+						ctx.emitWatermark(new Watermark(next));
+					}
+				}
+				else {
+
+					// if our work is done, delay a bit to prevent busy waiting
+					Thread.sleep(1);
+				}
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) {
+			numSuccessfulCheckpoints++;
+		}
+
+		@Override
+		public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.numElementsEmitted);
+		}
+
+		@Override
+		public void restoreState(List<Integer> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.numElementsEmitted = state.get(0);
+		}
+
+		public static void reset() {
+			failedBefore = false;
+		}
+	}
+
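+	/**
+	 * Sink that checks every window result against the expected sum of the (positive)
+	 * timestamps covered by the window, counts the windows seen per key, and signals
+	 * success once every key has produced exactly the expected number of windows.
+	 */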
+	private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
+			implements ListCheckpointed<HashMap<Long, Integer>> {
+
+		private final HashMap<Long, Integer> windowCounts = new HashMap<>();
+
+		private final int numKeys;
+		private final int numWindowsExpected;
+
+		private ValidatingSink(int numKeys, int numWindowsExpected) {
+			this.numKeys = numKeys;
+			this.numWindowsExpected = numWindowsExpected;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			// this sink can only work with DOP 1
+			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+
+			// it can happen that a checkpoint is taken after the complete success state has
+			// already been reached. In that case we would restore the final state and never
+			// finish, because no more elements arrive.
+			if (windowCounts.size() == numKeys) {
+				boolean seenAll = true;
+				for (Integer windowCount: windowCounts.values()) {
+					if (windowCount != numWindowsExpected) {
+						seenAll = false;
+						break;
+					}
+				}
+				if (seenAll) {
+					throw new SuccessException();
+				}
+			}
+		}
+
+		@Override
+		public void close() throws Exception {
+			boolean seenAll = true;
+			if (windowCounts.size() == numKeys) {
+				for (Integer windowCount: windowCounts.values()) {
+					if (windowCount < numWindowsExpected) {
+						seenAll = false;
+						break;
+					}
+				}
+			}
+			assertTrue("The sink must see all expected windows.", seenAll);
+		}
+
+		@Override
+		public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
+
+			// verify the contents of that window, Tuple4.f1 and .f2 are the window start/end
+			// the sum should be "sum (start .. end-1)"
+
+			int expectedSum = 0;
+			for (long i = value.f1; i < value.f2; i++) {
+				// only sum up positive vals, to filter out the negative start of the
+				// first sliding windows
+				if (i > 0) {
+					expectedSum += i;
+				}
+			}
+
+			assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value);
+
+			Integer curr = windowCounts.get(value.f0);
+			if (curr != null) {
+				windowCounts.put(value.f0, curr + 1);
+			}
+			else {
+				windowCounts.put(value.f0, 1);
+			}
+
+			if (windowCounts.size() == numKeys) {
+				boolean seenAll = true;
+				for (Integer windowCount: windowCounts.values()) {
+					if (windowCount < numWindowsExpected) {
+						seenAll = false;
+						break;
+					} else if (windowCount > numWindowsExpected) {
+						fail("Window count too high: " + windowCount);
+					}
+				}
+
+				if (seenAll) {
+					// exit
+					throw new SuccessException();
+				}
+
+			}
+		}
+
+		@Override
+		public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.windowCounts);
+		}
+
+		@Override
+		public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			windowCounts.putAll(state.get(0));
+		}
+	}
+
+	/**
+	 * Sink that validates the stateful window counts: every result for a key is expected
+	 * to carry the key plus the number of windows seen for that key so far.
+	 */
+	private static class CountValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
+			implements ListCheckpointed<HashMap<Long, Integer>> {
+
+		private final HashMap<Long, Integer> windowCounts = new HashMap<>();
+
+		private final int numKeys;
+		private final int numWindowsExpected;
+
+		private CountValidatingSink(int numKeys, int numWindowsExpected) {
+			this.numKeys = numKeys;
+			this.numWindowsExpected = numWindowsExpected;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			// this sink can only work with DOP 1
+			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+		}
+
+		@Override
+		public void close() throws Exception {
+			boolean seenAll = true;
+			if (windowCounts.size() == numKeys) {
+				for (Integer windowCount: windowCounts.values()) {
+					if (windowCount < numWindowsExpected) {
+						seenAll = false;
+						break;
+					}
+				}
+			}
+			assertTrue("The sink must see all expected windows.", seenAll);
+		}
+
+		@Override
+		public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
+
+			Integer curr = windowCounts.get(value.f0);
+			if (curr != null) {
+				windowCounts.put(value.f0, curr + 1);
+			}
+			else {
+				windowCounts.put(value.f0, 1);
+			}
+
+			// verify the contents of that window, the contents should be:
+			// (key + num windows so far)
+
+			assertEquals("Window counts don't match for key " + value.f0 + ".", value.f0.intValue() + windowCounts.get(value.f0), value.f3.value);
+
+			boolean seenAll = true;
+			if (windowCounts.size() == numKeys) {
+				for (Integer windowCount: windowCounts.values()) {
+					if (windowCount < numWindowsExpected) {
+						seenAll = false;
+						break;
+					} else if (windowCount > numWindowsExpected) {
+						fail("Window count too high: " + windowCount);
+					}
+				}
+
+				if (seenAll) {
+					// exit
+					throw new SuccessException();
+				}
+
+			}
+		}
+
+		@Override
+		public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Collections.singletonList(this.windowCounts);
+		}
+
+		@Override
+		public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
+			if (state.isEmpty() || state.size() > 1) {
+				throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+			}
+			this.windowCounts.putAll(state.get(0));
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
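+	/** Simple mutable int wrapper used as the value type of the test records. */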
+	private static class IntType {
+
+		public int value;
+
+		public IntType() {}
+
+		public IntType(int value) {
+			this.value = value;
+		}
+	}
+
+	private int numElementsPerKey() {
+		switch (this.stateBackendEnum) {
+			case ROCKSDB_FULLY_ASYNC:
+			case ROCKSDB_INCREMENTAL:
+			case ROCKSDB_INCREMENTAL_ZK:
+				return 3000;
+			default:
+				return 300;
+		}
+	}
+
+	private int windowSize() {
+		switch (this.stateBackendEnum) {
+			case ROCKSDB_FULLY_ASYNC:
+			case ROCKSDB_INCREMENTAL:
+			case ROCKSDB_INCREMENTAL_ZK:
+				return 1000;
+			default:
+				return 100;
+		}
+	}
+
+	private int windowSlide() {
+		return 100;
+	}
+
+	private int numKeys() {
+		switch (this.stateBackendEnum) {
+			case ROCKSDB_FULLY_ASYNC:
+			case ROCKSDB_INCREMENTAL:
+			case ROCKSDB_INCREMENTAL_ZK:
+				return 100;
+			default:
+				return 20;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
deleted file mode 100644
index eab6153..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-/**
- * Integration tests for file backend.
- */
-public class FileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
-	@Override
-	protected StateBackendEnum getStateBackend() {
-		return StateBackendEnum.FILE;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
deleted file mode 100644
index ed43ad6..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-/**
- * Integration tests for incremental RocksDB backend.
- */
-public class HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
-
-	@Override
-	protected StateBackendEnum getStateBackend() {
-		return StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
-	}
-
-	@Override
-	protected int numElementsPerKey() {
-		return 3000;
-	}
-
-	@Override
-	protected int windowSize() {
-		return 1000;
-	}
-
-	@Override
-	protected int windowSlide() {
-		return 100;
-	}
-
-	@Override
-	protected int numKeys() {
-		return 100;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
deleted file mode 100644
index 1276a00..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-/**
- * Integration tests for incremental RocksDB backend.
- */
-public class IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
-
-	@Override
-	protected StateBackendEnum getStateBackend() {
-		return StateBackendEnum.ROCKSDB_INCREMENTAL;
-	}
-
-	@Override
-	protected int numElementsPerKey() {
-		return 3000;
-	}
-
-	@Override
-	protected int windowSize() {
-		return 1000;
-	}
-
-	@Override
-	protected int windowSlide() {
-		return 100;
-	}
-
-	@Override
-	protected int numKeys() {
-		return 100;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java
deleted file mode 100644
index 6749366..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC;
-
-/**
- * Tests file-based local recovery with the HeapBackend.
- */
-public class LocalRecoveryHeapITCase extends AbstractLocalRecoveryITCase {
-	public LocalRecoveryHeapITCase() {
-		super(FILE_ASYNC, true);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java
new file mode 100644
index 0000000..5374765
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
+
+/**
+ * This test delegates to instances of {@link EventTimeWindowCheckpointingITCase} that have been reconfigured
+ * to use local recovery.
+ *
+ * <p>TODO: This class must be refactored to properly extend {@link EventTimeWindowCheckpointingITCase}.
+ */
+@RunWith(Parameterized.class)
+public class LocalRecoveryITCase extends TestLogger {
+
+	private final boolean localRecoveryEnabled = true;
+
+	@Rule
+	public TestName testName = new TestName();
+
+	@Parameterized.Parameter
+	public StateBackendEnum backendEnum;
+
+	@Parameterized.Parameters(name = "statebackend type = {0}")
+	public static Collection<StateBackendEnum> parameter() {
+		return Arrays.asList(ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL_ZK, FILE_ASYNC);
+	}
+
+	@Test
+	public final void executeTest() throws Exception {
+		EventTimeWindowCheckpointingITCase.tempFolder.create();
+		EventTimeWindowCheckpointingITCase windowChkITCase =
+			new EventTimeWindowCheckpointingITCase() {
+
+				@Override
+				protected StateBackendEnum getStateBackend() {
+					return backendEnum;
+				}
+
+				@Override
+				protected Configuration createClusterConfig() throws IOException {
+					Configuration config = super.createClusterConfig();
+
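+					// enable task-local state recovery on top of the base cluster configuration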
+					config.setBoolean(
+						CheckpointingOptions.LOCAL_RECOVERY,
+						localRecoveryEnabled);
+
+					return config;
+				}
+			};
+
+		executeTest(windowChkITCase);
+	}
+
+	private void executeTest(EventTimeWindowCheckpointingITCase delegate) throws Exception {
+		delegate.name = testName;
+		try {
+			delegate.setupTestCluster();
+			try {
+				delegate.testTumblingTimeWindow();
+			} finally {
+				// stop the cluster in the success and the failure case, but let test
+				// failures propagate instead of swallowing them
+				delegate.stopTestCluster();
+			}
+
+			delegate.setupTestCluster();
+			try {
+				delegate.testSlidingTimeWindow();
+			} finally {
+				delegate.stopTestCluster();
+			}
+		} finally {
+			delegate.tempFolder.delete();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java
deleted file mode 100644
index 2d12ae2..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC;
-
-/**
- * Tests file-based local recovery with the RocksDB state-backend.
- */
-public class LocalRecoveryRocksDBFullITCase extends AbstractLocalRecoveryITCase {
-	public LocalRecoveryRocksDBFullITCase() {
-		super(ROCKSDB_FULLY_ASYNC, true);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java
deleted file mode 100644
index 718d4a3..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
-
-/**
- * Tests file-based local recovery with the RocksDB state-backend and incremental checkpointing enabled.
- */
-public class LocalRecoveryRocksDBIncrementalITCase extends AbstractLocalRecoveryITCase {
-	public LocalRecoveryRocksDBIncrementalITCase() {
-		super(ROCKSDB_INCREMENTAL_ZK, true);
-	}
-}