You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/02/01 14:29:00 UTC
[1/2] storm git commit: STORM-2901: Reuse ZK connection for Nimbus for 1.x-branch
Repository: storm
Updated Branches:
refs/heads/1.x-branch c1a1511f1 -> 49c2fc39f
STORM-2901: Reuse ZK connection for Nimbus for 1.x-branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/60dfa7d1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/60dfa7d1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/60dfa7d1
Branch: refs/heads/1.x-branch
Commit: 60dfa7d1b3c8e2500a64db37777de38393354046
Parents: 25fa9dd
Author: chenyuzhao <ch...@meituan.com>
Authored: Tue Jan 23 12:42:46 2018 +0800
Committer: chenyuzhao <ch...@meituan.com>
Committed: Tue Jan 23 12:42:46 2018 +0800
----------------------------------------------------------------------
.../apache/storm/command/shell_submission.clj | 7 +++-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 36 ++++++++++++++------
.../src/clj/org/apache/storm/zookeeper.clj | 10 +++---
.../apache/storm/blobstore/BlobStoreUtils.java | 6 ++--
.../storm/blobstore/BlobSynchronizer.java | 16 ++++-----
.../storm/blobstore/KeySequenceNumber.java | 25 +++++---------
.../storm/zookeeper/LeaderElectorImp.java | 3 +-
.../org/apache/storm/zookeeper/Zookeeper.java | 22 ++++++++----
8 files changed, 72 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/60dfa7d1/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
index 887ab3b..3efcc14 100644
--- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
+++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
@@ -23,7 +23,12 @@
(defn -main [^String tmpjarpath & args]
(let [conf (read-storm-config)
; since this is not a purpose to add to leader lock queue, passing nil as blob-store is ok
- zk-leader-elector (zk-leader-elector conf nil)
+ zk (mk-client conf
+ (conf STORM-ZOOKEEPER-SERVERS)
+ (conf STORM-ZOOKEEPER-PORT)
+ :root (conf STORM-ZOOKEEPER-ROOT)
+ :auth-conf conf)
+ zk-leader-elector (zk-leader-elector conf zk nil)
leader-nimbus (.getLeader zk-leader-elector)
host (.getHost leader-nimbus)
port (.getPort leader-nimbus)
http://git-wip-us.apache.org/repos/asf/storm/blob/60dfa7d1/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 7607b1b..bc72b29 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -133,6 +133,13 @@
scheduler
))
+(defn mk-zk-client [conf]
+ (let [zk-servers (conf STORM-ZOOKEEPER-SERVERS)
+ zk-port (conf STORM-ZOOKEEPER-PORT)
+ zk-root (conf STORM-ZOOKEEPER-ROOT)]
+ (if (and zk-servers zk-port)
+ (mk-client conf zk-servers zk-port :root zk-root :auth-conf conf))))
+
(defmulti blob-sync cluster-mode)
(defnk is-leader [nimbus :throw-exception true]
@@ -183,7 +190,8 @@
(defn nimbus-data [conf inimbus]
(let [forced-scheduler (.getForcedScheduler inimbus)
- blob-store (Utils/getNimbusBlobStore conf (NimbusInfo/fromConf conf))]
+ blob-store (Utils/getNimbusBlobStore conf (NimbusInfo/fromConf conf))
+ zk-client (mk-zk-client conf)]
{:conf conf
:nimbus-host-port-info (NimbusInfo/fromConf conf)
:inimbus inimbus
@@ -213,7 +221,8 @@
(exit-process! 20 "Error when processing an event")
))
:scheduler (mk-scheduler conf inimbus)
- :leader-elector (zk-leader-elector conf blob-store)
+ :zk-client zk-client
+ :leader-elector (zk-leader-elector conf zk-client blob-store)
:id->sched-status (atom {})
:node-id->resources (atom {}) ;;resources of supervisors
:id->resources (atom {}) ;;resources of topologies
@@ -452,9 +461,9 @@
supervisor-ids))
)))
-(defn- get-version-for-key [key nimbus-host-port-info conf]
+(defn- get-version-for-key [key nimbus-host-port-info zk-client]
(let [version (KeySequenceNumber. key nimbus-host-port-info)]
- (.getKeySequenceNumber version conf)))
+ (.getKeySequenceNumber version zk-client)))
(defn get-key-seq-from-blob-store [blob-store]
(let [key-iter (.listKeys blob-store)]
@@ -464,6 +473,7 @@
(let [subject (get-subject)
storm-cluster-state (:storm-cluster-state nimbus)
blob-store (:blob-store nimbus)
+ zk-client (:zk-client nimbus)
jar-key (master-stormjar-key storm-id)
code-key (master-stormcode-key storm-id)
conf-key (master-stormconf-key storm-id)
@@ -471,13 +481,13 @@
(when tmp-jar-location ;;in local mode there is no jar
(.createBlob blob-store jar-key (FileInputStream. tmp-jar-location) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
(if (instance? LocalFsBlobStore blob-store)
- (.setup-blobstore! storm-cluster-state jar-key nimbus-host-port-info (get-version-for-key jar-key nimbus-host-port-info conf))))
+ (.setup-blobstore! storm-cluster-state jar-key nimbus-host-port-info (get-version-for-key jar-key nimbus-host-port-info zk-client))))
(.createBlob blob-store conf-key (Utils/toCompressedJsonConf storm-conf) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
(if (instance? LocalFsBlobStore blob-store)
- (.setup-blobstore! storm-cluster-state conf-key nimbus-host-port-info (get-version-for-key conf-key nimbus-host-port-info conf)))
+ (.setup-blobstore! storm-cluster-state conf-key nimbus-host-port-info (get-version-for-key conf-key nimbus-host-port-info zk-client)))
(.createBlob blob-store code-key (Utils/serialize topology) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
(if (instance? LocalFsBlobStore blob-store)
- (.setup-blobstore! storm-cluster-state code-key nimbus-host-port-info (get-version-for-key code-key nimbus-host-port-info conf)))))
+ (.setup-blobstore! storm-cluster-state code-key nimbus-host-port-info (get-version-for-key code-key nimbus-host-port-info zk-client)))))
(defn- read-storm-topology [storm-id blob-store]
(Utils/deserialize
@@ -1115,7 +1125,7 @@
(defn try-read-storm-conf [conf storm-id blob-store]
(try-cause
- (read-storm-conf-as-nimbus conf storm-id blob-store)
+ (read-storm-conf-as-nimbus storm-id blob-store)
(catch KeyNotFoundException e
(throw (NotAliveException. (str storm-id))))))
@@ -1307,6 +1317,7 @@
"Sets up blobstore state for all current keys."
(let [storm-cluster-state (:storm-cluster-state nimbus)
blob-store (:blob-store nimbus)
+ zk-client (:zk-client nimbus)
local-set-of-keys (set (get-key-seq-from-blob-store blob-store))
all-keys (set (.active-keys storm-cluster-state))
locally-available-active-keys (set/intersection local-set-of-keys all-keys)
@@ -1319,7 +1330,7 @@
(log-debug "Creating list of key entries for blobstore inside zookeeper" all-keys "local" locally-available-active-keys)
(doseq [key locally-available-active-keys]
(try
- (.setup-blobstore! storm-cluster-state key (:nimbus-host-port-info nimbus) (get-version-for-key key nimbus-host-port-info conf))
+ (.setup-blobstore! storm-cluster-state key (:nimbus-host-port-info nimbus) (get-version-for-key key nimbus-host-port-info zk-client))
(catch KeyNotFoundException _
; invalid key, remove it from blobstore
(.deleteBlob blob-store key nimbus-subject))))))
@@ -1473,6 +1484,7 @@
(defmethod blob-sync :distributed [conf nimbus]
(if (not (is-leader nimbus :throw-exception false))
(let [storm-cluster-state (:storm-cluster-state nimbus)
+ zk-client (:zk-client nimbus)
nimbus-host-port-info (:nimbus-host-port-info nimbus)
blob-store-key-set (set (get-key-seq-from-blob-store (:blob-store nimbus)))
zk-key-set (set (.blobstore storm-cluster-state (fn [] (blob-sync conf nimbus))))]
@@ -1481,7 +1493,8 @@
(BlobSynchronizer. (:blob-store nimbus) conf)
(.setNimbusInfo nimbus-host-port-info)
(.setBlobStoreKeySet blob-store-key-set)
- (.setZookeeperKeySet zk-key-set))]
+ (.setZookeeperKeySet zk-key-set)
+ (.setZkClient zk-client))]
(.syncBlobs sync-blobs)))))
(defmethod blob-sync :local [conf nimbus]
@@ -2096,10 +2109,11 @@
(^void createStateInZookeeper [this ^String blob-key]
(let [storm-cluster-state (:storm-cluster-state nimbus)
blob-store (:blob-store nimbus)
+ zk-client (:zk-client nimbus)
nimbus-host-port-info (:nimbus-host-port-info nimbus)
conf (:conf nimbus)]
(if (instance? LocalFsBlobStore blob-store)
- (.setup-blobstore! storm-cluster-state blob-key nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info conf)))
+ (.setup-blobstore! storm-cluster-state blob-key nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info zk-client)))
(log-debug "Created state in zookeeper" storm-cluster-state blob-store nimbus-host-port-info)))
(^void uploadBlobChunk [this ^String session ^ByteBuffer blob-chunk]
http://git-wip-us.apache.org/repos/asf/storm/blob/60dfa7d1/storm-core/src/clj/org/apache/storm/zookeeper.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/zookeeper.clj b/storm-core/src/clj/org/apache/storm/zookeeper.clj
index ca41093..12e8ad7 100644
--- a/storm-core/src/clj/org/apache/storm/zookeeper.clj
+++ b/storm-core/src/clj/org/apache/storm/zookeeper.clj
@@ -252,10 +252,8 @@
(defn zk-leader-elector
"Zookeeper Implementation of ILeaderElector."
- [conf blob-store]
- (let [servers (conf STORM-ZOOKEEPER-SERVERS)
- zk (mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)
- leader-lock-path (str (conf STORM-ZOOKEEPER-ROOT) "/leader-lock")
+ [conf zk blob-store]
+ (let [leader-lock-path "/leader-lock"
id (.toHostPortString (NimbusInfo/fromConf conf))
leader-latch (atom (LeaderLatch. zk leader-lock-path id))
leader-latch-listener (atom (Zookeeper/leaderLatchListenerImpl conf zk blob-store @leader-latch))
@@ -302,5 +300,5 @@
participants)))
(^void close[this]
- (log-message "closing zookeeper connection of leader elector.")
- (.close zk)))))
+ ;;Do nothing now.
+ ))))
http://git-wip-us.apache.org/repos/asf/storm/blob/60dfa7d1/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
index f1eb2f4..569aef2 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobStoreUtils.java
@@ -51,6 +51,10 @@ public class BlobStoreUtils {
private static final String BLOB_DEPENDENCIES_PREFIX = "dep-";
private static final Logger LOG = LoggerFactory.getLogger(BlobStoreUtils.class);
+ public static String getBlobStoreSubtree() {
+ return BLOBSTORE_SUBTREE;
+ }
+
public static CuratorFramework createZKClient(Map conf) {
List<String> zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
@@ -285,6 +289,4 @@ public class BlobStoreUtils {
}
return fileName;
}
-
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/60dfa7d1/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java b/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
index f035709..b581e12 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/BlobSynchronizer.java
@@ -17,16 +17,16 @@
*/
package org.apache.storm.blobstore;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
/**
* Is called periodically and updates the nimbus with blobs based on the state stored inside the zookeeper
* for a non leader nimbus trying to be in sync with the operations performed on the leader nimbus.
@@ -57,6 +57,10 @@ public class BlobSynchronizer {
this.blobStoreKeySet = blobStoreKeySet;
}
+ public void setZkClient(CuratorFramework zkClient) {
+ this.zkClient = zkClient;
+ }
+
public Set<String> getBlobStoreKeySet() {
Set<String> keySet = new HashSet<String>();
keySet.addAll(blobStoreKeySet);
@@ -72,7 +76,6 @@ public class BlobSynchronizer {
public synchronized void syncBlobs() {
try {
LOG.debug("Sync blobs - blobstore keys {}, zookeeper keys {}",getBlobStoreKeySet(), getZookeeperKeySet());
- zkClient = BlobStoreUtils.createZKClient(conf);
deleteKeySetFromBlobStoreNotOnZookeeper(getBlobStoreKeySet(), getZookeeperKeySet());
updateKeySetForBlobStore(getBlobStoreKeySet());
Set<String> keySetToDownload = getKeySetToDownload(getBlobStoreKeySet(), getZookeeperKeySet());
@@ -89,9 +92,6 @@ public class BlobSynchronizer {
LOG.debug("Detected deletion for the key {} while downloading - skipping download", key);
}
}
- if (zkClient !=null) {
- zkClient.close();
- }
} catch(InterruptedException exp) {
LOG.error("InterruptedException {}", exp);
} catch(Exception exp) {
http://git-wip-us.apache.org/repos/asf/storm/blob/60dfa7d1/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java b/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
index adbd4c4..570e0ad 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/KeySequenceNumber.java
@@ -18,21 +18,20 @@
package org.apache.storm.blobstore;
+import java.nio.ByteBuffer;
+import java.util.TreeSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.curator.framework.CuratorFramework;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.nimbus.NimbusInfo;
-import org.apache.storm.utils.Utils;
-import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
-import java.util.TreeSet;
-import java.util.Map;
-import java.util.List;
-
/**
* Class hands over the key sequence number which implies the number of updates made to a blob.
* The information regarding the keys and the sequence number which represents the number of updates are
@@ -120,7 +119,6 @@ import java.util.List;
*/
public class KeySequenceNumber {
private static final Logger LOG = LoggerFactory.getLogger(KeySequenceNumber.class);
- private final String BLOBSTORE_SUBTREE="/blobstore";
private final String BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE="/blobstoremaxkeysequencenumber";
private final String key;
private final NimbusInfo nimbusInfo;
@@ -132,12 +130,11 @@ public class KeySequenceNumber {
this.nimbusInfo = nimbusInfo;
}
- public synchronized int getKeySequenceNumber(Map conf) throws KeyNotFoundException {
+ public synchronized int getKeySequenceNumber(CuratorFramework zkClient) throws KeyNotFoundException {
TreeSet<Integer> sequenceNumbers = new TreeSet<Integer>();
- CuratorFramework zkClient = BlobStoreUtils.createZKClient(conf);
try {
// Key has not been created yet and it is the first time it is being created
- if (zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) == null) {
+ if (zkClient.checkExists().forPath(BlobStoreUtils.getBlobStoreSubtree() + "/" + key) == null) {
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key);
zkClient.setData().forPath(BLOBSTORE_MAX_KEY_SEQUENCE_SUBTREE + "/" + key,
@@ -148,7 +145,7 @@ public class KeySequenceNumber {
// When all nimbodes go down and one or few of them come up
// Unfortunately there might not be an exact way to know which one contains the most updated blob,
// if all go down which is unlikely. Hence there might be a need to update the blob if all go down.
- List<String> stateInfoList = zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
+ List<String> stateInfoList = zkClient.getChildren().forPath(BlobStoreUtils.getBlobStoreSubtree() + "/" + key);
LOG.debug("stateInfoList-size {} stateInfoList-data {}", stateInfoList.size(), stateInfoList);
if (stateInfoList.isEmpty()) {
return getMaxSequenceNumber(zkClient);
@@ -208,10 +205,6 @@ public class KeySequenceNumber {
// in other case, just set this to 0 to trigger re-sync later
LOG.error("Exception {}", e);
return INITIAL_SEQUENCE_NUMBER - 1;
- } finally {
- if (zkClient != null) {
- zkClient.close();
- }
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/60dfa7d1/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java b/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java
index 74816c2..2f61430 100644
--- a/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java
@@ -118,7 +118,6 @@ public class LeaderElectorImp implements ILeaderElector {
@Override
public void close() {
- LOG.info("closing zookeeper connection of leader elector.");
- zk.close();
+ //Do nothing now.
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/60dfa7d1/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
index a2ad797..b2e2236 100644
--- a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
@@ -346,7 +346,7 @@ public class Zookeeper {
@Override
public void isLeader() {
- Set<String> activeTopologyIds = new TreeSet<>(Zookeeper.getChildren(zk, conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));
+ Set<String> activeTopologyIds = new TreeSet<>(Zookeeper.getChildren(zk, ClusterUtils.STORMS_SUBTREE, false));
Set<String> activeTopologyBlobKeys = populateTopologyBlobKeys(activeTopologyIds);
Set<String> activeTopologyCodeKeys = filterTopologyCodeKeys(activeTopologyBlobKeys);
@@ -454,15 +454,23 @@ public class Zookeeper {
};
}
- public static ILeaderElector zkLeaderElector(Map conf, BlobStore blobStore) throws UnknownHostException {
- return _instance.zkLeaderElectorImpl(conf, blobStore);
+ /**
+ * Get master leader elector.
+ * @param conf Config.
+ * @param zkClient ZkClient, the client must have a default Config.STORM_ZOOKEEPER_ROOT as root path.
+ * @param blobStore {@link BlobStore}
+ * @return Instance of {@link ILeaderElector}
+ * @throws UnknownHostException
+ */
+ public static ILeaderElector zkLeaderElector(Map conf, CuratorFramework zkClient,
+ BlobStore blobStore) throws UnknownHostException {
+ return _instance.zkLeaderElectorImpl(conf, zkClient, blobStore);
}
- protected ILeaderElector zkLeaderElectorImpl(Map conf, BlobStore blobStore) throws UnknownHostException {
+ protected ILeaderElector zkLeaderElectorImpl(Map conf, CuratorFramework zk, BlobStore blobStore)
+ throws UnknownHostException {
List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
- Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
- CuratorFramework zk = mkClientImpl(conf, servers, port, "", conf);
- String leaderLockPath = conf.get(Config.STORM_ZOOKEEPER_ROOT) + "/leader-lock";
+ String leaderLockPath = "/leader-lock";
String id = NimbusInfo.fromConf(conf).toHostPortString();
AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));
AtomicReference<LeaderLatchListener> leaderLatchListenerAtomicReference =
[2/2] storm git commit: Merge branch 'reuse-zk-1.x' of https://github.com/danny0405/storm into STORM-2901-1.x-merge
Posted by ka...@apache.org.
Merge branch 'reuse-zk-1.x' of https://github.com/danny0405/storm into STORM-2901-1.x-merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/49c2fc39
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/49c2fc39
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/49c2fc39
Branch: refs/heads/1.x-branch
Commit: 49c2fc39fa62467d7fcc4f7e0c2eeec86e5e8d4c
Parents: c1a1511 60dfa7d
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Feb 1 23:21:26 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Feb 1 23:21:26 2018 +0900
----------------------------------------------------------------------
.../apache/storm/command/shell_submission.clj | 7 +++-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 36 ++++++++++++++------
.../src/clj/org/apache/storm/zookeeper.clj | 10 +++---
.../apache/storm/blobstore/BlobStoreUtils.java | 6 ++--
.../storm/blobstore/BlobSynchronizer.java | 16 ++++-----
.../storm/blobstore/KeySequenceNumber.java | 25 +++++---------
.../storm/zookeeper/LeaderElectorImp.java | 3 +-
.../org/apache/storm/zookeeper/Zookeeper.java | 22 ++++++++----
8 files changed, 72 insertions(+), 53 deletions(-)
----------------------------------------------------------------------