You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/02/01 14:29:00 UTC

[1/2] storm git commit: STORM-2901: Reuse ZK connection for Nimbus for 1.x-branch

Repository: storm
Updated Branches:
  refs/heads/1.x-branch c1a1511f1 -> 49c2fc39f


STORM-2901: Reuse ZK connection for Nimbus for 1.x-branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/60dfa7d1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/60dfa7d1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/60dfa7d1

Branch: refs/heads/1.x-branch
Commit: 60dfa7d1b3c8e2500a64db37777de38393354046
Parents: 25fa9dd
Author: chenyuzhao <ch...@meituan.com>
Authored: Tue Jan 23 12:42:46 2018 +0800
Committer: chenyuzhao <ch...@meituan.com>
Committed: Tue Jan 23 12:42:46 2018 +0800

----------------------------------------------------------------------
 .../apache/storm/command/shell_submission.clj   |  7 +++-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 36 ++++++++++++++------
 .../src/clj/org/apache/storm/zookeeper.clj      | 10 +++---
 .../apache/storm/blobstore/BlobStoreUtils.java  |  6 ++--
 .../storm/blobstore/BlobSynchronizer.java       | 16 ++++-----
 .../storm/blobstore/KeySequenceNumber.java      | 25 +++++---------
 .../storm/zookeeper/LeaderElectorImp.java       |  3 +-
 .../org/apache/storm/zookeeper/Zookeeper.java   | 22 ++++++++----
 8 files changed, 72 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/60dfa7d1/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
index 887ab3b..3efcc14 100644
--- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
+++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
@@ -23,7 +23,12 @@
 (defn -main [^String tmpjarpath & args]
   (let [conf (read-storm-config)
         ; since this is not a purpose to add to leader lock queue, passing nil as blob-store is ok
-        zk-leader-elector (zk-leader-elector conf nil)
+        zk (mk-client conf
+                      (conf STORM-ZOOKEEPER-SERVERS)
+                      (conf STORM-ZOOKEEPER-PORT)
+                      :root (conf STORM-ZOOKEEPER-ROOT)
+                      :auth-conf conf)
+        zk-leader-elector (zk-leader-elector conf zk nil)
         leader-nimbus (.getLeader zk-leader-elector)
         host (.getHost leader-nimbus)
         port (.getPort leader-nimbus)

http://git-wip-us.apache.org/repos/asf/storm/blob/60dfa7d1/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 7607b1b..bc72b29 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -133,6 +133,13 @@
     scheduler
     ))
 
+(defn mk-zk-client [conf]
+  (let [zk-servers (conf STORM-ZOOKEEPER-SERVERS)
+        zk-port (conf STORM-ZOOKEEPER-PORT)
+        zk-root (conf STORM-ZOOKEEPER-ROOT)]
+    (if (and zk-servers zk-port)
+      (mk-client conf zk-servers zk-port :root zk-root :auth-conf conf))))
+
 (defmulti blob-sync cluster-mode)
 
 (defnk is-leader [nimbus :throw-exception true]
@@ -183,7 +190,8 @@
 
 (defn nimbus-data [conf inimbus]
   (let [forced-scheduler (.getForcedScheduler inimbus)
-        blob-store (Utils/getNimbusBlobStore conf (NimbusInfo/fromConf conf))]
+        blob-store (Utils/getNimbusBlobStore conf (NimbusInfo/fromConf conf))
+        zk-client (mk-zk-client conf)]
     {:conf conf
      :nimbus-host-port-info (NimbusInfo/fromConf conf)
      :inimbus inimbus
@@ -213,7 +221,8 @@
                                  (exit-process! 20 "Error when processing an event")
                                  ))
      :scheduler (mk-scheduler conf inimbus)
-     :leader-elector (zk-leader-elector conf blob-store)
+     :zk-client zk-client
+     :leader-elector (zk-leader-elector conf zk-client blob-store)
      :id->sched-status (atom {})
      :node-id->resources (atom {}) ;;resources of supervisors
      :id->resources (atom {}) ;;resources of topologies
@@ -452,9 +461,9 @@
               supervisor-ids))
        )))
 
