You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/08/24 15:52:31 UTC
[46/50] [abbrv] storm git commit: BUG-40864: Handling zookeeper
failures that can result in nimbus ephemeral entries getting deleted. Adding
a sleep before the code-sync thread executes ls /code-distributor/topology-id to
ensure it gets the correct id back so [...]
BUG-40864: Handling zookeeper failures that can result in nimbus ephemeral entries getting deleted. Adding a sleep before the code-sync thread executes ls /code-distributor/topology-id to ensure it gets the correct id back, so that users don't have to wait up to 5 minutes to submit a topology.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ef3cee6d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ef3cee6d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ef3cee6d
Branch: refs/heads/master
Commit: ef3cee6d5d9d673246dd1c7d2caa44240546e228
Parents: 21ba9c1
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Tue Jul 7 12:16:22 2015 -0700
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Aug 12 09:58:57 2015 -0700
----------------------------------------------------------------------
storm-core/src/clj/backtype/storm/cluster.clj | 21 ++++++++++++++++++--
.../src/clj/backtype/storm/daemon/nimbus.clj | 2 ++
storm-core/src/clj/backtype/storm/zookeeper.clj | 14 ++++++++++++-
3 files changed, 34 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ef3cee6d/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj
index 65cdb47..04f5b89 100644
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@ -19,6 +19,8 @@
[backtype.storm.generated SupervisorInfo Assignment StormBase ClusterWorkerHeartbeat ErrorInfo Credentials NimbusSummary]
[java.io Serializable])
(:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms])
+ (:import [org.apache.curator.framework.state ConnectionStateListener ConnectionState])
+ (:import [org.apache.curator.framework CuratorFramework])
(:import [backtype.storm.utils Utils])
(:import [java.security MessageDigest])
(:import [org.apache.zookeeper.server.auth DigestAuthenticationProvider])
@@ -41,7 +43,8 @@
(exists-node? [this path watch?])
(close [this])
(register [this callback])
- (unregister [this id]))
+ (unregister [this id])
+ (add-listener [this listener]))
(defn mk-topo-only-acls
[topo-conf]
@@ -137,7 +140,12 @@
(close
[this]
(reset! active false)
- (.close zk)))))
+ (.close zk))
+
+ (add-listener
+ [this listener]
+ (zk/add-listener zk listener))
+ )))
(defprotocol StormClusterState
(assignments [this callback])
@@ -368,6 +376,15 @@
[this nimbus-id nimbus-summary]
;explicit delete for ephmeral node to ensure this session creates the entry.
(delete-node cluster-state (nimbus-path nimbus-id))
+
+ (add-listener cluster-state (reify ConnectionStateListener
+ (^void stateChanged[this ^CuratorFramework client ^ConnectionState newState]
+ (log-message "Connection state listener invoked, zookeeper connection state has changed to " newState)
+ (if (.equals newState ConnectionState/RECONNECTED)
+ (do
+ (log-message "Connection state has changed to reconnected so setting nimbuses entry one more time")
+ (set-ephemeral-node cluster-state (nimbus-path nimbus-id) (Utils/serialize nimbus-summary) acls))))))
+
(set-ephemeral-node cluster-state (nimbus-path nimbus-id) (Utils/serialize nimbus-summary) acls))
(code-distributor-info
http://git-wip-us.apache.org/repos/asf/storm/blob/ef3cee6d/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index 4f516ce..fa3d8ec 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -1492,6 +1492,8 @@
(.removeFromLeaderLockQueue (:leader-elector nimbus))
(doseq [missing missing-topologies]
(log-message "missing topology " missing " has state on zookeeper but doesn't have a local dir on this host.")
+ ;; complete heck to get around zookeeper eventual consistency issue. zk/sync is not helping us so adding a sleep.
+ (sleep-secs 5)
(let [nimbuses-with-missing (.code-distributor-info storm-cluster-state missing)]
(log-message "trying to download missing topology code from " (clojure.string/join "," nimbuses-with-missing))
(doseq [nimbus-host-port nimbuses-with-missing]
http://git-wip-us.apache.org/repos/asf/storm/blob/ef3cee6d/storm-core/src/clj/backtype/storm/zookeeper.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/zookeeper.clj b/storm-core/src/clj/backtype/storm/zookeeper.clj
index f714a9e..26def33 100644
--- a/storm-core/src/clj/backtype/storm/zookeeper.clj
+++ b/storm-core/src/clj/backtype/storm/zookeeper.clj
@@ -18,6 +18,7 @@
(:import [org.apache.curator.retry RetryNTimes]
[backtype.storm Config])
(:import [org.apache.curator.framework.api CuratorEvent CuratorEventType CuratorListener UnhandledErrorListener])
+ (:import [org.apache.curator.framework.state ConnectionStateListener])
(:import [org.apache.curator.framework CuratorFramework CuratorFrameworkFactory])
(:import [org.apache.curator.framework.recipes.leader LeaderLatch LeaderLatch$State Participant LeaderLatchListener])
(:import [org.apache.zookeeper ZooKeeper Watcher KeeperException$NoNodeException
@@ -128,6 +129,17 @@
))
)))
+
+(defn sync-path
+ [^CuratorFramework zk ^String path]
+ (try
+ (.. zk (sync) (forPath (normalize-path path)))
+ (catch Exception e (throw (wrap-in-runtime e)))))
+
+
+(defn add-listener [^CuratorFramework zk ^ConnectionStateListener listener]
+ (.. zk (getConnectionStateListenable) (addListener listener)))
+
(defn get-data
[^CuratorFramework zk ^String path watch?]
(let [path (normalize-path path)]
@@ -146,7 +158,7 @@
(let [stats (org.apache.zookeeper.data.Stat. )
path (normalize-path path)]
(try-cause
- (if-let [data
+ (if-let [data
(if (exists-node? zk path watch?)
(if watch?
(.. zk (getData) (watched) (storingStatIn stats) (forPath path))