You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/28 19:08:22 UTC

[GitHub] asfgit closed pull request #6712: [FLINK-10311][tests] HA end-to-end/Jepsen tests for standby Dispatchers

asfgit closed pull request #6712: [FLINK-10311][tests] HA end-to-end/Jepsen tests for standby Dispatchers
URL: https://github.com/apache/flink/pull/6712
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-jepsen/README.md b/flink-jepsen/README.md
index 934324607f0..a3e2668c26b 100644
--- a/flink-jepsen/README.md
+++ b/flink-jepsen/README.md
@@ -5,10 +5,11 @@ distributed coordination of Apache FlinkĀ®.
 
 ## Test Coverage
 Jepsen is a framework built to test the behavior of distributed systems
-under faults. The tests in this particular project deploy Flink on either YARN or Mesos, submit a
+under faults. The tests in this particular project deploy Flink on YARN, Mesos, or as a standalone cluster, submit a
 job, and examine the availability of the job after injecting faults.
 A job is said to be available if all the tasks of the job are running.
 The faults that can be currently introduced to the Flink cluster include:
+
 * Killing of TaskManager/JobManager processes
 * Stopping HDFS NameNode
 * Network partitions
diff --git a/flink-jepsen/project.clj b/flink-jepsen/project.clj
index 78935d71187..8c3e8451536 100644
--- a/flink-jepsen/project.clj
+++ b/flink-jepsen/project.clj
@@ -18,6 +18,7 @@
   :license {:name "Apache License"
             :url  "http://www.apache.org/licenses/LICENSE-2.0"}
   :main jepsen.flink.flink
+  :aot [jepsen.flink.flink]
   :dependencies [[org.clojure/clojure "1.9.0"],
                  [cheshire "5.8.0"]
                  [clj-http "3.8.0"]
diff --git a/flink-jepsen/scripts/run-tests.sh b/flink-jepsen/scripts/run-tests.sh
index e44812402f6..a2b256b6f6a 100755
--- a/flink-jepsen/scripts/run-tests.sh
+++ b/flink-jepsen/scripts/run-tests.sh
@@ -36,8 +36,15 @@ do
   lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode yarn-session
   lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode yarn-session
   lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --deployment-mode yarn-session
+
   lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode yarn-job
   lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode yarn-job
   lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --deployment-mode yarn-job
+
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode mesos-session
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode mesos-session
+
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode standalone-session
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --client-gen cancel-job --deployment-mode standalone-session
   echo
 done
diff --git a/flink-jepsen/src/jepsen/flink/checker.clj b/flink-jepsen/src/jepsen/flink/checker.clj
index 02cc863bef5..7e437e9d628 100644
--- a/flink-jepsen/src/jepsen/flink/checker.clj
+++ b/flink-jepsen/src/jepsen/flink/checker.clj
@@ -48,59 +48,126 @@
   [[_ v]]
   (zero? v))
 
+(defn- set-job-not-running
+  [model] (assoc model :healthy-count 0))
+
+(defn- track-job-running
+  [model]
+  (update model :healthy-count inc))
+
+(defn- elapsed-seconds
+  [start end]
+  (ju/nanos->secs (- end start)))
+
+(defn- should-cluster-be-healthy?
+  [model op]
+  (let [{:keys [active-nemeses last-failure job-recovery-grace-period]} model]
+    (and
+      (not (nemeses-active? active-nemeses))
+      (> (elapsed-seconds last-failure (:time op)) job-recovery-grace-period))))
+
+(defn- start-fault
+  [model op]
+  (let [{:keys [active-nemeses]} model]
+    (assoc
+      model
+      :active-nemeses (update active-nemeses
+                              (strip-op-suffix op)
+                              safe-inc))))
+
+(defn- stop-fault
+  [model op]
+  (let [{:keys [active-nemeses]} model]
+    (assoc
+      model
+      :active-nemeses (dissoc-if zero-value?
+                                 (update active-nemeses (strip-op-suffix op) dec))
+      :last-failure (:time op))))
+
+(defn- job-allowed-to-be-running?
+  [model op]
+  (let [{:keys [job-canceled? job-canceled-time job-cancellation-grace-period]} model
+        now (:time op)]
+    (cond
+      (not job-canceled?) true
+      :else (> job-cancellation-grace-period (elapsed-seconds job-canceled-time now)))))
+
+(defn- handle-job-running?-op
+  "Returns the new model for an op {:f :job-running? ...}."
+  [model op]
+  (assert (#{:ok :fail :info} (:type op)) "Unexpected type")
+  (let [{:keys [job-canceled?]} model
+        job-running (:value op)
+        request-failed (#{:info :fail} (:type op))]
+    (if (and request-failed
+             (should-cluster-be-healthy? model op))
+      (model/inconsistent "Cluster is not running.")
+      (if job-running                                       ; cluster is running, check if job is running
+        (if (job-allowed-to-be-running? model op)           ; job is running but is it supposed to be running?
+          (track-job-running model)
+          (model/inconsistent
+            "Job is running after cancellation."))
+        (if (and                                            ; job is not running
+              (should-cluster-be-healthy? model op)
+              (not job-canceled?))
+          (model/inconsistent "Job is not running.")        ; job is not running but it should be running because grace period passed
+          (set-job-not-running model))))))
+
 (defrecord
   JobRunningWithinGracePeriod
-  ^{:doc "A Model which is consistent iff. the Flink job became available within
+  ^{:doc "A Model which is consistent if the Flink job and the Flink cluster became available within
   `job-recovery-grace-period` seconds after the last fault injected by the nemesis.
   Note that some faults happen at a single point in time (e.g., killing of processes). Other faults,
   such as network splits, happen during a period of time, and can thus be interleaving. As long as
-  there are active faults, the job is allowed not to be available."}
+  there are active faults, the job and the cluster are allowed to be unavailable.
+
+  Note that this model assumes that the client dispatches the operations reliably, i.e., in case of
+  exceptions, the operations are retried or failed fatally."}
   [active-nemeses                                           ; stores active failures
    healthy-count                                            ; how many consecutive times was the job running?
    last-failure                                             ; timestamp when the last failure was injected/ended
    healthy-threshold                                        ; after how many times is the job considered healthy
-   job-recovery-grace-period]                               ; after how many seconds should the job be recovered
+   job-recovery-grace-period                                ; after how many seconds should the job be recovered
+   job-cancellation-grace-period                            ; after how many seconds should the job be canceled?
+   job-canceled?                                            ; is the job canceled?
+   job-canceled-time]                                       ; timestamp of cancellation
   Model
   (step [this op]
     (case (:process op)
       :nemesis (cond
                  (nil? (:value op)) this
-                 (stoppable-op? op) (assoc
-                                      this
-                                      :active-nemeses (update active-nemeses
-                                                              (strip-op-suffix op)
-                                                              safe-inc))
-                 (stop-op? op) (assoc
-                                 this
-                                 :active-nemeses (dissoc-if zero-value?
-                                                            (update active-nemeses (strip-op-suffix op) dec))
-                                 :last-failure (:time op))
+                 (stoppable-op? op) (start-fault this op)
+                 (stop-op? op) (stop-fault this op)
                  :else (assoc this :last-failure (:time op)))
-      (case (:f op)
-        :job-running? (case (:type op)
-                        :info this                          ; ignore :info operations
-                        :fail this                          ; ignore :fail operations
-                        :invoke this                        ; ignore :invoke operations
-                        :ok (if (:value op)                 ; check if job is running
-                              (assoc                        ; job is running
-                                this
-                                :healthy-count
-                                (inc healthy-count))
-                              (if (and                      ; job is not running
-                                    (not (nemeses-active? active-nemeses))
-                                    (< healthy-count healthy-threshold)
-                                    (> (ju/nanos->secs (- (:time op) last-failure)) job-recovery-grace-period))
-                                ; job is not running but it should be running
-                                ; because grace period passed
-                                (model/inconsistent "Job is not running.")
-                                (conj this
-                                      [:healthy-count 0]))))
-        ; ignore other client operations
-        this))))
+      (if (= :invoke (:type op))
+        this                                                ; ignore :invoke operations
+        (case (:f op)
+          :job-running? (handle-job-running?-op this op)
+          :cancel-job (do
+                        (assert (= :ok (:type op)) ":cancel-job must not fail")
+                        (assoc this :job-canceled? true :job-canceled-time (:time op)))
+          ; ignore other client operations
+          this)))))
 
 (defn job-running-within-grace-period
-  [job-running-healthy-threshold job-recovery-grace-period]
-  (JobRunningWithinGracePeriod. {} 0 nil job-running-healthy-threshold job-recovery-grace-period))
+  ([job-running-healthy-threshold job-recovery-grace-period job-cancellation-grace-period]
+   (JobRunningWithinGracePeriod. {} 0 nil job-running-healthy-threshold job-recovery-grace-period job-cancellation-grace-period false nil))
+  ([job-running-healthy-threshold job-recovery-grace-period]
+   (job-running-within-grace-period job-running-healthy-threshold job-recovery-grace-period 10)))
+
+(defn get-job-running-history
+  [history]
+  (->>
+    history
+    (remove #(= (:process %) :nemesis))
+    (remove #(= (:type %) :invoke))
+    (map :value)
+    (map boolean)
+    (remove nil?)))
+
+(defn- healthy?
+  [model]
+  (>= (:healthy-count model) (:healthy-threshold model)))
 
 (defn job-running-checker
   []
@@ -111,18 +178,11 @@
             result-map (conj {}
                              (find test :nemesis-gen)
                              (find test :deployment-mode))]
-        (if (or (model/inconsistent? final) (zero? (:healthy-count final 0)))
-          (into result-map {:valid? false
-                            :error  (:msg final)})
+        (if (or (model/inconsistent? final)
+                (and
+                  (not (healthy? final))
+                  (not (:job-canceled? final))))
+          (into result-map {:valid?      false
+                            :final-model final})
           (into result-map {:valid?      true
                             :final-model final}))))))