-(defn- get-version-for-key [key nimbus-host-port-info conf]
+(defn- get-version-for-key [key nimbus-host-port-info zk-client]
   (let [version (KeySequenceNumber. key nimbus-host-port-info)]
-    (.getKeySequenceNumber version conf)))
+    (.getKeySequenceNumber version zk-client)))
 
 (defn get-key-seq-from-blob-store [blob-store]
   (let [key-iter (.listKeys blob-store)]
@@ -464,6 +473,7 @@
   (let [subject (get-subject)
         storm-cluster-state (:storm-cluster-state nimbus)
         blob-store (:blob-store nimbus)
+        zk-client (:zk-client nimbus)
         jar-key (master-stormjar-key storm-id)
         code-key (master-stormcode-key storm-id)
         conf-key (master-stormconf-key storm-id)
@@ -471,13 +481,13 @@
     (when tmp-jar-location ;;in local mode there is no jar
       (.createBlob blob-store jar-key (FileInputStream. tmp-jar-location) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
       (if (instance? LocalFsBlobStore blob-store)
-        (.setup-blobstore! storm-cluster-state jar-key nimbus-host-port-info (get-version-for-key jar-key nimbus-host-port-info conf))))
+        (.setup-blobstore! storm-cluster-state jar-key nimbus-host-port-info (get-version-for-key jar-key nimbus-host-port-info zk-client))))
     (.createBlob blob-store conf-key (Utils/toCompressedJsonConf storm-conf) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
     (if (instance? LocalFsBlobStore blob-store)
-      (.setup-blobstore! storm-cluster-state conf-key nimbus-host-port-info (get-version-for-key conf-key nimbus-host-port-info conf)))
+      (.setup-blobstore! storm-cluster-state conf-key nimbus-host-port-info (get-version-for-key conf-key nimbus-host-port-info zk-client)))
     (.createBlob blob-store code-key (Utils/serialize topology) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
     (if (instance? LocalFsBlobStore blob-store)
-      (.setup-blobstore! storm-cluster-state code-key nimbus-host-port-info (get-version-for-key code-key nimbus-host-port-info conf)))))
+      (.setup-blobstore! storm-cluster-state code-key nimbus-host-port-info (get-version-for-key code-key nimbus-host-port-info zk-client)))))
 
 (defn- read-storm-topology [storm-id blob-store]
   (Utils/deserialize
@@ -1115,7 +1125,7 @@
 
 (defn try-read-storm-conf [conf storm-id blob-store]
   (try-cause
-    (read-storm-conf-as-nimbus conf storm-id blob-store)
+    (read-storm-conf-as-nimbus storm-id blob-store)
     (catch KeyNotFoundException e
        (throw (NotAliveException. (str storm-id))))))
 
@@ -1307,6 +1317,7 @@
   "Sets up blobstore state for all current keys."
   (let [storm-cluster-state (:storm-cluster-state nimbus)
         blob-store (:blob-store nimbus)
+        zk-client (:zk-client nimbus)
         local-set-of-keys (set (get-key-seq-from-blob-store blob-store))
         all-keys (set (.active-keys storm-cluster-state))
         locally-available-active-keys (set/intersection local-set-of-keys all-keys)
@@ -1319,7 +1330,7 @@
     (log-debug "Creating list of key entries for blobstore inside zookeeper" all-keys "local" locally-available-active-keys)
     (doseq [key locally-available-active-keys]
       (try
-        (.setup-blobstore! storm-cluster-state key (:nimbus-host-port-info nimbus) (get-version-for-key key nimbus-host-port-info conf))
+        (.setup-blobstore! storm-cluster-state key (:nimbus-host-port-info nimbus) (get-version-for-key key nimbus-host-port-info zk-client))
         (catch KeyNotFoundException _
           ; invalid key, remove it from blobstore
           (.deleteBlob blob-store key nimbus-subject))))))
@@ -1473,6 +1484,7 @@
 (defmethod blob-sync :distributed [conf nimbus]
   (if (not (is-leader nimbus :throw-exception false))
     (let [storm-cluster-state (:storm-cluster-state nimbus)
+          zk-client (:zk-client nimbus)
           nimbus-host-port-info (:nimbus-host-port-info nimbus)
           blob-store-key-set (set (get-key-seq-from-blob-store (:blob-store nimbus)))
           zk-key-set (set (.blobstore storm-cluster-state (fn [] (blob-sync conf nimbus))))]
@@ -1481,7 +1493,8 @@
                           (BlobSynchronizer. (:blob-store nimbus) conf)
                           (.setNimbusInfo nimbus-host-port-info)
                           (.setBlobStoreKeySet blob-store-key-set)
-                          (.setZookeeperKeySet zk-key-set))]
+                          (.setZookeeperKeySet zk-key-set)
+                          (.setZkClient zk-client))]
         (.syncBlobs sync-blobs)))))
 
 (defmethod blob-sync :local [conf nimbus]
@@ -2096,10 +2109,11 @@
     (^void createStateInZookeeper [this ^String blob-key]
       (let [storm-cluster-state (:storm-cluster-state nimbus)
             blob-store (:blob-store nimbus)
+            zk-client (:zk-client nimbus)
             nimbus-host-port-info (:nimbus-host-port-info nimbus)
             conf (:conf nimbus)]
         (if (instance? LocalFsBlobStore blob-store)
-          (.setup-blobstore! storm-cluster-state blob-key nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info conf)))
+          (.setup-blobstore! storm-cluster-state blob-key nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info zk-client)))
         (log-debug "Created state in zookeeper" storm-cluster-state blob-store nimbus-host-port-info)))
 
     (^void uploadBlobChunk [this ^String session ^ByteBuffer blob-chunk]

http://git-wip-us.apache.org/repos/asf/storm/blob/60dfa7d1/storm-core/src/clj/org/apache/storm/zookeeper.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/zookeeper.clj b/storm-core/src/clj/org/apache/storm/zookeeper.clj
index ca41093..12e8ad7 100644
--- a/storm-core/src/clj/org/apache/storm/zookeeper.clj
+++ b/storm-core/src/clj/org/apache/storm/zookeeper.clj
@@ -252,10 +252,8 @@
 
 (defn zk-leader-elector
   "Zookeeper Implementation of ILeaderElector."
-  [conf blob-store]
-  (let [servers (conf STORM-ZOOKEEPER-SERVERS)
-        zk (mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)
-        leader-lock-path (str (conf STORM-ZOOKEEPER-ROOT) "/leader-lock")
+  [conf zk blob-store]
+  (let [leader-lock-path "/leader-lock"
         id (.toHostPortString (NimbusInfo/fromConf conf))
         leader-latch (atom (LeaderLatch. zk leader-lock-path id))
         leader-latch-listener (atom (Zookeeper/leaderLatchListenerImpl conf zk blob-store @leader-latch))
@@ -302,5 +300,5 @@
             participants)))
 
       (^void close[this]
-        (log-message "closing zookeeper connection of leader elector.")
-        (.close zk)))))
+        ;;Do nothing now.
+        ))))

