You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/12/10 10:51:50 UTC

[GitHub] GJL closed pull request #7166: [FLINK-10985][tests] Enable submission of multiple jobs.

GJL closed pull request #7166: [FLINK-10985][tests] Enable submission of multiple jobs.
URL: https://github.com/apache/flink/pull/7166
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git a/flink-jepsen/README.md b/flink-jepsen/README.md
index a3e2668c26b..d2678f89302 100644
--- a/flink-jepsen/README.md
+++ b/flink-jepsen/README.md
@@ -20,7 +20,7 @@ semantics.
 
 ## Usage
 See the [Jepsen documentation](https://github.com/jepsen-io/jepsen#setting-up-a-jepsen-environment)
-for how to set up the environment to run tests. The script under `scripts/run-tests.sh` documents how to invoke
+for how to set up the environment to run tests. The script under `docker/run-tests.sh` documents how to invoke
 tests. The Flink job used for testing is located under
 `flink-end-to-end-tests/flink-datastream-allround-test`. You have to build the job first and copy
 the resulting jar (`DataStreamAllroundTestProgram.jar`) to the `./bin` directory of this project's
diff --git a/flink-jepsen/docker/data-stream-test-program-parallelism-1.edn b/flink-jepsen/docker/data-stream-test-program-parallelism-1.edn
new file mode 100644
index 00000000000..a20903e8897
--- /dev/null
+++ b/flink-jepsen/docker/data-stream-test-program-parallelism-1.edn
@@ -0,0 +1,18 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+{:jobs [{:job-jar  "/jepsen/bin/DataStreamAllroundTestProgram.jar"
+         :job-args "--environment.parallelism 1 --state_backend.checkpoint_directory hdfs:///checkpoints --state_backend rocks --state_backend.rocks.incremental true"}]}
diff --git a/flink-jepsen/docker/run-tests.sh b/flink-jepsen/docker/run-tests.sh
index 8b2b1e6d18f..a81b873d692 100755
--- a/flink-jepsen/docker/run-tests.sh
+++ b/flink-jepsen/docker/run-tests.sh
@@ -17,6 +17,8 @@
 # limitations under the License.
 ################################################################################
 
+set -euo pipefail
+
 dockerdir=$(dirname $0)
 dockerdir=$(cd ${dockerdir}; pwd)
 
@@ -26,6 +28,27 @@ n2
 n3
 EOF
 
-common_jepsen_args+=(--nodes-file ${dockerdir}/nodes)
+common_jepsen_args+=(--ha-storage-dir hdfs:///flink
+--test-spec "${dockerdir}/data-stream-test-program-parallelism-1.edn"
+--tarball ${2}
+--ssh-private-key ~/.ssh/id_rsa
+--nodes-file ${dockerdir}/nodes)
+
+for i in $(seq 1 ${1})
+do
+  echo "Executing run #${i} of ${1}"
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode yarn-session
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode yarn-session
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --deployment-mode yarn-session
+
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode yarn-job
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode yarn-job
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --deployment-mode yarn-job
+
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode mesos-session
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode mesos-session
 
-. ${dockerdir}/../scripts/run-tests.sh ${1} ${2} 1
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode standalone-session
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --client-gen cancel-jobs --deployment-mode standalone-session
+  echo
+done
diff --git a/flink-jepsen/scripts/run-tests.sh b/flink-jepsen/scripts/run-tests.sh
deleted file mode 100755
index a2b256b6f6a..00000000000
--- a/flink-jepsen/scripts/run-tests.sh
+++ /dev/null
@@ -1,50 +0,0 @@
-#!/usr/bin/env bash
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-set -euo pipefail
-
-scripts=$(dirname $0)
-scripts=$(cd ${scripts}; pwd)
-
-parallelism=${3}
-
-common_jepsen_args+=(--ha-storage-dir hdfs:///flink
---job-jar ${scripts}/../bin/DataStreamAllroundTestProgram.jar
---tarball ${2}
---job-args "--environment.parallelism ${parallelism} --state_backend.checkpoint_directory hdfs:///checkpoints --state_backend rocks --state_backend.rocks.incremental true"
---ssh-private-key ~/.ssh/id_rsa)
-
-for i in $(seq 1 ${1})
-do
-  echo "Executing run #${i} of ${1}"
-  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode yarn-session
-  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode yarn-session
-  lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --deployment-mode yarn-session
-
-  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode yarn-job
-  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode yarn-job
-  lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --deployment-mode yarn-job
-
-  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode mesos-session
-  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode mesos-session
-
-  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode standalone-session
-  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --client-gen cancel-job --deployment-mode standalone-session
-  echo
-done
diff --git a/flink-jepsen/src/jepsen/flink/checker.clj b/flink-jepsen/src/jepsen/flink/checker.clj
index 7e437e9d628..b036b0a27d6 100644
--- a/flink-jepsen/src/jepsen/flink/checker.clj
+++ b/flink-jepsen/src/jepsen/flink/checker.clj
@@ -155,34 +155,80 @@
   ([job-running-healthy-threshold job-recovery-grace-period]
    (job-running-within-grace-period job-running-healthy-threshold job-recovery-grace-period 10)))
 