-
-(defn get-job-running-history
-  [history]
-  (->>
-    history
-    (remove #(= (:process %) :nemesis))
-    (remove #(= (:type %) :invoke))
-    (map :value)
-    (map boolean)
-    (remove nil?)))
diff --git a/flink-jepsen/src/jepsen/flink/client.clj b/flink-jepsen/src/jepsen/flink/client.clj
index 905dc48911f..1ab987bd704 100644
--- a/flink-jepsen/src/jepsen/flink/client.clj
+++ b/flink-jepsen/src/jepsen/flink/client.clj
@@ -47,6 +47,12 @@
   (info "Waiting for path" path "in ZK.")
   (wait-for-zk-operation zk-client zk/exists path))
 
+(defn get-only-application-id
+  [coll]
+  (assert (= 1 (count coll)) (str "Expected 1 application id, got " coll ". "
+                                  "Failed to deploy the Flink cluster, or there are lingering Flink clusters."))
+  (first coll))
+
 (defn wait-for-children-to-exist
   [zk-client path]
   (wait-for-zk-operation zk-client zk/children path))
@@ -60,7 +66,7 @@
     (->
       (wait-for-children-to-exist zk-client "/flink")
       (deref)
-      (first))))
+      (get-only-application-id))))
 
 (defn watch-node-bytes
   [zk-client path callback]
@@ -97,54 +103,100 @@
     :jobs
     (map :id)))
 
