Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/13 12:39:48 UTC

[4/6] flink git commit: [FLINK-9004][tests] Implement Jepsen tests to test job availability.

[FLINK-9004][tests] Implement Jepsen tests to test job availability.

Use the Jepsen framework (https://github.com/jepsen-io/jepsen) to implement
tests that verify Flink's HA capabilities under real-world faults, such as
sudden TaskManager/JobManager termination, HDFS NameNode unavailability, network
partitions, etc. The Flink cluster under test is automatically deployed on YARN
(session & job mode) and Mesos.

Provide Dockerfiles for local test development.

This closes #6240.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d58c8c05
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d58c8c05
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d58c8c05

Branch: refs/heads/master
Commit: d58c8c05f0b86bdb74cb6e450848690e309011d6
Parents: d783c62
Author: gyao <ga...@data-artisans.com>
Authored: Mon Mar 5 22:23:33 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jul 13 14:39:31 2018 +0200

----------------------------------------------------------------------
 flink-jepsen/.gitignore                         |  18 ++
 flink-jepsen/README.md                          |  70 ++++++
 flink-jepsen/bin/.gitkeep                       |   0
 flink-jepsen/docker/.gitignore                  |   3 +
 flink-jepsen/docker/Dockerfile-control          |  45 ++++
 flink-jepsen/docker/Dockerfile-db               |  39 ++++
 flink-jepsen/docker/docker-compose.yml          |  52 +++++
 flink-jepsen/docker/run-tests.sh                |  31 +++
 flink-jepsen/docker/up.sh                       |  31 +++
 flink-jepsen/project.clj                        |  28 +++
 flink-jepsen/scripts/run-tests.sh               |  43 ++++
 flink-jepsen/src/jepsen/flink/checker.clj       | 128 ++++++++++
 flink-jepsen/src/jepsen/flink/client.clj        | 150 ++++++++++++
 flink-jepsen/src/jepsen/flink/db.clj            | 232 +++++++++++++++++++
 flink-jepsen/src/jepsen/flink/flink.clj         | 110 +++++++++
 flink-jepsen/src/jepsen/flink/generator.clj     |  39 ++++
 flink-jepsen/src/jepsen/flink/hadoop.clj        | 139 +++++++++++
 flink-jepsen/src/jepsen/flink/mesos.clj         | 165 +++++++++++++
 flink-jepsen/src/jepsen/flink/nemesis.clj       | 163 +++++++++++++
 flink-jepsen/src/jepsen/flink/utils.clj         |  48 ++++
 flink-jepsen/src/jepsen/flink/zookeeper.clj     |  29 +++
 flink-jepsen/test/jepsen/flink/checker_test.clj |  82 +++++++
 flink-jepsen/test/jepsen/flink/client_test.clj  |  37 +++
 flink-jepsen/test/jepsen/flink/utils_test.clj   |  39 ++++
 .../test/jepsen/flink/zookeeper_test.clj        |  28 +++
 25 files changed, 1749 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/.gitignore
----------------------------------------------------------------------
diff --git a/flink-jepsen/.gitignore b/flink-jepsen/.gitignore
new file mode 100644
index 0000000..ed5eca5
--- /dev/null
+++ b/flink-jepsen/.gitignore
@@ -0,0 +1,18 @@
+*.class
+*.iml
+*.jar
+*.retry
+.DS_Store
+.hg/
+.hgignore
+.idea/
+/.lein-*
+/.nrepl-port
+/checkouts
+/classes
+/target
+pom.xml
+pom.xml.asc
+store
+bin/*
+!bin/.gitkeep

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/README.md
----------------------------------------------------------------------
diff --git a/flink-jepsen/README.md b/flink-jepsen/README.md
new file mode 100644
index 0000000..9343246
--- /dev/null
+++ b/flink-jepsen/README.md
@@ -0,0 +1,70 @@
+# flink-jepsen
+
+A Clojure project based on the [Jepsen](https://github.com/jepsen-io/jepsen) framework to find bugs in the
+distributed coordination of Apache Flink®.
+
+## Test Coverage
+Jepsen is a framework built to test the behavior of distributed systems
+under faults. The tests in this particular project deploy Flink on either YARN or Mesos, submit a
+job, and examine the availability of the job after injecting faults.
+A job is said to be available if all the tasks of the job are running.
+The faults that can currently be introduced to the Flink cluster include:
+* Killing of TaskManager/JobManager processes
+* Stopping HDFS NameNode
+* Network partitions
+
+Many properties other than job availability could be
+verified but are not yet covered by this test suite, e.g., end-to-end exactly-once processing
+semantics.
+
+## Usage
+See the [Jepsen documentation](https://github.com/jepsen-io/jepsen#setting-up-a-jepsen-environment)
+for how to set up the environment to run tests. The script `scripts/run-tests.sh` documents how to invoke
+tests. The Flink job used for testing is located under
+`flink-end-to-end-tests/flink-datastream-allround-test`. You have to build the job first and copy
+the resulting jar (`DataStreamAllroundTestProgram.jar`) to the `./bin` directory of this project's
+root.
+
+### Docker
+
+To simplify development, we have prepared Dockerfiles and a Docker Compose template
+so that you can run the tests locally in containers. To build the images
+and start the containers, simply run:
+
+    $ cd docker
+    $ ./up.sh
+
+After the containers have started, open a new terminal window and run `docker exec -it jepsen-control bash`.
+This will allow you to run arbitrary commands on the control node.
+To start the tests, you can use the `run-tests.sh` script in the `docker` directory,
+which expects the number of test iterations and a URI to a Flink distribution, e.g.,
+
+    ./docker/run-tests.sh 1 https://example.com/flink-dist.tgz
+
+The project's root is mounted as a volume to all containers under the path `/jepsen`.
+This means that changes to the test sources are immediately reflected in the control node container.
+Moreover, this allows you to test locally built Flink distributions by copying the tarball to the
+project's root and passing a URI with the `file://` scheme to the `run-tests.sh` script, e.g.,
+`file:///jepsen/flink-dist.tgz`.
+
+#### Memory Requirements
+
+The tests have high memory demands due to the many processes that are started by the control node.
+For example, to test Flink on YARN in an HA setup, we require ZooKeeper, HDFS NameNode,
+HDFS DataNode, YARN NodeManager, and YARN ResourceManager, in addition to the Flink processes.
+We found that the tests can be run comfortably in Docker containers on a machine with 32 GiB RAM.
+
+### Checking the Output of Tests
+
+Consult the `jepsen.log` file for the particular test run in the `store` folder. The final output of every test will be either
+
+    Everything looks good! ヽ('ー`)ノ
+
+or
+
+    Analysis invalid! (ノಥ益ಥ)ノ ┻━┻
+
+depending on whether the test passed or not. If neither output is generated, the test did not finish
+properly due to problems with the environment, bugs in Jepsen or in the test suite, etc.
+
+In addition, the test directories contain all relevant log files aggregated from all hosts.
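
The pass/fail rule described in the README can be sketched as a small Clojure function. This is a hypothetical helper, not part of flink-jepsen; it assumes you pass it the full text of a test run's `jepsen.log` (for example, read with `slurp`) and it matches only the ASCII prefixes of the two final messages.

```clojure
(ns example.test-verdict
  (:require [clojure.string :as str]))

;; Hypothetical helper (not part of flink-jepsen): classifies the text of a
;; jepsen.log by the final messages described in the README.
(defn test-verdict
  "Returns :passed, :failed, or :incomplete for the text of a jepsen.log."
  [log-text]
  (cond
    (str/includes? log-text "Everything looks good!") :passed
    (str/includes? log-text "Analysis invalid!") :failed
    ;; Neither message: the run did not finish properly (environment
    ;; problems, bugs in Jepsen or in the test suite, etc.).
    :else :incomplete))
```

Treating "neither message" as a separate outcome keeps environment failures from being miscounted as Flink bugs.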

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/bin/.gitkeep
----------------------------------------------------------------------
diff --git a/flink-jepsen/bin/.gitkeep b/flink-jepsen/bin/.gitkeep
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/docker/.gitignore
----------------------------------------------------------------------
diff --git a/flink-jepsen/docker/.gitignore b/flink-jepsen/docker/.gitignore
new file mode 100644
index 0000000..6ff5a8e
--- /dev/null
+++ b/flink-jepsen/docker/.gitignore
@@ -0,0 +1,3 @@
+id_rsa
+id_rsa.pub
+nodes

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/docker/Dockerfile-control
----------------------------------------------------------------------
diff --git a/flink-jepsen/docker/Dockerfile-control b/flink-jepsen/docker/Dockerfile-control
new file mode 100644
index 0000000..96198e3
--- /dev/null
+++ b/flink-jepsen/docker/Dockerfile-control
@@ -0,0 +1,45 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+FROM debian:jessie
+
+RUN echo "deb http://http.debian.net/debian jessie-backports main" >> /etc/apt/sources.list && \
+    apt-get update && \
+    apt-get install -y -t jessie-backports openjdk-8-jdk && \
+    apt-get install -qqy \
+            less \
+            libjna-java \
+            gnuplot \
+            openjdk-8-jdk \
+            openssh-client \
+            vim \
+            wget
+
+ENV LEIN_ROOT true
+RUN wget https://raw.githubusercontent.com/technomancy/leiningen/stable/bin/lein && \
+    mv lein /usr/bin && \
+    chmod +x /usr/bin/lein && \
+    lein self-install
+
+ADD id_rsa /root/.ssh/
+ADD id_rsa.pub /root/.ssh/
+RUN touch ~/.ssh/known_hosts
+
+WORKDIR /jepsen
+
+CMD tail -f /dev/null

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/docker/Dockerfile-db
----------------------------------------------------------------------
diff --git a/flink-jepsen/docker/Dockerfile-db b/flink-jepsen/docker/Dockerfile-db
new file mode 100644
index 0000000..1555329
--- /dev/null
+++ b/flink-jepsen/docker/Dockerfile-db
@@ -0,0 +1,39 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+FROM debian:jessie
+
+RUN echo "deb http://http.debian.net/debian jessie-backports main" >> /etc/apt/sources.list && \
+    apt-get update && \
+    apt-get install -y -t jessie-backports openjdk-8-jdk && \
+    apt-get install -y apt-utils bzip2 curl faketime iproute iptables iputils-ping less libzip2 logrotate man man-db net-tools ntpdate psmisc python rsyslog sudo sysvinit sysvinit-core sysvinit-utils tar unzip vim wget
+
+RUN apt-get update && \
+    apt-get -y install openssh-server && \
+    mkdir -p /var/run/sshd && \
+    sed -i "s/UsePrivilegeSeparation.*/UsePrivilegeSeparation no/g" /etc/ssh/sshd_config && \
+    sed -i "s/PermitRootLogin without-password/PermitRootLogin yes/g" /etc/ssh/sshd_config
+
+ADD id_rsa.pub /root
+RUN mkdir -p /root/.ssh/ && \
+    touch /root/.ssh/authorized_keys && \
+    chmod 600 /root/.ssh/authorized_keys && \
+    cat /root/id_rsa.pub >> /root/.ssh/authorized_keys
+
+EXPOSE 22
+CMD exec /usr/sbin/sshd -D

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/docker/docker-compose.yml
----------------------------------------------------------------------
diff --git a/flink-jepsen/docker/docker-compose.yml b/flink-jepsen/docker/docker-compose.yml
new file mode 100644
index 0000000..3d2bdbd
--- /dev/null
+++ b/flink-jepsen/docker/docker-compose.yml
@@ -0,0 +1,52 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+version: '2'
+services:
+  control:
+    volumes:
+      - ${JEPSEN_ROOT}:/jepsen
+
+    container_name: jepsen-control
+    hostname: control
+    build:
+      context: ./
+      dockerfile: Dockerfile-control
+    privileged: true
+    links:
+      - n1
+      - n2
+      - n3
+
+  n1:
+    build:
+      context: ./
+      dockerfile: Dockerfile-db
+    privileged: true
+    container_name: jepsen-n1
+    hostname: n1
+    volumes:
+      - ${JEPSEN_ROOT}:/jepsen
+  n2:
+    extends: n1
+    container_name: jepsen-n2
+    hostname: n2
+  n3:
+    extends: n1
+    container_name: jepsen-n3
+    hostname: n3

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/docker/run-tests.sh
----------------------------------------------------------------------
diff --git a/flink-jepsen/docker/run-tests.sh b/flink-jepsen/docker/run-tests.sh
new file mode 100755
index 0000000..8b2b1e6
--- /dev/null
+++ b/flink-jepsen/docker/run-tests.sh
@@ -0,0 +1,31 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+dockerdir=$(dirname $0)
+dockerdir=$(cd ${dockerdir}; pwd)
+
+cat <<EOF > ${dockerdir}/nodes
+n1
+n2
+n3
+EOF
+
+common_jepsen_args+=(--nodes-file ${dockerdir}/nodes)
+
+. ${dockerdir}/../scripts/run-tests.sh ${1} ${2} 1

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/docker/up.sh
----------------------------------------------------------------------
diff --git a/flink-jepsen/docker/up.sh b/flink-jepsen/docker/up.sh
new file mode 100755
index 0000000..5479b3b
--- /dev/null
+++ b/flink-jepsen/docker/up.sh
@@ -0,0 +1,31 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -e
+
+dockerdir=$(dirname $0)
+dockerdir=$(cd ${dockerdir}; pwd)
+
+if [ ! -f "${dockerdir}/id_rsa" ]; then
+    ssh-keygen -t rsa -N "" -f "${dockerdir}/id_rsa"
+fi
+
+export JEPSEN_ROOT=${dockerdir}/../
+docker-compose -f "${dockerdir}/docker-compose.yml" build
+docker-compose -f "${dockerdir}/docker-compose.yml" up --force-recreate

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/project.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/project.clj b/flink-jepsen/project.clj
new file mode 100644
index 0000000..78935d7
--- /dev/null
+++ b/flink-jepsen/project.clj
@@ -0,0 +1,28 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(defproject jepsen.flink "0.1.0-SNAPSHOT"
+  :license {:name "Apache License"
+            :url  "http://www.apache.org/licenses/LICENSE-2.0"}
+  :main jepsen.flink.flink
+  :dependencies [[org.clojure/clojure "1.9.0"]
+                 [cheshire "5.8.0"]
+                 [clj-http "3.8.0"]
+                 [jepsen "0.1.10"]
+                 [jepsen.zookeeper "0.1.0"]
+                 [org.clojure/data.xml "0.0.8"]
+                 [zookeeper-clj "0.9.4" :exclusions [org.slf4j/slf4j-log4j12]]]
+  :profiles {:test {:dependencies [[clj-http-fake "1.0.3"]]}})

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/scripts/run-tests.sh
----------------------------------------------------------------------
diff --git a/flink-jepsen/scripts/run-tests.sh b/flink-jepsen/scripts/run-tests.sh
new file mode 100755
index 0000000..e448124
--- /dev/null
+++ b/flink-jepsen/scripts/run-tests.sh
@@ -0,0 +1,43 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -euo pipefail
+
+scripts=$(dirname $0)
+scripts=$(cd ${scripts}; pwd)
+
+parallelism=${3}
+
+common_jepsen_args+=(--ha-storage-dir hdfs:///flink
+--job-jar ${scripts}/../bin/DataStreamAllroundTestProgram.jar
+--tarball ${2}
+--job-args "--environment.parallelism ${parallelism} --state_backend.checkpoint_directory hdfs:///checkpoints --state_backend rocks --state_backend.rocks.incremental true"
+--ssh-private-key ~/.ssh/id_rsa)
+
+for i in $(seq 1 ${1})
+do
+  echo "Executing run #${i} of ${1}"
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode yarn-session
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode yarn-session
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --deployment-mode yarn-session
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode yarn-job
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode yarn-job
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --deployment-mode yarn-job
+  echo
+done
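
Each iteration of the loop in `scripts/run-tests.sh` runs the same six tests: every nemesis generator against both YARN deployment modes, with all `yarn-session` tests first. The following Clojure sketch (hypothetical, not part of the project) enumerates that matrix; it shows only the flags that vary and omits the shared `common_jepsen_args`.

```clojure
(ns example.test-matrix)

;; Hypothetical sketch: the nemesis generators and deployment modes used by
;; scripts/run-tests.sh. The script does not exercise Mesos.
(def nemesis-gens
  ["kill-task-managers" "kill-job-managers" "fail-name-node-during-recovery"])

(def deployment-modes ["yarn-session" "yarn-job"])

(defn test-invocations
  "Returns the varying part of each `lein run test` call, in script order."
  []
  ;; Outer loop over modes, inner loop over nemeses, matching the script.
  (for [mode deployment-modes
        gen  nemesis-gens]
    ["lein" "run" "test" "--nemesis-gen" gen "--deployment-mode" mode]))
```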

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/checker.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/checker.clj b/flink-jepsen/src/jepsen/flink/checker.clj
new file mode 100644
index 0000000..02cc863
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/checker.clj
@@ -0,0 +1,128 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.checker
+  (:require [jepsen
+             [checker :as checker]
+             [util :as ju]]
+            [knossos.model :as model])
+  (:import (knossos.model Model)))
+
+(defn stoppable-op? [op]
+  (clojure.string/includes? (name (:f op)) "-start"))
+
+(defn stop-op? [op]
+  (clojure.string/includes? (name (:f op)) "-stop"))
+
+(defn strip-op-suffix [op]
+  (clojure.string/replace (name (:f op)) #"-start|-stop" ""))
+
+(def safe-inc
+  (fnil inc 0))
+
+(defn nemeses-active?
+  [active-nemeses]
+  (->> (vals active-nemeses)
+       (reduce +)
+       pos?))
+
+(defn dissoc-if
+  [f m]
+  (->> (remove f m)
+       (into {})))
+
+(defn zero-value?
+  [[_ v]]
+  (zero? v))
+
+(defrecord
+  JobRunningWithinGracePeriod
+  ^{:doc "A Model which is consistent iff the Flink job became available within
+  `job-recovery-grace-period` seconds after the last fault injected by the nemesis.
+  Note that some faults happen at a single point in time (e.g., killing of processes). Other faults,
+  such as network splits, happen during a period of time, and can thus interleave. As long as
+  there are active faults, the job is allowed not to be available."}
+  [active-nemeses                                           ; stores active failures
+   healthy-count                                            ; how many consecutive times was the job running?
+   last-failure                                             ; timestamp when the last failure was injected/ended
+   healthy-threshold                                        ; after how many times is the job considered healthy
+   job-recovery-grace-period]                               ; after how many seconds should the job be recovered
+  Model
+  (step [this op]
+    (case (:process op)
+      :nemesis (cond
+                 (nil? (:value op)) this
+                 (stoppable-op? op) (assoc
+                                      this
+                                      :active-nemeses (update active-nemeses
+                                                              (strip-op-suffix op)
+                                                              safe-inc))
+                 (stop-op? op) (assoc
+                                 this
+                                 :active-nemeses (dissoc-if zero-value?
+                                                            (update active-nemeses (strip-op-suffix op) dec))
+                                 :last-failure (:time op))
+                 :else (assoc this :last-failure (:time op)))
+      (case (:f op)
+        :job-running? (case (:type op)
+                        :info this                          ; ignore :info operations
+                        :fail this                          ; ignore :fail operations
+                        :invoke this                        ; ignore :invoke operations
+                        :ok (if (:value op)                 ; check if job is running
+                              (assoc                        ; job is running
+                                this
+                                :healthy-count
+                                (inc healthy-count))
+                              (if (and                      ; job is not running
+                                    (not (nemeses-active? active-nemeses))
+                                    (< healthy-count healthy-threshold)
+                                    (> (ju/nanos->secs (- (:time op) last-failure)) job-recovery-grace-period))
+                                ; job is not running but it should be running
+                                ; because grace period passed
+                                (model/inconsistent "Job is not running.")
+                                (conj this
+                                      [:healthy-count 0]))))
+        ; ignore other client operations
+        this))))
+
+(defn job-running-within-grace-period
+  [job-running-healthy-threshold job-recovery-grace-period]
+  (JobRunningWithinGracePeriod. {} 0 nil job-running-healthy-threshold job-recovery-grace-period))
+
+(defn job-running-checker
+  []
+  (reify
+    checker/Checker
+    (check [_ test model history _]
+      (let [final (reduce model/step (assoc model :last-failure (:time (first history))) history)
+            result-map (conj {}
+                             (find test :nemesis-gen)
+                             (find test :deployment-mode))]
+        (if (or (model/inconsistent? final) (zero? (:healthy-count final 0)))
+          (into result-map {:valid? false
+                            :error  (:msg final)})
+          (into result-map {:valid?      true
+                            :final-model final}))))))
+
+(defn get-job-running-history
+  [history]
+  (->>
+    history
+    (remove #(= (:process %) :nemesis))
+    (remove #(= (:type %) :invoke))
+    (map :value)
+    (map boolean)
+    (remove nil?)))
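
The grace-period rule encoded by `JobRunningWithinGracePeriod` and `job-running-checker` can be restated without knossos as a plain `reduce`. The sketch below is a simplified, hypothetical restatement for illustration; the names `step` and `valid-history?` and the op maps are ours. It assumes op `:time` values are in nanoseconds, as in Jepsen histories, and mirrors the original's counting of active `-start`/`-stop` faults.

```clojure
(ns example.grace-period
  (:require [clojure.string :as str]))

;; Simplified restatement of the JobRunningWithinGracePeriod rule.
;; State: active fault counts, consecutive healthy polls, time of last fault.
(defn step
  [{:keys [active healthy-count last-failure threshold grace-secs] :as state} op]
  (cond
    ;; Once inconsistent, stay inconsistent.
    (:inconsistent state) state

    (= :nemesis (:process op))
    (let [f (name (:f op))
          k (str/replace f #"-start|-stop" "")]
      (cond
        (nil? (:value op)) state
        ;; A fault that lasts a period of time begins.
        (str/includes? f "-start") (update-in state [:active k] (fnil inc 0))
        ;; A period fault ends: drop it once its count reaches zero.
        (str/includes? f "-stop")
        (let [n (dec (get active k 0))]
          (assoc state
                 :active (if (pos? n) (assoc active k n) (dissoc active k))
                 :last-failure (:time op)))
        ;; Point-in-time fault, e.g. killing a process.
        :else (assoc state :last-failure (:time op))))

    (and (= :job-running? (:f op)) (= :ok (:type op)))
    (cond
      (:value op) (update state :healthy-count inc)
      ;; Not running, no active faults, not yet healthy, grace period over.
      (and (empty? active)
           (< healthy-count threshold)
           (> (/ (- (:time op) last-failure) 1e9) grace-secs))
      {:inconsistent "Job is not running."}
      :else (assoc state :healthy-count 0))

    ;; :invoke, :fail, :info and other client ops are ignored.
    :else state))

(defn valid-history?
  "True iff no violation occurred and the last :ok poll found the job running."
  [history threshold grace-secs]
  (let [init  {:active {} :healthy-count 0
               :last-failure (:time (first history))
               :threshold threshold :grace-secs grace-secs}
        final (reduce step init history)]
    (and (not (:inconsistent final))
         (pos? (:healthy-count final 0)))))
```

For example, with a 20-second grace period, a job that is still down 30 seconds after a TaskManager kill (with no other fault active) makes the history invalid. A failed poll during an ongoing `-start`/`-stop` fault does not.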

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/client.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/client.clj b/flink-jepsen/src/jepsen/flink/client.clj
new file mode 100644
index 0000000..905dc48
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/client.clj
@@ -0,0 +1,150 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.client
+  (:require [clj-http.client :as http]
+            [clojure.tools.logging :refer :all]
+            [jepsen.client :as client]
+            [jepsen.flink.zookeeper :as fz]
+            [jepsen.flink.utils :as fu]
+            [zookeeper :as zk])
+  (:import (java.io ByteArrayInputStream ObjectInputStream)))
+
+(defn connect-zk-client!
+  [connection-string]
+  (zk/connect connection-string :timeout-msec 60000))
+
+(defn read-url
+  [bytes]
+  (with-open [object-input-stream (ObjectInputStream. (ByteArrayInputStream. bytes))]
+    (.readUTF object-input-stream)))
+
+(defn wait-for-zk-operation
+  [zk-client operation path]
+  (let [p (promise)]
+    (letfn [(iter [_]
+              (when-let [res (operation zk-client path :watcher iter)]
+                (deliver p res)))
+            ]
+      (iter nil)
+      p)))
+
+(defn wait-for-path-to-exist
+  [zk-client path]
+  (info "Waiting for path" path "in ZK.")
+  (wait-for-zk-operation zk-client zk/exists path))
+
+(defn wait-for-children-to-exist
+  [zk-client path]
+  (wait-for-zk-operation zk-client zk/children path))
+
+(defn find-application-id
+  [zk-client]
+  (do
+    (->
+      (wait-for-path-to-exist zk-client "/flink")
+      (deref))
+    (->
+      (wait-for-children-to-exist zk-client "/flink")
+      (deref)
+      (first))))
+
+(defn watch-node-bytes
+  [zk-client path callback]
+  (when (zk/exists zk-client path :watcher (fn [_] (watch-node-bytes zk-client path callback)))
+    (->>
+      (zk/data zk-client path :watcher (fn [_] (watch-node-bytes zk-client path callback)))
+      :data
+      (callback))))
+
+(defn make-job-manager-url [test]
+  (let [rest-url-atom (atom nil)
+        zk-client (connect-zk-client! (fz/zookeeper-quorum test))
+        init-future (future
+                      (let [application-id (find-application-id zk-client)
+                            path (str "/flink/" application-id "/leader/rest_server_lock")
+                            _ (->
+                                (wait-for-path-to-exist zk-client path)
+                                (deref))]
+                        (info "Determined application id to be" application-id)
+                        (watch-node-bytes zk-client path
+                                          (fn [bytes]
+                                            (let [url (read-url bytes)]
+                                              (info "Leading REST url changed to" url)
+                                              (reset! rest-url-atom url))))))]
+    {:rest-url-atom rest-url-atom
+     :closer        (fn [] (zk/close zk-client))
+     :init-future   init-future}))
+
+(defn list-jobs!
+  [base-url]
+  (->>
+    (http/get (str base-url "/jobs") {:as :json})
+    :body
+    :jobs
+    (map :id)))
+
+(defn get-job-details!
+  [base-url job-id]
+  (assert base-url)
+  (assert job-id)
+  (let [job-details (->
+                      (http/get (str base-url "/jobs/" job-id) {:as :json})
+                      :body)]
+    (assert (:vertices job-details) "Job does not have vertices")
+    job-details))
+
+(defn job-running?
+  [base-url job-id]
+  (->>
+    (get-job-details! base-url job-id)
+    :vertices
+    (map :status)
+    (every? #(= "RUNNING" %))))
+
+(defrecord Client
+  [deploy-cluster! closer rest-url init-future job-id]
+  client/Client
+  (open! [this test node]
+    (let [{:keys [rest-url-atom closer init-future]} (make-job-manager-url test)]
+      (assoc this :closer closer :rest-url rest-url-atom :init-future init-future :job-id (atom nil))))
+
+  (setup! [this test] this)
+
+  (invoke! [this test op]
+    (case (:f op)
+      :submit (do
+                (deploy-cluster! test)
+                (deref init-future)
+                (let [jobs (fu/retry (fn [] (list-jobs! @rest-url))
+                                     :fallback (fn [e] (do
+                                                         (fatal e "Could not get running jobs.")
+                                                         (System/exit 1))))
+                      num-jobs (count jobs)]
+                  (assert (= 1 num-jobs) (str "Expected 1 job, was " num-jobs))
+                  (reset! job-id (first jobs)))
+                (assoc op :type :ok))
+      :job-running? (let [base-url @rest-url]
+                      (if base-url
+                        (try
+                          (assoc op :type :ok :value (job-running? base-url @job-id))
+                          (catch Exception e (do
+                                               (warn e "Get job details from" base-url "failed.")
+                                               (assoc op :type :fail))))
+                        (assoc op :type :fail :value "Cluster not deployed yet.")))))
+
+  (teardown! [this test])
+  (close! [this test] (closer)))

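ZooKeeper watches are one-shot triggers. That is why watch-node-bytes above passes a watcher that calls watch-node-bytes again: every notification re-arms the watch and re-reads the node, so rest-url-atom keeps following the current leader. The sketch below shows the same re-arming pattern in plain Clojure. It makes two simplifying assumptions. First, the ZooKeeper node is replaced by a hypothetical in-memory atom, and node, read-node!, write-node! and watch-node! are illustrative names rather than ZooKeeper or test-suite APIs. Second, the separate exists check is left out.

```clojure
(ns example.one-shot-watch)

;; Hypothetical in-memory stand-in for a ZooKeeper node. Like a ZooKeeper
;; data watch, a watcher registered by read-node! fires at most once: on the
;; next write, after which it is discarded.
(def node (atom {:data nil :watchers []}))

(defn read-node!
  "Registers a one-shot watcher and returns the node's current data."
  [watcher]
  (:data (swap! node update :watchers conj watcher)))

(defn write-node!
  "Stores new data, clears the pending watchers, then invokes each of them.
  swap-vals! (Clojure 1.9+) returns the old state, which holds the watchers."
  [data]
  (let [[old _] (swap-vals! node assoc :data data :watchers [])]
    (doseq [w (:watchers old)]
      (w))))

(defn watch-node!
  "Passes the current data to callback and re-arms the watch each time it
  fires, mirroring the recursion in watch-node-bytes."
  [callback]
  (callback (read-node! (fn [] (watch-node! callback)))))

;; Usage: track the most recent value, as make-job-manager-url does with the
;; leader's REST URL.
(def leader-url (atom nil))
(watch-node! #(reset! leader-url %))
(write-node! "http://jm-1:8081")
(write-node! "http://jm-2:8081")
(println @leader-url) ; prints http://jm-2:8081
```

Because each notification registers exactly one new watcher, only one watch is pending at any time. A change that happens between a notification and the re-read is still observed, because the re-read returns the latest data.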
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/db.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/db.clj b/flink-jepsen/src/jepsen/flink/db.clj
new file mode 100644
index 0000000..ff934e1
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/db.clj
@@ -0,0 +1,232 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.db
+  (:require [clj-http.client :as http]
+            [clojure.java.io]
+            [clojure.string :as str]
+            [clojure.tools.logging :refer :all]
+            [jepsen
+             [control :as c]
+             [db :as db]
+             [util :refer [meh]]
+             [zookeeper :as zk]]
+            [jepsen.control.util :as cu]
+            [jepsen.flink.hadoop :as hadoop]
+            [jepsen.flink.mesos :as mesos]
+            [jepsen.flink.utils :as fu]
+            [jepsen.flink.zookeeper :refer :all]))
+
+(def install-dir "/opt/flink")
+(def upload-dir "/tmp")
+(def log-dir (str install-dir "/log"))
+(def conf-file (str install-dir "/conf/flink-conf.yaml"))
+(def masters-file (str install-dir "/conf/masters"))
+
+(def default-flink-dist-url "https://archive.apache.org/dist/flink/flink-1.5.0/flink-1.5.0-bin-hadoop28-scala_2.11.tgz")
+(def hadoop-dist-url "https://archive.apache.org/dist/hadoop/common/hadoop-2.8.3/hadoop-2.8.3.tar.gz")
+(def deb-zookeeper-package "3.4.9-3+deb8u1")
+(def deb-mesos-package "1.5.0-2.0.2")
+(def deb-marathon-package "1.6.322")
+
+(def taskmanager-slots 1)
+(def master-count 1)
+
+(defn flink-configuration
+  [test]
+  {:high-availability                  "zookeeper"
+   :high-availability.zookeeper.quorum (zookeeper-quorum test)
+   :high-availability.storageDir       (str (:ha-storage-dir test) "/ha")
+   :state.savepoints.dir               (str (:ha-storage-dir test) "/savepoints")
+   :web.port                           8081
+   :rest.bind-address                  "0.0.0.0"
+   :taskmanager.numberOfTaskSlots      taskmanager-slots
+   :yarn.application-attempts          99999
+   :slotmanager.taskmanager-timeout    10000
+   :state.backend.local-recovery       "true"
+   :taskmanager.registration.timeout   "30 s"})
+
+(defn master-nodes
+  [test]
+  (take master-count (sort (:nodes test))))
+
+(defn write-configuration!
+  "Writes the flink-conf.yaml and masters file to the flink conf directory"
+  [test]
+  (let [c (clojure.string/join "\n" (map (fn [[k v]] (str (name k) ": " v))
+                                         (seq (flink-configuration test))))
+        m (clojure.string/join "\n" (master-nodes test))]
+    (c/exec :echo c :> conf-file)
+    (c/exec :echo m :> masters-file)
+    ;; TODO: write log4j.properties properly
+    (c/exec (c/lit (str "sed -i'.bak' -e '/log4j.rootLogger=/ s/=.*/=DEBUG, file/' " install-dir "/conf/log4j.properties")))))
+
+(defn install-flink!
+  [test]
+  (let [url (:tarball test)]
+    (info "Installing Flink from" url)
+    (cu/install-archive! url install-dir)
+    (info "Enable S3 FS")
+    (c/exec (c/lit (str "ls " install-dir "/opt/flink-s3-fs-hadoop* | xargs -I {} mv {} " install-dir "/lib")))
+    (c/upload (:job-jar test) upload-dir)
+    (c/exec :mv (str upload-dir "/" (.getName (clojure.java.io/file (:job-jar test)))) install-dir)
+    (write-configuration! test)))
+
+(defn teardown-flink!
+  []
+  (info "Tearing down Flink")
+  (meh (c/exec :rm :-rf install-dir))
+  (meh (c/exec :rm :-rf (c/lit "/tmp/.yarn-properties*"))))
+
+(defn get-log-files!
+  []
+  (if (cu/exists? log-dir) (cu/ls-full log-dir) []))
+
+(defn flink-db
+  [test]
+  (reify db/DB
+    (setup! [_ test node]
+      (c/su
+        (install-flink! test)))
+
+    (teardown! [_ test node]
+      (c/su
+        (teardown-flink!)))
+
+    db/LogFiles
+    (log-files [_ test node]
+      (concat
+        (get-log-files!)))))
+
+(defn combined-db
+  [dbs]
+  (reify db/DB
+    (setup! [_ test node]
+      (c/su
+        (doall (map #(db/setup! % test node) dbs))))
+    (teardown! [_ test node]
+      (c/su
+        (doall (map #(db/teardown! % test node) dbs))))
+    db/LogFiles
+    (log-files [_ test node]
+      (flatten (map #(db/log-files % test node) dbs)))))
+
+;;; YARN
+
+(defn flink-yarn-db
+  []
+  (let [zk (zk/db deb-zookeeper-package)
+        hadoop (hadoop/db hadoop-dist-url)
+        flink (flink-db test)]
+    (combined-db [hadoop zk flink])))
+
+(defn exec-flink!
+  [test cmd args]
+  (c/su
+    (c/exec (c/lit (str
+                     "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
+                     "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir " "
+                     install-dir "/bin/flink " cmd " " args)))))
+
+(defn flink-run-cli-args
+  "Returns the CLI args that should be passed to 'flink run'"
+  [test]
+  (concat
+    ["-d"]
+    (if (:main-class test)
+      [(str "-c " (:main-class test))]
+      [])
+    (if (= :yarn-job (:deployment-mode test))
+      ["-m yarn-cluster" "-yjm 2048m" "-ytm 2048m"]
+      [])))
+
+(defn submit-job!
+  ([test] (submit-job! test []))
+  ([test cli-args]
+   (exec-flink! test "run" (clojure.string/join
+                             " "
+                             (concat cli-args
+                                     (flink-run-cli-args test)
+                                     [(str install-dir "/" (last (str/split (:job-jar test) #"/")))
+                                      (:job-args test)])))))
+
+(defn first-node
+  [test]
+  (-> test :nodes sort first))
+
+(defn start-yarn-session!
+  [test]
+  (let [node (first-node test)]
+    (c/on node
+          (info "Starting YARN session from" node)
+          (c/su
+            (c/exec (c/lit (str "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
+                                "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir
+                                " " install-dir "/bin/yarn-session.sh -d -jm 2048m -tm 2048m")))
+            (submit-job! test)))))
+
+(defn start-yarn-job!
+  [test]
+  (c/on (first-node test)
+        (c/su
+          (submit-job! test))))
+
+;;; Mesos
+
+(defn flink-mesos-db
+  []
+  (let [zk (zk/db deb-zookeeper-package)
+        hadoop (hadoop/db hadoop-dist-url)
+        mesos (mesos/db deb-mesos-package deb-marathon-package)
+        flink (flink-db test)]
+    (combined-db [hadoop zk mesos flink])))
+
+(defn submit-job-with-retry!
+  [test]
+  (fu/retry
+    (partial submit-job! test)
+    :fallback (fn [e] (do
+                        (fatal e "Could not submit job.")
+                        (System/exit 1)))))
+
+(defn start-mesos-session!
+  [test]
+  (c/su
+    (let [r (fu/retry (fn []
+                        (http/post
+                          (str (mesos/marathon-base-url test) "/v2/apps")
+                          {:form-params  {:id   "flink"
+                                          :cmd  (str "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
+                                                     "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir " "
+                                                     install-dir "/bin/mesos-appmaster.sh "
+                                                     "-Dmesos.master=" (zookeeper-uri
+                                                                         test
+                                                                         mesos/zk-namespace) " "
+                                                     "-Djobmanager.rpc.address=$(hostname -f) "
+                                                     "-Djobmanager.heap.mb=2048 "
+                                                     "-Djobmanager.rpc.port=6123 "
+                                                     "-Djobmanager.web.port=8081 "
+                                                     "-Dmesos.resourcemanager.tasks.mem=2048 "
+                                                     "-Dtaskmanager.heap.mb=2048 "
+                                                     "-Dtaskmanager.numberOfTaskSlots=2 "
+                                                     "-Dmesos.resourcemanager.tasks.cpus=1 "
+                                                     "-Drest.bind-address=$(hostname -f) ")
+                                          :cpus 1.0
+                                          :mem  2048}
+                           :content-type :json})))]
+      (info "Submitted Flink Application via Marathon" r)
+      (c/on (-> test :nodes sort first)
+            (submit-job-with-retry! test)))))

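write-configuration! renders the flink-configuration map as one "key: value" line per entry. flink-run-cli-args builds the options for 'flink run' from the test map. Both are pure data transformations, so they can be exercised without a cluster. The functions below are an illustrative restatement of that logic and not the code under test. render-flink-conf is a hypothetical name, flink-run-cli-args uses when in place of (if ... []) with the same result, and the sample values are made up.

```clojure
(ns example.flink-conf
  (:require [clojure.string :as str]))

(defn render-flink-conf
  "Renders a map with keyword keys as flink-conf.yaml lines of the form
  key: value. (name :a.b) keeps the dots, so dotted Flink keys survive."
  [conf]
  (str/join "\n" (map (fn [[k v]] (str (name k) ": " v)) conf)))

(defn flink-run-cli-args
  "Options for 'flink run': always detached (-d), plus an optional entry
  class and, in YARN per-job mode, the yarn-cluster target and memory sizes."
  [{:keys [main-class deployment-mode]}]
  (concat ["-d"]
          (when main-class [(str "-c " main-class)])
          (when (= :yarn-job deployment-mode)
            ["-m yarn-cluster" "-yjm 2048m" "-ytm 2048m"])))

(println (render-flink-conf {:high-availability "zookeeper"
                             :taskmanager.numberOfTaskSlots 1}))
(println (str/join " " (flink-run-cli-args {:main-class "com.example.Job"
                                            :deployment-mode :yarn-job})))
```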
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/flink.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/flink.clj b/flink-jepsen/src/jepsen/flink/flink.clj
new file mode 100644
index 0000000..d5d4157
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/flink.clj
@@ -0,0 +1,110 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.flink
+  (:require [clojure.tools.logging :refer :all]
+            [jepsen
+             [cli :as cli]
+             [generator :as gen]
+             [tests :as tests]]
+            [jepsen.os.debian :as debian]
+            [jepsen.flink.client :refer :all]
+            [jepsen.flink.checker :as flink-checker]
+            [jepsen.flink.db :as fdb]
+            [jepsen.flink.nemesis :as fn])
+  (:import (jepsen.flink.client Client)))
+
+(def flink-test-config
+  {:yarn-session  {:db                  (fdb/flink-yarn-db)
+                   :deployment-strategy fdb/start-yarn-session!}
+   :yarn-job      {:db                  (fdb/flink-yarn-db)
+                   :deployment-strategy fdb/start-yarn-job!}
+   :mesos-session {:db                  (fdb/flink-mesos-db)
+                   :deployment-strategy fdb/start-mesos-session!}})
+
+(defn client-gen
+  []
+  (->
+    (cons {:type :invoke, :f :submit, :value nil}
+          (cycle [{:type :invoke, :f :job-running?, :value nil}
+                  (gen/sleep 5)]))
+    (gen/seq)
+    (gen/singlethreaded)))
+
+(defn flink-test
+  [opts]
+  (merge tests/noop-test
+         (let [{:keys [db deployment-strategy]} (-> opts :deployment-mode flink-test-config)
+               {:keys [job-running-healthy-threshold job-recovery-grace-period]} opts]
+           {:name      "Apache Flink"
+            :os        debian/os
+            :db        db
+            :nemesis   (fn/nemesis)
+            :model     (flink-checker/job-running-within-grace-period
+                         job-running-healthy-threshold
+                         job-recovery-grace-period)
+            :generator (let [stop (atom nil)]
+                         (->> (fn/stoppable-generator stop (client-gen))
+                              (gen/nemesis
+                                (fn/stop-generator stop
+                                                   ((fn/nemesis-generator-factories (:nemesis-gen opts)) opts)
+                                                   job-running-healthy-threshold
+                                                   job-recovery-grace-period))))
+            :client    (Client. deployment-strategy nil nil nil nil)
+            :checker   (flink-checker/job-running-checker)})
+         (assoc opts :concurrency 1)))
+
+(defn keys-as-allowed-values-help-text
+  "Takes a map and returns a string explaining which values are allowed.
+  This is a CLI helper function."
+  [m]
+  (->> (keys m)
+       (map name)
+       (clojure.string/join ", ")
+       (str "Must be one of: ")))
+
+(defn -main
+  [& args]
+  (cli/run!
+    (merge
+      (cli/single-test-cmd
+        {:test-fn  flink-test
+         :tarball  fdb/default-flink-dist-url
+         :opt-spec [[nil "--ha-storage-dir DIR" "high-availability.storageDir"]
+                    [nil "--job-jar JAR" "Path to the job jar"]
+                    [nil "--job-args ARGS" "CLI arguments for the flink job"]
+                    [nil "--main-class CLASS" "Job main class"]
+                    [nil "--nemesis-gen GEN" (str "Which nemesis should be used? "
+                                                  (keys-as-allowed-values-help-text fn/nemesis-generator-factories))
+                     :parse-fn keyword
+                     :default :kill-task-managers
+                     :validate [#(fn/nemesis-generator-factories (keyword %))
+                                (keys-as-allowed-values-help-text fn/nemesis-generator-factories)]]
+                    [nil "--deployment-mode MODE" (keys-as-allowed-values-help-text flink-test-config)
+                     :parse-fn keyword
+                     :default :yarn-session
+                     :validate [#(flink-test-config (keyword %))
+                                (keys-as-allowed-values-help-text flink-test-config)]]
+                    [nil "--job-running-healthy-threshold TIMES" "Number of consecutive times the job must be running to be considered healthy."
+                     :default 5
+                     :parse-fn #(Long/parseLong %)
+                     :validate [pos? "Must be positive"]]
+                    [nil "--job-recovery-grace-period SECONDS" "Time period in which the job must become healthy."
+                     :default 180
+                     :parse-fn #(Long/parseLong %)
+                     :validate [pos? "Must be positive" (fn [v] (<= 60 v)) "Must be at least 60"]]]})
+      (cli/serve-cmd))
+    args))

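client-gen describes the client workload as a lazy, infinite sequence: one :submit operation, followed by an endless alternation of :job-running? probes and five-second sleeps. In the test, gen/seq and gen/singlethreaded wrap this sequence. The sketch below models only the sequence itself. The sleep is represented by a placeholder map {:sleep 5}, which is an assumption for illustration; the test uses (gen/sleep 5).

```clojure
(ns example.client-ops)

(def submit-op {:type :invoke, :f :submit, :value nil})
(def probe-op  {:type :invoke, :f :job-running?, :value nil})

(defn client-ops
  "Lazy, infinite op sequence: submit once, then probe and pause forever.
  {:sleep 5} is a placeholder for (gen/sleep 5)."
  []
  (cons submit-op (cycle [probe-op {:sleep 5}])))

;; Only a finite prefix may be realized; the sequence never ends on its own.
(prn (take 4 (client-ops)))
```

The run is bounded externally. flink-test wraps the client generator in fn/stoppable-generator, which returns nil once the shared stop atom has been set.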
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/generator.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/generator.clj b/flink-jepsen/src/jepsen/flink/generator.clj
new file mode 100644
index 0000000..af928c4
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/generator.clj
@@ -0,0 +1,39 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.generator
+  (:require [jepsen.util :as util]
+            [jepsen.generator :as gen]))
+
+(gen/defgenerator TimeLimitGen
+                  [dt source deadline-atom]
+                  [dt (when-let [deadline @deadline-atom]
+                        (util/nanos->secs deadline)) source]
+                  (gen/op [_ test process]
+                          (compare-and-set! deadline-atom nil (+ (util/linear-time-nanos)
+                                                                 (util/secs->nanos dt)))
+                          (when (<= (util/linear-time-nanos) @deadline-atom)
+                            (gen/op source test process))))
+
+;; In Jepsen 0.1.9 jepsen.generator/time-limit was re-written to interrupt Threads.
+;; Unfortunately the logic has race conditions which can cause spurious failures
+;; (https://github.com/jepsen-io/jepsen/issues/268).
+;;
+;; In our tests we do not need interrupts. Therefore, we use a time-limit implementation that is
+;; similar to the one shipped with Jepsen 0.1.8.
+(defn time-limit
+  [dt source]
+  (TimeLimitGen. dt source (atom nil)))

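The key property of TimeLimitGen is that the deadline is set lazily, on the first call to op, using compare-and-set!. However many threads race on that first call, only one deadline wins. When the deadline expires, no thread is interrupted; callers simply start receiving nil. The sketch below reproduces that logic with a plain function instead of a Jepsen generator. time-limited and the injectable now-nanos clock are hypothetical names, introduced so the expiry can be tested deterministically.

```clojure
(ns example.time-limit)

(defn time-limited
  "Returns a no-arg fn that delegates to source until dt-secs seconds have
  passed since its first invocation, and returns nil afterwards."
  ([dt-secs source] (time-limited dt-secs source #(System/nanoTime)))
  ([dt-secs source now-nanos]
   (let [deadline (atom nil)]
     (fn []
       ;; Only the first caller sets the deadline; later calls leave it alone.
       (compare-and-set! deadline nil (+ (now-nanos) (long (* dt-secs 1e9))))
       (when (<= (now-nanos) @deadline)
         (source))))))

;; Usage with a fake clock so that the expiry is deterministic.
(def clock (atom 0))
(def next-op (time-limited 2 (constantly {:f :job-running?}) #(deref clock)))
(prn (next-op))                 ; first call: deadline becomes 2e9 ns
(reset! clock 3000000000)
(prn (next-op))                 ; past the deadline: nil
```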
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/hadoop.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/hadoop.clj b/flink-jepsen/src/jepsen/flink/hadoop.clj
new file mode 100644
index 0000000..f633d07
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/hadoop.clj
@@ -0,0 +1,139 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.hadoop
+  (:require [clojure.data.xml :as xml]
+            [clojure.tools.logging :refer :all]
+            [jepsen
+             [control :as c]
+             [db :as db]]
+            [jepsen.control.util :as cu]))
+
+(def install-dir "/opt/hadoop")
+(def hadoop-conf-dir (str install-dir "/etc/hadoop"))
+(def yarn-log-dir "/tmp/logs/yarn")
+
+(defn name-node
+  [nodes]
+  (first (sort nodes)))
+
+(defn resource-manager
+  [nodes]
+  (second (sort nodes)))
+
+(defn data-nodes
+  [nodes]
+  (drop 2 (sort nodes)))
+
+(defn yarn-site-config
+  [test]
+  {:yarn.resourcemanager.hostname        (resource-manager (:nodes test))
+   :yarn.log-aggregation-enable          "true"
+   :yarn.nodemanager.resource.cpu-vcores "8"
+   :yarn.resourcemanager.am.max-attempts "99999"
+   :yarn.nodemanager.log-dirs            yarn-log-dir})
+
+(defn core-site-config
+  [test]
+  {:fs.defaultFS (str "hdfs://" (name-node (:nodes test)) ":9000")})
+
+(defn property-value
+  [property value]
+  (xml/element :property {}
+               [(xml/element :name {} (name property))
+                (xml/element :value {} value)]))
+
+(defn write-config!
+  [^String config-file config]
+  (info "Writing config" config-file)
+  (let [config-xml (xml/indent-str
+                     (xml/element :configuration
+                                  {}
+                                  (map (fn [[k v]] (property-value k v)) (seq config))))]
+    (c/exec :echo config-xml :> config-file)
+    ))
+
+(defn start-name-node!
+  [test node]
+  (when (= node (name-node (:nodes test)))
+    (info "Start NameNode daemon.")
+    (c/exec (str install-dir "/sbin/hadoop-daemon.sh") :--config hadoop-conf-dir :--script :hdfs :start :namenode)))
+
+(defn start-name-node-formatted!
+  [test node]
+  (when (= node (name-node (:nodes test)))
+    (info "Format HDFS")
+    (c/exec (str install-dir "/bin/hdfs") :namenode :-format :-force :-clusterId "0000000")
+    (start-name-node! test node)))
+
+(defn stop-name-node!
+  []
+  (c/exec (str install-dir "/sbin/hadoop-daemon.sh") :--config hadoop-conf-dir :--script :hdfs :stop :namenode))
+
+(defn start-data-node!
+  [test node]
+  (when (some #{node} (data-nodes (:nodes test)))
+    (info "Start DataNode")
+    (c/exec (str install-dir "/sbin/hadoop-daemon.sh") :--config hadoop-conf-dir :--script :hdfs :start :datanode)))
+
+(defn start-resource-manager!
+  [test node]
+  (when (= node (resource-manager (:nodes test)))
+    (info "Start ResourceManager")
+    (c/exec (str install-dir "/sbin/yarn-daemon.sh") :--config hadoop-conf-dir :start :resourcemanager)))
+
+(defn start-node-manager!
+  [test node]
+  (when (some #{node} (data-nodes (:nodes test)))
+    (info "Start NodeManager")
+    (c/exec (str install-dir "/sbin/yarn-daemon.sh") :--config hadoop-conf-dir :start :nodemanager)))
+
+(defn find-files!
+  [dir]
+  (->>
+    (clojure.string/split (c/exec :find dir :-type :f) #"\n")
+    (remove clojure.string/blank?)))
+
+(defn db
+  [url]
+  (reify db/DB
+    (setup! [_ test node]
+      (info "Install Hadoop from" url)
+      (c/su
+        (cu/install-archive! url install-dir)
+        (write-config! (str install-dir "/etc/hadoop/yarn-site.xml") (yarn-site-config test))
+        (write-config! (str install-dir "/etc/hadoop/core-site.xml") (core-site-config test))
+        (c/exec :echo (c/lit "export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64") :>> (str install-dir "/etc/hadoop/hadoop-env.sh"))
+        (start-name-node-formatted! test node)
+        (start-data-node! test node)
+        (start-resource-manager! test node)
+        (start-node-manager! test node)))
+
+    (teardown! [_ test node]
+      (info "Teardown Hadoop")
+      (c/su
+        (cu/grepkill! "hadoop")
+        (c/exec (c/lit (str "rm -rf /tmp/hadoop-* ||:")))))
+
+    db/LogFiles
+    (log-files [_ _ _]
+      (c/su
+        (concat (find-files! (str install-dir "/logs"))
+                (if (cu/exists? yarn-log-dir)
+                  (do
+                    (c/exec :chmod :-R :777 yarn-log-dir)
+                    (find-files! yarn-log-dir))
+                  []))))))

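hadoop.clj assigns roles purely from the sorted node list:

* the first node runs the NameNode;
* the second runs the ResourceManager;
* every remaining node runs a DataNode and a NodeManager.

Each node computes the same assignment independently, so setup needs no coordination. The sketch below restates that rule. It also renders a Hadoop site file (yarn-site.xml, core-site.xml) by hand. site-xml and its string-based escaping are a hypothetical stand-in for the clojure.data.xml calls in write-config!, which additionally indent their output.

```clojure
(ns example.hadoop-roles
  (:require [clojure.string :as str]))

(defn roles
  "Deterministic role assignment from the sorted node names, as in
  name-node, resource-manager and data-nodes."
  [nodes]
  (let [[name-node resource-manager & data-nodes] (sort nodes)]
    {:name-node        name-node
     :resource-manager resource-manager
     :data-nodes       (vec data-nodes)}))

(defn- escape-xml
  "Escapes the characters that are not allowed verbatim in XML text."
  [s]
  (str/escape (str s) {\& "&amp;" \< "&lt;" \> "&gt;" \" "&quot;"}))

(defn site-xml
  "Renders a map of keyword keys to values as a Hadoop <configuration>."
  [conf]
  (str "<configuration>"
       (apply str (for [[k v] conf]
                    (str "<property><name>" (escape-xml (name k)) "</name>"
                         "<value>" (escape-xml v) "</value></property>")))
       "</configuration>"))

(prn (roles ["n3" "n1" "n5" "n2" "n4"]))
(println (site-xml {:fs.defaultFS "hdfs://n1:9000"}))
```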
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/mesos.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/mesos.clj b/flink-jepsen/src/jepsen/flink/mesos.clj
new file mode 100644
index 0000000..74b2c0d
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/mesos.clj
@@ -0,0 +1,165 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.mesos
+  (:require [clojure.tools.logging :refer :all]
+            [jepsen
+             [control :as c]
+             [db :as db]
+             [util :as util :refer [meh]]]
+            [jepsen.control.util :as cu]
+            [jepsen.os.debian :as debian]
+            [jepsen.flink.zookeeper :refer [zookeeper-uri]]))
+
+;;; Mesos
+
+(def master-count 1)
+(def master-pidfile "/var/run/mesos/master.pid")
+(def slave-pidfile "/var/run/mesos/slave.pid")
+(def master-dir "/var/lib/mesos/master")
+(def slave-dir "/var/lib/mesos/slave")
+(def log-dir "/var/log/mesos")
+(def master-bin "/usr/sbin/mesos-master")
+(def slave-bin "/usr/sbin/mesos-slave")
+(def zk-namespace :mesos)
+
+;;; Marathon
+
+(def marathon-bin "/usr/bin/marathon")
+(def zk-marathon-namespace "marathon")
+(def marathon-pidfile "/var/run/mesos/marathon.pid")
+(def marathon-rest-port 8080)
+
+(defn install!
+  [test node mesos-version marathon-version]
+  (c/su
+    (debian/add-repo! :mesosphere
+                      "deb http://repos.mesosphere.com/debian jessie main"
+                      "keyserver.ubuntu.com"
+                      "E56151BF")
+    (debian/install {:mesos    mesos-version
+                     :marathon marathon-version})
+    (c/exec :mkdir :-p "/var/run/mesos")
+    (c/exec :mkdir :-p master-dir)
+    (c/exec :mkdir :-p slave-dir)))
+
+;;; Mesos functions
+
+(defn start-master!
+  [test node]
+  (when (some #{node} (take master-count (sort (:nodes test))))
+    (info node "Starting mesos master")
+    (c/su
+      (c/exec :start-stop-daemon
+              :--background
+              :--chdir master-dir
+              :--exec "/usr/bin/env"
+              :--make-pidfile
+              :--no-close
+              :--oknodo
+              :--pidfile master-pidfile
+              :--start
+              :--
+              "GLOG_v=1"
+              master-bin
+              (str "--hostname=" (name node))
+              (str "--log_dir=" log-dir)
+              (str "--offer_timeout=30secs")
+              (str "--quorum=" (util/majority master-count))
+              (str "--registry_fetch_timeout=120secs")
+              (str "--registry_store_timeout=5secs")
+              (str "--work_dir=" master-dir)
+              (str "--zk=" (zookeeper-uri test zk-namespace))
+              :>> (str log-dir "/master.stdout")
+              (c/lit "2>&1")))))
+
+(defn start-slave!
+  [test node]
+  (when-not (some #{node} (take master-count (sort (:nodes test))))
+    (info node "Starting mesos slave")
+    (c/su
+      (c/exec :start-stop-daemon :--start
+              :--background
+              :--chdir slave-dir
+              :--exec slave-bin
+              :--make-pidfile
+              :--no-close
+              :--pidfile slave-pidfile
+              :--oknodo
+              :--
+              (str "--hostname=" (name node))
+              (str "--log_dir=" log-dir)
+              (str "--master=" (zookeeper-uri test zk-namespace))
+              (str "--recovery_timeout=30secs")
+              (str "--work_dir=" slave-dir)
+              :>> (str log-dir "/slave.stdout")
+              (c/lit "2>&1")))))
+
+(defn stop-master!
+  [node]
+  (info node "Stopping mesos master")
+  (meh (c/exec :killall :-9 :mesos-master))
+  (meh (c/exec :rm :-rf master-pidfile)))
+
+(defn stop-slave!
+  [node]
+  (info node "Stopping mesos slave")
+  (meh (c/exec :killall :-9 :mesos-slave))
+  (meh (c/exec :rm :-rf slave-pidfile)))
+
+;;; Marathon functions
+
+(defn start-marathon!
+  [test node]
+  (when (= node (first (sort (:nodes test))))
+    (info "Start marathon")
+    (c/su
+      (c/exec :start-stop-daemon :--start
+              :--background
+              :--exec marathon-bin
+              :--make-pidfile
+              :--no-close
+              :--pidfile marathon-pidfile
+              :--
+              (c/lit (str "--hostname " node))
+              (c/lit (str "--master " (zookeeper-uri test zk-namespace)))
+              (c/lit (str "--zk " (zookeeper-uri test zk-marathon-namespace)))
+              :>> (str log-dir "/marathon.stdout")
+              (c/lit "2>&1")))))
+
+(defn stop-marathon!
+  []
+  (cu/grepkill! "marathon"))
+
+(defn marathon-base-url
+  [test]
+  (str "http://" (first (sort (:nodes test))) ":" marathon-rest-port))
+
+(defn db
+  [mesos-version marathon-version]
+  (reify db/DB
+    (setup! [this test node]
+      (install! test node mesos-version marathon-version)
+      (start-master! test node)
+      (start-slave! test node)
+      (start-marathon! test node))
+    (teardown! [this test node]
+      (stop-slave! node)
+      (stop-master! node)
+      (stop-marathon!))
+    db/LogFiles
+    (log-files [_ test node]
+      (if (cu/exists? log-dir) (cu/ls-full log-dir) []))))

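start-master! passes --quorum=(util/majority master-count) to mesos-master. The replicated-log registry requires the quorum to be a strict majority of the masters, so with the single master configured here the quorum is 1. The sketch below shows that arithmetic and assumes majority means the smallest integer greater than n/2, the usual definition; jepsen.util/majority itself is not reproduced. It also shows a subset of the flag list the quorum feeds into. master-flags is a hypothetical helper, and the ZooKeeper URI is passed in as a made-up value rather than derived with zookeeper-uri, whose format is defined outside this section.

```clojure
(ns example.mesos-flags)

(defn majority
  "Smallest number of nodes that is strictly more than half of n."
  [n]
  (inc (quot n 2)))

(defn master-flags
  "Subset of the mesos-master flags built in start-master!."
  [node master-count zk-uri]
  [(str "--hostname=" node)
   (str "--quorum=" (majority master-count))
   (str "--zk=" zk-uri)])

(prn (map majority [1 2 3 4 5]))   ; (1 2 2 3 3)
(prn (master-flags "n1" 1 "zk://n1:2181,n2:2181/mesos"))
```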
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/nemesis.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/nemesis.clj b/flink-jepsen/src/jepsen/flink/nemesis.clj
new file mode 100644
index 0000000..3047eeb
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/nemesis.clj
@@ -0,0 +1,163 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.nemesis
+  (:require [clojure.tools.logging :refer :all]
+            [jepsen
+             [control :as c]
+             [generator :as gen]
+             [nemesis :as nemesis]
+             [util :as ju]]
+            [jepsen.control.util :as cu]
+            [jepsen.flink.client :refer :all]
+            [jepsen.flink.checker :as flink-checker]
+            [jepsen.flink.generator :as fgen]
+            [jepsen.flink.hadoop :as fh]
+            [jepsen.flink.zookeeper :refer :all]))
+
+(def job-submit-grace-period
+  "Period (in seconds) after job submission in which job managers must not fail."
+  60)
+
+(defn kill-processes
+  ([pattern] (kill-processes rand-nth pattern))
+  ([targeter pattern]
+   (reify nemesis/Nemesis
+     (setup! [this test] this)
+     (invoke! [this test op]
+       (let [nodes (-> test :nodes targeter ju/coll)]
+         (c/on-many nodes
+                    (c/su (cu/grepkill! pattern)))
+         (assoc op :value nodes)))
+     (teardown! [this test]))))
+
+(defn- non-empty-random-sample
+  [coll]
+  (let [sample (random-sample 0.5 coll)]
+    (if (empty? sample)
+      (first (shuffle coll))
+      sample)))
+
+(defn kill-taskmanager
+  ([] (kill-taskmanager identity))
+  ([targeter]
+   (kill-processes targeter "TaskExecutorRunner")))
+
+(defn kill-jobmanager
+  []
+  (kill-processes identity "ClusterEntrypoint"))
+
+(defn start-stop-name-node
+  "Nemesis stopping and starting the HDFS NameNode."
+  []
+  (nemesis/node-start-stopper
+    fh/name-node
+    (fn [test node] (c/su (fh/stop-name-node!)))
+    (fn [test node] (c/su (fh/start-name-node! test node)))))
+
+;;; Generators
+
+(defn stoppable-generator
+  [stop source]
+  (reify gen/Generator
+    (op [gen test process]
+      (if @stop
+        nil
+        (gen/op source test process)))))
+
+(defn take-last-with-default
+  [n default coll]
+  (->>
+    (cycle [default])
+    (concat (reverse coll))
+    (take n)
+    (reverse)))
+
+(defn stop-generator
+  [stop source job-running-healthy-threshold job-recovery-grace-period]
+  (gen/concat source
+              (let [t (atom nil)]
+                (reify gen/Generator
+                  (op [_ test process]
+                    (when (nil? @t)
+                      (compare-and-set! t nil (ju/relative-time-nanos)))
+                    (let [history (->>
+                                    (:active-histories test)
+                                    deref
+                                    first
+                                    deref)
+                          job-running-history (->>
+                                                history
+                                                (filter (fn [op] (>= (- (:time op) @t) 0)))
+                                                (flink-checker/get-job-running-history)
+                                                (take-last-with-default job-running-healthy-threshold false))]
+                      (if (or
+                            (and
+                              (every? true? job-running-history))
+                            (> (ju/relative-time-nanos) (+ @t (ju/secs->nanos job-recovery-grace-period))))
+                        (do
+                          (reset! stop true)
+                          nil)
+                        (do
+                          (Thread/sleep 1000)
+                          (recur test process)))))))))
+
+(defn kill-taskmanagers-gen
+  [time-limit dt op]
+  (fgen/time-limit time-limit (gen/stagger dt (gen/seq (cycle [{:type :info, :f op}])))))
+
+(defn kill-taskmanagers-bursts-gen
+  [time-limit]
+  (fgen/time-limit time-limit
+                  (gen/seq (cycle (concat (repeat 20 {:type :info, :f :kill-task-managers})
+                                          [(gen/sleep 300)])))))
+
+(defn kill-jobmanagers-gen
+  [time-limit]
+  (fgen/time-limit (+ time-limit job-submit-grace-period)
+                  (gen/seq (cons (gen/sleep job-submit-grace-period)
+                                 (cycle [{:type :info, :f :kill-job-manager}])))))
+
+(defn fail-name-node-during-recovery
+  []
+  (gen/seq [(gen/sleep job-submit-grace-period)
+            {:type :info, :f :partition-start}
+            {:type :info, :f :fail-name-node-start}
+            (gen/sleep 20)
+            {:type :info, :f :partition-stop}
+            (gen/sleep 60)
+            {:type :info, :f :fail-name-node-stop}]))
+
+(def nemesis-generator-factories
+  {:kill-task-managers             (fn [opts] (kill-taskmanagers-gen (:time-limit opts) 3 :kill-task-managers))
+   :kill-single-task-manager       (fn [opts] (kill-taskmanagers-gen (:time-limit opts) 3 :kill-single-task-manager))
+   :kill-random-task-managers      (fn [opts] (kill-taskmanagers-gen (:time-limit opts) 3 :kill-random-task-managers))
+   :kill-task-managers-bursts      (fn [opts] (kill-taskmanagers-bursts-gen (:time-limit opts)))
+   :kill-job-managers              (fn [opts] (kill-jobmanagers-gen (:time-limit opts)))
+   :fail-name-node-during-recovery (fn [_] (fail-name-node-during-recovery))
+   :utopia                         (fn [_] (gen/sleep 60))})
+
+(defn nemesis
+  []
+  (nemesis/compose
+    {{:partition-start :start
+      :partition-stop  :stop}            (nemesis/partition-random-halves)
+     {:fail-name-node-start :start
+      :fail-name-node-stop  :stop}       (start-stop-name-node)
+     {:kill-task-managers :start}        (kill-taskmanager)
+     {:kill-single-task-manager :start}  (kill-taskmanager (fn [coll] (rand-nth coll)))
+     {:kill-random-task-managers :start} (kill-taskmanager non-empty-random-sample)
+     {:kill-job-manager :start}          (kill-jobmanager)}))
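
The stop condition in stop-generator pads the recent job-running? results with false (via take-last-with-default). It stops the nemesis once the last job-running-healthy-threshold samples are all true, or once job-recovery-grace-period has elapsed. The following is a minimal, self-contained sketch of that decision as pure functions. healthy? and should-stop? are hypothetical names introduced here, and time is passed in as plain nanosecond numbers instead of Jepsen's relative clock.

```clojure
;; Copy of take-last-with-default from nemesis.clj: the last n items
;; of coll, left-padded with `default` when coll is shorter than n.
(defn take-last-with-default
  [n default coll]
  (->> (cycle [default])
       (concat (reverse coll))
       (take n)
       (reverse)))

;; Hypothetical helper: true only if the last `threshold` samples are all
;; true. Missing samples count as false because of the padding above.
(defn healthy?
  [threshold samples]
  (every? true? (take-last-with-default threshold false samples)))

;; Hypothetical helper mirroring the `or` in stop-generator: stop when the
;; job is healthy, or when the grace period (in nanoseconds) has passed.
(defn should-stop?
  [threshold grace-nanos start-nanos now-nanos samples]
  (or (healthy? threshold samples)
      (> now-nanos (+ start-nanos grace-nanos))))

(println (take-last-with-default 3 false [true]))  ;; prints (false false true)
(println (healthy? 3 [false true true true]))      ;; prints true
(println (should-stop? 3 100 0 50 [true]))         ;; prints false
(println (should-stop? 3 100 0 101 [true]))        ;; prints true
```

Padding with false means a history that is shorter than the threshold is never mistaken for a healthy one. In that case only the grace-period timeout can end the wait.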

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/utils.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/utils.clj b/flink-jepsen/src/jepsen/flink/utils.clj
new file mode 100644
index 0000000..3fd9f96
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/utils.clj
@@ -0,0 +1,48 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.utils
+  (:require [clojure.tools.logging :refer :all]))
+
+(defn retry
+  "Runs a function op and retries on exception.
+
+  The following options are supported:
+
+  :on-retry - A function called on every retry with the exception and the number of retries left as arguments.
+  :success - A function called with the result of op.
+  :fallback - A function called with the exception as its first argument once all retries are exhausted.
+  :retries - The total number of retries.
+  :delay - The time between retries in milliseconds."
+  ([op & {:keys [on-retry success fallback retries delay]
+          :or   {on-retry (fn [exception attempt] (warn "Retryable operation failed:"
+                                                        (.getMessage exception)))
+                 success  identity
+                 fallback :default
+                 retries  10
+                 delay    2000}
+          :as   keys}]
+   (let [r (try
+             (op)
+             (catch Exception e (if (< 0 retries)
+                                  {:exception e}
+                                  (fallback e))))]
+     (if (:exception r)
+       (do
+         (on-retry (:exception r) retries)
+         (Thread/sleep delay)
+         (recur op (assoc keys :retries (dec retries))))
+       (success r)))))
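
Note that the default :fallback is the keyword :default. Invoking a keyword on an object that is not a map returns nil. As a result, retry returns nil once all retries are exhausted, which is what the "Exhaust all attempts" case in utils_test.clj asserts. The sketch below illustrates that behavior. It also includes a hypothetical retry-sketch that follows the same control flow with loop/recur. Unlike retry, it takes an options map rather than keyword arguments, and it omits :delay, :on-retry and :success for brevity.

```clojure
;; A keyword called on a non-map object looks itself up and finds nothing,
;; so `:default` behaves like a fallback that always returns nil.
(def default-fallback :default)
(println (default-fallback (Exception. "boom")))  ;; prints nil

;; Hypothetical sketch of retry's control flow (no delay or callbacks).
;; Up to `retries` failures are retried; the next failure goes to `fallback`.
(defn retry-sketch
  [op {:keys [retries fallback] :or {retries 10 fallback :default}}]
  (loop [remaining retries]
    (let [r (try
              {:value (op)}
              (catch Exception e
                (if (pos? remaining)
                  {:exception e}
                  {:value (fallback e)})))]
      ;; recur must sit outside the try block, hence the wrapper map.
      (if (:exception r)
        (recur (dec remaining))
        (:value r)))))

(println (retry-sketch (fn [] (throw (Exception. "x"))) {:retries 1}))  ;; prints nil
```

Wrapping successful results in {:value ...} also keeps a result map that happens to contain an :exception key from being mistaken for a failure. The original retry does not make that distinction.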

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/zookeeper.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/zookeeper.clj b/flink-jepsen/src/jepsen/flink/zookeeper.clj
new file mode 100644
index 0000000..8b4c319
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/zookeeper.clj
@@ -0,0 +1,29 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.zookeeper)
+
+(defn zookeeper-quorum
+  "Returns the zk quorum string, e.g., host1:2181,host2:2181"
+  [test]
+  (->> test
+       :nodes
+       (map #(str % ":2181"))
+       (clojure.string/join ",")))
+
+(defn zookeeper-uri
+  ([test] (zookeeper-uri test ""))
+  ([test namespace] (str "zk://" (zookeeper-quorum test) "/" (name namespace))))
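
zookeeper-uri pairs every test node with ZooKeeper's client port 2181, joins the results with commas, and appends (name namespace) as the path. The following is a standalone copy for illustration. Unlike the ns form above, it explicitly requires clojure.string instead of relying on that namespace having been loaded elsewhere. The node names n1 and n2 are placeholders.

```clojure
(require '[clojure.string :as str])

;; Same logic as zookeeper-quorum: "host:2181" for every node, comma-separated.
(defn zookeeper-quorum
  [test]
  (->> (:nodes test)
       (map #(str % ":2181"))
       (str/join ",")))

;; Same logic as zookeeper-uri; `namespace` may be a keyword or a string.
(defn zookeeper-uri
  ([test] (zookeeper-uri test ""))
  ([test namespace] (str "zk://" (zookeeper-quorum test) "/" (name namespace))))

(println (zookeeper-uri {:nodes ["n1" "n2"]} :flink))  ;; prints zk://n1:2181,n2:2181/flink
(println (zookeeper-uri {:nodes ["n1"]}))              ;; prints zk://n1:2181/
```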

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/test/jepsen/flink/checker_test.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/test/jepsen/flink/checker_test.clj b/flink-jepsen/test/jepsen/flink/checker_test.clj
new file mode 100644
index 0000000..7389bbc
--- /dev/null
+++ b/flink-jepsen/test/jepsen/flink/checker_test.clj
@@ -0,0 +1,82 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.checker-test
+  (:require [clojure.test :refer :all]
+            [jepsen
+             [checker :as checker]]
+            [jepsen.flink.checker :refer :all]))
+
+(deftest get-job-running-history-test
+  (let [history [{:type :info, :f :kill-random-subset-task-managers, :process :nemesis, :time 121898381144, :value '("172.31.33.170")}
+                 {:type :invoke, :f :job-running?, :value nil, :process 0, :time 127443701575}
+                 {:type :ok, :f :job-running?, :value false, :process 0, :time 127453553462}
+                 {:type :invoke, :f :job-running?, :value nil, :process 0, :time 127453553463}
+                 {:type :ok, :f :job-running?, :value true, :process 0, :time 127453553464}]]
+    (is (= (get-job-running-history history) [false true]))))
+
+(deftest job-running-checker-test
+  (let [checker (job-running-checker)
+        test {}
+        model (job-running-within-grace-period 3 60)
+        opts {}
+        check (fn [history] (checker/check checker test model history opts))]
+    (testing "Job is not running after grace period."
+      (is (= (:valid? (check
+                        [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
+                         {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}])) false)))
+    (testing "Job is running after grace period."
+      (is (= (:valid? (check
+                        [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
+                         {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000001}])) true)))
+    (testing "Should tolerate non-running job during failures."
+      (is (= (:valid? (check
+                        [{:type :info, :f :partition-start, :process :nemesis, :time -1}
+                         {:type :info, :f :partition-start, :process :nemesis, :time 0, :value "Cut off [...]"}
+                         {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}
+                         {:type :info, :f :partition-stop, :process :nemesis, :time 60000000002}
+                         {:type :info, :f :partition-stop, :process :nemesis, :time 60000000003, :value "fully connected"}
+                         {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}])) true)))
+    (testing "Should respect healthy threshold."
+      (is (= (:valid? (check
+                        [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
+                         {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
+                         {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
+                         {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000003}
+                         {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}])) true))
+      (is (= (:valid? (check
+                        [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
+                         {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
+                         {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000002}
+                         {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}])) false)))
+    (testing "Job was not deployed successfully."
+      (is (= (:valid? (check [{:type :invoke, :f :job-running?, :value nil, :process 45, :time 239150413307}
+                              {:type :info, :f :job-running?, :value nil, :process 45, :time 239150751938, :error "indeterminate: Assert failed: job-id"}])) false)))))
+
+(deftest safe-inc-test
+  (is (= (safe-inc nil) 1))
+  (is (= (safe-inc 1) 2)))
+
+(deftest nemeses-active?-test
+  (is (= (nemeses-active? {:partition-start 2 :fail-name-node-start 0}) true))
+  (is (= (nemeses-active? {:partition-start 0}) false)))
+
+(deftest dissoc-if-test
+  (is (= (:a (dissoc-if #(-> (first %) (= :b)) {:a 1 :b 2})) 1)))
+
+(deftest zero-value?-test
+  (is (= (zero-value? [:test 0]) true))
+  (is (= (zero-value? [:test 1]) false)))

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/test/jepsen/flink/client_test.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/test/jepsen/flink/client_test.clj b/flink-jepsen/test/jepsen/flink/client_test.clj
new file mode 100644
index 0000000..b4373bf
--- /dev/null
+++ b/flink-jepsen/test/jepsen/flink/client_test.clj
@@ -0,0 +1,37 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.client-test
+  (:require [clojure.test :refer :all]
+            [clj-http.fake :as fake]
+            [jepsen.flink.client :refer :all]))
+
+(deftest read-url-test
+  (is (= "https://www.asdf.de" (read-url (byte-array [0xAC 0xED 0x00 0x05 0x77 0x15 0x00 0x13 0x68 0x74 0x74 0x70 0x73 0x3A 0x2F 0x2F 0x77 0x77 0x77 0x2E 0x61 0x73 0x64 0x66 0x2E 0x64 0x65])))))
+
+(deftest job-running?-test
+  (fake/with-fake-routes
+    {"http://localhost:8081/jobs/a718f168ec6be8eff1345a17bf64196c"
+     (fn [request] {:status  200
+                    :headers {}
+                    :body    "{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1522059578198,\"end-time\":-1,\"duration\":19505,\"now\":1522059597703,\"timestamps\":{\"RUNNING\":1522059578244,\"RESTARTING\":0,\"RECONCILING\":0,\"CREATED\":1522059578198,\"FAILING\":0,\"FINISHED\":0,\"CANCELLING\":0,\"SUSPENDING\":0,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0},\"vertices\":[{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"name\":\"Source: Socket Stream -> Flat Map\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578369,\"end-time\":-1,\"duration\":19334,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}},{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"name\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578381,\"end-time\":-1,\"duration\":19322,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}}],\"status-counts\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":2,\"CREATED\":0,\"FINISHED\":0},\"plan\":{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"nodes\":[{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -&gt; Sink: Print to Std. Out\",\"inputs\":[{\"num\":0,\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Socket Stream -&gt; Flat Map\",\"optimizer_properties\":{}}]}}"})
+     "http://localhost:8081/jobs/a718f168ec6be8eff1345a17bf64196d"
+     (fn [request] {:status  200
+                    :headers {}
+                    :body    "{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1522059578198,\"end-time\":-1,\"duration\":19505,\"now\":1522059597703,\"timestamps\":{\"RUNNING\":1522059578244,\"RESTARTING\":0,\"RECONCILING\":0,\"CREATED\":1522059578198,\"FAILING\":0,\"FINISHED\":0,\"CANCELLING\":0,\"SUSPENDING\":0,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0},\"vertices\":[{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"name\":\"Source: Socket Stream -> Flat Map\",\"parallelism\":1,\"status\":\"CREATED\",\"start-time\":1522059578369,\"end-time\":-1,\"duration\":19334,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}},{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"name\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578381,\"end-time\":-1,\"duration\":19322,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}}],\"status-counts\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":2,\"CREATED\":0,\"FINISHED\":0},\"plan\":{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"nodes\":[{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -&gt; Sink: Print to Std. Out\",\"inputs\":[{\"num\":0,\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Socket Stream -&gt; Flat Map\",\"optimizer_properties\":{}}]}}"})}
+
+    (is (= (job-running? "http://localhost:8081" "a718f168ec6be8eff1345a17bf64196c") true))
+    (is (= (job-running? "http://localhost:8081" "a718f168ec6be8eff1345a17bf64196d") false))))

http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/test/jepsen/flink/utils_test.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/test/jepsen/flink/utils_test.clj b/flink-jepsen/test/jepsen/flink/utils_test.clj
new file mode 100644
index 0000000..607f90d
--- /dev/null
+++ b/flink-jepsen/test/jepsen/flink/utils_test.clj
@@ -0,0 +1,39 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.utils-test
+  (:require [clojure.test :refer :all])
+  (:require [jepsen.flink.utils :refer [retry]]))
+
+(deftest retry-test
+  (testing "Single failure then result."
+    (let [counter (atom 0)
+          failing-once (fn [] (if (= @counter 0)
+                                (do (swap! counter inc)
+                                    (throw (Exception. "Expected")))
+                                "result"))]
+      (is (= "result" (retry failing-once :delay 0)))))
+
+  (testing "Exhaust all attempts."
+    (let [failing-always (fn [] (throw (Exception. "Expected")))]
+      (is (nil? (retry failing-always :retries 1 :delay 0)))))
+
+  (testing "Propagate exception."
+    (let [failing-always (fn [] (throw (Exception. "Expected")))]
+      (is (thrown-with-msg? Exception #"Expected" (retry failing-always
+                                                         :retries 1
+                                                         :delay 0
+                                                         :fallback (fn [e] (throw e))))))))