-(defn get-job-running-history
+(defn- history->jobs-running?-value
   [history]
   (->>
     history
-    (remove #(= (:process %) :nemesis))
+    (filter #(= (:f %) :jobs-running?))
     (remove #(= (:type %) :invoke))
-    (map :value)
-    (map boolean)
-    (remove nil?)))
+    (map :value)))
+
+(defn- history->job-ids
+  "Extracts all job ids from a history."
+  [history]
+  (set (->> history
+            (history->jobs-running?-value)
+            (map keys)
+            (flatten)
+            (remove nil?))))
+
+(defn all-jobs-running?-history
+  [history]
+  (->>
+    history
+    (history->jobs-running?-value)
+    (map vals)
+    (map #(and
+            (not (empty? %))
+            (every? true? %)))))
 
 (defn- healthy?
   [model]
-  (>= (:healthy-count model) (:healthy-threshold model)))
+  (or (>= (:healthy-count model) (:healthy-threshold model))
+      (:job-canceled? model)))
+
+(defn- jobs-running?->job-running?
+  "Rewrites history entries of the form {:f :jobs-running? :value {...}}
+
+  Example: {:type ok :f :jobs-running? :value {job-id-1 true}} -> {:type ok :f :job-running? :value true}"
+  [history-entry job-id]
+  (let [job-running?-entry (assoc history-entry :f :job-running?)
+        job-running?-entry-ok (update job-running?-entry :value #(get % job-id))]
+    (if (= (:type history-entry) :ok)
+      job-running?-entry-ok
+      job-running?-entry)))
+
+(defn- history->single-job-history
+  "Rewrites a history to one that appears to run a single Flink job."
+  [history job-id]
+  (let [transform-history-entry (fn [history-entry]
+                                  (case (:f history-entry)
+                                    :jobs-running? (jobs-running?->job-running? history-entry job-id)
+                                    :cancel-jobs (assoc history-entry :f :cancel-job)
+                                    history-entry))]
+    (map transform-history-entry history)))
+
+(defn- compute-final-model
+  [model history]
+  (let [start-time (-> history first :time)]
+    (reduce model/step
+            (assoc model :last-failure start-time)
+            history)))
 
 (defn job-running-checker
   []
   (reify
     checker/Checker
     (check [_ test model history _]
-      (let [final (reduce model/step (assoc model :last-failure (:time (first history))) history)
-            result-map (conj {}
-                             (find test :nemesis-gen)
-                             (find test :deployment-mode))]
-        (if (or (model/inconsistent? final)
-                (and
-                  (not (healthy? final))
-                  (not (:job-canceled? final))))
-          (into result-map {:valid?      false
-                            :final-model final})
-          (into result-map {:valid?      true
-                            :final-model final}))))))
+      (let [job-ids (history->job-ids history)
+            individual-job-histories (map (partial history->single-job-history history) job-ids)
+            final-models (map (partial compute-final-model model) individual-job-histories)
+            inconsistent-or-unhealthy (or (empty? job-ids)
+                                          (some model/inconsistent? final-models)
+                                          (some (complement healthy?) final-models))
+            result-map (select-keys test [:nemesis-gen :deployment-mode])]
+        (if inconsistent-or-unhealthy
+          (into result-map {:valid?       false
+                            :final-models final-models})
+          (into result-map {:valid?       true
+                            :final-models final-models}))))))
diff --git a/flink-jepsen/src/jepsen/flink/client.clj b/flink-jepsen/src/jepsen/flink/client.clj
index 1ab987bd704..afbfe56e59b 100644
--- a/flink-jepsen/src/jepsen/flink/client.clj
+++ b/flink-jepsen/src/jepsen/flink/client.clj
@@ -119,6 +119,14 @@
                 (map :status)
                 (every? #(= "RUNNING" %)))))))
 
+(defn jobs-running?
+  "Checks if multiple jobs are running. Returns a map where the keys are job ids and the values are
+  booleans indicating whether the job is running or not."
+  [base-url job-ids]
+  (let [job-running-on-current-master? (partial job-running? base-url)
+        make-job-id-running?-pair (juxt identity job-running-on-current-master?)]
+    (into {} (map make-job-id-running?-pair job-ids))))
+
 (defn- cancel-job!
   "Cancels the specified job. Returns true if the job could be canceled.
   Returns false if the job does not exist. Throws an exception if the HTTP status
@@ -131,6 +139,10 @@
       (not (http/success? response)) (throw (ex-info "Job cancellation unsuccessful" {:job-id job-id :error error}))
       :else true)))
 
