Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/13 12:39:48 UTC
[4/6] flink git commit: [FLINK-9004][tests] Implement Jepsen tests to
test job availability.
[FLINK-9004][tests] Implement Jepsen tests to test job availability.
Use the Jepsen framework (https://github.com/jepsen-io/jepsen) to implement
tests that verify Flink's HA capabilities under real-world faults, such as
sudden TaskManager/JobManager termination, HDFS NameNode unavailability, network
partitions, etc. The Flink cluster under test is automatically deployed on YARN
(session & job mode) and Mesos.
Provide Dockerfiles for local test development.
This closes #6240.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d58c8c05
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d58c8c05
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d58c8c05
Branch: refs/heads/master
Commit: d58c8c05f0b86bdb74cb6e450848690e309011d6
Parents: d783c62
Author: gyao <ga...@data-artisans.com>
Authored: Mon Mar 5 22:23:33 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jul 13 14:39:31 2018 +0200
----------------------------------------------------------------------
flink-jepsen/.gitignore | 18 ++
flink-jepsen/README.md | 70 ++++++
flink-jepsen/bin/.gitkeep | 0
flink-jepsen/docker/.gitignore | 3 +
flink-jepsen/docker/Dockerfile-control | 45 ++++
flink-jepsen/docker/Dockerfile-db | 39 ++++
flink-jepsen/docker/docker-compose.yml | 52 +++++
flink-jepsen/docker/run-tests.sh | 31 +++
flink-jepsen/docker/up.sh | 31 +++
flink-jepsen/project.clj | 28 +++
flink-jepsen/scripts/run-tests.sh | 43 ++++
flink-jepsen/src/jepsen/flink/checker.clj | 128 ++++++++++
flink-jepsen/src/jepsen/flink/client.clj | 150 ++++++++++++
flink-jepsen/src/jepsen/flink/db.clj | 232 +++++++++++++++++++
flink-jepsen/src/jepsen/flink/flink.clj | 110 +++++++++
flink-jepsen/src/jepsen/flink/generator.clj | 39 ++++
flink-jepsen/src/jepsen/flink/hadoop.clj | 139 +++++++++++
flink-jepsen/src/jepsen/flink/mesos.clj | 165 +++++++++++++
flink-jepsen/src/jepsen/flink/nemesis.clj | 163 +++++++++++++
flink-jepsen/src/jepsen/flink/utils.clj | 48 ++++
flink-jepsen/src/jepsen/flink/zookeeper.clj | 29 +++
flink-jepsen/test/jepsen/flink/checker_test.clj | 82 +++++++
flink-jepsen/test/jepsen/flink/client_test.clj | 37 +++
flink-jepsen/test/jepsen/flink/utils_test.clj | 39 ++++
.../test/jepsen/flink/zookeeper_test.clj | 28 +++
25 files changed, 1749 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/.gitignore
----------------------------------------------------------------------
diff --git a/flink-jepsen/.gitignore b/flink-jepsen/.gitignore
new file mode 100644
index 0000000..ed5eca5
--- /dev/null
+++ b/flink-jepsen/.gitignore
@@ -0,0 +1,18 @@
+*.class
+*.iml
+*.jar
+*.retry
+.DS_Store
+.hg/
+.hgignore
+.idea/
+/.lein-*
+/.nrepl-port
+/checkouts
+/classes
+/target
+pom.xml
+pom.xml.asc
+store
+bin/*
+!bin/.gitkeep
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/README.md
----------------------------------------------------------------------
diff --git a/flink-jepsen/README.md b/flink-jepsen/README.md
new file mode 100644
index 0000000..9343246
--- /dev/null
+++ b/flink-jepsen/README.md
@@ -0,0 +1,70 @@
+# flink-jepsen
+
+A Clojure project based on the [Jepsen](https://github.com/jepsen-io/jepsen) framework to find bugs in the
+distributed coordination of Apache Flink®.
+
+## Test Coverage
+Jepsen is a framework built to test the behavior of distributed systems
+under faults. The tests in this particular project deploy Flink on either YARN or Mesos, submit a
+job, and examine the availability of the job after injecting faults.
+A job is said to be available if all the tasks of the job are running.
+The faults that can currently be injected into the Flink cluster include:
+* Killing of TaskManager/JobManager processes
+* Stopping HDFS NameNode
+* Network partitions
+
+There are many more properties other than job availability that could be
+verified but are not yet covered by this test suite, e.g., end-to-end exactly-once processing
+semantics.
+
+## Usage
+See the [Jepsen documentation](https://github.com/jepsen-io/jepsen#setting-up-a-jepsen-environment)
+for how to set up the environment to run tests. The script `scripts/run-tests.sh` documents how to invoke
+tests. The Flink job used for testing is located under
+`flink-end-to-end-tests/flink-datastream-allround-test`. You have to build the job first and copy
+the resulting jar (`DataStreamAllroundTestProgram.jar`) to the `./bin` directory of this project's
+root.
+
+### Docker
+
+To simplify development, we have prepared Dockerfiles and a Docker Compose template
+so that you can run the tests locally in containers. To build the images
+and start the containers, simply run:
+
+    $ cd docker
+    $ ./up.sh
+
+After the containers have started, open a new terminal window and run `docker exec -it jepsen-control bash`.
+This will allow you to run arbitrary commands on the control node.
+To start the tests, you can use the `run-tests.sh` script in the `docker` directory,
+which expects the number of test iterations and a URI to a Flink distribution, e.g.,
+
+    ./docker/run-tests.sh 1 https://example.com/flink-dist.tgz
+
+The project's root is mounted as a volume to all containers under the path `/jepsen`.
+This means that changes to the test sources are immediately reflected in the control node container.
+Moreover, this allows you to test locally built Flink distributions by copying the tarball to the
+project's root and passing a URI with the `file://` scheme to the `run-tests.sh` script, e.g.,
+`file:///jepsen/flink-dist.tgz`.
+
+#### Memory Requirements
+
+The tests have high memory demands due to the many processes that are started by the control node.
+For example, to test Flink on YARN in an HA setup, we require ZooKeeper, HDFS NameNode,
+HDFS DataNode, YARN NodeManager, and YARN ResourceManager, in addition to the Flink processes.
+We found that the tests can be run comfortably in Docker containers on a machine with 32 GiB RAM.
+
+### Checking the Output of Tests
+
+Consult the `jepsen.log` file for the particular test run in the `store` folder. The final output of every test will be either
+
+    Everything looks good! ヽ('ー`)ノ
+
+or
+
+    Analysis invalid! (ノಥ益ಥ)ノ ┻━┻
+
+depending on whether the test passed or not. If neither output is generated, the test did not finish
+properly due to problems with the environment, bugs in Jepsen or in the test suite, etc.
+
+In addition, the test directories contain all relevant log files aggregated from all hosts.
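The verdict check described in the README section "Checking the Output of Tests" could be automated along the following lines. This is a minimal sketch and not part of the commit: `jepsen_verdict` is a hypothetical helper, and the assumption that each run leaves a `jepsen.log` somewhere below a `store` directory in the current working directory comes from the README text only. The two verdict strings are the ones quoted in the README.

```shell
#!/usr/bin/env bash
# Classify a Jepsen run by the verdict line in its jepsen.log.
# jepsen_verdict is a hypothetical helper, not part of flink-jepsen.
jepsen_verdict() {
  local log_file=$1
  if grep -qF 'Everything looks good!' "${log_file}"; then
    echo passed
  elif grep -qF 'Analysis invalid!' "${log_file}"; then
    echo failed
  else
    # Neither verdict was written: the test did not finish properly.
    echo unfinished
  fi
}

# Summarize every run found below ./store (directory layout is assumed).
if [ -d store ]; then
  find store -name jepsen.log | while read -r log_file; do
    echo "${log_file}: $(jepsen_verdict "${log_file}")"
  done
fi
```

A run classified as `unfinished` corresponds to the case the README mentions, where neither output is generated and the environment or the test suite itself needs a closer look.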
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/bin/.gitkeep
----------------------------------------------------------------------
diff --git a/flink-jepsen/bin/.gitkeep b/flink-jepsen/bin/.gitkeep
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/docker/.gitignore
----------------------------------------------------------------------
diff --git a/flink-jepsen/docker/.gitignore b/flink-jepsen/docker/.gitignore
new file mode 100644
index 0000000..6ff5a8e
--- /dev/null
+++ b/flink-jepsen/docker/.gitignore
@@ -0,0 +1,3 @@
+id_rsa
+id_rsa.pub
+nodes
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/docker/Dockerfile-control
----------------------------------------------------------------------
diff --git a/flink-jepsen/docker/Dockerfile-control b/flink-jepsen/docker/Dockerfile-control
new file mode 100644
index 0000000..96198e3
--- /dev/null
+++ b/flink-jepsen/docker/Dockerfile-control
@@ -0,0 +1,45 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+FROM debian:jessie
+
+RUN echo "deb http://http.debian.net/debian jessie-backports main" >> /etc/apt/sources.list && \
+ apt-get update && \
+ apt-get install -y -t jessie-backports openjdk-8-jdk && \
+ apt-get install -qqy \
+ less \
+ libjna-java \
+ gnuplot \
+ openjdk-8-jdk \
+ openssh-client \
+ vim \
+ wget
+
+ENV LEIN_ROOT true
+RUN wget https://raw.githubusercontent.com/technomancy/leiningen/stable/bin/lein && \
+ mv lein /usr/bin && \
+ chmod +x /usr/bin/lein && \
+ lein self-install
+
+ADD id_rsa /root/.ssh/
+ADD id_rsa.pub /root/.ssh/
+RUN touch ~/.ssh/known_hosts
+
+WORKDIR /jepsen
+
+CMD tail -f /dev/null
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/docker/Dockerfile-db
----------------------------------------------------------------------
diff --git a/flink-jepsen/docker/Dockerfile-db b/flink-jepsen/docker/Dockerfile-db
new file mode 100644
index 0000000..1555329
--- /dev/null
+++ b/flink-jepsen/docker/Dockerfile-db
@@ -0,0 +1,39 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+FROM debian:jessie
+
+RUN echo "deb http://http.debian.net/debian jessie-backports main" >> /etc/apt/sources.list && \
+ apt-get update && \
+ apt-get install -y -t jessie-backports openjdk-8-jdk && \
+ apt-get install -y apt-utils bzip2 curl faketime iproute iptables iputils-ping less libzip2 logrotate man man-db net-tools ntpdate psmisc python rsyslog sudo sysvinit sysvinit-core sysvinit-utils tar unzip vim wget
+
+RUN apt-get update && \
+ apt-get -y install openssh-server && \
+ mkdir -p /var/run/sshd && \
+ sed -i "s/UsePrivilegeSeparation.*/UsePrivilegeSeparation no/g" /etc/ssh/sshd_config && \
+ sed -i "s/PermitRootLogin without-password/PermitRootLogin yes/g" /etc/ssh/sshd_config
+
+ADD id_rsa.pub /root
+RUN mkdir -p /root/.ssh/ && \
+ touch /root/.ssh/authorized_keys && \
+ chmod 600 /root/.ssh/authorized_keys && \
+ cat /root/id_rsa.pub >> /root/.ssh/authorized_keys
+
+EXPOSE 22
+CMD exec /usr/sbin/sshd -D
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/docker/docker-compose.yml
----------------------------------------------------------------------
diff --git a/flink-jepsen/docker/docker-compose.yml b/flink-jepsen/docker/docker-compose.yml
new file mode 100644
index 0000000..3d2bdbd
--- /dev/null
+++ b/flink-jepsen/docker/docker-compose.yml
@@ -0,0 +1,52 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+version: '2'
+services:
+  control:
+    volumes:
+      - ${JEPSEN_ROOT}:/jepsen
+
+    container_name: jepsen-control
+    hostname: control
+    build:
+      context: ./
+      dockerfile: Dockerfile-control
+    privileged: true
+    links:
+      - n1
+      - n2
+      - n3
+
+  n1:
+    build:
+      context: ./
+      dockerfile: Dockerfile-db
+    privileged: true
+    container_name: jepsen-n1
+    hostname: n1
+    volumes:
+      - ${JEPSEN_ROOT}:/jepsen
+  n2:
+    extends: n1
+    container_name: jepsen-n2
+    hostname: n2
+  n3:
+    extends: n1
+    container_name: jepsen-n3
+    hostname: n3
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/docker/run-tests.sh
----------------------------------------------------------------------
diff --git a/flink-jepsen/docker/run-tests.sh b/flink-jepsen/docker/run-tests.sh
new file mode 100755
index 0000000..8b2b1e6
--- /dev/null
+++ b/flink-jepsen/docker/run-tests.sh
@@ -0,0 +1,31 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+dockerdir=$(dirname $0)
+dockerdir=$(cd ${dockerdir}; pwd)
+
+cat <<EOF > ${dockerdir}/nodes
+n1
+n2
+n3
+EOF
+
+common_jepsen_args+=(--nodes-file ${dockerdir}/nodes)
+
+. ${dockerdir}/../scripts/run-tests.sh ${1} ${2} 1
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/docker/up.sh
----------------------------------------------------------------------
diff --git a/flink-jepsen/docker/up.sh b/flink-jepsen/docker/up.sh
new file mode 100755
index 0000000..5479b3b
--- /dev/null
+++ b/flink-jepsen/docker/up.sh
@@ -0,0 +1,31 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -e
+
+dockerdir=$(dirname $0)
+dockerdir=$(cd ${dockerdir}; pwd)
+
+if [ ! -f ./id_rsa ]; then
+ ssh-keygen -t rsa -N "" -f ./id_rsa
+fi
+
+export JEPSEN_ROOT=${dockerdir}/../
+docker-compose build
+docker-compose -f docker-compose.yml up --force-recreate
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/project.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/project.clj b/flink-jepsen/project.clj
new file mode 100644
index 0000000..78935d7
--- /dev/null
+++ b/flink-jepsen/project.clj
@@ -0,0 +1,28 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(defproject jepsen.flink "0.1.0-SNAPSHOT"
+ :license {:name "Apache License"
+ :url "http://www.apache.org/licenses/LICENSE-2.0"}
+ :main jepsen.flink.flink
+ :dependencies [[org.clojure/clojure "1.9.0"],
+ [cheshire "5.8.0"]
+ [clj-http "3.8.0"]
+ [jepsen "0.1.10"],
+ [jepsen.zookeeper "0.1.0"]
+ [org.clojure/data.xml "0.0.8"]
+ [zookeeper-clj "0.9.4" :exclusions [org.slf4j/slf4j-log4j12]]]
+ :profiles {:test {:dependencies [[clj-http-fake "1.0.3"]]}})
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/scripts/run-tests.sh
----------------------------------------------------------------------
diff --git a/flink-jepsen/scripts/run-tests.sh b/flink-jepsen/scripts/run-tests.sh
new file mode 100755
index 0000000..e448124
--- /dev/null
+++ b/flink-jepsen/scripts/run-tests.sh
@@ -0,0 +1,43 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -euo pipefail
+
+scripts=$(dirname $0)
+scripts=$(cd ${scripts}; pwd)
+
+parallelism=${3}
+
+common_jepsen_args+=(--ha-storage-dir hdfs:///flink
+--job-jar ${scripts}/../bin/DataStreamAllroundTestProgram.jar
+--tarball ${2}
+--job-args "--environment.parallelism ${parallelism} --state_backend.checkpoint_directory hdfs:///checkpoints --state_backend rocks --state_backend.rocks.incremental true"
+--ssh-private-key ~/.ssh/id_rsa)
+
+for i in $(seq 1 ${1})
+do
+ echo "Executing run #${i} of ${1}"
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode yarn-session
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode yarn-session
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --deployment-mode yarn-session
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode yarn-job
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode yarn-job
+ lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --deployment-mode yarn-job
+ echo
+done
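The loop body in `scripts/run-tests.sh` above spells out every combination of three nemesis generators and two YARN deployment modes by hand. The same matrix can be sketched with nested loops. This is only an illustrative dry run under stated assumptions: `print_test_matrix` is a hypothetical helper, it echoes the commands instead of invoking `lein`, and it leaves out the common arguments (`--tarball`, `--job-jar`, and so on) that the real script passes via `common_jepsen_args`.

```shell
#!/usr/bin/env bash
# Print one "lein run test" command line per combination of deployment mode
# and nemesis generator, in the same order as scripts/run-tests.sh.
# print_test_matrix is a hypothetical helper; it only echoes the commands.
print_test_matrix() {
  local mode nemesis
  for mode in yarn-session yarn-job; do
    for nemesis in kill-task-managers kill-job-managers fail-name-node-during-recovery; do
      echo "lein run test --nemesis-gen ${nemesis} --deployment-mode ${mode}"
    done
  done
}

print_test_matrix
```

Listing the combinations explicitly, as the committed script does, makes it easy to comment out a single case; the nested loops are shorter but less convenient for that.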
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/checker.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/checker.clj b/flink-jepsen/src/jepsen/flink/checker.clj
new file mode 100644
index 0000000..02cc863
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/checker.clj
@@ -0,0 +1,128 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.checker
+ (:require [jepsen
+ [checker :as checker]
+ [util :as ju]]
+ [knossos.model :as model])
+ (:import (knossos.model Model)))
+
+(defn stoppable-op? [op]
+ (clojure.string/includes? (name (:f op)) "-start"))
+
+(defn stop-op? [op]
+ (clojure.string/includes? (name (:f op)) "-stop"))
+
+(defn strip-op-suffix [op]
+ (clojure.string/replace (name (:f op)) #"-start|-stop" ""))
+
+(def safe-inc
+ (fnil inc 0))
+
+(defn nemeses-active?
+ [active-nemeses]
+ (->> (vals active-nemeses)
+ (reduce +)
+ pos?))
+
+(defn dissoc-if
+ [f m]
+ (->> (remove f m)
+ (into {})))
+
+(defn zero-value?
+ [[_ v]]
+ (zero? v))
+
+(defrecord
+ JobRunningWithinGracePeriod
+ ^{:doc "A Model which is consistent iff the Flink job became available within
+ `job-recovery-grace-period` seconds after the last fault injected by the nemesis.
+ Note that some faults happen at a single point in time (e.g., killing of processes). Other faults,
+ such as network splits, last for a period of time and can thus overlap. As long as
+ there are active faults, the job is allowed not to be available."}
+ [active-nemeses ; stores active failures
+ healthy-count ; how many consecutive times was the job running?
+ last-failure ; timestamp when the last failure was injected/ended
+ healthy-threshold ; after how many times is the job considered healthy
+ job-recovery-grace-period] ; after how many seconds should the job be recovered
+ Model
+ (step [this op]
+ (case (:process op)
+ :nemesis (cond
+ (nil? (:value op)) this
+ (stoppable-op? op) (assoc
+ this
+ :active-nemeses (update active-nemeses
+ (strip-op-suffix op)
+ safe-inc))
+ (stop-op? op) (assoc
+ this
+ :active-nemeses (dissoc-if zero-value?
+ (update active-nemeses (strip-op-suffix op) dec))
+ :last-failure (:time op))
+ :else (assoc this :last-failure (:time op)))
+ (case (:f op)
+ :job-running? (case (:type op)
+ :info this ; ignore :info operations
+ :fail this ; ignore :fail operations
+ :invoke this ; ignore :invoke operations
+ :ok (if (:value op) ; check if job is running
+ (assoc ; job is running
+ this
+ :healthy-count
+ (inc healthy-count))
+ (if (and ; job is not running
+ (not (nemeses-active? active-nemeses))
+ (< healthy-count healthy-threshold)
+ (> (ju/nanos->secs (- (:time op) last-failure)) job-recovery-grace-period))
+ ; job is not running but it should be running
+ ; because grace period passed
+ (model/inconsistent "Job is not running.")
+ (conj this
+ [:healthy-count 0]))))
+ ; ignore other client operations
+ this))))
+
+(defn job-running-within-grace-period
+ [job-running-healthy-threshold job-recovery-grace-period]
+ (JobRunningWithinGracePeriod. {} 0 nil job-running-healthy-threshold job-recovery-grace-period))
+
+(defn job-running-checker
+ []
+ (reify
+ checker/Checker
+ (check [_ test model history _]
+ (let [final (reduce model/step (assoc model :last-failure (:time (first history))) history)
+ result-map (conj {}
+ (find test :nemesis-gen)
+ (find test :deployment-mode))]
+ (if (or (model/inconsistent? final) (zero? (:healthy-count final 0)))
+ (into result-map {:valid? false
+ :error (:msg final)})
+ (into result-map {:valid? true
+ :final-model final}))))))
+
+(defn get-job-running-history
+ [history]
+ (->>
+ history
+ (remove #(= (:process %) :nemesis))
+ (remove #(= (:type %) :invoke))
+ (map :value)
+ (map boolean)
+ (remove nil?)))
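The core rule in the `JobRunningWithinGracePeriod` docstring above (the job may be unavailable only within the grace period after the last fault) can be illustrated with a much simplified shell sketch. Everything here is an assumption made for illustration: `check_grace_period` is a hypothetical helper, the `<seconds> <event>` input format is made up, and the sketch deliberately ignores faults that span a period of time, the `healthy-threshold` logic, and the checker's requirement that the job is running at the end of the history. It is written in shell rather than Clojure only to keep it self-contained.

```shell
#!/usr/bin/env bash
# check_grace_period GRACE_SECONDS
# Reads "<seconds> <event>" lines from stdin, where <event> is one of
# fault, running, or not-running (a made-up format for this sketch).
# Prints "consistent", or "inconsistent at <seconds>" for the first
# not-running observation made more than GRACE_SECONDS after the last fault.
check_grace_period() {
  local grace=$1 last_fault="" time event
  while read -r time event; do
    # Like the checker, start the clock at the first event in the history.
    [ -n "${last_fault}" ] || last_fault=${time}
    case "${event}" in
      fault)
        last_fault=${time}
        ;;
      not-running)
        if [ $((time - last_fault)) -gt "${grace}" ]; then
          echo "inconsistent at ${time}"
          return 1
        fi
        ;;
    esac
  done
  echo consistent
}

# The job is down 4 seconds after the fault, which is within the grace period.
printf '%s\n' '0 fault' '4 not-running' '9 running' | check_grace_period 10
```

With a grace period of 10 seconds, a `not-running` observation at second 20 after a fault at second 0 would be reported as inconsistent.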
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/client.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/client.clj b/flink-jepsen/src/jepsen/flink/client.clj
new file mode 100644
index 0000000..905dc48
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/client.clj
@@ -0,0 +1,150 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.client
+ (:require [clj-http.client :as http]
+ [clojure.tools.logging :refer :all]
+ [jepsen.client :as client]
+ [jepsen.flink.zookeeper :as fz]
+ [jepsen.flink.utils :as fu]
+ [zookeeper :as zk])
+ (:import (java.io ByteArrayInputStream ObjectInputStream)))
+
+(defn connect-zk-client!
+ [connection-string]
+ (zk/connect connection-string :timeout-msec 60000))
+
+(defn read-url
+ [bytes]
+ (with-open [object-input-stream (ObjectInputStream. (ByteArrayInputStream. bytes))]
+ (.readUTF object-input-stream)))
+
+(defn wait-for-zk-operation
+ [zk-client operation path]
+ (let [p (promise)]
+ (letfn [(iter [_]
+ (when-let [res (operation zk-client path :watcher iter)]
+ (deliver p res)))
+ ]
+ (iter nil)
+ p)))
+
+(defn wait-for-path-to-exist
+ [zk-client path]
+ (info "Waiting for path" path "in ZK.")
+ (wait-for-zk-operation zk-client zk/exists path))
+
+(defn wait-for-children-to-exist
+ [zk-client path]
+ (wait-for-zk-operation zk-client zk/children path))
+
+(defn find-application-id
+ [zk-client]
+ (do
+ (->
+ (wait-for-path-to-exist zk-client "/flink")
+ (deref))
+ (->
+ (wait-for-children-to-exist zk-client "/flink")
+ (deref)
+ (first))))
+
+(defn watch-node-bytes
+ [zk-client path callback]
+ (when (zk/exists zk-client path :watcher (fn [_] (watch-node-bytes zk-client path callback)))
+ (->>
+ (zk/data zk-client path :watcher (fn [_] (watch-node-bytes zk-client path callback)))
+ :data
+ (callback))))
+
+(defn make-job-manager-url [test]
+ (let [rest-url-atom (atom nil)
+ zk-client (connect-zk-client! (fz/zookeeper-quorum test))
+ init-future (future
+ (let [application-id (find-application-id zk-client)
+ path (str "/flink/" application-id "/leader/rest_server_lock")
+ _ (->
+ (wait-for-path-to-exist zk-client path)
+ (deref))]
+ (info "Determined application id to be" application-id)
+ (watch-node-bytes zk-client path
+ (fn [bytes]
+ (let [url (read-url bytes)]
+ (info "Leading REST url changed to" url)
+ (reset! rest-url-atom url))))))]
+ {:rest-url-atom rest-url-atom
+ :closer (fn [] (zk/close zk-client))
+ :init-future init-future}))
+
+(defn list-jobs!
+ [base-url]
+ (->>
+ (http/get (str base-url "/jobs") {:as :json})
+ :body
+ :jobs
+ (map :id)))
+
+(defn get-job-details!
+ [base-url job-id]
+ (assert base-url)
+ (assert job-id)
+ (let [job-details (->
+ (http/get (str base-url "/jobs/" job-id) {:as :json})
+ :body)]
+ (assert (:vertices job-details) "Job does not have vertices")
+ job-details))
+
+(defn job-running?
+ [base-url job-id]
+ (->>
+ (get-job-details! base-url job-id)
+ :vertices
+ (map :status)
+ (every? #(= "RUNNING" %))))
+
+(defrecord Client
+ [deploy-cluster! closer rest-url init-future job-id]
+ client/Client
+ (open! [this test node]
+ (let [{:keys [rest-url-atom closer init-future]} (make-job-manager-url test)]
+ (assoc this :closer closer :rest-url rest-url-atom :init-future init-future :job-id (atom nil))))
+
+ (setup! [this test] this)
+
+ (invoke! [this test op]
+ (case (:f op)
+ :submit (do
+ (deploy-cluster! test)
+ (deref init-future)
+ (let [jobs (fu/retry (fn [] (list-jobs! @rest-url))
+ :fallback (fn [e] (do
+ (fatal e "Could not get running jobs.")
+ (System/exit 1))))
+ num-jobs (count jobs)]
+ (assert (= 1 num-jobs) (str "Expected 1 job, was " num-jobs))
+ (reset! job-id (first jobs)))
+ (assoc op :type :ok))
+ :job-running? (let [base-url @rest-url]
+ (if base-url
+ (try
+ (assoc op :type :ok :value (job-running? base-url @job-id))
+ (catch Exception e (do
+ (warn e "Get job details from" base-url "failed.")
+ (assoc op :type :fail))))
+ (assoc op :type :fail :value "Cluster not deployed yet.")))))
+
+ (teardown! [this test])
+ (close! [this test] (closer)))
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/db.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/db.clj b/flink-jepsen/src/jepsen/flink/db.clj
new file mode 100644
index 0000000..ff934e1
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/db.clj
@@ -0,0 +1,232 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.db
+ (:require [clj-http.client :as http]
+ [clojure.java.io]
+ [clojure.string :as str]
+ [clojure.tools.logging :refer :all]
+ [jepsen
+ [control :as c]
+ [db :as db]
+ [util :refer [meh]]
+ [zookeeper :as zk]]
+ [jepsen.control.util :as cu]
+ [jepsen.flink.hadoop :as hadoop]
+ [jepsen.flink.mesos :as mesos]
+ [jepsen.flink.utils :as fu]
+ [jepsen.flink.zookeeper :refer :all]))
+
+(def install-dir "/opt/flink")
+(def upload-dir "/tmp")
+(def log-dir (str install-dir "/log"))
+(def conf-file (str install-dir "/conf/flink-conf.yaml"))
+(def masters-file (str install-dir "/conf/masters"))
+
+(def default-flink-dist-url "https://archive.apache.org/dist/flink/flink-1.5.0/flink-1.5.0-bin-hadoop28-scala_2.11.tgz")
+(def hadoop-dist-url "https://archive.apache.org/dist/hadoop/common/hadoop-2.8.3/hadoop-2.8.3.tar.gz")
+(def deb-zookeeper-package "3.4.9-3+deb8u1")
+(def deb-mesos-package "1.5.0-2.0.2")
+(def deb-marathon-package "1.6.322")
+
+(def taskmanager-slots 1)
+(def master-count 1)
+
+(defn flink-configuration
+ [test]
+ {:high-availability "zookeeper"
+ :high-availability.zookeeper.quorum (zookeeper-quorum test)
+ :high-availability.storageDir (str (:ha-storage-dir test) "/ha")
+ :state.savepoints.dir (str (:ha-storage-dir test) "/savepoints")
+ :web.port 8081
+ :rest.bind-address "0.0.0.0"
+ :taskmanager.numberOfTaskSlots taskmanager-slots
+ :yarn.application-attempts 99999
+ :slotmanager.taskmanager-timeout 10000
+ :state.backend.local-recovery "true"
+ :taskmanager.registration.timeout "30 s"})
+
+(defn master-nodes
+ [test]
+ (take master-count (sort (:nodes test))))
+
+(defn write-configuration!
+ "Writes the flink-conf.yaml and masters files to the Flink conf directory."
+ [test]
+ (let [c (clojure.string/join "\n" (map (fn [[k v]] (str (name k) ": " v))
+ (seq (flink-configuration test))))
+ m (clojure.string/join "\n" (master-nodes test))]
+ (c/exec :echo c :> conf-file)
+ (c/exec :echo m :> masters-file)
+ ;; TODO: write log4j.properties properly
+ (c/exec (c/lit (str "sed -i'.bak' -e '/log4j.rootLogger=/ s/=.*/=DEBUG, file/' " install-dir "/conf/log4j.properties")))))
+
+(defn install-flink!
+ [test]
+ (let [url (:tarball test)]
+ (info "Installing Flink from" url)
+ (cu/install-archive! url install-dir)
+ (info "Enable S3 FS")
+ (c/exec (c/lit (str "ls " install-dir "/opt/flink-s3-fs-hadoop* | xargs -I {} mv {} " install-dir "/lib")))
+ (c/upload (:job-jar test) upload-dir)
+ (c/exec :mv (str upload-dir "/" (.getName (clojure.java.io/file (:job-jar test)))) install-dir)
+ (write-configuration! test)))
+
+(defn teardown-flink!
+ []
+ (info "Tearing down Flink")
+ (meh (c/exec :rm :-rf install-dir))
+ (meh (c/exec :rm :-rf (c/lit "/tmp/.yarn-properties*"))))
+
+(defn get-log-files!
+ []
+ (if (cu/exists? log-dir) (cu/ls-full log-dir) []))
+
+(defn flink-db
+ [test]
+ (reify db/DB
+ (setup! [_ test node]
+ (c/su
+ (install-flink! test)))
+
+ (teardown! [_ test node]
+ (c/su
+ (teardown-flink!)))
+
+ db/LogFiles
+ (log-files [_ test node]
+ (concat
+ (get-log-files!)))))
+
+(defn combined-db
+ [dbs]
+ (reify db/DB
+ (setup! [_ test node]
+ (c/su
+ (doall (map #(db/setup! % test node) dbs))))
+ (teardown! [_ test node]
+ (c/su
+ (doall (map #(db/teardown! % test node) dbs))))
+ db/LogFiles
+ (log-files [_ test node]
+ (flatten (map #(db/log-files % test node) dbs)))))
+
+;;; YARN
+
+(defn flink-yarn-db
+ []
+ (let [zk (zk/db deb-zookeeper-package)
+ hadoop (hadoop/db hadoop-dist-url)
+ flink (flink-db test)]
+ (combined-db [hadoop zk flink])))
+
+(defn exec-flink!
+ [test cmd args]
+ (c/su
+ (c/exec (c/lit (str
+ "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
+ "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir " "
+ install-dir "/bin/flink " cmd " " args)))))
+
+(defn flink-run-cli-args
+ "Returns the CLI args that should be passed to 'flink run'"
+ [test]
+ (concat
+ ["-d"]
+ (if (:main-class test)
+ [(str "-c " (:main-class test))]
+ [])
+ (if (= :yarn-job (:deployment-mode test))
+ ["-m yarn-cluster" "-yjm 2048m" "-ytm 2048m"]
+ [])))
+
+(defn submit-job!
+ ([test] (submit-job! test []))
+ ([test cli-args]
+ (exec-flink! test "run" (clojure.string/join
+ " "
+ (concat cli-args
+ (flink-run-cli-args test)
+ [(str install-dir "/" (last (str/split (:job-jar test) #"/")))
+ (:job-args test)])))))
+
+(defn first-node
+ [test]
+ (-> test :nodes sort first))
+
+(defn start-yarn-session!
+ [test]
+ (let [node (first-node test)]
+ (c/on node
+ (info "Starting YARN session from" node)
+ (c/su
+ (c/exec (c/lit (str "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
+ "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir
+ " " install-dir "/bin/yarn-session.sh -d -jm 2048m -tm 2048m")))
+ (submit-job! test)))))
+
+(defn start-yarn-job!
+ [test]
+ (c/on (first-node test)
+ (c/su
+ (submit-job! test))))
+
+;;; Mesos
+
+(defn flink-mesos-db
+ []
+ (let [zk (zk/db deb-zookeeper-package)
+ hadoop (hadoop/db hadoop-dist-url)
+ mesos (mesos/db deb-mesos-package deb-marathon-package)
+ flink (flink-db test)]
+ (combined-db [hadoop zk mesos flink])))
+
+(defn submit-job-with-retry!
+ [test]
+ (fu/retry
+ (partial submit-job! test)
+ :fallback (fn [e] (do
+ (fatal e "Could not submit job.")
+ (System/exit 1)))))
+
+(defn start-mesos-session!
+ [test]
+ (c/su
+ (let [r (fu/retry (fn []
+ (http/post
+ (str (mesos/marathon-base-url test) "/v2/apps")
+ {:form-params {:id "flink"
+ :cmd (str "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
+ "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir " "
+ install-dir "/bin/mesos-appmaster.sh "
+ "-Dmesos.master=" (zookeeper-uri
+ test
+ mesos/zk-namespace) " "
+ "-Djobmanager.rpc.address=$(hostname -f) "
+ "-Djobmanager.heap.mb=2048 "
+ "-Djobmanager.rpc.port=6123 "
+ "-Djobmanager.web.port=8081 "
+ "-Dmesos.resourcemanager.tasks.mem=2048 "
+ "-Dtaskmanager.heap.mb=2048 "
+ "-Dtaskmanager.numberOfTaskSlots=2 "
+ "-Dmesos.resourcemanager.tasks.cpus=1 "
+ "-Drest.bind-address=$(hostname -f) ")
+ :cpus 1.0
+ :mem 2048}
+ :content-type :json})))]
+ (info "Submitted Flink Application via Marathon" r)
+ (c/on (-> test :nodes sort first)
+ (submit-job-with-retry! test)))))
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/flink.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/flink.clj b/flink-jepsen/src/jepsen/flink/flink.clj
new file mode 100644
index 0000000..d5d4157
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/flink.clj
@@ -0,0 +1,110 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.flink
+ (:require [clojure.tools.logging :refer :all]
+ [jepsen
+ [cli :as cli]
+ [generator :as gen]
+ [tests :as tests]]
+ [jepsen.os.debian :as debian]
+ [jepsen.flink.client :refer :all]
+ [jepsen.flink.checker :as flink-checker]
+ [jepsen.flink.db :as fdb]
+ [jepsen.flink.nemesis :as fn])
+ (:import (jepsen.flink.client Client)))
+
+(def flink-test-config
+ {:yarn-session {:db (fdb/flink-yarn-db)
+ :deployment-strategy fdb/start-yarn-session!}
+ :yarn-job {:db (fdb/flink-yarn-db)
+ :deployment-strategy fdb/start-yarn-job!}
+ :mesos-session {:db (fdb/flink-mesos-db)
+ :deployment-strategy fdb/start-mesos-session!}})
+
+(defn client-gen
+ []
+ (->
+ (cons {:type :invoke, :f :submit, :value nil}
+ (cycle [{:type :invoke, :f :job-running?, :value nil}
+ (gen/sleep 5)]))
+ (gen/seq)
+ (gen/singlethreaded)))
+
+(defn flink-test
+ [opts]
+ (merge tests/noop-test
+ (let [{:keys [db deployment-strategy]} (-> opts :deployment-mode flink-test-config)
+ {:keys [job-running-healthy-threshold job-recovery-grace-period]} opts]
+ {:name "Apache Flink"
+ :os debian/os
+ :db db
+ :nemesis (fn/nemesis)
+ :model (flink-checker/job-running-within-grace-period
+ job-running-healthy-threshold
+ job-recovery-grace-period)
+ :generator (let [stop (atom nil)]
+ (->> (fn/stoppable-generator stop (client-gen))
+ (gen/nemesis
+ (fn/stop-generator stop
+ ((fn/nemesis-generator-factories (:nemesis-gen opts)) opts)
+ job-running-healthy-threshold
+ job-recovery-grace-period))))
+ :client (Client. deployment-strategy nil nil nil nil)
+ :checker (flink-checker/job-running-checker)})
+ (assoc opts :concurrency 1)))
+
+(defn keys-as-allowed-values-help-text
+ "Takes a map and returns a string explaining which values are allowed.
+ This is a CLI helper function."
+ [m]
+ (->> (keys m)
+ (map name)
+ (clojure.string/join ", ")
+ (str "Must be one of: ")))
+
+(defn -main
+ [& args]
+ (cli/run!
+ (merge
+ (cli/single-test-cmd
+ {:test-fn flink-test
+ :tarball fdb/default-flink-dist-url
+ :opt-spec [[nil "--ha-storage-dir DIR" "high-availability.storageDir"]
+ [nil "--job-jar JAR" "Path to the job jar"]
+ [nil "--job-args ARGS" "CLI arguments for the flink job"]
+ [nil "--main-class CLASS" "Job main class"]
+ [nil "--nemesis-gen GEN" (str "Which nemesis should be used? "
+ (keys-as-allowed-values-help-text fn/nemesis-generator-factories))
+ :parse-fn keyword
+ :default :kill-task-managers
+ :validate [#(fn/nemesis-generator-factories (keyword %))
+ (keys-as-allowed-values-help-text fn/nemesis-generator-factories)]]
+ [nil "--deployment-mode MODE" (keys-as-allowed-values-help-text flink-test-config)
+ :parse-fn keyword
+ :default :yarn-session
+ :validate [#(flink-test-config (keyword %))
+ (keys-as-allowed-values-help-text flink-test-config)]]
+ [nil "--job-running-healthy-threshold TIMES" "Number of consecutive times the job must be running to be considered healthy."
+ :default 5
+ :parse-fn #(Long/parseLong %)
+ :validate [pos? "Must be positive"]]
+ [nil "--job-recovery-grace-period SECONDS" "Time period in which the job must become healthy."
+ :default 180
+ :parse-fn #(Long/parseLong %)
+ :validate [pos? "Must be positive" (fn [v] (<= 60 v)) "Should be at least 60"]]]})
+ (cli/serve-cmd))
+ args))
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/generator.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/generator.clj b/flink-jepsen/src/jepsen/flink/generator.clj
new file mode 100644
index 0000000..af928c4
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/generator.clj
@@ -0,0 +1,39 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.generator
+ (:require [jepsen.util :as util]
+ [jepsen.generator :as gen]))
+
+(gen/defgenerator TimeLimitGen
+ [dt source deadline-atom]
+ [dt (when-let [deadline @deadline-atom]
+ (util/nanos->secs deadline)) source]
+ (gen/op [_ test process]
+ (compare-and-set! deadline-atom nil (+ (util/linear-time-nanos)
+ (util/secs->nanos dt)))
+ (when (<= (util/linear-time-nanos) @deadline-atom)
+ (gen/op source test process))))
+
+;; In Jepsen 0.1.9, jepsen.generator/time-limit was rewritten to interrupt threads.
+;; Unfortunately, the logic has race conditions that can cause spurious failures
+;; (https://github.com/jepsen-io/jepsen/issues/268).
+;;
+;; In our tests we do not need interrupts. Therefore, we use a time-limit implementation that is
+;; similar to the one shipped with Jepsen 0.1.8.
+(defn time-limit
+ [dt source]
+ (TimeLimitGen. dt source (atom nil)))
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/hadoop.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/hadoop.clj b/flink-jepsen/src/jepsen/flink/hadoop.clj
new file mode 100644
index 0000000..f633d07
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/hadoop.clj
@@ -0,0 +1,139 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.hadoop
+ (:require [clojure.data.xml :as xml]
+ [clojure.tools.logging :refer :all]
+ [jepsen
+ [control :as c]
+ [db :as db]]
+ [jepsen.control.util :as cu]))
+
+(def install-dir "/opt/hadoop")
+(def hadoop-conf-dir (str install-dir "/etc/hadoop"))
+(def yarn-log-dir "/tmp/logs/yarn")
+
+(defn name-node
+ [nodes]
+ (first (sort nodes)))
+
+(defn resource-manager
+ [nodes]
+ (second (sort nodes)))
+
+(defn data-nodes
+ [nodes]
+ (drop 2 (sort nodes)))
+
+(defn yarn-site-config
+ [test]
+ {:yarn.resourcemanager.hostname (resource-manager (:nodes test))
+ :yarn.log-aggregation-enable "true"
+ :yarn.nodemanager.resource.cpu-vcores "8"
+ :yarn.resourcemanager.am.max-attempts "99999"
+ :yarn.nodemanager.log-dirs yarn-log-dir})
+
+(defn core-site-config
+ [test]
+ {:fs.defaultFS (str "hdfs://" (name-node (:nodes test)) ":9000")})
+
+(defn property-value
+ [property value]
+ (xml/element :property {}
+ [(xml/element :name {} (name property))
+ (xml/element :value {} value)]))
+
+(defn write-config!
+ [^String config-file config]
+ (info "Writing config" config-file)
+ (let [config-xml (xml/indent-str
+ (xml/element :configuration
+ {}
+ (map (fn [[k v]] (property-value k v)) (seq config))))]
+ (c/exec :echo config-xml :> config-file)
+ ))
+
+(defn start-name-node!
+ [test node]
+ (when (= node (name-node (:nodes test)))
+ (info "Start NameNode daemon.")
+ (c/exec (str install-dir "/sbin/hadoop-daemon.sh") :--config hadoop-conf-dir :--script :hdfs :start :namenode)))
+
+(defn start-name-node-formatted!
+ [test node]
+ (when (= node (name-node (:nodes test)))
+ (info "Format HDFS")
+ (c/exec (str install-dir "/bin/hdfs") :namenode :-format :-force :-clusterId "0000000")
+ (start-name-node! test node)))
+
+(defn stop-name-node!
+ []
+ (c/exec (str install-dir "/sbin/hadoop-daemon.sh") :--config hadoop-conf-dir :--script :hdfs :stop :namenode))
+
+(defn start-data-node!
+ [test node]
+ (when (some #{node} (data-nodes (:nodes test)))
+ (info "Start DataNode")
+ (c/exec (str install-dir "/sbin/hadoop-daemon.sh") :--config hadoop-conf-dir :--script :hdfs :start :datanode)))
+
+(defn start-resource-manager!
+ [test node]
+ (when (= node (resource-manager (:nodes test)))
+ (info "Start ResourceManager")
+ (c/exec (str install-dir "/sbin/yarn-daemon.sh") :--config hadoop-conf-dir :start :resourcemanager)))
+
+(defn start-node-manager!
+ [test node]
+ (when (some #{node} (data-nodes (:nodes test)))
+ (info "Start NodeManager")
+ (c/exec (str install-dir "/sbin/yarn-daemon.sh") :--config hadoop-conf-dir :start :nodemanager)))
+
+(defn find-files!
+ [dir]
+ (->>
+ (clojure.string/split (c/exec :find dir :-type :f) #"\n")
+ (remove clojure.string/blank?)))
+
+(defn db
+ [url]
+ (reify db/DB
+ (setup! [_ test node]
+ (info "Install Hadoop from" url)
+ (c/su
+ (cu/install-archive! url install-dir)
+ (write-config! (str install-dir "/etc/hadoop/yarn-site.xml") (yarn-site-config test))
+ (write-config! (str install-dir "/etc/hadoop/core-site.xml") (core-site-config test))
+ (c/exec :echo (c/lit "export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64") :>> (str install-dir "/etc/hadoop/hadoop-env.sh"))
+ (start-name-node-formatted! test node)
+ (start-data-node! test node)
+ (start-resource-manager! test node)
+ (start-node-manager! test node)))
+
+ (teardown! [_ test node]
+ (info "Teardown Hadoop")
+ (c/su
+ (cu/grepkill! "hadoop")
+ (c/exec (c/lit (str "rm -rf /tmp/hadoop-* ||:")))))
+
+ db/LogFiles
+ (log-files [_ _ _]
+ (c/su
+ (concat (find-files! (str install-dir "/logs"))
+ (if (cu/exists? yarn-log-dir)
+ (do
+ (c/exec :chmod :-R :777 yarn-log-dir)
+ (find-files! yarn-log-dir))
+ []))))))
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/mesos.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/mesos.clj b/flink-jepsen/src/jepsen/flink/mesos.clj
new file mode 100644
index 0000000..74b2c0d
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/mesos.clj
@@ -0,0 +1,165 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.mesos
+ (:require [clojure.tools.logging :refer :all]
+ [jepsen
+ [control :as c]
+ [db :as db]
+ [util :as util :refer [meh]]]
+ [jepsen.control.util :as cu]
+ [jepsen.os.debian :as debian]
+ [jepsen.flink.zookeeper :refer [zookeeper-uri]]))
+
+;;; Mesos
+
+(def master-count 1)
+(def master-pidfile "/var/run/mesos/master.pid")
+(def slave-pidfile "/var/run/mesos/slave.pid")
+(def master-dir "/var/lib/mesos/master")
+(def slave-dir "/var/lib/mesos/slave")
+(def log-dir "/var/log/mesos")
+(def master-bin "/usr/sbin/mesos-master")
+(def slave-bin "/usr/sbin/mesos-slave")
+(def zk-namespace :mesos)
+
+;;; Marathon
+
+(def marathon-bin "/usr/bin/marathon")
+(def zk-marathon-namespace "marathon")
+(def marathon-pidfile "/var/run/mesos/marathon.pid")
+(def marathon-rest-port 8080)
+
+(defn install!
+ [test node mesos-version marathon-version]
+ (c/su
+ (debian/add-repo! :mesosphere
+ "deb http://repos.mesosphere.com/debian jessie main"
+ "keyserver.ubuntu.com"
+ "E56151BF")
+ (debian/install {:mesos mesos-version
+ :marathon marathon-version})
+ (c/exec :mkdir :-p "/var/run/mesos")
+ (c/exec :mkdir :-p master-dir)
+ (c/exec :mkdir :-p slave-dir)))
+
+;;; Mesos functions
+
+(defn start-master!
+ [test node]
+ (when (some #{node} (take master-count (sort (:nodes test))))
+ (info node "Starting mesos master")
+ (c/su
+ (c/exec :start-stop-daemon
+ :--background
+ :--chdir master-dir
+ :--exec "/usr/bin/env"
+ :--make-pidfile
+ :--no-close
+ :--oknodo
+ :--pidfile master-pidfile
+ :--start
+ :--
+ "GLOG_v=1"
+ master-bin
+ (str "--hostname=" (name node))
+ (str "--log_dir=" log-dir)
+ (str "--offer_timeout=30secs")
+ (str "--quorum=" (util/majority master-count))
+ (str "--registry_fetch_timeout=120secs")
+ (str "--registry_store_timeout=5secs")
+ (str "--work_dir=" master-dir)
+ (str "--zk=" (zookeeper-uri test zk-namespace))
+ :>> (str log-dir "/master.stdout")
+ (c/lit "2>&1")))))
+
+(defn start-slave!
+ [test node]
+ (when-not (some #{node} (take master-count (sort (:nodes test))))
+ (info node "Starting mesos slave")
+ (c/su
+ (c/exec :start-stop-daemon :--start
+ :--background
+ :--chdir slave-dir
+ :--exec slave-bin
+ :--make-pidfile
+ :--no-close
+ :--pidfile slave-pidfile
+ :--oknodo
+ :--
+ (str "--hostname=" (name node))
+ (str "--log_dir=" log-dir)
+ (str "--master=" (zookeeper-uri test zk-namespace))
+ (str "--recovery_timeout=30secs")
+ (str "--work_dir=" slave-dir)
+ :>> (str log-dir "/slave.stdout")
+ (c/lit "2>&1")))))
+
+(defn stop-master!
+ [node]
+ (info node "Stopping mesos master")
+ (meh (c/exec :killall :-9 :mesos-master))
+ (meh (c/exec :rm :-rf master-pidfile)))
+
+(defn stop-slave!
+ [node]
+ (info node "Stopping mesos slave")
+ (meh (c/exec :killall :-9 :mesos-slave))
+ (meh (c/exec :rm :-rf slave-pidfile)))
+
+;;; Marathon functions
+
+(defn start-marathon!
+ [test node]
+ (when (= node (first (sort (:nodes test))))
+ (info "Start marathon")
+ (c/su
+ (c/exec :start-stop-daemon :--start
+ :--background
+ :--exec marathon-bin
+ :--make-pidfile
+ :--no-close
+ :--pidfile marathon-pidfile
+ :--
+ (c/lit (str "--hostname " node))
+ (c/lit (str "--master " (zookeeper-uri test zk-namespace)))
+ (c/lit (str "--zk " (zookeeper-uri test zk-marathon-namespace)))
+ :>> (str log-dir "/marathon.stdout")
+ (c/lit "2>&1")))))
+
+(defn stop-marathon!
+ []
+ (cu/grepkill! "marathon"))
+
+(defn marathon-base-url
+ [test]
+ (str "http://" (first (sort (:nodes test))) ":" marathon-rest-port))
+
+(defn db
+ [mesos-version marathon-version]
+ (reify db/DB
+ (setup! [this test node]
+ (install! test node mesos-version marathon-version)
+ (start-master! test node)
+ (start-slave! test node)
+ (start-marathon! test node))
+ (teardown! [this test node]
+ (stop-slave! node)
+ (stop-master! node)
+ (stop-marathon!))
+ db/LogFiles
+ (log-files [_ test node]
+ (if (cu/exists? log-dir) (cu/ls-full log-dir) []))))
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/nemesis.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/nemesis.clj b/flink-jepsen/src/jepsen/flink/nemesis.clj
new file mode 100644
index 0000000..3047eeb
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/nemesis.clj
@@ -0,0 +1,163 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.nemesis
+ (:require [clojure.tools.logging :refer :all]
+ [jepsen
+ [control :as c]
+ [generator :as gen]
+ [nemesis :as nemesis]
+ [util :as ju]]
+ [jepsen.control.util :as cu]
+ [jepsen.flink.client :refer :all]
+ [jepsen.flink.checker :as flink-checker]
+ [jepsen.flink.generator :as fgen]
+ [jepsen.flink.hadoop :as fh]
+ [jepsen.flink.zookeeper :refer :all]))
+
+(def job-submit-grace-period
+ "Period in seconds after job submission in which job managers must not fail."
+ 60)
+
+(defn kill-processes
+ ([pattern] (kill-processes rand-nth pattern))
+ ([targeter pattern]
+ (reify nemesis/Nemesis
+ (setup! [this test] this)
+ (invoke! [this test op]
+ (let [nodes (-> test :nodes targeter ju/coll)]
+ (c/on-many nodes
+ (c/su (cu/grepkill! pattern)))
+ (assoc op :value nodes)))
+ (teardown! [this test]))))
+
+(defn- non-empty-random-sample
+ [coll]
+ (let [sample (random-sample 0.5 coll)]
+ (if (empty? sample)
+ (first (shuffle coll))
+ sample)))
+
+(defn kill-taskmanager
+ ([] (kill-taskmanager identity))
+ ([targeter]
+ (kill-processes targeter "TaskExecutorRunner")))
+
+(defn kill-jobmanager
+ []
+ (kill-processes identity "ClusterEntrypoint"))
+
+(defn start-stop-name-node
+ "Nemesis stopping and starting the HDFS NameNode."
+ []
+ (nemesis/node-start-stopper
+ fh/name-node
+ (fn [test node] (c/su (fh/stop-name-node!)))
+ (fn [test node] (c/su (fh/start-name-node! test node)))))
+
+;;; Generators
+
+(defn stoppable-generator
+ [stop source]
+ (reify gen/Generator
+ (op [gen test process]
+ (if @stop
+ nil
+ (gen/op source test process)))))
+
+(defn take-last-with-default
+ [n default coll]
+ (->>
+ (cycle [default])
+ (concat (reverse coll))
+ (take n)
+ (reverse)))
+
+(defn stop-generator
+ [stop source job-running-healthy-threshold job-recovery-grace-period]
+ (gen/concat source
+ (let [t (atom nil)]
+ (reify gen/Generator
+ (op [_ test process]
+ (when (nil? @t)
+ (compare-and-set! t nil (ju/relative-time-nanos)))
+ (let [history (->>
+ (:active-histories test)
+ deref
+ first
+ deref)
+ job-running-history (->>
+ history
+ (filter (fn [op] (>= (- (:time op) @t) 0)))
+ (flink-checker/get-job-running-history)
+ (take-last-with-default job-running-healthy-threshold false))]
+ (if (or
+ (and
+ (every? true? job-running-history))
+ (> (ju/relative-time-nanos) (+ @t (ju/secs->nanos job-recovery-grace-period))))
+ (do
+ (reset! stop true)
+ nil)
+ (do
+ (Thread/sleep 1000)
+ (recur test process)))))))))
+
+(defn kill-taskmanagers-gen
+ [time-limit dt op]
+ (fgen/time-limit time-limit (gen/stagger dt (gen/seq (cycle [{:type :info, :f op}])))))
+
+(defn kill-taskmanagers-bursts-gen
+ [time-limit]
+ (fgen/time-limit time-limit
+ (gen/seq (cycle (concat (repeat 20 {:type :info, :f :kill-task-managers})
+ [(gen/sleep 300)])))))
+
+(defn kill-jobmanagers-gen
+ [time-limit]
+ (fgen/time-limit (+ time-limit job-submit-grace-period)
+ (gen/seq (cons (gen/sleep job-submit-grace-period)
+ (cycle [{:type :info, :f :kill-job-manager}])))))
+
+(defn fail-name-node-during-recovery
+ []
+ (gen/seq [(gen/sleep job-submit-grace-period)
+ {:type :info, :f :partition-start}
+ {:type :info, :f :fail-name-node-start}
+ (gen/sleep 20)
+ {:type :info, :f :partition-stop}
+ (gen/sleep 60)
+ {:type :info, :f :fail-name-node-stop}]))
+
+(def nemesis-generator-factories
+ {:kill-task-managers (fn [opts] (kill-taskmanagers-gen (:time-limit opts) 3 :kill-task-managers))
+ :kill-single-task-manager (fn [opts] (kill-taskmanagers-gen (:time-limit opts) 3 :kill-single-task-manager))
+ :kill-random-task-managers (fn [opts] (kill-taskmanagers-gen (:time-limit opts) 3 :kill-random-task-managers))
+ :kill-task-managers-bursts (fn [opts] (kill-taskmanagers-bursts-gen (:time-limit opts)))
+ :kill-job-managers (fn [opts] (kill-jobmanagers-gen (:time-limit opts)))
+ :fail-name-node-during-recovery (fn [_] (fail-name-node-during-recovery))
+ :utopia (fn [_] (gen/sleep 60))})
+
+(defn nemesis
+ []
+ (nemesis/compose
+ {{:partition-start :start
+ :partition-stop :stop} (nemesis/partition-random-halves)
+ {:fail-name-node-start :start
+ :fail-name-node-stop :stop} (start-stop-name-node)
+ {:kill-task-managers :start} (kill-taskmanager)
+ {:kill-single-task-manager :start} (kill-taskmanager (fn [coll] (rand-nth coll)))
+ {:kill-random-task-managers :start} (kill-taskmanager non-empty-random-sample)
+ {:kill-job-manager :start} (kill-jobmanager)}))
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/utils.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/utils.clj b/flink-jepsen/src/jepsen/flink/utils.clj
new file mode 100644
index 0000000..3fd9f96
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/utils.clj
@@ -0,0 +1,48 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.utils
+ (:require [clojure.tools.logging :refer :all]))
+
+(defn retry
+ "Runs a function op and retries on exception.
+
+ The following options are supported:
+
+ :on-retry - A function called for every retry with an exception and the attempt number as arguments.
+ :success - A function called with the result of op.
+ :fallback - A function with an exception as the first argument that is called if all retries are exhausted.
+ :retries - Number of total retries.
+ :delay - The time between retries in milliseconds."
+ ([op & {:keys [on-retry success fallback retries delay]
+ :or {on-retry (fn [exception attempt] (warn "Retryable operation failed:"
+ (.getMessage exception)))
+ success identity
+ fallback :default
+ retries 10
+ delay 2000}
+ :as keys}]
+ (let [r (try
+ (op)
+ (catch Exception e (if (< 0 retries)
+ {:exception e}
+ (fallback e))))]
+ (if (:exception r)
+ (do
+ (on-retry (:exception r) retries)
+ (Thread/sleep delay)
+ (recur op (assoc keys :retries (dec retries))))
+ (success r)))))
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/src/jepsen/flink/zookeeper.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/src/jepsen/flink/zookeeper.clj b/flink-jepsen/src/jepsen/flink/zookeeper.clj
new file mode 100644
index 0000000..8b4c319
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/zookeeper.clj
@@ -0,0 +1,29 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.zookeeper (:require [clojure.string]))
+
+(defn zookeeper-quorum
+ "Returns the ZooKeeper quorum string, e.g., host1:2181,host2:2181"
+ [test]
+ (->> test
+ :nodes
+ (map #(str % ":2181"))
+ (clojure.string/join ",")))
+
+(defn zookeeper-uri
+ ([test] (zookeeper-uri test ""))
+ ([test namespace] (str "zk://" (zookeeper-quorum test) "/" (name namespace))))
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/test/jepsen/flink/checker_test.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/test/jepsen/flink/checker_test.clj b/flink-jepsen/test/jepsen/flink/checker_test.clj
new file mode 100644
index 0000000..7389bbc
--- /dev/null
+++ b/flink-jepsen/test/jepsen/flink/checker_test.clj
@@ -0,0 +1,82 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.checker-test
+  (:require [clojure.test :refer :all]
+            [jepsen
+             [checker :as checker]]
+            [jepsen.flink.checker :refer :all]))
+
+(deftest get-job-running-history-test
+  (let [history [{:type :info, :f :kill-random-subset-task-managers, :process :nemesis, :time 121898381144, :value '("172.31.33.170")}
+                 {:type :invoke, :f :job-running?, :value nil, :process 0, :time 127443701575}
+                 {:type :ok, :f :job-running?, :value false, :process 0, :time 127453553462}
+                 {:type :invoke, :f :job-running?, :value nil, :process 0, :time 127453553463}
+                 {:type :ok, :f :job-running?, :value true, :process 0, :time 127453553464}]]
+    (is (= (get-job-running-history history) [false true]))))
+
+(deftest job-running-checker-test
+  (let [checker (job-running-checker)
+        test {}
+        model (job-running-within-grace-period 3 60)
+        opts {}
+        check (fn [history] (checker/check checker test model history opts))]
+    (testing "Job is not running after grace period."
+      (is (= (:valid? (check
+                        [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
+                         {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}])) false)))
+    (testing "Job is running after grace period."
+      (is (= (:valid? (check
+                        [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
+                         {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000001}])) true)))
+    (testing "Should tolerate non-running job during failures."
+      (is (= (:valid? (check
+                        [{:type :info, :f :partition-start, :process :nemesis, :time -1}
+                         {:type :info, :f :partition-start, :process :nemesis, :time 0, :value "Cut off [...]"}
+                         {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}
+                         {:type :info, :f :partition-stop, :process :nemesis, :time 60000000002}
+                         {:type :info, :f :partition-stop, :process :nemesis, :time 60000000003, :value "fully connected"}
+                         {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}])) true)))
+    (testing "Should respect healthy threshold."
+      (is (= (:valid? (check
+                        [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
+                         {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
+                         {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
+                         {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000003}
+                         {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}])) true))
+      (is (= (:valid? (check
+                        [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
+                         {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
+                         {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000002}
+                         {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}])) false)))
+    (testing "Job was not deployed successfully."
+      (is (= (:valid? (check [{:type :invoke, :f :job-running?, :value nil, :process 45, :time 239150413307}
+                              {:type :info, :f :job-running?, :value nil, :process 45, :time 239150751938, :error "indeterminate: Assert failed: job-id"}])) false)))))
+
+(deftest safe-inc-test
+  (is (= (safe-inc nil) 1))
+  (is (= (safe-inc 1) 2)))
+
+(deftest nemeses-active?-test
+  (is (= (nemeses-active? {:partition-start 2 :fail-name-node-start 0}) true))
+  (is (= (nemeses-active? {:partition-start 0}) false)))
+
+(deftest dissoc-if-test
+  (is (= (:a (dissoc-if #(-> (first %) (= :b)) {:a 1 :b 2})) 1)))
+
+(deftest zero-value?-test
+  (is (= (zero-value? [:test 0]) true))
+  (is (= (zero-value? [:test 1]) false)))
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/test/jepsen/flink/client_test.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/test/jepsen/flink/client_test.clj b/flink-jepsen/test/jepsen/flink/client_test.clj
new file mode 100644
index 0000000..b4373bf
--- /dev/null
+++ b/flink-jepsen/test/jepsen/flink/client_test.clj
@@ -0,0 +1,37 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.client-test
+  (:require [clojure.test :refer :all]
+            [clj-http.fake :as fake]
+            [jepsen.flink.client :refer :all]))
+
+(deftest read-url-test
+  (is (= "https://www.asdf.de" (read-url (byte-array [0xAC 0xED 0x00 0x05 0x77 0x15 0x00 0x13 0x68 0x74 0x74 0x70 0x73 0x3A 0x2F 0x2F 0x77 0x77 0x77 0x2E 0x61 0x73 0x64 0x66 0x2E 0x64 0x65])))))
+
+(deftest job-running?-test
+  (fake/with-fake-routes
+    {"http://localhost:8081/jobs/a718f168ec6be8eff1345a17bf64196c"
+     (fn [request] {:status 200
+                    :headers {}
+                    :body "{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1522059578198,\"end-time\":-1,\"duration\":19505,\"now\":1522059597703,\"timestamps\":{\"RUNNING\":1522059578244,\"RESTARTING\":0,\"RECONCILING\":0,\"CREATED\":1522059578198,\"FAILING\":0,\"FINISHED\":0,\"CANCELLING\":0,\"SUSPENDING\":0,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0},\"vertices\":[{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"name\":\"Source: Socket Stream -> Flat Map\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578369,\"end-time\":-1,\"duration\":19334,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}},{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"name\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578381,\"end-time\":-1,\"duration\":19322,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}}],\"status-counts\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":2,\"CREATED\":0,\"FINISHED\":0},\"plan\":{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"nodes\":[{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out\",\"inputs\":[{\"num\":0,\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Socket Stream -> Flat Map\",\"optimizer_properties\":{}}]}}"})
+     "http://localhost:8081/jobs/a718f168ec6be8eff1345a17bf64196d"
+     (fn [request] {:status 200
+                    :headers {}
+                    :body "{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1522059578198,\"end-time\":-1,\"duration\":19505,\"now\":1522059597703,\"timestamps\":{\"RUNNING\":1522059578244,\"RESTARTING\":0,\"RECONCILING\":0,\"CREATED\":1522059578198,\"FAILING\":0,\"FINISHED\":0,\"CANCELLING\":0,\"SUSPENDING\":0,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0},\"vertices\":[{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"name\":\"Source: Socket Stream -> Flat Map\",\"parallelism\":1,\"status\":\"CREATED\",\"start-time\":1522059578369,\"end-time\":-1,\"duration\":19334,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}},{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"name\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578381,\"end-time\":-1,\"duration\":19322,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}}],\"status-counts\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":2,\"CREATED\":0,\"FINISHED\":0},\"plan\":{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"nodes\":[{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out\",\"inputs\":[{\"num\":0,\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Socket Stream -> Flat Map\",\"optimizer_properties\":{}}]}}"})}
+
+    (is (= (job-running? "http://localhost:8081" "a718f168ec6be8eff1345a17bf64196c") true))
+    (is (= (job-running? "http://localhost:8081" "a718f168ec6be8eff1345a17bf64196d") false))))
http://git-wip-us.apache.org/repos/asf/flink/blob/d58c8c05/flink-jepsen/test/jepsen/flink/utils_test.clj
----------------------------------------------------------------------
diff --git a/flink-jepsen/test/jepsen/flink/utils_test.clj b/flink-jepsen/test/jepsen/flink/utils_test.clj
new file mode 100644
index 0000000..607f90d
--- /dev/null
+++ b/flink-jepsen/test/jepsen/flink/utils_test.clj
@@ -0,0 +1,39 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.utils-test
+  (:require [clojure.test :refer :all]
+            [jepsen.flink.utils :refer [retry]]))
+
+(deftest retry-test
+  (testing "Single failure then result."
+    (let [counter (atom 0)
+          failing-once (fn [] (if (= @counter 0)
+                                (do (swap! counter inc)
+                                    (throw (Exception. "Expected")))
+                                "result"))]
+      (is (= "result" (retry failing-once :delay 0)))))
+
+  (testing "Exhaust all attempts."
+    (let [failing-always (fn [] (throw (Exception. "Expected")))]
+      (is (nil? (retry failing-always :retries 1 :delay 0)))))
+
+  (testing "Propagate exception."
+    (let [failing-always (fn [] (throw (Exception. "Expected")))]
+      (is (thrown-with-msg? Exception #"Expected" (retry failing-always
+                                                         :retries 1
+                                                         :delay 0
+                                                         :fallback (fn [e] (throw e))))))))