Posted to commits@storm.apache.org by bo...@apache.org on 2016/02/24 17:17:18 UTC
[08/27] storm git commit: update class hierarchy about cluster
http://git-wip-us.apache.org/repos/asf/storm/blob/55b86ca4/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
new file mode 100644
index 0000000..cd2bc4a
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -0,0 +1,664 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import clojure.lang.*;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.*;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.storm.callback.ZKStateChangedCallback;
+import org.apache.storm.generated.*;
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.zookeeper.Zookeeper;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.NoSuchAlgorithmException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class StormClusterStateImpl implements StormClusterState {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StormClusterStateImpl.class);
+
+ private StateStorage stateStorage;
+
+ private ConcurrentHashMap<String, IFn> assignmentInfoCallback;
+ private ConcurrentHashMap<String, IFn> assignmentInfoWithVersionCallback;
+ private ConcurrentHashMap<String, IFn> assignmentVersionCallback;
+ private AtomicReference<IFn> supervisorsCallback;
+ // we want to register a topology directory getChildren callback for all workers under this dir
+ private ConcurrentHashMap<String, IFn> backPressureCallback;
+ private AtomicReference<IFn> assignmentsCallback;
+ private ConcurrentHashMap<String, IFn> stormBaseCallback;
+ private AtomicReference<IFn> blobstoreCallback;
+ private ConcurrentHashMap<String, IFn> credentialsCallback;
+ private ConcurrentHashMap<String, IFn> logConfigCallback;
+
+ private List<ACL> acls;
+ private String stateId;
+ private boolean solo;
+
+ public StormClusterStateImpl(StateStorage stateStorage, List<ACL> acls, ClusterStateContext context, boolean solo) throws Exception {
+
+ this.stateStorage = stateStorage;
+ this.acls = acls;
+ this.solo = solo;
+
+ assignmentInfoCallback = new ConcurrentHashMap<>();
+ assignmentInfoWithVersionCallback = new ConcurrentHashMap<>();
+ assignmentVersionCallback = new ConcurrentHashMap<>();
+ supervisorsCallback = new AtomicReference<>();
+ backPressureCallback = new ConcurrentHashMap<>();
+ assignmentsCallback = new AtomicReference<>();
+ stormBaseCallback = new ConcurrentHashMap<>();
+ credentialsCallback = new ConcurrentHashMap<>();
+ logConfigCallback = new ConcurrentHashMap<>();
+ blobstoreCallback = new AtomicReference<>();
+
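+ // Route each ZooKeeper event to the matching one-shot callback, keyed by the path's root directory.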
+ stateId = this.stateStorage.register(new ZKStateChangedCallback() {
+
+ public void changed(Watcher.Event.EventType type, String path) {
+ List<String> toks = Zookeeper.tokenizePath(path);
+ int size = toks.size();
+ if (size >= 1) {
+ String root = toks.get(0);
+ if (root.equals(ClusterUtils.ASSIGNMENTS_ROOT)) {
+ if (size == 1) {
+ // set null and get the old value
+ issueCallback(assignmentsCallback);
+ } else {
+ issueMapCallback(assignmentInfoCallback, toks.get(1));
+ issueMapCallback(assignmentVersionCallback, toks.get(1));
+ issueMapCallback(assignmentInfoWithVersionCallback, toks.get(1));
+ }
+
+ } else if (root.equals(ClusterUtils.SUPERVISORS_ROOT)) {
+ issueCallback(supervisorsCallback);
+ } else if (root.equals(ClusterUtils.BLOBSTORE_ROOT)) {
+ issueCallback(blobstoreCallback);
+ } else if (root.equals(ClusterUtils.STORMS_ROOT) && size > 1) {
+ issueMapCallback(stormBaseCallback, toks.get(1));
+ } else if (root.equals(ClusterUtils.CREDENTIALS_ROOT) && size > 1) {
+ issueMapCallback(credentialsCallback, toks.get(1));
+ } else if (root.equals(ClusterUtils.LOGCONFIG_ROOT) && size > 1) {
+ issueMapCallback(logConfigCallback, toks.get(1));
+ } else if (root.equals(ClusterUtils.BACKPRESSURE_ROOT) && size > 1) {
+ issueMapCallback(backPressureCallback, toks.get(1));
+ } else {
+ LOG.error("{} Unknown callback for subtree {}", new RuntimeException("Unknown callback for this path"), path);
+ Runtime.getRuntime().exit(30);
+ }
+
+ }
+
+ }
+
+ });
+
+ String[] pathlist = { ClusterUtils.ASSIGNMENTS_SUBTREE, ClusterUtils.STORMS_SUBTREE, ClusterUtils.SUPERVISORS_SUBTREE, ClusterUtils.WORKERBEATS_SUBTREE,
+ ClusterUtils.ERRORS_SUBTREE, ClusterUtils.BLOBSTORE_SUBTREE, ClusterUtils.NIMBUSES_SUBTREE, ClusterUtils.LOGCONFIG_SUBTREE };
+ for (String path : pathlist) {
+ this.stateStorage.mkdirs(path, acls);
+ }
+
+ }
+
+ protected void issueCallback(AtomicReference<IFn> cb) {
+ IFn callback = cb.getAndSet(null);
+ if (callback != null)
+ callback.invoke();
+ }
+
+ protected void issueMapCallback(ConcurrentHashMap<String, IFn> callbackConcurrentHashMap, String key) {
+ IFn callback = callbackConcurrentHashMap.remove(key);
+ if (callback != null)
+ callback.invoke();
+ }
+
+ @Override
+ public List<String> assignments(IFn callback) {
+ if (callback != null) {
+ assignmentsCallback.set(callback);
+ }
+ return stateStorage.get_children(ClusterUtils.ASSIGNMENTS_SUBTREE, callback != null);
+ }
+
+ @Override
+ public Assignment assignmentInfo(String stormId, IFn callback) {
+ if (callback != null) {
+ assignmentInfoCallback.put(stormId, callback);
+ }
+ byte[] serialized = stateStorage.get_data(ClusterUtils.assignmentPath(stormId), callback != null);
+ return ClusterUtils.maybeDeserialize(serialized, Assignment.class);
+ }
+
+ @Override
+ public APersistentMap assignmentInfoWithVersion(String stormId, IFn callback) {
+ if (callback != null) {
+ assignmentInfoWithVersionCallback.put(stormId, callback);
+ }
+ Assignment assignment = null;
+ Integer version = 0;
+ APersistentMap aPersistentMap = stateStorage.get_data_with_version(ClusterUtils.assignmentPath(stormId), callback != null);
+ if (aPersistentMap != null) {
+ assignment = ClusterUtils.maybeDeserialize((byte[]) aPersistentMap.get(RT.keyword(null, "data")), Assignment.class);
+ version = (Integer) aPersistentMap.get(RT.keyword(null, "version"));
+ }
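+ // Clojure callers see this map as {:data assignment, :version version}.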
+ APersistentMap map = new PersistentArrayMap(new Object[] { RT.keyword(null, "data"), assignment, RT.keyword(null, "version"), version });
+ return map;
+ }
+
+ @Override
+ public Integer assignmentVersion(String stormId, IFn callback) throws Exception {
+ if (callback != null) {
+ assignmentVersionCallback.put(stormId, callback);
+ }
+ return stateStorage.get_version(ClusterUtils.assignmentPath(stormId), callback != null);
+ }
+
+ // blobstore state
+ @Override
+ public List<String> blobstoreInfo(String blobKey) {
+ String path = ClusterUtils.blobstorePath(blobKey);
+ stateStorage.sync_path(path);
+ return stateStorage.get_children(path, false);
+ }
+
+ @Override
+ public List nimbuses() {
+ List<NimbusSummary> nimbusSummaries = new ArrayList<>();
+ List<String> nimbusIds = stateStorage.get_children(ClusterUtils.NIMBUSES_SUBTREE, false);
+ for (String nimbusId : nimbusIds) {
+ byte[] serialized = stateStorage.get_data(ClusterUtils.nimbusPath(nimbusId), false);
+ NimbusSummary nimbusSummary = ClusterUtils.maybeDeserialize(serialized, NimbusSummary.class);
+ nimbusSummaries.add(nimbusSummary);
+ }
+ return nimbusSummaries;
+ }
+
+ @Override
+ public void addNimbusHost(final String nimbusId, final NimbusSummary nimbusSummary) {
+ // explicitly delete the ephemeral node to ensure this session creates the entry.
+ stateStorage.delete_node(ClusterUtils.nimbusPath(nimbusId));
+ stateStorage.add_listener(new ConnectionStateListener() {
+ @Override
+ public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
+ LOG.info("Connection state listener invoked, zookeeper connection state has changed to {}", connectionState);
+ if (connectionState.equals(ConnectionState.RECONNECTED)) {
+ LOG.info("Connection state has changed to reconnected so setting nimbuses entry one more time");
+ stateStorage.set_ephemeral_node(ClusterUtils.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls);
+ }
+
+ }
+ });
+
+ stateStorage.set_ephemeral_node(ClusterUtils.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls);
+ }
+
+ @Override
+ public List<String> activeStorms() {
+ return stateStorage.get_children(ClusterUtils.STORMS_SUBTREE, false);
+ }
+
+ @Override
+ public StormBase stormBase(String stormId, IFn callback) {
+ if (callback != null) {
+ stormBaseCallback.put(stormId, callback);
+ }
+ return ClusterUtils.maybeDeserialize(stateStorage.get_data(ClusterUtils.stormPath(stormId), callback != null), StormBase.class);
+ }
+
+ @Override
+ public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port) {
+ byte[] bytes = stateStorage.get_worker_hb(ClusterUtils.workerbeatPath(stormId, node, port), false);
+ return ClusterUtils.maybeDeserialize(bytes, ClusterWorkerHeartbeat.class);
+
+ }
+
+ @Override
+ public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo, boolean isThrift) {
+ List<ProfileRequest> requests = new ArrayList<>();
+ List<ProfileRequest> profileRequests = getTopologyProfileRequests(stormId, isThrift);
+ for (ProfileRequest profileRequest : profileRequests) {
+ NodeInfo nodeInfo1 = profileRequest.get_nodeInfo();
+ if (nodeInfo1.equals(nodeInfo))
+ requests.add(profileRequest);
+ }
+ return requests;
+ }
+
+ @Override
+ public List<ProfileRequest> getTopologyProfileRequests(String stormId, boolean isThrift) {
+ List<ProfileRequest> profileRequests = new ArrayList<>();
+ String path = ClusterUtils.profilerConfigPath(stormId);
+ if (stateStorage.node_exists(path, false)) {
+ List<String> strs = stateStorage.get_children(path, false);
+ for (String str : strs) {
+ String childPath = path + ClusterUtils.ZK_SEPERATOR + str;
+ byte[] raw = stateStorage.get_data(childPath, false);
+ ProfileRequest request = ClusterUtils.maybeDeserialize(raw, ProfileRequest.class);
+ if (request != null)
+ profileRequests.add(request);
+ }
+ }
+ return profileRequests;
+ }
+
+ @Override
+ public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest) {
+ ProfileAction profileAction = profileRequest.get_action();
+ String host = profileRequest.get_nodeInfo().get_node();
+ Long port = profileRequest.get_nodeInfo().get_port_iterator().next();
+ String path = ClusterUtils.profilerConfigPath(stormId, host, port, profileAction);
+ stateStorage.set_data(path, Utils.serialize(profileRequest), acls);
+ }
+
+ @Override
+ public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest) {
+ ProfileAction profileAction = profileRequest.get_action();
+ String host = profileRequest.get_nodeInfo().get_node();
+ Long port = profileRequest.get_nodeInfo().get_port_iterator().next();
+ String path = ClusterUtils.profilerConfigPath(stormId, host, port, profileAction);
+ stateStorage.delete_node(path);
+ }
+
+ // We take the executor->node+port mapping in explicitly so that we don't run into a situation where a
+ // long-dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
+ // with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
+ // we avoid such situations.
+ @Override
+ public Map<ExecutorInfo, ClusterWorkerHeartbeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) {
+ Map<ExecutorInfo, ClusterWorkerHeartbeat> executorWhbs = new HashMap<>();
+
+ Map<NodeInfo, List<List<Long>>> nodePortExecutors = ClusterUtils.reverseMap(executorNodePort);
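+ // e.g. {[1 3] -> nodeA:6700, [4 6] -> nodeA:6700} becomes {nodeA:6700 -> [[1 3], [4 6]]}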
+
+ for (Map.Entry<NodeInfo, List<List<Long>>> entry : nodePortExecutors.entrySet()) {
+
+ String node = entry.getKey().get_node();
+ Long port = entry.getKey().get_port_iterator().next();
+ ClusterWorkerHeartbeat whb = getWorkerHeartbeat(stormId, node, port);
+ List<ExecutorInfo> executorInfoList = new ArrayList<>();
+ for (List<Long> list : entry.getValue()) {
+ executorInfoList.add(new ExecutorInfo(list.get(0).intValue(), list.get(list.size() - 1).intValue()));
+ }
+ if (whb != null)
+ executorWhbs.putAll(ClusterUtils.convertExecutorBeats(executorInfoList, whb));
+ }
+ return executorWhbs;
+ }
+
+ @Override
+ public List<String> supervisors(IFn callback) {
+ if (callback != null) {
+ supervisorsCallback.set(callback);
+ }
+ return stateStorage.get_children(ClusterUtils.SUPERVISORS_SUBTREE, callback != null);
+ }
+
+ @Override
+ public SupervisorInfo supervisorInfo(String supervisorId) {
+ String path = ClusterUtils.supervisorPath(supervisorId);
+ return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, false), SupervisorInfo.class);
+ }
+
+ @Override
+ public void setupHeatbeats(String stormId) {
+ stateStorage.mkdirs(ClusterUtils.workerbeatStormRoot(stormId), acls);
+ }
+
+ @Override
+ public void teardownHeartbeats(String stormId) {
+ try {
+ stateStorage.delete_worker_hb(ClusterUtils.workerbeatStormRoot(stormId));
+ } catch (Exception e) {
+ if (Zookeeper.exceptionCause(KeeperException.class, e)) {
+ // non-fatal: the node may already be gone
+ LOG.warn("Could not teardown heartbeats for {}.", stormId);
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public void teardownTopologyErrors(String stormId) {
+ try {
+ stateStorage.delete_node(ClusterUtils.errorStormRoot(stormId));
+ } catch (Exception e) {
+ if (Zookeeper.exceptionCause(KeeperException.class, e)) {
+ // non-fatal: the node may already be gone
+ LOG.warn("Could not teardown errors for {}.", stormId);
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public List<String> heartbeatStorms() {
+ return stateStorage.get_worker_hb_children(ClusterUtils.WORKERBEATS_SUBTREE, false);
+ }
+
+ @Override
+ public List<String> errorTopologies() {
+ return stateStorage.get_children(ClusterUtils.ERRORS_SUBTREE, false);
+ }
+
+ @Override
+ public void setTopologyLogConfig(String stormId, LogConfig logConfig) {
+ stateStorage.set_data(ClusterUtils.logConfigPath(stormId), Utils.serialize(logConfig), acls);
+ }
+
+ @Override
+ public LogConfig topologyLogConfig(String stormId, IFn cb) {
+ String path = ClusterUtils.logConfigPath(stormId);
+ return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, cb != null), LogConfig.class);
+ }
+
+ @Override
+ public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info) {
+ if (info != null) {
+ String path = ClusterUtils.workerbeatPath(stormId, node, port);
+ stateStorage.set_worker_hb(path, Utils.serialize(info), acls);
+ }
+ }
+
+ @Override
+ public void removeWorkerHeartbeat(String stormId, String node, Long port) {
+ String path = ClusterUtils.workerbeatPath(stormId, node, port);
+ stateStorage.delete_worker_hb(path);
+ }
+
+ @Override
+ public void supervisorHeartbeat(String supervisorId, SupervisorInfo info) {
+ String path = ClusterUtils.supervisorPath(supervisorId);
+ stateStorage.set_ephemeral_node(path, Utils.serialize(info), acls);
+ }
+
+ // If the znode exists and backpressure should be off, delete it; if it exists and should stay on, do nothing;
+ // if it does not exist and backpressure should be on, create it; otherwise do nothing.
+ @Override
+ public void workerBackpressure(String stormId, String node, Long port, boolean on) {
+ String path = ClusterUtils.backpressurePath(stormId, node, port);
+ boolean existed = stateStorage.node_exists(path, false);
+ if (existed) {
+ if (!on)
+ stateStorage.delete_node(path);
+
+ } else {
+ if (on) {
+ stateStorage.set_ephemeral_node(path, null, acls);
+ }
+ }
+ }
+
+ // If the backpressure/storm-id dir is non-empty, this topology has throttle-on; otherwise it does not.
+ @Override
+ public boolean topologyBackpressure(String stormId, IFn callback) {
+ if (callback != null) {
+ backPressureCallback.put(stormId, callback);
+ }
+ String path = ClusterUtils.backpressureStormRoot(stormId);
+ List<String> children = stateStorage.get_children(path, callback != null);
+ return children.size() > 0;
+
+ }
+
+ @Override
+ public void setupBackpressure(String stormId) {
+ stateStorage.mkdirs(ClusterUtils.backpressureStormRoot(stormId), acls);
+ }
+
+ @Override
+ public void removeWorkerBackpressure(String stormId, String node, Long port) {
+ stateStorage.delete_node(ClusterUtils.backpressurePath(stormId, node, port));
+ }
+
+ @Override
+ public void activateStorm(String stormId, StormBase stormBase) {
+ String path = ClusterUtils.stormPath(stormId);
+ stateStorage.set_data(path, Utils.serialize(stormBase), acls);
+ }
+
+ // This method copies into mutable collections because APersistentMap/APersistentSet are immutable Clojure structures.
+ @Override
+ public void updateStorm(String stormId, StormBase newElems) {
+
+ StormBase stormBase = stormBase(stormId, null);
+ if (stormBase.get_component_executors() != null) {
+
+ Map<String, Integer> newComponentExecutors = new HashMap<>();
+ Map<String, Integer> componentExecutors = newElems.get_component_executors();
+ // componentExecutors may be an APersistentMap, which does not support put
+ for (Map.Entry<String, Integer> entry : componentExecutors.entrySet()) {
+ newComponentExecutors.put(entry.getKey(), entry.getValue());
+ }
+ for (Map.Entry<String, Integer> entry : stormBase.get_component_executors().entrySet()) {
+ if (!componentExecutors.containsKey(entry.getKey())) {
+ newComponentExecutors.put(entry.getKey(), entry.getValue());
+ }
+ }
+ if (newComponentExecutors.size() > 0)
+ newElems.set_component_executors(newComponentExecutors);
+ }
+
+ Map<String, DebugOptions> componentDebug = new HashMap<>();
+ Map<String, DebugOptions> oldComponentDebug = stormBase.get_component_debug();
+
+ Map<String, DebugOptions> newComponentDebug = newElems.get_component_debug();
+ // oldComponentDebug.keySet() / newComponentDebug.keySet() may be APersistentSet, which does not support addAll
+ Set<String> debugOptionsKeys = new HashSet<>();
+ debugOptionsKeys.addAll(oldComponentDebug.keySet());
+ debugOptionsKeys.addAll(newComponentDebug.keySet());
+ for (String key : debugOptionsKeys) {
+ boolean enable = false;
+ double samplingpct = 0;
+ if (oldComponentDebug.containsKey(key)) {
+ enable = oldComponentDebug.get(key).is_enable();
+ samplingpct = oldComponentDebug.get(key).get_samplingpct();
+ }
+ if (newComponentDebug.containsKey(key)) {
+ enable = newComponentDebug.get(key).is_enable();
+ samplingpct += newComponentDebug.get(key).get_samplingpct();
+ }
+ DebugOptions debugOptions = new DebugOptions();
+ debugOptions.set_enable(enable);
+ debugOptions.set_samplingpct(samplingpct);
+ componentDebug.put(key, debugOptions);
+ }
+ if (componentDebug.size() > 0) {
+ newElems.set_component_debug(componentDebug);
+ }
+
+ if (StringUtils.isBlank(newElems.get_name())) {
+ newElems.set_name(stormBase.get_name());
+ }
+ if (newElems.get_status() == null) {
+ newElems.set_status(stormBase.get_status());
+ }
+ if (newElems.get_num_workers() == 0) {
+ newElems.set_num_workers(stormBase.get_num_workers());
+ }
+ if (newElems.get_launch_time_secs() == 0) {
+ newElems.set_launch_time_secs(stormBase.get_launch_time_secs());
+ }
+ if (StringUtils.isBlank(newElems.get_owner())) {
+ newElems.set_owner(stormBase.get_owner());
+ }
+ if (newElems.get_topology_action_options() == null) {
+ newElems.set_topology_action_options(stormBase.get_topology_action_options());
+ }
+ stateStorage.set_data(ClusterUtils.stormPath(stormId), Utils.serialize(newElems), acls);
+ }
+
+ @Override
+ public void removeStormBase(String stormId) {
+ stateStorage.delete_node(ClusterUtils.stormPath(stormId));
+ }
+
+ @Override
+ public void setAssignment(String stormId, Assignment info) {
+ stateStorage.set_data(ClusterUtils.assignmentPath(stormId), Utils.serialize(info), acls);
+ }
+
+ @Override
+ public void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer versionInfo) {
+ String path = ClusterUtils.blobstorePath(key) + ClusterUtils.ZK_SEPERATOR + nimbusInfo.toHostPortString() + "-" + versionInfo;
+ LOG.info("set-path: {}", path);
+ stateStorage.mkdirs(ClusterUtils.blobstorePath(key), acls);
+ stateStorage.delete_node_blobstore(ClusterUtils.blobstorePath(key), nimbusInfo.toHostPortString());
+ stateStorage.set_ephemeral_node(path, null, acls);
+ }
+
+ @Override
+ public List<String> activeKeys() {
+ return stateStorage.get_children(ClusterUtils.BLOBSTORE_SUBTREE, false);
+ }
+
+ // blobstore state
+ @Override
+ public List<String> blobstore(IFn callback) {
+ if (callback != null) {
+ blobstoreCallback.set(callback);
+ }
+ stateStorage.sync_path(ClusterUtils.BLOBSTORE_SUBTREE);
+ return stateStorage.get_children(ClusterUtils.BLOBSTORE_SUBTREE, callback != null);
+
+ }
+
+ @Override
+ public void removeStorm(String stormId) {
+ stateStorage.delete_node(ClusterUtils.assignmentPath(stormId));
+ stateStorage.delete_node(ClusterUtils.credentialsPath(stormId));
+ stateStorage.delete_node(ClusterUtils.logConfigPath(stormId));
+ stateStorage.delete_node(ClusterUtils.profilerConfigPath(stormId));
+ removeStormBase(stormId);
+ }
+
+ @Override
+ public void removeBlobstoreKey(String blobKey) {
+ LOG.debug("remove key {}", blobKey);
+ stateStorage.delete_node(ClusterUtils.blobstorePath(blobKey));
+ }
+
+ @Override
+ public void removeKeyVersion(String blobKey) {
+ stateStorage.delete_node(ClusterUtils.blobstoreMaxKeySequenceNumberPath(blobKey));
+ }
+
+ @Override
+ public void reportError(String stormId, String componentId, String node, Long port, String error) {
+
+ String path = ClusterUtils.errorPath(stormId, componentId);
+ String lastErrorPath = ClusterUtils.lastErrorPath(stormId, componentId);
+ ErrorInfo errorInfo = new ErrorInfo(error, Time.currentTimeSecs());
+ errorInfo.set_host(node);
+ errorInfo.set_port(port.intValue());
+ byte[] serData = Utils.serialize(errorInfo);
+ stateStorage.mkdirs(path, acls);
+ stateStorage.create_sequential(path + ClusterUtils.ZK_SEPERATOR + "e", serData, acls);
+ stateStorage.set_data(lastErrorPath, serData, acls);
+ List<String> children = stateStorage.get_children(path, false);
+
+ Collections.sort(children, new Comparator<String>() {
+ public int compare(String arg0, String arg1) {
+ return Long.compare(Long.parseLong(arg0.substring(1)), Long.parseLong(arg1.substring(1)));
+ }
+ });
+
+ while (children.size() > 10) {
+ stateStorage.delete_node(path + ClusterUtils.ZK_SEPERATOR + children.remove(0));
+ }
+ }
+
+ @Override
+ public List<ErrorInfo> errors(String stormId, String componentId) {
+ List<ErrorInfo> errorInfos = new ArrayList<>();
+ try {
+ String path = ClusterUtils.errorPath(stormId, componentId);
+ if (stateStorage.node_exists(path, false)) {
+ List<String> children = stateStorage.get_children(path, false);
+ for (String child : children) {
+ String childPath = path + ClusterUtils.ZK_SEPERATOR + child;
+ ErrorInfo errorInfo = ClusterUtils.maybeDeserialize(stateStorage.get_data(childPath, false), ErrorInfo.class);
+ if (errorInfo != null)
+ errorInfos.add(errorInfo);
+ }
+ }
+ Collections.sort(errorInfos, new Comparator<ErrorInfo>() {
+ public int compare(ErrorInfo arg0, ErrorInfo arg1) {
+ return -Integer.compare(arg0.get_error_time_secs(), arg1.get_error_time_secs());
+ }
+ });
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+
+ return errorInfos;
+ }
+
+ @Override
+ public ErrorInfo lastError(String stormId, String componentId) {
+
+ String path = ClusterUtils.lastErrorPath(stormId, componentId);
+ if (stateStorage.node_exists(path, false)) {
+ ErrorInfo errorInfo = ClusterUtils.maybeDeserialize(stateStorage.get_data(path, false), ErrorInfo.class);
+ return errorInfo;
+ }
+
+ return null;
+ }
+
+ @Override
+ public void setCredentials(String stormId, Credentials creds, Map topoConf) throws NoSuchAlgorithmException {
+ List<ACL> aclList = ClusterUtils.mkTopoOnlyAcls(topoConf);
+ String path = ClusterUtils.credentialsPath(stormId);
+ stateStorage.set_data(path, Utils.serialize(creds), aclList);
+
+ }
+
+ @Override
+ public Credentials credentials(String stormId, IFn callback) {
+ if (callback != null) {
+ credentialsCallback.put(stormId, callback);
+ }
+ String path = ClusterUtils.credentialsPath(stormId);
+ return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, callback != null), Credentials.class);
+
+ }
+
+ @Override
+ public void disconnect() {
+ stateStorage.unregister(stateId);
+ if (solo)
+ stateStorage.close();
+ }
+}
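For orientation, each read method above takes a clojure.lang.IFn that fires at most once on the next change to the watched path; the implementation removes the callback when it is issued, so the caller must re-register by reading again. A minimal usage sketch (hypothetical, not from this commit), assuming conf, acls, and context come from the daemon bootstrap:

    import clojure.lang.AFn;

    StateStorage storage = new ZKStateStorage(conf, conf, acls, context);
    StormClusterState state = new StormClusterStateImpl(storage, acls, context, true);
    List<String> topologyIds = state.assignments(new AFn() {
        @Override
        public Object invoke() {
            // One-shot: issueCallback getAndSet(null)s the reference, so read again to re-watch.
            System.out.println("assignments changed");
            return null;
        }
    });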
http://git-wip-us.apache.org/repos/asf/storm/blob/55b86ca4/storm-core/src/jvm/org/apache/storm/cluster/StormZkClusterState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/StormZkClusterState.java b/storm-core/src/jvm/org/apache/storm/cluster/StormZkClusterState.java
deleted file mode 100644
index 3a4205b..0000000
--- a/storm-core/src/jvm/org/apache/storm/cluster/StormZkClusterState.java
+++ /dev/null
@@ -1,683 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.cluster;
-
-import clojure.lang.APersistentMap;
-import clojure.lang.IFn;
-import clojure.lang.PersistentArrayMap;
-import clojure.lang.RT;
-import org.apache.commons.lang.StringUtils;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.state.*;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.storm.callback.Callback;
-import org.apache.storm.generated.*;
-import org.apache.storm.nimbus.NimbusInfo;
-import org.apache.storm.utils.Time;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.zookeeper.Zookeeper;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.ACL;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.UnsupportedEncodingException;
-import java.security.NoSuchAlgorithmException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class StormZkClusterState implements StormClusterState {
-
- private static Logger LOG = LoggerFactory.getLogger(StormZkClusterState.class);
-
- private ClusterState clusterState;
-
- private ConcurrentHashMap<String, IFn> assignmentInfoCallback;
- private ConcurrentHashMap<String, IFn> assignmentInfoWithVersionCallback;
- private ConcurrentHashMap<String, IFn> assignmentVersionCallback;
- private AtomicReference<IFn> supervisorsCallback;
- // we want to reigister a topo directory getChildren callback for all workers of this dir
- private ConcurrentHashMap<String, IFn> backPressureCallback;
- private AtomicReference<IFn> assignmentsCallback;
- private ConcurrentHashMap<String, IFn> stormBaseCallback;
- private AtomicReference<IFn> blobstoreCallback;
- private ConcurrentHashMap<String, IFn> credentialsCallback;
- private ConcurrentHashMap<String, IFn> logConfigCallback;
-
- private List<ACL> acls;
- private String stateId;
- private boolean solo;
-
- public StormZkClusterState(Object clusterState, List<ACL> acls, ClusterStateContext context) throws Exception {
-
- if (clusterState instanceof ClusterState) {
- solo = false;
- this.clusterState = (ClusterState) clusterState;
- } else {
-
- solo = true;
- this.clusterState = new DistributedClusterState((Map) clusterState, (Map) clusterState, acls, context);
- }
-
- assignmentInfoCallback = new ConcurrentHashMap<>();
- assignmentInfoWithVersionCallback = new ConcurrentHashMap<>();
- assignmentVersionCallback = new ConcurrentHashMap<>();
- supervisorsCallback = new AtomicReference<>();
- backPressureCallback = new ConcurrentHashMap<>();
- assignmentsCallback = new AtomicReference<>();
- stormBaseCallback = new ConcurrentHashMap<>();
- credentialsCallback = new ConcurrentHashMap<>();
- logConfigCallback = new ConcurrentHashMap<>();
- blobstoreCallback = new AtomicReference<>();
-
- stateId = this.clusterState.register(new Callback() {
-
- public <T> Object execute(T... args) {
- if (args == null) {
- LOG.warn("Input args is null");
- return null;
- } else if (args.length < 2) {
- LOG.warn("Input args is invalid, args length:" + args.length);
- return null;
- }
- String path = (String) args[1];
-
- List<String> toks = Zookeeper.tokenizePath(path);
- int size = toks.size();
- if (size >= 1) {
- String params = null;
- String root = toks.get(0);
- IFn fn = null;
- if (root.equals(Cluster.ASSIGNMENTS_ROOT)) {
- if (size == 1) {
- // set null and get the old value
- issueCallback(assignmentsCallback);
- } else {
- issueMapCallback(assignmentInfoCallback, toks.get(1));
- issueMapCallback(assignmentVersionCallback, toks.get(1));
- issueMapCallback(assignmentInfoWithVersionCallback, toks.get(1));
- }
-
- } else if (root.equals(Cluster.SUPERVISORS_ROOT)) {
- issueCallback(supervisorsCallback);
- } else if (root.equals(Cluster.BLOBSTORE_ROOT)) {
- issueCallback(blobstoreCallback);
- } else if (root.equals(Cluster.STORMS_ROOT) && size > 1) {
- issueMapCallback(stormBaseCallback, toks.get(1));
- } else if (root.equals(Cluster.CREDENTIALS_ROOT) && size > 1) {
- issueMapCallback(credentialsCallback, toks.get(1));
- } else if (root.equals(Cluster.LOGCONFIG_ROOT) && size > 1) {
- issueMapCallback(logConfigCallback, toks.get(1));
- } else if (root.equals(Cluster.BACKPRESSURE_ROOT) && size > 1) {
- issueMapCallback(logConfigCallback, toks.get(1));
- } else {
- LOG.error("{} Unknown callback for subtree {}", new RuntimeException("Unknown callback for this path"), path);
- Runtime.getRuntime().exit(30);
- }
-
- }
-
- return null;
- }
-
- });
-
- String[] pathlist = { Cluster.ASSIGNMENTS_SUBTREE, Cluster.STORMS_SUBTREE, Cluster.SUPERVISORS_SUBTREE, Cluster.WORKERBEATS_SUBTREE,
- Cluster.ERRORS_SUBTREE, Cluster.BLOBSTORE_SUBTREE, Cluster.NIMBUSES_SUBTREE, Cluster.LOGCONFIG_SUBTREE };
- for (String path : pathlist) {
- this.clusterState.mkdirs(path, acls);
- }
-
- }
-
- protected void issueCallback(AtomicReference<IFn> cb) {
- IFn callback = cb.getAndSet(null);
- if (callback != null)
- callback.invoke();
- }
-
- protected void issueMapCallback(ConcurrentHashMap<String, IFn> callbackConcurrentHashMap, String key) {
- IFn callback = callbackConcurrentHashMap.remove(key);
- if (callback != null)
- callback.invoke();
- }
-
- @Override
- public List<String> assignments(IFn callback) {
- if (callback != null) {
- assignmentsCallback.set(callback);
- }
- return clusterState.get_children(Cluster.ASSIGNMENTS_SUBTREE, callback != null);
- }
-
- @Override
- public Assignment assignmentInfo(String stormId, IFn callback) {
- if (callback != null) {
- assignmentInfoCallback.put(stormId, callback);
- }
- byte[] serialized = clusterState.get_data(Cluster.assignmentPath(stormId), callback != null);
- return Cluster.maybeDeserialize(serialized, Assignment.class);
- }
-
- @Override
- public APersistentMap assignmentInfoWithVersion(String stormId, IFn callback) {
- if (callback != null) {
- assignmentInfoWithVersionCallback.put(stormId, callback);
- }
- APersistentMap aPersistentMap = clusterState.get_data_with_version(Cluster.assignmentPath(stormId), callback != null);
- Assignment assignment = Cluster.maybeDeserialize((byte[]) aPersistentMap.get("data"), Assignment.class);
- Integer version = (Integer) aPersistentMap.get("version");
- APersistentMap map = new PersistentArrayMap(new Object[] { RT.keyword(null, "data"), assignment, RT.keyword(null, "version"), version });
- return map;
- }
-
- @Override
- public Integer assignmentVersion(String stormId, IFn callback) throws Exception {
- if (callback != null) {
- assignmentVersionCallback.put(stormId, callback);
- }
- return clusterState.get_version(Cluster.assignmentPath(stormId), callback != null);
- }
-
- // blobstore state
- @Override
- public List<String> blobstoreInfo(String blobKey) {
- String path = Cluster.blobstorePath(blobKey);
- clusterState.sync_path(path);
- return clusterState.get_children(path, false);
- }
-
- @Override
- public List nimbuses() {
- List<NimbusSummary> nimbusSummaries = new ArrayList<>();
- List<String> nimbusIds = clusterState.get_children(Cluster.NIMBUSES_SUBTREE, false);
- for (String nimbusId : nimbusIds) {
- byte[] serialized = clusterState.get_data(Cluster.nimbusPath(nimbusId), false);
- NimbusSummary nimbusSummary = Cluster.maybeDeserialize(serialized, NimbusSummary.class);
- nimbusSummaries.add(nimbusSummary);
- }
- return nimbusSummaries;
- }
-
- @Override
- public void addNimbusHost(final String nimbusId, final NimbusSummary nimbusSummary) {
- // explicit delete for ephmeral node to ensure this session creates the entry.
- clusterState.delete_node(Cluster.nimbusPath(nimbusId));
- clusterState.add_listener(new ConnectionStateListener() {
- @Override
- public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
- LOG.info("Connection state listener invoked, zookeeper connection state has changed to {}", connectionState);
- if (connectionState.equals(ConnectionState.RECONNECTED)) {
- LOG.info("Connection state has changed to reconnected so setting nimbuses entry one more time");
- clusterState.set_ephemeral_node(Cluster.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls);
- }
-
- }
- });
-
- clusterState.set_ephemeral_node(Cluster.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls);
- }
-
- @Override
- public List<String> activeStorms() {
- return clusterState.get_children(Cluster.STORMS_SUBTREE, false);
- }
-
- @Override
- public StormBase stormBase(String stormId, IFn callback) {
- if (callback != null) {
- stormBaseCallback.put(stormId, callback);
- }
- return Cluster.maybeDeserialize(clusterState.get_data(Cluster.stormPath(stormId), callback != null), StormBase.class);
- }
-
- @Override
- public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port) {
- byte[] bytes = clusterState.get_worker_hb(Cluster.workerbeatPath(stormId, node, port), false);
- if (bytes != null) {
- return Cluster.maybeDeserialize(bytes, ClusterWorkerHeartbeat.class);
- }
- return null;
- }
-
- @Override
- public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo, boolean isThrift) {
- List<ProfileRequest> requests = new ArrayList<>();
- List<ProfileRequest> profileRequests = getTopologyProfileRequests(stormId, isThrift);
- for (ProfileRequest profileRequest : profileRequests) {
- NodeInfo nodeInfo1 = profileRequest.get_nodeInfo();
- if (nodeInfo1.equals(nodeInfo))
- requests.add(profileRequest);
- }
- return requests;
- }
-
- @Override
- public List<ProfileRequest> getTopologyProfileRequests(String stormId, boolean isThrift) {
- List<ProfileRequest> profileRequests = new ArrayList<>();
- String path = Cluster.profilerConfigPath(stormId);
- if (clusterState.node_exists(path, false)) {
- List<String> strs = clusterState.get_children(path, false);
- for (String str : strs) {
- String childPath = path + Cluster.ZK_SEPERATOR + str;
- byte[] raw = clusterState.get_data(childPath, false);
- ProfileRequest request = Cluster.maybeDeserialize(raw, ProfileRequest.class);
- if (request != null)
- profileRequests.add(request);
- }
- }
- return profileRequests;
- }
-
- @Override
- public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest) {
- ProfileAction profileAction = profileRequest.get_action();
- String host = profileRequest.get_nodeInfo().get_node();
- Long port = profileRequest.get_nodeInfo().get_port_iterator().next();
- String path = Cluster.profilerConfigPath(stormId, host, port, profileAction);
- clusterState.set_data(path, Utils.serialize(profileRequest), acls);
- }
-
- @Override
- public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest) {
- ProfileAction profileAction = profileRequest.get_action();
- String host = profileRequest.get_nodeInfo().get_node();
- Long port = profileRequest.get_nodeInfo().get_port_iterator().next();
- String path = Cluster.profilerConfigPath(stormId, host, port, profileAction);
- clusterState.delete_node(path);
- }
-
- // need to take executor->node+port in explicitly so that we don't run into a situation where a
- // long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
- // with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
- // we avoid situations like that
- @Override
- public Map<ExecutorInfo, ClusterWorkerHeartbeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) {
- Map<ExecutorInfo, ClusterWorkerHeartbeat> executorWhbs = new HashMap<>();
-
- LOG.info(executorNodePort.toString());
- Map<NodeInfo, List<List<Long>>> nodePortExecutors = Cluster.reverseMap(executorNodePort);
- LOG.info(nodePortExecutors.toString());
-
- for (Map.Entry<NodeInfo, List<List<Long>>> entry : nodePortExecutors.entrySet()) {
-
- String node = entry.getKey().get_node();
- Long port = entry.getKey().get_port_iterator().next();
- ClusterWorkerHeartbeat whb = getWorkerHeartbeat(stormId, node, port);
- List<ExecutorInfo> executorInfoList = new ArrayList<>();
- for (List<Long> list : entry.getValue()) {
- executorInfoList.add(new ExecutorInfo(list.get(0).intValue(), list.get(list.size() - 1).intValue()));
- }
- executorWhbs.putAll(Cluster.convertExecutorBeats(executorInfoList, whb));
- }
- return executorWhbs;
- }
-
- @Override
- public List<String> supervisors(IFn callback) {
- if (callback != null) {
- supervisorsCallback.set(callback);
- }
- return clusterState.get_children(Cluster.SUPERVISORS_SUBTREE, callback != null);
- }
-
- @Override
- public SupervisorInfo supervisorInfo(String supervisorId) {
- String path = Cluster.supervisorPath(supervisorId);
- return Cluster.maybeDeserialize(clusterState.get_data(path, false), SupervisorInfo.class);
- }
-
- @Override
- public void setupHeatbeats(String stormId) {
- clusterState.mkdirs(Cluster.workerbeatStormRoot(stormId), acls);
- }
-
- @Override
- public void teardownHeartbeats(String stormId) {
- try {
- clusterState.delete_worker_hb(Cluster.workerbeatStormRoot(stormId));
- } catch (Exception e) {
- if (Zookeeper.exceptionCause(KeeperException.class, e)) {
- // do nothing
- LOG.warn("Could not teardown heartbeats for {}.", stormId);
- } else {
- throw e;
- }
- }
- }
-
- @Override
- public void teardownTopologyErrors(String stormId) {
- try {
- clusterState.delete_node(Cluster.errorStormRoot(stormId));
- } catch (Exception e) {
- if (Zookeeper.exceptionCause(KeeperException.class, e)) {
- // do nothing
- LOG.warn("Could not teardown errors for {}.", stormId);
- } else {
- throw e;
- }
- }
- }
-
- @Override
- public List<String> heartbeatStorms() {
- return clusterState.get_worker_hb_children(Cluster.WORKERBEATS_SUBTREE, false);
- }
-
- @Override
- public List<String> errorTopologies() {
- return clusterState.get_children(Cluster.ERRORS_SUBTREE, false);
- }
-
- @Override
- public void setTopologyLogConfig(String stormId, LogConfig logConfig) {
- clusterState.set_data(Cluster.logConfigPath(stormId), Utils.serialize(logConfig), acls);
- }
-
- @Override
- public LogConfig topologyLogConfig(String stormId, IFn cb) {
- String path = Cluster.logConfigPath(stormId);
- return Cluster.maybeDeserialize(clusterState.get_data(path, cb != null), LogConfig.class);
- }
-
- @Override
- public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info) {
- if (info != null) {
- String path = Cluster.workerbeatPath(stormId, node, port);
- clusterState.set_worker_hb(path, Utils.serialize(info), acls);
- }
- }
-
- @Override
- public void removeWorkerHeartbeat(String stormId, String node, Long port) {
- String path = Cluster.workerbeatPath(stormId, node, port);
- clusterState.delete_worker_hb(path);
- }
-
- @Override
- public void supervisorHeartbeat(String supervisorId, SupervisorInfo info) {
- String path = Cluster.supervisorPath(supervisorId);
- clusterState.set_ephemeral_node(path, Utils.serialize(info), acls);
- }
-
- // if znode exists and to be not on?, delete; if exists and on?, do nothing;
- // if not exists and to be on?, create; if not exists and not on?, do nothing;
- @Override
- public void workerBackpressure(String stormId, String node, Long port, boolean on) {
- String path = Cluster.backpressurePath(stormId, node, port);
- boolean existed = clusterState.node_exists(path, false);
- if (existed) {
- if (on == false)
- clusterState.delete_node(path);
-
- } else {
- if (on == true) {
- clusterState.set_ephemeral_node(path, null, acls);
- }
- }
- }
-
- // if the backpresure/storm-id dir is empty, this topology has throttle-on, otherwise not.
- @Override
- public boolean topologyBackpressure(String stormId, IFn callback) {
- if (callback != null) {
- backPressureCallback.put(stormId, callback);
- }
- String path = Cluster.backpressureStormRoot(stormId);
- List<String> childrens = clusterState.get_children(path, callback != null);
- return childrens.size() > 0;
-
- }
-
- @Override
- public void setupBackpressure(String stormId) {
- clusterState.mkdirs(Cluster.backpressureStormRoot(stormId), acls);
- }
-
- @Override
- public void removeWorkerBackpressure(String stormId, String node, Long port) {
- clusterState.delete_node(Cluster.backpressurePath(stormId, node, port));
- }
-
- @Override
- public void activateStorm(String stormId, StormBase stormBase) {
- String path = Cluster.stormPath(stormId);
- clusterState.set_data(path, Utils.serialize(stormBase), acls);
- }
-
- // maybe exit some questions for updateStorm
- @Override
- public void updateStorm(String stormId, StormBase newElems) {
-
- StormBase stormBase = stormBase(stormId, null);
- if (stormBase.get_component_executors() != null) {
-
- Map<String, Integer> newComponentExecutors = new HashMap<>();
- Map<String, Integer> componentExecutors = newElems.get_component_executors();
- //componentExecutors maybe be APersistentMap, which don't support put
- for (Map.Entry<String, Integer> entry : componentExecutors.entrySet()) {
- newComponentExecutors.put(entry.getKey(), entry.getValue());
- }
- for (Map.Entry<String, Integer> entry : stormBase.get_component_executors().entrySet()) {
- if (!componentExecutors.containsKey(entry.getKey())) {
- newComponentExecutors.put(entry.getKey(), entry.getValue());
- }
- }
- if (newComponentExecutors.size() > 0)
- newElems.set_component_executors(newComponentExecutors);
- }
-
- Map<String, DebugOptions> ComponentDebug = new HashMap<>();
- Map<String, DebugOptions> oldComponentDebug = stormBase.get_component_debug();
-
- Map<String, DebugOptions> newComponentDebug = newElems.get_component_debug();
-
- Set<String> debugOptionsKeys = oldComponentDebug.keySet();
- debugOptionsKeys.addAll(newComponentDebug.keySet());
- for (String key : debugOptionsKeys) {
- boolean enable = false;
- double samplingpct = 0;
- if (oldComponentDebug.containsKey(key)) {
- enable = oldComponentDebug.get(key).is_enable();
- samplingpct = oldComponentDebug.get(key).get_samplingpct();
- }
- if (newComponentDebug.containsKey(key)) {
- enable = newComponentDebug.get(key).is_enable();
- samplingpct += newComponentDebug.get(key).get_samplingpct();
- }
- DebugOptions debugOptions = new DebugOptions();
- debugOptions.set_enable(enable);
- debugOptions.set_samplingpct(samplingpct);
- ComponentDebug.put(key, debugOptions);
- }
- if (ComponentDebug.size() > 0) {
- newElems.set_component_debug(ComponentDebug);
- }
-
-
- if (StringUtils.isBlank(newElems.get_name())) {
- newElems.set_name(stormBase.get_name());
- }
- if (newElems.get_status() == null){
- newElems.set_status(stormBase.get_status());
- }
- if (newElems.get_num_workers() == 0){
- newElems.set_num_workers(stormBase.get_num_workers());
- }
- if (newElems.get_launch_time_secs() == 0) {
- newElems.set_launch_time_secs(stormBase.get_launch_time_secs());
- }
- if (StringUtils.isBlank(newElems.get_owner())) {
- newElems.set_owner(stormBase.get_owner());
- }
- if (newElems.get_topology_action_options() == null) {
- newElems.set_topology_action_options(stormBase.get_topology_action_options());
- }
- if (newElems.get_status() == null) {
- newElems.set_status(stormBase.get_status());
- }
- clusterState.set_data(Cluster.stormPath(stormId), Utils.serialize(newElems), acls);
- }
-
- @Override
- public void removeStormBase(String stormId) {
- clusterState.delete_node(Cluster.stormPath(stormId));
- }
-
- @Override
- public void setAssignment(String stormId, Assignment info) {
- clusterState.set_data(Cluster.assignmentPath(stormId), Utils.serialize(info), acls);
- }
-
- @Override
- public void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer versionInfo) {
- String path = Cluster.blobstorePath(key) + Cluster.ZK_SEPERATOR + nimbusInfo.toHostPortString() + "-" + versionInfo;
- LOG.info("set-path: {}", path);
- clusterState.mkdirs(Cluster.blobstorePath(key), acls);
- clusterState.delete_node_blobstore(Cluster.blobstorePath(key), nimbusInfo.toHostPortString());
- clusterState.set_ephemeral_node(path, null, acls);
- }
-
- @Override
- public List<String> activeKeys() {
- return clusterState.get_children(Cluster.BLOBSTORE_SUBTREE, false);
- }
-
- // blobstore state
- @Override
- public List<String> blobstore(IFn callback) {
- if (callback != null) {
- blobstoreCallback.set(callback);
- }
- clusterState.sync_path(Cluster.BLOBSTORE_SUBTREE);
- return clusterState.get_children(Cluster.BLOBSTORE_SUBTREE, callback != null);
-
- }
-
- @Override
- public void removeStorm(String stormId) {
- clusterState.delete_node(Cluster.assignmentPath(stormId));
- clusterState.delete_node(Cluster.credentialsPath(stormId));
- clusterState.delete_node(Cluster.logConfigPath(stormId));
- clusterState.delete_node(Cluster.profilerConfigPath(stormId));
- removeStormBase(stormId);
- }
-
- @Override
- public void removeBlobstoreKey(String blobKey) {
- LOG.debug("remove key {}", blobKey);
- clusterState.delete_node(Cluster.blobstorePath(blobKey));
- }
-
- @Override
- public void removeKeyVersion(String blobKey) {
- clusterState.delete_node(Cluster.blobstoreMaxKeySequenceNumberPath(blobKey));
- }
-
- @Override
- public void reportError(String stormId, String componentId, String node, Integer port, String error) {
-
- try {
- String path = Cluster.errorPath(stormId, componentId);
- String lastErrorPath = Cluster.lastErrorPath(stormId, componentId);
- ErrorInfo errorInfo = new ErrorInfo(error, Time.currentTimeSecs());
- errorInfo.set_host(node);
- errorInfo.set_port(port.intValue());
- byte[] serData = Utils.serialize(errorInfo);
- clusterState.mkdirs(path, acls);
- clusterState.create_sequential(path + Cluster.ZK_SEPERATOR + "e", serData, acls);
- clusterState.set_data(lastErrorPath, serData, acls);
- List<String> childrens = clusterState.get_children(path, false);
-
- Collections.sort(childrens);
-
- while (childrens.size() >= 10) {
- clusterState.delete_node(path + Cluster.ZK_SEPERATOR + childrens.remove(0));
- }
- } catch (UnsupportedEncodingException e) {
- throw Utils.wrapInRuntime(e);
- }
- }
-
- @Override
- public List<ErrorInfo> errors(String stormId, String componentId) {
- List<ErrorInfo> errorInfos = new ArrayList<>();
- try {
- String path = Cluster.errorPath(stormId, componentId);
- if (clusterState.node_exists(path, false)) {
- List<String> childrens = clusterState.get_children(path, false);
- for (String child : childrens) {
- String childPath = path + Cluster.ZK_SEPERATOR + child;
- ErrorInfo errorInfo = Cluster.maybeDeserialize(clusterState.get_data(childPath, false), ErrorInfo.class);
- if (errorInfo != null)
- errorInfos.add(errorInfo);
- }
- }
- Collections.sort(errorInfos, new Comparator<ErrorInfo>() {
- public int compare(ErrorInfo arg0, ErrorInfo arg1) {
- return Integer.compare(arg0.get_error_time_secs(), arg1.get_error_time_secs());
- }
- });
- } catch (Exception e) {
- throw Utils.wrapInRuntime(e);
- }
-
- return errorInfos;
- }
-
- @Override
- public ErrorInfo lastError(String stormId, String componentId) {
- try {
- String path = Cluster.lastErrorPath(stormId, componentId);
- if (clusterState.node_exists(path, false)) {
- ErrorInfo errorInfo = Cluster.maybeDeserialize(clusterState.get_data(path, false), ErrorInfo.class);
- return errorInfo;
- }
- } catch (UnsupportedEncodingException e) {
- throw Utils.wrapInRuntime(e);
- }
- return null;
- }
-
- @Override
- public void setCredentials(String stormId, Credentials creds, Map topoConf) throws NoSuchAlgorithmException {
- List<ACL> aclList = Cluster.mkTopoOnlyAcls(topoConf);
- String path = Cluster.credentialsPath(stormId);
- clusterState.set_data(path, Utils.serialize(creds), aclList);
-
- }
-
- @Override
- public Credentials credentials(String stormId, IFn callback) {
- if (callback != null) {
- credentialsCallback.put(stormId, callback);
- }
- String path = Cluster.credentialsPath(stormId);
- return Cluster.maybeDeserialize(clusterState.get_data(path, callback != null), Credentials.class);
-
- }
-
- @Override
- public void disconnect() {
- clusterState.unregister(stateId);
- if (solo)
- clusterState.close();
- }
-}
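Side by side, the hierarchy change swaps the deleted class's untyped varargs Callback for the typed ZKStateChangedCallback used by StormClusterStateImpl above; an illustrative fragment:

    // Old (StormZkClusterState): event type and path arrive as untyped varargs.
    clusterState.register(new Callback() {
        public <T> Object execute(T... args) {
            String path = (String) args[1];
            return null;
        }
    });

    // New (StormClusterStateImpl): typed parameters, no casts or length checks.
    stateStorage.register(new ZKStateChangedCallback() {
        public void changed(Watcher.Event.EventType type, String path) {
        }
    });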
http://git-wip-us.apache.org/repos/asf/storm/blob/55b86ca4/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
new file mode 100644
index 0000000..8ac0adc
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import clojure.lang.APersistentMap;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.*;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.storm.Config;
+import org.apache.storm.callback.DefaultWatcherCallBack;
+import org.apache.storm.callback.WatcherCallBack;
+import org.apache.storm.callback.ZKStateChangedCallback;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.zookeeper.Zookeeper;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ZKStateStorage implements StateStorage {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ZKStateStorage.class);
+
+ private ConcurrentHashMap<String, ZKStateChangedCallback> callbacks = new ConcurrentHashMap<String, ZKStateChangedCallback>();
+ private CuratorFramework zkWriter;
+ private CuratorFramework zkReader;
+ private AtomicBoolean active;
+
+ private boolean isNimbus;
+ private Map authConf;
+ private Map<Object, Object> conf;
+
+ public ZKStateStorage(Map<Object, Object> conf, Map authConf, List<ACL> acls, ClusterStateContext context) throws Exception {
+ this.conf = conf;
+ this.authConf = authConf;
+ if (context.getDaemonType().equals(DaemonType.NIMBUS))
+ this.isNimbus = true;
+
+ // just create the STORM_ZOOKEEPER_ROOT dir
+ CuratorFramework zkTemp = mkZk();
+ String rootPath = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT));
+ Zookeeper.mkdirs(zkTemp, rootPath, acls);
+ zkTemp.close();
+
+ active = new AtomicBoolean(true);
+ zkWriter = mkZk(new WatcherCallBack() {
+ @Override
+ public void execute(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path) {
+ if (active.get()) {
+ if (!(state.equals(Watcher.Event.KeeperState.SyncConnected))) {
+ LOG.warn("Received event {} : {}: {} with disconnected Zookeeper.", state, type, path);
+ } else {
+ LOG.info("Received event {} : {} : {}", state, type, path);
+ }
+
+ if (!type.equals(Watcher.Event.EventType.None)) {
+ for (Map.Entry<String, ZKStateChangedCallback> e : callbacks.entrySet()) {
+ ZKStateChangedCallback fn = e.getValue();
+ fn.changed(type, path);
+ }
+ }
+ }
+ }
+ });
+ if (isNimbus) {
+ zkReader = mkZk(new WatcherCallBack() {
+ @Override
+ public void execute(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path) {
+ if (active.get()) {
+ if (!(state.equals(Watcher.Event.KeeperState.SyncConnected))) {
+ LOG.warn("Received event {} : {}: {} with disconnected Zookeeper.", state, type, path);
+ } else {
+ LOG.debug("Received event {} : {} : {}", state, type, path);
+ }
+
+ if (!type.equals(Watcher.Event.EventType.None)) {
+ for (Map.Entry<String, ZKStateChangedCallback> e : callbacks.entrySet()) {
+ ZKStateChangedCallback fn = e.getValue();
+ fn.changed(type, path);
+ }
+ }
+ }
+ }
+ });
+ } else {
+ zkReader = zkWriter;
+ }
+
+ }
+
+ @SuppressWarnings("unchecked")
+ private CuratorFramework mkZk() throws IOException {
+ return Zookeeper.mkClient(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT), "",
+ new DefaultWatcherCallBack(), authConf);
+ }
+
+ @SuppressWarnings("unchecked")
+ private CuratorFramework mkZk(WatcherCallBack watcher) throws NumberFormatException, IOException {
+ return Zookeeper.mkClient(conf, (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS), conf.get(Config.STORM_ZOOKEEPER_PORT),
+ String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT)), watcher, authConf);
+ }
+
+ @Override
+ public void delete_node_blobstore(String path, String nimbusHostPortInfo) {
+
+ }
+
+ @Override
+ public String register(ZKStateChangedCallback callback) {
+ String id = UUID.randomUUID().toString();
+ this.callbacks.put(id, callback);
+ return id;
+ }
+
+ @Override
+ public void unregister(String id) {
+ this.callbacks.remove(id);
+ }
+
+ @Override
+ public String create_sequential(String path, byte[] data, List<ACL> acls) {
+ return Zookeeper.createNode(zkWriter, path, data, CreateMode.EPHEMERAL_SEQUENTIAL, acls);
+ }
+
+ @Override
+ public void mkdirs(String path, List<ACL> acls) {
+ Zookeeper.mkdirs(zkWriter, path, acls);
+ }
+
+ @Override
+ public void delete_node(String path) {
+ Zookeeper.deleteNode(zkWriter, path);
+ }
+
+ @Override
+ public void set_ephemeral_node(String path, byte[] data, List<ACL> acls) {
+ Zookeeper.mkdirs(zkWriter, parentPath(path), acls);
+ if (Zookeeper.exists(zkWriter, path, false)) {
+ try {
+ Zookeeper.setData(zkWriter, path, data);
+ } catch (Exception e) {
+ if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
+ Zookeeper.createNode(zkWriter, path, data, CreateMode.EPHEMERAL, acls);
+ } else {
+ throw Utils.wrapInRuntime(e);
+ }
+ }
+
+ } else {
+ Zookeeper.createNode(zkWriter, path, data, CreateMode.EPHEMERAL, acls);
+ }
+ }
+
+ @Override
+ public Integer get_version(String path, boolean watch) throws Exception {
+ return Zookeeper.getVersion(zkReader, path, watch);
+ }
+
+ @Override
+ public boolean node_exists(String path, boolean watch) {
+ return Zookeeper.existsNode(zkWriter, path, watch);
+ }
+
+ @Override
+ public List<String> get_children(String path, boolean watch) {
+ return Zookeeper.getChildren(zkReader, path, watch);
+ }
+
+ @Override
+ public void close() {
+ this.active.set(false);
+ zkWriter.close();
+ if (isNimbus) {
+ zkReader.close();
+ }
+ }
+
+ @Override
+ public void set_data(String path, byte[] data, List<ACL> acls) {
+ if (Zookeeper.exists(zkWriter, path, false)) {
+ Zookeeper.setData(zkWriter, path, data);
+ } else {
+ Zookeeper.mkdirs(zkWriter, parentPath(path), acls);
+ Zookeeper.createNode(zkWriter, path, data, CreateMode.PERSISTENT, acls);
+ }
+ }
+
+ @Override
+ public byte[] get_data(String path, boolean watch) {
+ return Zookeeper.getData(zkReader, path, watch);
+ }
+
+ @Override
+ public APersistentMap get_data_with_version(String path, boolean watch) {
+ return Zookeeper.getDataWithVersion(zkReader, path, watch);
+ }
+
+ @Override
+ public void set_worker_hb(String path, byte[] data, List<ACL> acls) {
+ set_data(path, data, acls);
+ }
+
+ @Override
+ public byte[] get_worker_hb(String path, boolean watch) {
+ return Zookeeper.getData(zkReader, path, watch);
+ }
+
+ @Override
+ public List<String> get_worker_hb_children(String path, boolean watch) {
+ return get_children(path, watch);
+ }
+
+ @Override
+ public void delete_worker_hb(String path) {
+ delete_node(path);
+ }
+
+ @Override
+ public void add_listener(final ConnectionStateListener listener) {
+ Zookeeper.addListener(zkReader, new ConnectionStateListener() {
+ @Override
+ public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
+ listener.stateChanged(curatorFramework, connectionState);
+ }
+ });
+ }
+
+ @Override
+ public void sync_path(String path) {
+ Zookeeper.syncPath(zkWriter, path);
+ }
+
+ // To be removed once the port of Util.clj is finished.
+ public static String parentPath(String path) {
+ List<String> toks = Zookeeper.tokenizePath(path);
+ int size = toks.size();
+ if (size > 0) {
+ toks.remove(size - 1);
+ }
+ return Zookeeper.toksToPath(toks);
+ }
+}
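
For reference while reviewing: a minimal usage sketch of the new StateStorage API implemented above. This is illustrative only, not part of the patch; it assumes a storm.yaml pointing at a reachable ZooKeeper, passes null ACLs, and omits error handling.

    import org.apache.storm.callback.ZKStateChangedCallback;
    import org.apache.storm.cluster.ClusterStateContext;
    import org.apache.storm.cluster.StateStorage;
    import org.apache.storm.cluster.ZKStateStorage;
    import org.apache.storm.utils.ConfigUtils;
    import org.apache.zookeeper.Watcher;

    import java.util.Map;

    public class ZKStateStorageSketch {
        public static void main(String[] args) throws Exception {
            Map conf = ConfigUtils.readStormConfig();
            StateStorage storage = new ZKStateStorage(conf, conf, null, new ClusterStateContext());

            storage.mkdirs("/example", null);                            // ensure the parent path exists
            storage.set_data("/example/node", "hello".getBytes(), null); // persistent write
            byte[] payload = storage.get_data("/example/node", false);   // read without setting a watch

            // Callbacks registered here are invoked from the watcher loops in the constructor above.
            String id = storage.register(new ZKStateChangedCallback() {
                @Override
                public void changed(Watcher.Event.EventType type, String path) {
                    System.out.println(path + " changed: " + type);
                }
            });

            storage.unregister(id);
            storage.close();
        }
    }
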
http://git-wip-us.apache.org/repos/asf/storm/blob/55b86ca4/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
new file mode 100644
index 0000000..19b04f2
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import clojure.lang.APersistentMap;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.data.ACL;
+
+import java.util.List;
+
+public class ZKStateStorageFactory implements StateStorageFactory {
+
+ @Override
+ public StateStorage mkState(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context) {
+ try {
+ return new ZKStateStorage(config, auth_conf, acls, context);
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ }
+}
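
The factory is what makes the state backend pluggable. Below is a hedged sketch of the reflective lookup a caller could perform; the "cluster.state.factory" key is hypothetical, used only to illustrate the shape of such a lookup, with the ZooKeeper factory as fallback.

    import clojure.lang.APersistentMap;
    import org.apache.storm.cluster.ClusterStateContext;
    import org.apache.storm.cluster.StateStorage;
    import org.apache.storm.cluster.StateStorageFactory;
    import org.apache.zookeeper.data.ACL;

    import java.util.List;

    public class StateStorageLookupSketch {
        // Resolve a StateStorageFactory implementation by class name and delegate to it.
        static StateStorage mkState(APersistentMap conf, APersistentMap authConf,
                                    List<ACL> acls, ClusterStateContext context) throws Exception {
            String clazz = (String) conf.get("cluster.state.factory"); // hypothetical config key
            if (clazz == null) {
                clazz = "org.apache.storm.cluster.ZKStateStorageFactory"; // default to ZooKeeper
            }
            StateStorageFactory factory = (StateStorageFactory) Class.forName(clazz).newInstance();
            return factory.mkState(conf, authConf, acls, context);
        }
    }
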
http://git-wip-us.apache.org/repos/asf/storm/blob/55b86ca4/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedCluster.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedCluster.java b/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedCluster.java
index 5d67a54..2f1440c 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedCluster.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedCluster.java
@@ -16,16 +16,16 @@
*/
package org.apache.storm.testing.staticmocking;
-import org.apache.storm.cluster.Cluster;
+import org.apache.storm.cluster.ClusterUtils;
public class MockedCluster implements AutoCloseable {
- public MockedCluster(Cluster inst) {
- Cluster.setInstance(inst);
+ public MockedCluster(ClusterUtils inst) {
+ ClusterUtils.setInstance(inst);
}
@Override
public void close() throws Exception {
- Cluster.resetInstance();
+ ClusterUtils.resetInstance();
}
}
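
Renaming aside, the wrapper's value is that try-with-resources restores the real singleton even when a test throws. An illustrative Java equivalent of the with-open usage in cluster_test.clj further down:

    import org.apache.storm.cluster.ClusterUtils;
    import org.apache.storm.testing.staticmocking.MockedCluster;
    import org.mockito.Mockito;

    public class MockedClusterUsageSketch {
        public void runWithMockedCluster() throws Exception {
            ClusterUtils mock = Mockito.mock(ClusterUtils.class);
            try (MockedCluster mc = new MockedCluster(mock)) {
                // Static ClusterUtils entry points now delegate to the mock;
                // stub behavior and verify interactions with Mockito here.
            }
            // close() has run: the original singleton instance is back in place.
        }
    }
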
http://git-wip-us.apache.org/repos/asf/storm/blob/55b86ca4/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
index f1c7f32..c280515 100644
--- a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
@@ -86,19 +86,23 @@ public class Zookeeper {
_instance = INSTANCE;
}
- public static CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root) {
- return mkClient(conf, servers, port, root, new DefaultWatcherCallBack());
+ public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root) {
+ return mkClientImpl(conf, servers, port, root, new DefaultWatcherCallBack());
}
- public static CuratorFramework mkClient(Map conf, List<String> servers, Object port, Map authConf) {
- return mkClient(conf, servers, port, "", new DefaultWatcherCallBack(), authConf);
+ public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, Map authConf) {
+ return mkClientImpl(conf, servers, port, "", new DefaultWatcherCallBack(), authConf);
}
- public static CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, Map authConf) {
- return mkClient(conf, servers, port, root, new DefaultWatcherCallBack(), authConf);
+ public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, Map authConf) {
+ return mkClientImpl(conf, servers, port, root, new DefaultWatcherCallBack(), authConf);
}
public static CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, Map authConf) {
+ return _instance.mkClientImpl(conf, servers, port, root, watcher, authConf);
+ }
+
+ public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher, Map authConf) {
CuratorFramework fk;
if (authConf != null) {
fk = Utils.newCurator(conf, servers, port, root, new ZookeeperAuthInfo(authConf));
@@ -124,8 +128,8 @@ public class Zookeeper {
*
* @return
*/
- public static CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher) {
- return mkClient(conf, servers, port, root, watcher, null);
+ public CuratorFramework mkClientImpl(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher) {
+ return mkClientImpl(conf, servers, port, root, watcher, null);
}
public static String createNode(CuratorFramework zk, String path, byte[] data, org.apache.zookeeper.CreateMode mode, List<ACL> acls) {
@@ -347,7 +351,7 @@ public class Zookeeper {
protected ILeaderElector zkLeaderElectorImpl(Map conf) throws UnknownHostException {
List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
- CuratorFramework zk = mkClient(conf, servers, port, "", conf);
+ CuratorFramework zk = mkClientImpl(conf, servers, port, "", conf);
String leaderLockPath = conf.get(Config.STORM_ZOOKEEPER_ROOT) + "/leader-lock";
String id = NimbusInfo.fromConf(conf).toHostPortString();
AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));
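
The mkClient -> mkClientImpl split follows the same static-facade-over-swappable-instance pattern applied throughout this change (compare MockedZookeeper and MockedCluster). A generic sketch of the pattern, with illustrative names rather than Storm's exact fields:

    // Generic singleton-delegation pattern: static API for callers, overridable
    // instance methods for tests. Names are illustrative.
    public class Facade {
        private static final Facade INSTANCE = new Facade();
        private static Facade _instance = INSTANCE;

        // Test hooks: swap in a mock, then restore the real instance.
        public static void setInstance(Facade u) { _instance = u; }
        public static void resetInstance() { _instance = INSTANCE; }

        // Static entry point delegates to whichever instance is installed...
        public static String doWork(String arg) { return _instance.doWorkImpl(arg); }

        // ...while the Impl method carries the real logic and can be mocked.
        public String doWorkImpl(String arg) { return "did " + arg; }
    }
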
http://git-wip-us.apache.org/repos/asf/storm/blob/55b86ca4/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
index d374511..d4fab3f 100644
--- a/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
+++ b/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
@@ -21,7 +21,7 @@
(:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount
TestAggregatesCounter TestConfBolt AckFailMapTracker AckTracker TestPlannerSpout])
(:import [org.apache.storm.tuple Fields])
- (:import [org.apache.storm.cluster StormZkClusterState])
+ (:import [org.apache.storm.cluster StormClusterStateImpl])
(:use [org.apache.storm testing config clojure util converter])
(:use [org.apache.storm.daemon common])
(:require [org.apache.storm [thrift :as thrift]]))
@@ -576,7 +576,7 @@
(:topology tracked))
_ (advance-cluster-time cluster 11)
storm-id (get-storm-id state "test-errors")
- errors-count (fn [] (count (clojurify-error (.errors state storm-id "2"))))]
+ errors-count (fn [] (count (.errors state storm-id "2")))]
(is (nil? (clojurify-error (.lastError state storm-id "2"))))
http://git-wip-us.apache.org/repos/asf/storm/blob/55b86ca4/storm-core/test/clj/org/apache/storm/cluster_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj b/storm-core/test/clj/org/apache/storm/cluster_test.clj
index d0b9882..fa34355 100644
--- a/storm-core/test/clj/org/apache/storm/cluster_test.clj
+++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj
@@ -23,9 +23,10 @@
(:import [org.mockito.exceptions.base MockitoAssertionError])
(:import [org.apache.curator.framework CuratorFramework CuratorFrameworkFactory CuratorFrameworkFactory$Builder])
(:import [org.apache.storm.utils Utils TestUtils ZookeeperAuthInfo ConfigUtils])
- (:import [org.apache.storm.cluster ClusterState DistributedClusterState ClusterStateContext StormZkClusterState])
+ (:import [org.apache.storm.cluster StateStorage ZKStateStorage ClusterStateContext StormClusterStateImpl ClusterUtils])
(:import [org.apache.storm.zookeeper Zookeeper])
- (:import [org.apache.storm.testing.staticmocking MockedZookeeper])
+ (:import [org.apache.storm.callback ZKStateChangedCallback])
+ (:import [org.apache.storm.testing.staticmocking MockedZookeeper MockedCluster])
(:require [conjure.core])
(:use [conjure core])
(:use [clojure test])
@@ -33,18 +34,18 @@
(defn mk-config [zk-port]
(merge (clojurify-structure (ConfigUtils/readStormConfig))
- {STORM-ZOOKEEPER-PORT zk-port
- STORM-ZOOKEEPER-SERVERS ["localhost"]}))
+ {STORM-ZOOKEEPER-PORT zk-port
+ STORM-ZOOKEEPER-SERVERS ["localhost"]}))
(defn mk-state
([zk-port] (let [conf (mk-config zk-port)]
- (DistributedClusterState. conf conf nil (ClusterStateContext.))))
+ (ClusterUtils/mkDistributedClusterState conf conf nil (ClusterStateContext.))))
([zk-port cb]
- (let [ret (mk-state zk-port)]
- (.register ret cb)
- ret )))
+ (let [ret (mk-state zk-port)]
+ (.register ret cb)
+ ret)))
-(defn mk-storm-state [zk-port] (StormZkClusterState. (mk-config zk-port) nil (ClusterStateContext.)))
+(defn mk-storm-state [zk-port] (ClusterUtils/mkStormClusterState (mk-config zk-port) nil (ClusterStateContext.)))
(deftest test-basics
(with-inprocess-zookeeper zk-port
@@ -99,24 +100,27 @@
(defn mk-callback-tester []
(let [last (atom nil)
- cb (fn [type path]
- (reset! last {:type type :path path}))]
+ cb (reify
+ ZKStateChangedCallback
+ (changed
+ [this type path]
+ (reset! last {:type type :path path})))]
[last cb]
))
(defn read-and-reset! [aatom]
(let [time (System/currentTimeMillis)]
- (loop []
- (if-let [val @aatom]
- (do
- (reset! aatom nil)
- val)
- (do
- (when (> (- (System/currentTimeMillis) time) 30000)
- (throw (RuntimeException. "Waited too long for atom to change state")))
- (Thread/sleep 10)
- (recur))
- ))))
+ (loop []
+ (if-let [val @aatom]
+ (do
+ (reset! aatom nil)
+ val)
+ (do
+ (when (> (- (System/currentTimeMillis) time) 30000)
+ (throw (RuntimeException. "Waited too long for atom to change state")))
+ (Thread/sleep 10)
+ (recur))
+ ))))
(deftest test-callbacks
(with-inprocess-zookeeper zk-port
@@ -189,35 +193,35 @@
(is (= #{"storm1" "storm3"} (set (.assignments state nil))))
(is (= assignment2 (clojurify-assignment (.assignmentInfo state "storm1" nil))))
(is (= assignment1 (clojurify-assignment (.assignmentInfo state "storm3" nil))))
-
- (is (= [] (.active-storms state)))
+
+ (is (= [] (.activeStorms state)))
(.activateStorm state "storm1" (thriftify-storm-base base1))
- (is (= ["storm1"] (.active-storms state)))
+ (is (= ["storm1"] (.activeStorms state)))
(is (= base1 (clojurify-storm-base (.stormBase state "storm1" nil))))
(is (= nil (clojurify-storm-base (.stormBase state "storm2" nil))))
(.activateStorm state "storm2" (thriftify-storm-base base2))
(is (= base1 (clojurify-storm-base (.stormBase state "storm1" nil))))
(is (= base2 (clojurify-storm-base (.stormBase state "storm2" nil))))
- (is (= #{"storm1" "storm2"} (set (.active-storms state))))
+ (is (= #{"storm1" "storm2"} (set (.activeStorms state))))
(.removeStormBase state "storm1")
(is (= base2 (clojurify-storm-base (.stormBase state "storm2" nil))))
- (is (= #{"storm2"} (set (.active-storms state))))
+ (is (= #{"storm2"} (set (.activeStorms state))))
(is (nil? (clojurify-crdentials (.credentials state "storm1" nil))))
- (.setCredentials! state "storm1" (thriftify-credentials {"a" "a"}) {})
+ (.setCredentials state "storm1" (thriftify-credentials {"a" "a"}) {})
(is (= {"a" "a"} (clojurify-crdentials (.credentials state "storm1" nil))))
(.setCredentials state "storm1" (thriftify-credentials {"b" "b"}) {})
(is (= {"b" "b"} (clojurify-crdentials (.credentials state "storm1" nil))))
- (is (= [] (.blobstoreInfo state nil)))
- (.setupBlobstore state "key1" nimbusInfo1 "1")
- (is (= ["key1"] (.blobstoreInfo state nil)))
+ (is (= [] (.blobstoreInfo state "")))
+ (.setupBlobstore state "key1" nimbusInfo1 (Integer/parseInt "1"))
+ (is (= ["key1"] (.blobstoreInfo state "")))
(is (= [(str (.toHostPortString nimbusInfo1) "-1")] (.blobstoreInfo state "key1")))
- (.setupBlobstore state "key1" nimbusInfo2 "1")
+ (.setupBlobstore state "key1" nimbusInfo2 (Integer/parseInt "1"))
(is (= #{(str (.toHostPortString nimbusInfo1) "-1")
(str (.toHostPortString nimbusInfo2) "-1")} (set (.blobstoreInfo state "key1"))))
(.removeBlobstoreKey state "key1")
- (is (= [] (.blobstoreInfo state nil)))
+ (is (= [] (.blobstoreInfo state "")))
(is (= [] (.nimbuses state)))
(.addNimbusHost state "nimbus1:port" nimbusSummary1)
@@ -230,11 +234,10 @@
)))
(defn- validate-errors! [state storm-id component errors-list]
- (let [errors (clojurify-error (.errors state storm-id component))]
- ;;(println errors)
+ (let [errors (map clojurify-error (.errors state storm-id component))]
(is (= (count errors) (count errors-list)))
(doseq [[error target] (map vector errors errors-list)]
- (when-not (.contains (:error error) target)
+ (when-not (.contains (:error error) target)
(println target " => " (:error error)))
(is (.contains (:error error) target))
)))
@@ -257,8 +260,9 @@
(.reportError state "a" "2" (local-hostname) 6700 (stringify-error (IllegalArgumentException.)))
(advance-time-secs! 2))
(validate-errors! state "a" "2" (concat (repeat 5 "IllegalArgumentException")
- (repeat 5 "RuntimeException")
- ))
+ (repeat 5 "RuntimeException")
+ ))
+
(.disconnect state)
))))
@@ -285,23 +289,23 @@
(with-inprocess-zookeeper zk-port
(let [builder (Mockito/mock CuratorFrameworkFactory$Builder)
conf (merge
- (mk-config zk-port)
- {STORM-ZOOKEEPER-CONNECTION-TIMEOUT 10
- STORM-ZOOKEEPER-SESSION-TIMEOUT 10
- STORM-ZOOKEEPER-RETRY-INTERVAL 5
- STORM-ZOOKEEPER-RETRY-TIMES 2
- STORM-ZOOKEEPER-RETRY-INTERVAL-CEILING 15
- STORM-ZOOKEEPER-AUTH-SCHEME "digest"
- STORM-ZOOKEEPER-AUTH-PAYLOAD "storm:thisisapoorpassword"})]
+ (mk-config zk-port)
+ {STORM-ZOOKEEPER-CONNECTION-TIMEOUT 10
+ STORM-ZOOKEEPER-SESSION-TIMEOUT 10
+ STORM-ZOOKEEPER-RETRY-INTERVAL 5
+ STORM-ZOOKEEPER-RETRY-TIMES 2
+ STORM-ZOOKEEPER-RETRY-INTERVAL-CEILING 15
+ STORM-ZOOKEEPER-AUTH-SCHEME "digest"
+ STORM-ZOOKEEPER-AUTH-PAYLOAD "storm:thisisapoorpassword"})]
(. (Mockito/when (.connectString builder (Mockito/anyString))) (thenReturn builder))
(. (Mockito/when (.connectionTimeoutMs builder (Mockito/anyInt))) (thenReturn builder))
(. (Mockito/when (.sessionTimeoutMs builder (Mockito/anyInt))) (thenReturn builder))
(TestUtils/testSetupBuilder builder (str zk-port "/") conf (ZookeeperAuthInfo. conf))
(is (nil?
- (try
- (. (Mockito/verify builder) (authorization "digest" (.getBytes (conf STORM-ZOOKEEPER-AUTH-PAYLOAD))))
- (catch MockitoAssertionError e
- e)))))))
+ (try
+ (. (Mockito/verify builder) (authorization "digest" (.getBytes (conf STORM-ZOOKEEPER-AUTH-PAYLOAD))))
+ (catch MockitoAssertionError e
+ e)))))))
(deftest test-storm-state-callbacks
;; TODO finish
@@ -309,13 +313,17 @@
(deftest test-cluster-state-default-acls
(testing "The default ACLs are empty."
- (let [zk-mock (Mockito/mock Zookeeper)]
+ (let [zk-mock (Mockito/mock Zookeeper)
+ curator-framework (reify CuratorFramework (^void close [this] nil))]
;; No need for when clauses because we just want to return nil
(with-open [_ (MockedZookeeper. zk-mock)]
- (. (Mockito/when (Mockito/mock Zookeeper)) (thenReturn (reify CuratorFramework (^void close [this] nil))))
- (. (Mockito/when (Mockito/mock DistributedClusterState)) (thenReturn {}))
- (. (Mockito/when (Mockito/mock StormZkClusterState)) (thenReturn (reify ClusterState
- (register [this callback] nil)
- (mkdirs [this path acls] nil))))
- (.mkdirsImpl (Mockito/verify zk-mock (Mockito/times 1)) (Mockito/any) (Mockito/anyString) (Mockito/eq nil))))))
-
+ (. (Mockito/when (.mkClientImpl zk-mock (Mockito/anyMap) (Mockito/anyList) (Mockito/any) (Mockito/anyString) (Mockito/any) (Mockito/anyMap))) (thenReturn curator-framework))
+ (ClusterUtils/mkDistributedClusterState {} nil nil (ClusterStateContext.))
+ (.mkdirsImpl (Mockito/verify zk-mock (Mockito/times 1)) (Mockito/any) (Mockito/anyString) (Mockito/eq nil))))
+ (let [distributed-state-storage (reify StateStorage
+ (register [this callback] nil)
+ (mkdirs [this path acls] nil))
+ cluster-utils (Mockito/mock ClusterUtils)]
+ (with-open [mocked-cluster (MockedCluster. cluster-utils)]
+ (. (Mockito/when (.mkDistributedClusterStateImpl cluster-utils (Mockito/any) (Mockito/any) (Mockito/eq nil) (Mockito/any))) (thenReturn distributed-state-storage))
+ (ClusterUtils/mkStormClusterState {} nil (ClusterStateContext.))))))
\ No newline at end of file