Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/12/10 11:06:00 UTC

[jira] [Commented] (FLINK-10986) Jepsen: Deploy Kafka Broker on DB Nodes

    [ https://issues.apache.org/jira/browse/FLINK-10986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16714577#comment-16714577 ] 

ASF GitHub Bot commented on FLINK-10986:
----------------------------------------

asfgit closed pull request #7173: [FLINK-10986][tests] Implement DB to setup Apache Kafka
URL: https://github.com/apache/flink/pull/7173

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/flink-jepsen/README.md b/flink-jepsen/README.md
index a3e2668c26b..d329c8379ad 100644
--- a/flink-jepsen/README.md
+++ b/flink-jepsen/README.md
@@ -15,27 +15,37 @@ The faults that can be currently introduced to the Flink cluster include:
 * Network partitions
 
 There are many more properties other than job availability that could be
-verified but are not yet covered by this test suite, e.g., end-to-end exactly-once processing
+verified but are not yet fully covered by this project, e.g., end-to-end exactly-once processing
 semantics.
 
 ## Usage
+
+### Setting up the Environment
 See the [Jepsen documentation](https://github.com/jepsen-io/jepsen#setting-up-a-jepsen-environment)
-for how to set up the environment to run tests. The script under `scripts/run-tests.sh` documents how to invoke
-tests. The Flink job used for testing is located under
-`flink-end-to-end-tests/flink-datastream-allround-test`. You have to build the job first and copy
-the resulting jar (`DataStreamAllroundTestProgram.jar`) to the `./bin` directory of this project's
-root.
+for details on how to set up the environment required to run the tests.
+To simplify development, we have prepared Dockerfiles and a [Docker Compose](https://docs.docker.com/compose/) template
+so that you can run the tests locally in containers (see Section [Docker](#usage-docker)).
+
+### Running Tests
+This project is not a single runnable test but rather a parameterizable
+test template. This allows the user to specify the cluster manager that Flink should run on, the
+location of the high availability storage directory, the jobs to be submitted, etc.
+The script under `docker/run-tests.sh` shows examples of how to specify and run tests.
+By default, the example tests run the `DataStreamAllroundTestProgram`, which is located under
+`flink-end-to-end-tests/flink-datastream-allround-test` of the Flink project root.
+Before running the tests, you have to build the job first, and copy the resulting jar
+(`DataStreamAllroundTestProgram.jar`) to the `./bin` directory of this project's root.
+Also included in the examples is a more complicated scenario with two jobs that share a Kafka
+topic. See the `run-tests.sh` script for details on how to enable and run this test.
 
 ### Docker
-
-To simplify development, we have prepared Dockerfiles and a Docker Compose template
-so that you can run the tests locally in containers. To build the images
-and start the containers, simply run:
+To build the images and start the containers, simply run:
 
     $ cd docker
     $ ./up.sh
 
-After the containers started, open a new terminal window and run `docker exec -it jepsen-control bash`.
+This should start one control node container and three containers that will be used as DB nodes.
+After the containers have started, open a new terminal window and run `docker exec -it jepsen-control bash`.
 This will allow you to run arbitrary commands on the control node.
 To start the tests, you can use the `run-tests.sh` script in the `docker` directory,
 which expects the number of test iterations, and a URI to a Flink distribution, e.g.,
diff --git a/flink-jepsen/docker/run-tests.sh b/flink-jepsen/docker/run-tests.sh
index 8b2b1e6d18f..b2fd195b4da 100755
--- a/flink-jepsen/docker/run-tests.sh
+++ b/flink-jepsen/docker/run-tests.sh
@@ -17,6 +17,8 @@
 # limitations under the License.
 ################################################################################
 
+set -euo pipefail
+
 dockerdir=$(dirname $0)
 dockerdir=$(cd ${dockerdir}; pwd)
 
@@ -26,6 +28,47 @@ n2
 n3
 EOF
 
-common_jepsen_args+=(--nodes-file ${dockerdir}/nodes)
+common_jepsen_args+=(--ha-storage-dir hdfs:///flink
+--tarball ${2}
+--ssh-private-key ~/.ssh/id_rsa
+--nodes-file ${dockerdir}/nodes)
+
+for i in $(seq 1 ${1})
+do
+  echo "Executing run #${i} of ${1}"
+
+  # YARN session cluster
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --test-spec "${dockerdir}/test-specs/yarn-session.edn"
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --test-spec "${dockerdir}/test-specs/yarn-session.edn"
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --test-spec "${dockerdir}/test-specs/yarn-session.edn"
+
+  # YARN per-job cluster
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --test-spec "${dockerdir}/test-specs/yarn-job.edn"
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --test-spec "${dockerdir}/test-specs/yarn-job.edn"
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --test-spec "${dockerdir}/test-specs/yarn-job.edn"
+
+  # Mesos
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --test-spec "${dockerdir}/test-specs/mesos-session.edn"
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --test-spec "${dockerdir}/test-specs/mesos-session.edn"
+
+  # Standalone
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --test-spec "${dockerdir}/test-specs/standalone-session.edn"
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --client-gen cancel-jobs --test-spec "${dockerdir}/test-specs/standalone-session.edn"
+
+  # Below is a test that uses Flink's exactly-once Kafka producer/consumer.
+  # The test submits two jobs:
+  #
+  #   (1) DataGeneratorJob - Publishes data to a Kafka topic
+  #   (2) StateMachineJob  - Consumes data from the same Kafka topic, and validates exactly-once semantics
+  #
+  # To enable the test, you first need to build the flink-state-machine-kafka job jar,
+  # and copy the artifact to flink-jepsen/bin:
+  #
+  #   git clone https://github.com/igalshilman/flink-state-machine-example
+  #   cd flink-state-machine-example
+  #   mvn clean package -pl flink-state-machine-kafka/flink-state-machine-kafka -am
+  #   cp flink-state-machine-kafka/flink-state-machine-kafka/target/flink-state-machine-kafka-1.0-SNAPSHOT.jar /path/to/flink-jepsen/bin
+  #
+  # lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers-bursts --time-limit 60 --test-spec "${dockerdir}/test-specs/standalone-session-kafka.edn" --job-running-healthy-threshold 15
 
-. ${dockerdir}/../scripts/run-tests.sh ${1} ${2} 1
+done
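Note on the script above: it now enables `set -euo pipefail` and reads `${1}` (number of iterations) and `${2}` (URI of the Flink distribution) directly, so calling it without both arguments makes bash abort with an "unbound variable" error. A minimal sketch of an explicit check follows. It is not part of this pull request, and the function name `validate_args` and its messages are hypothetical.

```shell
# Hypothetical helper, not part of the PR: validates the two positional
# arguments that docker/run-tests.sh reads as ${1} and ${2}.
validate_args() {
  if [ "$#" -ne 2 ]; then
    echo "usage: run-tests.sh <iterations> <flink-dist-uri>" >&2
    return 1
  fi
  # The iteration count is passed to `seq 1 ${1}`, so require digits only.
  case "$1" in
    ''|*[!0-9]*)
      echo "iterations must be a positive integer: $1" >&2
      return 1
      ;;
  esac
  if [ "$1" -lt 1 ]; then
    echo "iterations must be at least 1" >&2
    return 1
  fi
  return 0
}
```

One possible use would be calling `validate_args "$@"` near the top of the script, before the first expansion of `${1}`, so that a wrong invocation fails with a usage message.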
diff --git a/flink-jepsen/docker/test-specs/mesos-session.edn b/flink-jepsen/docker/test-specs/mesos-session.edn
new file mode 100644
index 00000000000..e4c01bfd733
--- /dev/null
+++ b/flink-jepsen/docker/test-specs/mesos-session.edn
@@ -0,0 +1,19 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+{:dbs  [:hadoop :zookeeper :mesos :flink-mesos-session]
+ :jobs [{:job-jar  "/jepsen/bin/DataStreamAllroundTestProgram.jar"
+         :job-args "--environment.parallelism 1 --state_backend.checkpoint_directory hdfs:///checkpoints --state_backend rocks --state_backend.rocks.incremental true"}]}
diff --git a/flink-jepsen/docker/test-specs/standalone-session-kafka.edn b/flink-jepsen/docker/test-specs/standalone-session-kafka.edn
new file mode 100644
index 00000000000..5f6704d53f4
--- /dev/null
+++ b/flink-jepsen/docker/test-specs/standalone-session-kafka.edn
@@ -0,0 +1,24 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+{:dbs  [:hadoop :zookeeper :kafka :flink-standalone-session]
+ :jobs [{:job-jar    "/jepsen/bin/flink-state-machine-kafka-1.0-SNAPSHOT.jar"
+         :job-args   "--parallelism 1 --checkpointInterval 5000 --numKeys 1000 --topic kafka-test-topic --sleep 200 --semantic exactly-once --bootstrap.servers localhost:9092 --transaction.timeout.ms 600000 --checkpointDir hdfs:///flink-checkpoints"
+         :main-class "com.dataartisans.flink.example.eventpattern.DataGeneratorJob"}
+
+        {:job-jar    "/jepsen/bin/flink-state-machine-kafka-1.0-SNAPSHOT.jar"
+         :job-args   "--parallelism 1 --checkpointInterval 5000 --input-topic kafka-test-topic --bootstrap.servers localhost:9092 --checkpointDir hdfs:///flink-checkpoints --auto.offset.reset earliest"
+         :main-class "com.dataartisans.flink.example.eventpattern.StateMachineJob"}]}
diff --git a/flink-jepsen/docker/test-specs/standalone-session.edn b/flink-jepsen/docker/test-specs/standalone-session.edn
new file mode 100644
index 00000000000..763d592bda2
--- /dev/null
+++ b/flink-jepsen/docker/test-specs/standalone-session.edn
@@ -0,0 +1,19 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+{:dbs  [:hadoop :zookeeper :flink-standalone-session]
+ :jobs [{:job-jar  "/jepsen/bin/DataStreamAllroundTestProgram.jar"
+         :job-args "--environment.parallelism 1 --state_backend.checkpoint_directory hdfs:///checkpoints --state_backend rocks --state_backend.rocks.incremental true"}]}
diff --git a/flink-jepsen/docker/test-specs/yarn-job.edn b/flink-jepsen/docker/test-specs/yarn-job.edn
new file mode 100644
index 00000000000..390f21d31f7
--- /dev/null
+++ b/flink-jepsen/docker/test-specs/yarn-job.edn
@@ -0,0 +1,19 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+{:dbs  [:hadoop :zookeeper :flink-yarn-job]
+ :jobs [{:job-jar  "/jepsen/bin/DataStreamAllroundTestProgram.jar"
+         :job-args "--environment.parallelism 1 --state_backend.checkpoint_directory hdfs:///checkpoints --state_backend rocks --state_backend.rocks.incremental true"}]}
diff --git a/flink-jepsen/docker/test-specs/yarn-session.edn b/flink-jepsen/docker/test-specs/yarn-session.edn
new file mode 100644
index 00000000000..aa95c159405
--- /dev/null
+++ b/flink-jepsen/docker/test-specs/yarn-session.edn
@@ -0,0 +1,19 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+{:dbs  [:hadoop :zookeeper :flink-yarn-session]
+ :jobs [{:job-jar  "/jepsen/bin/DataStreamAllroundTestProgram.jar"
+         :job-args "--environment.parallelism 1 --state_backend.checkpoint_directory hdfs:///checkpoints --state_backend rocks --state_backend.rocks.incremental true"}]}
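Note on the test specs above: each one names its job jar(s) under `:job-jar`, and the README asks for those jars to be built and copied into `./bin` beforehand. The sketch below lists the jar paths a spec refers to and reports the ones that are missing. It assumes it runs where the `/jepsen/bin/...` paths are valid (presumably the control node), and it uses a regular expression rather than a real EDN parser. The function names `list_job_jars` and `missing_job_jars` are hypothetical.

```shell
# Hypothetical helpers, not part of the PR.

# Print every distinct path that follows a :job-jar key in the given spec file.
# This is a line-based regex sketch; it expects one :job-jar entry per line,
# as in the spec files shown in this diff.
list_job_jars() {
  sed -n 's/.*:job-jar[[:space:]]*"\([^"]*\)".*/\1/p' "$1" | sort -u
}

# Print the subset of those paths that do not exist on the local file system.
missing_job_jars() {
  list_job_jars "$1" | while IFS= read -r jar; do
    [ -e "$jar" ] || echo "$jar"
  done
}
```

Running `missing_job_jars test-specs/standalone-session-kafka.edn` before a test run would print nothing once the jar has been copied into place.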
diff --git a/flink-jepsen/scripts/run-tests.sh b/flink-jepsen/scripts/run-tests.sh
deleted file mode 100755
index a2b256b6f6a..00000000000
--- a/flink-jepsen/scripts/run-tests.sh
+++ /dev/null
@@ -1,50 +0,0 @@
-#!/usr/bin/env bash
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-set -euo pipefail
-
-scripts=$(dirname $0)
-scripts=$(cd ${scripts}; pwd)
-
-parallelism=${3}
-
-common_jepsen_args+=(--ha-storage-dir hdfs:///flink
---job-jar ${scripts}/../bin/DataStreamAllroundTestProgram.jar
---tarball ${2}
---job-args "--environment.parallelism ${parallelism} --state_backend.checkpoint_directory hdfs:///checkpoints --state_backend rocks --state_backend.rocks.incremental true"
---ssh-private-key ~/.ssh/id_rsa)
-
-for i in $(seq 1 ${1})
-do
-  echo "Executing run #${i} of ${1}"
-  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode yarn-session
-  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode yarn-session
-  lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --deployment-mode yarn-session
-
-  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode yarn-job
-  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode yarn-job
-  lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --deployment-mode yarn-job
-
-  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode mesos-session
-  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode mesos-session
-
-  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode standalone-session
-  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --client-gen cancel-job --deployment-mode standalone-session
-  echo
-done
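Note on the deleted script above: it selected the cluster setup with `--deployment-mode <name>`, whereas its replacement in `docker/run-tests.sh` passes `--test-spec` with the path of an `.edn` file of the same name under `test-specs/`. As far as this diff shows, the correspondence can be sketched with a hypothetical helper (`spec_for_mode` is not part of the project):

```shell
# Hypothetical helper illustrating the naming correspondence visible in the diff:
#   old: --deployment-mode yarn-session
#   new: --test-spec "${dockerdir}/test-specs/yarn-session.edn"
# $1 is the docker directory, $2 is the old deployment mode name.
spec_for_mode() {
  case "$2" in
    yarn-session|yarn-job|mesos-session|standalone-session)
      printf '%s/test-specs/%s.edn\n' "$1" "$2"
      ;;
    *)
      echo "no test spec known for deployment mode: $2" >&2
      return 1
      ;;
  esac
}
```

The `standalone-session-kafka.edn` spec has no counterpart among the old deployment modes; it is new in this change.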
diff --git a/flink-jepsen/src/jepsen/flink/checker.clj b/flink-jepsen/src/jepsen/flink/checker.clj
index 7e437e9d628..d5ff6e0ebb0 100644
--- a/flink-jepsen/src/jepsen/flink/checker.clj
+++ b/flink-jepsen/src/jepsen/flink/checker.clj
@@ -155,34 +155,80 @@
   ([job-running-healthy-threshold job-recovery-grace-period]
    (job-running-within-grace-period job-running-healthy-threshold job-recovery-grace-period 10)))
 
-(defn get-job-running-history
+(defn- history->jobs-running?-value
   [history]
   (->>
     history
-    (remove #(= (:process %) :nemesis))
+    (filter #(= (:f %) :jobs-running?))
     (remove #(= (:type %) :invoke))
-    (map :value)
-    (map boolean)
-    (remove nil?)))
+    (map :value)))
+
+(defn- history->job-ids
+  "Extracts all job ids from a history."
+  [history]
+  (set (->> history
+            (history->jobs-running?-value)
+            (map keys)
+            (flatten)
+            (remove nil?))))
+
+(defn all-jobs-running?-history
+  [history]
+  (->>
+    history
+    (history->jobs-running?-value)
+    (map vals)
+    (map #(and
+            (not (empty? %))
+            (every? true? %)))))
 
 (defn- healthy?
   [model]
-  (>= (:healthy-count model) (:healthy-threshold model)))
+  (or (>= (:healthy-count model) (:healthy-threshold model))
+      (:job-canceled? model)))
+
+(defn- jobs-running?->job-running?
+  "Rewrites history entries of the form {:f :jobs-running? :value {...}}
+
+  Example: {:type ok :f :jobs-running? :value {job-id-1 true}} -> {:type ok :f :job-running? :value true}"
+  [history-entry job-id]
+  (let [job-running?-entry (assoc history-entry :f :job-running?)
+        job-running?-entry-ok (update job-running?-entry :value #(get % job-id))]
+    (if (= (:type history-entry) :ok)
+      job-running?-entry-ok
+      job-running?-entry)))
+
+(defn- history->single-job-history
+  "Rewrites a history to one that appears to run a single Flink job."
+  [history job-id]
+  (let [transform-history-entry (fn [history-entry]
+                                  (case (:f history-entry)
+                                    :jobs-running? (jobs-running?->job-running? history-entry job-id)
+                                    :cancel-jobs (assoc history-entry :f :cancel-job)
+                                    history-entry))]
+    (map transform-history-entry history)))
+
+(defn- compute-final-model
+  [model history]
+  (let [start-time (-> history first :time)]
+    (reduce knossos.model/step
+            (assoc model :last-failure start-time)
+            history)))
 
 (defn job-running-checker
   []
   (reify
     checker/Checker
     (check [_ test model history _]
-      (let [final (reduce model/step (assoc model :last-failure (:time (first history))) history)
-            result-map (conj {}
-                             (find test :nemesis-gen)
-                             (find test :deployment-mode))]
-        (if (or (model/inconsistent? final)
-                (and
-                  (not (healthy? final))
-                  (not (:job-canceled? final))))
-          (into result-map {:valid?      false
-                            :final-model final})
-          (into result-map {:valid?      true
-                            :final-model final}))))))
+      (let [job-ids (history->job-ids history)
+            individual-job-histories (map (partial history->single-job-history history) job-ids)
+            final-models (map (partial compute-final-model model) individual-job-histories)
+            inconsistent-or-unhealthy (or (empty? job-ids)
+                                          (some model/inconsistent? final-models)
+                                          (some (complement healthy?) final-models))
+            result-map (select-keys test [:nemesis-gen :deployment-mode])]
+        (if inconsistent-or-unhealthy
+          (into result-map {:valid?       false
+                            :final-models final-models})
+          (into result-map {:valid?       true
+                            :final-models final-models}))))))
diff --git a/flink-jepsen/src/jepsen/flink/client.clj b/flink-jepsen/src/jepsen/flink/client.clj
index 1ab987bd704..17746ec5c6b 100644
--- a/flink-jepsen/src/jepsen/flink/client.clj
+++ b/flink-jepsen/src/jepsen/flink/client.clj
@@ -119,6 +119,14 @@
                 (map :status)
                 (every? #(= "RUNNING" %)))))))
 
+(defn jobs-running?
+  "Checks if multiple jobs are running. Returns a map where the keys are job ids and the values are
+  booleans indicating whether the job is running or not."
+  [base-url job-ids]
+  (let [job-running-on-current-master? (partial job-running? base-url)
+        make-job-id-running?-pair (juxt identity job-running-on-current-master?)]
+    (into {} (map make-job-id-running?-pair job-ids))))
+
 (defn- cancel-job!
   "Cancels the specified job. Returns true if the job could be canceled.
   Returns false if the job does not exist. Throws an exception if the HTTP status
@@ -131,6 +139,10 @@
       (not (http/success? response)) (throw (ex-info "Job cancellation unsuccessful" {:job-id job-id :error error}))
       :else true)))
 
+(defn- cancel-jobs!
+  [base-url job-ids]
+  (doseq [job-id job-ids] (cancel-job! base-url job-id)))
+
 (defmacro dispatch-operation
   [op & body]
   `(try
@@ -148,24 +160,23 @@
                                                                     (System/exit 1)))))
 
 (defn- dispatch-rest-operation!
-  [rest-url job-id op]
-  (assert job-id)
+  [rest-url job-ids op]
+  (assert job-ids)
   (if-not rest-url
     (assoc op :type :fail :error "Have not determined REST URL yet.")
     (case (:f op)
-      :job-running? (dispatch-operation op (fu/retry
-                                             (partial job-running? rest-url job-id)
-                                             :retries 3
-                                             :fallback #(throw %)))
-      :cancel-job (dispatch-operation-or-fatal op (cancel-job! rest-url job-id)))))
+      :jobs-running? (dispatch-operation op (fu/retry
+                                              (partial jobs-running? rest-url job-ids)
+                                              :retries 3
+                                              :fallback #(throw %)))
+      :cancel-jobs (dispatch-operation-or-fatal op (cancel-jobs! rest-url job-ids)))))
 
 (defrecord Client
-  [deploy-cluster!                                          ; function that starts a non-standalone cluster and submits the job
-   closer                                                   ; function that closes the ZK client
+  [closer                                                   ; function that closes the ZK client
    rest-url                                                 ; atom storing the current rest-url
    init-future                                              ; future that completes if rest-url is set to an initial value
-   job-id                                                   ; atom storing the job-id
-   job-submitted?]                                          ; Has the job already been submitted? Used to avoid re-submission if the client is re-opened.
+   job-ids                                                  ; atom storing the job-ids
+   client-setup?]                                           ; Has the client already been setup? Used to avoid running setup! again if the client is re-opened.
   client/Client
   (open! [this test _]
     (info "Open client.")
@@ -174,23 +185,21 @@
                   :rest-url rest-url-atom
                   :init-future init-future)))
 
-  (setup! [_ test]
+  (setup! [_ _]
     (info "Setup client.")
-    (when (compare-and-set! job-submitted? false true)
-      (deploy-cluster! test)
+    (when (compare-and-set! client-setup? false true)
       (deref init-future)
       (let [jobs (fu/retry (fn [] (list-jobs! @rest-url))
                            :fallback (fn [e]
                                        (fatal e "Could not get running jobs.")
                                        (System/exit 1)))
-            num-jobs (count jobs)
-            job (first jobs)]
-        (assert (= 1 num-jobs) (str "Expected 1 job, was " num-jobs))
-        (info "Submitted job" job)
-        (reset! job-id job))))
+            num-jobs (count jobs)]
+        (assert (pos? num-jobs) (str "Expected at least 1 job, was " num-jobs))
+        (info "Submitted jobs" jobs)
+        (reset! job-ids jobs))))
 
   (invoke! [_ _ op]
-    (dispatch-rest-operation! @rest-url @job-id op))
+    (dispatch-rest-operation! @rest-url @job-ids op))
 
   (teardown! [_ _])
   (close! [_ _]
@@ -198,5 +207,5 @@
     (closer)))
 
 (defn create-client
-  [deploy-cluster!]
-  (Client. deploy-cluster! nil nil nil (atom nil) (atom false)))
+  []
+  (Client. nil nil nil (atom nil) (atom false)))
diff --git a/flink-jepsen/src/jepsen/flink/db.clj b/flink-jepsen/src/jepsen/flink/db.clj
index 971c69e5000..6bac982622a 100644
--- a/flink-jepsen/src/jepsen/flink/db.clj
+++ b/flink-jepsen/src/jepsen/flink/db.clj
@@ -17,13 +17,11 @@
 (ns jepsen.flink.db
   (:require [clj-http.client :as http]
             [clojure.java.io]
-            [clojure.string :as str]
             [clojure.tools.logging :refer :all]
             [jepsen
              [control :as c]
              [db :as db]
-             [util :refer [meh]]
-             [zookeeper :as zk]]
+             [util :refer [meh]]]
             [jepsen.control.util :as cu]
             [jepsen.flink.hadoop :as hadoop]
             [jepsen.flink.mesos :as mesos]
@@ -36,12 +34,6 @@
 (def conf-file (str install-dir "/conf/flink-conf.yaml"))
 (def masters-file (str install-dir "/conf/masters"))
 
-(def default-flink-dist-url "https://archive.apache.org/dist/flink/flink-1.6.0/flink-1.6.0-bin-hadoop28-scala_2.11.tgz")
-(def hadoop-dist-url "https://archive.apache.org/dist/hadoop/common/hadoop-2.8.3/hadoop-2.8.3.tar.gz")
-(def deb-zookeeper-package "3.4.9-3+deb8u1")
-(def deb-mesos-package "1.5.0-2.0.2")
-(def deb-marathon-package "1.6.322")
-
 (def taskmanager-slots 3)
 
 (defn flink-configuration
@@ -69,6 +61,20 @@
     ;; TODO: write log4j.properties properly
     (c/exec (c/lit (str "sed -i'.bak' -e '/log4j.rootLogger=/ s/=.*/=DEBUG, file/' " install-dir "/conf/log4j.properties")))))
 
+(defn- file-name
+  [path]
+  (.getName (clojure.java.io/file path)))
+
+(defn upload-job-jar!
+  [job-jar]
+  (c/upload job-jar upload-dir)
+  (c/exec :mv (str upload-dir "/" (file-name job-jar)) install-dir))
+
+(defn upload-job-jars!
+  [job-jars]
+  (doseq [job-jar job-jars]
+    (upload-job-jar! job-jar)))
+
 (defn install-flink!
   [test node]
   (let [url (:tarball test)]
@@ -76,8 +82,7 @@
     (cu/install-archive! url install-dir)
     (info "Enable S3 FS")
     (c/exec (c/lit (str "ls " install-dir "/opt/flink-s3-fs-hadoop* | xargs -I {} mv {} " install-dir "/lib")))
-    (c/upload (:job-jar test) upload-dir)
-    (c/exec :mv (str upload-dir "/" (.getName (clojure.java.io/file (:job-jar test)))) install-dir)
+    (upload-job-jars! (->> test :test-spec :jobs (map :job-jar)))
     (write-configuration! test node)))
 
 (defn teardown-flink!
@@ -87,36 +92,16 @@
   (meh (c/exec :rm :-rf install-dir))
   (meh (c/exec :rm :-rf (c/lit "/tmp/.yarn-properties*"))))
 
-(defn get-log-files!
-  []
-  (if (cu/exists? log-dir) (cu/ls-full log-dir) []))
-
-(defn flink-db
-  []
-  (reify db/DB
-    (setup! [_ test node]
-      (c/su
-        (install-flink! test node)))
-
-    (teardown! [_ test node]
-      (c/su
-        (teardown-flink!)))
-
-    db/LogFiles
-    (log-files [_ test node]
-      (concat
-        (get-log-files!)))))
-
 (defn combined-db
   [dbs]
   (reify db/DB
     (setup! [_ test node]
       (c/su
-        (doall (map #(db/setup! % test node) dbs))))
+        (doseq [db dbs] (db/setup! db test node))))
     (teardown! [_ test node]
       (c/su
         (try
-          (doall (map #(db/teardown! % test node) dbs))
+          (doseq [db dbs] (db/teardown! db test node))
           (finally (fu/stop-all-supervised-services!)))))
     db/LogFiles
     (log-files [_ test node]
@@ -125,6 +110,22 @@
         (map #(db/log-files % test node))
         (flatten)))))
 
+(defn flink-db
+  [db]
+  (let [flink-base-db (reify db/DB
+                        (setup! [_ test node]
+                          (c/su
+                            (install-flink! test node)))
+
+                        (teardown! [_ _ _]
+                          (c/su
+                            (teardown-flink!)))
+
+                        db/LogFiles
+                        (log-files [_ _ _]
+                          (fu/find-files! log-dir)))]
+    (combined-db [flink-base-db db])))
+
 (defn- sorted-nodes
   [test]
   (-> test :nodes sort))
@@ -145,7 +146,7 @@
   [m]
   (->>
     (map #(str (name (first %)) "=" (second %)) m)
-    (clojure.string/join " ")
+    (apply fu/join-space)
     (#(str % " "))))
 
 (defn- hadoop-env-vars
@@ -156,28 +157,37 @@
 (defn exec-flink!
   [cmd args]
   (c/su
-    (c/exec (c/lit (str
+    (c/exec (c/lit (fu/join-space
                      (hadoop-env-vars)
-                     install-dir "/bin/flink " cmd " " args)))))
+                     (str install-dir "/bin/flink")
+                     cmd
+                     (apply fu/join-space args))))))
 
 (defn flink-run-cli-args
   "Returns the CLI args that should be passed to 'flink run'"
-  [test]
+  [job-spec]
   (concat
     ["-d"]
-    (if (:main-class test)
-      [(str "-c " (:main-class test))]
+    (if (:main-class job-spec)
+      [(str "-c " (:main-class job-spec))]
       [])))
 
 (defn submit-job!
   ([test] (submit-job! test []))
   ([test cli-args]
-   (exec-flink! "run" (clojure.string/join
-                        " "
-                        (concat cli-args
-                                (flink-run-cli-args test)
-                                [(str install-dir "/" (last (str/split (:job-jar test) #"/")))
-                                 (:job-args test)])))))
+   (doseq [{:keys [job-jar job-args] :as job-spec} (-> test :test-spec :jobs)]
+     (exec-flink! "run" (concat cli-args
+                                (flink-run-cli-args job-spec)
+                                [(str install-dir "/" (file-name job-jar))
+                                 job-args])))))
+
+(defn- submit-job-with-retry!
+  ([test] (submit-job-with-retry! test []))
+  ([test cli-args] (fu/retry
+                     (partial submit-job! test cli-args)
+                     :fallback (fn [e] (do
+                                         (fatal e "Could not submit job.")
+                                         (System/exit 1))))))
 
 ;;; Standalone
 
@@ -192,122 +202,130 @@
   (select-nodes test (partial drop standalone-master-count)))
 
 (defn- start-standalone-masters!
-  [test node]
-  (when (some #{node} (standalone-master-nodes test))
+  []
+  (let [jobmanager-script (str install-dir "/bin/jobmanager.sh")
+        jobmanager-log (str log-dir "/jobmanager.log")]
     (fu/create-supervised-service!
       "flink-master"
-      (str "env " (hadoop-env-vars)
-           install-dir "/bin/jobmanager.sh start-foreground "
-           ">> " log-dir "/jobmanager.log"))))
+      (fu/join-space "env" (hadoop-env-vars) jobmanager-script "start-foreground" ">>" jobmanager-log))))
 
 (defn- start-standalone-taskmanagers!
-  [test node]
-  (when (some #{node} (standalone-taskmanager-nodes test))
+  []
+  (let [taskmanager-script (str install-dir "/bin/taskmanager.sh")
+        taskmanager-log (str log-dir "/taskmanager.log")]
     (fu/create-supervised-service!
       "flink-taskmanager"
-      (str "env " (hadoop-env-vars)
-           install-dir "/bin/taskmanager.sh start-foreground "
-           ">> " log-dir "/taskmanager.log"))))
-
-(defn- start-flink-db
-  []
-  (reify db/DB
-    (setup! [_ test node]
-      (c/su
-        (start-standalone-masters! test node)
-        (start-standalone-taskmanagers! test node)))
-
-    (teardown! [_ test node]
-      (c/su
-        (when (some #{node} (standalone-master-nodes test))
-          (fu/stop-supervised-service! "flink-master"))
-        (when (some #{node} (standalone-taskmanager-nodes test))
-          (fu/stop-supervised-service! "flink-taskmanager"))))))
+      (fu/join-space "env" (hadoop-env-vars) taskmanager-script "start-foreground" ">>" taskmanager-log))))
 
-(defn flink-standalone-db
+(defn start-flink-db
   []
-  (let [zk (zk/db deb-zookeeper-package)
-        hadoop (hadoop/db hadoop-dist-url)
-        flink (flink-db)
-        start-flink (start-flink-db)]
-    (combined-db [hadoop zk flink start-flink])))
-
-(defn submit-job-from-first-node!
-  [test]
-  (c/on (first-node test)
-        (submit-job! test)))
+  (flink-db
+    (reify db/DB
+      (setup! [_ test node]
+        (c/su
+          (when (some #{node} (standalone-master-nodes test))
+            (start-standalone-masters!))
+          (when (some #{node} (standalone-taskmanager-nodes test))
+            (start-standalone-taskmanagers!))
+          (when (= (first-node test) node)
+            (submit-job-with-retry! test))))
+
+      (teardown! [_ test node]
+        (c/su
+          (when (some #{node} (standalone-master-nodes test))
+            (fu/stop-supervised-service! "flink-master"))
+          (when (some #{node} (standalone-taskmanager-nodes test))
+            (fu/stop-supervised-service! "flink-taskmanager")))))))
 
 ;;; YARN
 
-(defn flink-yarn-db
+(defn- start-yarn-session-cmd
   []
-  (let [zk (zk/db deb-zookeeper-package)
-        hadoop (hadoop/db hadoop-dist-url)
-        flink (flink-db)]
-    (combined-db [hadoop zk flink])))
+  (fu/join-space (hadoop-env-vars)
+                 (str install-dir "/bin/yarn-session.sh")
+                 "-d"
+                 "-jm 2048m"
+                 "-tm 2048m"))
 
-(defn start-yarn-session!
-  [test]
-  (let [node (first-node test)]
-    (c/on node
-          (info "Starting YARN session from" node)
-          (c/su
-            (c/exec (c/lit (str (hadoop-env-vars)
-                                " " install-dir "/bin/yarn-session.sh -d -jm 2048m -tm 2048m")))
-            (submit-job! test)))))
-
-(defn start-yarn-job!
+(defn- start-yarn-session!
+  []
+  (info "Starting YARN session")
+  (let [exec-start-yarn-session! #(c/su (c/exec (c/lit (start-yarn-session-cmd))))
+        log-failure! (fn [exception _] (info "Starting YARN session failed due to"
+                                             (.getMessage exception)
+                                             "Retrying..."))]
+    (fu/retry exec-start-yarn-session!
+              :delay 4000
+              :on-retry log-failure!)))
+
+(defn yarn-session-db
+  []
+  (flink-db (reify db/DB
+              (setup! [_ test node]
+                (when (= node (first-node test))
+                  (start-yarn-session!)
+                  (submit-job! test)))
+              (teardown! [_ _ _]))))
+
+(defn- start-yarn-job!
   [test]
-  (c/on (first-node test)
-        (c/su
-          (submit-job! test ["-m yarn-cluster" "-yjm 2048m" "-ytm 2048m"]))))
-
-;;; Mesos
+  (c/su
+    (submit-job-with-retry! test ["-m yarn-cluster" "-yjm 2048m" "-ytm 2048m"])))
 
-(defn flink-mesos-db
+(defn yarn-job-db
   []
-  (let [zk (zk/db deb-zookeeper-package)
-        hadoop (hadoop/db hadoop-dist-url)
-        mesos (mesos/db deb-mesos-package deb-marathon-package)
-        flink (flink-db)]
-    (combined-db [hadoop zk mesos flink])))
+  (flink-db (reify db/DB
+              (setup! [_ test node]
+                (when (= node (first-node test))
+                  (start-yarn-job! test)))
+              (teardown! [_ _ _]))))
 
-(defn submit-job-with-retry!
-  [test]
-  (fu/retry
-    (partial submit-job! test)
-    :fallback (fn [e] (do
-                        (fatal e "Could not submit job.")
-                        (System/exit 1)))))
+;;; Mesos
 
-(defn mesos-appmaster-cmd
+(defn- mesos-appmaster-cmd
   "Returns the command used by Marathon to start Flink's Mesos application master."
   [test]
-  (str (hadoop-env-vars)
-       install-dir "/bin/mesos-appmaster.sh "
-       "-Dmesos.master=" (zookeeper-uri
-                           test
-                           mesos/zk-namespace) " "
-       "-Djobmanager.rpc.address=$(hostname -f) "
-       "-Djobmanager.heap.mb=2048 "
-       "-Djobmanager.rpc.port=6123 "
-       "-Dmesos.resourcemanager.tasks.mem=2048 "
-       "-Dtaskmanager.heap.mb=2048 "
-       "-Dmesos.resourcemanager.tasks.cpus=1 "
-       "-Drest.bind-address=$(hostname -f) "))
-
-(defn start-mesos-session!
+  (fu/join-space
+    (hadoop-env-vars)
+    (str install-dir "/bin/mesos-appmaster.sh")
+    (str "-Dmesos.master=" (zookeeper-uri test mesos/zk-namespace))
+    "-Djobmanager.rpc.address=$(hostname -f)"
+    "-Djobmanager.heap.mb=2048"
+    "-Djobmanager.rpc.port=6123"
+    "-Dmesos.resourcemanager.tasks.mem=2048"
+    "-Dtaskmanager.heap.mb=2048"
+    "-Dmesos.resourcemanager.tasks.cpus=1"
+    "-Drest.bind-address=$(hostname -f)"))
+
+(defn- start-mesos-session!
   [test]
   (c/su
-    (let [r (fu/retry (fn []
-                        (http/post
-                          (str (mesos/marathon-base-url test) "/v2/apps")
-                          {:form-params  {:id                    "flink"
-                                          :cmd                   (mesos-appmaster-cmd test)
-                                          :cpus                  1.0
-                                          :mem                   2048
-                                          :maxLaunchDelaySeconds 3}
-                           :content-type :json})))]
-      (info "Submitted Flink Application via Marathon" r)
-      (c/on (-> test :nodes sort first)
-            (submit-job-with-retry! test)))))
+    (let [log-submission-failure! (fn [exception _]
+                                    (info "Submitting Flink Application via Marathon failed due to"
+                                          (.getMessage exception)
+                                          "Retrying..."))
+          submit-flink! (fn []
+                          (http/post
+                            (str (mesos/marathon-base-url test) "/v2/apps")
+                            {:form-params  {:id                    "flink"
+                                            :cmd                   (mesos-appmaster-cmd test)
+                                            :cpus                  1.0
+                                            :mem                   2048
+                                            :maxLaunchDelaySeconds 3}
+                             :content-type :json}))
+          marathon-response (fu/retry submit-flink!
+                                      :on-retry log-submission-failure!
+                                      :delay 4000)]
+      (info "Submitted Flink Application via Marathon" marathon-response))))
+
+(defn flink-mesos-app-master
+  []
+  (flink-db
+    (reify
+      db/DB
+      (setup! [_ test node]
+        (when (= (first-node test) node)
+          (start-mesos-session! test)
+          (submit-job-with-retry! test)))
+
+      (teardown! [_ _ _]))))
diff --git a/flink-jepsen/src/jepsen/flink/flink.clj b/flink-jepsen/src/jepsen/flink/flink.clj
index c5d0d225932..a3698abd3ff 100644
--- a/flink-jepsen/src/jepsen/flink/flink.clj
+++ b/flink-jepsen/src/jepsen/flink/flink.clj
@@ -20,32 +20,45 @@
             [jepsen
              [cli :as cli]
              [generator :as gen]
-             [tests :as tests]]
+             [tests :as tests]
+             [zookeeper :as zk]]
             [jepsen.os.debian :as debian]
-            [jepsen.flink.client :refer :all]
-            [jepsen.flink.checker :as flink-checker]
-            [jepsen.flink.db :as fdb]
-            [jepsen.flink.nemesis :as fn]))
+            [jepsen.flink
+             [client :refer :all]
+             [checker :as flink-checker]
+             [db :as fdb]
+             [generator :as fg]
+             [hadoop :as hadoop]
+             [kafka :as kafka]
+             [mesos :as mesos]
+             [nemesis :as fn]]))
 
-(def flink-test-config
-  {:yarn-session       {:db                  (fdb/flink-yarn-db)
-                        :deployment-strategy fdb/start-yarn-session!}
-   :yarn-job           {:db                  (fdb/flink-yarn-db)
-                        :deployment-strategy fdb/start-yarn-job!}
-   :mesos-session      {:db                  (fdb/flink-mesos-db)
-                        :deployment-strategy fdb/start-mesos-session!}
-   :standalone-session {:db                  (fdb/flink-standalone-db)
-                        :deployment-strategy fdb/submit-job-from-first-node!}})
+(def default-flink-dist-url "https://archive.apache.org/dist/flink/flink-1.6.0/flink-1.6.0-bin-hadoop28-scala_2.11.tgz")
+(def hadoop-dist-url "https://archive.apache.org/dist/hadoop/common/hadoop-2.8.3/hadoop-2.8.3.tar.gz")
+(def kafka-dist-url "http://mirror.funkfreundelandshut.de/apache/kafka/2.0.1/kafka_2.11-2.0.1.tgz")
+(def deb-zookeeper-package "3.4.9-3+deb8u1")
+(def deb-mesos-package "1.5.0-2.0.2")
+(def deb-marathon-package "1.6.322")
 
-(def poll-job-running {:type :invoke, :f :job-running?, :value nil})
-(def cancel-job {:type :invoke, :f :cancel-job, :value nil})
-(def poll-job-running-loop (gen/seq (cycle [poll-job-running (gen/sleep 5)])))
+(def dbs
+  {:flink-yarn-job           (fdb/yarn-job-db)
+   :flink-yarn-session       (fdb/yarn-session-db)
+   :flink-standalone-session (fdb/start-flink-db)
+   :flink-mesos-session      (fdb/flink-mesos-app-master)
+   :hadoop                   (hadoop/db hadoop-dist-url)
+   :kafka                    (kafka/db kafka-dist-url)
+   :mesos                    (mesos/db deb-mesos-package deb-marathon-package)
+   :zookeeper                (zk/db deb-zookeeper-package)})
+
+(def poll-jobs-running {:type :invoke, :f :jobs-running?, :value nil})
+(def cancel-jobs {:type :invoke, :f :cancel-jobs, :value nil})
+(def poll-jobs-running-loop (gen/seq (cycle [poll-jobs-running (gen/sleep 5)])))
 
 (defn default-client-gen
   "Client generator that polls for the job running status."
   []
   (->
-    poll-job-running-loop
+    poll-jobs-running-loop
     (gen/singlethreaded)))
 
 (defn cancelling-client-gen
@@ -53,39 +66,39 @@
   []
   (->
     (gen/concat (gen/time-limit 15 (default-client-gen))
-                (gen/once cancel-job)
+                (gen/once cancel-jobs)
                 (default-client-gen))
     (gen/singlethreaded)))
 
 (def client-gens
   {:poll-job-running default-client-gen
-   :cancel-job       cancelling-client-gen})
+   :cancel-jobs      cancelling-client-gen})
 
 (defn flink-test
   [opts]
   (merge tests/noop-test
-         (let [{:keys [db deployment-strategy]} (-> opts :deployment-mode flink-test-config)
+         (let [dbs (->> opts :test-spec :dbs (map dbs))
                {:keys [job-running-healthy-threshold job-recovery-grace-period]} opts
                client-gen ((:client-gen opts) client-gens)]
            {:name      "Apache Flink"
             :os        debian/os
-            :db        db
+            :db        (fdb/combined-db dbs)
             :nemesis   (fn/nemesis)
             :model     (flink-checker/job-running-within-grace-period
                          job-running-healthy-threshold
                          job-recovery-grace-period)
             :generator (let [stop (atom nil)]
-                         (->> (fn/stoppable-generator stop (client-gen))
+                         (->> (fg/stoppable-generator stop (client-gen))
                               (gen/nemesis
-                                (fn/stop-generator stop
+                                (fg/stop-generator stop
                                                    ((fn/nemesis-generator-factories (:nemesis-gen opts)) opts)
                                                    job-running-healthy-threshold
                                                    job-recovery-grace-period))))
-            :client    (create-client deployment-strategy)
+            :client    (create-client)
             :checker   (flink-checker/job-running-checker)})
          (assoc opts :concurrency 1)))
 
-(defn keys-as-allowed-values-help-text
+(defn- keys->allowed-values-help-text
   "Takes a map and returns a string explaining which values are allowed.
   This is a CLI helper function."
   [m]
@@ -94,34 +107,34 @@
        (clojure.string/join ", ")
        (str "Must be one of: ")))
 
+(defn read-test-spec
+  [path]
+  (clojure.edn/read-string (slurp path)))
+
 (defn -main
   [& args]
   (cli/run!
     (merge
       (cli/single-test-cmd
         {:test-fn  flink-test
-         :tarball  fdb/default-flink-dist-url
-         :opt-spec [[nil "--ha-storage-dir DIR" "high-availability.storageDir"]
-                    [nil "--job-jar JAR" "Path to the job jar"]
-                    [nil "--job-args ARGS" "CLI arguments for the flink job"]
-                    [nil "--main-class CLASS" "Job main class"]
+         :tarball  default-flink-dist-url
+         :opt-spec [[nil "--test-spec FILE" "Path to a test specification (.edn)"
+                     :parse-fn read-test-spec
+                     :validate [#(->> % :dbs (map dbs) (every? (complement nil?)))
+                                (str "Invalid :dbs specification. " (keys->allowed-values-help-text dbs))]]
+                    [nil "--ha-storage-dir DIR" "high-availability.storageDir"]
                     [nil "--nemesis-gen GEN" (str "Which nemesis should be used? "
-                                                  (keys-as-allowed-values-help-text fn/nemesis-generator-factories))
+                                                  (keys->allowed-values-help-text fn/nemesis-generator-factories))
                      :parse-fn keyword
                      :default :kill-task-managers
-                     :validate [#(fn/nemesis-generator-factories (keyword %))
-                                (keys-as-allowed-values-help-text fn/nemesis-generator-factories)]]
+                     :validate [#(fn/nemesis-generator-factories %)
+                                (keys->allowed-values-help-text fn/nemesis-generator-factories)]]
                     [nil "--client-gen GEN" (str "Which client should be used? "
-                                                 (keys-as-allowed-values-help-text client-gens))
+                                                 (keys->allowed-values-help-text client-gens))
                      :parse-fn keyword
                      :default :poll-job-running
-                     :validate [#(client-gens (keyword %))
-                                (keys-as-allowed-values-help-text client-gens)]]
-                    [nil "--deployment-mode MODE" (keys-as-allowed-values-help-text flink-test-config)
-                     :parse-fn keyword
-                     :default :yarn-session
-                     :validate [#(flink-test-config (keyword %))
-                                (keys-as-allowed-values-help-text flink-test-config)]]
+                     :validate [#(client-gens %)
+                                (keys->allowed-values-help-text client-gens)]]
                     [nil "--job-running-healthy-threshold TIMES" "Number of consecutive times the job must be running to be considered healthy."
                      :default 5
                      :parse-fn #(Long/parseLong %)
diff --git a/flink-jepsen/src/jepsen/flink/generator.clj b/flink-jepsen/src/jepsen/flink/generator.clj
index af928c4d826..053f14db61d 100644
--- a/flink-jepsen/src/jepsen/flink/generator.clj
+++ b/flink-jepsen/src/jepsen/flink/generator.clj
@@ -16,7 +16,8 @@
 
 (ns jepsen.flink.generator
   (:require [jepsen.util :as util]
-            [jepsen.generator :as gen]))
+            [jepsen.generator :as gen]
+            [jepsen.flink.checker :as flink-checker]))
 
 (gen/defgenerator TimeLimitGen
                   [dt source deadline-atom]
@@ -37,3 +38,61 @@
 (defn time-limit
   [dt source]
   (TimeLimitGen. dt source (atom nil)))
+
+(defn stoppable-generator
+  "Given an atom and a source generator, returns a generator that stops emitting operations from
+  the source if the atom is set to true."
+  [stop source]
+  (reify gen/Generator
+    (op [_ test process]
+      (if @stop
+        nil
+        (gen/op source test process)))))
+
+(defn- take-last-with-default
+  [n default coll]
+  (->>
+    (cycle [default])
+    (concat (reverse coll))
+    (take n)
+    (reverse)))
+
+(defn- inc-by-factor
+  [n factor]
+  (assert (>= factor 1))
+  (int (* n factor)))
+
+(defn stop-generator
+  "Returns a generator that emits operations from a given source generator. If the source is
+  exhausted and either job-recovery-grace-period has passed or the job has been running
+  job-running-healthy-threshold times consecutively, the stop atom is set to true."
+  [stop source job-running-healthy-threshold job-recovery-grace-period]
+  (gen/concat source
+              (let [t (atom nil)]
+                (reify gen/Generator
+                  (op [_ test process]
+                    (when (nil? @t)
+                      (compare-and-set! t nil (util/relative-time-nanos)))
+                    (let [history (->>
+                                    (:active-histories test)
+                                    deref
+                                    first
+                                    deref)
+                          job-running-history (->>
+                                                history
+                                                (filter (fn [op] (>= (- (:time op) @t) 0)))
+                                                (flink-checker/all-jobs-running?-history)
+                                                (take-last-with-default job-running-healthy-threshold false))]
+                      (if (or
+                            (every? true? job-running-history)
+                            (> (util/relative-time-nanos) (+ @t
+                                                             (util/secs->nanos
+                                                               (inc-by-factor
+                                                                 job-recovery-grace-period
+                                                                 1.1)))))
+                        (do
+                          (reset! stop true)
+                          nil)
+                        (do
+                          (Thread/sleep 1000)
+                          (recur test process)))))))))
diff --git a/flink-jepsen/src/jepsen/flink/kafka.clj b/flink-jepsen/src/jepsen/flink/kafka.clj
new file mode 100644
index 00000000000..d32114b8fa5
--- /dev/null
+++ b/flink-jepsen/src/jepsen/flink/kafka.clj
@@ -0,0 +1,99 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.kafka
+  (:require [clojure.tools.logging :refer :all]
+            [jepsen
+             [db :as db]
+             [control :as c]
+             [util :refer [meh]]]
+            [jepsen.control.util :as cu]
+            [jepsen.flink.zookeeper :as fzk]
+            [jepsen.flink.utils :as fu]))
+
+(def install-dir "/opt/kafka")
+(def application-log-dir "/opt/kafka/logs")
+(def log-dirs "/opt/kafka/kafka-logs")
+(def server-properties (str install-dir "/config/server.properties"))
+(def start-script (str install-dir "/bin/kafka-server-start.sh"))
+(def topic-script (str install-dir "/bin/kafka-topics.sh"))
+(def stop-script (str install-dir "/bin/kafka-server-stop.sh"))
+
+(defn- broker-id
+  [nodes node]
+  (.indexOf (sort nodes) node))
+
+(defn- override-property
+  [name value]
+  (str "--override " name "=" value))
+
+(defn- start-server-command
+  [{:keys [nodes] :as test} node]
+  (fu/join-space
+    start-script
+    "-daemon"
+    server-properties
+    (override-property "zookeeper.connect" (fzk/zookeeper-quorum test))
+    (override-property "broker.id" (broker-id nodes node))
+    (override-property "log.dirs" log-dirs)
+    (override-property "retention.ms" "1800000")))
+
+(defn- start-server!
+  [test node]
+  (c/exec (c/lit (start-server-command test node))))
+
+(defn- stop-server!
+  []
+  (info "Stopping Kafka")
+  (cu/grepkill! "kafka"))
+
+(defn- create-topic-command
+  [{:keys [nodes] :as test}]
+  (fu/join-space
+    topic-script
+    "--create"
+    "--topic kafka-test-topic"
+    (str "--partitions " (count nodes))
+    "--replication-factor 1"
+    "--zookeeper"
+    (fzk/zookeeper-quorum test)))
+
+(defn- create-topic!
+  [test]
+  (info "Attempting to create Kafka topic")
+  (fu/retry (fn [] (c/exec (c/lit (create-topic-command test))))))
+
+(defn- delete-kafka!
+  []
+  (info "Deleting Kafka distribution and logs")
+  (c/exec :rm :-rf install-dir))
+
+(defn db
+  [kafka-dist-url]
+  (reify db/DB
+    (setup! [_ test node]
+      (c/su
+        (cu/install-archive! kafka-dist-url install-dir)
+        (start-server! test node)
+        (when (zero? (broker-id (:nodes test) node))
+          (create-topic! test))))
+    (teardown! [_ _ _]
+      (c/su
+        (stop-server!)
+        (delete-kafka!)))
+    db/LogFiles
+    (log-files [_ _ _]
+      (fu/find-files! application-log-dir))))
diff --git a/flink-jepsen/src/jepsen/flink/nemesis.clj b/flink-jepsen/src/jepsen/flink/nemesis.clj
index 5335bba874c..37e842189af 100644
--- a/flink-jepsen/src/jepsen/flink/nemesis.clj
+++ b/flink-jepsen/src/jepsen/flink/nemesis.clj
@@ -23,7 +23,6 @@
              [util :as ju]]
             [jepsen.control.util :as cu]
             [jepsen.flink.client :refer :all]
-            [jepsen.flink.checker :as flink-checker]
             [jepsen.flink.generator :as fgen]
             [jepsen.flink.hadoop :as fh]
             [jepsen.flink.zookeeper :refer :all]))
@@ -70,59 +69,6 @@
 
 ;;; Generators
 
-(defn stoppable-generator
-  [stop source]
-  (reify gen/Generator
-    (op [gen test process]
-      (if @stop
-        nil
-        (gen/op source test process)))))
-
-(defn take-last-with-default
-  [n default coll]
-  (->>
-    (cycle [default])
-    (concat (reverse coll))
-    (take n)
-    (reverse)))
-
-(defn- inc-by-factor
-  [n factor]
-  (assert (>= factor 1))
-  (int (* n factor)))
-
-(defn stop-generator
-  [stop source job-running-healthy-threshold job-recovery-grace-period]
-  (gen/concat source
-              (let [t (atom nil)]
-                (reify gen/Generator
-                  (op [_ test process]
-                    (when (nil? @t)
-                      (compare-and-set! t nil (ju/relative-time-nanos)))
-                    (let [history (->>
-                                    (:active-histories test)
-                                    deref
-                                    first
-                                    deref)
-                          job-running-history (->>
-                                                history
-                                                (filter (fn [op] (>= (- (:time op) @t) 0)))
-                                                (flink-checker/get-job-running-history)
-                                                (take-last-with-default job-running-healthy-threshold false))]
-                      (if (or
-                            (every? true? job-running-history)
-                            (> (ju/relative-time-nanos) (+ @t
-                                                           (ju/secs->nanos
-                                                             (inc-by-factor
-                                                               job-recovery-grace-period
-                                                               1.1)))))
-                        (do
-                          (reset! stop true)
-                          nil)
-                        (do
-                          (Thread/sleep 1000)
-                          (recur test process)))))))))
-
 (defn kill-taskmanagers-gen
   [time-limit dt op]
   (fgen/time-limit time-limit (gen/stagger dt (gen/seq (cycle [{:type :info, :f op}])))))
diff --git a/flink-jepsen/src/jepsen/flink/utils.clj b/flink-jepsen/src/jepsen/flink/utils.clj
index 1aa53efe7ae..7ccae12d661 100644
--- a/flink-jepsen/src/jepsen/flink/utils.clj
+++ b/flink-jepsen/src/jepsen/flink/utils.clj
@@ -35,7 +35,7 @@
           :or   {on-retry (fn [exception attempt] (warn "Retryable operation failed:"
                                                         (.getMessage exception)))
                  success  identity
-                 fallback :default
+                 fallback #(throw %)
                  retries  10
                  delay    2000}
           :as   keys}]
@@ -51,6 +51,10 @@
          (recur op (assoc keys :retries (dec retries))))
        (success r)))))
 
+(defn join-space
+  [& tokens]
+  (clojure.string/join " " tokens))
+
 (defn find-files!
   "Lists files recursively given a directory. If the directory does not exist, an empty collection
   is returned."
diff --git a/flink-jepsen/test/jepsen/flink/checker_test.clj b/flink-jepsen/test/jepsen/flink/checker_test.clj
index c27d751e69e..d1ade1645b0 100644
--- a/flink-jepsen/test/jepsen/flink/checker_test.clj
+++ b/flink-jepsen/test/jepsen/flink/checker_test.clj
@@ -20,96 +20,99 @@
              [checker :as checker]]
             [jepsen.flink.checker :refer :all]))
 
-(deftest get-job-running-history-test
+(deftest all-jobs-running?-history-test
   (let [history [{:type :info, :f :kill-random-subset-task-managers, :process :nemesis, :time 121898381144, :value '("172.31.33.170")}
-                 {:type :invoke, :f :job-running?, :value nil, :process 0, :time 127443701575}
-                 {:type :ok, :f :job-running?, :value false, :process 0, :time 127453553462}
-                 {:type :invoke, :f :job-running?, :value nil, :process 0, :time 127453553463}
-                 {:type :ok, :f :job-running?, :value true, :process 0, :time 127453553464}
-                 {:type :info, :f :job-running?, :value nil, :process 0, :time 127453553465}]]
-    (is (= (get-job-running-history history) [false true false]))))
+                 {:type :invoke, :f :jobs-running?, :value nil, :process 0, :time 127443701575}
+                 {:type :ok, :f :jobs-running?, :value {"3886d6b547969c3f15c53896bb496b8f" false}, :process 0, :time 127453553462}
+                 {:type :invoke, :f :jobs-running?, :value nil, :process 0, :time 127453553463}
+                 {:type :ok, :f :jobs-running?, :value {"3886d6b547969c3f15c53896bb496b8f" true}, :process 0, :time 127453553464}
+                 {:type :info, :f :jobs-running?, :value nil, :process 0, :time 127453553465}]]
+    (is (= [false true false] (all-jobs-running?-history history)))))
 
 (deftest job-running-checker-test
   (let [checker (job-running-checker)
         test {}
         model (job-running-within-grace-period 3 60 10)
         opts {}
-        check (fn [history] (checker/check checker test model history opts))]
+        check (fn [history] (checker/check checker test model history opts))
+        job-running-value {"3886d6b547969c3f15c53896bb496b8f" true}
+        job-not-running-value {"3886d6b547969c3f15c53896bb496b8f" false}]
     (testing "Model should be inconsistent if job is not running after grace period."
       (let [result (check
                      [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
-                      {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}])]
+                      {:type :ok, :f :jobs-running?, :value job-not-running-value, :process 0, :time 60000000001}])]
         (is (= false (:valid? result)))
-        (is (= "Job is not running." (-> result :final-model :msg)))))
+        (is (= "Job is not running." (-> result :final-models first :msg)))))
     (testing "Model should be consistent if job is running after grace period."
       (is (= true (:valid? (check
                              [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
-                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000001}
-                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000002}
-                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000003}])))))
+                              {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 60000000001}
+                              {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 60000000002}
+                              {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 60000000003}])))))
     (testing "Should tolerate non-running job during failures."
       (is (= true (:valid? (check
                              [{:type :info, :f :partition-start, :process :nemesis, :time -1}
                               {:type :info, :f :partition-start, :process :nemesis, :time 0, :value "Cut off [...]"}
-                              {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}
+                              {:type :ok, :f :jobs-running?, :value job-not-running-value, :process 0, :time 60000000001}
                               {:type :info, :f :partition-stop, :process :nemesis, :time 60000000002}
                               {:type :info, :f :partition-stop, :process :nemesis, :time 60000000003, :value "fully connected"}
-                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}
-                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000005}
-                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000006}])))))
+                              {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 60000000004}
+                              {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 60000000005}
+                              {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 60000000006}])))))
     (testing "Should not tolerate non-running job without a cause."
       (let [result (check
-                     [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
-                      {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
-                      {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}
-                      {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000002}])]
+                     [{:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 0}
+                      {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 1}
+                      {:type :ok, :f :jobs-running?, :value job-not-running-value, :process 0, :time 60000000001}
+                      {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 60000000002}])]
         (is (= false (:valid? result)))
-        (is (= "Job is not running." (-> result :final-model :msg)))))
+        (is (= "Job is not running." (-> result :final-models first :msg)))))
     (testing "Model should be inconsistent if job submission was unsuccessful."
-      (let [result (check [{:type :invoke, :f :job-running?, :value nil, :process 0, :time 239150413307}
-                           {:type :info, :f :job-running?, :value nil, :process 0, :time 239150751938, :error "indeterminate: Assert failed: job-id"}])]
+      (let [result (check [{:type :invoke, :f :jobs-running?, :value nil, :process 0, :time 239150413307}
+                           {:type :info, :f :jobs-running?, :value nil, :process 0, :time 239150751938, :error "indeterminate: Assert failed: job-id"}])]
         (is (= false (:valid? result)))))
     (testing "Model should be inconsistent if the job status cannot be polled, i.e., if the cluster is unavailable."
-      (let [result (check [{:type :fail, :f :job-running?, :value nil, :process 0, :time 0 :error "Error"}
-                           {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000001 :error "Error"}
-                           {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000002 :error "Error"}])]
+      (let [result (check [{:type :fail, :f :jobs-running?, :value job-running-value, :process 0, :time 0 :error "Error"}
+                           {:type :fail, :f :jobs-running?, :value nil, :process 0, :time 60000000001 :error "Error"}
+                           {:type :fail, :f :jobs-running?, :value nil, :process 0, :time 60000000002 :error "Error"}])]
         (is (= false (:valid? result)))
-        (is (= "Cluster is not running." (-> result :final-model :msg)))))
+        (is (= "Cluster is not running." (-> result :final-models first :msg)))))
     (testing "Should tolerate non-running job after cancellation."
-      (is (= true (:valid? (check [{:type :invoke, :f :cancel-job, :value nil, :process 0, :time 0}
-                                   {:type :ok, :f :cancel-job, :value true, :process 0, :time 1}
-                                   {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
-                                   {:type :ok, :f :job-running?, :value false, :process 0, :time 3}])))))
+      (is (= true (:valid? (check [{:type :invoke, :f :cancel-jobs, :value nil, :process 0, :time 0}
+                                   {:type :ok, :f :cancel-jobs, :value nil, :process 0, :time 1}
+                                   {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 2}
+                                   {:type :ok, :f :jobs-running?, :value job-not-running-value, :process 0, :time 3}])))))
     (testing "Model should be inconsistent if job is running after cancellation."
-      (let [result (check [{:type :invoke, :f :cancel-job, :value nil, :process 0, :time 0}
-                           {:type :ok, :f :cancel-job, :value true, :process 0, :time 1}
-                           {:type :ok, :f :job-running?, :value true, :process 0, :time 10000000002}])]
+      (let [result (check [{:type :invoke, :f :cancel-jobs, :value nil, :process 0, :time 0}
+                           {:type :ok, :f :cancel-jobs, :value true, :process 0, :time 1}
+                           {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 10000000002}])]
         (is (= false (:valid? result)))
-        (is (= "Job is running after cancellation." (-> result :final-model :msg)))))
+        (is (= "Job is running after cancellation." (-> result :final-models first :msg)))))
     (testing "Model should be inconsistent if Flink cluster is not available at the end."
-      (let [result (check [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
-                           {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
-                           {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
-                           {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000003, :error "Error"}])]
+      (let [result (check [{:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 0}
+                           {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 1}
+                           {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 2}
+                           {:type :fail, :f :jobs-running?, :value nil, :process 0, :time 60000000003, :error "Error"}])]
         (is (= false (:valid? result)))
-        (is (= "Cluster is not running." (-> result :final-model :msg)))))
+        (is (= "Cluster is not running." (-> result :final-models first :msg)))))
     (testing "Model should be inconsistent if Flink cluster is not available after job cancellation."
-      (let [result (check [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
-                           {:type :invoke, :f :cancel-job, :value nil, :process 0, :time 1}
-                           {:type :ok, :f :cancel-job, :value true, :process 0, :time 2}
-                           {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000001, :error "Error"}])]
+      (let [result (check [{:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 0}
+                           {:type :invoke, :f :cancel-jobs, :value nil, :process 0, :time 1}
+                           {:type :ok, :f :cancel-jobs, :value job-running-value, :process 0, :time 2}
+                           {:type :fail, :f :jobs-running?, :value nil, :process 0, :time 60000000001, :error "Error"}])]
         (is (= false (:valid? result)))
-        (is (= "Cluster is not running." (-> result :final-model :msg)))))
+        (is (= "Cluster is not running." (-> result :final-models first :msg)))))
     (testing "Should throw AssertionError if job cancelling operation failed."
       (is (thrown-with-msg? AssertionError
                             #":cancel-job must not fail"
-                            (check [{:type :fail, :f :cancel-job, :value nil, :process 0, :time 0}]))))
+                            (check [{:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 0}
+                                    {:type :fail, :f :cancel-jobs, :value nil, :process 0, :time 1}]))))
     (testing "Should tolerate non-running job if grace period has not passed."
-      (is (= true (:valid? (check [{:type :invoke, :f :job-running?, :value nil, :process 0, :time 0}
-                                   {:type :ok, :f :job-running?, :value false, :process 0, :time 1}
-                                   {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
-                                   {:type :ok, :f :job-running?, :value true, :process 0, :time 3}
-                                   {:type :ok, :f :job-running?, :value true, :process 0, :time 4}])))))))
+      (is (= true (:valid? (check [{:type :invoke, :f :jobs-running?, :value nil, :process 0, :time 0}
+                                   {:type :ok, :f :jobs-running?, :value job-not-running-value, :process 0, :time 1}
+                                   {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 2}
+                                   {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 3}
+                                   {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 4}])))))))
 
 (deftest safe-inc-test
   (is (= (safe-inc nil) 1))
diff --git a/flink-jepsen/test/jepsen/flink/utils_test.clj b/flink-jepsen/test/jepsen/flink/utils_test.clj
index 607f90d1170..17d5e5f4371 100644
--- a/flink-jepsen/test/jepsen/flink/utils_test.clj
+++ b/flink-jepsen/test/jepsen/flink/utils_test.clj
@@ -29,7 +29,7 @@
 
   (testing "Exhaust all attempts."
     (let [failing-always (fn [] (throw (Exception. "Expected")))]
-      (is (nil? (retry failing-always :retries 1 :delay 0)))))
+      (is (nil? (retry failing-always :retries 1 :delay 0 :fallback :nil)))))
 
   (testing "Propagate exception."
     (let [failing-always (fn [] (throw (Exception. "Expected")))]
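
A note on the `:fallback :nil` change in the hunk above: the diff does not show the implementation of `retry`, so the following is only a minimal sketch under the assumption that `:fallback` is a function applied to the last exception once all attempts are exhausted. Under that assumption, passing the keyword `:nil` works because a keyword invoked on an object that is not a map returns nil. The name `retry-sketch` and its defaults are hypothetical and are not taken from the PR.

```clojure
;; Hypothetical sketch; not the retry function from flink-jepsen.
(defn retry-sketch
  "Calls op until it returns without throwing, or until `retries` additional
  attempts are used up. After the last failure, returns (fallback exception)."
  [op & {:keys [retries delay fallback]
         :or   {retries  3
                delay    0
                ;; Assumed default: propagate the last exception.
                fallback (fn [e] (throw e))}}]
  (loop [attempt 0]
    ;; Capture the outcome outside of the try so that recur stays in tail position.
    (let [result (try
                   {:value (op)}
                   (catch Exception e
                     {:error e}))]
      (cond
        (contains? result :value) (:value result)
        (< attempt retries)       (do (Thread/sleep (long delay))
                                      (recur (inc attempt)))
        :else                     (fallback (:error result))))))

;; A keyword used as a function looks itself up in its argument; an Exception
;; is not a map, so the lookup yields nil and the exception is swallowed.
(retry-sketch (fn [] (throw (Exception. "Expected")))
              :retries 1 :delay 0 :fallback :nil)
```

If the real `retry` treats `:fallback` differently, the test in the diff remains the authoritative description of its behavior.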


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Jepsen: Deploy Kafka Broker on DB Nodes
> ---------------------------------------
>
>                 Key: FLINK-10986
>                 URL: https://issues.apache.org/jira/browse/FLINK-10986
>             Project: Flink
>          Issue Type: New Feature
>          Components: Tests
>    Affects Versions: 1.8.0
>            Reporter: Gary Yao
>            Assignee: Gary Yao
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>
> *Description*
> Kafka broker should be deployed on DB nodes so that Flink's Kafka Producer and Consumer can be tested.
> *Acceptance Criteria*
> * Kafka 2.0 broker is set up automatically on all DB nodes before running the test suite
> * one topic ("kafka-test-topic") is automatically created with as many partitions as there are DB nodes and a replication factor of 1
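
The second acceptance criterion can be illustrated with a small command builder. This is a sketch only: the helper name `create-topic-command`, the node names, and the ZooKeeper address are assumptions made for illustration and do not come from the PR. It relies on the `kafka-topics.sh` tool shipped with Kafka 2.0, which takes `--zookeeper`, `--partitions`, and `--replication-factor` when creating a topic.

```clojure
(ns example.kafka-topic
  (:require [clojure.string :as str]))

;; Hypothetical helper: builds the shell command that creates one topic
;; with one partition per DB node and a replication factor of 1.
(defn create-topic-command
  [zookeeper-connect topic nodes]
  (str/join " "
            ["kafka-topics.sh"
             "--create"
             "--zookeeper" zookeeper-connect
             "--topic" topic
             "--partitions" (count nodes)
             "--replication-factor" 1]))

;; Example with three assumed node names:
(create-topic-command "localhost:2181" "kafka-test-topic" ["n1" "n2" "n3"])
;; => "kafka-topics.sh --create --zookeeper localhost:2181 --topic kafka-test-topic --partitions 3 --replication-factor 1"
```

Deriving the partition count from the node list keeps the topic layout in step with the cluster size, which is what the criterion asks for.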