-(defn get-job-details!
-  [base-url job-id]
-  (assert base-url)
-  (assert job-id)
-  (let [job-details (->
-                      (http/get (str base-url "/jobs/" job-id) {:as :json})
-                      :body)]
-    (assert (:vertices job-details) "Job does not have vertices")
-    job-details))
-
 (defn job-running?
   [base-url job-id]
-  (->>
-    (get-job-details! base-url job-id)
-    :vertices
-    (map :status)
-    (every? #(= "RUNNING" %))))
+  (let [response (http/get (str base-url "/jobs/" job-id) {:as :json :throw-exceptions false})
+        body (:body response)
+        error (:errors body)]
+    (cond
+      (http/missing? response) false
+      (not (http/success? response)) (throw (ex-info "Could not determine if job is running" {:job-id job-id :error error}))
+      :else (do
+              (assert (:vertices body) "Job does not have vertices")
+              (->>
+                body
+                :vertices
+                (map :status)
+                (every? #(= "RUNNING" %)))))))
+
+(defn- cancel-job!
+  "Cancels the specified job. Returns true if the job could be canceled.
+  Returns false if the job does not exist. Throws an exception if the HTTP status
+  is not successful."
+  [base-url job-id]
+  (let [response (http/patch (str base-url "/jobs/" job-id) {:as :json :throw-exceptions false})
+        error (-> response :body :errors)]
+    (cond
+      (http/missing? response) false
+      (not (http/success? response)) (throw (ex-info "Job cancellation unsuccessful" {:job-id job-id :error error}))
+      :else true)))
+
+(defmacro dispatch-operation
+  [op & body]
+  `(try
+     (assoc ~op :type :ok :value ~@body)
+     (catch Exception e# (do
+                           (warn e# "An exception occurred while running" (quote ~@body))
+                           (assoc ~op :type :fail :error (.getMessage e#))))))
+
+(defmacro dispatch-operation-or-fatal
+  "Dispatches op by evaluating body, retrying a number of times if needed.
+  Fails fatally if all retries are exhausted."
+  [op & body]
+  `(assoc ~op :type :ok :value (fu/retry (fn [] ~@body) :fallback (fn [e#]
+                                                                    (fatal e# "Required operation did not succeed" (quote ~@body))
+                                                                    (System/exit 1)))))
+
+(defn- dispatch-rest-operation!
+  [rest-url job-id op]
+  (assert job-id)
+  (if-not rest-url
+    (assoc op :type :fail :error "Have not determined REST URL yet.")
+    (case (:f op)
+      :job-running? (dispatch-operation op (fu/retry
+                                             (partial job-running? rest-url job-id)
+                                             :retries 3
+                                             :fallback #(throw %)))
+      :cancel-job (dispatch-operation-or-fatal op (cancel-job! rest-url job-id)))))
 
 (defrecord Client
-  [deploy-cluster! closer rest-url init-future job-id]
+  [deploy-cluster!                                          ; function that starts a non-standalone cluster and submits the job
+   closer                                                   ; function that closes the ZK client
+   rest-url                                                 ; atom storing the current rest-url
+   init-future                                              ; future that completes if rest-url is set to an initial value
+   job-id                                                   ; atom storing the job-id
+   job-submitted?]                                          ; Has the job already been submitted? Used to avoid re-submission if the client is re-opened.
   client/Client
-  (open! [this test node]
+  (open! [this test _]
+    (info "Open client.")
     (let [{:keys [rest-url-atom closer init-future]} (make-job-manager-url test)]
-      (assoc this :closer closer :rest-url rest-url-atom :init-future init-future :job-id (atom nil))))
-
-  (setup! [this test] this)
-
-  (invoke! [this test op]
-    (case (:f op)
-      :submit (do
-                (deploy-cluster! test)
-                (deref init-future)
-                (let [jobs (fu/retry (fn [] (list-jobs! @rest-url))
-                                     :fallback (fn [e] (do
-                                                         (fatal e "Could not get running jobs.")
-                                                         (System/exit 1))))
-                      num-jobs (count jobs)]
-                  (assert (= 1 num-jobs) (str "Expected 1 job, was " num-jobs))
-                  (reset! job-id (first jobs)))
-                (assoc op :type :ok))
-      :job-running? (let [base-url @rest-url]
-                      (if base-url
-                        (try
-                          (assoc op :type :ok :value (job-running? base-url @job-id))
-                          (catch Exception e (do
-                                               (warn e "Get job details from" base-url "failed.")
-                                               (assoc op :type :fail))))
-                        (assoc op :type :fail :value "Cluster not deployed yet.")))))
-
-  (teardown! [this test])
-  (close! [this test] (closer)))
+      (assoc this :closer closer
+                  :rest-url rest-url-atom
+                  :init-future init-future)))
+
+  (setup! [_ test]
+    (info "Setup client.")
+    (when (compare-and-set! job-submitted? false true)
+      (deploy-cluster! test)
+      (deref init-future)
+      (let [jobs (fu/retry (fn [] (list-jobs! @rest-url))
+                           :fallback (fn [e]
+                                       (fatal e "Could not get running jobs.")
+                                       (System/exit 1)))
+            num-jobs (count jobs)
+            job (first jobs)]
+        (assert (= 1 num-jobs) (str "Expected 1 job, was " num-jobs))
+        (info "Submitted job" job)
+        (reset! job-id job))))
+
+  (invoke! [_ _ op]
+    (dispatch-rest-operation! @rest-url @job-id op))
+
+  (teardown! [_ _])
+  (close! [_ _]
+    (info "Closing client.")
+    (closer)))
+
+(defn create-client
+  [deploy-cluster!]
+  (Client. deploy-cluster! nil nil nil (atom nil) (atom false)))
diff --git a/flink-jepsen/src/jepsen/flink/db.clj b/flink-jepsen/src/jepsen/flink/db.clj
index 79ed8a45b4d..e0f5ff856d4 100644
--- a/flink-jepsen/src/jepsen/flink/db.clj
+++ b/flink-jepsen/src/jepsen/flink/db.clj
@@ -36,21 +36,22 @@
 (def conf-file (str install-dir "/conf/flink-conf.yaml"))
 (def masters-file (str install-dir "/conf/masters"))
 
-(def default-flink-dist-url "https://archive.apache.org/dist/flink/flink-1.5.0/flink-1.5.0-bin-hadoop28-scala_2.11.tgz")
+(def default-flink-dist-url "https://archive.apache.org/dist/flink/flink-1.6.0/flink-1.6.0-bin-hadoop28-scala_2.11.tgz")
 (def hadoop-dist-url "https://archive.apache.org/dist/hadoop/common/hadoop-2.8.3/hadoop-2.8.3.tar.gz")
 (def deb-zookeeper-package "3.4.9-3+deb8u1")
 (def deb-mesos-package "1.5.0-2.0.2")
 (def deb-marathon-package "1.6.322")
 
 (def taskmanager-slots 1)
-(def master-count 1)
 
 (defn flink-configuration
-  [test]
+  [test node]
   {:high-availability                  "zookeeper"
    :high-availability.zookeeper.quorum (zookeeper-quorum test)
    :high-availability.storageDir       (str (:ha-storage-dir test) "/ha")
+   :jobmanager.rpc.address             node
    :state.savepoints.dir               (str (:ha-storage-dir test) "/savepoints")
+   :rest.address                       node
    :rest.port                          8081
    :rest.bind-address                  "0.0.0.0"
    :taskmanager.numberOfTaskSlots      taskmanager-slots
@@ -59,23 +60,17 @@
    :state.backend.local-recovery       "false"
    :taskmanager.registration.timeout   "30 s"})
 
-(defn master-nodes
-  [test]
-  (take master-count (sort (:nodes test))))
-
 (defn write-configuration!
-  "Writes the flink-conf.yaml and masters file to the flink conf directory"
-  [test]
+  "Writes the flink-conf.yaml to the flink conf directory"
+  [test node]
   (let [c (clojure.string/join "\n" (map (fn [[k v]] (str (name k) ": " v))
-                                         (seq (flink-configuration test))))
-        m (clojure.string/join "\n" (master-nodes test))]
+                                         (seq (flink-configuration test node))))]
     (c/exec :echo c :> conf-file)
-    (c/exec :echo m :> masters-file)
     ;; TODO: write log4j.properties properly
     (c/exec (c/lit (str "sed -i'.bak' -e '/log4j.rootLogger=/ s/=.*/=DEBUG, file/' " install-dir "/conf/log4j.properties")))))
 
 (defn install-flink!
-  [test]
+  [test node]
   (let [url (:tarball test)]
     (info "Installing Flink from" url)
     (cu/install-archive! url install-dir)
@@ -83,12 +78,12 @@
     (c/exec (c/lit (str "ls " install-dir "/opt/flink-s3-fs-hadoop* | xargs -I {} mv {} " install-dir "/lib")))
     (c/upload (:job-jar test) upload-dir)
     (c/exec :mv (str upload-dir "/" (.getName (clojure.java.io/file (:job-jar test)))) install-dir)
-    (write-configuration! test)))
+    (write-configuration! test node)))
 
 (defn teardown-flink!
   []
   (info "Tearing down Flink")
-  (cu/grepkill! "flink")
+  (meh (cu/grepkill! "flink"))
   (meh (c/exec :rm :-rf install-dir))
   (meh (c/exec :rm :-rf (c/lit "/tmp/.yarn-properties*"))))
 
@@ -101,7 +96,7 @@
   (reify db/DB
     (setup! [_ test node]
       (c/su
-        (install-flink! test)))
+        (install-flink! test node)))
 
     (teardown! [_ test node]
       (c/su
@@ -120,26 +115,49 @@
         (doall (map #(db/setup! % test node) dbs))))
     (teardown! [_ test node]
       (c/su
-        (doall (map #(db/teardown! % test node) dbs))))
+        (try
+          (doall (map #(db/teardown! % test node) dbs))
+          (finally (fu/stop-all-supervised-services!)))))
     db/LogFiles
     (log-files [_ test node]
-      (flatten (map #(db/log-files % test node) dbs)))))
+      (->>
+        (filter (partial satisfies? db/LogFiles) dbs)
+        (map #(db/log-files % test node))
+        (flatten)))))
 
-;;; YARN
+(defn- sorted-nodes
+  [test]
+  (-> test :nodes sort))
 
-(defn flink-yarn-db
+(defn- select-nodes
+  [test selector]
+  (-> (sorted-nodes test)
+      selector))
+
+(defn- first-node
+  [test]
+  (select-nodes test first))
+
+(defn- create-env-vars
+  "Expects a map containing environment variables, and returns a string that can be used to set
+  environment variables for a child process using Bash's quick assignment and inheritance trick.
+  For example, for a map {:FOO \"bar\"}, this function returns \"FOO=bar \"."
+  [m]
+  (->>
+    (map #(str (name (first %)) "=" (second %)) m)
+    (clojure.string/join " ")
+    (#(str % " "))))
+
+(defn- hadoop-env-vars
   []
-  (let [zk (zk/db deb-zookeeper-package)
-        hadoop (hadoop/db hadoop-dist-url)
-        flink (flink-db)]
-    (combined-db [hadoop zk flink])))
+  (create-env-vars {:HADOOP_CLASSPATH (str "`" hadoop/install-dir "/bin/hadoop classpath`")
+                    :HADOOP_CONF_DIR  hadoop/hadoop-conf-dir}))
 
 (defn exec-flink!
-  [test cmd args]
+  [cmd args]
   (c/su
     (c/exec (c/lit (str
-                     "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
-                     "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir " "
+                     (hadoop-env-vars)
                      install-dir "/bin/flink " cmd " " args)))))
 
 (defn flink-run-cli-args
@@ -149,24 +167,84 @@
     ["-d"]
     (if (:main-class test)
       [(str "-c " (:main-class test))]
-      [])
-    (if (= :yarn-job (:deployment-mode test))
-      ["-m yarn-cluster" "-yjm 2048m" "-ytm 2048m"]
       [])))
 
 (defn submit-job!
   ([test] (submit-job! test []))
   ([test cli-args]
-   (exec-flink! test "run" (clojure.string/join
-                             " "
-                             (concat cli-args
-                                     (flink-run-cli-args test)
-                                     [(str install-dir "/" (last (str/split (:job-jar test) #"/")))
-                                      (:job-args test)])))))
-
-(defn first-node
+   (exec-flink! "run" (clojure.string/join
+                        " "
+                        (concat cli-args
+                                (flink-run-cli-args test)
+                                [(str install-dir "/" (last (str/split (:job-jar test) #"/")))
+                                 (:job-args test)])))))
+
+;;; Standalone
+
+(def standalone-master-count 2)
+
+(defn- standalone-master-nodes
   [test]
-  (-> test :nodes sort first))
+  (select-nodes test (partial take standalone-master-count)))
+
+(defn- standalone-taskmanager-nodes
+  [test]
+  (select-nodes test (partial drop standalone-master-count)))
+
+(defn- start-standalone-masters!
+  [test node]
+  (when (some #{node} (standalone-master-nodes test))
+    (fu/create-supervised-service!
+      "flink-master"
+      (str "env " (hadoop-env-vars)
+           install-dir "/bin/jobmanager.sh start-foreground "
+           ">> " log-dir "/jobmanager.log"))))
+
+(defn- start-standalone-taskmanagers!
+  [test node]
+  (when (some #{node} (standalone-taskmanager-nodes test))
+    (fu/create-supervised-service!
+      "flink-taskmanager"
+      (str "env " (hadoop-env-vars)
+           install-dir "/bin/taskmanager.sh start-foreground "
+           ">> " log-dir "/taskmanager.log"))))
+
+(defn- start-flink-db
+  []
+  (reify db/DB
+    (setup! [_ test node]
+      (c/su
+        (start-standalone-masters! test node)
+        (start-standalone-taskmanagers! test node)))
+
+    (teardown! [_ test node]
+      (c/su
+        (when (some #{node} (standalone-master-nodes test))
+          (fu/stop-supervised-service! "flink-master"))
+        (when (some #{node} (standalone-taskmanager-nodes test))
+          (fu/stop-supervised-service! "flink-taskmanager"))))))
+
+(defn flink-standalone-db
+  []
+  (let [zk (zk/db deb-zookeeper-package)
+        hadoop (hadoop/db hadoop-dist-url)
+        flink (flink-db)
+        start-flink (start-flink-db)]
+    (combined-db [hadoop zk flink start-flink])))
+
+(defn submit-job-from-first-node!
+  [test]
+  (c/on (first-node test)
+        (submit-job! test)))
+
+;;; YARN
+
+(defn flink-yarn-db
+  []
+  (let [zk (zk/db deb-zookeeper-package)
+        hadoop (hadoop/db hadoop-dist-url)
+        flink (flink-db)]
+    (combined-db [hadoop zk flink])))
 
 (defn start-yarn-session!
   [test]
@@ -174,8 +252,7 @@
     (c/on node
           (info "Starting YARN session from" node)
           (c/su
-            (c/exec (c/lit (str "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
-                                "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir
+            (c/exec (c/lit (str (hadoop-env-vars)
                                 " " install-dir "/bin/yarn-session.sh -d -jm 2048m -tm 2048m")))
             (submit-job! test)))))
 
@@ -183,7 +260,7 @@
   [test]
   (c/on (first-node test)
         (c/su
-          (submit-job! test))))
+          (submit-job! test ["-m yarn-cluster" "-yjm 2048m" "-ytm 2048m"]))))
 
 ;;; Mesos
 
@@ -203,6 +280,23 @@
                         (fatal e "Could not submit job.")
                         (System/exit 1)))))
 
+(defn mesos-appmaster-cmd
+  "Returns the command used by Marathon to start Flink's Mesos application master."
+  [test]
+  (str (hadoop-env-vars)
+       install-dir "/bin/mesos-appmaster.sh "
+       "-Dmesos.master=" (zookeeper-uri
+                           test
+                           mesos/zk-namespace) " "
+       "-Djobmanager.rpc.address=$(hostname -f) "
+       "-Djobmanager.heap.mb=2048 "
+       "-Djobmanager.rpc.port=6123 "
+       "-Dmesos.resourcemanager.tasks.mem=2048 "
+       "-Dtaskmanager.heap.mb=2048 "
+       "-Dtaskmanager.numberOfTaskSlots=2 "
+       "-Dmesos.resourcemanager.tasks.cpus=1 "
+       "-Drest.bind-address=$(hostname -f) "))
+
 (defn start-mesos-session!
   [test]
   (c/su
@@ -210,21 +304,7 @@
                         (http/post
                           (str (mesos/marathon-base-url test) "/v2/apps")
                           {:form-params  {:id                    "flink"
-                                          :cmd                   (str "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
-                                                                      "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir " "
-                                                                      install-dir "/bin/mesos-appmaster.sh "
-                                                                      "-Dmesos.master=" (zookeeper-uri
-                                                                                          test
-                                                                                          mesos/zk-namespace) " "
-                                                                      "-Djobmanager.rpc.address=$(hostname -f) "
-                                                                      "-Djobmanager.heap.mb=2048 "
-                                                                      "-Djobmanager.rpc.port=6123 "
-                                                                      "-rest.port=8081 "
-                                                                      "-Dmesos.resourcemanager.tasks.mem=2048 "
-                                                                      "-Dtaskmanager.heap.mb=2048 "
-                                                                      "-Dtaskmanager.numberOfTaskSlots=2 "
-                                                                      "-Dmesos.resourcemanager.tasks.cpus=1 "
-                                                                      "-Drest.bind-address=$(hostname -f) ")
+                                          :cmd                   (mesos-appmaster-cmd test)
                                           :cpus                  1.0
                                           :mem                   2048
                                           :maxLaunchDelaySeconds 3}
diff --git a/flink-jepsen/src/jepsen/flink/flink.clj b/flink-jepsen/src/jepsen/flink/flink.clj
index d5d41579c38..c5d0d225932 100644
--- a/flink-jepsen/src/jepsen/flink/flink.clj
+++ b/flink-jepsen/src/jepsen/flink/flink.clj
@@ -15,6 +15,7 @@
 ;; limitations under the License.
 
 (ns jepsen.flink.flink
+  (:gen-class)
   (:require [clojure.tools.logging :refer :all]
             [jepsen
              [cli :as cli]
@@ -24,31 +25,48 @@
             [jepsen.flink.client :refer :all]
             [jepsen.flink.checker :as flink-checker]
             [jepsen.flink.db :as fdb]
-            [jepsen.flink.nemesis :as fn])
-  (:import (jepsen.flink.client Client)))
+            [jepsen.flink.nemesis :as fn]))
 
 (def flink-test-config
-  {:yarn-session  {:db                  (fdb/flink-yarn-db)
-                   :deployment-strategy fdb/start-yarn-session!}
-   :yarn-job      {:db                  (fdb/flink-yarn-db)
-                   :deployment-strategy fdb/start-yarn-job!}
-   :mesos-session {:db                  (fdb/flink-mesos-db)
-                   :deployment-strategy fdb/start-mesos-session!}})
+  {:yarn-session       {:db                  (fdb/flink-yarn-db)
+                        :deployment-strategy fdb/start-yarn-session!}
+   :yarn-job           {:db                  (fdb/flink-yarn-db)
+                        :deployment-strategy fdb/start-yarn-job!}
+   :mesos-session      {:db                  (fdb/flink-mesos-db)
+                        :deployment-strategy fdb/start-mesos-session!}
+   :standalone-session {:db                  (fdb/flink-standalone-db)
+                        :deployment-strategy fdb/submit-job-from-first-node!}})
 
-(defn client-gen
+(def poll-job-running {:type :invoke, :f :job-running?, :value nil})
+(def cancel-job {:type :invoke, :f :cancel-job, :value nil})
+(def poll-job-running-loop (gen/seq (cycle [poll-job-running (gen/sleep 5)])))
+
+(defn default-client-gen
+  "Client generator that polls for the job running status."
   []
   (->
-    (cons {:type :invoke, :f :submit, :value nil}
-          (cycle [{:type :invoke, :f :job-running?, :value nil}
-                  (gen/sleep 5)]))
-    (gen/seq)
+    poll-job-running-loop
     (gen/singlethreaded)))
 
+(defn cancelling-client-gen
+  "Client generator that polls for the job running status, and cancels the job after 15 seconds."
+  []
+  (->
+    (gen/concat (gen/time-limit 15 (default-client-gen))
+                (gen/once cancel-job)
+                (default-client-gen))
+    (gen/singlethreaded)))
+
+(def client-gens
+  {:poll-job-running default-client-gen
+   :cancel-job       cancelling-client-gen})
+
 (defn flink-test
   [opts]
   (merge tests/noop-test
          (let [{:keys [db deployment-strategy]} (-> opts :deployment-mode flink-test-config)
-               {:keys [job-running-healthy-threshold job-recovery-grace-period]} opts]
+               {:keys [job-running-healthy-threshold job-recovery-grace-period]} opts
+               client-gen ((:client-gen opts) client-gens)]
            {:name      "Apache Flink"
             :os        debian/os
             :db        db
@@ -63,7 +81,7 @@
                                                    ((fn/nemesis-generator-factories (:nemesis-gen opts)) opts)
                                                    job-running-healthy-threshold
                                                    job-recovery-grace-period))))
-            :client    (Client. deployment-strategy nil nil nil nil)
+            :client    (create-client deployment-strategy)
             :checker   (flink-checker/job-running-checker)})
          (assoc opts :concurrency 1)))
 
@@ -93,6 +111,12 @@
                      :default :kill-task-managers
                      :validate [#(fn/nemesis-generator-factories (keyword %))
                                 (keys-as-allowed-values-help-text fn/nemesis-generator-factories)]]
+                    [nil "--client-gen GEN" (str "Which client should be used?"
+                                                 (keys-as-allowed-values-help-text client-gens))
+                     :parse-fn keyword
+                     :default :poll-job-running
+                     :validate [#(client-gens (keyword %))
+                                (keys-as-allowed-values-help-text client-gens)]]
                     [nil "--deployment-mode MODE" (keys-as-allowed-values-help-text flink-test-config)
                      :parse-fn keyword
                      :default :yarn-session
diff --git a/flink-jepsen/src/jepsen/flink/mesos.clj b/flink-jepsen/src/jepsen/flink/mesos.clj
index a73f25fd489..aef73598da9 100644
--- a/flink-jepsen/src/jepsen/flink/mesos.clj
+++ b/flink-jepsen/src/jepsen/flink/mesos.clj
@@ -22,36 +22,9 @@
              [util :as util :refer [meh]]]
             [jepsen.control.util :as cu]
             [jepsen.os.debian :as debian]
+            [jepsen.flink.utils :refer [create-supervised-service! stop-supervised-service!]]
             [jepsen.flink.zookeeper :refer [zookeeper-uri]]))
 
-;;; runit process supervisor (http://smarden.org/runit/)
-;;;
-;;; We use runit to supervise Mesos processes because Mesos uses a "fail-fast" approach to
-;;; error handling, e.g., the Mesos master will exit when it discovers it has been partitioned away
-;;; from the Zookeeper quorum.
-
-(def runit-version "2.1.2-3")
-
-(defn create-supervised-service!
-  "Registers a service with the process supervisor and starts it."
-  [service-name cmd]
-  (let [service-dir (str "/etc/sv/" service-name)
-        run-script (str service-dir "/run")]
-    (c/su
-      (c/exec :mkdir :-p service-dir)
-      (c/exec :echo (clojure.string/join "\n" ["#!/bin/sh"
-                                               "exec 2>&1"
-                                               (str "exec " cmd)]) :> run-script)
-      (c/exec :chmod :+x run-script)
-      (c/exec :ln :-sf service-dir (str "/etc/service/" service-name)))))
-
-(defn stop-supervised-service!
-  "Stops a service and removes it from supervision."
-  [service-name]
-  (c/su
-    (c/exec :sv :down service-name)
-    (c/exec :rm :-f (str "/etc/service/" service-name))))
-
 ;;; Mesos
 
 (def master-count 1)
@@ -154,8 +127,7 @@
                       "keyserver.ubuntu.com"
                       "E56151BF")
     (debian/install {:mesos    mesos-version
-                     :marathon marathon-version
-                     :runit    runit-version})
+                     :marathon marathon-version})
     (c/exec :mkdir :-p "/var/run/mesos")
     (c/exec :mkdir :-p master-dir)
     (c/exec :mkdir :-p slave-dir)))
diff --git a/flink-jepsen/src/jepsen/flink/nemesis.clj b/flink-jepsen/src/jepsen/flink/nemesis.clj
index 3047eeb9cc1..5335bba874c 100644
--- a/flink-jepsen/src/jepsen/flink/nemesis.clj
+++ b/flink-jepsen/src/jepsen/flink/nemesis.clj
@@ -86,6 +86,11 @@
     (take n)
     (reverse)))
 
+(defn- inc-by-factor
+  [n factor]
+  (assert (>= factor 1))
+  (int (* n factor)))
+
 (defn stop-generator
   [stop source job-running-healthy-threshold job-recovery-grace-period]
   (gen/concat source
@@ -105,9 +110,12 @@
                                                 (flink-checker/get-job-running-history)
                                                 (take-last-with-default job-running-healthy-threshold false))]
                       (if (or
-                            (and
-                              (every? true? job-running-history))
-                            (> (ju/relative-time-nanos) (+ @t (ju/secs->nanos job-recovery-grace-period))))
+                            (every? true? job-running-history)
+                            (> (ju/relative-time-nanos) (+ @t
+                                                           (ju/secs->nanos
+                                                             (inc-by-factor
+                                                               job-recovery-grace-period
+                                                               1.1)))))
                         (do
                           (reset! stop true)
                           nil)
@@ -122,14 +130,14 @@
 (defn kill-taskmanagers-bursts-gen
   [time-limit]
   (fgen/time-limit time-limit
-                  (gen/seq (cycle (concat (repeat 20 {:type :info, :f :kill-task-managers})
-                                          [(gen/sleep 300)])))))
+                   (gen/seq (cycle (concat (repeat 20 {:type :info, :f :kill-task-managers})
+                                           [(gen/sleep 300)])))))
 
 (defn kill-jobmanagers-gen
   [time-limit]
   (fgen/time-limit (+ time-limit job-submit-grace-period)
-                  (gen/seq (cons (gen/sleep job-submit-grace-period)
-                                 (cycle [{:type :info, :f :kill-job-manager}])))))
+                   (gen/seq (cons (gen/sleep job-submit-grace-period)
+                                  (cycle [{:type :info, :f :kill-job-manager}])))))
 
 (defn fail-name-node-during-recovery
   []
diff --git a/flink-jepsen/src/jepsen/flink/utils.clj b/flink-jepsen/src/jepsen/flink/utils.clj
index 3fd9f961e13..50d0075ca35 100644
--- a/flink-jepsen/src/jepsen/flink/utils.clj
+++ b/flink-jepsen/src/jepsen/flink/utils.clj
@@ -15,7 +15,10 @@
 ;; limitations under the License.
 
 (ns jepsen.flink.utils
-  (:require [clojure.tools.logging :refer :all]))
+  (:require [clojure.tools.logging :refer :all]
+            [jepsen
+             [control :as c]]
+            [jepsen.os.debian :as debian]))
 
 (defn retry
   "Runs a function op and retries on exception.
@@ -46,3 +49,41 @@
          (Thread/sleep delay)
          (recur op (assoc keys :retries (dec retries))))
        (success r)))))
