Posted to commits@flink.apache.org by ga...@apache.org on 2018/09/28 19:06:27 UTC

[flink] 01/10: [FLINK-10311][tests] Test job cancellation with standby masters.

This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit abc6453ccde474b07c41fd405565db22ff1b00c0
Author: gyao <ga...@data-artisans.com>
AuthorDate: Sat Sep 15 12:14:53 2018 +0200

    [FLINK-10311][tests] Test job cancellation with standby masters.
    
    This adds tests to verify that jobs can be canceled properly when there are
    standby masters. To enable this, we added support for installing Flink as a
    standalone cluster. The first two DB nodes run the master processes, while
    the remaining nodes run the TaskManagers. All Flink processes are supervised
    by runit, so the nemesis can kill Flink processes and runit restarts them.
    
    The client now implements a cancel operation. The model used by the checker
    had to be rewritten to account for the fact that the job can be canceled. The
    cancel operation is "reliable", i.e., it either cancels the job successfully
    or fails the whole test fatally. This way, if the cancel operation completes
    successfully, we can be sure that the job will eventually stop running.
    
    This closes #6712.
---
 flink-jepsen/README.md                          |   2 +-
 flink-jepsen/scripts/run-tests.sh               |   2 +
 flink-jepsen/src/jepsen/flink/checker.clj       | 158 ++++++++++++++++--------
 flink-jepsen/src/jepsen/flink/client.clj        | 136 +++++++++++++-------
 flink-jepsen/src/jepsen/flink/db.clj            | 158 +++++++++++++++++-------
 flink-jepsen/src/jepsen/flink/flink.clj         |  53 +++++---
 flink-jepsen/src/jepsen/flink/mesos.clj         |  32 +----
 flink-jepsen/src/jepsen/flink/nemesis.clj       |  14 ++-
 flink-jepsen/src/jepsen/flink/utils.clj         |  36 +++++-
 flink-jepsen/test/jepsen/flink/checker_test.clj | 111 ++++++++++++-----
 flink-jepsen/test/jepsen/flink/client_test.clj  |  58 +++++++--
 flink-jepsen/test/jepsen/flink/nemesis_test.clj |  32 +++++
 12 files changed, 564 insertions(+), 228 deletions(-)
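The commit message says all Flink processes are supervised by runit. As an illustration only, the sketch below reproduces the pure string-building part of the `create-supervised-service!` function removed from `mesos.clj` further down in this diff (the helper names `service-paths` and `run-script-contents` are hypothetical; the version moved into `utils.clj` is not shown here, so it may differ):

```clojure
(ns example.runit-sketch
  (:require [clojure.string :as str]))

;; Hypothetical helper: the paths involved when registering a runit service,
;; following the removed create-supervised-service! in mesos.clj.
(defn service-paths
  [service-name]
  (let [service-dir (str "/etc/sv/" service-name)]
    {:service-dir service-dir
     :run-script  (str service-dir "/run")
     ;; Symlinking the service dir into /etc/service registers it with runit.
     :symlink     (str "/etc/service/" service-name)}))

(defn run-script-contents
  "Text of the runit run script: stderr is merged into stdout and the shell
  replaces itself with cmd via exec, so runit supervises cmd directly."
  [cmd]
  (str/join "\n" ["#!/bin/sh"
                  "exec 2>&1"
                  (str "exec " cmd)]))
```

Using `exec` matters here: the supervised process is the command itself rather than a wrapping shell, so when the nemesis kills it, runit observes the exit and restarts the service.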

diff --git a/flink-jepsen/README.md b/flink-jepsen/README.md
index 9343246..dcad287 100644
--- a/flink-jepsen/README.md
+++ b/flink-jepsen/README.md
@@ -5,7 +5,7 @@ distributed coordination of Apache Flink®.
 
 ## Test Coverage
 Jepsen is a framework built to test the behavior of distributed systems
-under faults. The tests in this particular project deploy Flink on either YARN or Mesos, submit a
+under faults. The tests in this particular project deploy Flink on YARN, Mesos, or as a standalone cluster, submit a
 job, and examine the availability of the job after injecting faults.
 A job is said to be available if all the tasks of the job are running.
 The faults that can be currently introduced to the Flink cluster include:
diff --git a/flink-jepsen/scripts/run-tests.sh b/flink-jepsen/scripts/run-tests.sh
index e448124..67f171a 100755
--- a/flink-jepsen/scripts/run-tests.sh
+++ b/flink-jepsen/scripts/run-tests.sh
@@ -39,5 +39,7 @@ do
   lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode yarn-job
   lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode yarn-job
   lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --deployment-mode yarn-job
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode standalone-session
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --client-gen cancel-job --deployment-mode standalone-session
   echo
 done
diff --git a/flink-jepsen/src/jepsen/flink/checker.clj b/flink-jepsen/src/jepsen/flink/checker.clj
index 02cc863..7e437e9 100644
--- a/flink-jepsen/src/jepsen/flink/checker.clj
+++ b/flink-jepsen/src/jepsen/flink/checker.clj
@@ -48,59 +48,126 @@
   [[_ v]]
   (zero? v))
 
+(defn- set-job-not-running
+  [model] (assoc model :healthy-count 0))
+
+(defn- track-job-running
+  [model]
+  (update model :healthy-count inc))
+
+(defn- elapsed-seconds
+  [start end]
+  (ju/nanos->secs (- end start)))
+
+(defn- should-cluster-be-healthy?
+  [model op]
+  (let [{:keys [active-nemeses last-failure job-recovery-grace-period]} model]
+    (and
+      (not (nemeses-active? active-nemeses))
+      (> (elapsed-seconds last-failure (:time op)) job-recovery-grace-period))))
+
+(defn- start-fault
+  [model op]
+  (let [{:keys [active-nemeses]} model]
+    (assoc
+      model
+      :active-nemeses (update active-nemeses
+                              (strip-op-suffix op)
+                              safe-inc))))
+
+(defn- stop-fault
+  [model op]
+  (let [{:keys [active-nemeses]} model]
+    (assoc
+      model
+      :active-nemeses (dissoc-if zero-value?
+                                 (update active-nemeses (strip-op-suffix op) dec))
+      :last-failure (:time op))))
+
+(defn- job-allowed-to-be-running?
+  [model op]
+  (let [{:keys [job-canceled? job-canceled-time job-cancellation-grace-period]} model
+        now (:time op)]
+    (cond
+      (not job-canceled?) true
+      :else (> job-cancellation-grace-period (elapsed-seconds job-canceled-time now)))))
+
+(defn- handle-job-running?-op
+  "Returns the new model for an op {:f :job-running? ...}."
+  [model op]
+  (assert (#{:ok :fail :info} (:type op)) "Unexpected type")
+  (let [{:keys [job-canceled?]} model
+        job-running (:value op)
+        request-failed (#{:info :fail} (:type op))]
+    (if (and request-failed
+             (should-cluster-be-healthy? model op))
+      (model/inconsistent "Cluster is not running.")
+      (if job-running                                       ; cluster is running, check if job is running
+        (if (job-allowed-to-be-running? model op)           ; job is running but is it supposed to be running?
+          (track-job-running model)
+          (model/inconsistent
+            "Job is running after cancellation."))
+        (if (and                                            ; job is not running
+              (should-cluster-be-healthy? model op)
+              (not job-canceled?))
+          (model/inconsistent "Job is not running.")        ; job is not running but it should be running because grace period passed
+          (set-job-not-running model))))))
+
 (defrecord
   JobRunningWithinGracePeriod
-  ^{:doc "A Model which is consistent iff. the Flink job became available within
+  ^{:doc "A Model which is consistent if the Flink job and the Flink cluster became available within
   `job-recovery-grace-period` seconds after the last fault injected by the nemesis.
   Note that some faults happen at a single point in time (e.g., killing of processes). Other faults,
   such as network splits, happen during a period of time, and can thus be interleaving. As long as
-  there are active faults, the job is allowed not to be available."}
+  there are active faults, the job and the cluster are allowed to be unavailable.
+
+  Note that this model assumes that the client dispatches the operations reliably, i.e., in case of
+  exceptions, the operations are retried or failed fatally."}
   [active-nemeses                                           ; stores active failures
    healthy-count                                            ; how many consecutive times was the job running?
    last-failure                                             ; timestamp when the last failure was injected/ended
    healthy-threshold                                        ; after how many times is the job considered healthy
-   job-recovery-grace-period]                               ; after how many seconds should the job be recovered
+   job-recovery-grace-period                                ; after how many seconds should the job be recovered
+   job-cancellation-grace-period                            ; after how many seconds should the job be canceled?
+   job-canceled?                                            ; is the job canceled?
+   job-canceled-time]                                       ; timestamp of cancellation
   Model
   (step [this op]
     (case (:process op)
       :nemesis (cond
                  (nil? (:value op)) this
-                 (stoppable-op? op) (assoc
-                                      this
-                                      :active-nemeses (update active-nemeses
-                                                              (strip-op-suffix op)
-                                                              safe-inc))
-                 (stop-op? op) (assoc
-                                 this
-                                 :active-nemeses (dissoc-if zero-value?
-                                                            (update active-nemeses (strip-op-suffix op) dec))
-                                 :last-failure (:time op))
+                 (stoppable-op? op) (start-fault this op)
+                 (stop-op? op) (stop-fault this op)
                  :else (assoc this :last-failure (:time op)))
-      (case (:f op)
-        :job-running? (case (:type op)
-                        :info this                          ; ignore :info operations
-                        :fail this                          ; ignore :fail operations
-                        :invoke this                        ; ignore :invoke operations
-                        :ok (if (:value op)                 ; check if job is running
-                              (assoc                        ; job is running
-                                this
-                                :healthy-count
-                                (inc healthy-count))
-                              (if (and                      ; job is not running
-                                    (not (nemeses-active? active-nemeses))
-                                    (< healthy-count healthy-threshold)
-                                    (> (ju/nanos->secs (- (:time op) last-failure)) job-recovery-grace-period))
-                                ; job is not running but it should be running
-                                ; because grace period passed
-                                (model/inconsistent "Job is not running.")
-                                (conj this
-                                      [:healthy-count 0]))))
-        ; ignore other client operations
-        this))))
+      (if (= :invoke (:type op))
+        this                                                ; ignore :invoke operations
+        (case (:f op)
+          :job-running? (handle-job-running?-op this op)
+          :cancel-job (do
+                        (assert (= :ok (:type op)) ":cancel-job must not fail")
+                        (assoc this :job-canceled? true :job-canceled-time (:time op)))
+          ; ignore other client operations
+          this)))))
 
 (defn job-running-within-grace-period
-  [job-running-healthy-threshold job-recovery-grace-period]
-  (JobRunningWithinGracePeriod. {} 0 nil job-running-healthy-threshold job-recovery-grace-period))
+  ([job-running-healthy-threshold job-recovery-grace-period job-cancellation-grace-period]
+   (JobRunningWithinGracePeriod. {} 0 nil job-running-healthy-threshold job-recovery-grace-period job-cancellation-grace-period false nil))
+  ([job-running-healthy-threshold job-recovery-grace-period]
+   (job-running-within-grace-period job-running-healthy-threshold job-recovery-grace-period 10)))
+
+(defn get-job-running-history
+  [history]
+  (->>
+    history
+    (remove #(= (:process %) :nemesis))
+    (remove #(= (:type %) :invoke))
+    (map :value)
+    (map boolean)
+    (remove nil?)))
+
+(defn- healthy?
+  [model]
+  (>= (:healthy-count model) (:healthy-threshold model)))
 
 (defn job-running-checker
   []
@@ -111,18 +178,11 @@
             result-map (conj {}
                              (find test :nemesis-gen)
                              (find test :deployment-mode))]
-        (if (or (model/inconsistent? final) (zero? (:healthy-count final 0)))
-          (into result-map {:valid? false
-                            :error  (:msg final)})
+        (if (or (model/inconsistent? final)
+                (and
+                  (not (healthy? final))
+                  (not (:job-canceled? final))))
+          (into result-map {:valid?      false
+                            :final-model final})
           (into result-map {:valid?      true
                             :final-model final}))))))
-
-(defn get-job-running-history
-  [history]
-  (->>
-    history
-    (remove #(= (:process %) :nemesis))
-    (remove #(= (:type %) :invoke))
-    (map :value)
-    (map boolean)
-    (remove nil?)))
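To make the new checker rules easier to follow, here is a simplified, self-contained sketch of how a successful `:job-running?` result is judged against the cancellation and recovery grace periods. It is not the code in `checker.clj`: it assumes op timestamps in nanoseconds, treats `active-nemeses` as a map that is empty when no fault is active, and omits the `:info`/`:fail` branch ("Cluster is not running.") and the healthy-count bookkeeping. The function `check-job-running-result` is hypothetical.

```clojure
(ns example.checker-sketch)

;; Assumption: timestamps are nanoseconds, like Jepsen op :time values.
(defn- nanos->secs [nanos] (/ nanos 1e9))

(defn- elapsed-seconds [start end] (nanos->secs (- end start)))

(defn job-allowed-to-be-running?
  "A running job is acceptable unless it was canceled more than
  job-cancellation-grace-period seconds ago."
  [{:keys [job-canceled? job-canceled-time job-cancellation-grace-period]} now]
  (or (not job-canceled?)
      (> job-cancellation-grace-period (elapsed-seconds job-canceled-time now))))

(defn should-cluster-be-healthy?
  "Simplification: no active faults and the recovery grace period has passed."
  [{:keys [active-nemeses last-failure job-recovery-grace-period]} now]
  (and (empty? active-nemeses)
       (> (elapsed-seconds last-failure now) job-recovery-grace-period)))

(defn check-job-running-result
  "Hypothetical helper: returns :ok, or the inconsistency message, for a
  successful {:f :job-running?} op whose value is running?."
  [model now running?]
  (cond
    (and running? (not (job-allowed-to-be-running? model now)))
    "Job is running after cancellation."

    (and (not running?)
         (should-cluster-be-healthy? model now)
         (not (:job-canceled? model)))
    "Job is not running."

    :else :ok))

(def secs 1000000000)

;; Job canceled at t=0 with a 10 s cancellation grace period.
(def canceled-model
  {:active-nemeses {} :last-failure 0 :job-recovery-grace-period 180
   :job-canceled? true :job-canceled-time 0 :job-cancellation-grace-period 10})
```

With this model, a running job is still tolerated 5 s after cancellation but flagged at 15 s, while a non-running job is only an error if the job was never canceled and the recovery grace period has elapsed.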
diff --git a/flink-jepsen/src/jepsen/flink/client.clj b/flink-jepsen/src/jepsen/flink/client.clj
index 905dc48..f513756 100644
--- a/flink-jepsen/src/jepsen/flink/client.clj
+++ b/flink-jepsen/src/jepsen/flink/client.clj
@@ -97,54 +97,100 @@
     :jobs
     (map :id)))
 
-(defn get-job-details!
-  [base-url job-id]
-  (assert base-url)
-  (assert job-id)
-  (let [job-details (->
-                      (http/get (str base-url "/jobs/" job-id) {:as :json})
-                      :body)]
-    (assert (:vertices job-details) "Job does not have vertices")
-    job-details))
-
 (defn job-running?
   [base-url job-id]
-  (->>
-    (get-job-details! base-url job-id)
-    :vertices
-    (map :status)
-    (every? #(= "RUNNING" %))))
+  (let [response (http/get (str base-url "/jobs/" job-id) {:as :json :throw-exceptions false})
+        body (:body response)
+        error (:errors body)]
+    (cond
+      (http/missing? response) false
+      (not (http/success? response)) (throw (ex-info "Could not determine if job is running" {:job-id job-id :error error}))
+      :else (do
+              (assert (:vertices body) "Job does not have vertices")
+              (->>
+                body
+                :vertices
+                (map :status)
+                (every? #(= "RUNNING" %)))))))
+
+(defn- cancel-job!
+  "Cancels the specified job. Returns true if the job could be canceled.
+  Returns false if the job does not exist. Throws an exception if the HTTP status
+  is not successful."
+  [base-url job-id]
+  (let [response (http/patch (str base-url "/jobs/" job-id) {:as :json :throw-exceptions false})
+        error (-> response :body :errors)]
+    (cond
+      (http/missing? response) false
+      (not (http/success? response)) (throw (ex-info "Job cancellation unsuccessful" {:job-id job-id :error error}))
+      :else true)))
+
+(defmacro dispatch-operation
+  [op & body]
+  `(try
+     (assoc ~op :type :ok :value ~@body)
+     (catch Exception e# (do
+                           (warn e# "An exception occurred while running" (quote ~@body))
+                           (assoc ~op :type :fail :error (.getMessage e#))))))
+
+(defmacro dispatch-operation-or-fatal
+  "Dispatches op by evaluating body, retrying a number of times if needed.
+  Fails fatally if all retries are exhausted."
+  [op & body]
+  `(assoc ~op :type :ok :value (fu/retry (fn [] ~@body) :fallback (fn [e#]
+                                                                    (fatal e# "Required operation did not succeed" (quote ~@body))
+                                                                    (System/exit 1)))))
+
+(defn- dispatch-rest-operation!
+  [rest-url job-id op]
+  (assert job-id)
+  (if-not rest-url
+    (assoc op :type :fail :error "Have not determined REST URL yet.")
+    (case (:f op)
+      :job-running? (dispatch-operation op (fu/retry
+                                             (partial job-running? rest-url job-id)
+                                             :retries 3
+                                             :fallback #(throw %)))
+      :cancel-job (dispatch-operation-or-fatal op (cancel-job! rest-url job-id)))))
 
 (defrecord Client
-  [deploy-cluster! closer rest-url init-future job-id]
+  [deploy-cluster!                                          ; function that starts a non-standalone cluster and submits the job
+   closer                                                   ; function that closes the ZK client
+   rest-url                                                 ; atom storing the current rest-url
+   init-future                                              ; future that completes if rest-url is set to an initial value
+   job-id                                                   ; atom storing the job-id
+   job-submitted?]                                          ; Has the job already been submitted? Used to avoid re-submission if the client is re-opened.
   client/Client
-  (open! [this test node]
+  (open! [this test _]
+    (info "Open client.")
     (let [{:keys [rest-url-atom closer init-future]} (make-job-manager-url test)]
-      (assoc this :closer closer :rest-url rest-url-atom :init-future init-future :job-id (atom nil))))
-
-  (setup! [this test] this)
-
-  (invoke! [this test op]
-    (case (:f op)
-      :submit (do
-                (deploy-cluster! test)
-                (deref init-future)
-                (let [jobs (fu/retry (fn [] (list-jobs! @rest-url))
-                                     :fallback (fn [e] (do
-                                                         (fatal e "Could not get running jobs.")
-                                                         (System/exit 1))))
-                      num-jobs (count jobs)]
-                  (assert (= 1 num-jobs) (str "Expected 1 job, was " num-jobs))
-                  (reset! job-id (first jobs)))
-                (assoc op :type :ok))
-      :job-running? (let [base-url @rest-url]
-                      (if base-url
-                        (try
-                          (assoc op :type :ok :value (job-running? base-url @job-id))
-                          (catch Exception e (do
-                                               (warn e "Get job details from" base-url "failed.")
-                                               (assoc op :type :fail))))
-                        (assoc op :type :fail :value "Cluster not deployed yet.")))))
-
-  (teardown! [this test])
-  (close! [this test] (closer)))
+      (assoc this :closer closer
+                  :rest-url rest-url-atom
+                  :init-future init-future)))
+
+  (setup! [_ test]
+    (info "Setup client.")
+    (when (compare-and-set! job-submitted? false true)
+      (deploy-cluster! test)
+      (deref init-future)
+      (let [jobs (fu/retry (fn [] (list-jobs! @rest-url))
+                           :fallback (fn [e]
+                                       (fatal e "Could not get running jobs.")
+                                       (System/exit 1)))
+            num-jobs (count jobs)
+            job (first jobs)]
+        (assert (= 1 num-jobs) (str "Expected 1 job, was " num-jobs))
+        (info "Submitted job" job)
+        (reset! job-id job))))
+
+  (invoke! [_ _ op]
+    (dispatch-rest-operation! @rest-url @job-id op))
+
+  (teardown! [_ _])
+  (close! [_ _]
+    (info "Closing client.")
+    (closer)))
+
+(defn create-client
+  [deploy-cluster!]
+  (Client. deploy-cluster! nil nil nil (atom nil) (atom false)))
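The "reliable" cancel operation described in the commit message combines two pieces: `cancel-job!` maps the HTTP response to true, false, or an exception, and `dispatch-operation-or-fatal` retries it and exits fatally if every attempt fails. The sketch below models both without HTTP. It assumes that `http/missing?` means status 404 and `http/success?` means any 2xx status, and `retry-with-fallback` is a hypothetical stand-in for `fu/retry`, whose exact retry count and delay are not shown in this diff.

```clojure
(ns example.cancel-sketch)

(defn cancel-outcome
  "Assumption: mirrors cancel-job! for the HTTP status of the cancel request.
  404 means the job does not exist (false), 2xx means success (true),
  anything else throws."
  [status job-id]
  (cond
    (= 404 status) false
    (<= 200 status 299) true
    :else (throw (ex-info "Job cancellation unsuccessful"
                          {:job-id job-id :status status}))))

(defn retry-with-fallback
  "Hypothetical stand-in for fu/retry: calls f up to (inc retries) times and
  hands the last exception to fallback if every attempt throws."
  [f retries fallback]
  (loop [attempt 0]
    (let [result (try {:value (f)}
                      (catch Exception e {:error e}))]
      (cond
        (contains? result :value) (:value result)
        (< attempt retries) (recur (inc attempt))
        :else (fallback (:error result))))))
```

In the client, the fallback logs the error and calls `(System/exit 1)` instead of returning a value, which is what allows the checker to assert that a `:cancel-job` op is never `:fail`.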
diff --git a/flink-jepsen/src/jepsen/flink/db.clj b/flink-jepsen/src/jepsen/flink/db.clj
index 79ed8a4..5b7de63 100644
--- a/flink-jepsen/src/jepsen/flink/db.clj
+++ b/flink-jepsen/src/jepsen/flink/db.clj
@@ -43,14 +43,15 @@
 (def deb-marathon-package "1.6.322")
 
 (def taskmanager-slots 1)
-(def master-count 1)
 
 (defn flink-configuration
-  [test]
+  [test node]
   {:high-availability                  "zookeeper"
    :high-availability.zookeeper.quorum (zookeeper-quorum test)
    :high-availability.storageDir       (str (:ha-storage-dir test) "/ha")
+   :jobmanager.rpc.address             node
    :state.savepoints.dir               (str (:ha-storage-dir test) "/savepoints")
+   :rest.address                       node
    :rest.port                          8081
    :rest.bind-address                  "0.0.0.0"
    :taskmanager.numberOfTaskSlots      taskmanager-slots
@@ -59,23 +60,17 @@
    :state.backend.local-recovery       "false"
    :taskmanager.registration.timeout   "30 s"})
 
-(defn master-nodes
-  [test]
-  (take master-count (sort (:nodes test))))
-
 (defn write-configuration!
-  "Writes the flink-conf.yaml and masters file to the flink conf directory"
-  [test]
+  "Writes the flink-conf.yaml to the flink conf directory"
+  [test node]
   (let [c (clojure.string/join "\n" (map (fn [[k v]] (str (name k) ": " v))
-                                         (seq (flink-configuration test))))
-        m (clojure.string/join "\n" (master-nodes test))]
+                                         (seq (flink-configuration test node))))]
     (c/exec :echo c :> conf-file)
-    (c/exec :echo m :> masters-file)
     ;; TODO: write log4j.properties properly
     (c/exec (c/lit (str "sed -i'.bak' -e '/log4j.rootLogger=/ s/=.*/=DEBUG, file/' " install-dir "/conf/log4j.properties")))))
 
 (defn install-flink!
-  [test]
+  [test node]
   (let [url (:tarball test)]
     (info "Installing Flink from" url)
     (cu/install-archive! url install-dir)
@@ -83,12 +78,12 @@
     (c/exec (c/lit (str "ls " install-dir "/opt/flink-s3-fs-hadoop* | xargs -I {} mv {} " install-dir "/lib")))
     (c/upload (:job-jar test) upload-dir)
     (c/exec :mv (str upload-dir "/" (.getName (clojure.java.io/file (:job-jar test)))) install-dir)
-    (write-configuration! test)))
+    (write-configuration! test node)))
 
 (defn teardown-flink!
   []
   (info "Tearing down Flink")
-  (cu/grepkill! "flink")
+  (meh (cu/grepkill! "flink"))
   (meh (c/exec :rm :-rf install-dir))
   (meh (c/exec :rm :-rf (c/lit "/tmp/.yarn-properties*"))))
 
@@ -101,7 +96,7 @@
   (reify db/DB
     (setup! [_ test node]
       (c/su
-        (install-flink! test)))
+        (install-flink! test node)))
 
     (teardown! [_ test node]
       (c/su
@@ -123,23 +118,44 @@
         (doall (map #(db/teardown! % test node) dbs))))
     db/LogFiles
     (log-files [_ test node]
-      (flatten (map #(db/log-files % test node) dbs)))))
+      (->>
+        (filter (partial satisfies? db/LogFiles) dbs)
+        (map #(db/log-files % test node))
+        (flatten)))))
 
-;;; YARN
+(defn- sorted-nodes
+  [test]
+  (-> test :nodes sort))
 
-(defn flink-yarn-db
+(defn- select-nodes
+  [test selector]
+  (-> (sorted-nodes test)
+      selector))
+
+(defn- first-node
+  [test]
+  (select-nodes test first))
+
+(defn- create-env-vars
+  "Expects a map containing environment variables, and returns a string that can be used to set
+  environment variables for a child process using Bash's quick assignment and inheritance trick.
+  For example, for a map {:FOO \"bar\"}, this function returns \"FOO=bar \"."
+  [m]
+  (->>
+    (map #(str (name (first %)) "=" (second %)) m)
+    (clojure.string/join " ")
+    (#(str % " "))))
+
+(defn- hadoop-env-vars
   []
-  (let [zk (zk/db deb-zookeeper-package)
-        hadoop (hadoop/db hadoop-dist-url)
-        flink (flink-db)]
-    (combined-db [hadoop zk flink])))
+  (create-env-vars {:HADOOP_CLASSPATH (str "`" hadoop/install-dir "/bin/hadoop classpath`")
+                    :HADOOP_CONF_DIR  hadoop/hadoop-conf-dir}))
 
 (defn exec-flink!
-  [test cmd args]
+  [cmd args]
   (c/su
     (c/exec (c/lit (str
-                     "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
-                     "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir " "
+                     (hadoop-env-vars)
                      install-dir "/bin/flink " cmd " " args)))))
 
 (defn flink-run-cli-args
@@ -149,24 +165,84 @@
     ["-d"]
     (if (:main-class test)
       [(str "-c " (:main-class test))]
-      [])
-    (if (= :yarn-job (:deployment-mode test))
-      ["-m yarn-cluster" "-yjm 2048m" "-ytm 2048m"]
       [])))
 
 (defn submit-job!
   ([test] (submit-job! test []))
   ([test cli-args]
-   (exec-flink! test "run" (clojure.string/join
-                             " "
-                             (concat cli-args
-                                     (flink-run-cli-args test)
-                                     [(str install-dir "/" (last (str/split (:job-jar test) #"/")))
-                                      (:job-args test)])))))
-
-(defn first-node
+   (exec-flink! "run" (clojure.string/join
+                        " "
+                        (concat cli-args
+                                (flink-run-cli-args test)
+                                [(str install-dir "/" (last (str/split (:job-jar test) #"/")))
+                                 (:job-args test)])))))
+
+;;; Standalone
+
+(def standalone-master-count 2)
+
+(defn- standalone-master-nodes
+  [test]
+  (select-nodes test (partial take standalone-master-count)))
+
+(defn- standalone-taskmanager-nodes
   [test]
-  (-> test :nodes sort first))
+  (select-nodes test (partial drop standalone-master-count)))
+
+(defn- start-standalone-masters!
+  [test node]
+  (when (some #{node} (standalone-master-nodes test))
+    (fu/create-supervised-service!
+      "flink-master"
+      (str "env " (hadoop-env-vars)
+           install-dir "/bin/jobmanager.sh start-foreground "
+           ">> " log-dir "/jobmanager.log"))))
+
+(defn- start-standalone-taskmanagers!
+  [test node]
+  (when (some #{node} (standalone-taskmanager-nodes test))
+    (fu/create-supervised-service!
+      "flink-taskmanager"
+      (str "env " (hadoop-env-vars)
+           install-dir "/bin/taskmanager.sh start-foreground "
+           ">> " log-dir "/taskmanager.log"))))
+
+(defn- start-flink-db
+  []
+  (reify db/DB
+    (setup! [_ test node]
+      (c/su
+        (start-standalone-masters! test node)
+        (start-standalone-taskmanagers! test node)))
+
+    (teardown! [_ test node]
+      (c/su
+        (when (some #{node} (standalone-master-nodes test))
+          (fu/stop-supervised-service! "flink-master"))
+        (when (some #{node} (standalone-taskmanager-nodes test))
+          (fu/stop-supervised-service! "flink-taskmanager"))))))
+
+(defn flink-standalone-db
+  []
+  (let [zk (zk/db deb-zookeeper-package)
+        hadoop (hadoop/db hadoop-dist-url)
+        flink (flink-db)
+        start-flink (start-flink-db)]
+    (combined-db [hadoop zk flink start-flink])))
+
+(defn submit-job-from-first-node!
+  [test]
+  (c/on (first-node test)
+        (submit-job! test)))
+
+;;; YARN
+
+(defn flink-yarn-db
+  []
+  (let [zk (zk/db deb-zookeeper-package)
+        hadoop (hadoop/db hadoop-dist-url)
+        flink (flink-db)]
+    (combined-db [hadoop zk flink])))
 
 (defn start-yarn-session!
   [test]
@@ -174,8 +250,7 @@
     (c/on node
           (info "Starting YARN session from" node)
           (c/su
-            (c/exec (c/lit (str "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
-                                "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir
+            (c/exec (c/lit (str (hadoop-env-vars)
                                 " " install-dir "/bin/yarn-session.sh -d -jm 2048m -tm 2048m")))
             (submit-job! test)))))
 
@@ -183,7 +258,7 @@
   [test]
   (c/on (first-node test)
         (c/su
-          (submit-job! test))))
+          (submit-job! test ["-m yarn-cluster" "-yjm 2048m" "-ytm 2048m"]))))
 
 ;;; Mesos
 
@@ -210,8 +285,7 @@
                         (http/post
                           (str (mesos/marathon-base-url test) "/v2/apps")
                           {:form-params  {:id                    "flink"
-                                          :cmd                   (str "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
-                                                                      "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir " "
+                                          :cmd                   (str (hadoop-env-vars)
                                                                       install-dir "/bin/mesos-appmaster.sh "
                                                                       "-Dmesos.master=" (zookeeper-uri
                                                                                           test
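Two ideas in the `db.clj` changes are easy to check in isolation: the standalone deployment assigns roles by sorting the node names and taking the first `standalone-master-count` (2) as masters, and `create-env-vars` turns a map into a `NAME=value ` prefix for a shell command. The sketch below restates both as pure functions; `node-roles` is a hypothetical helper, and `create-env-vars` is a lightly restyled copy of the function in the diff.

```clojure
(ns example.db-sketch
  (:require [clojure.string :as str]))

(def standalone-master-count 2)

(defn node-roles
  "Hypothetical helper: the first standalone-master-count sorted nodes run
  JobManagers, the remaining nodes run TaskManagers."
  [nodes]
  (let [[masters taskmanagers] (split-at standalone-master-count (sort nodes))]
    {:masters (vec masters) :taskmanagers (vec taskmanagers)}))

(defn create-env-vars
  "Returns a string like \"FOO=bar \" that can prefix a shell command so the
  child process inherits the variables."
  [m]
  (->> m
       (map (fn [[k v]] (str (name k) "=" v)))
       (str/join " ")
       (#(str % " "))))
```

Because every node sorts the same list, each node can decide locally whether to start `flink-master` or `flink-taskmanager` without any coordination.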
diff --git a/flink-jepsen/src/jepsen/flink/flink.clj b/flink-jepsen/src/jepsen/flink/flink.clj
index d5d4157..10ea48a 100644
--- a/flink-jepsen/src/jepsen/flink/flink.clj
+++ b/flink-jepsen/src/jepsen/flink/flink.clj
@@ -24,31 +24,48 @@
             [jepsen.flink.client :refer :all]
             [jepsen.flink.checker :as flink-checker]
             [jepsen.flink.db :as fdb]
-            [jepsen.flink.nemesis :as fn])
-  (:import (jepsen.flink.client Client)))
+            [jepsen.flink.nemesis :as fn]))
 
 (def flink-test-config
-  {:yarn-session  {:db                  (fdb/flink-yarn-db)
-                   :deployment-strategy fdb/start-yarn-session!}
-   :yarn-job      {:db                  (fdb/flink-yarn-db)
-                   :deployment-strategy fdb/start-yarn-job!}
-   :mesos-session {:db                  (fdb/flink-mesos-db)
-                   :deployment-strategy fdb/start-mesos-session!}})
+  {:yarn-session       {:db                  (fdb/flink-yarn-db)
+                        :deployment-strategy fdb/start-yarn-session!}
+   :yarn-job           {:db                  (fdb/flink-yarn-db)
+                        :deployment-strategy fdb/start-yarn-job!}
+   :mesos-session      {:db                  (fdb/flink-mesos-db)
+                        :deployment-strategy fdb/start-mesos-session!}
+   :standalone-session {:db                  (fdb/flink-standalone-db)
+                        :deployment-strategy fdb/submit-job-from-first-node!}})
 
-(defn client-gen
+(def poll-job-running {:type :invoke, :f :job-running?, :value nil})
+(def cancel-job {:type :invoke, :f :cancel-job, :value nil})
+(def poll-job-running-loop (gen/seq (cycle [poll-job-running (gen/sleep 5)])))
+
+(defn default-client-gen
+  "Client generator that polls for the job running status."
   []
   (->
-    (cons {:type :invoke, :f :submit, :value nil}
-          (cycle [{:type :invoke, :f :job-running?, :value nil}
-                  (gen/sleep 5)]))
-    (gen/seq)
+    poll-job-running-loop
     (gen/singlethreaded)))
 
+(defn cancelling-client-gen
+  "Client generator that polls for the job running status, and cancels the job after 15 seconds."
+  []
+  (->
+    (gen/concat (gen/time-limit 15 (default-client-gen))
+                (gen/once cancel-job)
+                (default-client-gen))
+    (gen/singlethreaded)))
+
+(def client-gens
+  {:poll-job-running default-client-gen
+   :cancel-job       cancelling-client-gen})
+
 (defn flink-test
   [opts]
   (merge tests/noop-test
          (let [{:keys [db deployment-strategy]} (-> opts :deployment-mode flink-test-config)
-               {:keys [job-running-healthy-threshold job-recovery-grace-period]} opts]
+               {:keys [job-running-healthy-threshold job-recovery-grace-period]} opts
+               client-gen ((:client-gen opts) client-gens)]
            {:name      "Apache Flink"
             :os        debian/os
             :db        db
@@ -63,7 +80,7 @@
                                                    ((fn/nemesis-generator-factories (:nemesis-gen opts)) opts)
                                                    job-running-healthy-threshold
                                                    job-recovery-grace-period))))
-            :client    (Client. deployment-strategy nil nil nil nil)
+            :client    (create-client deployment-strategy)
             :checker   (flink-checker/job-running-checker)})
          (assoc opts :concurrency 1)))
 
@@ -93,6 +110,12 @@
                      :default :kill-task-managers
                      :validate [#(fn/nemesis-generator-factories (keyword %))
                                 (keys-as-allowed-values-help-text fn/nemesis-generator-factories)]]
+                    [nil "--client-gen GEN" (str "Which client should be used?"
+                                                 (keys-as-allowed-values-help-text client-gens))
+                     :parse-fn keyword
+                     :default :poll-job-running
+                     :validate [#(client-gens (keyword %))
+                                (keys-as-allowed-values-help-text client-gens)]]
                     [nil "--deployment-mode MODE" (keys-as-allowed-values-help-text flink-test-config)
                      :parse-fn keyword
                      :default :yarn-session
diff --git a/flink-jepsen/src/jepsen/flink/mesos.clj b/flink-jepsen/src/jepsen/flink/mesos.clj
index a73f25f..aef7359 100644
--- a/flink-jepsen/src/jepsen/flink/mesos.clj
+++ b/flink-jepsen/src/jepsen/flink/mesos.clj
@@ -22,36 +22,9 @@
              [util :as util :refer [meh]]]
             [jepsen.control.util :as cu]
             [jepsen.os.debian :as debian]
+            [jepsen.flink.utils :refer [create-supervised-service! stop-supervised-service!]]
             [jepsen.flink.zookeeper :refer [zookeeper-uri]]))
 
-;;; runit process supervisor (http://smarden.org/runit/)
-;;;
-;;; We use runit to supervise Mesos processes because Mesos uses a "fail-fast" approach to
-;;; error handling, e.g., the Mesos master will exit when it discovers it has been partitioned away
-;;; from the Zookeeper quorum.
-
-(def runit-version "2.1.2-3")
-
-(defn create-supervised-service!
-  "Registers a service with the process supervisor and starts it."
-  [service-name cmd]
-  (let [service-dir (str "/etc/sv/" service-name)
-        run-script (str service-dir "/run")]
-    (c/su
-      (c/exec :mkdir :-p service-dir)
-      (c/exec :echo (clojure.string/join "\n" ["#!/bin/sh"
-                                               "exec 2>&1"
-                                               (str "exec " cmd)]) :> run-script)
-      (c/exec :chmod :+x run-script)
-      (c/exec :ln :-sf service-dir (str "/etc/service/" service-name)))))
-
-(defn stop-supervised-service!
-  "Stops a service and removes it from supervision."
-  [service-name]
-  (c/su
-    (c/exec :sv :down service-name)
-    (c/exec :rm :-f (str "/etc/service/" service-name))))
-
 ;;; Mesos
 
 (def master-count 1)
@@ -154,8 +127,7 @@
                       "keyserver.ubuntu.com"
                       "E56151BF")
     (debian/install {:mesos    mesos-version
-                     :marathon marathon-version
-                     :runit    runit-version})
+                     :marathon marathon-version})
     (c/exec :mkdir :-p "/var/run/mesos")
     (c/exec :mkdir :-p master-dir)
     (c/exec :mkdir :-p slave-dir)))
diff --git a/flink-jepsen/src/jepsen/flink/nemesis.clj b/flink-jepsen/src/jepsen/flink/nemesis.clj
index 3047eeb..5335b24 100644
--- a/flink-jepsen/src/jepsen/flink/nemesis.clj
+++ b/flink-jepsen/src/jepsen/flink/nemesis.clj
@@ -86,6 +86,11 @@
     (take n)
     (reverse)))
 
+(defn- inc-by-factor
+  [n factor]
+  (assert (>= factor 1))
+  (int (* n factor)))
+
 (defn stop-generator
   [stop source job-running-healthy-threshold job-recovery-grace-period]
   (gen/concat source
@@ -105,9 +110,12 @@
                                                 (flink-checker/get-job-running-history)
                                                 (take-last-with-default job-running-healthy-threshold false))]
                       (if (or
-                            (and
-                              (every? true? job-running-history))
-                            (> (ju/relative-time-nanos) (+ @t (ju/secs->nanos job-recovery-grace-period))))
+                            (every? true? job-running-history)
+                            (> (ju/relative-time-nanos) (+ @t
+                                                           (ju/secs->nanos
+                                                             (inc-by-factor
+                                                               job-recovery-grace-period
+                                                               1.1)))))
                         (do
                           (reset! stop true)
                           nil)
diff --git a/flink-jepsen/src/jepsen/flink/utils.clj b/flink-jepsen/src/jepsen/flink/utils.clj
index 3fd9f96..5d73d29 100644
--- a/flink-jepsen/src/jepsen/flink/utils.clj
+++ b/flink-jepsen/src/jepsen/flink/utils.clj
@@ -15,7 +15,10 @@
 ;; limitations under the License.
 
 (ns jepsen.flink.utils
-  (:require [clojure.tools.logging :refer :all]))
+  (:require [clojure.tools.logging :refer :all]
+            [jepsen
+             [control :as c]]
+            [jepsen.os.debian :as debian]))
 
 (defn retry
   "Runs a function op and retries on exception.
@@ -46,3 +49,34 @@
          (Thread/sleep delay)
          (recur op (assoc keys :retries (dec retries))))
        (success r)))))
+
+;;; runit process supervisor (http://smarden.org/runit/)
+
+(def runit-version "2.1.2-3")
+
+(defn- install-process-supervisor!
+  "Installs the process supervisor."
+  []
+  (debian/install {:runit runit-version}))
+
+(defn create-supervised-service!
+  "Registers a service with the process supervisor and starts it."
+  [service-name cmd]
+  (let [service-dir (str "/etc/sv/" service-name)
+        run-script (str service-dir "/run")]
+    (info "Create service" service-name)
+    (c/su
+      (install-process-supervisor!)
+      (c/exec :mkdir :-p service-dir)
+      (c/exec :echo (clojure.string/join "\n" ["#!/bin/sh"
+                                               "exec 2>&1"
+                                               (str "exec " cmd)]) :> run-script)
+      (c/exec :chmod :+x run-script)
+      (c/exec :ln :-sfT service-dir (str "/etc/service/" service-name)))))
+
+(defn stop-supervised-service!
+  "Stops a service and removes it from supervision."
+  [service-name]
+  (info "Stop service" service-name)
+  (c/su
+    (c/exec :rm :-f (str "/etc/service/" service-name))))
\ No newline at end of file
diff --git a/flink-jepsen/test/jepsen/flink/checker_test.clj b/flink-jepsen/test/jepsen/flink/checker_test.clj
index 7389bbc..c27d751 100644
--- a/flink-jepsen/test/jepsen/flink/checker_test.clj
+++ b/flink-jepsen/test/jepsen/flink/checker_test.clj
@@ -25,46 +25,91 @@
                  {:type :invoke, :f :job-running?, :value nil, :process 0, :time 127443701575}
                  {:type :ok, :f :job-running?, :value false, :process 0, :time 127453553462}
                  {:type :invoke, :f :job-running?, :value nil, :process 0, :time 127453553463}
-                 {:type :ok, :f :job-running?, :value true, :process 0, :time 127453553464}]]
-    (is (= (get-job-running-history history) [false true]))))
+                 {:type :ok, :f :job-running?, :value true, :process 0, :time 127453553464}
+                 {:type :info, :f :job-running?, :value nil, :process 0, :time 127453553465}]]
+    (is (= (get-job-running-history history) [false true false]))))
 
 (deftest job-running-checker-test
   (let [checker (job-running-checker)
         test {}
-        model (job-running-within-grace-period 3 60)
+        model (job-running-within-grace-period 3 60 10)
         opts {}
         check (fn [history] (checker/check checker test model history opts))]
-    (testing "Job is not running after grace period."
-      (is (= (:valid? (check
-                        [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
-                         {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}])) false)))
-    (testing "Job is running after grace period."
-      (is (= (:valid? (check
-                        [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
-                         {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000001}])) true)))
+    (testing "Model should be inconsistent if job is not running after grace period."
+      (let [result (check
+                     [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
+                      {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}])]
+        (is (= false (:valid? result)))
+        (is (= "Job is not running." (-> result :final-model :msg)))))
+    (testing "Model should be consistent if job is running after grace period."
+      (is (= true (:valid? (check
+                             [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
+                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000001}
+                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000002}
+                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000003}])))))
     (testing "Should tolerate non-running job during failures."
-      (is (= (:valid? (check
-                        [{:type :info, :f :partition-start, :process :nemesis, :time -1}
-                         {:type :info, :f :partition-start, :process :nemesis, :time 0, :value "Cut off [...]"}
-                         {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}
-                         {:type :info, :f :partition-stop, :process :nemesis, :time 60000000002}
-                         {:type :info, :f :partition-stop, :process :nemesis, :time 60000000003, :value "fully connected"}
-                         {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}])) true)))
-    (testing "Should respect healthy threshold."
-      (is (= (:valid? (check
-                        [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
-                         {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
-                         {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
-                         {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000003}
-                         {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}])) true))
-      (is (= (:valid? (check
-                        [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
-                         {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
-                         {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000002}
-                         {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}])) false)))
-    (testing "Job was not deployed successfully."
-      (is (= (:valid? (check [{:type :invoke, :f :job-running?, :value nil, :process 45, :time 239150413307}
-                              {:type :info, :f :job-running?, :value nil, :process 45, :time 239150751938, :error "indeterminate: Assert failed: job-id"}])) false)))))
+      (is (= true (:valid? (check
+                             [{:type :info, :f :partition-start, :process :nemesis, :time -1}
+                              {:type :info, :f :partition-start, :process :nemesis, :time 0, :value "Cut off [...]"}
+                              {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}
+                              {:type :info, :f :partition-stop, :process :nemesis, :time 60000000002}
+                              {:type :info, :f :partition-stop, :process :nemesis, :time 60000000003, :value "fully connected"}
+                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}
+                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000005}
+                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000006}])))))
+    (testing "Should not tolerate non-running job without a cause."
+      (let [result (check
+                     [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
+                      {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
+                      {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}
+                      {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000002}])]
+        (is (= false (:valid? result)))
+        (is (= "Job is not running." (-> result :final-model :msg)))))
+    (testing "Model should be inconsistent if job submission was unsuccessful."
+      (let [result (check [{:type :invoke, :f :job-running?, :value nil, :process 0, :time 239150413307}
+                           {:type :info, :f :job-running?, :value nil, :process 0, :time 239150751938, :error "indeterminate: Assert failed: job-id"}])]
+        (is (= false (:valid? result)))))
+    (testing "Model should be inconsistent if the job status cannot be polled, i.e., if the cluster is unavailable."
+      (let [result (check [{:type :fail, :f :job-running?, :value nil, :process 0, :time 0 :error "Error"}
+                           {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000001 :error "Error"}
+                           {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000002 :error "Error"}])]
+        (is (= false (:valid? result)))
+        (is (= "Cluster is not running." (-> result :final-model :msg)))))
+    (testing "Should tolerate non-running job after cancellation."
+      (is (= true (:valid? (check [{:type :invoke, :f :cancel-job, :value nil, :process 0, :time 0}
+                                   {:type :ok, :f :cancel-job, :value true, :process 0, :time 1}
+                                   {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
+                                   {:type :ok, :f :job-running?, :value false, :process 0, :time 3}])))))
+    (testing "Model should be inconsistent if job is running after cancellation."
+      (let [result (check [{:type :invoke, :f :cancel-job, :value nil, :process 0, :time 0}
+                           {:type :ok, :f :cancel-job, :value true, :process 0, :time 1}
+                           {:type :ok, :f :job-running?, :value true, :process 0, :time 10000000002}])]
+        (is (= false (:valid? result)))
+        (is (= "Job is running after cancellation." (-> result :final-model :msg)))))
+    (testing "Model should be inconsistent if Flink cluster is not available at the end."
+      (let [result (check [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
+                           {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
+                           {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
+                           {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000003, :error "Error"}])]
+        (is (= false (:valid? result)))
+        (is (= "Cluster is not running." (-> result :final-model :msg)))))
+    (testing "Model should be inconsistent if Flink cluster is not available after job cancellation."
+      (let [result (check [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
+                           {:type :invoke, :f :cancel-job, :value nil, :process 0, :time 1}
+                           {:type :ok, :f :cancel-job, :value true, :process 0, :time 2}
+                           {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000001, :error "Error"}])]
+        (is (= false (:valid? result)))
+        (is (= "Cluster is not running." (-> result :final-model :msg)))))
+    (testing "Should throw AssertionError if job cancelling operation failed."
+      (is (thrown-with-msg? AssertionError
+                            #":cancel-job must not fail"
+                            (check [{:type :fail, :f :cancel-job, :value nil, :process 0, :time 0}]))))
+    (testing "Should tolerate non-running job if grace period has not passed."
+      (is (= true (:valid? (check [{:type :invoke, :f :job-running?, :value nil, :process 0, :time 0}
+                                   {:type :ok, :f :job-running?, :value false, :process 0, :time 1}
+                                   {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
+                                   {:type :ok, :f :job-running?, :value true, :process 0, :time 3}
+                                   {:type :ok, :f :job-running?, :value true, :process 0, :time 4}])))))))
 
 (deftest safe-inc-test
   (is (= (safe-inc nil) 1))
diff --git a/flink-jepsen/test/jepsen/flink/client_test.clj b/flink-jepsen/test/jepsen/flink/client_test.clj
index b4373bf..a73c936 100644
--- a/flink-jepsen/test/jepsen/flink/client_test.clj
+++ b/flink-jepsen/test/jepsen/flink/client_test.clj
@@ -17,7 +17,8 @@
 (ns jepsen.flink.client-test
   (:require [clojure.test :refer :all]
             [clj-http.fake :as fake]
-            [jepsen.flink.client :refer :all]))
+            [jepsen.flink.client :refer :all])
+  (:import (clojure.lang ExceptionInfo)))
 
 (deftest read-url-test
   (is (= "https://www.asdf.de" (read-url (byte-array [0xAC 0xED 0x00 0x05 0x77 0x15 0x00 0x13 0x68 0x74 0x74 0x70 0x73 0x3A 0x2F 0x2F 0x77 0x77 0x77 0x2E 0x61 0x73 0x64 0x66 0x2E 0x64 0x65])))))
@@ -25,13 +26,52 @@
 (deftest job-running?-test
   (fake/with-fake-routes
     {"http://localhost:8081/jobs/a718f168ec6be8eff1345a17bf64196c"
-     (fn [request] {:status  200
-                    :headers {}
-                    :body    "{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1522059578198,\"end-time\":-1,\"duration\":19505,\"now\":1522059597703,\"timestamps\":{\"RUNNING\":1522059578244,\"RESTARTING\":0,\"RECONCILING\":0,\"CREATED\":1522059578198,\"FAILING\":0,\"FINISHED\":0,\"CANCELLING\":0,\"SUSPENDING\":0,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0},\"vertices\":[{\"id\":\"cbc357ccb763df [...]
-     "http://localhost:8081/jobs/a718f168ec6be8eff1345a17bf64196d"
-     (fn [request] {:status  200
-                    :headers {}
-                    :body    "{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1522059578198,\"end-time\":-1,\"duration\":19505,\"now\":1522059597703,\"timestamps\":{\"RUNNING\":1522059578244,\"RESTARTING\":0,\"RECONCILING\":0,\"CREATED\":1522059578198,\"FAILING\":0,\"FINISHED\":0,\"CANCELLING\":0,\"SUSPENDING\":0,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0},\"vertices\":[{\"id\":\"cbc357ccb763df [...]
+     (fn [_] {:status 200
+              :body   "{\"jid\":\"a718f168ec6be8eff1345a17bf64196c\",\"name\":\"Socket Window WordCount\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1522059578198,\"end-time\":-1,\"duration\":19505,\"now\":1522059597703,\"timestamps\":{\"RUNNING\":1522059578244,\"RESTARTING\":0,\"RECONCILING\":0,\"CREATED\":1522059578198,\"FAILING\":0,\"FINISHED\":0,\"CANCELLING\":0,\"SUSPENDING\":0,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0},\"vertices\":[{\"id\":\"cbc357ccb763df2852fee [...]
+
+     "http://localhost:8081/jobs/54ae4d8ec01d85053d7eb5d139492df2"
+     (fn [_] {:status 200
+              :body   "{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1522059578198,\"end-time\":-1,\"duration\":19505,\"now\":1522059597703,\"timestamps\":{\"RUNNING\":1522059578244,\"RESTARTING\":0,\"RECONCILING\":0,\"CREATED\":1522059578198,\"FAILING\":0,\"FINISHED\":0,\"CANCELLING\":0,\"SUSPENDING\":0,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0},\"vertices\":[{\"id\":\"cbc357ccb763df2852fee [...]
+
+     "http://localhost:8081/jobs/ec3a61df646e665d31899bb26aba10b7"
+     (fn [_] {:status 404})}
 
     (is (= (job-running? "http://localhost:8081" "a718f168ec6be8eff1345a17bf64196c") true))
-    (is (= (job-running? "http://localhost:8081" "a718f168ec6be8eff1345a17bf64196d") false))))
+    (is (= (job-running? "http://localhost:8081" "54ae4d8ec01d85053d7eb5d139492df2") false))
+    (is (= (job-running? "http://localhost:8081" "ec3a61df646e665d31899bb26aba10b7") false))))
+
+(deftest cancel-job!-test
+  (fake/with-fake-routes
+    {"http://localhost:8081/jobs/a718f168ec6be8eff1345a17bf64196c"
+     {:patch (fn [_] {:status 202})}
+
+     "http://localhost:8081/jobs/54ae4d8ec01d85053d7eb5d139492df2"
+     {:patch (fn [_] {:status 404})}
+
+     "http://localhost:8081/jobs/ec3a61df646e665d31899bb26aba10b7"
+     {:patch (fn [_] {:status 503})}}
+
+    (testing "Successful job cancellation."
+      (is (= true (@#'jepsen.flink.client/cancel-job!
+                    "http://localhost:8081"
+                    "a718f168ec6be8eff1345a17bf64196c"))))
+
+    (testing "Job not found."
+      (is (= false (@#'jepsen.flink.client/cancel-job!
+                     "http://localhost:8081"
+                     "54ae4d8ec01d85053d7eb5d139492df2"))))
+
+    (testing "Throw if HTTP status code is 503."
+      (is (thrown-with-msg? ExceptionInfo
+                            #"Job cancellation unsuccessful"
+                            (@#'jepsen.flink.client/cancel-job!
+                              "http://localhost:8081"
+                              "ec3a61df646e665d31899bb26aba10b7"))))))
+
+(deftest dispatch-operation-test
+  (let [op {:type :invoke, :f :job-running?, :value nil}
+        test-fn (constantly 1)]
+    (testing "Dispatching operation completes normally."
+      (is (= {:type :ok :value 1} (select-keys (dispatch-operation op (test-fn)) [:type :value]))))
+    (testing "Returned operation should be of type :fail and have a nil value on exception."
+      (is (= {:type :fail :value nil :error "expected"} (select-keys (dispatch-operation op (throw (Exception. "expected"))) [:type :value :error]))))))
diff --git a/flink-jepsen/test/jepsen/flink/nemesis_test.clj b/flink-jepsen/test/jepsen/flink/nemesis_test.clj
new file mode 100644
index 0000000..488631f
--- /dev/null
+++ b/flink-jepsen/test/jepsen/flink/nemesis_test.clj
@@ -0,0 +1,32 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.nemesis-test
+  (:require [clojure.test :refer :all])
+  (:require [jepsen.flink.nemesis :refer :all]))
+
+(deftest inc-by-factor-test
+  (testing "Should not increase if factor is 1."
+    (is (= 10 (@#'jepsen.flink.nemesis/inc-by-factor 10 1))))
+
+  (testing "Should increase by factor."
+    (is (= 15 (@#'jepsen.flink.nemesis/inc-by-factor 10 1.5))))
+
+  (testing "Should round down."
+    (is (= 15 (@#'jepsen.flink.nemesis/inc-by-factor 10 1.52))))
+
+  (testing "Should throw if factor < 1."
+    (is (thrown? AssertionError (@#'jepsen.flink.nemesis/inc-by-factor 1 0)))))