You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/18 14:39:34 UTC
[1/2] storm git commit: STORM-2740: Add in caching of topology and
conf to nimbus
Repository: storm
Updated Branches:
refs/heads/master 05a74c73d -> da2f03586
STORM-2740: Add in caching of topology and conf to nimbus
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ac8d37b9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ac8d37b9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ac8d37b9
Branch: refs/heads/master
Commit: ac8d37b9fdb43197b143901bd6d3212601265bca
Parents: 124acb9
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Sep 14 17:02:24 2017 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Sep 15 11:15:06 2017 -0500
----------------------------------------------------------------------
.../org/apache/storm/blobstore/BlobStore.java | 65 +++--
.../src/jvm/org/apache/storm/utils/Utils.java | 12 +-
.../apache/storm/command/shell_submission.clj | 4 +-
.../test/clj/org/apache/storm/nimbus_test.clj | 72 +++---
.../apache/storm/security/auth/auth_test.clj | 2 +-
.../storm/security/auth/nimbus_auth_test.clj | 14 +-
.../java/org/apache/storm/LocalCluster.java | 18 +-
.../org/apache/storm/daemon/nimbus/Nimbus.java | 130 +++++-----
.../apache/storm/daemon/nimbus/TopoCache.java | 244 +++++++++++++++++++
.../storm/zookeeper/LeaderElectorImp.java | 10 +-
.../org/apache/storm/zookeeper/Zookeeper.java | 27 +-
11 files changed, 446 insertions(+), 152 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ac8d37b9/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
index bb177a1..406ac8b 100644
--- a/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
+++ b/storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java
@@ -27,23 +27,20 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.regex.Pattern;
-
import javax.security.auth.Subject;
-
-import org.apache.storm.nimbus.NimbusInfo;
-import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.Utils;
import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.storm.daemon.Shutdownable;
import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.KeyAlreadyExistsException;
+import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.ReadableBlobMeta;
import org.apache.storm.generated.SettableBlobMeta;
import org.apache.storm.generated.StormTopology;
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Provides a way to store blobs that can be downloaded.
@@ -232,6 +229,30 @@ public abstract class BlobStore implements Shutdownable {
* Wrapper called to create the blob which contains
* the byte data
* @param key Key for the blob.
+ * @param data Byte data that needs to be uploaded.
+ * @param who Is the subject updating the blob.
+ * @throws AuthorizationException
+ * @throws IOException
+ * @throws KeyNotFoundException
+ */
+ public void updateBlob(String key, byte [] data, Subject who) throws AuthorizationException, IOException, KeyNotFoundException {
+ AtomicOutputStream out = null;
+ try {
+ out = updateBlob(key, who);
+ out.write(data);
+ out.close();
+ out = null;
+ } finally {
+ if (out != null) {
+ out.cancel();
+ }
+ }
+ }
+
+ /**
+ * Wrapper called to create the blob which contains
+ * the byte data
+ * @param key Key for the blob.
* @param in InputStream from which the data is read to be
* written as a part of the blob.
* @param meta Metadata which contains the acls information
@@ -305,32 +326,6 @@ public abstract class BlobStore implements Shutdownable {
out.close();
return bytes;
}
-
- /**
- * Helper method to read a stored topology
- * @param topoId the id of the topology to read
- * @param who who to read it as
- * @return the deserialized topology.
- * @throws IOException on any error while reading the blob.
- * @throws AuthorizationException if who is not allowed to read the blob
- * @throws KeyNotFoundException if the blob could not be found
- */
- public StormTopology readTopology(String topoId, Subject who) throws KeyNotFoundException, AuthorizationException, IOException {
- return Utils.deserialize(readBlob(ConfigUtils.masterStormCodeKey(topoId), who), StormTopology.class);
- }
-
- /**
- * Helper method to read a stored topology config
- * @param topoId the id of the topology whose conf we are reading
- * @param who who we are reading this as
- * @return the deserialized config
- * @throws KeyNotFoundException if the blob could not be found
- * @throws AuthorizationException if who is not allowed to read the blob
- * @throws IOException on any error while reading the blob.
- */
- public Map<String, Object> readTopologyConf(String topoId, Subject who) throws KeyNotFoundException, AuthorizationException, IOException {
- return Utils.fromCompressedJsonConf(readBlob(ConfigUtils.masterStormConfKey(topoId), who));
- }
private static final KeyFilter<String> TO_TOPO_ID = (key) -> ConfigUtils.getIdFromBlobKey(key);
/**
http://git-wip-us.apache.org/repos/asf/storm/blob/ac8d37b9/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index a8820e6..a028935 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -1104,21 +1104,11 @@ public class Utils {
public static long getVersionFromBlobVersionFile(File versionFile) {
long currentVersion = 0;
if (versionFile.exists() && !(versionFile.isDirectory())) {
- BufferedReader br = null;
- try {
- br = new BufferedReader(new FileReader(versionFile));
+ try (BufferedReader br = new BufferedReader(new FileReader(versionFile))) {
String line = br.readLine();
currentVersion = Long.parseLong(line);
} catch (IOException e) {
throw new RuntimeException(e);
- } finally {
- try {
- if (br != null) {
- br.close();
- }
- } catch (Exception ignore) {
- LOG.error("Exception trying to cleanup", ignore);
- }
}
return currentVersion;
} else {
http://git-wip-us.apache.org/repos/asf/storm/blob/ac8d37b9/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
index 4b4bc37..8aee299 100644
--- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
+++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
@@ -25,8 +25,8 @@
(defn -main [^String tmpjarpath & args]
(let [conf (clojurify-structure (ConfigUtils/readStormConfig))
- ; since this is not a purpose to add to leader lock queue, passing nil as blob-store is ok
- zk-leader-elector (Zookeeper/zkLeaderElector conf nil)
+ ; since the purpose here is not to join the leader lock queue, passing nil as blob-store and topo cache is ok
+ zk-leader-elector (Zookeeper/zkLeaderElector conf nil nil)
leader-nimbus (.getLeader zk-leader-elector)
host (.getHost leader-nimbus)
port (.getPort leader-nimbus)
http://git-wip-us.apache.org/repos/asf/storm/blob/ac8d37b9/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index c4f3fad..dcea44e 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -21,7 +21,7 @@
TestAggregatesCounter TestPlannerSpout TestPlannerBolt]
[org.apache.storm.blobstore BlobStore]
[org.apache.storm.nimbus InMemoryTopologyActionNotifier]
- [org.apache.storm.daemon.nimbus Nimbus Nimbus$StandaloneINimbus]
+ [org.apache.storm.daemon.nimbus TopoCache Nimbus Nimbus$StandaloneINimbus]
[org.apache.storm.generated GlobalStreamId TopologyStatus SupervisorInfo StormTopology StormBase]
[org.apache.storm LocalCluster LocalCluster$Builder Thrift MockAutoCred Testing Testing$Condition]
[org.apache.storm.stats BoltExecutorStats StatsUtil]
@@ -56,7 +56,7 @@
(defn- mk-nimbus
[conf inimbus blob-store leader-elector group-mapper cluster-state]
- (Nimbus. conf inimbus cluster-state nil blob-store leader-elector group-mapper))
+ (Nimbus. conf inimbus cluster-state nil blob-store nil leader-elector group-mapper))
(defn- from-json
[^String str]
@@ -1307,7 +1307,7 @@
(with-open [zk (InProcessZookeeper. )]
(with-open [tmp-nimbus-dir (TmpPath.)
_ (MockedZookeeper. (proxy [Zookeeper] []
- (zkLeaderElectorImpl [conf blob-store] (MockLeaderElector. ))))]
+ (zkLeaderElectorImpl [conf blob-store tc] (MockLeaderElector. ))))]
(let [nimbus-dir (.getPath tmp-nimbus-dir)]
(letlocals
(bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
@@ -1324,7 +1324,7 @@
{}))
(with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
- (zkLeaderElectorImpl [conf blob-store] (MockLeaderElector. false))))]
+ (zkLeaderElectorImpl [conf blob-store tc] (MockLeaderElector. false))))]
(letlocals
(bind non-leader-cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
@@ -1384,11 +1384,13 @@
(deftest test-nimbus-iface-methods-check-authorization
(let [cluster-state (Mockito/mock IStormClusterState)
- blob-store (Mockito/mock BlobStore)]
+ blob-store (Mockito/mock BlobStore)
+ tc (Mockito/mock TopoCache)]
(with-open [cluster (.build
(doto (LocalCluster$Builder. )
(.withClusterState cluster-state)
(.withBlobStore blob-store)
+ (.withTopoCache tc)
(.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"})))]
(let [nimbus (.getNimbus cluster)
topology-name "test"
@@ -1403,11 +1405,13 @@
(deftest test-nimbus-check-authorization-params
(let [cluster-state (Mockito/mock IStormClusterState)
- blob-store (Mockito/mock BlobStore)]
+ blob-store (Mockito/mock BlobStore)
+ tc (Mockito/mock TopoCache)]
(with-open [cluster (.build
(doto (LocalCluster$Builder. )
(.withClusterState cluster-state)
(.withBlobStore blob-store)
+ (.withTopoCache tc)
(.withNimbusWrapper (reify UnaryOperator (apply [this nimbus] (Mockito/spy nimbus))))
(.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
(let [nimbus (.getNimbus cluster)
@@ -1418,8 +1422,8 @@
expected-conf {TOPOLOGY-NAME expected-name
"foo" "bar"}]
(.thenReturn (Mockito/when (.getTopoId cluster-state topology-name)) (Optional/of topology-id))
- (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/anyObject))) expected-conf)
- (.thenReturn (Mockito/when (.readTopology blob-store (Mockito/any String) (Mockito/anyObject))) nil)
+ (.thenReturn (Mockito/when (.readTopoConf tc (Mockito/any String) (Mockito/anyObject))) expected-conf)
+ (.thenReturn (Mockito/when (.readTopology tc (Mockito/any String) (Mockito/anyObject))) nil)
(testing "getTopologyConf calls check-authorization! with the correct parameters."
(let [expected-operation "getTopologyConf"]
(try
@@ -1456,15 +1460,17 @@
(finally
(.checkAuthorization (Mockito/verify nimbus) (Mockito/eq topology-name) (Mockito/any Map) (Mockito/eq expected-operation))
;;One for this time and one for getTopology call
- (.readTopology (Mockito/verify blob-store (Mockito/times 2)) (Mockito/eq topology-id) (Mockito/anyObject))))))))))
+ (.readTopology (Mockito/verify tc (Mockito/times 2)) (Mockito/eq topology-id) (Mockito/anyObject))))))))))
(deftest test-check-authorization-getSupervisorPageInfo
(let [cluster-state (Mockito/mock IStormClusterState)
- blob-store (Mockito/mock BlobStore)]
+ blob-store (Mockito/mock BlobStore)
+ tc (Mockito/mock TopoCache)]
(with-open [cluster (.build
(doto (LocalCluster$Builder. )
(.withClusterState cluster-state)
(.withBlobStore blob-store)
+ (.withTopoCache tc)
(.withNimbusWrapper (reify UnaryOperator (apply [this nimbus] (Mockito/spy nimbus))))
(.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
(let [nimbus (.getNimbus cluster)
@@ -1494,8 +1500,8 @@
(.put "super2" (doto (SupervisorInfo.) (.set_hostname "host2") (.set_meta [(long 1234)])
(.set_uptime_secs (long 123)) (.set_meta [1 2 3]) (.set_used_ports []) (.set_resources_map {}))))]
(.thenReturn (Mockito/when (.allSupervisorInfo cluster-state)) all-supervisors)
- (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/any Subject))) expected-conf)
- (.thenReturn (Mockito/when (.readTopology blob-store (Mockito/any String) (Mockito/any Subject))) topology)
+ (.thenReturn (Mockito/when (.readTopoConf tc (Mockito/any String) (Mockito/any Subject))) expected-conf)
+ (.thenReturn (Mockito/when (.readTopology tc (Mockito/any String) (Mockito/any Subject))) topology)
(.thenReturn (Mockito/when (.topologyAssignments cluster-state)) topo-assignment)
(.getSupervisorPageInfo nimbus "super1" nil true)
@@ -1550,10 +1556,12 @@
(deftest test-nimbus-iface-getClusterInfo-filters-topos-without-bases
(let [cluster-state (Mockito/mock IStormClusterState)
- blob-store (Mockito/mock BlobStore)]
+ blob-store (Mockito/mock BlobStore)
+ tc (Mockito/mock TopoCache)]
(with-open [cluster (.build
(doto (LocalCluster$Builder. )
(.withClusterState cluster-state)
+ (.withTopoCache tc)
(.withBlobStore blob-store)))]
(let [nimbus (.getNimbus cluster)
bogus-secs 42
@@ -1576,8 +1584,8 @@
]
(.thenReturn (Mockito/when (.stormBase cluster-state (Mockito/any String) (Mockito/anyObject))) storm-base)
(.thenReturn (Mockito/when (.topologyBases cluster-state)) bogus-bases)
- (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/any Subject))) topo-conf)
- (.thenReturn (Mockito/when (.readTopology blob-store (Mockito/any String) (Mockito/any Subject))) topology)
+ (.thenReturn (Mockito/when (.readTopoConf tc (Mockito/any String) (Mockito/any Subject))) topo-conf)
+ (.thenReturn (Mockito/when (.readTopology tc (Mockito/any String) (Mockito/any Subject))) topology)
(let [topos (.get_topologies (.getClusterInfo nimbus))]
; The number of topologies in the summary is correct.
@@ -1625,7 +1633,7 @@
_ (UtilsInstaller. fake-utils)
- (StormCommonInstaller. fake-common)
zk-le (MockedZookeeper. (proxy [Zookeeper] []
- (zkLeaderElectorImpl [conf blob-store] nil)))
+ (zkLeaderElectorImpl [conf blob-store tc] nil)))
mocked-cluster (MockedCluster. cluster-utils)]
(Nimbus. auth-conf fake-inimbus)
(.mkStormClusterStateImpl (Mockito/verify cluster-utils (Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any))
@@ -1643,11 +1651,13 @@
(deftest test-validate-topo-config-on-submit
(let [cluster-state (Mockito/mock IStormClusterState)
- blob-store (Mockito/mock BlobStore)]
+ blob-store (Mockito/mock BlobStore)
+ tc (Mockito/mock TopoCache)]
(with-open [cluster (.build
(doto (LocalCluster$Builder. )
(.withClusterState cluster-state)
(.withBlobStore blob-store)
+ (.withTopoCache tc)
(.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
(.thenReturn (Mockito/when (.getTopoId cluster-state "test")) (Optional/empty))
(let [topology (Thrift/buildTopology {} {})
@@ -1694,7 +1704,7 @@
(with-open [zk (InProcessZookeeper. )]
(with-open [tmp-nimbus-dir (TmpPath.)
_ (MockedZookeeper. (proxy [Zookeeper] []
- (zkLeaderElectorImpl [conf blob-store] (MockLeaderElector. ))))]
+ (zkLeaderElectorImpl [conf blob-store tc] (MockLeaderElector. ))))]
(let [nimbus-dir (.getPath tmp-nimbus-dir)]
(letlocals
(bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
@@ -1756,11 +1766,13 @@
;; log configs it contains are LogLevelAction/UNCHANGED
(deftest empty-save-config-results-in-all-unchanged-actions
(let [cluster-state (Mockito/mock IStormClusterState)
- blob-store (Mockito/mock BlobStore)]
+ blob-store (Mockito/mock BlobStore)
+ tc (Mockito/mock TopoCache)]
(with-open [cluster (.build
(doto (LocalCluster$Builder. )
(.withClusterState cluster-state)
(.withBlobStore blob-store)
+ (.withTopoCache tc)
(.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
(let [nimbus (.getNimbus cluster)
previous-config (LogConfig.)
@@ -1777,7 +1789,7 @@
(.set_target_log_level "ERROR")
(.set_action LogLevelAction/UNCHANGED)))
- (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/anyObject))) {})
+ (.thenReturn (Mockito/when (.readTopoConf tc (Mockito/any String) (Mockito/anyObject))) {})
(.thenReturn (Mockito/when (.topologyLogConfig cluster-state (Mockito/any String) (Mockito/anyObject))) previous-config)
(.setLogConfig nimbus "foo" mock-config)
@@ -1785,11 +1797,13 @@
(deftest log-level-update-merges-and-flags-existent-log-level
(let [cluster-state (Mockito/mock IStormClusterState)
- blob-store (Mockito/mock BlobStore)]
+ blob-store (Mockito/mock BlobStore)
+ tc (Mockito/mock TopoCache)]
(with-open [cluster (.build
(doto (LocalCluster$Builder. )
(.withClusterState cluster-state)
(.withBlobStore blob-store)
+ (.withTopoCache tc)
(.withDaemonConf {NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"})))]
(let [nimbus (.getNimbus cluster)
previous-config (LogConfig.)
@@ -1823,7 +1837,7 @@
(.set_target_log_level "DEBUG")
(.set_action LogLevelAction/UNCHANGED)))
- (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/anyObject))) {})
+ (.thenReturn (Mockito/when (.readTopoConf tc (Mockito/any String) (Mockito/anyObject))) {})
(.thenReturn (Mockito/when (.topologyLogConfig cluster-state (Mockito/any String) (Mockito/anyObject))) previous-config)
(.setLogConfig nimbus "foo" mock-config)
@@ -1883,7 +1897,7 @@
mock-blob-store (Mockito/mock BlobStore)
conf {}]
(with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
- (zkLeaderElectorImpl [conf blob-store] (MockLeaderElector. ))))]
+ (zkLeaderElectorImpl [conf blob-store tc] (MockLeaderElector. ))))]
(let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil mock-blob-store nil nil))]
(.set (.getHeartbeatsCache nimbus) hb-cache)
(.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (HashSet. inactive-topos))
@@ -1928,7 +1942,7 @@
mock-blob-store (Mockito/mock BlobStore)
conf {}]
(with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
- (zkLeaderElectorImpl [conf blob-store] (MockLeaderElector. ))))]
+ (zkLeaderElectorImpl [conf blob-store tc] (MockLeaderElector. ))))]
(let [nimbus (Mockito/spy (Nimbus. conf nil mock-state nil mock-blob-store nil nil))]
(.set (.getHeartbeatsCache nimbus) hb-cache)
(.thenReturn (Mockito/when (.storedTopoIds mock-blob-store)) (set inactive-topos))
@@ -1960,7 +1974,8 @@
assignments {"topo1" assignment, "topo2" assignment2}
mock-state (mock-cluster-state)
mock-blob-store (Mockito/mock BlobStore)
- nimbus (Nimbus. {} nil mock-state nil mock-blob-store (MockLeaderElector. ) nil)]
+ mock-tc (Mockito/mock TopoCache)
+ nimbus (Nimbus. {} nil mock-state nil mock-blob-store mock-tc (MockLeaderElector. ) nil)]
(let [supervisor1-topologies (clojurify-structure (Nimbus/topologiesOnSupervisor assignments "super1"))
user1-topologies (clojurify-structure (.filterAuthorized nimbus "getTopology" supervisor1-topologies))
supervisor2-topologies (clojurify-structure (Nimbus/topologiesOnSupervisor assignments "super2"))
@@ -1980,9 +1995,10 @@
assignments {"topo1" assignment, "authorized" assignment2}
mock-state (mock-cluster-state)
mock-blob-store (Mockito/mock BlobStore)
- nimbus (Nimbus. {} nil mock-state nil mock-blob-store (MockLeaderElector. ) nil)]
- (.thenReturn (Mockito/when (.readTopologyConf mock-blob-store (Mockito/eq "authorized") (Mockito/anyObject))) {TOPOLOGY-NAME "authorized"})
- (.thenReturn (Mockito/when (.readTopologyConf mock-blob-store (Mockito/eq "topo1") (Mockito/anyObject))) {TOPOLOGY-NAME "topo1"})
+ mock-tc (Mockito/mock TopoCache)
+ nimbus (Nimbus. {} nil mock-state nil mock-blob-store mock-tc (MockLeaderElector. ) nil)]
+ (.thenReturn (Mockito/when (.readTopoConf mock-tc (Mockito/eq "authorized") (Mockito/anyObject))) {TOPOLOGY-NAME "authorized"})
+ (.thenReturn (Mockito/when (.readTopoConf mock-tc (Mockito/eq "topo1") (Mockito/anyObject))) {TOPOLOGY-NAME "topo1"})
(.setAuthorizationHandler nimbus (reify IAuthorizer (permit [this context operation topo-conf] (= "authorized" (get topo-conf TOPOLOGY-NAME)))))
(let [supervisor-topologies (clojurify-structure (Nimbus/topologiesOnSupervisor assignments "super1"))
user-topologies (clojurify-structure (.filterAuthorized nimbus "getTopology" supervisor-topologies))]
http://git-wip-us.apache.org/repos/asf/storm/blob/ac8d37b9/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
index 149a0f2..f7aac52 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/auth_test.clj
@@ -61,7 +61,7 @@
(defn nimbus-data [storm-conf inimbus]
(with-open [_ (MockedZookeeper. (proxy [Zookeeper] []
- (zkLeaderElectorImpl [conf blob-store] (Mockito/mock ILeaderElector))))]
+ (zkLeaderElectorImpl [conf blob-store tc] (Mockito/mock ILeaderElector))))]
(org.apache.storm.daemon.nimbus.Nimbus. storm-conf inimbus (Mockito/mock IStormClusterState) nil (Mockito/mock BlobStore) nil nil)))
(defn dummy-service-handler
http://git-wip-us.apache.org/repos/asf/storm/blob/ac8d37b9/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
index 83a3267..4b2d085 100644
--- a/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
+++ b/storm-core/test/clj/org/apache/storm/security/auth/nimbus_auth_test.clj
@@ -20,13 +20,13 @@
(:import [java.util Optional])
(:import [org.apache.storm LocalCluster$Builder DaemonConfig Config])
(:import [org.apache.storm.blobstore BlobStore])
- (:import [org.apache.storm.utils NimbusClient])
+ (:import [org.apache.storm.daemon.nimbus TopoCache])
(:import [org.apache.storm.generated NotAliveException StormBase])
(:import [org.apache.storm.security.auth AuthUtils ThriftServer ThriftClient
ReqContext ThriftConnectionType])
(:import [org.apache.storm.generated Nimbus Nimbus$Client Nimbus$Processor
AuthorizationException SubmitOptions TopologyInitialStatus KillOptions])
- (:import [org.apache.storm.utils ConfigUtils Utils])
+ (:import [org.apache.storm.utils ConfigUtils NimbusClient Utils])
(:import [org.apache.storm.cluster IStormClusterState])
(:import [org.mockito Mockito Matchers])
(:use [org.apache.storm util config daemon-config log])
@@ -64,12 +64,14 @@
(deftest test-noop-authorization-w-simple-transport
(let [cluster-state (Mockito/mock IStormClusterState)
blob-store (Mockito/mock BlobStore)
+ tc (Mockito/mock TopoCache)
topo-name "topo-name"]
(.thenReturn (Mockito/when (.getTopoId cluster-state topo-name)) (Optional/empty))
(with-open [cluster (.build
(doto (LocalCluster$Builder.)
(.withClusterState cluster-state)
(.withBlobStore blob-store)
+ (.withTopoCache tc)
(.withNimbusDaemon)
(.withDaemonConf
{NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.NoopAuthorizer"
@@ -88,14 +90,16 @@
(deftest test-deny-authorization-w-simple-transport
(let [cluster-state (Mockito/mock IStormClusterState)
blob-store (Mockito/mock BlobStore)
+ tc (Mockito/mock TopoCache)
topo-name "topo-name"
topo-id "topo-name-1"]
(.thenReturn (Mockito/when (.getTopoId cluster-state topo-name)) (Optional/of topo-id))
- (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/anyObject))) {})
+ (.thenReturn (Mockito/when (.readTopoConf tc (Mockito/any String) (Mockito/anyObject))) {})
(with-open [cluster (.build
(doto (LocalCluster$Builder.)
(.withClusterState cluster-state)
(.withBlobStore blob-store)
+ (.withTopoCache tc)
(.withNimbusDaemon)
(.withDaemonConf
{NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
@@ -150,14 +154,16 @@
(deftest test-deny-authorization-w-sasl-digest
(let [cluster-state (Mockito/mock IStormClusterState)
blob-store (Mockito/mock BlobStore)
+ tc (Mockito/mock TopoCache)
topo-name "topo-name"
topo-id "topo-name-1"]
(.thenReturn (Mockito/when (.getTopoId cluster-state topo-name)) (Optional/of topo-id))
- (.thenReturn (Mockito/when (.readTopologyConf blob-store (Mockito/any String) (Mockito/anyObject))) {})
+ (.thenReturn (Mockito/when (.readTopoConf tc (Mockito/any String) (Mockito/anyObject))) {})
(with-open [cluster (.build
(doto (LocalCluster$Builder.)
(.withClusterState cluster-state)
(.withBlobStore blob-store)
+ (.withTopoCache tc)
(.withNimbusDaemon)
(.withDaemonConf
{NIMBUS-AUTHORIZER "org.apache.storm.security.auth.authorizer.DenyAuthorizer"
http://git-wip-us.apache.org/repos/asf/storm/blob/ac8d37b9/storm-server/src/main/java/org/apache/storm/LocalCluster.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
index 4fb4474..fa49d12 100644
--- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java
+++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java
@@ -43,6 +43,7 @@ import org.apache.storm.daemon.Shutdownable;
import org.apache.storm.daemon.StormCommon;
import org.apache.storm.daemon.nimbus.Nimbus;
import org.apache.storm.daemon.nimbus.Nimbus.StandaloneINimbus;
+import org.apache.storm.daemon.nimbus.TopoCache;
import org.apache.storm.daemon.supervisor.ReadClusterState;
import org.apache.storm.daemon.supervisor.StandaloneSupervisor;
import org.apache.storm.daemon.supervisor.Supervisor;
@@ -140,6 +141,7 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
private boolean nimbusDaemon = false;
private UnaryOperator<Nimbus> nimbusWrapper = null;
private BlobStore store = null;
+ private TopoCache topoCache = null;
private IStormClusterState clusterState = null;
private ILeaderElector leaderElector = null;
private String trackId = null;
@@ -278,7 +280,16 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
this.store = store;
return this;
}
-
+
+ /**
+ * Use the following topo cache instead of creating our own.
+ * This is intended mostly for internal testing with Mocks.
+ */
+ public Builder withTopoCache(TopoCache topoCache) {
+ this.topoCache = topoCache;
+ return this;
+ }
+
/**
* Use the following clusterState instead of the one in the config.
* This is intended mostly for internal testing with Mocks.
@@ -430,8 +441,9 @@ public class LocalCluster implements ILocalClusterTrackedTopologyAware, Iface {
}
//Set it for nimbus only
conf.put(Config.STORM_LOCAL_DIR, nimbusTmp.getPath());
- Nimbus nimbus = new Nimbus(conf, builder.inimbus == null ? new StandaloneINimbus() : builder.inimbus,
- this.getClusterState(), null, builder.store, builder.leaderElector, builder.groupMapper);
+ Nimbus nimbus = new Nimbus(conf, builder.inimbus == null ? new StandaloneINimbus() : builder.inimbus,
+ this.getClusterState(), null, builder.store, builder.topoCache, builder.leaderElector,
+ builder.groupMapper);
if (builder.nimbusWrapper != null) {
nimbus = builder.nimbusWrapper.apply(nimbus);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ac8d37b9/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index db2550d..00397a6 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -250,7 +250,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
if (killTime != null) {
delay = ((Number)killTime).intValue();
} else {
- delay = ObjectReader.getInt(Nimbus.readTopoConf(topoId, nimbus.getBlobStore()).get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
+ delay = ObjectReader.getInt(Nimbus.readTopoConf(topoId, nimbus.getTopoCache()).get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
}
nimbus.delayEvent(topoId, delay, TopologyActions.REMOVE, null);
StormBase sb = new StormBase();
@@ -271,7 +271,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
if (rbo.is_set_wait_secs()) {
delay = rbo.get_wait_secs();
} else {
- delay = ObjectReader.getInt(Nimbus.readTopoConf(topoId, nimbus.getBlobStore()).get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
+ delay = ObjectReader.getInt(Nimbus.readTopoConf(topoId, nimbus.getTopoCache()).get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
}
nimbus.delayEvent(topoId, delay, TopologyActions.DO_REBALANCE, null);
@@ -549,8 +549,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
return ReqContext.context().subject();
}
- static Map<String, Object> readTopoConf(String topoId, BlobStore blobStore) throws KeyNotFoundException, AuthorizationException, IOException {
- return blobStore.readTopologyConf(topoId, getSubject());
+ static Map<String, Object> readTopoConf(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, IOException {
+ return tc.readTopoConf(topoId, getSubject());
}
static List<String> getKeyListFromId(Map<String, Object> conf, String id) {
@@ -568,16 +568,16 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
return kseq.getKeySequenceNumber(conf);
}
- private static StormTopology readStormTopology(String topoId, BlobStore store) throws KeyNotFoundException, AuthorizationException, IOException {
- return store.readTopology(topoId, getSubject());
+ private static StormTopology readStormTopology(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, IOException {
+ return tc.readTopology(topoId, getSubject());
}
- private static Map<String, Object> readTopoConfAsNimbus(String topoId, BlobStore store) throws KeyNotFoundException, AuthorizationException, IOException {
- return store.readTopologyConf(topoId, NIMBUS_SUBJECT);
+ private static Map<String, Object> readTopoConfAsNimbus(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, IOException {
+ return tc.readTopoConf(topoId, NIMBUS_SUBJECT);
}
- private static StormTopology readStormTopologyAsNimbus(String topoId, BlobStore store) throws KeyNotFoundException, AuthorizationException, IOException {
- return store.readTopology(topoId, NIMBUS_SUBJECT);
+ private static StormTopology readStormTopologyAsNimbus(String topoId, TopoCache tc) throws KeyNotFoundException, AuthorizationException, IOException {
+ return tc.readTopology(topoId, NIMBUS_SUBJECT);
}
/**
@@ -720,9 +720,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
return state.getTopoId(topoName).isPresent();
}
- private static Map<String, Object> tryReadTopoConf(String topoId, BlobStore store) throws NotAliveException, AuthorizationException, IOException {
+ private static Map<String, Object> tryReadTopoConf(String topoId, TopoCache tc) throws NotAliveException, AuthorizationException, IOException {
try {
- return readTopoConfAsNimbus(topoId, store);
+ return readTopoConfAsNimbus(topoId, tc);
//Was a try-cause but I looked at the code around this and key not found is not wrapped in runtime,
// so it is not needed
} catch (KeyNotFoundException e) {
@@ -873,9 +873,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
}
- private static StormTopology tryReadTopology(String topoId, BlobStore store) throws NotAliveException, AuthorizationException, IOException {
+ private static StormTopology tryReadTopology(String topoId, TopoCache tc) throws NotAliveException, AuthorizationException, IOException {
try {
- return readStormTopologyAsNimbus(topoId, store);
+ return readStormTopologyAsNimbus(topoId, tc);
} catch (KeyNotFoundException e) {
throw new NotAliveException(topoId);
}
@@ -1033,6 +1033,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
@SuppressWarnings("deprecation")
private final TimeCacheMap<String, WritableByteChannel> uploaders;
private final BlobStore blobStore;
+ private final TopoCache topoCache;
@SuppressWarnings("deprecation")
private final TimeCacheMap<String, BufferInputStream> blobDownloaders;
@SuppressWarnings("deprecation")
@@ -1070,7 +1071,13 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stormClusterState, NimbusInfo hostPortInfo,
- BlobStore blobStore, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper) throws Exception {
+ BlobStore blobStore, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper) throws Exception {
+ this(conf, inimbus, stormClusterState, hostPortInfo, blobStore, null, leaderElector, groupMapper);
+ }
+
+ public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stormClusterState, NimbusInfo hostPortInfo,
+ BlobStore blobStore, TopoCache topoCache, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper)
+ throws Exception {
this.conf = conf;
if (hostPortInfo == null) {
hostPortInfo = NimbusInfo.fromConf(conf);
@@ -1095,6 +1102,10 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
blobStore = ServerUtils.getNimbusBlobStore(conf, this.nimbusHostPortInfo);
}
this.blobStore = blobStore;
+ if (topoCache == null) {
+ topoCache = new TopoCache(blobStore, conf);
+ }
+ this.topoCache = topoCache;
this.blobDownloaders = makeBlobCacheMap(conf);
this.blobUploaders = makeBlobCacheMap(conf);
this.blobListers = makeBlobListCachMap(conf);
@@ -1106,7 +1117,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
});
this.scheduler = makeScheduler(conf, inimbus);
if (leaderElector == null) {
- leaderElector = Zookeeper.zkLeaderElector(conf, blobStore);
+ leaderElector = Zookeeper.zkLeaderElector(conf, blobStore, topoCache);
}
this.leaderElector = leaderElector;
this.idToSchedStatus = new AtomicReference<>(new HashMap<>());
@@ -1148,7 +1159,11 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
private BlobStore getBlobStore() {
return blobStore;
}
-
+
+ private TopoCache getTopoCache() {
+ return topoCache;
+ }
+
private boolean isLeader() throws Exception {
return leaderElector.isLeader();
}
@@ -1254,13 +1269,13 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
clusterState.setupBlobstore(jarKey, hostPortInfo, getVersionForKey(jarKey, hostPortInfo, conf));
}
}
-
- store.createBlob(confKey, Utils.toCompressedJsonConf(topoConf), new SettableBlobMeta(BlobStoreAclHandler.DEFAULT), subject);
+
+ topoCache.addTopoConf(topoId, subject, topoConf);
if (store instanceof LocalFsBlobStore) {
clusterState.setupBlobstore(confKey, hostPortInfo, getVersionForKey(confKey, hostPortInfo, conf));
}
-
- store.createBlob(codeKey, Utils.serialize(topology), new SettableBlobMeta(BlobStoreAclHandler.DEFAULT), subject);
+
+ topoCache.addTopology(topoId, subject, topology);
if (store instanceof LocalFsBlobStore) {
clusterState.setupBlobstore(codeKey, hostPortInfo, getVersionForKey(codeKey, hostPortInfo, conf));
}
@@ -1317,10 +1332,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
AuthorizationException, IOException, InvalidTopologyException {
assert (base != null);
assert (topoId != null);
-
- BlobStore store = blobStore;
- Map<String, Object> topoConf = readTopoConfAsNimbus(topoId, store);
- StormTopology topo = readStormTopologyAsNimbus(topoId, store);
+
+ Map<String, Object> topoConf = readTopoConfAsNimbus(topoId, topoCache);
+ StormTopology topo = readStormTopologyAsNimbus(topoId, topoCache);
if (!base.is_set_principal()) {
fixupBase(base, topoConf);
stormClusterState.updateStorm(topoId, base);
@@ -1387,12 +1401,11 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
private List<List<Integer>> computeExecutors(String topoId, StormBase base) throws KeyNotFoundException, AuthorizationException, IOException, InvalidTopologyException {
- BlobStore store = blobStore;
assert (base != null);
Map<String, Integer> compToExecutors = base.get_component_executors();
- Map<String, Object> topoConf = readTopoConfAsNimbus(topoId, store);
- StormTopology topology = readStormTopologyAsNimbus(topoId, store);
+ Map<String, Object> topoConf = readTopoConfAsNimbus(topoId, topoCache);
+ StormTopology topology = readStormTopologyAsNimbus(topoId, topoCache);
List<List<Integer>> ret = new ArrayList<>();
if (compToExecutors != null) {
Map<Integer, String> taskInfo = StormCommon.stormTaskInfo(topology, topoConf);
@@ -1413,10 +1426,9 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
private Map<List<Integer>, String> computeExecutorToComponent(String topoId, StormBase base) throws KeyNotFoundException, AuthorizationException, InvalidTopologyException, IOException {
- BlobStore store = blobStore;
List<List<Integer>> executors = computeExecutors(topoId, base);
- StormTopology topology = readStormTopologyAsNimbus(topoId, store);
- Map<String, Object> topoConf = readTopoConfAsNimbus(topoId, store);
+ StormTopology topology = readStormTopologyAsNimbus(topoId, topoCache);
+ Map<String, Object> topoConf = readTopoConfAsNimbus(topoId, topoCache);
Map<Integer, String> taskToComponent = StormCommon.stormTaskInfo(topology, topoConf);
Map<List<Integer>, String> ret = new HashMap<>();
for (List<Integer> executor: executors) {
@@ -1869,9 +1881,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
throws KeyNotFoundException, AuthorizationException, IOException, InvalidTopologyException {
assert(TopologyStatus.ACTIVE == initStatus || TopologyStatus.INACTIVE == initStatus);
IStormClusterState state = stormClusterState;
- BlobStore store = blobStore;
- Map<String, Object> topoConf = readTopoConf(topoId, store);
- StormTopology topology = StormCommon.systemTopology(topoConf, readStormTopology(topoId, store));
+ Map<String, Object> topoConf = readTopoConf(topoId, topoCache);
+ StormTopology topology = StormCommon.systemTopology(topoConf, readStormTopology(topoId, topoCache));
Map<String, Integer> numExecutors = new HashMap<>();
for (Entry<String, Object> entry: StormCommon.allComponents(topology).entrySet()) {
numExecutors.put(entry.getKey(), StormCommon.numStartExecutors(entry.getValue()));
@@ -1906,7 +1917,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
IStormClusterState state = stormClusterState;
String topoId = state.getTopoId(topoName)
.orElseThrow(() -> new NotAliveException(topoName + " is not alive"));
- return tryReadTopoConf(topoId, blobStore);
+ return tryReadTopoConf(topoId, topoCache);
}
@VisibleForTesting
@@ -1959,7 +1970,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
private boolean isAuthorized(String operation, String topoId) throws NotAliveException, AuthorizationException, IOException {
- Map<String, Object> topoConf = tryReadTopoConf(topoId, blobStore);
+ Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
topoConf = merge(conf, topoConf);
String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
try {
@@ -1986,7 +1997,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
try {
BlobStore store = blobStore;
IStormClusterState state = stormClusterState;
- StormTopology topo = readStormTopologyAsNimbus(topoId, store);
+ StormTopology topo = readStormTopologyAsNimbus(topoId, topoCache);
List<String> dependencyJars = topo.get_dependency_jars();
LOG.info("Removing dependency jars from blobs - {}", dependencyJars);
if (dependencyJars != null && !dependencyJars.isEmpty()) {
@@ -2004,9 +2015,17 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
public void rmTopologyKeys(String topoId) {
BlobStore store = blobStore;
IStormClusterState state = stormClusterState;
+ try {
+ topoCache.deleteTopoConf(topoId, NIMBUS_SUBJECT);
+ } catch (Exception e) {
+ //Just go on and try to delete the others
+ }
+ try {
+ topoCache.deleteTopology(topoId, NIMBUS_SUBJECT);
+ } catch (Exception e) {
+ //Just go on and try to delete the others
+ }
rmBlobKey(store, ConfigUtils.masterStormJarKey(topoId), state);
- rmBlobKey(store, ConfigUtils.masterStormConfKey(topoId), state);
- rmBlobKey(store, ConfigUtils.masterStormCodeKey(topoId), state);
}
@VisibleForTesting
@@ -2139,7 +2158,6 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
return;
}
IStormClusterState state = stormClusterState;
- BlobStore store = blobStore;
Collection<ICredentialsRenewer> renewers = credRenewers;
Object lock = credUpdateLock;
Map<String, StormBase> assignedBases = state.topologyBases();
@@ -2147,7 +2165,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
for (Entry<String, StormBase> entry: assignedBases.entrySet()) {
String id = entry.getKey();
String ownerPrincipal = entry.getValue().get_principal();
- Map<String, Object> topoConf = Collections.unmodifiableMap(merge(conf, tryReadTopoConf(id, store)));
+ Map<String, Object> topoConf = Collections.unmodifiableMap(merge(conf, tryReadTopoConf(id, topoCache)));
synchronized(lock) {
Credentials origCreds = state.credentials(id, null);
if (origCreds != null) {
@@ -2261,7 +2279,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
TopologySummary summary = new TopologySummary(topoId, base.get_name(), numTasks, numExecutors, numWorkers,
Time.deltaSecs(base.get_launch_time_secs()), extractStatusStr(base));
try {
- StormTopology topo = tryReadTopology(topoId, blobStore);
+ StormTopology topo = tryReadTopology(topoId, topoCache);
if (topo != null && topo.is_set_storm_version()) {
summary.set_storm_version(topo.get_storm_version());
}
@@ -2313,13 +2331,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
}
private CommonTopoInfo getCommonTopoInfo(String topoId, String operation) throws NotAliveException, AuthorizationException, IOException, InvalidTopologyException {
- BlobStore store = blobStore;
IStormClusterState state = stormClusterState;
CommonTopoInfo ret = new CommonTopoInfo();
- ret.topoConf = tryReadTopoConf(topoId, store);
+ ret.topoConf = tryReadTopoConf(topoId, topoCache);
ret.topoName = (String)ret.topoConf.get(Config.TOPOLOGY_NAME);
checkAuthorization(ret.topoName, ret.topoConf, operation);
- ret.topology = tryReadTopology(topoId, store);
+ ret.topology = tryReadTopology(topoId, topoCache);
ret.taskToComponent = StormCommon.stormTaskInfo(ret.topology, ret.topoConf);
ret.base = state.stormBase(topoId, null);
if (ret.base != null && ret.base.is_set_launch_time_secs()) {
@@ -2747,7 +2764,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
public void setLogConfig(String topoId, LogConfig config) throws TException {
try {
setLogConfigCalls.mark();
- Map<String, Object> topoConf = tryReadTopoConf(topoId, blobStore);
+ Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
topoConf = merge(conf, topoConf);
String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
checkAuthorization(topoName, topoConf, "setLogConfig");
@@ -2804,7 +2821,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
public LogConfig getLogConfig(String topoId) throws TException {
try {
getLogConfigCalls.mark();
- Map<String, Object> topoConf = tryReadTopoConf(topoId, blobStore);
+ Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
topoConf = merge(conf, topoConf);
String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
checkAuthorization(topoName, topoConf, "getLogConfig");
@@ -2830,7 +2847,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
try {
IStormClusterState state = stormClusterState;
String topoId = toTopoId(topoName);
- Map<String, Object> topoConf = tryReadTopoConf(topoId, blobStore);
+ Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
topoConf = merge(conf, topoConf);
// make sure samplingPct is within bounds.
double spct = Math.max(Math.min(samplingPercentage, 100.0), 0.0);
@@ -2871,7 +2888,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
public void setWorkerProfiler(String topoId, ProfileRequest profileRequest) throws TException {
try {
setWorkerProfilerCalls.mark();
- Map<String, Object> topoConf = tryReadTopoConf(topoId, blobStore);
+ Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
topoConf = merge(conf, topoConf);
String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
checkAuthorization(topoName, topoConf, "setWorkerProfiler");
@@ -2943,7 +2960,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
if (topoId == null) {
throw new NotAliveException(topoName + " is not alive");
}
- Map<String, Object> topoConf = tryReadTopoConf(topoId, blobStore);
+ Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
topoConf = merge(conf, topoConf);
if (credentials == null) {
credentials = new Credentials(Collections.emptyMap());
@@ -3750,7 +3767,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
public String getTopologyConf(String id) throws NotAliveException, AuthorizationException, TException {
try {
getTopologyConfCalls.mark();
- Map<String, Object> topoConf = tryReadTopoConf(id, blobStore);
+ Map<String, Object> topoConf = tryReadTopoConf(id, topoCache);
Map<String, Object> checkConf = merge(conf, topoConf);
String topoName = (String) checkConf.get(Config.TOPOLOGY_NAME);
checkAuthorization(topoName, checkConf, "getTopologyConf");
@@ -3768,11 +3785,11 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
public StormTopology getTopology(String id) throws NotAliveException, AuthorizationException, TException {
try {
getTopologyCalls.mark();
- Map<String, Object> topoConf = tryReadTopoConf(id, blobStore);
+ Map<String, Object> topoConf = tryReadTopoConf(id, topoCache);
topoConf = merge(conf, topoConf);
String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
checkAuthorization(topoName, topoConf, "getTopology");
- return StormCommon.systemTopology(topoConf, tryReadTopology(id, blobStore));
+ return StormCommon.systemTopology(topoConf, tryReadTopology(id, topoCache));
} catch (Exception e) {
LOG.warn("Get topology exception. (topology id='{}')", id, e);
if (e instanceof TException) {
@@ -3786,11 +3803,11 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
public StormTopology getUserTopology(String id) throws NotAliveException, AuthorizationException, TException {
try {
getUserTopologyCalls.mark();
- Map<String, Object> topoConf = tryReadTopoConf(id, blobStore);
+ Map<String, Object> topoConf = tryReadTopoConf(id, topoCache);
topoConf = merge(conf, topoConf);
String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
checkAuthorization(topoName, topoConf, "getUserTopology");
- return tryReadTopology(id, blobStore);
+ return tryReadTopology(id, topoCache);
} catch (Exception e) {
LOG.warn("Get user topology exception. (topology id='{}')", id, e);
if (e instanceof TException) {
@@ -3806,12 +3823,11 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
try {
List<String> adminUsers = (List<String>) conf.getOrDefault(Config.NIMBUS_ADMINS, Collections.emptyList());
IStormClusterState state = stormClusterState;
- BlobStore store = blobStore;
List<String> assignedIds = state.assignments(null);
Set<String> ret = new HashSet<>();
boolean isAdmin = adminUsers.contains(user);
for (String topoId: assignedIds) {
- Map<String, Object> topoConf = tryReadTopoConf(topoId, store);
+ Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
topoConf = merge(conf, topoConf);
List<String> groups = ServerConfigUtils.getTopoLogsGroups(topoConf);
List<String> topoLogUsers = ServerConfigUtils.getTopoLogsUsers(topoConf);
http://git-wip-us.apache.org/repos/asf/storm/blob/ac8d37b9/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopoCache.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopoCache.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopoCache.java
new file mode 100644
index 0000000..c7387f9
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/TopoCache.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.nimbus;
+
+import static org.apache.storm.blobstore.BlobStoreAclHandler.READ;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.security.auth.Subject;
+
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.BlobStoreAclHandler;
+import org.apache.storm.generated.AccessControl;
+import org.apache.storm.generated.AuthorizationException;
+import org.apache.storm.generated.KeyAlreadyExistsException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.generated.ReadableBlobMeta;
+import org.apache.storm.generated.SettableBlobMeta;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache topologies and topology confs from the blob store.
+ * Makes reading this faster because it can skip
+ * deserialization in many cases.
+ */
+public class TopoCache {
+ public static final Logger LOG = LoggerFactory.getLogger(TopoCache.class);
+
+ private static final class WithAcl<T> {
+ public final List<AccessControl> acl;
+ public final T data;
+
+ public WithAcl(List<AccessControl> acl, T data) {
+ this.acl = acl;
+ this.data = data;
+ }
+ }
+
+ private final BlobStore store;
+ private final BlobStoreAclHandler aclHandler;
+ private final ConcurrentHashMap<String, WithAcl<StormTopology>> topos = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, WithAcl<Map<String, Object>>> confs = new ConcurrentHashMap<>();
+
+ public TopoCache(BlobStore store, Map<String, Object> conf) {
+ this.store = store;
+ aclHandler = new BlobStoreAclHandler(conf);
+ }
+
+ /**
+ * Read a topology.
+ * @param topoId the id of the topology to read
+ * @param who who to read it as
+ * @return the deserialized topology.
+ * @throws IOException on any error while reading the blob.
+ * @throws AuthorizationException if who is not allowed to read the blob
+ * @throws KeyNotFoundException if the blob could not be found
+ */
+ public StormTopology readTopology(final String topoId, final Subject who)
+ throws KeyNotFoundException, AuthorizationException, IOException {
+ final String key = ConfigUtils.masterStormCodeKey(topoId);
+ WithAcl<StormTopology> cached = topos.get(topoId);
+ if (cached == null) {
+ //We need to read a new one
+ StormTopology topo = Utils.deserialize(store.readBlob(key, who), StormTopology.class);
+ ReadableBlobMeta meta = store.getBlobMeta(key, who);
+ cached = new WithAcl<>(meta.get_settable().get_acl(), topo);
+ WithAcl<StormTopology> previous = topos.putIfAbsent(topoId, cached);
+ if (previous != null) {
+ cached = previous;
+ }
+ } else {
+ //Check if the user is allowed to read this
+ aclHandler.hasPermissions(cached.acl, READ, who, key);
+ }
+ return cached.data;
+ }
+
+ /**
+ * Delete a topology when we are done.
+ * @param topoId the id of the topology
+ * @param who who is deleting it
+ * @throws AuthorizationException if who is not allowed to delete the blob
+ * @throws KeyNotFoundException if the blob could not be found
+ */
+ public void deleteTopology(final String topoId, final Subject who) throws AuthorizationException, KeyNotFoundException {
+ final String key = ConfigUtils.masterStormCodeKey(topoId);
+ store.deleteBlob(key, who);
+ topos.remove(topoId);
+ }
+
+ /**
+ * Add a new topology.
+ * @param topoId the id of the topology
+ * @param who who is doing it
+ * @param topo the topology itself
+ * @throws AuthorizationException if who is not allowed to add a topology
+ * @throws KeyAlreadyExistsException if the topology already exists
+ * @throws IOException on any error interacting with the blob store
+ */
+ public void addTopology(final String topoId, final Subject who, final StormTopology topo)
+ throws AuthorizationException, KeyAlreadyExistsException, IOException {
+ final String key = ConfigUtils.masterStormCodeKey(topoId);
+ final List<AccessControl> acl = BlobStoreAclHandler.DEFAULT;
+ store.createBlob(key, Utils.serialize(topo), new SettableBlobMeta(acl), who);
+ topos.put(topoId, new WithAcl<>(acl, topo));
+ }
+
+ /**
+ * Update an existing topology.
+ * @param topoId the id of the topology
+ * @param who who is doing it
+ * @param topo the new topology to save
+ * @throws AuthorizationException if who is not allowed to update a topology
+ * @throws KeyNotFoundException if the topology is not found in the blob store
+ * @throws IOException on any error interacting with the blob store
+ */
+ public void updateTopology(final String topoId, final Subject who, final StormTopology topo)
+ throws AuthorizationException, KeyNotFoundException, IOException {
+ final String key = ConfigUtils.masterStormCodeKey(topoId);
+ store.updateBlob(key, Utils.serialize(topo), who);
+ List<AccessControl> acl = BlobStoreAclHandler.DEFAULT;
+ WithAcl<StormTopology> old = topos.get(topoId);
+ if (old != null) {
+ acl = old.acl;
+ } else {
+ acl = store.getBlobMeta(key, who).get_settable().get_acl();
+ }
+ topos.put(topoId, new WithAcl<>(acl, topo));
+ }
+
+ /**
+ * Read a topology conf.
+ * @param topoId the id of the topology to read the conf for
+ * @param who who to read it as
+ * @return the deserialized config.
+ * @throws IOException on any error while reading the blob.
+ * @throws AuthorizationException if who is not allowed to read the blob
+ * @throws KeyNotFoundException if the blob could not be found
+ */
+ public Map<String, Object> readTopoConf(final String topoId, final Subject who)
+ throws KeyNotFoundException, AuthorizationException, IOException {
+ final String key = ConfigUtils.masterStormConfKey(topoId);
+ WithAcl<Map<String, Object>> cached = confs.get(topoId);
+ if (cached == null) {
+ //We need to read a new one
+ Map<String, Object> topoConf = Utils.fromCompressedJsonConf(store.readBlob(key, who));
+ ReadableBlobMeta meta = store.getBlobMeta(key, who);
+ cached = new WithAcl<>(meta.get_settable().get_acl(), topoConf);
+ WithAcl<Map<String, Object>> previous = confs.putIfAbsent(topoId, cached);
+ if (previous != null) {
+ cached = previous;
+ }
+ } else {
+ //Check if the user is allowed to read this
+ aclHandler.hasPermissions(cached.acl, READ, who, key);
+ }
+ return cached.data;
+ }
+
+ /**
+ * Delete a topology conf when we are done.
+ * @param topoId the id of the topology
+ * @param who who is deleting it
+ * @throws AuthorizationException if who is not allowed to delete the topo conf
+ * @throws KeyNotFoundException if the topo conf is not found in the blob store
+ */
+ public void deleteTopoConf(final String topoId, final Subject who) throws AuthorizationException, KeyNotFoundException {
+ final String key = ConfigUtils.masterStormConfKey(topoId);
+ store.deleteBlob(key, who);
+ confs.remove(topoId);
+ }
+
+ /**
+ * Add a new topology config.
+ * @param topoId the id of the topology
+ * @param who who is doing it
+ * @param topoConf the topology conf itself
+ * @throws AuthorizationException if who is not allowed to add a topology conf
+ * @throws KeyAlreadyExistsException if the topology conf already exists in the blob store
+ * @throws IOException on any error interacting with the blob store.
+ */
+ public void addTopoConf(final String topoId, final Subject who, final Map<String, Object> topoConf)
+ throws AuthorizationException, KeyAlreadyExistsException, IOException {
+ final String key = ConfigUtils.masterStormConfKey(topoId);
+ final List<AccessControl> acl = BlobStoreAclHandler.DEFAULT;
+ store.createBlob(key, Utils.toCompressedJsonConf(topoConf), new SettableBlobMeta(acl), who);
+ confs.put(topoId, new WithAcl<>(acl, topoConf));
+ }
+
+ /**
+ * Update an existing topology conf.
+ * @param topoId the id of the topology
+ * @param who who is doing it
+ * @param topoConf the new topology conf to save
+ * @throws AuthorizationException if who is not allowed to update the topology conf
+ * @throws KeyNotFoundException if the topology conf is not found in the blob store
+ * @throws IOException on any error interacting with the blob store.
+ */
+ public void updateTopoConf(final String topoId, final Subject who, final Map<String, Object> topoConf)
+ throws AuthorizationException, KeyNotFoundException, IOException {
+ final String key = ConfigUtils.masterStormConfKey(topoId);
+ store.updateBlob(key, Utils.toCompressedJsonConf(topoConf), who);
+ List<AccessControl> acl = BlobStoreAclHandler.DEFAULT;
+ WithAcl<Map<String, Object>> old = confs.get(topoId);
+ if (old != null) {
+ acl = old.acl;
+ } else {
+ acl = store.getBlobMeta(key, who).get_settable().get_acl();
+ }
+ confs.put(topoId, new WithAcl<>(acl, topoConf));
+ }
+
+ /**
+ * Clear all entries from the Cache. This typically happens right after becoming a leader, just to be sure
+ * nothing has changed while we were not the leader.
+ */
+ public void clear() {
+ confs.clear();
+ topos.clear();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/ac8d37b9/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java b/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
index 09c14ba..6bf39c3 100644
--- a/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
+++ b/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
@@ -22,6 +22,7 @@ import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.framework.recipes.leader.Participant;
import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.daemon.nimbus.TopoCache;
import org.apache.storm.nimbus.ILeaderElector;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.utils.Utils;
@@ -44,9 +45,11 @@ public class LeaderElectorImp implements ILeaderElector {
private final AtomicReference<LeaderLatch> leaderLatch;
private final AtomicReference<LeaderLatchListener> leaderLatchListener;
private final BlobStore blobStore;
+ private final TopoCache tc;
- public LeaderElectorImp(Map<String, Object> conf, List<String> servers, CuratorFramework zk, String leaderlockPath, String id, AtomicReference<LeaderLatch> leaderLatch,
- AtomicReference<LeaderLatchListener> leaderLatchListener, BlobStore blobStore) {
+ public LeaderElectorImp(Map<String, Object> conf, List<String> servers, CuratorFramework zk, String leaderlockPath, String id,
+ AtomicReference<LeaderLatch> leaderLatch, AtomicReference<LeaderLatchListener> leaderLatchListener,
+ BlobStore blobStore, final TopoCache tc) {
this.conf = conf;
this.servers = servers;
this.zk = zk;
@@ -55,6 +58,7 @@ public class LeaderElectorImp implements ILeaderElector {
this.leaderLatch = leaderLatch;
this.leaderLatchListener = leaderLatchListener;
this.blobStore = blobStore;
+ this.tc = tc;
}
@Override
@@ -67,7 +71,7 @@ public class LeaderElectorImp implements ILeaderElector {
// if this latch is already closed, we need to create new instance.
if (LeaderLatch.State.CLOSED.equals(leaderLatch.get().getState())) {
leaderLatch.set(new LeaderLatch(zk, leaderlockPath));
- leaderLatchListener.set(Zookeeper.leaderLatchListenerImpl(conf, zk, blobStore, leaderLatch.get()));
+ leaderLatchListener.set(Zookeeper.leaderLatchListenerImpl(conf, zk, blobStore, leaderLatch.get(), tc));
LOG.info("LeaderLatch was in closed state. Resetted the leaderLatch and listeners.");
}
// Only if the latch is not already started we invoke start
http://git-wip-us.apache.org/repos/asf/storm/blob/ac8d37b9/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java b/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
index 4979588..802d3ba 100644
--- a/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
@@ -31,6 +31,7 @@ import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.InputStreamWithMeta;
import org.apache.storm.callback.DefaultWatcherCallBack;
import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.daemon.nimbus.TopoCache;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.StormTopology;
@@ -124,7 +125,10 @@ public class Zookeeper {
}
// Leader latch listener that will be invoked when we either gain or lose leadership
- public static LeaderLatchListener leaderLatchListenerImpl(final Map<String, Object> conf, final CuratorFramework zk, final BlobStore blobStore, final LeaderLatch leaderLatch) throws UnknownHostException {
+ public static LeaderLatchListener leaderLatchListenerImpl(final Map<String, Object> conf, final CuratorFramework zk,
+ final BlobStore blobStore, final LeaderLatch leaderLatch,
+ final TopoCache tc)
+ throws UnknownHostException {
final String hostName = InetAddress.getLocalHost().getCanonicalHostName();
return new LeaderLatchListener() {
final String STORM_JAR_SUFFIX = "-stormjar.jar";
@@ -133,7 +137,8 @@ public class Zookeeper {
@Override
public void isLeader() {
- Set<String> activeTopologyIds = new TreeSet<>(ClientZookeeper.getChildren(zk, conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));
+ Set<String> activeTopologyIds = new TreeSet<>(ClientZookeeper.getChildren(zk,
+ conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));
Set<String> activeTopologyBlobKeys = populateTopologyBlobKeys(activeTopologyIds);
Set<String> activeTopologyCodeKeys = filterTopologyCodeKeys(activeTopologyBlobKeys);
@@ -157,8 +162,10 @@ public class Zookeeper {
if (diffDependencies.isEmpty()) {
LOG.info("Accepting leadership, all active topologies and corresponding dependencies found locally.");
+ tc.clear();
} else {
- LOG.info("Code for all active topologies is available locally, but some dependencies are not found locally, giving up leadership.");
+ LOG.info("Code for all active topologies is available locally, but some dependencies are not found locally, "
+ + "giving up leadership.");
closeLatch();
}
} else {
@@ -170,6 +177,8 @@ public class Zookeeper {
@Override
public void notLeader() {
LOG.info("{} lost leadership.", hostName);
+ //Just to be sure
+ tc.clear();
}
private String generateJoinedString(Set<String> activeTopologyIds) {
@@ -241,11 +250,13 @@ public class Zookeeper {
};
}
- public static ILeaderElector zkLeaderElector(Map<String, Object> conf, BlobStore blobStore) throws UnknownHostException {
- return _instance.zkLeaderElectorImpl(conf, blobStore);
+ public static ILeaderElector zkLeaderElector(Map<String, Object> conf, BlobStore blobStore, final TopoCache tc)
+ throws UnknownHostException {
+ return _instance.zkLeaderElectorImpl(conf, blobStore, tc);
}
- protected ILeaderElector zkLeaderElectorImpl(Map<String, Object> conf, BlobStore blobStore) throws UnknownHostException {
+ protected ILeaderElector zkLeaderElectorImpl(Map<String, Object> conf, BlobStore blobStore, final TopoCache tc)
+ throws UnknownHostException {
List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
CuratorFramework zk = ClientZookeeper.mkClient(conf, servers, port, "", new DefaultWatcherCallBack(), conf);
@@ -253,9 +264,9 @@ public class Zookeeper {
String id = NimbusInfo.fromConf(conf).toHostPortString();
AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));
AtomicReference<LeaderLatchListener> leaderLatchListenerAtomicReference =
- new AtomicReference<>(leaderLatchListenerImpl(conf, zk, blobStore, leaderLatchAtomicReference.get()));
+ new AtomicReference<>(leaderLatchListenerImpl(conf, zk, blobStore, leaderLatchAtomicReference.get(), tc));
return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference,
- leaderLatchListenerAtomicReference, blobStore);
+ leaderLatchListenerAtomicReference, blobStore, tc);
}
}
[2/2] storm git commit: Merge branch 'STORM-2740' of https://github.com/revans2/incubator-storm into STORM-2740
Posted by bo...@apache.org.
Merge branch 'STORM-2740' of https://github.com/revans2/incubator-storm into STORM-2740
STORM-2740: Add in caching of topology and conf to nimbus
This closes #2327
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/da2f0358
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/da2f0358
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/da2f0358
Branch: refs/heads/master
Commit: da2f035865c0176e2fdf047730b0c1f55d016c50
Parents: 05a74c7 ac8d37b
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Mon Sep 18 09:07:07 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Mon Sep 18 09:07:07 2017 -0500
----------------------------------------------------------------------
.../org/apache/storm/blobstore/BlobStore.java | 65 +++--
.../src/jvm/org/apache/storm/utils/Utils.java | 12 +-
.../apache/storm/command/shell_submission.clj | 4 +-
.../test/clj/org/apache/storm/nimbus_test.clj | 72 +++---
.../apache/storm/security/auth/auth_test.clj | 2 +-
.../storm/security/auth/nimbus_auth_test.clj | 14 +-
.../java/org/apache/storm/LocalCluster.java | 18 +-
.../org/apache/storm/daemon/nimbus/Nimbus.java | 130 +++++-----
.../apache/storm/daemon/nimbus/TopoCache.java | 244 +++++++++++++++++++
.../storm/zookeeper/LeaderElectorImp.java | 10 +-
.../org/apache/storm/zookeeper/Zookeeper.java | 27 +-
11 files changed, 446 insertions(+), 152 deletions(-)
----------------------------------------------------------------------