+(defn- cancel-jobs!
+  [base-url job-ids]
+  (doseq [job-id job-ids] (cancel-job! base-url job-id)))
+
 (defmacro dispatch-operation
   [op & body]
   `(try
@@ -148,24 +160,24 @@
                                                                     (System/exit 1)))))
 
 (defn- dispatch-rest-operation!
-  [rest-url job-id op]
-  (assert job-id)
+  [rest-url job-ids op]
+  (assert job-ids)
   (if-not rest-url
     (assoc op :type :fail :error "Have not determined REST URL yet.")
     (case (:f op)
-      :job-running? (dispatch-operation op (fu/retry
-                                             (partial job-running? rest-url job-id)
-                                             :retries 3
-                                             :fallback #(throw %)))
-      :cancel-job (dispatch-operation-or-fatal op (cancel-job! rest-url job-id)))))
+      :jobs-running? (dispatch-operation op (fu/retry
+                                              (partial jobs-running? rest-url job-ids)
+                                              :retries 3
+                                              :fallback #(throw %)))
+      :cancel-jobs (dispatch-operation-or-fatal op (cancel-jobs! rest-url job-ids)))))
 
 (defrecord Client
-  [deploy-cluster!                                          ; function that starts a non-standalone cluster and submits the job
+  [deploy-cluster!                                          ; function that starts a non-standalone cluster and submits the jobs
    closer                                                   ; function that closes the ZK client
    rest-url                                                 ; atom storing the current rest-url
    init-future                                              ; future that completes if rest-url is set to an initial value
-   job-id                                                   ; atom storing the job-id
-   job-submitted?]                                          ; Has the job already been submitted? Used to avoid re-submission if the client is re-opened.
+   job-ids                                                  ; atom storing the job-ids
+   job-submitted?]                                          ; Have the jobs already been submitted? Used to avoid re-submission if the client is re-opened.
   client/Client
   (open! [this test _]
     (info "Open client.")
@@ -183,14 +195,13 @@
                            :fallback (fn [e]
                                        (fatal e "Could not get running jobs.")
                                        (System/exit 1)))
-            num-jobs (count jobs)
-            job (first jobs)]
-        (assert (= 1 num-jobs) (str "Expected 1 job, was " num-jobs))
-        (info "Submitted job" job)
-        (reset! job-id job))))
+            num-jobs (count jobs)]
+        (assert (pos? num-jobs) (str "Expected at least 1 job, was " num-jobs))
+        (info "Submitted jobs" jobs)
+        (reset! job-ids jobs))))
 
   (invoke! [_ _ op]
-    (dispatch-rest-operation! @rest-url @job-id op))
+    (dispatch-rest-operation! @rest-url @job-ids op))
 
   (teardown! [_ _])
   (close! [_ _]
diff --git a/flink-jepsen/src/jepsen/flink/db.clj b/flink-jepsen/src/jepsen/flink/db.clj
index 971c69e5000..30467767481 100644
--- a/flink-jepsen/src/jepsen/flink/db.clj
+++ b/flink-jepsen/src/jepsen/flink/db.clj
@@ -69,6 +69,20 @@
     ;; TODO: write log4j.properties properly
     (c/exec (c/lit (str "sed -i'.bak' -e '/log4j.rootLogger=/ s/=.*/=DEBUG, file/' " install-dir "/conf/log4j.properties")))))
 
+(defn- file-name
+  [path]
+  (.getName (clojure.java.io/file path)))
+
+(defn upload-job-jar!
+  [job-jar]
+  (c/upload job-jar upload-dir)
+  (c/exec :mv (str upload-dir "/" (file-name job-jar)) install-dir))
+
+(defn upload-job-jars!
+  [job-jars]
+  (doseq [job-jar job-jars]
+    (upload-job-jar! job-jar)))
+
 (defn install-flink!
   [test node]
   (let [url (:tarball test)]
@@ -76,8 +90,7 @@
     (cu/install-archive! url install-dir)
     (info "Enable S3 FS")
     (c/exec (c/lit (str "ls " install-dir "/opt/flink-s3-fs-hadoop* | xargs -I {} mv {} " install-dir "/lib")))
-    (c/upload (:job-jar test) upload-dir)
-    (c/exec :mv (str upload-dir "/" (.getName (clojure.java.io/file (:job-jar test)))) install-dir)
+    (upload-job-jars! (->> test :test-spec :jobs (map :job-jar)))
     (write-configuration! test node)))
 
 (defn teardown-flink!
@@ -145,7 +158,7 @@
   [m]
   (->>
     (map #(str (name (first %)) "=" (second %)) m)
-    (clojure.string/join " ")
+    (apply fu/join-space)
     (#(str % " "))))
 
 (defn- hadoop-env-vars
@@ -158,26 +171,25 @@
   (c/su
     (c/exec (c/lit (str
                      (hadoop-env-vars)
-                     install-dir "/bin/flink " cmd " " args)))))
+                     install-dir "/bin/flink " cmd " " (apply fu/join-space args))))))
 
 (defn flink-run-cli-args
   "Returns the CLI args that should be passed to 'flink run'"
-  [test]
+  [job-spec]
   (concat
     ["-d"]
-    (if (:main-class test)
-      [(str "-c " (:main-class test))]
+    (if (:main-class job-spec)
+      [(str "-c " (:main-class job-spec))]
       [])))
 
 (defn submit-job!
   ([test] (submit-job! test []))
   ([test cli-args]
-   (exec-flink! "run" (clojure.string/join
-                        " "
-                        (concat cli-args
-                                (flink-run-cli-args test)
-                                [(str install-dir "/" (last (str/split (:job-jar test) #"/")))
-                                 (:job-args test)])))))
+   (doseq [{:keys [job-jar job-args] :as job-spec} (-> test :test-spec :jobs)]
+     (exec-flink! "run" (concat cli-args
+                                (flink-run-cli-args job-spec)
+                                [(str install-dir "/" (file-name job-jar))
+                                 job-args])))))
 
 ;;; Standalone
 
diff --git a/flink-jepsen/src/jepsen/flink/flink.clj b/flink-jepsen/src/jepsen/flink/flink.clj
index c5d0d225932..b2b7644cd02 100644
--- a/flink-jepsen/src/jepsen/flink/flink.clj
+++ b/flink-jepsen/src/jepsen/flink/flink.clj
@@ -37,15 +37,15 @@
    :standalone-session {:db                  (fdb/flink-standalone-db)
                         :deployment-strategy fdb/submit-job-from-first-node!}})
 
-(def poll-job-running {:type :invoke, :f :job-running?, :value nil})
-(def cancel-job {:type :invoke, :f :cancel-job, :value nil})
-(def poll-job-running-loop (gen/seq (cycle [poll-job-running (gen/sleep 5)])))
+(def poll-jobs-running {:type :invoke, :f :jobs-running?, :value nil})
+(def cancel-jobs {:type :invoke, :f :cancel-jobs, :value nil})
+(def poll-jobs-running-loop (gen/seq (cycle [poll-jobs-running (gen/sleep 5)])))
 
 (defn default-client-gen
   "Client generator that polls for the job running status."
   []
   (->
-    poll-job-running-loop
+    poll-jobs-running-loop
     (gen/singlethreaded)))
 
 (defn cancelling-client-gen
@@ -53,13 +53,13 @@
   []
   (->
     (gen/concat (gen/time-limit 15 (default-client-gen))
-                (gen/once cancel-job)
+                (gen/once cancel-jobs)
                 (default-client-gen))
     (gen/singlethreaded)))
 
 (def client-gens
   {:poll-job-running default-client-gen
-   :cancel-job       cancelling-client-gen})
+   :cancel-jobs      cancelling-client-gen})
 
 (defn flink-test
   [opts]
@@ -94,6 +94,10 @@
        (clojure.string/join ", ")
        (str "Must be one of: ")))
 
+(defn read-test-spec
+  [path]
+  (clojure.edn/read-string (slurp path)))
+
 (defn -main
   [& args]
   (cli/run!
@@ -101,10 +105,8 @@
       (cli/single-test-cmd
         {:test-fn  flink-test
          :tarball  fdb/default-flink-dist-url
-         :opt-spec [[nil "--ha-storage-dir DIR" "high-availability.storageDir"]
-                    [nil "--job-jar JAR" "Path to the job jar"]
-                    [nil "--job-args ARGS" "CLI arguments for the flink job"]
-                    [nil "--main-class CLASS" "Job main class"]
+         :opt-spec [[nil "--test-spec FILE" "" :parse-fn read-test-spec]
+                    [nil "--ha-storage-dir DIR" "high-availability.storageDir"]
                     [nil "--nemesis-gen GEN" (str "Which nemesis should be used?"
                                                   (keys-as-allowed-values-help-text fn/nemesis-generator-factories))
                      :parse-fn keyword
diff --git a/flink-jepsen/src/jepsen/flink/nemesis.clj b/flink-jepsen/src/jepsen/flink/nemesis.clj
index 5335bba874c..07d69ead9cf 100644
--- a/flink-jepsen/src/jepsen/flink/nemesis.clj
+++ b/flink-jepsen/src/jepsen/flink/nemesis.clj
@@ -107,7 +107,7 @@
                           job-running-history (->>
                                                 history
                                                 (filter (fn [op] (>= (- (:time op) @t) 0)))
-                                                (flink-checker/get-job-running-history)
+                                                (flink-checker/all-jobs-running?-history)
                                                 (take-last-with-default job-running-healthy-threshold false))]
                       (if (or
                             (every? true? job-running-history)
diff --git a/flink-jepsen/src/jepsen/flink/utils.clj b/flink-jepsen/src/jepsen/flink/utils.clj
index 1aa53efe7ae..6634a7ec83a 100644
--- a/flink-jepsen/src/jepsen/flink/utils.clj
+++ b/flink-jepsen/src/jepsen/flink/utils.clj
@@ -51,6 +51,10 @@
          (recur op (assoc keys :retries (dec retries))))
        (success r)))))
 
+(defn join-space
+  [& tokens]
+  (clojure.string/join " " tokens))
+
 (defn find-files!
   "Lists files recursively given a directory. If the directory does not exist, an empty collection
   is returned."
diff --git a/flink-jepsen/test/jepsen/flink/checker_test.clj b/flink-jepsen/test/jepsen/flink/checker_test.clj
index c27d751e69e..d1ade1645b0 100644
--- a/flink-jepsen/test/jepsen/flink/checker_test.clj
+++ b/flink-jepsen/test/jepsen/flink/checker_test.clj
@@ -20,96 +20,99 @@
              [checker :as checker]]
             [jepsen.flink.checker :refer :all]))
 
-(deftest get-job-running-history-test
+(deftest all-jobs-running?-history-test
   (let [history [{:type :info, :f :kill-random-subset-task-managers, :process :nemesis, :time 121898381144, :value '("172.31.33.170")}
-                 {:type :invoke, :f :job-running?, :value nil, :process 0, :time 127443701575}
-                 {:type :ok, :f :job-running?, :value false, :process 0, :time 127453553462}
-                 {:type :invoke, :f :job-running?, :value nil, :process 0, :time 127453553463}
-                 {:type :ok, :f :job-running?, :value true, :process 0, :time 127453553464}
-                 {:type :info, :f :job-running?, :value nil, :process 0, :time 127453553465}]]
-    (is (= (get-job-running-history history) [false true false]))))
+                 {:type :invoke, :f :jobs-running?, :value nil, :process 0, :time 127443701575}
+                 {:type :ok, :f :jobs-running?, :value {"3886d6b547969c3f15c53896bb496b8f" false}, :process 0, :time 127453553462}
+                 {:type :invoke, :f :jobs-running?, :value nil, :process 0, :time 127453553463}
+                 {:type :ok, :f :jobs-running?, :value {"3886d6b547969c3f15c53896bb496b8f" true}, :process 0, :time 127453553464}
+                 {:type :info, :f :jobs-running?, :value nil, :process 0, :time 127453553465}]]
+    (is (= [false true false] (all-jobs-running?-history history)))))
 
 (deftest job-running-checker-test
   (let [checker (job-running-checker)
         test {}
         model (job-running-within-grace-period 3 60 10)
         opts {}
-        check (fn [history] (checker/check checker test model history opts))]
+        check (fn [history] (checker/check checker test model history opts))
+        job-running-value {"3886d6b547969c3f15c53896bb496b8f" true}
+        job-not-running-value {"3886d6b547969c3f15c53896bb496b8f" false}]
     (testing "Model should be inconsistent if job is not running after grace period."
       (let [result (check
                      [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
-                      {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}])]
+                      {:type :ok, :f :jobs-running?, :value job-not-running-value, :process 0, :time 60000000001}])]
         (is (= false (:valid? result)))
-        (is (= "Job is not running." (-> result :final-model :msg)))))
+        (is (= "Job is not running." (-> result :final-models first :msg)))))
     (testing "Model should be consistent if job is running after grace period."
       (is (= true (:valid? (check
                              [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
-                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000001}
-                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000002}
-                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000003}])))))
+                              {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 60000000001}
+                              {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 60000000002}
+                              {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 60000000003}])))))
     (testing "Should tolerate non-running job during failures."
       (is (= true (:valid? (check
                              [{:type :info, :f :partition-start, :process :nemesis, :time -1}
                               {:type :info, :f :partition-start, :process :nemesis, :time 0, :value "Cut off [...]"}
-                              {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}
+                              {:type :ok, :f :jobs-running?, :value job-not-running-value, :process 0, :time 60000000001}
                               {:type :info, :f :partition-stop, :process :nemesis, :time 60000000002}
                               {:type :info, :f :partition-stop, :process :nemesis, :time 60000000003, :value "fully connected"}
-                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}
-                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000005}
-                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000006}])))))
+                              {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 60000000004}
+                              {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 60000000005}
+                              {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 60000000006}])))))
     (testing "Should not tolerate non-running job without a cause."
       (let [result (check
-                     [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
-                      {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
-                      {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}
-                      {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000002}])]
+                     [{:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 0}
+                      {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 1}
+                      {:type :ok, :f :jobs-running?, :value job-not-running-value, :process 0, :time 60000000001}
+                      {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 60000000002}])]
         (is (= false (:valid? result)))
-        (is (= "Job is not running." (-> result :final-model :msg)))))
+        (is (= "Job is not running." (-> result :final-models first :msg)))))
     (testing "Model should be inconsistent if job submission was unsuccessful."
-      (let [result (check [{:type :invoke, :f :job-running?, :value nil, :process 0, :time 239150413307}
-                           {:type :info, :f :job-running?, :value nil, :process 0, :time 239150751938, :error "indeterminate: Assert failed: job-id"}])]
+      (let [result (check [{:type :invoke, :f :jobs-running?, :value nil, :process 0, :time 239150413307}
+                           {:type :info, :f :jobs-running?, :value nil, :process 0, :time 239150751938, :error "indeterminate: Assert failed: job-id"}])]
         (is (= false (:valid? result)))))
     (testing "Model should be inconsistent if the job status cannot be polled, i.e., if the cluster is unavailable."
-      (let [result (check [{:type :fail, :f :job-running?, :value nil, :process 0, :time 0 :error "Error"}
-                           {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000001 :error "Error"}
-                           {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000002 :error "Error"}])]
+      (let [result (check [{:type :fail, :f :jobs-running?, :value job-running-value, :process 0, :time 0 :error "Error"}
+                           {:type :fail, :f :jobs-running?, :value nil, :process 0, :time 60000000001 :error "Error"}
+                           {:type :fail, :f :jobs-running?, :value nil, :process 0, :time 60000000002 :error "Error"}])]
         (is (= false (:valid? result)))
-        (is (= "Cluster is not running." (-> result :final-model :msg)))))
+        (is (= "Cluster is not running." (-> result :final-models first :msg)))))
     (testing "Should tolerate non-running job after cancellation."
-      (is (= true (:valid? (check [{:type :invoke, :f :cancel-job, :value nil, :process 0, :time 0}
-                                   {:type :ok, :f :cancel-job, :value true, :process 0, :time 1}
-                                   {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
-                                   {:type :ok, :f :job-running?, :value false, :process 0, :time 3}])))))
+      (is (= true (:valid? (check [{:type :invoke, :f :cancel-jobs, :value nil, :process 0, :time 0}
+                                   {:type :ok, :f :cancel-jobs, :value nil, :process 0, :time 1}
+                                   {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 2}
+                                   {:type :ok, :f :jobs-running?, :value job-not-running-value, :process 0, :time 3}])))))
     (testing "Model should be inconsistent if job is running after cancellation."
-      (let [result (check [{:type :invoke, :f :cancel-job, :value nil, :process 0, :time 0}
-                           {:type :ok, :f :cancel-job, :value true, :process 0, :time 1}
-                           {:type :ok, :f :job-running?, :value true, :process 0, :time 10000000002}])]
+      (let [result (check [{:type :invoke, :f :cancel-jobs, :value nil, :process 0, :time 0}
+                           {:type :ok, :f :cancel-jobs, :value true, :process 0, :time 1}
+                           {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 10000000002}])]
         (is (= false (:valid? result)))
-        (is (= "Job is running after cancellation." (-> result :final-model :msg)))))
+        (is (= "Job is running after cancellation." (-> result :final-models first :msg)))))
     (testing "Model should be inconsistent if Flink cluster is not available at the end."
-      (let [result (check [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
-                           {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
-                           {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
-                           {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000003, :error "Error"}])]
+      (let [result (check [{:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 0}
+                           {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 1}
+                           {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 2}
+                           {:type :fail, :f :jobs-running?, :value nil, :process 0, :time 60000000003, :error "Error"}])]
         (is (= false (:valid? result)))
-        (is (= "Cluster is not running." (-> result :final-model :msg)))))
+        (is (= "Cluster is not running." (-> result :final-models first :msg)))))
     (testing "Model should be inconsistent if Flink cluster is not available after job cancellation."
-      (let [result (check [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
-                           {:type :invoke, :f :cancel-job, :value nil, :process 0, :time 1}
-                           {:type :ok, :f :cancel-job, :value true, :process 0, :time 2}
-                           {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000001, :error "Error"}])]
+      (let [result (check [{:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 0}
+                           {:type :invoke, :f :cancel-jobs, :value nil, :process 0, :time 1}
+                           {:type :ok, :f :cancel-jobs, :value job-running-value, :process 0, :time 2}
+                           {:type :fail, :f :jobs-running?, :value nil, :process 0, :time 60000000001, :error "Error"}])]
         (is (= false (:valid? result)))
-        (is (= "Cluster is not running." (-> result :final-model :msg)))))
+        (is (= "Cluster is not running." (-> result :final-models first :msg)))))
     (testing "Should throw AssertionError if job cancelling operation failed."
       (is (thrown-with-msg? AssertionError
                             #":cancel-job must not fail"
-                            (check [{:type :fail, :f :cancel-job, :value nil, :process 0, :time 0}]))))
+                            (check [{:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 0}
+                                    {:type :fail, :f :cancel-jobs, :value nil, :process 0, :time 1}]))))
     (testing "Should tolerate non-running job if grace period has not passed."
-      (is (= true (:valid? (check [{:type :invoke, :f :job-running?, :value nil, :process 0, :time 0}
-                                   {:type :ok, :f :job-running?, :value false, :process 0, :time 1}
-                                   {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
-                                   {:type :ok, :f :job-running?, :value true, :process 0, :time 3}
-                                   {:type :ok, :f :job-running?, :value true, :process 0, :time 4}])))))))
+      (is (= true (:valid? (check [{:type :invoke, :f :jobs-running?, :value nil, :process 0, :time 0}
+                                   {:type :ok, :f :jobs-running?, :value job-not-running-value, :process 0, :time 1}
+                                   {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 2}
+                                   {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 3}
+                                   {:type :ok, :f :jobs-running?, :value job-running-value, :process 0, :time 4}])))))))
 
 (deftest safe-inc-test
   (is (= (safe-inc nil) 1))


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services