Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/13 12:39:45 UTC
[1/6] flink git commit: [FLINK-9703] Allow TM ports to be exposed through Mesos
Repository: flink
Updated Branches:
refs/heads/master 37abf46f6 -> b74594c48
[FLINK-9703] Allow TM ports to be exposed through Mesos
Maintain a deterministic port ordering, so we can have expectations on which endpoint is behind which port index.
This closes #6288.
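For intuition, the deterministic ordering comes from collecting the keys into an
insertion-ordered set, so the mandatory TM keys always occupy the first port
indices and user-supplied keys follow. A minimal standalone sketch of the
guarantee this change relies on ("custom.service.port" is a hypothetical
user-configured key, not part of the commit):

    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.Set;

    public class PortOrderingSketch {
        public static void main(String[] args) {
            // LinkedHashSet preserves insertion order and drops duplicates.
            Set<String> keys = new LinkedHashSet<>(
                Arrays.asList("taskmanager.rpc.port", "taskmanager.data.port"));
            keys.add("custom.service.port");  // hypothetical user-configured key
            keys.add("taskmanager.rpc.port"); // duplicate of a mandatory key, ignored
            System.out.println(keys);
            // [taskmanager.rpc.port, taskmanager.data.port, custom.service.port]
        }
    }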
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b74594c4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b74594c4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b74594c4
Branch: refs/heads/master
Commit: b74594c488e4c5428c5391b89c32ae6fd7d98a79
Parents: d58c8c0
Author: Rune Skou Larsen <rs...@trifork.com>
Authored: Mon Jul 9 13:54:51 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jul 13 14:39:31 2018 +0200
----------------------------------------------------------------------
.../generated/mesos_configuration.html | 5 ++
.../flink/mesos/configuration/MesosOptions.java | 8 +++
.../clusterframework/LaunchableMesosWorker.java | 34 +++++++++---
.../LaunchableMesosWorkerTest.java | 55 ++++++++++++++++++++
4 files changed, 96 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b74594c4/docs/_includes/generated/mesos_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/mesos_configuration.html b/docs/_includes/generated/mesos_configuration.html
index 16a2388..cd0ae24 100644
--- a/docs/_includes/generated/mesos_configuration.html
+++ b/docs/_includes/generated/mesos_configuration.html
@@ -62,5 +62,10 @@
<td style="word-wrap: break-word;">(none)</td>
<td>Mesos framework user</td>
</tr>
+ <tr>
+ <td><h5>mesos.resourcemanager.tasks.port-assignments</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Comma-separated list of configuration keys which represent a configurable port. All port keys will dynamically get a port assigned through Mesos.</td>
+ </tr>
</tbody>
</table>
http://git-wip-us.apache.org/repos/asf/flink/blob/b74594c4/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
index 6c802fa..753923f 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
@@ -120,4 +120,12 @@ public class MesosOptions {
.withDescription("Enables SSL for the Flink artifact server. Note that security.ssl.enabled also needs to" +
" be set to true encryption to enable encryption.");
+ /**
+ * Config parameter to configure which configuration keys will dynamically get a port assigned through Mesos.
+ */
+ public static final ConfigOption<String> PORT_ASSIGNMENTS = key("mesos.resourcemanager.tasks.port-assignments")
+ .defaultValue("")
+ .withDescription("Comma-separated list of configuration keys which represent a configurable port." +
+ "All port keys will dynamically get a port assigned through Mesos.");
+
}
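For illustration, a user could expose additional TM endpoints by listing their
port keys in flink-conf.yaml; the key names on the right-hand side below are
hypothetical examples, not options shipped with Flink:

    mesos.resourcemanager.tasks.port-assignments: custom.service.port,metrics.reporter.port

Each listed key then gets a Mesos-assigned port at container launch, after the
mandatory taskmanager.rpc.port and taskmanager.data.port.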
http://git-wip-us.apache.org/repos/asf/flink/blob/b74594c4/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 93c90b7..bb15aee 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -30,8 +30,6 @@ import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.util.Preconditions;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
-
import com.netflix.fenzo.ConstraintEvaluator;
import com.netflix.fenzo.TaskRequest;
import com.netflix.fenzo.VMTaskFitnessCalculator;
@@ -41,8 +39,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -53,6 +53,7 @@ import scala.Option;
import static org.apache.flink.mesos.Utils.rangeValues;
import static org.apache.flink.mesos.Utils.variable;
+import static org.apache.flink.mesos.configuration.MesosOptions.PORT_ASSIGNMENTS;
/**
* Implements the launch of a Mesos worker.
@@ -66,7 +67,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
/**
* The set of configuration keys to be dynamically configured with a port allocated from Mesos.
*/
- private static final String[] TM_PORT_KEYS = {
+ static final String[] TM_PORT_KEYS = {
"taskmanager.rpc.port",
"taskmanager.data.port"};
@@ -147,7 +148,7 @@ public class LaunchableMesosWorker implements LaunchableTask {
@Override
public int getPorts() {
- return TM_PORT_KEYS.length;
+ return extractPortKeys(containerSpec.getDynamicConfiguration()).size();
}
@Override
@@ -235,9 +236,10 @@ public class LaunchableMesosWorker implements LaunchableTask {
}
// take needed ports for the TM
- List<Protos.Resource> portResources = allocation.takeRanges("ports", TM_PORT_KEYS.length, roles);
+ Set<String> tmPortKeys = extractPortKeys(containerSpec.getDynamicConfiguration());
+ List<Protos.Resource> portResources = allocation.takeRanges("ports", tmPortKeys.size(), roles);
taskInfo.addAllResources(portResources);
- Iterator<String> portsToAssign = Iterators.forArray(TM_PORT_KEYS);
+ Iterator<String> portsToAssign = tmPortKeys.iterator();
rangeValues(portResources).forEach(port -> dynamicProperties.setLong(portsToAssign.next(), port));
if (portsToAssign.hasNext()) {
throw new IllegalArgumentException("insufficient # of ports assigned");
@@ -332,6 +334,26 @@ public class LaunchableMesosWorker implements LaunchableTask {
return taskInfo.build();
}
+ /**
+ * Get the port keys representing the TM's configured endpoints. This includes mandatory TM endpoints such as
+ * data and rpc, as well as optionally configured endpoints for services such as the Prometheus reporter.
+ *
+ * @param config to extract the port keys from
+ * @return A deterministically ordered Set of port keys to expose from the TM container
+ */
+ static Set<String> extractPortKeys(Configuration config) {
+ final LinkedHashSet<String> tmPortKeys = new LinkedHashSet<>(Arrays.asList(TM_PORT_KEYS));
+
+ final String portKeys = config.getString(PORT_ASSIGNMENTS);
+
+ Arrays.stream(portKeys.split(","))
+ .map(String::trim)
+ .peek(key -> LOG.debug("Adding port key " + key + " to mesos request"))
+ .forEach(tmPortKeys::add);
+
+ return tmPortKeys;
+ }
+
@Override
public String toString() {
return "LaunchableMesosWorker{" +
http://git-wip-us.apache.org/repos/asf/flink/blob/b74594c4/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
new file mode 100644
index 0000000..6784e427
--- /dev/null
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorkerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.apache.flink.mesos.configuration.MesosOptions.PORT_ASSIGNMENTS;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests that Mesos port configuration keys are extracted correctly from the configuration.
+ */
+public class LaunchableMesosWorkerTest extends TestLogger {
+
+ @Test
+ public void canGetPortKeys() {
+ // Setup
+ Configuration config = new Configuration();
+ config.setString(PORT_ASSIGNMENTS, "someport.here,anotherport");
+
+ // Act
+ Set<String> portKeys = LaunchableMesosWorker.extractPortKeys(config);
+
+ // Assert
+ assertEquals("Must get right number of port keys", 4, portKeys.size());
+ Iterator<String> iterator = portKeys.iterator();
+ assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[0], iterator.next());
+ assertEquals("port key must be correct", LaunchableMesosWorker.TM_PORT_KEYS[1], iterator.next());
+ assertEquals("port key must be correct", "someport.here", iterator.next());
+ assertEquals("port key must be correct", "anotherport", iterator.next());
+ }
+
+}
[2/6] flink git commit: [FLINK-9503] Migrate integration tests for iterative aggregators
Posted by tr...@apache.org.
[FLINK-9503] Migrate integration tests for iterative aggregators
This closes #6129.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d783c628
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d783c628
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d783c628
Branch: refs/heads/master
Commit: d783c6282e7362fae29b35e61b1522912eab2c36
Parents: fc49801
Author: yanghua <ya...@gmail.com>
Authored: Thu Jun 7 00:23:25 2018 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jul 13 14:39:31 2018 +0200
----------------------------------------------------------------------
.../aggregators/AggregatorsITCase.java | 95 ++++++++------------
1 file changed, 39 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d783c628/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
index bd42ac2..e449ab8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
@@ -35,10 +35,8 @@ import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
+import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
@@ -48,6 +46,9 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import java.util.Random;
import java.util.UUID;
@@ -63,35 +64,24 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
private static final int parallelism = 2;
private static final String NEGATIVE_ELEMENTS_AGGR = "count.negative.elements";
- private static String testString = "Et tu, Brute?";
- private static String testName = "testing_caesar";
- private static String testPath;
-
public AggregatorsITCase(TestExecutionMode mode){
super(mode);
}
- private String resultPath;
- private String expected;
+ @ClassRule
+ public static TemporaryFolder tempFolder = new TemporaryFolder();
- @Rule
- public TemporaryFolder tempFolder = new TemporaryFolder();
+ @Test
+ public void testDistributedCacheWithIterations() throws Exception{
+ final String testString = "Et tu, Brute?";
+ final String testName = "testing_caesar";
- @Before
- public void before() throws Exception{
final File folder = tempFolder.newFolder();
final File resultFile = new File(folder, UUID.randomUUID().toString());
- testPath = resultFile.toString();
- resultPath = resultFile.toURI().toString();
- }
- @After
- public void after() throws Exception{
- compareResultsByLinesInMemory(expected, resultPath);
- }
+ String testPath = resultFile.toString();
+ String resultPath = resultFile.toURI().toString();
- @Test
- public void testDistributedCacheWithIterations() throws Exception{
File tempFile = new File(testPath);
try (FileWriter writer = new FileWriter(tempFile)) {
writer.write(testString);
@@ -117,7 +107,6 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
}
}).withBroadcastSet(solution, "SOLUTION")).output(new DiscardingOutputFormat<Long>());
env.execute();
- expected = testString; // this will be a useless verification now.
}
@Test
@@ -141,12 +130,12 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
new NegativeElementsConvergenceCriterion());
DataSet<Integer> updatedDs = iteration.map(new SubtractOneMap());
- iteration.closeWith(updatedDs).writeAsText(resultPath);
- env.execute();
+ List<Integer> result = iteration.closeWith(updatedDs).collect();
+ Collections.sort(result);
- expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
- + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
- + "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
+ List<Integer> expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1);
+
+ assertEquals(expected, result);
}
@Test
@@ -170,12 +159,12 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
new NegativeElementsConvergenceCriterion());
DataSet<Integer> updatedDs = iteration.map(new SubtractOneMapWithParam());
- iteration.closeWith(updatedDs).writeAsText(resultPath);
- env.execute();
+ List<Integer> result = iteration.closeWith(updatedDs).collect();
+ Collections.sort(result);
- expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
- + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
- + "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
+ List<Integer> expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1);
+
+ assertEquals(expected, result);
}
@Test
@@ -199,12 +188,12 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
new NegativeElementsConvergenceCriterionWithParam(3));
DataSet<Integer> updatedDs = iteration.map(new SubtractOneMap());
- iteration.closeWith(updatedDs).writeAsText(resultPath);
- env.execute();
+ List<Integer> result = iteration.closeWith(updatedDs).collect();
+ Collections.sort(result);
- expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
- + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
- + "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
+ List<Integer> expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1);
+
+ assertEquals(expected, result);
}
@Test
@@ -231,14 +220,12 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
.where(0).equalTo(0).flatMap(new UpdateFilter());
DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements);
- DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper());
- result.writeAsText(resultPath);
+ List<Integer> result = iterationRes.map(new ProjectSecondMapper()).collect();
+ Collections.sort(result);
- env.execute();
+ List<Integer> expected = Arrays.asList(1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5);
- expected = "1\n" + "2\n" + "2\n" + "3\n" + "3\n"
- + "3\n" + "4\n" + "4\n" + "4\n" + "4\n"
- + "5\n" + "5\n" + "5\n" + "5\n" + "5\n";
+ assertEquals(expected, result);
}
@Test
@@ -265,14 +252,12 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
.where(0).equalTo(0).flatMap(new UpdateFilter());
DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements);
- DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper());
- result.writeAsText(resultPath);
+ List<Integer> result = iterationRes.map(new ProjectSecondMapper()).collect();
+ Collections.sort(result);
- env.execute();
+ List<Integer> expected = Arrays.asList(1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5);
- expected = "1\n" + "2\n" + "2\n" + "3\n" + "3\n"
- + "3\n" + "4\n" + "4\n" + "4\n" + "4\n"
- + "5\n" + "5\n" + "5\n" + "5\n" + "5\n";
+ assertEquals(expected, result);
}
@Test
@@ -303,14 +288,12 @@ public class AggregatorsITCase extends MultipleProgramsTestBase {
.where(0).equalTo(0).projectFirst(0, 1);
DataSet<Tuple2<Integer, Integer>> iterationRes = iteration.closeWith(newElements, newElements);
- DataSet<Integer> result = iterationRes.map(new ProjectSecondMapper());
- result.writeAsText(resultPath);
+ List<Integer> result = iterationRes.map(new ProjectSecondMapper()).collect();
+ Collections.sort(result);
- env.execute();
+ List<Integer> expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1);
- expected = "-3\n" + "-2\n" + "-2\n" + "-1\n" + "-1\n"
- + "-1\n" + "0\n" + "0\n" + "0\n" + "0\n"
- + "1\n" + "1\n" + "1\n" + "1\n" + "1\n";
+ assertEquals(expected, result);
}
@SuppressWarnings("serial")
[4/6] flink git commit: [FLINK-9004][tests] Implement Jepsen tests to test job availability.
Posted by tr...@apache.org.
[FLINK-9004][tests] Implement Jepsen tests to test job availability.
Use the Jepsen framework (https://github.com/jepsen-io/jepsen) to implement
tests that verify Flink's HA capabilities under real-world faults, such as
sudden TaskManager/JobManager termination, HDFS NameNode unavailability, network
partitions, etc. The Flink cluster under test is automatically deployed on YARN
(session & job mode) and Mesos.
Provide Dockerfiles for local test development.
This closes #6240.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d58c8c05
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d58c8c05
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d58c8c05
Branch: refs/heads/master
Commit: d58c8c05f0b86bdb74cb6e450848690e309011d6
Parents: d783c62
Author: gyao <ga...@data-artisans.com>
Authored: Mon Mar 5 22:23:33 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jul 13 14:39:31 2018 +0200
----------------------------------------------------------------------
flink-jepsen/.gitignore | 18 ++
flink-jepsen/README.md | 70 ++++++
flink-jepsen/bin/.gitkeep | 0
flink-jepsen/docker/.gitignore | 3 +
flink-jepsen/docker/Dockerfile-control | 45 ++++
flink-jepsen/docker/Dockerfile-db | 39 ++++
flink-jepsen/docker/docker-compose.yml | 52 +++++
flink-jepsen/docker/run-tests.sh | 31 +++
flink-jepsen/docker/up.sh | 31 +++
flink-jepsen/project.clj | 28 +++
flink-jepsen/scripts/run-tests.sh | 43 ++++
flink-jepsen/src/jepsen/flink/checker.clj | 128 ++++++++++
flink-jepsen/src/jepsen/flink/client.clj | 150 ++++++++++++
flink-jepsen/src/jepsen/flink/db.clj | 232 +++++++++++++++++++
flink-jepsen/src/jepsen/flink/flink.clj | 110 +++++++++
flink-jepsen/src/jepsen/flink/generator.clj | 39 ++++
flink-jepsen/src/jepsen/flink/hadoop.clj | 139 +++++++++++
flink-jepsen/src/jepsen/flink/mesos.clj | 165 +++++++++++++
flink-jepsen/src/jepsen/flink/nemesis.clj | 163 +++++++++++++
flink-jepsen/src/jepsen/flink/utils.clj | 48 ++++
flink-jepsen/src/jepsen/flink/zookeeper.clj | 29 +++
flink-jepsen/test/jepsen/flink/checker_test.clj | 82 +++++++
flink-jepsen/test/jepsen/flink/client_test.clj | 37 +++
flink-jepsen/test/jepsen/flink/utils_test.clj | 39 ++++
.../test/jepsen/flink/zookeeper_test.clj | 28 +++
25 files changed, 1749 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/.gitignore
----------------------------------------------------------------------
diff --git a/flink-jepsen/.gitignore b/flink-jepsen/.gitignore
new file mode 100644
index 0000000..ed5eca5
--- /dev/null
+++ b/flink-jepsen/.gitignore
@@ -0,0 +1,18 @@
+*.class
+*.iml
+*.jar
+*.retry
+.DS_Store
+.hg/
+.hgignore
+.idea/
+/.lein-*
+/.nrepl-port
+/checkouts
+/classes
+/target
+pom.xml
+pom.xml.asc
+store
+bin/*
+!bin/.gitkeep
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/README.md
----------------------------------------------------------------------
diff --git a/flink-jepsen/README.md b/flink-jepsen/README.md
new file mode 100644
index 0000000..9343246
--- /dev/null
+++ b/flink-jepsen/README.md
@@ -0,0 +1,70 @@
+# flink-jepsen
+
+A Clojure project based on the [Jepsen](https://github.com/jepsen-io/jepsen) framework to find bugs in the
+distributed coordination of Apache Flink®.
+
+## Test Coverage
+Jepsen is a framework built to test the behavior of distributed systems
+under faults. The tests in this particular project deploy Flink on either YARN or Mesos, submit a
+job, and examine the availability of the job after injecting faults.
+A job is said to be available if all the tasks of the job are running.
+The faults that can currently be introduced into the Flink cluster include:
+* Killing of TaskManager/JobManager processes
+* Stopping HDFS NameNode
+* Network partitions
+
+There are many properties other than job availability that could be
+verified but are not yet covered by this test suite, e.g., end-to-end exactly-once processing
+semantics.
+
+## Usage
+See the [Jepsen documentation](https://github.com/jepsen-io/jepsen#setting-up-a-jepsen-environment)
+for how to set up the environment to run tests. The script under `scripts/run-tests.sh` documents how to invoke
+tests. The Flink job used for testing is located under
+`flink-end-to-end-tests/flink-datastream-allround-test`. You have to build the job first and copy
+the resulting jar (`DataStreamAllroundTestProgram.jar`) to the `./bin` directory of this project's
+root.
+
+### Docker
+
+To simplify development, we have prepared Dockerfiles and a Docker Compose template
+so that you can run the tests locally in containers. To build the images
+and start the containers, simply run:
+
+ $ cd docker
+ $ ./up.sh
+
+After the containers have started, open a new terminal window and run `docker exec -it jepsen-control bash`.
+This will allow you to run arbitrary commands on the control node.
+To start the tests, you can use the `run-tests.sh` script in the `docker` directory,
+which expects the number of test iterations and a URI to a Flink distribution, e.g.,
+
+ ./docker/run-tests.sh 1 https://example.com/flink-dist.tgz
+
+The project's root is mounted as a volume to all containers under the path `/jepsen`.
+This means that changes to the test sources are immediately reflected in the control node container.
+Moreover, this allows you to test locally built Flink distributions by copying the tarball to the
+project's root and passing a URI with the `file://` scheme to the `run-tests.sh` script, e.g.,
+`file:///jepsen/flink-dist.tgz`.
+
+#### Memory Requirements
+
+The tests have high memory demands due to the many processes that are started by the control node.
+For example, to test Flink on YARN in a HA setup, we require ZooKeeper, HDFS NameNode,
+HDFS DataNode, YARN NodeManager, and YARN ResourceManager, in addition to the Flink processes.
+We found that the tests can be run comfortably in Docker containers on a machine with 32 GiB RAM.
+
+### Checking the Output of Tests
+
+Consult the `jepsen.log` file for the particular test run in the `store` folder. The final output of every test will be either
+
+ Everything looks good! ヽ('ー`)ノ
+
+or
+
+ Analysis invalid! (ノಥ益ಥ)ノ ┻━┻
+
+depending on whether the test passed or not. If neither output is generated, the test did not finish
+properly due to problems with the environment, bugs in Jepsen or in the test suite, etc.
+
+In addition, the test directories contain all relevant log files aggregated from all hosts.
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/bin/.gitkeep
----------------------------------------------------------------------
diff --git a/flink-jepsen/bin/.gitkeep b/flink-jepsen/bin/.gitkeep
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/docker/.gitignore
----------------------------------------------------------------------
diff --git a/flink-jepsen/docker/.gitignore b/flink-jepsen/docker/.gitignore
new file mode 100644
index 0000000..6ff5a8e
--- /dev/null
+++ b/flink-jepsen/docker/.gitignore
@@ -0,0 +1,3 @@
+id_rsa
+id_rsa.pub
+nodes
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/docker/Dockerfile-control
----------------------------------------------------------------------
diff --git a/flink-jepsen/docker/Dockerfile-control b/flink-jepsen/docker/Dockerfile-control
new file mode 100644
index 0000000..96198e3
--- /dev/null
+++ b/flink-jepsen/docker/Dockerfile-control
@@ -0,0 +1,45 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+FROM debian:jessie
+
+RUN echo "deb http://http.debian.net/debian jessie-backports main" >> /etc/apt/sources.list && \
+ apt-get update && \
+ apt-get install -y -t jessie-backports openjdk-8-jdk && \
+ apt-get install -qqy \
+ less \
+ libjna-java \
+ gnuplot \
+ openjdk-8-jdk \
+ openssh-client \
+ vim \
+ wget
+
+ENV LEIN_ROOT true
+RUN wget https://raw.githubusercontent.com/technomancy/leiningen/stable/bin/lein && \
+ mv lein /usr/bin && \
+ chmod +x /usr/bin/lein && \
+ lein self-install
+
+ADD id_rsa /root/.ssh/
+ADD id_rsa.pub /root/.ssh/
+RUN touch ~/.ssh/known_hosts
+
+WORKDIR /jepsen
+
+CMD tail -f /dev/null
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/docker/Dockerfile-db
----------------------------------------------------------------------
diff --git a/flink-jepsen/docker/Dockerfile-db b/flink-jepsen/docker/Dockerfile-db
new file mode 100644
index 0000000..1555329
--- /dev/null
+++ b/flink-jepsen/docker/Dockerfile-db
@@ -0,0 +1,39 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+FROM debian:jessie
+
+RUN echo "deb http://http.debian.net/debian jessie-backports main" >> /etc/apt/sources.list && \
+ apt-get update && \
+ apt-get install -y -t jessie-backports openjdk-8-jdk && \
+ apt-get install -y apt-utils bzip2 curl faketime iproute iptables iputils-ping less libzip2 logrotate man man-db net-tools ntpdate psmisc python rsyslog sudo sysvinit sysvinit-core sysvinit-utils tar unzip vim wget
+
+RUN apt-get update && \
+ apt-get -y install openssh-server && \
+ mkdir -p /var/run/sshd && \
+ sed -i "s/UsePrivilegeSeparation.*/UsePrivilegeSeparation no/g" /etc/ssh/sshd_config && \
+ sed -i "s/PermitRootLogin without-password/PermitRootLogin yes/g" /etc/ssh/sshd_config
+
+ADD id_rsa.pub /root
+RUN mkdir -p /root/.ssh/ && \
+ touch /root/.ssh/authorized_keys && \
+ chmod 600 /root/.ssh/authorized_keys && \
+ cat /root/id_rsa.pub >> /root/.ssh/authorized_keys
+
+EXPOSE 22
+CMD exec /usr/sbin/sshd -D
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/docker/docker-compose.yml
----------------------------------------------------------------------
diff --git a/flink-jepsen/docker/docker-compose.yml b/flink-jepsen/docker/docker-compose.yml
new file mode 100644
index 0000000..3d2bdbd
--- /dev/null
+++ b/flink-jepsen/docker/docker-compose.yml
@@ -0,0 +1,52 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+version: '2'
+services:
+ control:
+ volumes:
+ - ${JEPSEN_ROOT}:/jepsen
+
+ container_name: jepsen-control
+ hostname: control
+ build:
+ context: ./
+ dockerfile: Dockerfile-control
+ privileged: true
+ links:
+ - n1
+ - n2
+ - n3
+
+ n1:
+ build:
+ context: ./
+ dockerfile: Dockerfile-db
+ privileged: true
+ container_name: jepsen-n1
+ hostname: n1
+ volumes:
+ - ${JEPSEN_ROOT}:/jepsen
+ n2:
+ extends: n1
+ container_name: jepsen-n2
+ hostname: n2
+ n3:
+ extends: n1
+ container_name: jepsen-n3
+ hostname: n3
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/docker/run-tests.sh
----------------------------------------------------------------------
diff --git a/flink-jepsen/docker/run-tests.sh b/flink-jepsen/docker/run-tests.sh
new file mode 100755
index 0000000..8b2b1e6
--- /dev/null
+++ b/flink-jepsen/docker/run-tests.sh
@@ -0,0 +1,31 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+dockerdir=$(dirname $0)
+dockerdir=$(cd ${dockerdir}; pwd)
+
+cat <<EOF > ${dockerdir}/nodes
+n1
+n2
+n3
+EOF
+
+common_jepsen_args+=(--nodes-file ${dockerdir}/nodes)
+
+. ${dockerdir}/../scripts/run-tests.sh ${1} ${2} 1
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/docker/up.sh
----------------------------------------------------------------------
diff --git a/flink-jepsen/docker/up.sh b/flink-jepsen/docker/up.sh
new file mode 100755
index 0000000..5479b3b
--- /dev/null
+++ b/flink-jepsen/docker/up.sh
@@ -0,0 +1,31 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -e
+
+dockerdir=$(dirname $0)
+dockerdir=$(cd ${dockerdir}; pwd)
+
+if [ ! -f ./id_rsa ]; then
+ ssh-keygen -t rsa -N "" -f ./id_rsa
+fi
+
+export JEPSEN_ROOT=${dockerdir}/../
+docker-compose build
+docker-compose -f docker-compose.yml up --force-recreate
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/project.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/project.clj b/flink-jepsen/project.clj
new file mode 100644
index 0000000..78935d7
--- /dev/null
+++ b/flink-jepsen/project.clj
@@ -0,0 +1,28 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(defproject jepsen.flink "0.1.0-SNAPSHOT"
+ :license {:name "Apache License"
+ :url "http://www.apache.org/licenses/LICENSE-2.0"}
+ :main jepsen.flink.flink
+ :dependencies [[org.clojure/clojure "1.9.0"],
+ [cheshire "5.8.0"]
+ [clj-http "3.8.0"]
+ [jepsen "0.1.10"],
+ [jepsen.zookeeper "0.1.0"]
+ [org.clojure/data.xml "0.0.8"]
+ [zookeeper-clj "0.9.4" :exclusions [org.slf4j/slf4j-log4j12]]]
+ :profiles {:test {:dependencies [[clj-http-fake "1.0.3"]]}})
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/scripts/run-tests.sh
----------------------------------------------------------------------
diff --git a/flink-jepsen/scripts/run-tests.sh b/flink-jepsen/scripts/run-tests.sh
new file mode 100755
index 0000000..e448124
--- /dev/null
+++ b/flink-jepsen/scripts/run-tests.sh
@@ -0,0 +1,43 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -euo pipefail
+
+scripts=$(dirname $0)
+scripts=$(cd ${scripts}; pwd)
+
+parallelism=${3}
+
+common_jepsen_args+=(--ha-storage-dir hdfs:///flink
+--job-jar ${scripts}/../bin/DataStreamAllroundTestProgram.jar
+--tarball ${2}
+--job-args "--environment.parallelism ${parallelism} --state_backend.checkpoint_directory hdfs:///checkpoints --state_backend rocks --state_backend.rocks.incremental true"
+--ssh-private-key ~/.ssh/id_rsa)
+
+for i in $(seq 1 ${1})
+do
+ echo "Executing run #${i} of ${1}"
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode yarn-session
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode yarn-session
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --deployment-mode yarn-session
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode yarn-job
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode yarn-job
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --deployment-mode yarn-job
+ echo
+done
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/checker.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/checker.clj b/flink-jepsen/src/jepsen/flink/checker.clj
new file mode 100644
index 0000000..02cc863
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/checker.clj
@@ -0,0 +1,128 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.checker
+ (:require [jepsen
+ [checker :as checker]
+ [util :as ju]]
+ [knossos.model :as model])
+ (:import (knossos.model Model)))
+
+(defn stoppable-op? [op]
+ (clojure.string/includes? (name (:f op)) "-start"))
+
+(defn stop-op? [op]
+ (clojure.string/includes? (name (:f op)) "-stop"))
+
+(defn strip-op-suffix [op]
+ (clojure.string/replace (name (:f op)) #"-start|-stop" ""))
+
+(def safe-inc
+ (fnil inc 0))
+
+(defn nemeses-active?
+ [active-nemeses]
+ (->> (vals active-nemeses)
+ (reduce +)
+ pos?))
+
+(defn dissoc-if
+ [f m]
+ (->> (remove f m)
+ (into {})))
+
+(defn zero-value?
+ [[_ v]]
+ (zero? v))
+
+(defrecord
+ JobRunningWithinGracePeriod
+ ^{:doc "A Model which is consistent iff. the Flink job became available within
+ `job-recovery-grace-period` seconds after the last fault injected by the nemesis.
+ Note that some faults happen at a single point in time (e.g., killing of processes). Other faults,
+ such as network splits, happen over a period of time and can thus interleave. As long as
+ there are active faults, the job is allowed not to be available."}
+ [active-nemeses ; stores active failures
+ healthy-count ; how many consecutive times was the job running?
+ last-failure ; timestamp when the last failure was injected/ended
+ healthy-threshold ; after how many times is the job considered healthy
+ job-recovery-grace-period] ; after how many seconds should the job be recovered
+ Model
+ (step [this op]
+ (case (:process op)
+ :nemesis (cond
+ (nil? (:value op)) this
+ (stoppable-op? op) (assoc
+ this
+ :active-nemeses (update active-nemeses
+ (strip-op-suffix op)
+ safe-inc))
+ (stop-op? op) (assoc
+ this
+ :active-nemeses (dissoc-if zero-value?
+ (update active-nemeses (strip-op-suffix op) dec))
+ :last-failure (:time op))
+ :else (assoc this :last-failure (:time op)))
+ (case (:f op)
+ :job-running? (case (:type op)
+ :info this ; ignore :info operations
+ :fail this ; ignore :fail operations
+ :invoke this ; ignore :invoke operations
+ :ok (if (:value op) ; check if job is running
+ (assoc ; job is running
+ this
+ :healthy-count
+ (inc healthy-count))
+ (if (and ; job is not running
+ (not (nemeses-active? active-nemeses))
+ (< healthy-count healthy-threshold)
+ (> (ju/nanos->secs (- (:time op) last-failure)) job-recovery-grace-period))
+ ; job is not running but it should be running
+ ; because grace period passed
+ (model/inconsistent "Job is not running.")
+ (conj this
+ [:healthy-count 0]))))
+ ; ignore other client operations
+ this))))
+
+(defn job-running-within-grace-period
+ [job-running-healthy-threshold job-recovery-grace-period]
+ (JobRunningWithinGracePeriod. {} 0 nil job-running-healthy-threshold job-recovery-grace-period))
+
+(defn job-running-checker
+ []
+ (reify
+ checker/Checker
+ (check [_ test model history _]
+ (let [final (reduce model/step (assoc model :last-failure (:time (first history))) history)
+ result-map (conj {}
+ (find test :nemesis-gen)
+ (find test :deployment-mode))]
+ (if (or (model/inconsistent? final) (zero? (:healthy-count final 0)))
+ (into result-map {:valid? false
+ :error (:msg final)})
+ (into result-map {:valid? true
+ :final-model final}))))))
+
+(defn get-job-running-history
+ [history]
+ (->>
+ history
+ (remove #(= (:process %) :nemesis))
+ (remove #(= (:type %) :invoke))
+ (map :value)
+ (map boolean)
+ (remove nil?)))
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/client.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/client.clj b/flink-jepsen/src/jepsen/flink/client.clj
new file mode 100644
index 0000000..905dc48
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/client.clj
@@ -0,0 +1,150 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.client
+ (:require [clj-http.client :as http]
+ [clojure.tools.logging :refer :all]
+ [jepsen.client :as client]
+ [jepsen.flink.zookeeper :as fz]
+ [jepsen.flink.utils :as fu]
+ [zookeeper :as zk])
+ (:import (java.io ByteArrayInputStream ObjectInputStream)))
+
+(defn connect-zk-client!
+ [connection-string]
+ (zk/connect connection-string :timeout-msec 60000))
+
+(defn read-url
+ [bytes]
+ (with-open [object-input-stream (ObjectInputStream. (ByteArrayInputStream. bytes))]
+ (.readUTF object-input-stream)))
+
+(defn wait-for-zk-operation
+ [zk-client operation path]
+ (let [p (promise)]
+ (letfn [(iter [_]
+ (when-let [res (operation zk-client path :watcher iter)]
+ (deliver p res)))
+ ]
+ (iter nil)
+ p)))
+
+(defn wait-for-path-to-exist
+ [zk-client path]
+ (info "Waiting for path" path "in ZK.")
+ (wait-for-zk-operation zk-client zk/exists path))
+
+(defn wait-for-children-to-exist
+ [zk-client path]
+ (wait-for-zk-operation zk-client zk/children path))
+
+(defn find-application-id
+ [zk-client]
+ (do
+ (->
+ (wait-for-path-to-exist zk-client "/flink")
+ (deref))
+ (->
+ (wait-for-children-to-exist zk-client "/flink")
+ (deref)
+ (first))))
+
+(defn watch-node-bytes
+ [zk-client path callback]
+ (when (zk/exists zk-client path :watcher (fn [_] (watch-node-bytes zk-client path callback)))
+ (->>
+ (zk/data zk-client path :watcher (fn [_] (watch-node-bytes zk-client path callback)))
+ :data
+ (callback))))
+
+(defn make-job-manager-url [test]
+ (let [rest-url-atom (atom nil)
+ zk-client (connect-zk-client! (fz/zookeeper-quorum test))
+ init-future (future
+ (let [application-id (find-application-id zk-client)
+ path (str "/flink/" application-id "/leader/rest_server_lock")
+ _ (->
+ (wait-for-path-to-exist zk-client path)
+ (deref))]
+ (info "Determined application id to be" application-id)
+ (watch-node-bytes zk-client path
+ (fn [bytes]
+ (let [url (read-url bytes)]
+ (info "Leading REST url changed to" url)
+ (reset! rest-url-atom url))))))]
+ {:rest-url-atom rest-url-atom
+ :closer (fn [] (zk/close zk-client))
+ :init-future init-future}))
+
+(defn list-jobs!
+ [base-url]
+ (->>
+ (http/get (str base-url "/jobs") {:as :json})
+ :body
+ :jobs
+ (map :id)))
+
+(defn get-job-details!
+ [base-url job-id]
+ (assert base-url)
+ (assert job-id)
+ (let [job-details (->
+ (http/get (str base-url "/jobs/" job-id) {:as :json})
+ :body)]
+ (assert (:vertices job-details) "Job does not have vertices")
+ job-details))
+
+(defn job-running?
+ [base-url job-id]
+ (->>
+ (get-job-details! base-url job-id)
+ :vertices
+ (map :status)
+ (every? #(= "RUNNING" %))))
+
+(defrecord Client
+ [deploy-cluster! closer rest-url init-future job-id]
+ client/Client
+ (open! [this test node]
+ (let [{:keys [rest-url-atom closer init-future]} (make-job-manager-url test)]
+ (assoc this :closer closer :rest-url rest-url-atom :init-future init-future :job-id (atom nil))))
+
+ (setup! [this test] this)
+
+ (invoke! [this test op]
+ (case (:f op)
+ :submit (do
+ (deploy-cluster! test)
+ (deref init-future)
+ (let [jobs (fu/retry (fn [] (list-jobs! @rest-url))
+ :fallback (fn [e] (do
+ (fatal e "Could not get running jobs.")
+ (System/exit 1))))
+ num-jobs (count jobs)]
+ (assert (= 1 num-jobs) (str "Expected 1 job, was " num-jobs))
+ (reset! job-id (first jobs)))
+ (assoc op :type :ok))
+ :job-running? (let [base-url @rest-url]
+ (if base-url
+ (try
+ (assoc op :type :ok :value (job-running? base-url @job-id))
+ (catch Exception e (do
+ (warn e "Get job details from" base-url "failed.")
+ (assoc op :type :fail))))
+ (assoc op :type :fail :value "Cluster not deployed yet.")))))
+
+ (teardown! [this test])
+ (close! [this test] (closer)))
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/db.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/db.clj b/flink-jepsen/src/jepsen/flink/db.clj
new file mode 100644
index 0000000..ff934e1
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/db.clj
@@ -0,0 +1,232 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.db
+ (:require [clj-http.client :as http]
+ [clojure.java.io]
+ [clojure.string :as str]
+ [clojure.tools.logging :refer :all]
+ [jepsen
+ [control :as c]
+ [db :as db]
+ [util :refer [meh]]
+ [zookeeper :as zk]]
+ [jepsen.control.util :as cu]
+ [jepsen.flink.hadoop :as hadoop]
+ [jepsen.flink.mesos :as mesos]
+ [jepsen.flink.utils :as fu]
+ [jepsen.flink.zookeeper :refer :all]))
+
+(def install-dir "/opt/flink")
+(def upload-dir "/tmp")
+(def log-dir (str install-dir "/log"))
+(def conf-file (str install-dir "/conf/flink-conf.yaml"))
+(def masters-file (str install-dir "/conf/masters"))
+
+(def default-flink-dist-url "https://archive.apache.org/dist/flink/flink-1.5.0/flink-1.5.0-bin-hadoop28-scala_2.11.tgz")
+(def hadoop-dist-url "https://archive.apache.org/dist/hadoop/common/hadoop-2.8.3/hadoop-2.8.3.tar.gz")
+(def deb-zookeeper-package "3.4.9-3+deb8u1")
+(def deb-mesos-package "1.5.0-2.0.2")
+(def deb-marathon-package "1.6.322")
+
+(def taskmanager-slots 1)
+(def master-count 1)
+
+(defn flink-configuration
+ [test]
+ {:high-availability "zookeeper"
+ :high-availability.zookeeper.quorum (zookeeper-quorum test)
+ :high-availability.storageDir (str (:ha-storage-dir test) "/ha")
+ :state.savepoints.dir (str (:ha-storage-dir test) "/savepoints")
+ :web.port 8081
+ :rest.bind-address "0.0.0.0"
+ :taskmanager.numberOfTaskSlots taskmanager-slots
+ :yarn.application-attempts 99999
+ :slotmanager.taskmanager-timeout 10000
+ :state.backend.local-recovery "true"
+ :taskmanager.registration.timeout "30 s"})
+
+(defn master-nodes
+ [test]
+ (take master-count (sort (:nodes test))))
+
+(defn write-configuration!
+ "Writes the flink-conf.yaml and masters file to the flink conf directory"
+ [test]
+ (let [c (clojure.string/join "\n" (map (fn [[k v]] (str (name k) ": " v))
+ (seq (flink-configuration test))))
+ m (clojure.string/join "\n" (master-nodes test))]
+ (c/exec :echo c :> conf-file)
+ (c/exec :echo m :> masters-file)
+ ;; TODO: write log4j.properties properly
+ (c/exec (c/lit (str "sed -i'.bak' -e '/log4j.rootLogger=/ s/=.*/=DEBUG, file/' " install-dir "/conf/log4j.properties")))))
+
+(defn install-flink!
+ [test]
+ (let [url (:tarball test)]
+ (info "Installing Flink from" url)
+ (cu/install-archive! url install-dir)
+ (info "Enable S3 FS")
+ (c/exec (c/lit (str "ls " install-dir "/opt/flink-s3-fs-hadoop* | xargs -I {} mv {} " install-dir "/lib")))
+ (c/upload (:job-jar test) upload-dir)
+ (c/exec :mv (str upload-dir "/" (.getName (clojure.java.io/file (:job-jar test)))) install-dir)
+ (write-configuration! test)))
+
+(defn teardown-flink!
+ []
+ (info "Tearing down Flink")
+ (meh (c/exec :rm :-rf install-dir))
+ (meh (c/exec :rm :-rf (c/lit "/tmp/.yarn-properties*"))))
+
+(defn get-log-files!
+ []
+ (if (cu/exists? log-dir) (cu/ls-full log-dir) []))
+
+(defn flink-db
+ [test]
+ (reify db/DB
+ (setup! [_ test node]
+ (c/su
+ (install-flink! test)))
+
+ (teardown! [_ test node]
+ (c/su
+ (teardown-flink!)))
+
+ db/LogFiles
+ (log-files [_ test node]
+ (concat
+ (get-log-files!)))))
+
+(defn combined-db
+ [dbs]
+ (reify db/DB
+ (setup! [_ test node]
+ (c/su
+ (doall (map #(db/setup! % test node) dbs))))
+ (teardown! [_ test node]
+ (c/su
+ (doall (map #(db/teardown! % test node) dbs))))
+ db/LogFiles
+ (log-files [_ test node]
+ (flatten (map #(db/log-files % test node) dbs)))))
+
+;;; YARN
+
+(defn flink-yarn-db
+ []
+ (let [zk (zk/db deb-zookeeper-package)
+ hadoop (hadoop/db hadoop-dist-url)
+ flink (flink-db test)]
+ (combined-db [hadoop zk flink])))
+
+(defn exec-flink!
+ [test cmd args]
+ (c/su
+ (c/exec (c/lit (str
+ "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
+ "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir " "
+ install-dir "/bin/flink " cmd " " args)))))
+
+(defn flink-run-cli-args
+ "Returns the CLI args that should be passed to 'flink run'"
+ [test]
+ (concat
+ ["-d"]
+ (if (:main-class test)
+ [(str "-c " (:main-class test))]
+ [])
+ (if (= :yarn-job (:deployment-mode test))
+ ["-m yarn-cluster" "-yjm 2048m" "-ytm 2048m"]
+ [])))
+
+(defn submit-job!
+ ([test] (submit-job! test []))
+ ([test cli-args]
+ (exec-flink! test "run" (clojure.string/join
+ " "
+ (concat cli-args
+ (flink-run-cli-args test)
+ [(str install-dir "/" (last (str/split (:job-jar test) #"/")))
+ (:job-args test)])))))
+
+(defn first-node
+ [test]
+ (-> test :nodes sort first))
+
+(defn start-yarn-session!
+ [test]
+ (let [node (first-node test)]
+ (c/on node
+ (info "Starting YARN session from" node)
+ (c/su
+ (c/exec (c/lit (str "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
+ "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir
+ " " install-dir "/bin/yarn-session.sh -d -jm 2048m -tm 2048m")))
+ (submit-job! test)))))
+
+(defn start-yarn-job!
+ [test]
+ (c/on (first-node test)
+ (c/su
+ (submit-job! test))))
+
+;;; Mesos
+
+(defn flink-mesos-db
+ []
+ (let [zk (zk/db deb-zookeeper-package)
+ hadoop (hadoop/db hadoop-dist-url)
+ mesos (mesos/db deb-mesos-package deb-marathon-package)
+ flink (flink-db test)]
+ (combined-db [hadoop zk mesos flink])))
+
+(defn submit-job-with-retry!
+ [test]
+ (fu/retry
+ (partial submit-job! test)
+ :fallback (fn [e] (do
+ (fatal e "Could not submit job.")
+ (System/exit 1)))))
+
+(defn start-mesos-session!
+ [test]
+ (c/su
+ (let [r (fu/retry (fn []
+ (http/post
+ (str (mesos/marathon-base-url test) "/v2/apps")
+ {:form-params {:id "flink"
+ :cmd (str "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
+ "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir " "
+ install-dir "/bin/mesos-appmaster.sh "
+ "-Dmesos.master=" (zookeeper-uri
+ test
+ mesos/zk-namespace) " "
+ "-Djobmanager.rpc.address=$(hostname -f) "
+ "-Djobmanager.heap.mb=2048 "
+ "-Djobmanager.rpc.port=6123 "
+ "-Djobmanager.web.port=8081 "
+ "-Dmesos.resourcemanager.tasks.mem=2048 "
+ "-Dtaskmanager.heap.mb=2048 "
+ "-Dtaskmanager.numberOfTaskSlots=2 "
+ "-Dmesos.resourcemanager.tasks.cpus=1 "
+ "-Drest.bind-address=$(hostname -f) ")
+ :cpus 1.0
+ :mem 2048}
+ :content-type :json})))]
+ (info "Submitted Flink Application via Marathon" r)
+ (c/on (-> test :nodes sort first)
+ (submit-job-with-retry! test)))))
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/flink.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/flink.clj b/flink-jepsen/src/jepsen/flink/flink.clj
new file mode 100644
index 0000000..d5d4157
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/flink.clj
@@ -0,0 +1,110 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.flink
+ (:require [clojure.tools.logging :refer :all]
+ [jepsen
+ [cli :as cli]
+ [generator :as gen]
+ [tests :as tests]]
+ [jepsen.os.debian :as debian]
+ [jepsen.flink.client :refer :all]
+ [jepsen.flink.checker :as flink-checker]
+ [jepsen.flink.db :as fdb]
+ [jepsen.flink.nemesis :as fn])
+ (:import (jepsen.flink.client Client)))
+
+(def flink-test-config
+ {:yarn-session {:db (fdb/flink-yarn-db)
+ :deployment-strategy fdb/start-yarn-session!}
+ :yarn-job {:db (fdb/flink-yarn-db)
+ :deployment-strategy fdb/start-yarn-job!}
+ :mesos-session {:db (fdb/flink-mesos-db)
+ :deployment-strategy fdb/start-mesos-session!}})
+
+(defn client-gen
+ []
+ (->
+ (cons {:type :invoke, :f :submit, :value nil}
+ (cycle [{:type :invoke, :f :job-running?, :value nil}
+ (gen/sleep 5)]))
+ (gen/seq)
+ (gen/singlethreaded)))
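+;; The resulting op stream: a single :submit, followed by a :job-running? probe
+;; every 5 seconds, all executed by a single client thread.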
+
+(defn flink-test
+ [opts]
+ (merge tests/noop-test
+ (let [{:keys [db deployment-strategy]} (-> opts :deployment-mode flink-test-config)
+ {:keys [job-running-healthy-threshold job-recovery-grace-period]} opts]
+ {:name "Apache Flink"
+ :os debian/os
+ :db db
+ :nemesis (fn/nemesis)
+ :model (flink-checker/job-running-within-grace-period
+ job-running-healthy-threshold
+ job-recovery-grace-period)
+ :generator (let [stop (atom nil)]
+ (->> (fn/stoppable-generator stop (client-gen))
+ (gen/nemesis
+ (fn/stop-generator stop
+ ((fn/nemesis-generator-factories (:nemesis-gen opts)) opts)
+ job-running-healthy-threshold
+ job-recovery-grace-period))))
+ :client (Client. deployment-strategy nil nil nil nil)
+ :checker (flink-checker/job-running-checker)})
+ (assoc opts :concurrency 1)))
+
+(defn keys-as-allowed-values-help-text
+ "Takes a map and returns a string explaining which values are allowed.
+ This is a CLI helper function."
+ [m]
+ (->> (keys m)
+ (map name)
+ (clojure.string/join ", ")
+ (str "Must be one of: ")))
+
+(defn -main
+ [& args]
+ (cli/run!
+ (merge
+ (cli/single-test-cmd
+ {:test-fn flink-test
+ :tarball fdb/default-flink-dist-url
+ :opt-spec [[nil "--ha-storage-dir DIR" "high-availability.storageDir"]
+ [nil "--job-jar JAR" "Path to the job jar"]
+ [nil "--job-args ARGS" "CLI arguments for the flink job"]
+ [nil "--main-class CLASS" "Job main class"]
+ [nil "--nemesis-gen GEN" (str "Which nemesis should be used?"
+ (keys-as-allowed-values-help-text fn/nemesis-generator-factories))
+ :parse-fn keyword
+ :default :kill-task-managers
+ :validate [#(fn/nemesis-generator-factories (keyword %))
+ (keys-as-allowed-values-help-text fn/nemesis-generator-factories)]]
+ [nil "--deployment-mode MODE" (keys-as-allowed-values-help-text flink-test-config)
+ :parse-fn keyword
+ :default :yarn-session
+ :validate [#(flink-test-config (keyword %))
+ (keys-as-allowed-values-help-text flink-test-config)]]
+ [nil "--job-running-healthy-threshold TIMES" "Number of consecutive times the job must be running to be considered healthy."
+ :default 5
+ :parse-fn #(Long/parseLong %)
+ :validate [pos? "Must be positive"]]
+ [nil "--job-recovery-grace-period SECONDS" "Time period in which the job must become healthy."
+ :default 180
+ :parse-fn #(Long/parseLong %)
+ :validate [pos? "Must be positive" (fn [v] (<= 60 v)) "Should be greater than 60"]]]})
+ (cli/serve-cmd))
+ args))
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/generator.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/generator.clj b/flink-jepsen/src/jepsen/flink/generator.clj
new file mode 100644
index 0000000..af928c4
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/generator.clj
@@ -0,0 +1,39 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.generator
+ (:require [jepsen.util :as util]
+ [jepsen.generator :as gen]))
+
+(gen/defgenerator TimeLimitGen
+ [dt source deadline-atom]
+ [dt (when-let [deadline @deadline-atom]
+ (util/nanos->secs deadline)) source]
+ (gen/op [_ test process]
+ (compare-and-set! deadline-atom nil (+ (util/linear-time-nanos)
+ (util/secs->nanos dt)))
+ (when (<= (util/linear-time-nanos) @deadline-atom)
+ (gen/op source test process))))
+
+;; In Jepsen 0.1.9 jepsen.generator/time-limit was re-written to interrupt Threads.
+;; Unfortunately the logic has race conditions which can cause spurious failures
+;; (https://github.com/jepsen-io/jepsen/issues/268).
+;;
+;; In our tests we do not need interrupts. Therefore, we use a time-limit implementation that is
+;; similar to the one shipped with Jepsen 0.1.8.
+(defn time-limit
+ [dt source]
+ (TimeLimitGen. dt source (atom nil)))
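+
+;; A minimal usage sketch: cap a cycling nemesis generator at 60 seconds of
+;; linear time, mirroring how jepsen.flink.nemesis wraps its generators.
+(comment
+  (time-limit 60 (gen/seq (cycle [{:type :info, :f :kill-task-managers}]))))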
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/hadoop.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/hadoop.clj b/flink-jepsen/src/jepsen/flink/hadoop.clj
new file mode 100644
index 0000000..f633d07
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/hadoop.clj
@@ -0,0 +1,139 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.hadoop
+ (:require [clojure.data.xml :as xml]
+ [clojure.tools.logging :refer :all]
+ [jepsen
+ [control :as c]
+ [db :as db]]
+ [jepsen.control.util :as cu]))
+
+(def install-dir "/opt/hadoop")
+(def hadoop-conf-dir (str install-dir "/etc/hadoop"))
+(def yarn-log-dir "/tmp/logs/yarn")
+
+(defn name-node
+ [nodes]
+ (first (sort nodes)))
+
+(defn resource-manager
+ [nodes]
+ (second (sort nodes)))
+
+(defn data-nodes
+ [nodes]
+ (drop 2 (sort nodes)))
+
+(defn yarn-site-config
+ [test]
+ {:yarn.resourcemanager.hostname (resource-manager (:nodes test))
+ :yarn.log-aggregation-enable "true"
+ :yarn.nodemanager.resource.cpu-vcores "8"
+ :yarn.resourcemanager.am.max-attempts "99999"
+ :yarn.nodemanager.log-dirs yarn-log-dir})
+
+(defn core-site-config
+ [test]
+ {:fs.defaultFS (str "hdfs://" (name-node (:nodes test)) ":9000")})
+
+(defn property-value
+ [property value]
+ (xml/element :property {}
+ [(xml/element :name {} (name property))
+ (xml/element :value {} value)]))
+
+(defn write-config!
+ [^String config-file config]
+ (info "Writing config" config-file)
+ (let [config-xml (xml/indent-str
+ (xml/element :configuration
+ {}
+ (map (fn [[k v]] (property-value k v)) (seq config))))]
+ (c/exec :echo config-xml :> config-file)))
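+;; write-config! renders, for example (node name "n1" is illustrative):
+;;   <configuration>
+;;     <property>
+;;       <name>fs.defaultFS</name>
+;;       <value>hdfs://n1:9000</value>
+;;     </property>
+;;   </configuration>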
+
+(defn start-name-node!
+ [test node]
+ (when (= node (name-node (:nodes test)))
+ (info "Start NameNode daemon.")
+ (c/exec (str install-dir "/sbin/hadoop-daemon.sh") :--config hadoop-conf-dir :--script :hdfs :start :namenode)))
+
+(defn start-name-node-formatted!
+ [test node]
+ (when (= node (name-node (:nodes test)))
+ (info "Format HDFS")
+ (c/exec (str install-dir "/bin/hdfs") :namenode :-format :-force :-clusterId "0000000")
+ (start-name-node! test node)))
+
+(defn stop-name-node!
+ []
+ (c/exec (str install-dir "/sbin/hadoop-daemon.sh") :--config hadoop-conf-dir :--script :hdfs :stop :namenode))
+
+(defn start-data-node!
+ [test node]
+ (when (some #{node} (data-nodes (:nodes test)))
+ (info "Start DataNode")
+ (c/exec (str install-dir "/sbin/hadoop-daemon.sh") :--config hadoop-conf-dir :--script :hdfs :start :datanode)))
+
+(defn start-resource-manager!
+ [test node]
+ (when (= node (resource-manager (:nodes test)))
+ (info "Start ResourceManager")
+ (c/exec (str install-dir "/sbin/yarn-daemon.sh") :--config hadoop-conf-dir :start :resourcemanager)))
+
+(defn start-node-manager!
+ [test node]
+ (when (some #{node} (data-nodes (:nodes test)))
+ (info "Start NodeManager")
+ (c/exec (str install-dir "/sbin/yarn-daemon.sh") :--config hadoop-conf-dir :start :nodemanager)))
+
+(defn find-files!
+ [dir]
+ (->>
+ (clojure.string/split (c/exec :find dir :-type :f) #"\n")
+ (remove clojure.string/blank?)))
+
+(defn db
+ [url]
+ (reify db/DB
+ (setup! [_ test node]
+ (info "Install Hadoop from" url)
+ (c/su
+ (cu/install-archive! url install-dir)
+ (write-config! (str install-dir "/etc/hadoop/yarn-site.xml") (yarn-site-config test))
+ (write-config! (str install-dir "/etc/hadoop/core-site.xml") (core-site-config test))
+ (c/exec :echo (c/lit "export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64") :>> (str install-dir "/etc/hadoop/hadoop-env.sh"))
+ (start-name-node-formatted! test node)
+ (start-data-node! test node)
+ (start-resource-manager! test node)
+ (start-node-manager! test node)))
+
+ (teardown! [_ test node]
+ (info "Teardown Hadoop")
+ (c/su
+ (cu/grepkill! "hadoop")
+ (c/exec (c/lit (str "rm -rf /tmp/hadoop-* ||:")))))
+
+ db/LogFiles
+ (log-files [_ _ _]
+ (c/su
+ (concat (find-files! (str install-dir "/logs"))
+ (if (cu/exists? yarn-log-dir)
+ (do
+ (c/exec :chmod :-R :777 yarn-log-dir)
+ (find-files! yarn-log-dir))
+ []))))))
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/mesos.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/mesos.clj b/flink-jepsen/src/jepsen/flink/mesos.clj
new file mode 100644
index 0000000..74b2c0d
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/mesos.clj
@@ -0,0 +1,165 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.mesos
+ (:require [clojure.tools.logging :refer :all]
+ [jepsen
+ [control :as c]
+ [db :as db]
+ [util :as util :refer [meh]]]
+ [jepsen.control.util :as cu]
+ [jepsen.os.debian :as debian]
+ [jepsen.flink.zookeeper :refer [zookeeper-uri]]))
+
+;;; Mesos
+
+(def master-count 1)
+(def master-pidfile "/var/run/mesos/master.pid")
+(def slave-pidfile "/var/run/mesos/slave.pid")
+(def master-dir "/var/lib/mesos/master")
+(def slave-dir "/var/lib/mesos/slave")
+(def log-dir "/var/log/mesos")
+(def master-bin "/usr/sbin/mesos-master")
+(def slave-bin "/usr/sbin/mesos-slave")
+(def zk-namespace :mesos)
+
+;;; Marathon
+
+(def marathon-bin "/usr/bin/marathon")
+(def zk-marathon-namespace "marathon")
+(def marathon-pidfile "/var/run/mesos/marathon.pid")
+(def marathon-rest-port 8080)
+
+(defn install!
+ [test node mesos-version marathon-version]
+ (c/su
+ (debian/add-repo! :mesosphere
+ "deb http://repos.mesosphere.com/debian jessie main"
+ "keyserver.ubuntu.com"
+ "E56151BF")
+ (debian/install {:mesos mesos-version
+ :marathon marathon-version})
+ (c/exec :mkdir :-p "/var/run/mesos")
+ (c/exec :mkdir :-p master-dir)
+ (c/exec :mkdir :-p slave-dir)))
+
+;;; Mesos functions
+
+(defn start-master!
+ [test node]
+ (when (some #{node} (take master-count (sort (:nodes test))))
+ (info node "Starting mesos master")
+ (c/su
+ (c/exec :start-stop-daemon
+ :--background
+ :--chdir master-dir
+ :--exec "/usr/bin/env"
+ :--make-pidfile
+ :--no-close
+ :--oknodo
+ :--pidfile master-pidfile
+ :--start
+ :--
+ "GLOG_v=1"
+ master-bin
+ (str "--hostname=" (name node))
+ (str "--log_dir=" log-dir)
+ (str "--offer_timeout=30secs")
+ (str "--quorum=" (util/majority master-count))
+ (str "--registry_fetch_timeout=120secs")
+ (str "--registry_store_timeout=5secs")
+ (str "--work_dir=" master-dir)
+ (str "--zk=" (zookeeper-uri test zk-namespace))
+ :>> (str log-dir "/master.stdout")
+ (c/lit "2>&1")))))
+
+(defn start-slave!
+ [test node]
+ (when-not (some #{node} (take master-count (sort (:nodes test))))
+ (info node "Starting mesos slave")
+ (c/su
+ (c/exec :start-stop-daemon :--start
+ :--background
+ :--chdir slave-dir
+ :--exec slave-bin
+ :--make-pidfile
+ :--no-close
+ :--pidfile slave-pidfile
+ :--oknodo
+ :--
+ (str "--hostname=" (name node))
+ (str "--log_dir=" log-dir)
+ (str "--master=" (zookeeper-uri test zk-namespace))
+ (str "--recovery_timeout=30secs")
+ (str "--work_dir=" slave-dir)
+ :>> (str log-dir "/slave.stdout")
+ (c/lit "2>&1")))))
+
+(defn stop-master!
+ [node]
+ (info node "Stopping mesos master")
+ (meh (c/exec :killall :-9 :mesos-master))
+ (meh (c/exec :rm :-rf master-pidfile)))
+
+(defn stop-slave!
+ [node]
+ (info node "Stopping mesos slave")
+ (meh (c/exec :killall :-9 :mesos-slave))
+ (meh (c/exec :rm :-rf slave-pidfile)))
+
+;;; Marathon functions
+
+(defn start-marathon!
+ [test node]
+ (when (= node (first (sort (:nodes test))))
+ (info "Start marathon")
+ (c/su
+ (c/exec :start-stop-daemon :--start
+ :--background
+ :--exec marathon-bin
+ :--make-pidfile
+ :--no-close
+ :--pidfile marathon-pidfile
+ :--
+ (c/lit (str "--hostname " node))
+ (c/lit (str "--master " (zookeeper-uri test zk-namespace)))
+ (c/lit (str "--zk " (zookeeper-uri test zk-marathon-namespace)))
+ :>> (str log-dir "/marathon.stdout")
+ (c/lit "2>&1")))))
+
+(defn stop-marathon!
+ []
+ (cu/grepkill! "marathon"))
+
+(defn marathon-base-url
+ [test]
+ (str "http://" (first (sort (:nodes test))) ":" marathon-rest-port))
+
+(defn db
+ [mesos-version marathon-version]
+ (reify db/DB
+ (setup! [this test node]
+ (install! test node mesos-version marathon-version)
+ (start-master! test node)
+ (start-slave! test node)
+ (start-marathon! test node))
+ (teardown! [this test node]
+ (stop-slave! node)
+ (stop-master! node)
+ (stop-marathon!))
+ db/LogFiles
+ (log-files [_ test node]
+ (if (cu/exists? log-dir) (cu/ls-full log-dir) []))))
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/nemesis.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/nemesis.clj b/flink-jepsen/src/jepsen/flink/nemesis.clj
new file mode 100644
index 0000000..3047eeb
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/nemesis.clj
@@ -0,0 +1,163 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.nemesis
+ (:require [clojure.tools.logging :refer :all]
+ [jepsen
+ [control :as c]
+ [generator :as gen]
+ [nemesis :as nemesis]
+ [util :as ju]]
+ [jepsen.control.util :as cu]
+ [jepsen.flink.client :refer :all]
+ [jepsen.flink.checker :as flink-checker]
+ [jepsen.flink.generator :as fgen]
+ [jepsen.flink.hadoop :as fh]
+ [jepsen.flink.zookeeper :refer :all]))
+
+(def job-submit-grace-period
+ "Period after job submission in which job managers must not fail."
+ 60)
+
+(defn kill-processes
+ ([pattern] (kill-processes rand-nth pattern))
+ ([targeter pattern]
+ (reify nemesis/Nemesis
+ (setup! [this test] this)
+ (invoke! [this test op]
+ (let [nodes (-> test :nodes targeter ju/coll)]
+ (c/on-many nodes
+ (c/su (cu/grepkill! pattern)))
+ (assoc op :value nodes)))
+ (teardown! [this test]))))
+
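+;; Samples each element of coll independently with probability 0.5; if the
+;; sample happens to be empty, falls back to a single random element
+;; (kill-processes normalizes the result with ju/coll).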
+(defn- non-empty-random-sample
+ [coll]
+ (let [sample (random-sample 0.5 coll)]
+ (if (empty? sample)
+ (first (shuffle coll))
+ sample)))
+
+(defn kill-taskmanager
+ ([] (kill-taskmanager identity))
+ ([targeter]
+ (kill-processes targeter "TaskExecutorRunner")))
+
+(defn kill-jobmanager
+ []
+ (kill-processes identity "ClusterEntrypoint"))
+
+(defn start-stop-name-node
+ "Nemesis stopping and starting the HDFS NameNode."
+ []
+ (nemesis/node-start-stopper
+ fh/name-node
+ (fn [test node] (c/su (fh/stop-name-node!)))
+ (fn [test node] (c/su (fh/start-name-node! test node)))))
+
+;;; Generators
+
+(defn stoppable-generator
+ [stop source]
+ (reify gen/Generator
+ (op [gen test process]
+ (if @stop
+ nil
+ (gen/op source test process)))))
+
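+;; Returns the last n elements of coll; if coll has fewer than n elements, the
+;; result is padded at the front with default.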
+(defn take-last-with-default
+ [n default coll]
+ (->>
+ (cycle [default])
+ (concat (reverse coll))
+ (take n)
+ (reverse)))
+
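+;; After source is exhausted, polls the test history once per second and sets
+;; the stop flag as soon as the job has been observed running for
+;; job-running-healthy-threshold consecutive checks, or once
+;; job-recovery-grace-period seconds have elapsed.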
+(defn stop-generator
+ [stop source job-running-healthy-threshold job-recovery-grace-period]
+ (gen/concat source
+ (let [t (atom nil)]
+ (reify gen/Generator
+ (op [_ test process]
+ (when (nil? @t)
+ (compare-and-set! t nil (ju/relative-time-nanos)))
+ (let [history (->>
+ (:active-histories test)
+ deref
+ first
+ deref)
+ job-running-history (->>
+ history
+ (filter (fn [op] (>= (- (:time op) @t) 0)))
+ (flink-checker/get-job-running-history)
+ (take-last-with-default job-running-healthy-threshold false))]
+ (if (or
+                (every? true? job-running-history)
+ (> (ju/relative-time-nanos) (+ @t (ju/secs->nanos job-recovery-grace-period))))
+ (do
+ (reset! stop true)
+ nil)
+ (do
+ (Thread/sleep 1000)
+ (recur test process)))))))))
+
+(defn kill-taskmanagers-gen
+ [time-limit dt op]
+ (fgen/time-limit time-limit (gen/stagger dt (gen/seq (cycle [{:type :info, :f op}])))))
+
+(defn kill-taskmanagers-bursts-gen
+ [time-limit]
+ (fgen/time-limit time-limit
+ (gen/seq (cycle (concat (repeat 20 {:type :info, :f :kill-task-managers})
+ [(gen/sleep 300)])))))
+
+(defn kill-jobmanagers-gen
+ [time-limit]
+ (fgen/time-limit (+ time-limit job-submit-grace-period)
+ (gen/seq (cons (gen/sleep job-submit-grace-period)
+ (cycle [{:type :info, :f :kill-job-manager}])))))
+
+(defn fail-name-node-during-recovery
+ []
+ (gen/seq [(gen/sleep job-submit-grace-period)
+ {:type :info, :f :partition-start}
+ {:type :info, :f :fail-name-node-start}
+ (gen/sleep 20)
+ {:type :info, :f :partition-stop}
+ (gen/sleep 60)
+ {:type :info, :f :fail-name-node-stop}]))
+
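+;; The keys below are the allowed values of the --nemesis-gen CLI option;
+;; :utopia injects no faults and merely sleeps.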
+(def nemesis-generator-factories
+ {:kill-task-managers (fn [opts] (kill-taskmanagers-gen (:time-limit opts) 3 :kill-task-managers))
+ :kill-single-task-manager (fn [opts] (kill-taskmanagers-gen (:time-limit opts) 3 :kill-single-task-manager))
+ :kill-random-task-managers (fn [opts] (kill-taskmanagers-gen (:time-limit opts) 3 :kill-random-task-managers))
+ :kill-task-managers-bursts (fn [opts] (kill-taskmanagers-bursts-gen (:time-limit opts)))
+ :kill-job-managers (fn [opts] (kill-jobmanagers-gen (:time-limit opts)))
+ :fail-name-node-during-recovery (fn [_] (fail-name-node-during-recovery))
+ :utopia (fn [_] (gen/sleep 60))})
+
+(defn nemesis
+ []
+ (nemesis/compose
+ {{:partition-start :start
+ :partition-stop :stop} (nemesis/partition-random-halves)
+ {:fail-name-node-start :start
+ :fail-name-node-stop :stop} (start-stop-name-node)
+ {:kill-task-managers :start} (kill-taskmanager)
+ {:kill-single-task-manager :start} (kill-taskmanager (fn [coll] (rand-nth coll)))
+ {:kill-random-task-managers :start} (kill-taskmanager non-empty-random-sample)
+ {:kill-job-manager :start} (kill-jobmanager)}))
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/utils.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/utils.clj b/flink-jepsen/src/jepsen/flink/utils.clj
new file mode 100644
index 0000000..3fd9f96
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/utils.clj
@@ -0,0 +1,48 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.utils
+ (:require [clojure.tools.logging :refer :all]))
+
+(defn retry
+ "Runs a function op and retries on exception.
+
+ The following options are supported:
+
+ :on-retry - A function called on every failed attempt with the exception and the number of remaining retries as arguments.
+ :success - A function called with the result of op.
+ :fallback - A function called with the exception as its argument if all retries are exhausted.
+ :retries - Total number of retries.
+ :delay - The delay in milliseconds between retries."
+ ([op & {:keys [on-retry success fallback retries delay]
+ :or {on-retry (fn [exception attempt] (warn "Retryable operation failed:"
+ (.getMessage exception)))
+ success identity
+ fallback :default
+ retries 10
+ delay 2000}
+ :as keys}]
+ (let [r (try
+ (op)
+ (catch Exception e (if (< 0 retries)
+ {:exception e}
+ (fallback e))))]
+ (if (:exception r)
+ (do
+ (on-retry (:exception r) retries)
+ (Thread/sleep delay)
+ (recur op (assoc keys :retries (dec retries))))
+ (success r)))))
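+
+;; A minimal usage sketch: retry a flaky call up to 5 times with a 1-second
+;; delay, returning ::unavailable if all attempts fail (the URL is illustrative).
+(comment
+  (retry #(slurp "http://localhost:8081/jobs/overview")
+         :retries 5
+         :delay 1000
+         :fallback (fn [e] ::unavailable)))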
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/zookeeper.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/zookeeper.clj b/flink-jepsen/src/jepsen/flink/zookeeper.clj
new file mode 100644
index 0000000..8b4c319
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/zookeeper.clj
@@ -0,0 +1,29 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.zookeeper)
+
+(defn zookeeper-quorum
+ "Returns the zk quorum string, e.g., host1:2181,host2:2181"
+ [test]
+ (->> test
+ :nodes
+ (map #(str % ":2181"))
+ (clojure.string/join ",")))
+
+(defn zookeeper-uri
+ ([test] (zookeeper-uri test ""))
+ ([test namespace] (str "zk://" (zookeeper-quorum test) "/" (name namespace))))
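+;; e.g. (zookeeper-uri {:nodes ["n1" "n2"]} :mesos) => "zk://n1:2181,n2:2181/mesos"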
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/test/jepsen/flink/checker_test.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/test/jepsen/flink/checker_test.clj b/flink-jepsen/test/jepsen/flink/checker_test.clj
new file mode 100644
index 0000000..7389bbc
--- /dev/null
+++ b/flink-jepsen/test/jepsen/flink/checker_test.clj
@@ -0,0 +1,82 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.checker-test
+ (:require [clojure.test :refer :all]
+ [jepsen
+ [checker :as checker]]
+ [jepsen.flink.checker :refer :all]))
+
+(deftest get-job-running-history-test
+ (let [history [{:type :info, :f :kill-random-subset-task-managers, :process :nemesis, :time 121898381144, :value '("172.31.33.170")}
+ {:type :invoke, :f :job-running?, :value nil, :process 0, :time 127443701575}
+ {:type :ok, :f :job-running?, :value false, :process 0, :time 127453553462}
+ {:type :invoke, :f :job-running?, :value nil, :process 0, :time 127453553463}
+ {:type :ok, :f :job-running?, :value true, :process 0, :time 127453553464}]]
+ (is (= (get-job-running-history history) [false true]))))
+
+(deftest job-running-checker-test
+ (let [checker (job-running-checker)
+ test {}
+ model (job-running-within-grace-period 3 60)
+ opts {}
+ check (fn [history] (checker/check checker test model history opts))]
+ (testing "Job is not running after grace period."
+ (is (= (:valid? (check
+ [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
+ {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}])) false)))
+ (testing "Job is running after grace period."
+ (is (= (:valid? (check
+ [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
+ {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000001}])) true)))
+ (testing "Should tolerate non-running job during failures."
+ (is (= (:valid? (check
+ [{:type :info, :f :partition-start, :process :nemesis, :time -1}
+ {:type :info, :f :partition-start, :process :nemesis, :time 0, :value "Cut off [...]"}
+ {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}
+ {:type :info, :f :partition-stop, :process :nemesis, :time 60000000002}
+ {:type :info, :f :partition-stop, :process :nemesis, :time 60000000003, :value "fully connected"}
+ {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}])) true)))
+ (testing "Should respect healthy threshold."
+ (is (= (:valid? (check
+ [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
+ {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
+ {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
+ {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000003}
+ {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}])) true))
+ (is (= (:valid? (check
+ [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
+ {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
+ {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000002}
+ {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}])) false)))
+ (testing "Job was not deployed successfully."
+ (is (= (:valid? (check [{:type :invoke, :f :job-running?, :value nil, :process 45, :time 239150413307}
+ {:type :info, :f :job-running?, :value nil, :process 45, :time 239150751938, :error "indeterminate: Assert failed: job-id"}])) false)))))
+
+(deftest safe-inc-test
+ (is (= (safe-inc nil) 1))
+ (is (= (safe-inc 1) 2)))
+
+(deftest nemeses-active?-test
+ (is (= (nemeses-active? {:partition-start 2 :fail-name-node-start 0}) true))
+ (is (= (nemeses-active? {:partition-start 0}) false)))
+
+(deftest dissoc-if-test
+ (is (= (:a (dissoc-if #(-> (first %) (= :b)) {:a 1 :b 2})) 1)))
+
+(deftest zero-value?-test
+ (is (= (zero-value? [:test 0]) true))
+ (is (= (zero-value? [:test 1]) false)))
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/test/jepsen/flink/client_test.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/test/jepsen/flink/client_test.clj b/flink-jepsen/test/jepsen/flink/client_test.clj
new file mode 100644
index 0000000..b4373bf
--- /dev/null
+++ b/flink-jepsen/test/jepsen/flink/client_test.clj
@@ -0,0 +1,37 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.client-test
+ (:require [clojure.test :refer :all]
+ [clj-http.fake :as fake]
+ [jepsen.flink.client :refer :all]))
+
+(deftest read-url-test
+ (is (= "https://www.asdf.de" (read-url (byte-array [0xAC 0xED 0x00 0x05 0x77 0x15 0x00 0x13 0x68 0x74 0x74 0x70 0x73 0x3A 0x2F 0x2F 0x77 0x77 0x77 0x2E 0x61 0x73 0x64 0x66 0x2E 0x64 0x65])))))
+
+(deftest job-running?-test
+ (fake/with-fake-routes
+ {"http://localhost:8081/jobs/a718f168ec6be8eff1345a17bf64196c"
+ (fn [request] {:status 200
+ :headers {}
+ :body "{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1522059578198,\"end-time\":-1,\"duration\":19505,\"now\":1522059597703,\"timestamps\":{\"RUNNING\":1522059578244,\"RESTARTING\":0,\"RECONCILING\":0,\"CREATED\":1522059578198,\"FAILING\":0,\"FINISHED\":0,\"CANCELLING\":0,\"SUSPENDING\":0,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0},\"vertices\":[{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"name\":\"Source: Socket Stream -> Flat Map\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578369,\"end-time\":-1,\"duration\":19334,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"w
rite-records-complete\":false}},{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"name\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578381,\"end-time\":-1,\"duration\":19322,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}}],\"status-counts\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":2,\"CREATED\":0,\"FINISHED\":0},\"plan\":{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"nodes\":[{\"id\":\"90bea66de1c231edf33913ecd544
06c1\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out\",\"inputs\":[{\"num\":0,\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Socket Stream -> Flat Map\",\"optimizer_properties\":{}}]}}"})
+ "http://localhost:8081/jobs/a718f168ec6be8eff1345a17bf64196d"
+ (fn [request] {:status 200
+ :headers {}
+ :body "{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1522059578198,\"end-time\":-1,\"duration\":19505,\"now\":1522059597703,\"timestamps\":{\"RUNNING\":1522059578244,\"RESTARTING\":0,\"RECONCILING\":0,\"CREATED\":1522059578198,\"FAILING\":0,\"FINISHED\":0,\"CANCELLING\":0,\"SUSPENDING\":0,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0},\"vertices\":[{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"name\":\"Source: Socket Stream -> Flat Map\",\"parallelism\":1,\"status\":\"CREATED\",\"start-time\":1522059578369,\"end-time\":-1,\"duration\":19334,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"w
rite-records-complete\":false}},{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"name\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578381,\"end-time\":-1,\"duration\":19322,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}}],\"status-counts\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":2,\"CREATED\":0,\"FINISHED\":0},\"plan\":{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"nodes\":[{\"id\":\"90bea66de1c231edf33913ecd544
06c1\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out\",\"inputs\":[{\"num\":0,\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Socket Stream -> Flat Map\",\"optimizer_properties\":{}}]}}"})}
+
+ (is (= (job-running? "http://localhost:8081" "a718f168ec6be8eff1345a17bf64196c") true))
+ (is (= (job-running? "http://localhost:8081" "a718f168ec6be8eff1345a17bf64196d") false))))
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/test/jepsen/flink/utils_test.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/test/jepsen/flink/utils_test.clj b/flink-jepsen/test/jepsen/flink/utils_test.clj
new file mode 100644
index 0000000..607f90d
--- /dev/null
+++ b/flink-jepsen/test/jepsen/flink/utils_test.clj
@@ -0,0 +1,39 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.utils-test
+ (:require [clojure.test :refer :all])
+ (:require [jepsen.flink.utils :refer [retry]]))
+
+(deftest retry-test
+ (testing "Single failure then result."
+ (let [counter (atom 0)
+ failing-once (fn [] (if (= @counter 0)
+ (do (swap! counter inc)
+ (throw (Exception. "Expected")))
+ "result"))]
+ (is (= "result" (retry failing-once :delay 0)))))
+
+ (testing "Exhaust all attempts."
+ (let [failing-always (fn [] (throw (Exception. "Expected")))]
+ (is (nil? (retry failing-always :retries 1 :delay 0)))))
+
+ (testing "Propagate exception."
+ (let [failing-always (fn [] (throw (Exception. "Expected")))]
+ (is (thrown-with-msg? Exception #"Expected" (retry failing-always
+ :retries 1
+ :delay 0
+ :fallback (fn [e] (throw e))))))))
[3/6] flink git commit: [FLINK-9004][tests] Implement Jepsen tests to test job availability.
Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/test/jepsen/flink/zookeeper_test.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/test/jepsen/flink/zookeeper_test.clj b/flink-jepsen/test/jepsen/flink/zookeeper_test.clj
new file mode 100644
index 0000000..b5cc3d5
--- /dev/null
+++ b/flink-jepsen/test/jepsen/flink/zookeeper_test.clj
@@ -0,0 +1,28 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.zookeeper-test
+ (:require [clojure.test :refer :all])
+ (:require [jepsen.flink.zookeeper :refer :all]))
+
+(deftest zookeeper-quorum-test
+ (is (= (zookeeper-quorum {:nodes ["n1" "n2" "n3"]}) "n1:2181,n2:2181,n3:2181")))
+
+(deftest zookeeper-uri-test
+ (is (= (zookeeper-uri {:nodes ["n1" "n2" "n3"]}) "zk://n1:2181,n2:2181,n3:2181/"))
+ (is (= (zookeeper-uri {:nodes ["n1"]}) "zk://n1:2181/"))
+ (is (= (zookeeper-uri {:nodes ["n1" "n2" "n3"]} :mesos) "zk://n1:2181,n2:2181,n3:2181/mesos"))
+ (is (= (zookeeper-uri {:nodes ["n1" "n2" "n3"]} "mesos") "zk://n1:2181,n2:2181,n3:2181/mesos")))
[5/6] flink git commit: [FLINK-9807][tests] Parameterize EventTimeWindowCheckpointITCase & LocalRecoveryITCase
Posted by tr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
deleted file mode 100644
index e153b4b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-/**
- * Integration tests for memory backend.
- */
-public class MemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
-
- @Override
- protected StateBackendEnum getStateBackend() {
- return StateBackendEnum.MEM;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
deleted file mode 100644
index e6d5b9e..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-/**
- * Integration tests for fully synchronous RocksDB backend.
- */
-public class RocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
-
- @Override
- protected StateBackendEnum getStateBackend() {
- return StateBackendEnum.ROCKSDB_FULLY_ASYNC;
- }
-
- @Override
- protected int numElementsPerKey() {
- return 3000;
- }
-
- @Override
- protected int windowSize() {
- return 1000;
- }
-
- @Override
- protected int windowSlide() {
- return 100;
- }
-
- @Override
- protected int numKeys() {
- return 100;
- }
-}
[6/6] flink git commit: [FLINK-9807][tests] Parameterize EventTimeWindowCheckpointITCase & LocalRecoveryITCase
Posted by tr...@apache.org.
[FLINK-9807][tests] Parameterize EventTimeWindowCheckpointITCase & LocalRecoveryITCase
This closes #6305.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc49801d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc49801d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc49801d
Branch: refs/heads/master
Commit: fc49801d4723413d3a09ecb85d60d39078056c68
Parents: 37abf46
Author: klion26 <qc...@gmail.com>
Authored: Sun Jul 8 11:52:41 2018 +0800
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jul 13 14:39:31 2018 +0200
----------------------------------------------------------------------
...tractEventTimeWindowCheckpointingITCase.java | 889 ------------------
.../AbstractLocalRecoveryITCase.java | 99 --
...ckendEventTimeWindowCheckpointingITCase.java | 30 -
...ckendEventTimeWindowCheckpointingITCase.java | 29 -
.../EventTimeAllWindowCheckpointingITCase.java | 2 +-
.../EventTimeWindowCheckpointingITCase.java | 931 +++++++++++++++++++
...ckendEventTimeWindowCheckpointingITCase.java | 29 -
...ckendEventTimeWindowCheckpointingITCase.java | 50 -
...ckendEventTimeWindowCheckpointingITCase.java | 50 -
.../checkpointing/LocalRecoveryHeapITCase.java | 30 -
.../test/checkpointing/LocalRecoveryITCase.java | 110 +++
.../LocalRecoveryRocksDBFullITCase.java | 30 -
.../LocalRecoveryRocksDBIncrementalITCase.java | 30 -
...ckendEventTimeWindowCheckpointingITCase.java | 30 -
...ckendEventTimeWindowCheckpointingITCase.java | 50 -
15 files changed, 1042 insertions(+), 1347 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
deleted file mode 100644
index df74450..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ /dev/null
@@ -1,889 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.test.util.MiniClusterResource;
-import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.SuccessException;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.TestLogger;
-
-import org.apache.curator.test.TestingServer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.TestName;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
-import static org.apache.flink.test.util.TestUtils.tryExecute;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * This verifies that checkpointing works correctly with event time windows. This is more
- * strict than {@link WindowCheckpointingITCase} because for event-time the contents
- * of the emitted windows are deterministic.
- *
- * <p>Split into multiple test classes in order to decrease the runtime per backend
- * and not run into CI infrastructure limits like no std output being emitted for
- * I/O heavy variants.
- */
-@SuppressWarnings("serial")
-public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLogger {
-
- private static final int MAX_MEM_STATE_SIZE = 20 * 1024 * 1024;
- private static final int PARALLELISM = 4;
-
- private TestingServer zkServer;
-
- public MiniClusterResource miniClusterResource;
-
- @ClassRule
- public static TemporaryFolder tempFolder = new TemporaryFolder();
-
- @Rule
- public TestName name = new TestName();
-
- private AbstractStateBackend stateBackend;
-
- enum StateBackendEnum {
- MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC
- }
-
- protected abstract StateBackendEnum getStateBackend();
-
- protected final MiniClusterResource getMiniClusterResource() {
- return new MiniClusterResource(
- new MiniClusterResourceConfiguration.Builder()
- .setConfiguration(getConfigurationSafe())
- .setNumberTaskManagers(2)
- .setNumberSlotsPerTaskManager(PARALLELISM / 2)
- .build());
- }
-
- private Configuration getConfigurationSafe() {
- try {
- return getConfiguration();
- } catch (Exception e) {
- throw new AssertionError("Could not initialize test.", e);
- }
- }
-
- private Configuration getConfiguration() throws Exception {
-
- // print a message when starting a test method to avoid Travis' <tt>"Maven produced no
- // output for xxx seconds."</tt> messages
- System.out.println(
- "Starting " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
-
- // Testing HA Scenario / ZKCompletedCheckpointStore with incremental checkpoints
- StateBackendEnum stateBackendEnum = getStateBackend();
- if (ROCKSDB_INCREMENTAL_ZK.equals(stateBackendEnum)) {
- zkServer = new TestingServer();
- zkServer.start();
- }
-
- Configuration config = createClusterConfig();
-
- switch (stateBackendEnum) {
- case MEM:
- this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
- break;
- case FILE: {
- String backups = tempFolder.newFolder().getAbsolutePath();
- this.stateBackend = new FsStateBackend("file://" + backups, false);
- break;
- }
- case MEM_ASYNC:
- this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, true);
- break;
- case FILE_ASYNC: {
- String backups = tempFolder.newFolder().getAbsolutePath();
- this.stateBackend = new FsStateBackend("file://" + backups, true);
- break;
- }
- case ROCKSDB_FULLY_ASYNC: {
- String rocksDb = tempFolder.newFolder().getAbsolutePath();
- String backups = tempFolder.newFolder().getAbsolutePath();
- RocksDBStateBackend rdb = new RocksDBStateBackend(new FsStateBackend("file://" + backups));
- rdb.setDbStoragePath(rocksDb);
- this.stateBackend = rdb;
- break;
- }
- case ROCKSDB_INCREMENTAL:
- case ROCKSDB_INCREMENTAL_ZK: {
- String rocksDb = tempFolder.newFolder().getAbsolutePath();
- String backups = tempFolder.newFolder().getAbsolutePath();
- // we use the fs backend with small threshold here to test the behaviour with file
- // references, not self contained byte handles
- RocksDBStateBackend rdb =
- new RocksDBStateBackend(
- new FsStateBackend(
- new Path("file://" + backups).toUri(), 16),
- true);
- rdb.setDbStoragePath(rocksDb);
- this.stateBackend = rdb;
- break;
- }
- default:
- throw new IllegalStateException("No backend selected.");
- }
- return config;
- }
-
- protected Configuration createClusterConfig() throws IOException {
- TemporaryFolder temporaryFolder = new TemporaryFolder();
- temporaryFolder.create();
- final File haDir = temporaryFolder.newFolder();
-
- Configuration config = new Configuration();
- config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
- // the default network buffer size (10% of heap max =~ 150MB) seems too much for this test case
- config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
- config.setString(AkkaOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) + "b");
-
- if (zkServer != null) {
- config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
- config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
- config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
- }
- return config;
- }
-
- @Before
- public void setupTestCluster() throws Exception {
- miniClusterResource = getMiniClusterResource();
- miniClusterResource.before();
- }
-
- @After
- public void stopTestCluster() throws IOException {
- if (miniClusterResource != null) {
- miniClusterResource.after();
- miniClusterResource = null;
- }
-
- if (zkServer != null) {
- zkServer.stop();
- zkServer = null;
- }
-
- //Prints a message when finishing a test method to avoid Travis' <tt>"Maven produced no output
- // for xxx seconds."</tt> messages.
- System.out.println(
- "Finished " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
- }
-
- // ------------------------------------------------------------------------
-
- @Test
- public void testTumblingTimeWindow() {
- final int numElementsPerKey = numElementsPerKey();
- final int windowSize = windowSize();
- final int numKeys = numKeys();
- FailingSource.reset();
-
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
- env.getConfig().disableSysoutLogging();
- env.setStateBackend(this.stateBackend);
- env.getConfig().setUseSnapshotCompression(true);
-
- env
- .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
- .rebalance()
- .keyBy(0)
- .timeWindow(Time.of(windowSize, MILLISECONDS))
- .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
-
- private boolean open = false;
-
- @Override
- public void open(Configuration parameters) {
- assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
- open = true;
- }
-
- @Override
- public void apply(
- Tuple tuple,
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> values,
- Collector<Tuple4<Long, Long, Long, IntType>> out) {
-
- // validate that the function has been opened properly
- assertTrue(open);
-
- int sum = 0;
- long key = -1;
-
- for (Tuple2<Long, IntType> value : values) {
- sum += value.f1.value;
- key = value.f0;
- }
- out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
- }
- })
- .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
-
- tryExecute(env, "Tumbling Window Test");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testTumblingTimeWindowWithKVStateMinMaxParallelism() {
- doTestTumblingTimeWindowWithKVState(PARALLELISM);
- }
-
- @Test
- public void testTumblingTimeWindowWithKVStateMaxMaxParallelism() {
- doTestTumblingTimeWindowWithKVState(1 << 15);
- }
-
- public void doTestTumblingTimeWindowWithKVState(int maxParallelism) {
- final int numElementsPerKey = numElementsPerKey();
- final int windowSize = windowSize();
- final int numKeys = numKeys();
- FailingSource.reset();
-
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
- env.setMaxParallelism(maxParallelism);
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
- env.getConfig().disableSysoutLogging();
- env.setStateBackend(this.stateBackend);
- env.getConfig().setUseSnapshotCompression(true);
-
- env
- .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
- .rebalance()
- .keyBy(0)
- .timeWindow(Time.of(windowSize, MILLISECONDS))
- .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
-
- private boolean open = false;
-
- private ValueState<Integer> count;
-
- @Override
- public void open(Configuration parameters) {
- assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
- open = true;
- count = getRuntimeContext().getState(
- new ValueStateDescriptor<>("count", Integer.class, 0));
- }
-
- @Override
- public void apply(
- Tuple tuple,
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> values,
- Collector<Tuple4<Long, Long, Long, IntType>> out) throws Exception {
-
- // the window count state starts with the key, so that we get
- // different count results for each key
- if (count.value() == 0) {
- count.update(tuple.<Long>getField(0).intValue());
- }
-
- // validate that the function has been opened properly
- assertTrue(open);
-
- count.update(count.value() + 1);
- out.collect(new Tuple4<>(tuple.<Long>getField(0), window.getStart(), window.getEnd(), new IntType(count.value())));
- }
- })
- .addSink(new CountValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
-
- tryExecute(env, "Tumbling Window Test");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testSlidingTimeWindow() {
- final int numElementsPerKey = numElementsPerKey();
- final int windowSize = windowSize();
- final int windowSlide = windowSlide();
- final int numKeys = numKeys();
- FailingSource.reset();
-
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setMaxParallelism(2 * PARALLELISM);
- env.setParallelism(PARALLELISM);
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
- env.getConfig().disableSysoutLogging();
- env.setStateBackend(this.stateBackend);
- env.getConfig().setUseSnapshotCompression(true);
-
- env
- .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
- .rebalance()
- .keyBy(0)
- .timeWindow(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
- .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
-
- private boolean open = false;
-
- @Override
- public void open(Configuration parameters) {
- assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
- open = true;
- }
-
- @Override
- public void apply(
- Tuple tuple,
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> values,
- Collector<Tuple4<Long, Long, Long, IntType>> out) {
-
- // validate that the function has been opened properly
- assertTrue(open);
-
- int sum = 0;
- long key = -1;
-
- for (Tuple2<Long, IntType> value : values) {
- sum += value.f1.value;
- key = value.f0;
- }
- out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
- }
- })
- .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
-
- tryExecute(env, "Sliding Window Test");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testPreAggregatedTumblingTimeWindow() {
- final int numElementsPerKey = numElementsPerKey();
- final int windowSize = windowSize();
- final int numKeys = numKeys();
- FailingSource.reset();
-
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
- env.getConfig().disableSysoutLogging();
- env.setStateBackend(this.stateBackend);
- env.getConfig().setUseSnapshotCompression(true);
-
- env
- .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
- .rebalance()
- .keyBy(0)
- .timeWindow(Time.of(windowSize, MILLISECONDS))
- .reduce(
- new ReduceFunction<Tuple2<Long, IntType>>() {
-
- @Override
- public Tuple2<Long, IntType> reduce(
- Tuple2<Long, IntType> a,
- Tuple2<Long, IntType> b) {
- return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
- }
- },
- new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
-
- private boolean open = false;
-
- @Override
- public void open(Configuration parameters) {
- assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
- open = true;
- }
-
- @Override
- public void apply(
- Tuple tuple,
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> input,
- Collector<Tuple4<Long, Long, Long, IntType>> out) {
-
- // validate that the function has been opened properly
- assertTrue(open);
-
- for (Tuple2<Long, IntType> in: input) {
- out.collect(new Tuple4<>(in.f0,
- window.getStart(),
- window.getEnd(),
- in.f1));
- }
- }
- })
- .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
-
- tryExecute(env, "Tumbling Window Test");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testPreAggregatedSlidingTimeWindow() {
- final int numElementsPerKey = numElementsPerKey();
- final int windowSize = windowSize();
- final int windowSlide = windowSlide();
- final int numKeys = numKeys();
- FailingSource.reset();
-
- try {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(PARALLELISM);
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.enableCheckpointing(100);
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
- env.getConfig().disableSysoutLogging();
- env.setStateBackend(this.stateBackend);
- env.getConfig().setUseSnapshotCompression(true);
-
- env
- .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
- .rebalance()
- .keyBy(0)
- .timeWindow(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
- .reduce(
- new ReduceFunction<Tuple2<Long, IntType>>() {
-
- @Override
- public Tuple2<Long, IntType> reduce(
- Tuple2<Long, IntType> a,
- Tuple2<Long, IntType> b) {
-
- // pre-aggregation: sum up the partial values for the key
- return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
- }
- },
- new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
-
- private boolean open = false;
-
- @Override
- public void open(Configuration parameters) {
- assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
- open = true;
- }
-
- @Override
- public void apply(
- Tuple tuple,
- TimeWindow window,
- Iterable<Tuple2<Long, IntType>> input,
- Collector<Tuple4<Long, Long, Long, IntType>> out) {
-
- // validate that the function has been opened properly
- assertTrue(open);
-
- for (Tuple2<Long, IntType> in: input) {
- out.collect(new Tuple4<>(in.f0,
- window.getStart(),
- window.getEnd(),
- in.f1));
- }
- }
- })
- .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
-
- tryExecute(env, "Sliding Window Test");
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
- implements ListCheckpointed<Integer>, CheckpointListener {
- private static volatile boolean failedBefore = false;
-
- private final int numKeys;
- private final int numElementsToEmit;
- private final int failureAfterNumElements;
-
- private volatile int numElementsEmitted;
- private volatile int numSuccessfulCheckpoints;
- private volatile boolean running = true;
-
- private FailingSource(int numKeys, int numElementsToEmitPerKey, int failureAfterNumElements) {
- this.numKeys = numKeys;
- this.numElementsToEmit = numElementsToEmitPerKey;
- this.failureAfterNumElements = failureAfterNumElements;
- }
-
- @Override
- public void open(Configuration parameters) {
- // non-parallel source
- assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
- }
-
- @Override
- public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception {
- // we loop longer than we have elements, to permit delayed checkpoints
- // to still cause a failure
- while (running) {
-
- if (!failedBefore) {
- // delay a bit, if we have not failed before
- Thread.sleep(1);
- if (numSuccessfulCheckpoints >= 2 && numElementsEmitted >= failureAfterNumElements) {
- // cause a failure if we have not failed before and have reached
- // enough completed checkpoints and elements
- failedBefore = true;
- throw new Exception("Artificial Failure");
- }
- }
-
- if (numElementsEmitted < numElementsToEmit &&
- (failedBefore || numElementsEmitted <= failureAfterNumElements)) {
- // the function failed before, or we are in the elements before the failure
- synchronized (ctx.getCheckpointLock()) {
- int next = numElementsEmitted++;
- for (long i = 0; i < numKeys; i++) {
- ctx.collectWithTimestamp(new Tuple2<Long, IntType>(i, new IntType(next)), next);
- }
- ctx.emitWatermark(new Watermark(next));
- }
- }
- else {
-
- // if our work is done, delay a bit to prevent busy waiting
- Thread.sleep(1);
- }
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) {
- numSuccessfulCheckpoints++;
- }
-
- @Override
- public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
- return Collections.singletonList(this.numElementsEmitted);
- }
-
- @Override
- public void restoreState(List<Integer> state) throws Exception {
- if (state.isEmpty() || state.size() > 1) {
- throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
- }
- this.numElementsEmitted = state.get(0);
- }
-
- public static void reset() {
- failedBefore = false;
- }
- }
-
- private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
- implements ListCheckpointed<HashMap<Long, Integer>> {
-
- private final HashMap<Long, Integer> windowCounts = new HashMap<>();
-
- private final int numKeys;
- private final int numWindowsExpected;
-
- private ValidatingSink(int numKeys, int numWindowsExpected) {
- this.numKeys = numKeys;
- this.numWindowsExpected = numWindowsExpected;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- // this sink can only work with DOP 1
- assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
-
- // it can happen that a checkpoint happens when the complete success state is
- // already set. In that case we restart with the final state and would never
- // finish because no more elements arrive.
- if (windowCounts.size() == numKeys) {
- boolean seenAll = true;
- for (Integer windowCount: windowCounts.values()) {
- if (windowCount != numWindowsExpected) {
- seenAll = false;
- break;
- }
- }
- if (seenAll) {
- throw new SuccessException();
- }
- }
- }
-
- @Override
- public void close() throws Exception {
- boolean seenAll = true;
- if (windowCounts.size() == numKeys) {
- for (Integer windowCount: windowCounts.values()) {
- if (windowCount < numWindowsExpected) {
- seenAll = false;
- break;
- }
- }
- }
- assertTrue("The sink must see all expected windows.", seenAll);
- }
-
- @Override
- public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
-
- // verify the contents of that window, Tuple4.f1 and .f2 are the window start/end
- // the sum should be "sum (start .. end-1)"
-
- int expectedSum = 0;
- for (long i = value.f1; i < value.f2; i++) {
- // only sum up positive vals, to filter out the negative start of the
- // first sliding windows
- if (i > 0) {
- expectedSum += i;
- }
- }
-
- assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value);
-
- Integer curr = windowCounts.get(value.f0);
- if (curr != null) {
- windowCounts.put(value.f0, curr + 1);
- }
- else {
- windowCounts.put(value.f0, 1);
- }
-
- if (windowCounts.size() == numKeys) {
- boolean seenAll = true;
- for (Integer windowCount: windowCounts.values()) {
- if (windowCount < numWindowsExpected) {
- seenAll = false;
- break;
- } else if (windowCount > numWindowsExpected) {
- fail("Window count to high: " + windowCount);
- }
- }
-
- if (seenAll) {
- // exit
- throw new SuccessException();
- }
-
- }
- }
-
- @Override
- public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
- return Collections.singletonList(this.windowCounts);
- }
-
- @Override
- public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
- if (state.isEmpty() || state.size() > 1) {
- throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
- }
- windowCounts.putAll(state.get(0));
- }
- }
-
- // Sink for validating the stateful window counts
- private static class CountValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
- implements ListCheckpointed<HashMap<Long, Integer>> {
-
- private final HashMap<Long, Integer> windowCounts = new HashMap<>();
-
- private final int numKeys;
- private final int numWindowsExpected;
-
- private CountValidatingSink(int numKeys, int numWindowsExpected) {
- this.numKeys = numKeys;
- this.numWindowsExpected = numWindowsExpected;
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- // this sink can only work with DOP 1
- assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
- }
-
- @Override
- public void close() throws Exception {
- boolean seenAll = true;
- if (windowCounts.size() == numKeys) {
- for (Integer windowCount: windowCounts.values()) {
- if (windowCount < numWindowsExpected) {
- seenAll = false;
- break;
- }
- }
- }
- assertTrue("The source must see all expected windows.", seenAll);
- }
-
- @Override
- public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
-
- Integer curr = windowCounts.get(value.f0);
- if (curr != null) {
- windowCounts.put(value.f0, curr + 1);
- }
- else {
- windowCounts.put(value.f0, 1);
- }
-
- // verify the contents of that window, the contents should be:
- // (key + num windows so far)
-
- assertEquals("Window counts don't match for key " + value.f0 + ".", value.f0.intValue() + windowCounts.get(value.f0), value.f3.value);
-
- boolean seenAll = true;
- if (windowCounts.size() == numKeys) {
- for (Integer windowCount: windowCounts.values()) {
- if (windowCount < numWindowsExpected) {
- seenAll = false;
- break;
- } else if (windowCount > numWindowsExpected) {
- fail("Window count to high: " + windowCount);
- }
- }
-
- if (seenAll) {
- // exit
- throw new SuccessException();
- }
-
- }
- }
-
- @Override
- public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
- return Collections.singletonList(this.windowCounts);
- }
-
- @Override
- public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
- if (state.isEmpty() || state.size() > 1) {
- throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
- }
- this.windowCounts.putAll(state.get(0));
- }
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- private static class IntType {
-
- public int value;
-
- public IntType() {}
-
- public IntType(int value) {
- this.value = value;
- }
- }
-
- protected int numElementsPerKey() {
- return 300;
- }
-
- protected int windowSize() {
- return 100;
- }
-
- protected int windowSlide() {
- return 100;
- }
-
- protected int numKeys() {
- return 20;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
deleted file mode 100644
index 4e454d7..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractLocalRecoveryITCase.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import java.io.IOException;
-
-import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum;
-
-/**
- * This test delegates to instances of {@link AbstractEventTimeWindowCheckpointingITCase} that have been reconfigured
- * to use local recovery.
- *
- * <p>TODO: This class must be refactored to properly extend {@link AbstractEventTimeWindowCheckpointingITCase}.
- */
-public abstract class AbstractLocalRecoveryITCase extends TestLogger {
-
- private final StateBackendEnum backendEnum;
- private final boolean localRecoveryEnabled;
-
- @Rule
- public TestName testName = new TestName();
-
- AbstractLocalRecoveryITCase(StateBackendEnum backendEnum, boolean localRecoveryEnabled) {
- this.backendEnum = backendEnum;
- this.localRecoveryEnabled = localRecoveryEnabled;
- }
-
- @Test
- public final void executeTest() throws Exception {
- AbstractEventTimeWindowCheckpointingITCase.tempFolder.create();
- AbstractEventTimeWindowCheckpointingITCase windowChkITCase =
- new AbstractEventTimeWindowCheckpointingITCase() {
- @Override
- protected StateBackendEnum getStateBackend() {
- return backendEnum;
- }
-
- @Override
- protected Configuration createClusterConfig() throws IOException {
- Configuration config = super.createClusterConfig();
-
- config.setBoolean(
- CheckpointingOptions.LOCAL_RECOVERY,
- localRecoveryEnabled);
-
- return config;
- }
- };
-
- executeTest(windowChkITCase);
- }
-
- private void executeTest(AbstractEventTimeWindowCheckpointingITCase delegate) throws Exception {
- delegate.name = testName;
- try {
- delegate.setupTestCluster();
- try {
- delegate.testTumblingTimeWindow();
- delegate.stopTestCluster();
- } catch (Exception e) {
- delegate.stopTestCluster();
- }
-
- delegate.setupTestCluster();
- try {
- delegate.testSlidingTimeWindow();
- delegate.stopTestCluster();
- } catch (Exception e) {
- delegate.stopTestCluster();
- }
- } finally {
- delegate.tempFolder.delete();
- }
- }
-}
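
Note on the delegation just removed: its catch blocks stop the cluster but discard the caught exception, so a failure in a delegated test could go unnoticed. For comparison, a hypothetical cleanup shape (illustrative only, not from the Flink code base) that still stops the cluster while letting the failure propagate:

    delegate.setupTestCluster();
    try {
        delegate.testTumblingTimeWindow();
    } finally {
        // runs whether the test passed or threw; the exception, if any,
        // propagates and fails the build
        delegate.stopTestCluster();
    }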
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
deleted file mode 100644
index c4b06d4..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-/**
- * Integration tests for asynchronous file backend.
- */
-public class AsyncFileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
-
- @Override
- protected StateBackendEnum getStateBackend() {
- return StateBackendEnum.FILE_ASYNC;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
deleted file mode 100644
index 2cc5b01..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-/**
- * Integration tests for asynchronous memory backend.
- */
-public class AsyncMemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
- @Override
- protected StateBackendEnum getStateBackend() {
- return StateBackendEnum.MEM_ASYNC;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index d05bafb..9e14b26 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -58,7 +58,7 @@ import static org.junit.Assert.fail;
/**
* This verifies that checkpointing works correctly with event time windows.
*
- * <p>This is a version of {@link AbstractEventTimeWindowCheckpointingITCase} for All-Windows.
+ * <p>This is a version of {@link EventTimeWindowCheckpointingITCase} for All-Windows.
*/
@SuppressWarnings("serial")
public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..c3d93d7
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,931 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.test.util.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.FILE;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.MEM;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.MEM_ASYNC;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
+import static org.apache.flink.test.util.TestUtils.tryExecute;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * This verifies that checkpointing works correctly with event time windows. This is more
+ * strict than {@link WindowCheckpointingITCase} because for event-time the contents
+ * of the emitted windows are deterministic.
+ *
+ * <p>Split into multiple test classes in order to decrease the runtime per backend
+ * and not run into CI infrastructure limits like no std output being emitted for
+ * I/O heavy variants.
+ */
+@SuppressWarnings("serial")
+@RunWith(Parameterized.class)
+public class EventTimeWindowCheckpointingITCase extends TestLogger {
+
+ private static final int MAX_MEM_STATE_SIZE = 20 * 1024 * 1024;
+ private static final int PARALLELISM = 4;
+
+ private TestingServer zkServer;
+
+ public MiniClusterResource miniClusterResource;
+
+ @ClassRule
+ public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Rule
+ public TestName name = new TestName();
+
+ private AbstractStateBackend stateBackend;
+
+ @Parameterized.Parameter
+ public StateBackendEnum stateBackendEnum;
+
+ enum StateBackendEnum {
+ MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC
+ }
+
+ @Parameterized.Parameters(name = "statebackend type = {0}")
+ public static Collection<StateBackendEnum> parameter() {
+ return Arrays.asList(StateBackendEnum.values());
+ }
+
+ protected StateBackendEnum getStateBackend() {
+ return this.stateBackendEnum;
+ }
+
+ protected final MiniClusterResource getMiniClusterResource() {
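+ // 2 task managers x (PARALLELISM / 2) slots each = exactly PARALLELISM slots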
+ return new MiniClusterResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfigurationSafe())
+ .setNumberTaskManagers(2)
+ .setNumberSlotsPerTaskManager(PARALLELISM / 2)
+ .build());
+ }
+
+ private Configuration getConfigurationSafe() {
+ try {
+ return getConfiguration();
+ } catch (Exception e) {
+ throw new AssertionError("Could not initialize test.", e);
+ }
+ }
+
+ private Configuration getConfiguration() throws Exception {
+
+ // print a message when starting a test method to avoid Travis' <tt>"Maven produced no
+ // output for xxx seconds."</tt> messages
+ System.out.println(
+ "Starting " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
+
+ // Testing HA Scenario / ZKCompletedCheckpointStore with incremental checkpoints
+ StateBackendEnum stateBackendEnum = getStateBackend();
+ if (ROCKSDB_INCREMENTAL_ZK.equals(stateBackendEnum)) {
+ zkServer = new TestingServer();
+ zkServer.start();
+ }
+
+ Configuration config = createClusterConfig();
+
+ switch (stateBackendEnum) {
+ case MEM:
+ this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
+ break;
+ case FILE: {
+ String backups = tempFolder.newFolder().getAbsolutePath();
+ this.stateBackend = new FsStateBackend("file://" + backups, false);
+ break;
+ }
+ case MEM_ASYNC:
+ this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, true);
+ break;
+ case FILE_ASYNC: {
+ String backups = tempFolder.newFolder().getAbsolutePath();
+ this.stateBackend = new FsStateBackend("file://" + backups, true);
+ break;
+ }
+ case ROCKSDB_FULLY_ASYNC: {
+ String rocksDb = tempFolder.newFolder().getAbsolutePath();
+ String backups = tempFolder.newFolder().getAbsolutePath();
+ RocksDBStateBackend rdb = new RocksDBStateBackend(new FsStateBackend("file://" + backups));
+ rdb.setDbStoragePath(rocksDb);
+ this.stateBackend = rdb;
+ break;
+ }
+ case ROCKSDB_INCREMENTAL:
+ case ROCKSDB_INCREMENTAL_ZK: {
+ String rocksDb = tempFolder.newFolder().getAbsolutePath();
+ String backups = tempFolder.newFolder().getAbsolutePath();
+ // we use the fs backend with small threshold here to test the behaviour with file
+ // references, not self-contained byte handles
+ RocksDBStateBackend rdb =
+ new RocksDBStateBackend(
+ new FsStateBackend(
+ new Path("file://" + backups).toUri(), 16),
+ true);
+ rdb.setDbStoragePath(rocksDb);
+ this.stateBackend = rdb;
+ break;
+ }
+ default:
+ throw new IllegalStateException("No backend selected.");
+ }
+ return config;
+ }
+
+ protected Configuration createClusterConfig() throws IOException {
+ TemporaryFolder temporaryFolder = new TemporaryFolder();
+ temporaryFolder.create();
+ final File haDir = temporaryFolder.newFolder();
+
+ Configuration config = new Configuration();
+ config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "48m");
+ // the default network buffers size (10% of heap max =~ 150MB) seems too much for this test case
+ config.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(80L << 20)); // 80 MB
+ config.setString(AkkaOptions.FRAMESIZE, String.valueOf(MAX_MEM_STATE_SIZE) + "b");
+
+ if (zkServer != null) {
+ config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+ config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
+ }
+ return config;
+ }
+
+ @Before
+ public void setupTestCluster() throws Exception {
+ miniClusterResource = getMiniClusterResource();
+ miniClusterResource.before();
+ }
+
+ @After
+ public void stopTestCluster() throws IOException {
+ if (miniClusterResource != null) {
+ miniClusterResource.after();
+ miniClusterResource = null;
+ }
+
+ if (zkServer != null) {
+ zkServer.stop();
+ zkServer = null;
+ }
+
+ // print a message when finishing a test method to avoid Travis' <tt>"Maven produced no output
+ // for xxx seconds."</tt> messages.
+ System.out.println(
+ "Finished " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testTumblingTimeWindow() {
+ final int numElementsPerKey = numElementsPerKey();
+ final int windowSize = windowSize();
+ final int numKeys = numKeys();
+ FailingSource.reset();
+
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.enableCheckpointing(100);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.getConfig().disableSysoutLogging();
+ env.setStateBackend(this.stateBackend);
+ env.getConfig().setUseSnapshotCompression(true);
+
+ env
+ .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+ .rebalance()
+ .keyBy(0)
+ .timeWindow(Time.of(windowSize, MILLISECONDS))
+ .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ Tuple tuple,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> values,
+ Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+
+ int sum = 0;
+ long key = -1;
+
+ for (Tuple2<Long, IntType> value : values) {
+ sum += value.f1.value;
+ key = value.f0;
+ }
+ out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+ }
+ })
+ .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+
+ tryExecute(env, "Tumbling Window Test");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTumblingTimeWindowWithKVStateMinMaxParallelism() {
+ doTestTumblingTimeWindowWithKVState(PARALLELISM);
+ }
+
+ @Test
+ public void testTumblingTimeWindowWithKVStateMaxMaxParallelism() {
+ doTestTumblingTimeWindowWithKVState(1 << 15);
+ }
+
+ public void doTestTumblingTimeWindowWithKVState(int maxParallelism) {
+ final int numElementsPerKey = numElementsPerKey();
+ final int windowSize = windowSize();
+ final int numKeys = numKeys();
+ FailingSource.reset();
+
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+ env.setMaxParallelism(maxParallelism);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.enableCheckpointing(100);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.getConfig().disableSysoutLogging();
+ env.setStateBackend(this.stateBackend);
+ env.getConfig().setUseSnapshotCompression(true);
+
+ env
+ .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+ .rebalance()
+ .keyBy(0)
+ .timeWindow(Time.of(windowSize, MILLISECONDS))
+ .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+ private boolean open = false;
+
+ private ValueState<Integer> count;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ count = getRuntimeContext().getState(
+ new ValueStateDescriptor<>("count", Integer.class, 0));
+ }
+
+ @Override
+ public void apply(
+ Tuple tuple,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> values,
+ Collector<Tuple4<Long, Long, Long, IntType>> out) throws Exception {
+
+ // the window count state starts with the key, so that we get
+ // different count results for each key
+ if (count.value() == 0) {
+ count.update(tuple.<Long>getField(0).intValue());
+ }
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+
+ count.update(count.value() + 1);
+ out.collect(new Tuple4<>(tuple.<Long>getField(0), window.getStart(), window.getEnd(), new IntType(count.value())));
+ }
+ })
+ .addSink(new CountValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+
+ tryExecute(env, "Tumbling Window Test");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSlidingTimeWindow() {
+ final int numElementsPerKey = numElementsPerKey();
+ final int windowSize = windowSize();
+ final int windowSlide = windowSlide();
+ final int numKeys = numKeys();
+ FailingSource.reset();
+
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setMaxParallelism(2 * PARALLELISM);
+ env.setParallelism(PARALLELISM);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.enableCheckpointing(100);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.getConfig().disableSysoutLogging();
+ env.setStateBackend(this.stateBackend);
+ env.getConfig().setUseSnapshotCompression(true);
+
+ env
+ .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+ .rebalance()
+ .keyBy(0)
+ .timeWindow(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
+ .apply(new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ Tuple tuple,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> values,
+ Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+
+ int sum = 0;
+ long key = -1;
+
+ for (Tuple2<Long, IntType> value : values) {
+ sum += value.f1.value;
+ key = value.f0;
+ }
+ out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+ }
+ })
+ .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
+
+ tryExecute(env, "Sliding Window Test");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPreAggregatedTumblingTimeWindow() {
+ final int numElementsPerKey = numElementsPerKey();
+ final int windowSize = windowSize();
+ final int numKeys = numKeys();
+ FailingSource.reset();
+
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.enableCheckpointing(100);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.getConfig().disableSysoutLogging();
+ env.setStateBackend(this.stateBackend);
+ env.getConfig().setUseSnapshotCompression(true);
+
+ env
+ .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+ .rebalance()
+ .keyBy(0)
+ .timeWindow(Time.of(windowSize, MILLISECONDS))
+ .reduce(
+ new ReduceFunction<Tuple2<Long, IntType>>() {
+
+ @Override
+ public Tuple2<Long, IntType> reduce(
+ Tuple2<Long, IntType> a,
+ Tuple2<Long, IntType> b) {
+ return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
+ }
+ },
+ new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ Tuple tuple,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> input,
+ Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+
+ for (Tuple2<Long, IntType> in: input) {
+ out.collect(new Tuple4<>(in.f0,
+ window.getStart(),
+ window.getEnd(),
+ in.f1));
+ }
+ }
+ })
+ .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+
+ tryExecute(env, "Tumbling Window Test");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPreAggregatedSlidingTimeWindow() {
+ final int numElementsPerKey = numElementsPerKey();
+ final int windowSize = windowSize();
+ final int windowSlide = windowSlide();
+ final int numKeys = numKeys();
+ FailingSource.reset();
+
+ try {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(PARALLELISM);
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.enableCheckpointing(100);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+ env.getConfig().disableSysoutLogging();
+ env.setStateBackend(this.stateBackend);
+ env.getConfig().setUseSnapshotCompression(true);
+
+ env
+ .addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
+ .rebalance()
+ .keyBy(0)
+ .timeWindow(Time.of(windowSize, MILLISECONDS), Time.of(windowSlide, MILLISECONDS))
+ .reduce(
+ new ReduceFunction<Tuple2<Long, IntType>>() {
+
+ @Override
+ public Tuple2<Long, IntType> reduce(
+ Tuple2<Long, IntType> a,
+ Tuple2<Long, IntType> b) {
+
+ // pre-aggregation: sum up the partial values for the key
+ return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value));
+ }
+ },
+ new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>() {
+
+ private boolean open = false;
+
+ @Override
+ public void open(Configuration parameters) {
+ assertEquals(PARALLELISM, getRuntimeContext().getNumberOfParallelSubtasks());
+ open = true;
+ }
+
+ @Override
+ public void apply(
+ Tuple tuple,
+ TimeWindow window,
+ Iterable<Tuple2<Long, IntType>> input,
+ Collector<Tuple4<Long, Long, Long, IntType>> out) {
+
+ // validate that the function has been opened properly
+ assertTrue(open);
+
+ for (Tuple2<Long, IntType> in: input) {
+ out.collect(new Tuple4<>(in.f0,
+ window.getStart(),
+ window.getEnd(),
+ in.f1));
+ }
+ }
+ })
+ .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSlide)).setParallelism(1);
+
+ tryExecute(env, "Sliding Window Test");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static class FailingSource extends RichSourceFunction<Tuple2<Long, IntType>>
+ implements ListCheckpointed<Integer>, CheckpointListener {
+ private static volatile boolean failedBefore = false;
+
+ private final int numKeys;
+ private final int numElementsToEmit;
+ private final int failureAfterNumElements;
+
+ private volatile int numElementsEmitted;
+ private volatile int numSuccessfulCheckpoints;
+ private volatile boolean running = true;
+
+ private FailingSource(int numKeys, int numElementsToEmitPerKey, int failureAfterNumElements) {
+ this.numKeys = numKeys;
+ this.numElementsToEmit = numElementsToEmitPerKey;
+ this.failureAfterNumElements = failureAfterNumElements;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ // non-parallel source
+ assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+ }
+
+ @Override
+ public void run(SourceContext<Tuple2<Long, IntType>> ctx) throws Exception {
+ // we loop longer than we have elements, to permit delayed checkpoints
+ // to still cause a failure
+ while (running) {
+
+ if (!failedBefore) {
+ // delay a bit, if we have not failed before
+ Thread.sleep(1);
+ if (numSuccessfulCheckpoints >= 2 && numElementsEmitted >= failureAfterNumElements) {
+ // cause a failure if we have not failed before and have reached
+ // enough completed checkpoints and elements
+ failedBefore = true;
+ throw new Exception("Artificial Failure");
+ }
+ }
+
+ if (numElementsEmitted < numElementsToEmit &&
+ (failedBefore || numElementsEmitted <= failureAfterNumElements)) {
+ // the function failed before, or we are in the elements before the failure
+ synchronized (ctx.getCheckpointLock()) {
+ int next = numElementsEmitted++;
+ for (long i = 0; i < numKeys; i++) {
+ ctx.collectWithTimestamp(new Tuple2<Long, IntType>(i, new IntType(next)), next);
+ }
+ ctx.emitWatermark(new Watermark(next));
+ }
+ }
+ else {
+
+ // if our work is done, delay a bit to prevent busy waiting
+ Thread.sleep(1);
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ running = false;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ numSuccessfulCheckpoints++;
+ }
+
+ @Override
+ public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.numElementsEmitted);
+ }
+
+ @Override
+ public void restoreState(List<Integer> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.numElementsEmitted = state.get(0);
+ }
+
+ public static void reset() {
+ failedBefore = false;
+ }
+ }
+
+ private static class ValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
+ implements ListCheckpointed<HashMap<Long, Integer>> {
+
+ private final HashMap<Long, Integer> windowCounts = new HashMap<>();
+
+ private final int numKeys;
+ private final int numWindowsExpected;
+
+ private ValidatingSink(int numKeys, int numWindowsExpected) {
+ this.numKeys = numKeys;
+ this.numWindowsExpected = numWindowsExpected;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ // this sink can only work with DOP 1
+ assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+
+ // it can happen that a checkpoint happens when the complete success state is
+ // already set. In that case we restart with the final state and would never
+ // finish because no more elements arrive.
+ if (windowCounts.size() == numKeys) {
+ boolean seenAll = true;
+ for (Integer windowCount: windowCounts.values()) {
+ if (windowCount != numWindowsExpected) {
+ seenAll = false;
+ break;
+ }
+ }
+ if (seenAll) {
+ throw new SuccessException();
+ }
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ boolean seenAll = true;
+ if (windowCounts.size() == numKeys) {
+ for (Integer windowCount: windowCounts.values()) {
+ if (windowCount < numWindowsExpected) {
+ seenAll = false;
+ break;
+ }
+ }
+ }
+ assertTrue("The sink must see all expected windows.", seenAll);
+ }
+
+ @Override
+ public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
+
+ // verify the contents of that window, Tuple4.f1 and .f2 are the window start/end
+ // the sum should be "sum (start .. end-1)"
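+ // e.g. a window [100, 200) must carry the sum 100 + 101 + ... + 199 = 14950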
+
+ int expectedSum = 0;
+ for (long i = value.f1; i < value.f2; i++) {
+ // only sum up positive vals, to filter out the negative start of the
+ // first sliding windows
+ if (i > 0) {
+ expectedSum += i;
+ }
+ }
+
+ assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value);
+
+ Integer curr = windowCounts.get(value.f0);
+ if (curr != null) {
+ windowCounts.put(value.f0, curr + 1);
+ }
+ else {
+ windowCounts.put(value.f0, 1);
+ }
+
+ if (windowCounts.size() == numKeys) {
+ boolean seenAll = true;
+ for (Integer windowCount: windowCounts.values()) {
+ if (windowCount < numWindowsExpected) {
+ seenAll = false;
+ break;
+ } else if (windowCount > numWindowsExpected) {
+ fail("Window count to high: " + windowCount);
+ }
+ }
+
+ if (seenAll) {
+ // exit
+ throw new SuccessException();
+ }
+
+ }
+ }
+
+ @Override
+ public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.windowCounts);
+ }
+
+ @Override
+ public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ windowCounts.putAll(state.get(0));
+ }
+ }
+
+ // Sink for validating the stateful window counts
+ private static class CountValidatingSink extends RichSinkFunction<Tuple4<Long, Long, Long, IntType>>
+ implements ListCheckpointed<HashMap<Long, Integer>> {
+
+ private final HashMap<Long, Integer> windowCounts = new HashMap<>();
+
+ private final int numKeys;
+ private final int numWindowsExpected;
+
+ private CountValidatingSink(int numKeys, int numWindowsExpected) {
+ this.numKeys = numKeys;
+ this.numWindowsExpected = numWindowsExpected;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ // this sink can only work with DOP 1
+ assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+ }
+
+ @Override
+ public void close() throws Exception {
+ boolean seenAll = true;
+ if (windowCounts.size() == numKeys) {
+ for (Integer windowCount: windowCounts.values()) {
+ if (windowCount < numWindowsExpected) {
+ seenAll = false;
+ break;
+ }
+ }
+ }
+ assertTrue("The source must see all expected windows.", seenAll);
+ }
+
+ @Override
+ public void invoke(Tuple4<Long, Long, Long, IntType> value) throws Exception {
+
+ Integer curr = windowCounts.get(value.f0);
+ if (curr != null) {
+ windowCounts.put(value.f0, curr + 1);
+ }
+ else {
+ windowCounts.put(value.f0, 1);
+ }
+
+ // verify the contents of that window, the contents should be:
+ // (key + num windows so far)
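+ // e.g. key 3 arrives with value 4 in its first window, 5 in its second, and so on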
+
+ assertEquals("Window counts don't match for key " + value.f0 + ".", value.f0.intValue() + windowCounts.get(value.f0), value.f3.value);
+
+ boolean seenAll = true;
+ if (windowCounts.size() == numKeys) {
+ for (Integer windowCount: windowCounts.values()) {
+ if (windowCount < numWindowsExpected) {
+ seenAll = false;
+ break;
+ } else if (windowCount > numWindowsExpected) {
+ fail("Window count to high: " + windowCount);
+ }
+ }
+
+ if (seenAll) {
+ // exit
+ throw new SuccessException();
+ }
+
+ }
+ }
+
+ @Override
+ public List<HashMap<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {
+ return Collections.singletonList(this.windowCounts);
+ }
+
+ @Override
+ public void restoreState(List<HashMap<Long, Integer>> state) throws Exception {
+ if (state.isEmpty() || state.size() > 1) {
+ throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
+ }
+ this.windowCounts.putAll(state.get(0));
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static class IntType {
+
+ public int value;
+
+ public IntType() {}
+
+ public IntType(int value) {
+ this.value = value;
+ }
+ }
+
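+ // the RocksDB-based variants run with larger volumes (more elements, larger
+ // windows, more keys) than the heap-based backends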
+ private int numElementsPerKey() {
+ switch (this.stateBackendEnum) {
+ case ROCKSDB_FULLY_ASYNC:
+ case ROCKSDB_INCREMENTAL:
+ case ROCKSDB_INCREMENTAL_ZK:
+ return 3000;
+ default:
+ return 300;
+ }
+ }
+
+ private int windowSize() {
+ switch (this.stateBackendEnum) {
+ case ROCKSDB_FULLY_ASYNC:
+ case ROCKSDB_INCREMENTAL:
+ case ROCKSDB_INCREMENTAL_ZK:
+ return 1000;
+ default:
+ return 100;
+ }
+ }
+
+ private int windowSlide() {
+ return 100;
+ }
+
+ private int numKeys() {
+ switch (this.stateBackendEnum) {
+ case ROCKSDB_FULLY_ASYNC:
+ case ROCKSDB_INCREMENTAL:
+ case ROCKSDB_INCREMENTAL_ZK:
+ return 100;
+ default:
+ return 20;
+ }
+ }
+}
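
The parameterized class above folds the former per-backend subclasses, including their numElementsPerKey()/windowSize()/numKeys() overrides, into a single test that JUnit runs once per StateBackendEnum value. For readers unfamiliar with the mechanism, a minimal sketch of the JUnit 4 pattern in use (assuming JUnit 4.12; class and enum names here are illustrative, not from the Flink code base):

    import java.util.Arrays;
    import java.util.Collection;

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;

    @RunWith(Parameterized.class)
    public class BackendParameterizedSketch {

        enum Backend { MEM, FILE, ROCKSDB }

        // JUnit injects one element of parameters() per run and
        // executes the whole class once for each value
        @Parameterized.Parameter
        public Backend backend;

        @Parameterized.Parameters(name = "backend = {0}")
        public static Collection<Backend> parameters() {
            return Arrays.asList(Backend.values());
        }

        @Test
        public void runsOncePerBackend() {
            // a real test would branch on the injected value,
            // as getConfiguration() does on StateBackendEnum
            System.out.println("Running against " + backend);
        }
    }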
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
deleted file mode 100644
index eab6153..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-/**
- * Integration tests for file backend.
- */
-public class FileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
- @Override
- protected StateBackendEnum getStateBackend() {
- return StateBackendEnum.FILE;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
deleted file mode 100644
index ed43ad6..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-/**
- * Integration tests for incremental RocksDB backend.
- */
-public class HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
-
- @Override
- protected StateBackendEnum getStateBackend() {
- return StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
- }
-
- @Override
- protected int numElementsPerKey() {
- return 3000;
- }
-
- @Override
- protected int windowSize() {
- return 1000;
- }
-
- @Override
- protected int windowSlide() {
- return 100;
- }
-
- @Override
- protected int numKeys() {
- return 100;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
deleted file mode 100644
index 1276a00..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-/**
- * Integration tests for incremental RocksDB backend.
- */
-public class IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
-
- @Override
- protected StateBackendEnum getStateBackend() {
- return StateBackendEnum.ROCKSDB_INCREMENTAL;
- }
-
- @Override
- protected int numElementsPerKey() {
- return 3000;
- }
-
- @Override
- protected int windowSize() {
- return 1000;
- }
-
- @Override
- protected int windowSlide() {
- return 100;
- }
-
- @Override
- protected int numKeys() {
- return 100;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java
deleted file mode 100644
index 6749366..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryHeapITCase.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC;
-
-/**
- * Tests file-based local recovery with the HeapBackend.
- */
-public class LocalRecoveryHeapITCase extends AbstractLocalRecoveryITCase {
- public LocalRecoveryHeapITCase() {
- super(FILE_ASYNC, true);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java
new file mode 100644
index 0000000..5374765
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryITCase.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.FILE_ASYNC;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC;
+import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
+
+/**
+ * This test delegates to instances of {@link EventTimeWindowCheckpointingITCase} that have been reconfigured
+ * to use local recovery.
+ *
+ * <p>TODO: This class must be refactored to properly extend {@link EventTimeWindowCheckpointingITCase}.
+ */
+@RunWith(Parameterized.class)
+public class LocalRecoveryITCase extends TestLogger {
+
+ private final boolean localRecoveryEnabled = true;
+
+ @Rule
+ public TestName testName = new TestName();
+
+ @Parameterized.Parameter
+ public StateBackendEnum backendEnum;
+
+ @Parameterized.Parameters(name = "state backend type = {0}")
+ public static Collection<StateBackendEnum> parameter() {
+ return Arrays.asList(ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL_ZK, FILE_ASYNC);
+ }
+
+ @Test
+ public final void executeTest() throws Exception {
+ EventTimeWindowCheckpointingITCase.tempFolder.create();
+ EventTimeWindowCheckpointingITCase windowChkITCase =
+ new EventTimeWindowCheckpointingITCase() {
+
+ @Override
+ protected StateBackendEnum getStateBackend() {
+ return backendEnum;
+ }
+
+ @Override
+ protected Configuration createClusterConfig() throws IOException {
+ Configuration config = super.createClusterConfig();
+
+ config.setBoolean(
+ CheckpointingOptions.LOCAL_RECOVERY,
+ localRecoveryEnabled);
+
+ return config;
+ }
+ };
+
+ executeTest(windowChkITCase);
+ }
+
+ private void executeTest(EventTimeWindowCheckpointingITCase delegate) throws Exception {
+ delegate.name = testName;
+ try {
+ delegate.setupTestCluster();
+ try {
+ delegate.testTumblingTimeWindow();
+ } finally {
+ // always shut the cluster down, but let test failures propagate instead of swallowing them
+ delegate.stopTestCluster();
+ }
+
+ delegate.setupTestCluster();
+ try {
+ delegate.testSlidingTimeWindow();
+ } finally {
+ delegate.stopTestCluster();
+ }
+ } finally {
+ delegate.tempFolder.delete();
+ }
+ }
+}
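
The createClusterConfig() override above is the entire mechanism: local recovery is switched on purely through configuration, with no changes to the job itself. A standalone sketch of the same switch, assuming (as the test does) that CheckpointingOptions.LOCAL_RECOVERY is a boolean option; the flink-conf.yaml equivalent should be the key state.backend.local-recovery:

    import org.apache.flink.configuration.CheckpointingOptions;
    import org.apache.flink.configuration.Configuration;

    public final class LocalRecoveryConfig {

        /** Builds a cluster configuration with local recovery enabled. */
        public static Configuration withLocalRecovery() {
            Configuration config = new Configuration();
            // the same flag the test flips in its createClusterConfig() override
            config.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true);
            return config;
        }
    }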
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java
deleted file mode 100644
index 2d12ae2..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBFullITCase.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_FULLY_ASYNC;
-
-/**
- * Tests file-based local recovery with the RocksDB state-backend.
- */
-public class LocalRecoveryRocksDBFullITCase extends AbstractLocalRecoveryITCase {
- public LocalRecoveryRocksDBFullITCase() {
- super(ROCKSDB_FULLY_ASYNC, true);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fc49801d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java
deleted file mode 100644
index 718d4a3..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/LocalRecoveryRocksDBIncrementalITCase.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.checkpointing;
-
-import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
-
-/**
- * Tests file-based local recovery with the RocksDB state-backend and incremental checkpointing enabled.
- */
-public class LocalRecoveryRocksDBIncrementalITCase extends AbstractLocalRecoveryITCase {
- public LocalRecoveryRocksDBIncrementalITCase() {
- super(ROCKSDB_INCREMENTAL_ZK, true);
- }
-}