http://git-wip-us.apache.org/repos/asf/storm/blob/60dfa7d1/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
index f1eb2f4..569aef2 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
@@ -51,6 +51,10 @@ public class BlobStoreUtils {
     private static final String BLOB_DEPENDENCIES_PREFIX = "dep-";
     private static final Logger LOG = LoggerFactory.getLogger(BlobStoreUtils.class);
 
+    public static String getBlobStoreSubtree() {
+        return BLOBSTORE_SUBTREE;
+    }
+
     public static CuratorFramework createZKClient(Map conf) {
         List<String> zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
         Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
@@ -285,6 +289,4 @@ public class BlobStoreUtils {
         }
         return fileName;
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/60dfa7d1/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
index f035709..b581e12 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
@@ -17,16 +17,16 @@
  */
 package org.apache.storm.blobstore;
 
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.curator.framework.CuratorFramework;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
 /**
  * Is called periodically and updates the nimbus with blobs based on the state stored inside the zookeeper
  * for a non leader nimbus trying to be in sync with the operations performed on the leader nimbus.
@@ -57,6 +57,10 @@ public class BlobSynchronizer {
         this.blobStoreKeySet = blobStoreKeySet;
     }
 
+    public void setZkClient(CuratorFramework zkClient) {
+        this.zkClient = zkClient;
+    }
+
     public Set<String> getBlobStoreKeySet() {
         Set<String> keySet = new HashSet<String>();
         keySet.addAll(blobStoreKeySet);
@@ -72,7 +76,6 @@ public class BlobSynchronizer {
     public synchronized void syncBlobs() {
         try {
             LOG.debug("Sync blobs - blobstore keys {}, zookeeper keys {}",getBlobStoreKeySet(), getZookeeperKeySet());
-            zkClient = BlobStoreUtils.createZKClient(conf);
             deleteKeySetFromBlobStoreNotOnZookeeper(getBlobStoreKeySet(), getZookeeperKeySet());
             updateKeySetForBlobStore(getBlobStoreKeySet());
             Set<String> keySetToDownload = getKeySetToDownload(getBlobStoreKeySet(), getZookeeperKeySet());
@@ -89,9 +92,6 @@ public class BlobSynchronizer {
                     LOG.debug("Detected deletion for the key {} while downloading - skipping download", key);
                 }
             }
-            if (zkClient !=null) {
-                zkClient.close();
-            }
         } catch(InterruptedException exp) {
             LOG.error("InterruptedException {}", exp);
         } catch(Exception exp) {

http://git-wip-us.apache.org/repos/asf/storm/blob/60dfa7d1/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java b/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
index adbd4c4..570e0ad 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
@@ -18,21 +18,20 @@
 
 package org.apache.storm.blobstore;
 
+import java.nio.ByteBuffer;
+import java.util.TreeSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.nimbus.NimbusInfo;
-import org.apache.storm.utils.Utils;
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-import java.util.TreeSet;
-import java.util.Map;
-import java.util.List;
-
 /**
  * Class hands over the key sequence number which implies the number of updates made to a blob.
  * The information regarding the keys and the sequence number which represents the number of updates are
@@ -120,7 +119,6 @@ import java.util.List;
  */
 public class KeySequenceNumber {
     private static final Logger LOG = LoggerFactory.getLogger(KeySequenceNumber.class);
-    private final String BLOBSTORE_SUBTREE="/blobstore";
     private final String BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE="/blobstoremaxkeysequencenumber";
     private final String key;
     private final NimbusInfo nimbusInfo;
@@ -132,12 +130,11 @@ public class KeySequenceNumber {
         this.nimbusInfo = nimbusInfo;
     }
 
-    public synchronized int getKeySequenceNumber(Map conf) throws KeyNotFoundException {
+    public synchronized int getKeySequenceNumber(CuratorFramework zkClient) throws KeyNotFoundException {
         TreeSet<Integer> sequenceNumbers = new TreeSet<Integer>();
-        CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf);
         try {
             // Key has not been created yet and it is the first time it is being created
-            if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
+            if (zkClient.checkExists().forPath(BlobStoreUtils.getBlobStoreSubtree() + "/" + key) == null) {
                 zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
                         .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key);
                 zkClient.setData().forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key,
@@ -148,7 +145,7 @@ public class KeySequenceNumber {
             // When all nimbodes go down and one or few of them come up
             // Unfortunately there might not be an exact way to know which one contains the most updated blob,
             // if all go down which is unlikely. Hence there might be a need to update the blob if all go down.
-            List<String> stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
+            List<String> stateInfoList = zkClient.getChildren().forPath(BlobStoreUtils.getBlobStoreSubtree() + "/" + key);
             LOG.debug("stateInfoList-size {} stateInfoList-data {}", stateInfoList.size(), stateInfoList);
             if (stateInfoList.isEmpty()) {
                 return getMaxSequenceNumber(zkClient);
@@ -208,10 +205,6 @@ public class KeySequenceNumber {
             // in other case, just set this to 0 to trigger re-sync later
             LOG.error("Exception {}", e);
             return INITIAL_SEQUENCE_NUMBER - 1;
-        } finally {
-            if (zkClient != null) {
-                zkClient.close();
-            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/60dfa7d1/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java b/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java
index 74816c2..2f61430 100644
--- a/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java
@@ -118,7 +118,6 @@ public class LeaderElectorImp implements ILeaderElector {
 
     @Override
     public void close() {
-        LOG.info("closing zookeeper connection of leader elector.");
-        zk.close();
+        //Do nothing now.
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/60dfa7d1/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
index a2ad797..b2e2236 100644
--- a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
@@ -346,7 +346,7 @@ public class Zookeeper {
 
             @Override
             public void isLeader() {
-                Set<String> activeTopologyIds = new TreeSet<>(Zookeeper.getChildren(zk, conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));
+                Set<String> activeTopologyIds = new TreeSet<>(Zookeeper.getChildren(zk, ClusterUtils.STORMS_SUBTREE, false));
 
                 Set<String> activeTopologyBlobKeys = populateTopologyBlobKeys(activeTopologyIds);
                 Set<String> activeTopologyCodeKeys = filterTopologyCodeKeys(activeTopologyBlobKeys);
@@ -454,15 +454,23 @@ public class Zookeeper {
         };
     }
 
-    public static ILeaderElector zkLeaderElector(Map conf, BlobStore blobStore) throws UnknownHostException {
-        return _instance.zkLeaderElectorImpl(conf, blobStore);
+    /**
+     * Get master leader elector.
+     * @param conf Config.
+     * @param zkClient ZkClient, the client must have a default Config.STORM_ZOOKEEPER_ROOT as root path.
+     * @param blobStore {@link BlobStore}
+     * @return Instance of {@link ILeaderElector}
+     * @throws UnknownHostException
+     */
+    public static ILeaderElector zkLeaderElector(Map conf, CuratorFramework zkClient,
+        BlobStore blobStore) throws UnknownHostException {
+        return _instance.zkLeaderElectorImpl(conf, zkClient, blobStore);
     }
 
-    protected ILeaderElector zkLeaderElectorImpl(Map conf, BlobStore blobStore) throws UnknownHostException {
+    protected ILeaderElector zkLeaderElectorImpl(Map conf, CuratorFramework zk, BlobStore blobStore)
+        throws UnknownHostException {
         List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
-        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
-        CuratorFramework zk = mkClientImpl(conf, servers, port, "", conf);
-        String leaderLockPath = conf.get(Config.STORM_ZOOKEEPER_ROOT) + "/leader-lock";
+        String leaderLockPath = "/leader-lock";
         String id = NimbusInfo.fromConf(conf).toHostPortString();
         AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));
         AtomicReference<LeaderLatchListener> leaderLatchListenerAtomicReference =


[2/2] storm git commit: Merge branch 'reuse-zk-1.x' of https://github.com/danny0405/storm into STORM-2901-1.x-merge

Posted by ka...@apache.org.
Merge branch 'reuse-zk-1.x' of https://github.com/danny0405/storm into STORM-2901-1.x-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/49c2fc39
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/49c2fc39
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/49c2fc39

Branch: refs/heads/1.x-branch
Commit: 49c2fc39fa62467d7fcc4f7e0c2eeec86e5e8d4c
Parents: c1a1511 60dfa7d
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Feb 1 23:21:26 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Feb 1 23:21:26 2018 +0900

----------------------------------------------------------------------
 .../apache/storm/command/shell_submission.clj   |  7 +++-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 36 ++++++++++++++------
 .../src/clj/org/apache/storm/zookeeper.clj      | 10 +++---
 .../apache/storm/blobstore/BlobStoreUtils.java  |  6 ++--
 .../storm/blobstore/BlobSynchronizer.java       | 16 ++++-----
 .../storm/blobstore/KeySequenceNumber.java      | 25 +++++---------
 .../storm/zookeeper/LeaderElectorImp.java       |  3 +-
 .../org/apache/storm/zookeeper/Zookeeper.java   | 22 ++++++++----
 8 files changed, 72 insertions(+), 53 deletions(-)
----------------------------------------------------------------------