You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/08/24 15:52:31 UTC

[46/50] [abbrv] storm git commit: BUG-40864: Handling ZooKeeper failures that can result in Nimbus ephemeral entries getting deleted. Adding a sleep before the code-sync thread executes ls /code-distributor/topology-id to ensure it gets the correct ID back so…

BUG-40864: Handling ZooKeeper failures that can result in Nimbus ephemeral entries getting deleted. Adding a sleep before the code-sync thread executes ls /code-distributor/topology-id to ensure it gets the correct ID back, so users don't have to wait up to 5 minutes to submit a topology.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ef3cee6d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ef3cee6d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ef3cee6d

Branch: refs/heads/master
Commit: ef3cee6d5d9d673246dd1c7d2caa44240546e228
Parents: 21ba9c1
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Tue Jul 7 12:16:22 2015 -0700
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Aug 12 09:58:57 2015 -0700

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/cluster.clj   | 21 ++++++++++++++++++--
 .../src/clj/backtype/storm/daemon/nimbus.clj    |  2 ++
 storm-core/src/clj/backtype/storm/zookeeper.clj | 14 ++++++++++++-
 3 files changed, 34 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ef3cee6d/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj
index 65cdb47..04f5b89 100644
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@ -19,6 +19,8 @@
            [backtype.storm.generated SupervisorInfo Assignment StormBase ClusterWorkerHeartbeat ErrorInfo Credentials NimbusSummary]
            [java.io Serializable])
   (:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms])
+  (:import [org.apache.curator.framework.state ConnectionStateListener ConnectionState])
+  (:import [org.apache.curator.framework CuratorFramework])
   (:import [backtype.storm.utils Utils])
   (:import [java.security MessageDigest])
   (:import [org.apache.zookeeper.server.auth DigestAuthenticationProvider])
@@ -41,7 +43,8 @@
   (exists-node? [this path watch?])
   (close [this])
   (register [this callback])
-  (unregister [this id]))
+  (unregister [this id])
+  (add-listener [this listener]))
 
 (defn mk-topo-only-acls
   [topo-conf]
@@ -137,7 +140,12 @@
      (close
        [this]
        (reset! active false)
-       (.close zk)))))
+       (.close zk))
+
+      (add-listener
+        [this listener]
+        (zk/add-listener zk listener))
+      )))
 
 (defprotocol StormClusterState
   (assignments [this callback])
@@ -368,6 +376,15 @@
         [this nimbus-id nimbus-summary]
         ;explicit delete for ephmeral node to ensure this session creates the entry.
         (delete-node cluster-state (nimbus-path nimbus-id))
+
+        (add-listener cluster-state (reify ConnectionStateListener
+                        (^void stateChanged[this ^CuratorFramework client ^ConnectionState newState]
+                          (log-message "Connection state listener invoked, zookeeper connection state has changed to " newState)
+                          (if (.equals newState ConnectionState/RECONNECTED)
+                            (do
+                              (log-message "Connection state has changed to reconnected so setting nimbuses entry one more time")
+                              (set-ephemeral-node cluster-state (nimbus-path nimbus-id) (Utils/serialize nimbus-summary) acls))))))
+
         (set-ephemeral-node cluster-state (nimbus-path nimbus-id) (Utils/serialize nimbus-summary) acls))
 
       (code-distributor-info

http://git-wip-us.apache.org/repos/asf/storm/blob/ef3cee6d/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index 4f516ce..fa3d8ec 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -1492,6 +1492,8 @@
         (.removeFromLeaderLockQueue (:leader-elector nimbus))
         (doseq [missing missing-topologies]
           (log-message "missing topology " missing " has state on zookeeper but doesn't have a local dir on this host.")
+          ;; complete hack to get around zookeeper eventual consistency issue. zk/sync is not helping us so adding a sleep.
+          (sleep-secs 5)
           (let [nimbuses-with-missing (.code-distributor-info storm-cluster-state missing)]
             (log-message "trying to download missing topology code from " (clojure.string/join "," nimbuses-with-missing))
             (doseq [nimbus-host-port nimbuses-with-missing]

http://git-wip-us.apache.org/repos/asf/storm/blob/ef3cee6d/storm-core/src/clj/backtype/storm/zookeeper.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/zookeeper.clj b/storm-core/src/clj/backtype/storm/zookeeper.clj
index f714a9e..26def33 100644
--- a/storm-core/src/clj/backtype/storm/zookeeper.clj
+++ b/storm-core/src/clj/backtype/storm/zookeeper.clj
@@ -18,6 +18,7 @@
   (:import [org.apache.curator.retry RetryNTimes]
            [backtype.storm Config])
   (:import [org.apache.curator.framework.api CuratorEvent CuratorEventType CuratorListener UnhandledErrorListener])
+  (:import [org.apache.curator.framework.state ConnectionStateListener])
   (:import [org.apache.curator.framework CuratorFramework CuratorFrameworkFactory])
   (:import [org.apache.curator.framework.recipes.leader LeaderLatch LeaderLatch$State Participant LeaderLatchListener])
   (:import [org.apache.zookeeper ZooKeeper Watcher KeeperException$NoNodeException
@@ -128,6 +129,17 @@
           ))
       )))
 
+
+(defn sync-path
+  [^CuratorFramework zk ^String path]
+  (try
+    (.. zk (sync) (forPath (normalize-path path)))
+    (catch Exception e (throw (wrap-in-runtime e)))))
+
+
+(defn add-listener [^CuratorFramework zk ^ConnectionStateListener listener]
+  (.. zk (getConnectionStateListenable) (addListener listener)))
+
 (defn get-data
   [^CuratorFramework zk ^String path watch?]
   (let [path (normalize-path path)]
@@ -146,7 +158,7 @@
   (let [stats (org.apache.zookeeper.data.Stat. )
         path (normalize-path path)]
     (try-cause
-     (if-let [data 
+     (if-let [data
               (if (exists-node? zk path watch?)
                 (if watch?
                   (.. zk (getData) (watched) (storingStatIn stats) (forPath path))