+
+;;; runit process supervisor (http://smarden.org/runit/)
+
+(def runit-version "2.1.2-3")
+
+(defn- install-process-supervisor!
+  "Installs the process supervisor."
+  []
+  (debian/install {:runit runit-version}))
+
+(defn create-supervised-service!
+  "Registers a service with the process supervisor and starts it."
+  [service-name cmd]
+  (let [service-dir (str "/etc/sv/" service-name)
+        run-script (str service-dir "/run")]
+    (info "Create supervised service" service-name)
+    (c/su
+      (install-process-supervisor!)
+      (c/exec :mkdir :-p service-dir)
+      (c/exec :echo (clojure.string/join "\n" ["#!/bin/sh"
+                                               "exec 2>&1"
+                                               (str "exec " cmd)]) :> run-script)
+      (c/exec :chmod :+x run-script)
+      (c/exec :ln :-sfT service-dir (str "/etc/service/" service-name)))))
+
+(defn stop-supervised-service!
+  "Stops a service and removes it from supervision."
+  [service-name]
+  (info "Stop supervised service" service-name)
+  (c/su
+    (c/exec :rm :-f (str "/etc/service/" service-name))))
+
+(defn stop-all-supervised-services!
+  "Stops and removes all services from supervision if any."
+  []
+  (info "Stop all supervised services.")
+  (c/su
+    (c/exec :rm :-f (c/lit (str "/etc/service/*")))))
diff --git a/flink-jepsen/test/jepsen/flink/checker_test.clj b/flink-jepsen/test/jepsen/flink/checker_test.clj
index 7389bbc15e9..c27d751e69e 100644
--- a/flink-jepsen/test/jepsen/flink/checker_test.clj
+++ b/flink-jepsen/test/jepsen/flink/checker_test.clj
@@ -25,46 +25,91 @@
                  {:type :invoke, :f :job-running?, :value nil, :process 0, :time 127443701575}
                  {:type :ok, :f :job-running?, :value false, :process 0, :time 127453553462}
                  {:type :invoke, :f :job-running?, :value nil, :process 0, :time 127453553463}
-                 {:type :ok, :f :job-running?, :value true, :process 0, :time 127453553464}]]
-    (is (= (get-job-running-history history) [false true]))))
+                 {:type :ok, :f :job-running?, :value true, :process 0, :time 127453553464}
+                 {:type :info, :f :job-running?, :value nil, :process 0, :time 127453553465}]]
+    (is (= (get-job-running-history history) [false true false]))))
 
 (deftest job-running-checker-test
   (let [checker (job-running-checker)
         test {}
-        model (job-running-within-grace-period 3 60)
+        model (job-running-within-grace-period 3 60 10)
         opts {}
         check (fn [history] (checker/check checker test model history opts))]
-    (testing "Job is not running after grace period."
-      (is (= (:valid? (check
-                        [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
-                         {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}])) false)))
-    (testing "Job is running after grace period."
-      (is (= (:valid? (check
-                        [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
-                         {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000001}])) true)))
+    (testing "Model should be inconsistent if job is not running after grace period."
+      (let [result (check
+                     [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
+                      {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}])]
+        (is (= false (:valid? result)))
+        (is (= "Job is not running." (-> result :final-model :msg)))))
+    (testing "Model should be consistent if job is running after grace period."
+      (is (= true (:valid? (check
+                             [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
+                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000001}
+                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000002}
+                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000003}])))))
     (testing "Should tolerate non-running job during failures."
-      (is (= (:valid? (check
-                        [{:type :info, :f :partition-start, :process :nemesis, :time -1}
-                         {:type :info, :f :partition-start, :process :nemesis, :time 0, :value "Cut off [...]"}
-                         {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}
-                         {:type :info, :f :partition-stop, :process :nemesis, :time 60000000002}
-                         {:type :info, :f :partition-stop, :process :nemesis, :time 60000000003, :value "fully connected"}
-                         {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}])) true)))
-    (testing "Should respect healthy threshold."
-      (is (= (:valid? (check
-                        [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
-                         {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
-                         {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
-                         {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000003}
-                         {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}])) true))
-      (is (= (:valid? (check
-                        [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
-                         {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
-                         {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000002}
-                         {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}])) false)))
-    (testing "Job was not deployed successfully."
-      (is (= (:valid? (check [{:type :invoke, :f :job-running?, :value nil, :process 45, :time 239150413307}
-                              {:type :info, :f :job-running?, :value nil, :process 45, :time 239150751938, :error "indeterminate: Assert failed: job-id"}])) false)))))
+      (is (= true (:valid? (check
+                             [{:type :info, :f :partition-start, :process :nemesis, :time -1}
+                              {:type :info, :f :partition-start, :process :nemesis, :time 0, :value "Cut off [...]"}
+                              {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}
+                              {:type :info, :f :partition-stop, :process :nemesis, :time 60000000002}
+                              {:type :info, :f :partition-stop, :process :nemesis, :time 60000000003, :value "fully connected"}
+                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}
+                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000005}
+                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000006}])))))
+    (testing "Should not tolerate non-running job without a cause."
+      (let [result (check
+                     [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
+                      {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
+                      {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}
+                      {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000002}])]
+        (is (= false (:valid? result)))
+        (is (= "Job is not running." (-> result :final-model :msg)))))
+    (testing "Model should be inconsistent if job submission was unsuccessful."
+      (let [result (check [{:type :invoke, :f :job-running?, :value nil, :process 0, :time 239150413307}
+                           {:type :info, :f :job-running?, :value nil, :process 0, :time 239150751938, :error "indeterminate: Assert failed: job-id"}])]
+        (is (= false (:valid? result)))))
+    (testing "Model should be inconsistent if the job status cannot be polled, i.e., if the cluster is unavailable."
+      (let [result (check [{:type :fail, :f :job-running?, :value nil, :process 0, :time 0 :error "Error"}
+                           {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000001 :error "Error"}
+                           {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000002 :error "Error"}])]
+        (is (= false (:valid? result)))
+        (is (= "Cluster is not running." (-> result :final-model :msg)))))
+    (testing "Should tolerate non-running job after cancellation."
+      (is (= true (:valid? (check [{:type :invoke, :f :cancel-job, :value nil, :process 0, :time 0}
+                                   {:type :ok, :f :cancel-job, :value true, :process 0, :time 1}
+                                   {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
+                                   {:type :ok, :f :job-running?, :value false, :process 0, :time 3}])))))
+    (testing "Model should be inconsistent if job is running after cancellation."
+      (let [result (check [{:type :invoke, :f :cancel-job, :value nil, :process 0, :time 0}
+                           {:type :ok, :f :cancel-job, :value true, :process 0, :time 1}
+                           {:type :ok, :f :job-running?, :value true, :process 0, :time 10000000002}])]
+        (is (= false (:valid? result)))
+        (is (= "Job is running after cancellation." (-> result :final-model :msg)))))
+    (testing "Model should be inconsistent if Flink cluster is not available at the end."
+      (let [result (check [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
+                           {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
+                           {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
+                           {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000003, :error "Error"}])]
+        (is (= false (:valid? result)))
+        (is (= "Cluster is not running." (-> result :final-model :msg)))))
+    (testing "Model should be inconsistent if Flink cluster is not available after job cancellation."
+      (let [result (check [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
+                           {:type :invoke, :f :cancel-job, :value nil, :process 0, :time 1}
+                           {:type :ok, :f :cancel-job, :value true, :process 0, :time 2}
+                           {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000001, :error "Error"}])]
+        (is (= false (:valid? result)))
+        (is (= "Cluster is not running." (-> result :final-model :msg)))))
+    (testing "Should throw AssertionError if job cancelling operation failed."
+      (is (thrown-with-msg? AssertionError
+                            #":cancel-job must not fail"
+                            (check [{:type :fail, :f :cancel-job, :value nil, :process 0, :time 0}]))))
+    (testing "Should tolerate non-running job if grace period has not passed."
+      (is (= true (:valid? (check [{:type :invoke, :f :job-running?, :value nil, :process 0, :time 0}
+                                   {:type :ok, :f :job-running?, :value false, :process 0, :time 1}
+                                   {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
+                                   {:type :ok, :f :job-running?, :value true, :process 0, :time 3}
+                                   {:type :ok, :f :job-running?, :value true, :process 0, :time 4}])))))))
 
 (deftest safe-inc-test
   (is (= (safe-inc nil) 1))
diff --git a/flink-jepsen/test/jepsen/flink/client_test.clj b/flink-jepsen/test/jepsen/flink/client_test.clj
index b4373bfb124..a73c936d08d 100644
--- a/flink-jepsen/test/jepsen/flink/client_test.clj
+++ b/flink-jepsen/test/jepsen/flink/client_test.clj
@@ -17,7 +17,8 @@
 (ns jepsen.flink.client-test
   (:require [clojure.test :refer :all]
             [clj-http.fake :as fake]
-            [jepsen.flink.client :refer :all]))
+            [jepsen.flink.client :refer :all])
+  (:import (clojure.lang ExceptionInfo)))
 
 (deftest read-url-test
   (is (= "https://www.asdf.de" (read-url (byte-array [0xAC 0xED 0x00 0x05 0x77 0x15 0x00 0x13 0x68 0x74 0x74 0x70 0x73 0x3A 0x2F 0x2F 0x77 0x77 0x77 0x2E 0x61 0x73 0x64 0x66 0x2E 0x64 0x65])))))
@@ -25,13 +26,52 @@
 (deftest job-running?-test
   (fake/with-fake-routes
     {"http://localhost:8081/jobs/a718f168ec6be8eff1345a17bf64196c"
-     (fn [request] {:status  200
-                    :headers {}
-                    :body    "{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1522059578198,\"end-time\":-1,\"duration\":19505,\"now\":1522059597703,\"timestamps\":{\"RUNNING\":1522059578244,\"RESTARTING\":0,\"RECONCILING\":0,\"CREATED\":1522059578198,\"FAILING\":0,\"FINISHED\":0,\"CANCELLING\":0,\"SUSPENDING\":0,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0},\"vertices\":[{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"name\":\"Source: Socket Stream -> Flat Map\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578369,\"end-time\":-1,\"duration\":19334,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}},{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"name\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578381,\"end-time\":-1,\"duration\":19322,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}}],\"status-counts\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":2,\"CREATED\":0,\"FINISHED\":0},\"plan\":{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"nodes\":[{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -&gt; Sink: Print to Std. Out\",\"inputs\":[{\"num\":0,\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Socket Stream -&gt; Flat Map\",\"optimizer_properties\":{}}]}}"})
-     "http://localhost:8081/jobs/a718f168ec6be8eff1345a17bf64196d"
-     (fn [request] {:status  200
-                    :headers {}
-                    :body    "{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1522059578198,\"end-time\":-1,\"duration\":19505,\"now\":1522059597703,\"timestamps\":{\"RUNNING\":1522059578244,\"RESTARTING\":0,\"RECONCILING\":0,\"CREATED\":1522059578198,\"FAILING\":0,\"FINISHED\":0,\"CANCELLING\":0,\"SUSPENDING\":0,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0},\"vertices\":[{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"name\":\"Source: Socket Stream -> Flat Map\",\"parallelism\":1,\"status\":\"CREATED\",\"start-time\":1522059578369,\"end-time\":-1,\"duration\":19334,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}},{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"name\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578381,\"end-time\":-1,\"duration\":19322,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}}],\"status-counts\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":2,\"CREATED\":0,\"FINISHED\":0},\"plan\":{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"nodes\":[{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -&gt; Sink: Print to Std. Out\",\"inputs\":[{\"num\":0,\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Socket Stream -&gt; Flat Map\",\"optimizer_properties\":{}}]}}"})}
+     (fn [_] {:status 200
+              :body   "{\"jid\":\"a718f168ec6be8eff1345a17bf64196c\",\"name\":\"Socket Window WordCount\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1522059578198,\"end-time\":-1,\"duration\":19505,\"now\":1522059597703,\"timestamps\":{\"RUNNING\":1522059578244,\"RESTARTING\":0,\"RECONCILING\":0,\"CREATED\":1522059578198,\"FAILING\":0,\"FINISHED\":0,\"CANCELLING\":0,\"SUSPENDING\":0,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0},\"vertices\":[{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"name\":\"Source: Socket Stream -> Flat Map\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578369,\"end-time\":-1,\"duration\":19334,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}},{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"name\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578381,\"end-time\":-1,\"duration\":19322,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}}],\"status-counts\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":2,\"CREATED\":0,\"FINISHED\":0},\"plan\":{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"nodes\":[{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -&gt; Sink: Print to Std. Out\",\"inputs\":[{\"num\":0,\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Socket Stream -&gt; Flat Map\",\"optimizer_properties\":{}}]}}"})
+
+     "http://localhost:8081/jobs/54ae4d8ec01d85053d7eb5d139492df2"
+     (fn [_] {:status 200
+              :body   "{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1522059578198,\"end-time\":-1,\"duration\":19505,\"now\":1522059597703,\"timestamps\":{\"RUNNING\":1522059578244,\"RESTARTING\":0,\"RECONCILING\":0,\"CREATED\":1522059578198,\"FAILING\":0,\"FINISHED\":0,\"CANCELLING\":0,\"SUSPENDING\":0,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0},\"vertices\":[{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"name\":\"Source: Socket Stream -> Flat Map\",\"parallelism\":1,\"status\":\"CREATED\",\"start-time\":1522059578369,\"end-time\":-1,\"duration\":19334,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}},{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"name\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578381,\"end-time\":-1,\"duration\":19322,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}}],\"status-counts\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":2,\"CREATED\":0,\"FINISHED\":0},\"plan\":{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"nodes\":[{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -&gt; Sink: Print to Std. Out\",\"inputs\":[{\"num\":0,\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Socket Stream -&gt; Flat Map\",\"optimizer_properties\":{}}]}}"})
+
+     "http://localhost:8081/jobs/ec3a61df646e665d31899bb26aba10b7"
+     (fn [_] {:status 404})}
 
     (is (= (job-running? "http://localhost:8081" "a718f168ec6be8eff1345a17bf64196c") true))
-    (is (= (job-running? "http://localhost:8081" "a718f168ec6be8eff1345a17bf64196d") false))))
+    (is (= (job-running? "http://localhost:8081" "54ae4d8ec01d85053d7eb5d139492df2") false))
+    (is (= (job-running? "http://localhost:8081" "ec3a61df646e665d31899bb26aba10b7") false))))
+
+(deftest cancel-job!-test
+  (fake/with-fake-routes
+    {"http://localhost:8081/jobs/a718f168ec6be8eff1345a17bf64196c"
+     {:patch (fn [_] {:status 202})}
+
+     "http://localhost:8081/jobs/54ae4d8ec01d85053d7eb5d139492df2"
+     {:patch (fn [_] {:status 404})}
+
+     "http://localhost:8081/jobs/ec3a61df646e665d31899bb26aba10b7"
+     {:patch (fn [_] {:status 503})}}
+
+    (testing "Successful job cancellation."
+      (is (= true (@#'jepsen.flink.client/cancel-job!
+                    "http://localhost:8081"
+                    "a718f168ec6be8eff1345a17bf64196c"))))
+
+    (testing "Job not found."
+      (is (= false (@#'jepsen.flink.client/cancel-job!
+                     "http://localhost:8081"
+                     "54ae4d8ec01d85053d7eb5d139492df2"))))
+
+    (testing "Throw if HTTP status code is 503."
+      (is (thrown-with-msg? ExceptionInfo
+                            #"Job cancellation unsuccessful"
+                            (@#'jepsen.flink.client/cancel-job!
+                              "http://localhost:8081"
+                              "ec3a61df646e665d31899bb26aba10b7"))))))
+
+(deftest dispatch-operation-test
+  (let [op {:type :invoke, :f :job-running?, :value nil}
+        test-fn (constantly 1)]
+    (testing "Dispatching operation completes normally."
+      (is (= {:type :ok :value 1} (select-keys (dispatch-operation op (test-fn)) [:type :value]))))
+    (testing "Returned operation should be of type :fail and have a nil value on exception."
+      (is (= {:type :fail :value nil :error "expected"} (select-keys (dispatch-operation op (throw (Exception. "expected"))) [:type :value :error]))))))
diff --git a/flink-jepsen/test/jepsen/flink/nemesis_test.clj b/flink-jepsen/test/jepsen/flink/nemesis_test.clj
new file mode 100644
index 00000000000..488631ffea7
--- /dev/null
+++ b/flink-jepsen/test/jepsen/flink/nemesis_test.clj
@@ -0,0 +1,32 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.nemesis-test
+  (:require [clojure.test :refer :all])
+  (:require [jepsen.flink.nemesis :refer :all]))
+
+(deftest inc-by-factor-test
+  (testing "Should not increase if factor is 1."
+    (is (= 10 (@#'jepsen.flink.nemesis/inc-by-factor 10 1))))
+
+  (testing "Should increase by factor."
+    (is (= 15 (@#'jepsen.flink.nemesis/inc-by-factor 10 1.5))))
+
+  (testing "Should round down."
+    (is (= 15 (@#'jepsen.flink.nemesis/inc-by-factor 10 1.52))))
+
+  (testing "Should throw if factor < 1."
+    (is (thrown? AssertionError (@#'jepsen.flink.nemesis/inc-by-factor 1 0)))))


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services