You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/02/24 17:17:23 UTC
[13/27] storm git commit: Merge branch 'master' into ClusterUtils
http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 8df5885,0000000..17c8641
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@@ -1,664 -1,0 +1,687 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import clojure.lang.*;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.*;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.storm.callback.ZKStateChangedCallback;
+import org.apache.storm.generated.*;
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.zookeeper.Zookeeper;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
++import java.io.PrintWriter;
++import java.io.StringWriter;
+import java.security.NoSuchAlgorithmException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class StormClusterStateImpl implements IStormClusterState {
+
+ private static Logger LOG = LoggerFactory.getLogger(StormClusterStateImpl.class);
+
+ private IStateStorage stateStorage;
+
+ private ConcurrentHashMap<String, IFn> assignmentInfoCallback;
+ private ConcurrentHashMap<String, IFn> assignmentInfoWithVersionCallback;
+ private ConcurrentHashMap<String, IFn> assignmentVersionCallback;
+ private AtomicReference<IFn> supervisorsCallback;
+ // we want to register a topo directory getChildren callback for all workers of this dir
+ private ConcurrentHashMap<String, IFn> backPressureCallback;
+ private AtomicReference<IFn> assignmentsCallback;
+ private ConcurrentHashMap<String, IFn> stormBaseCallback;
+ private AtomicReference<IFn> blobstoreCallback;
+ private ConcurrentHashMap<String, IFn> credentialsCallback;
+ private ConcurrentHashMap<String, IFn> logConfigCallback;
+
+ private List<ACL> acls;
+ private String stateId;
+ private boolean solo;
+
+ public StormClusterStateImpl(IStateStorage StateStorage, List<ACL> acls, ClusterStateContext context, boolean solo) throws Exception {
+
+ this.stateStorage = StateStorage;
+ this.solo = solo;
++ this.acls = acls;
+
+ assignmentInfoCallback = new ConcurrentHashMap<>();
+ assignmentInfoWithVersionCallback = new ConcurrentHashMap<>();
+ assignmentVersionCallback = new ConcurrentHashMap<>();
+ supervisorsCallback = new AtomicReference<>();
+ backPressureCallback = new ConcurrentHashMap<>();
+ assignmentsCallback = new AtomicReference<>();
+ stormBaseCallback = new ConcurrentHashMap<>();
+ credentialsCallback = new ConcurrentHashMap<>();
+ logConfigCallback = new ConcurrentHashMap<>();
+ blobstoreCallback = new AtomicReference<>();
+
+ stateId = this.stateStorage.register(new ZKStateChangedCallback() {
+
+ /**
+ * Routes a ZooKeeper change notification to the callback registered for the subtree that changed. The first path token selects the subtree; for keyed
+ * subtrees the second token is the key the callback was stored under. issueCallback/issueMapCallback remove a callback before invoking it, so each
+ * registration fires at most once.
+ *
+ * @param type the kind of ZooKeeper event (not inspected here)
+ * @param path the znode path that changed
+ */
+ public void changed(Watcher.Event.EventType type, String path) {
+ List<String> toks = Zookeeper.tokenizePath(path);
+ int size = toks.size();
+ if (size >= 1) {
- String params = null;
+ String root = toks.get(0);
- IFn fn = null;
+ if (root.equals(ClusterUtils.ASSIGNMENTS_ROOT)) {
+ if (size == 1) {
+ // set null and get the old value
+ issueCallback(assignmentsCallback);
+ } else {
+ issueMapCallback(assignmentInfoCallback, toks.get(1));
+ issueMapCallback(assignmentVersionCallback, toks.get(1));
+ issueMapCallback(assignmentInfoWithVersionCallback, toks.get(1));
+ }
+
+ } else if (root.equals(ClusterUtils.SUPERVISORS_ROOT)) {
+ issueCallback(supervisorsCallback);
+ } else if (root.equals(ClusterUtils.BLOBSTORE_ROOT)) {
+ issueCallback(blobstoreCallback);
+ } else if (root.equals(ClusterUtils.STORMS_ROOT) && size > 1) {
+ issueMapCallback(stormBaseCallback, toks.get(1));
+ } else if (root.equals(ClusterUtils.CREDENTIALS_ROOT) && size > 1) {
+ issueMapCallback(credentialsCallback, toks.get(1));
+ } else if (root.equals(ClusterUtils.LOGCONFIG_ROOT) && size > 1) {
+ issueMapCallback(logConfigCallback, toks.get(1));
+ } else if (root.equals(ClusterUtils.BACKPRESSURE_ROOT) && size > 1) {
+ // backpressure changes must fire the callbacks stored in backPressureCallback, not the log-config ones
+ issueMapCallback(backPressureCallback, toks.get(1));
+ } else {
+ // the throwable goes last so SLF4J logs it with its stack trace instead of consuming it as a format argument
+ LOG.error("Unknown callback for subtree {}", path, new RuntimeException("Unknown callback for this path"));
+ Runtime.getRuntime().exit(30);
+ }
+
+ }
+
+ return;
+ }
+
+ });
+
+ String[] pathlist = { ClusterUtils.ASSIGNMENTS_SUBTREE, ClusterUtils.STORMS_SUBTREE, ClusterUtils.SUPERVISORS_SUBTREE, ClusterUtils.WORKERBEATS_SUBTREE,
+ ClusterUtils.ERRORS_SUBTREE, ClusterUtils.BLOBSTORE_SUBTREE, ClusterUtils.NIMBUSES_SUBTREE, ClusterUtils.LOGCONFIG_SUBTREE };
+ for (String path : pathlist) {
+ this.stateStorage.mkdirs(path, acls);
+ }
+
+ }
+
+ protected void issueCallback(AtomicReference<IFn> cb) {
+ IFn callback = cb.getAndSet(null);
+ if (callback != null)
+ callback.invoke();
+ }
+
+ protected void issueMapCallback(ConcurrentHashMap<String, IFn> callbackConcurrentHashMap, String key) {
+ IFn callback = callbackConcurrentHashMap.remove(key);
+ if (callback != null)
+ callback.invoke();
+ }
+
+ @Override
+ public List<String> assignments(IFn callback) {
+ if (callback != null) {
+ assignmentsCallback.set(callback);
+ }
+ return stateStorage.get_children(ClusterUtils.ASSIGNMENTS_SUBTREE, callback != null);
+ }
+
+ @Override
+ public Assignment assignmentInfo(String stormId, IFn callback) {
+ if (callback != null) {
+ assignmentInfoCallback.put(stormId, callback);
+ }
+ byte[] serialized = stateStorage.get_data(ClusterUtils.assignmentPath(stormId), callback != null);
+ return ClusterUtils.maybeDeserialize(serialized, Assignment.class);
+ }
+
+ @Override
+ public APersistentMap assignmentInfoWithVersion(String stormId, IFn callback) {
+ if (callback != null) {
+ assignmentInfoWithVersionCallback.put(stormId, callback);
+ }
+ Assignment assignment = null;
+ Integer version = 0;
+ APersistentMap aPersistentMap = stateStorage.get_data_with_version(ClusterUtils.assignmentPath(stormId), callback != null);
+ if (aPersistentMap != null) {
+ assignment = ClusterUtils.maybeDeserialize((byte[]) aPersistentMap.get(RT.keyword(null, "data")), Assignment.class);
+ version = (Integer) aPersistentMap.get(RT.keyword(null, "version"));
+ }
+ APersistentMap map = new PersistentArrayMap(new Object[] { RT.keyword(null, "data"), assignment, RT.keyword(null, "version"), version });
+ return map;
+ }
+
+ @Override
+ public Integer assignmentVersion(String stormId, IFn callback) throws Exception {
+ if (callback != null) {
+ assignmentVersionCallback.put(stormId, callback);
+ }
+ return stateStorage.get_version(ClusterUtils.assignmentPath(stormId), callback != null);
+ }
+
+ // blobstore state
+ @Override
+ public List<String> blobstoreInfo(String blobKey) {
+ String path = ClusterUtils.blobstorePath(blobKey);
+ stateStorage.sync_path(path);
+ return stateStorage.get_children(path, false);
+ }
+
+ @Override
+ public List nimbuses() {
+ List<NimbusSummary> nimbusSummaries = new ArrayList<>();
+ List<String> nimbusIds = stateStorage.get_children(ClusterUtils.NIMBUSES_SUBTREE, false);
+ for (String nimbusId : nimbusIds) {
+ byte[] serialized = stateStorage.get_data(ClusterUtils.nimbusPath(nimbusId), false);
+ NimbusSummary nimbusSummary = ClusterUtils.maybeDeserialize(serialized, NimbusSummary.class);
+ nimbusSummaries.add(nimbusSummary);
+ }
+ return nimbusSummaries;
+ }
+
+ /**
+ * Publishes this nimbus as an ephemeral node under the nimbus path and installs a connection listener that writes the same entry again whenever the
+ * ZooKeeper connection state changes to RECONNECTED.
+ *
+ * @param nimbusId id used to build the nimbus znode path
+ * @param nimbusSummary summary that is serialized and stored as the node data
+ */
+ @Override
+ public void addNimbusHost(final String nimbusId, final NimbusSummary nimbusSummary) {
+ // explicit delete for ephemeral node to ensure this session creates the entry.
+ stateStorage.delete_node(ClusterUtils.nimbusPath(nimbusId));
+ stateStorage.add_listener(new ConnectionStateListener() {
+ @Override
+ public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
+ LOG.info("Connection state listener invoked, zookeeper connection state has changed to {}", connectionState);
+ if (connectionState.equals(ConnectionState.RECONNECTED)) {
+ LOG.info("Connection state has changed to reconnected so setting nimbuses entry one more time");
+ stateStorage.set_ephemeral_node(ClusterUtils.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls);
+ }
+
+ }
+ });
+
+ // initial registration for the current session
+ stateStorage.set_ephemeral_node(ClusterUtils.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls);
+ }
+
+ @Override
+ public List<String> activeStorms() {
+ return stateStorage.get_children(ClusterUtils.STORMS_SUBTREE, false);
+ }
+
+ @Override
+ public StormBase stormBase(String stormId, IFn callback) {
+ if (callback != null) {
+ stormBaseCallback.put(stormId, callback);
+ }
+ return ClusterUtils.maybeDeserialize(stateStorage.get_data(ClusterUtils.stormPath(stormId), callback != null), StormBase.class);
+ }
+
+ @Override
+ public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port) {
+ byte[] bytes = stateStorage.get_worker_hb(ClusterUtils.workerbeatPath(stormId, node, port), false);
+ return ClusterUtils.maybeDeserialize(bytes, ClusterWorkerHeartbeat.class);
+
+ }
+
+ @Override
+ public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo, boolean isThrift) {
+ List<ProfileRequest> requests = new ArrayList<>();
+ List<ProfileRequest> profileRequests = getTopologyProfileRequests(stormId, isThrift);
+ for (ProfileRequest profileRequest : profileRequests) {
+ NodeInfo nodeInfo1 = profileRequest.get_nodeInfo();
+ if (nodeInfo1.equals(nodeInfo))
+ requests.add(profileRequest);
+ }
+ return requests;
+ }
+
+ @Override
+ public List<ProfileRequest> getTopologyProfileRequests(String stormId, boolean isThrift) {
+ List<ProfileRequest> profileRequests = new ArrayList<>();
+ String path = ClusterUtils.profilerConfigPath(stormId);
+ if (stateStorage.node_exists(path, false)) {
+ List<String> strs = stateStorage.get_children(path, false);
+ for (String str : strs) {
+ String childPath = path + ClusterUtils.ZK_SEPERATOR + str;
+ byte[] raw = stateStorage.get_data(childPath, false);
+ ProfileRequest request = ClusterUtils.maybeDeserialize(raw, ProfileRequest.class);
+ if (request != null)
+ profileRequests.add(request);
+ }
+ }
+ return profileRequests;
+ }
+
+ @Override
+ public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest) {
+ ProfileAction profileAction = profileRequest.get_action();
+ String host = profileRequest.get_nodeInfo().get_node();
+ Long port = profileRequest.get_nodeInfo().get_port_iterator().next();
+ String path = ClusterUtils.profilerConfigPath(stormId, host, port, profileAction);
+ stateStorage.set_data(path, Utils.serialize(profileRequest), acls);
+ }
+
+ @Override
+ public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest) {
+ ProfileAction profileAction = profileRequest.get_action();
+ String host = profileRequest.get_nodeInfo().get_node();
+ Long port = profileRequest.get_nodeInfo().get_port_iterator().next();
+ String path = ClusterUtils.profilerConfigPath(stormId, host, port, profileAction);
+ stateStorage.delete_node(path);
+ }
+
- // need to take executor->node+port in explicitly so that we don't run into a situation where a
- // long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
- // with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
- // we avoid situations like that
++ /**
++ * need to take executor->node+port in explicitly so that we don't run into a situation where a long dead worker with a skewed clock overrides all the
++ * timestamps. By only checking heartbeats with an assigned node+port, and only reading executors from that heartbeat that are actually assigned, we avoid
++ * situations like that
++ *
++ * @param stormId
++ * @param executorNodePort
++ * @return
++ */
+ @Override
- public Map<ExecutorInfo, ClusterWorkerHeartbeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) {
- Map<ExecutorInfo, ClusterWorkerHeartbeat> executorWhbs = new HashMap<>();
++ public Map<ExecutorInfo, APersistentMap> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) {
++ Map<ExecutorInfo, APersistentMap> executorWhbs = new HashMap<>();
+
+ Map<NodeInfo, List<List<Long>>> nodePortExecutors = ClusterUtils.reverseMap(executorNodePort);
+
+ for (Map.Entry<NodeInfo, List<List<Long>>> entry : nodePortExecutors.entrySet()) {
+
+ String node = entry.getKey().get_node();
+ Long port = entry.getKey().get_port_iterator().next();
+ ClusterWorkerHeartbeat whb = getWorkerHeartbeat(stormId, node, port);
+ List<ExecutorInfo> executorInfoList = new ArrayList<>();
+ for (List<Long> list : entry.getValue()) {
+ executorInfoList.add(new ExecutorInfo(list.get(0).intValue(), list.get(list.size() - 1).intValue()));
+ }
+ if (whb != null)
+ executorWhbs.putAll(ClusterUtils.convertExecutorBeats(executorInfoList, whb));
+ }
+ return executorWhbs;
+ }
+
+ @Override
+ public List<String> supervisors(IFn callback) {
+ if (callback != null) {
+ supervisorsCallback.set(callback);
+ }
+ return stateStorage.get_children(ClusterUtils.SUPERVISORS_SUBTREE, callback != null);
+ }
+
+ @Override
+ public SupervisorInfo supervisorInfo(String supervisorId) {
+ String path = ClusterUtils.supervisorPath(supervisorId);
+ return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, false), SupervisorInfo.class);
+ }
+
+ @Override
+ public void setupHeatbeats(String stormId) {
+ stateStorage.mkdirs(ClusterUtils.workerbeatStormRoot(stormId), acls);
+ }
+
+ @Override
+ public void teardownHeartbeats(String stormId) {
+ try {
+ stateStorage.delete_worker_hb(ClusterUtils.workerbeatStormRoot(stormId));
+ } catch (Exception e) {
+ if (Zookeeper.exceptionCause(KeeperException.class, e)) {
+ // do nothing
+ LOG.warn("Could not teardown heartbeats for {}.", stormId);
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public void teardownTopologyErrors(String stormId) {
+ try {
+ stateStorage.delete_node(ClusterUtils.errorStormRoot(stormId));
+ } catch (Exception e) {
+ if (Zookeeper.exceptionCause(KeeperException.class, e)) {
+ // do nothing
+ LOG.warn("Could not teardown errors for {}.", stormId);
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public List<String> heartbeatStorms() {
+ return stateStorage.get_worker_hb_children(ClusterUtils.WORKERBEATS_SUBTREE, false);
+ }
+
+ @Override
+ public List<String> errorTopologies() {
+ return stateStorage.get_children(ClusterUtils.ERRORS_SUBTREE, false);
+ }
+
+ @Override
+ public void setTopologyLogConfig(String stormId, LogConfig logConfig) {
+ stateStorage.set_data(ClusterUtils.logConfigPath(stormId), Utils.serialize(logConfig), acls);
+ }
+
+ /**
+ * Reads the log configuration stored for a topology.
+ *
+ * @param stormId id of the topology whose log config is read
+ * @param cb optional callback; when non-null it is stored under stormId and a watch is requested on the log config path
+ * @return the deserialized LogConfig, or whatever ClusterUtils.maybeDeserialize yields for the stored bytes
+ */
+ @Override
+ public LogConfig topologyLogConfig(String stormId, IFn cb) {
+ // store the callback before setting the watch; otherwise the watch fires with nothing registered to invoke
+ if (cb != null) {
+ logConfigCallback.put(stormId, cb);
+ }
+ String path = ClusterUtils.logConfigPath(stormId);
+ return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, cb != null), LogConfig.class);
+ }
+
+ @Override
+ public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info) {
+ if (info != null) {
+ String path = ClusterUtils.workerbeatPath(stormId, node, port);
+ stateStorage.set_worker_hb(path, Utils.serialize(info), acls);
+ }
+ }
+
+ @Override
+ public void removeWorkerHeartbeat(String stormId, String node, Long port) {
+ String path = ClusterUtils.workerbeatPath(stormId, node, port);
+ stateStorage.delete_worker_hb(path);
+ }
+
+ @Override
+ public void supervisorHeartbeat(String supervisorId, SupervisorInfo info) {
+ String path = ClusterUtils.supervisorPath(supervisorId);
+ stateStorage.set_ephemeral_node(path, Utils.serialize(info), acls);
+ }
+
- // if znode exists and to be not on?, delete; if exists and on?, do nothing;
- // if not exists and to be on?, create; if not exists and not on?, do nothing;
++ /**
++ * Brings the backpressure znode of one worker in line with the requested flag: if the znode exists and the flag is off, delete it; if it exists and the
++ * flag is on, do nothing; if it does not exist and the flag is on, create it as an ephemeral node; if it does not exist and the flag is off, do nothing.
++ *
++ * @param stormId topology id used to build the backpressure path
++ * @param node node part of the backpressure path
++ * @param port port part of the backpressure path
++ * @param on whether the worker's backpressure znode should exist after this call
++ */
+ @Override
+ public void workerBackpressure(String stormId, String node, Long port, boolean on) {
+ String path = ClusterUtils.backpressurePath(stormId, node, port);
+ boolean existed = stateStorage.node_exists(path, false);
+ if (existed) {
+ if (on == false)
+ stateStorage.delete_node(path);
+
+ } else {
+ if (on == true) {
+ stateStorage.set_ephemeral_node(path, null, acls);
+ }
+ }
+ }
+
- // if the backpresure/storm-id dir is empty, this topology has throttle-on, otherwise not.
++ /**
++ * if the backpressure/storm-id dir is NOT empty (it has at least one child), this topology has throttle-on, otherwise not.
++ *
++ * @param stormId topology id used to build the backpressure root path
++ * @param callback optional callback; when non-null it is stored under stormId and a watch is requested on the children of the backpressure root
++ * @return true when the backpressure root of the topology has at least one child, false when it has none
++ */
+ @Override
+ public boolean topologyBackpressure(String stormId, IFn callback) {
+ if (callback != null) {
+ backPressureCallback.put(stormId, callback);
+ }
+ String path = ClusterUtils.backpressureStormRoot(stormId);
+ List<String> childrens = stateStorage.get_children(path, callback != null);
+ return childrens.size() > 0;
+
+ }
+
+ @Override
+ public void setupBackpressure(String stormId) {
+ stateStorage.mkdirs(ClusterUtils.backpressureStormRoot(stormId), acls);
+ }
+
+ @Override
+ public void removeWorkerBackpressure(String stormId, String node, Long port) {
+ stateStorage.delete_node(ClusterUtils.backpressurePath(stormId, node, port));
+ }
+
+ @Override
+ public void activateStorm(String stormId, StormBase stormBase) {
+ String path = ClusterUtils.stormPath(stormId);
+ stateStorage.set_data(path, Utils.serialize(stormBase), acls);
+ }
+
- // To update this function due to APersistentMap/APersistentSet is clojure's structure
++ /**
++ * Completes newElems with the values of the StormBase currently stored for stormId and writes the result back to the storm path. Fields that newElems
++ * leaves unset (null, blank or 0) are taken from the stored StormBase; the component executor and component debug maps are merged key by key.
++ * Merged maps are built as java.util collections because the incoming ones may be Clojure APersistentMap/APersistentSet, which do not support mutation.
++ *
++ * @param stormId id of the topology whose stored StormBase is updated
++ * @param newElems the fields to apply; it is modified in place and then serialized
++ */
+ @Override
+ public void updateStorm(String stormId, StormBase newElems) {
+
+ StormBase stormBase = stormBase(stormId, null);
+ Map<String, Integer> oldComponentExecutors = stormBase.get_component_executors();
+ if (oldComponentExecutors != null) {
+ // start from the stored executors and let the new values win; copying avoids "put" on a possibly immutable APersistentMap
+ Map<String, Integer> mergedComponentExecutors = new HashMap<>(oldComponentExecutors);
+ Map<String, Integer> componentExecutors = newElems.get_component_executors();
+ // the field may be unset on newElems, in which case there is nothing to override
+ if (componentExecutors != null) {
+ mergedComponentExecutors.putAll(componentExecutors);
+ }
+ if (mergedComponentExecutors.size() > 0) {
+ newElems.set_component_executors(mergedComponentExecutors);
+ }
+ }
+
+ Map<String, DebugOptions> mergedComponentDebug = new HashMap<>();
+ // either side may have no debug options set; treat that as an empty map instead of failing on keySet()
+ Map<String, DebugOptions> oldComponentDebug = stormBase.get_component_debug();
+ if (oldComponentDebug == null) {
+ oldComponentDebug = Collections.emptyMap();
+ }
+ Map<String, DebugOptions> newComponentDebug = newElems.get_component_debug();
+ if (newComponentDebug == null) {
+ newComponentDebug = Collections.emptyMap();
+ }
+ /// oldComponentDebug.keySet()/ newComponentDebug.keySet() maybe be APersistentSet, which don't support addAll
+ Set<String> debugOptionsKeys = new HashSet<>();
+ debugOptionsKeys.addAll(oldComponentDebug.keySet());
+ debugOptionsKeys.addAll(newComponentDebug.keySet());
+ for (String key : debugOptionsKeys) {
+ boolean enable = false;
+ double samplingpct = 0;
+ if (oldComponentDebug.containsKey(key)) {
+ enable = oldComponentDebug.get(key).is_enable();
+ samplingpct = oldComponentDebug.get(key).get_samplingpct();
+ }
+ if (newComponentDebug.containsKey(key)) {
+ enable = newComponentDebug.get(key).is_enable();
+ // NOTE(review): this adds the new sampling percentage to the stored one instead of replacing it - confirm that accumulation is intended
+ samplingpct += newComponentDebug.get(key).get_samplingpct();
+ }
+ DebugOptions debugOptions = new DebugOptions();
+ debugOptions.set_enable(enable);
+ debugOptions.set_samplingpct(samplingpct);
+ mergedComponentDebug.put(key, debugOptions);
+ }
+ if (mergedComponentDebug.size() > 0) {
+ newElems.set_component_debug(mergedComponentDebug);
+ }
+
+ // scalar fields: keep what newElems provides, fall back to the stored value otherwise
+ if (StringUtils.isBlank(newElems.get_name())) {
+ newElems.set_name(stormBase.get_name());
+ }
+ if (newElems.get_status() == null) {
+ newElems.set_status(stormBase.get_status());
+ }
+ if (newElems.get_num_workers() == 0) {
+ newElems.set_num_workers(stormBase.get_num_workers());
+ }
+ if (newElems.get_launch_time_secs() == 0) {
+ newElems.set_launch_time_secs(stormBase.get_launch_time_secs());
+ }
+ if (StringUtils.isBlank(newElems.get_owner())) {
+ newElems.set_owner(stormBase.get_owner());
+ }
+ if (newElems.get_topology_action_options() == null) {
+ newElems.set_topology_action_options(stormBase.get_topology_action_options());
+ }
+ stateStorage.set_data(ClusterUtils.stormPath(stormId), Utils.serialize(newElems), acls);
+ }
+
+ @Override
+ public void removeStormBase(String stormId) {
+ stateStorage.delete_node(ClusterUtils.stormPath(stormId));
+ }
+
+ @Override
+ public void setAssignment(String stormId, Assignment info) {
+ stateStorage.set_data(ClusterUtils.assignmentPath(stormId), Utils.serialize(info), acls);
+ }
+
+ @Override
+ public void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer versionInfo) {
+ String path = ClusterUtils.blobstorePath(key) + ClusterUtils.ZK_SEPERATOR + nimbusInfo.toHostPortString() + "-" + versionInfo;
+ LOG.info("set-path: {}", path);
+ stateStorage.mkdirs(ClusterUtils.blobstorePath(key), acls);
+ stateStorage.delete_node_blobstore(ClusterUtils.blobstorePath(key), nimbusInfo.toHostPortString());
+ stateStorage.set_ephemeral_node(path, null, acls);
+ }
+
+ @Override
+ public List<String> activeKeys() {
+ return stateStorage.get_children(ClusterUtils.BLOBSTORE_SUBTREE, false);
+ }
+
+ // blobstore state
+ @Override
+ public List<String> blobstore(IFn callback) {
+ if (callback != null) {
+ blobstoreCallback.set(callback);
+ }
+ stateStorage.sync_path(ClusterUtils.BLOBSTORE_SUBTREE);
+ return stateStorage.get_children(ClusterUtils.BLOBSTORE_SUBTREE, callback != null);
+
+ }
+
+ @Override
+ public void removeStorm(String stormId) {
+ stateStorage.delete_node(ClusterUtils.assignmentPath(stormId));
+ stateStorage.delete_node(ClusterUtils.credentialsPath(stormId));
+ stateStorage.delete_node(ClusterUtils.logConfigPath(stormId));
+ stateStorage.delete_node(ClusterUtils.profilerConfigPath(stormId));
+ removeStormBase(stormId);
+ }
+
+ @Override
+ public void removeBlobstoreKey(String blobKey) {
+ LOG.debug("remove key {}", blobKey);
+ stateStorage.delete_node(ClusterUtils.blobstorePath(blobKey));
+ }
+
+ @Override
+ public void removeKeyVersion(String blobKey) {
+ stateStorage.delete_node(ClusterUtils.blobstoreMaxKeySequenceNumberPath(blobKey));
+ }
+
+ @Override
- public void reportError(String stormId, String componentId, String node, Long port, String error) {
++ public void reportError(String stormId, String componentId, String node, Long port, Throwable error) {
+
+ String path = ClusterUtils.errorPath(stormId, componentId);
+ String lastErrorPath = ClusterUtils.lastErrorPath(stormId, componentId);
- ErrorInfo errorInfo = new ErrorInfo(error, Time.currentTimeSecs());
++ ErrorInfo errorInfo = new ErrorInfo(ClusterUtils.StringifyError(error), Time.currentTimeSecs());
+ errorInfo.set_host(node);
+ errorInfo.set_port(port.intValue());
+ byte[] serData = Utils.serialize(errorInfo);
+ stateStorage.mkdirs(path, acls);
+ stateStorage.create_sequential(path + ClusterUtils.ZK_SEPERATOR + "e", serData, acls);
+ stateStorage.set_data(lastErrorPath, serData, acls);
+ List<String> childrens = stateStorage.get_children(path, false);
+
+ Collections.sort(childrens, new Comparator<String>() {
+ public int compare(String arg0, String arg1) {
+ return Long.compare(Long.parseLong(arg0.substring(1)), Long.parseLong(arg1.substring(1)));
+ }
+ });
+
+ while (childrens.size() > 10) {
+ stateStorage.delete_node(path + ClusterUtils.ZK_SEPERATOR + childrens.remove(0));
+ }
+ }
+
+ @Override
+ public List<ErrorInfo> errors(String stormId, String componentId) {
+ List<ErrorInfo> errorInfos = new ArrayList<>();
+ try {
+ String path = ClusterUtils.errorPath(stormId, componentId);
+ if (stateStorage.node_exists(path, false)) {
+ List<String> childrens = stateStorage.get_children(path, false);
+ for (String child : childrens) {
+ String childPath = path + ClusterUtils.ZK_SEPERATOR + child;
+ ErrorInfo errorInfo = ClusterUtils.maybeDeserialize(stateStorage.get_data(childPath, false), ErrorInfo.class);
+ if (errorInfo != null)
+ errorInfos.add(errorInfo);
+ }
+ }
+ Collections.sort(errorInfos, new Comparator<ErrorInfo>() {
+ public int compare(ErrorInfo arg0, ErrorInfo arg1) {
+ return Integer.compare(arg1.get_error_time_secs(), arg0.get_error_time_secs());
+ }
+ });
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+
+ return errorInfos;
+ }
+
+ @Override
+ public ErrorInfo lastError(String stormId, String componentId) {
+
+ String path = ClusterUtils.lastErrorPath(stormId, componentId);
+ if (stateStorage.node_exists(path, false)) {
+ ErrorInfo errorInfo = ClusterUtils.maybeDeserialize(stateStorage.get_data(path, false), ErrorInfo.class);
+ return errorInfo;
+ }
+
+ return null;
+ }
+
+ @Override
+ public void setCredentials(String stormId, Credentials creds, Map topoConf) throws NoSuchAlgorithmException {
+ List<ACL> aclList = ClusterUtils.mkTopoOnlyAcls(topoConf);
+ String path = ClusterUtils.credentialsPath(stormId);
+ stateStorage.set_data(path, Utils.serialize(creds), aclList);
+
+ }
+
+ @Override
+ public Credentials credentials(String stormId, IFn callback) {
+ if (callback != null) {
+ credentialsCallback.put(stormId, callback);
+ }
+ String path = ClusterUtils.credentialsPath(stormId);
+ return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, callback != null), Credentials.class);
+
+ }
+
+ @Override
+ public void disconnect() {
+ stateStorage.unregister(stateId);
+ if (solo)
+ stateStorage.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
index f3b9253,0000000..956c20e
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
@@@ -1,36 -1,0 +1,36 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import clojure.lang.APersistentMap;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.data.ACL;
+
+import java.util.List;
+
- public class ZKStateStorageFactory implements StateStorageFactory{
++public class ZKStateStorageFactory implements StateStorageFactory {
+
+ @Override
+ public IStateStorage mkStore(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context) {
+ try {
+ return new ZKStateStorage(config, auth_conf, acls, context);
- }catch (Exception e){
++ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
index af0e8f3,34f3665..20d6deb
--- a/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
+++ b/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
@@@ -60,6 -60,6 +60,10 @@@ public class PacemakerClient implement
private StormBoundedExponentialBackoffRetry backoff = new StormBoundedExponentialBackoffRetry(100, 5000, 20);
private int retryTimes = 0;
++ //the constructor is invoked by pacemaker-state-factory-test
++ public PacemakerClient() {
++ bootstrap = new ClientBootstrap();
++ }
public PacemakerClient(Map config) {
String host = (String)config.get(Config.PACEMAKER_HOST);
@@@ -157,6 -157,7 +161,7 @@@
public String secretKey() {
return secret;
}
-
++ public HBMessage checkCaptured() {return null;}
public HBMessage send(HBMessage m) {
waitUntilReady();
LOG.debug("Sending message: {}", m.toString());
http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/test/clj/org/apache/storm/cluster_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/cluster_test.clj
index 39adb9e,b146cb0..22c1f80
--- a/storm-core/test/clj/org/apache/storm/cluster_test.clj
+++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj
@@@ -22,11 -22,11 +22,11 @@@
(:import [org.mockito Mockito])
(:import [org.mockito.exceptions.base MockitoAssertionError])
(:import [org.apache.curator.framework CuratorFramework CuratorFrameworkFactory CuratorFrameworkFactory$Builder])
- (:import [org.apache.storm.utils Utils TestUtils ZookeeperAuthInfo ConfigUtils])
+ (:import [org.apache.storm.utils Time Utils ZookeeperAuthInfo ConfigUtils])
- (:import [org.apache.storm.cluster ClusterState])
+ (:import [org.apache.storm.cluster IStateStorage ZKStateStorage ClusterStateContext StormClusterStateImpl ClusterUtils])
(:import [org.apache.storm.zookeeper Zookeeper])
- (:import [org.apache.storm.testing.staticmocking MockedZookeeper])
- (:require [org.apache.storm [zookeeper :as zk]])
+ (:import [org.apache.storm.callback ZKStateChangedCallback])
+ (:import [org.apache.storm.testing.staticmocking MockedZookeeper MockedCluster])
(:require [conjure.core])
(:use [conjure core])
(:use [clojure test])
@@@ -39,14 -39,18 +39,18 @@@
(defn mk-state
([zk-port] (let [conf (mk-config zk-port)]
- (mk-distributed-cluster-state conf :auth-conf conf)))
+ (ClusterUtils/mkStateStorage conf conf nil (ClusterStateContext.))))
([zk-port cb]
- (let [ret (mk-state zk-port)]
- (.register ret cb)
- ret )))
+ (let [ret (mk-state zk-port)]
+ (.register ret cb)
+ ret)))
-(defn mk-storm-state [zk-port] (mk-storm-cluster-state (mk-config zk-port)))
+(defn mk-storm-state [zk-port] (ClusterUtils/mkStormClusterState (mk-config zk-port) nil (ClusterStateContext.)))
+ (defn barr
+ [& vals]
+ (byte-array (map byte vals)))
+
(deftest test-basics
(with-inprocess-zookeeper zk-port
(let [state (mk-state zk-port)]
@@@ -242,27 -244,26 +246,32 @@@
(is (.contains (:error error) target))
)))
++(defn- stringify-error [error]
++ (let [result (java.io.StringWriter.)
++ printer (java.io.PrintWriter. result)]
++ (.printStackTrace error printer)
++ (.toString result)))
(deftest test-storm-cluster-state-errors
(with-inprocess-zookeeper zk-port
(with-simulated-time
(let [state (mk-storm-state zk-port)]
- (.reportError state "a" "1" (local-hostname) 6700 (stringify-error (RuntimeException.)))
- (.report-error state "a" "1" (Utils/localHostname) 6700 (RuntimeException.))
++ (.reportError state "a" "1" (Utils/localHostname) 6700 (RuntimeException.))
(validate-errors! state "a" "1" ["RuntimeException"])
(advance-time-secs! 1)
- (.reportError state "a" "1" (local-hostname) 6700 (stringify-error (IllegalArgumentException.)))
- (.report-error state "a" "1" (Utils/localHostname) 6700 (IllegalArgumentException.))
++ (.reportError state "a" "1" (Utils/localHostname) 6700 (IllegalArgumentException.))
(validate-errors! state "a" "1" ["IllegalArgumentException" "RuntimeException"])
(doseq [i (range 10)]
- (.reportError state "a" "2" (local-hostname) 6700 (stringify-error (RuntimeException.)))
- (.report-error state "a" "2" (Utils/localHostname) 6700 (RuntimeException.))
++ (.reportError state "a" "2" (Utils/localHostname) 6700 (RuntimeException.))
(advance-time-secs! 2))
(validate-errors! state "a" "2" (repeat 10 "RuntimeException"))
(doseq [i (range 5)]
- (.reportError state "a" "2" (local-hostname) 6700 (stringify-error (IllegalArgumentException.)))
- (.report-error state "a" "2" (Utils/localHostname) 6700 (IllegalArgumentException.))
++ (.reportError state "a" "2" (Utils/localHostname) 6700 (IllegalArgumentException.))
(advance-time-secs! 2))
(validate-errors! state "a" "2" (concat (repeat 5 "IllegalArgumentException")
- (repeat 5 "RuntimeException")
- ))
+ (repeat 5 "RuntimeException")
+ ))
+
(.disconnect state)
))))
@@@ -300,12 -301,12 +309,12 @@@
(. (Mockito/when (.connectString builder (Mockito/anyString))) (thenReturn builder))
(. (Mockito/when (.connectionTimeoutMs builder (Mockito/anyInt))) (thenReturn builder))
(. (Mockito/when (.sessionTimeoutMs builder (Mockito/anyInt))) (thenReturn builder))
- (TestUtils/testSetupBuilder builder (str zk-port "/") conf (ZookeeperAuthInfo. conf))
+ (Utils/testSetupBuilder builder (str zk-port "/") conf (ZookeeperAuthInfo. conf))
(is (nil?
- (try
- (. (Mockito/verify builder) (authorization "digest" (.getBytes (conf STORM-ZOOKEEPER-AUTH-PAYLOAD))))
- (catch MockitoAssertionError e
- e)))))))
+ (try
+ (. (Mockito/verify builder) (authorization "digest" (.getBytes (conf STORM-ZOOKEEPER-AUTH-PAYLOAD))))
+ (catch MockitoAssertionError e
+ e)))))))
(deftest test-storm-state-callbacks
;; TODO finish
@@@ -313,17 -314,15 +322,17 @@@
(deftest test-cluster-state-default-acls
(testing "The default ACLs are empty."
- (let [zk-mock (Mockito/mock Zookeeper)]
+ (let [zk-mock (Mockito/mock Zookeeper)
+ curator-frameworke (reify CuratorFramework (^void close [this] nil))]
;; No need for when clauses because we just want to return nil
(with-open [_ (MockedZookeeper. zk-mock)]
- (stubbing [zk/mk-client (reify CuratorFramework (^void close [this] nil))]
- (mk-distributed-cluster-state {})
- (.mkdirsImpl (Mockito/verify zk-mock (Mockito/times 1)) (Mockito/any) (Mockito/anyString) (Mockito/eq nil)))))
- (stubbing [mk-distributed-cluster-state (reify ClusterState
- (register [this callback] nil)
- (mkdirs [this path acls] nil))]
- (mk-storm-cluster-state {})
- (verify-call-times-for mk-distributed-cluster-state 1)
- (verify-first-call-args-for-indices mk-distributed-cluster-state [4] nil))))
+ (. (Mockito/when (.mkClientImpl zk-mock (Mockito/anyMap) (Mockito/anyList) (Mockito/any) (Mockito/anyString) (Mockito/any) (Mockito/anyMap))) (thenReturn curator-frameworke))
+ (ClusterUtils/mkStateStorage {} nil nil (ClusterStateContext.))
+ (.mkdirsImpl (Mockito/verify zk-mock (Mockito/times 1)) (Mockito/any) (Mockito/anyString) (Mockito/eq nil))))
+ (let [distributed-state-storage (reify IStateStorage
+ (register [this callback] nil)
+ (mkdirs [this path acls] nil))
+ cluster-utils (Mockito/mock ClusterUtils)]
+ (with-open [mocked-cluster (MockedCluster. cluster-utils)]
- (. (Mockito/when (mkStateStorageImpl cluster-utils (Mockito/any) (Mockito/any) (Mockito/eq nil) (Mockito/any))) (thenReturn distributed-state-storage))
++ (. (Mockito/when (.mkStateStorageImpl cluster-utils (Mockito/any) (Mockito/any) (Mockito/eq nil) (Mockito/any))) (thenReturn distributed-state-storage))
+ (ClusterUtils/mkStormClusterState {} nil (ClusterStateContext.))))))
http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 772a232,70cb885..09c4371
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@@ -23,27 -23,33 +23,36 @@@
[org.apache.storm.nimbus InMemoryTopologyActionNotifier])
(:import [org.apache.storm.testing.staticmocking MockedZookeeper])
(:import [org.apache.storm.scheduler INimbus])
+ (:import [org.mockito Mockito])
+ (:import [org.mockito.exceptions.base MockitoAssertionError])
(:import [org.apache.storm.nimbus ILeaderElector NimbusInfo])
- (:import [org.apache.storm.testing.staticmocking MockedConfigUtils MockedCluster])
++ (:import [org.apache.storm.testing.staticmocking MockedCluster])
(:import [org.apache.storm.generated Credentials NotAliveException SubmitOptions
TopologyInitialStatus TopologyStatus AlreadyAliveException KillOptions RebalanceOptions
InvalidTopologyException AuthorizationException
LogConfig LogLevel LogLevelAction])
(:import [java.util HashMap])
(:import [java.io File])
- (:import [org.apache.storm.utils Time Utils ConfigUtils])
+ (:import [org.apache.storm.utils Time Utils Utils$UptimeComputer ConfigUtils IPredicate]
+ [org.apache.storm.utils.staticmocking ConfigUtilsInstaller UtilsInstaller])
(:import [org.apache.storm.zookeeper Zookeeper])
- (:import [org.apache.commons.io FileUtils])
+ (:import [org.apache.commons.io FileUtils]
+ [org.json.simple JSONValue])
- (:use [org.apache.storm testing MockAutoCred util config log timer zookeeper])
+ (:import [org.apache.storm.cluster StormClusterStateImpl ClusterStateContext ClusterUtils])
+ (:use [org.apache.storm testing MockAutoCred util config log timer converter])
(:use [org.apache.storm.daemon common])
(:require [conjure.core])
(:require [org.apache.storm
- [thrift :as thrift]
- [cluster :as cluster]])
+ [thrift :as thrift]])
(:use [conjure core]))
+ (defn- from-json
+ [^String str]
+ (if str
+ (clojurify-structure
+ (JSONValue/parse str))
+ nil))
+
(defn storm-component->task-info [cluster storm-name]
(let [storm-id (get-storm-id (:storm-cluster-state cluster) storm-name)
nimbus (:nimbus cluster)]
@@@ -72,8 -80,8 +83,8 @@@
(defn storm-num-workers [state storm-name]
(let [storm-id (get-storm-id state storm-name)
- assignment (.assignment-info state storm-id nil)]
+ assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
- (count (reverse-map (:executor->node+port assignment)))
+ (count (clojurify-structure (Utils/reverseMap (:executor->node+port assignment))))
))
(defn topology-nodes [state storm-name]
@@@ -95,9 -103,11 +106,11 @@@
set
)))
+ ;TODO: when translating this function, don't call map-val, but instead use an inline for loop.
+ ; map-val is a temporary kluge for clojure.
(defn topology-node-distribution [state storm-name]
(let [storm-id (get-storm-id state storm-name)
- assignment (.assignment-info state storm-id nil)]
+ assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
(->> assignment
:executor->node+port
vals
@@@ -124,19 -134,18 +137,18 @@@
(defn do-executor-heartbeat [cluster storm-id executor]
(let [state (:storm-cluster-state cluster)
- executor->node+port (:executor->node+port (.assignment-info state storm-id nil))
+ executor->node+port (:executor->node+port (clojurify-assignment (.assignmentInfo state storm-id nil)))
[node port] (get executor->node+port executor)
- curr-beat (.get-worker-heartbeat state storm-id node port)
+ curr-beat (clojurify-zk-worker-hb (.getWorkerHeartbeat state storm-id node port))
stats (:executor-stats curr-beat)]
- (.worker-heartbeat! state storm-id node port
- {:storm-id storm-id :time-secs (Time/currentTimeSecs) :uptime 10 :executor-stats (merge stats {executor (stats/render-stats! (stats/mk-bolt-stats 20))})}
+ (.workerHeartbeat state storm-id node port
- (thriftify-zk-worker-hb {:storm-id storm-id :time-secs (current-time-secs) :uptime 10 :executor-stats (merge stats {executor (stats/render-stats! (stats/mk-bolt-stats 20))})})
++ (thriftify-zk-worker-hb {:storm-id storm-id :time-secs (Time/currentTimeSecs) :uptime 10 :executor-stats (merge stats {executor (stats/render-stats! (stats/mk-bolt-stats 20))})})
)))
(defn slot-assignments [cluster storm-id]
(let [state (:storm-cluster-state cluster)
- assignment (.assignment-info state storm-id nil)]
+ assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
- (reverse-map (:executor->node+port assignment))
- ))
+ (clojurify-structure (Utils/reverseMap (:executor->node+port assignment)))))
(defn task-ids [cluster storm-id]
(let [nimbus (:nimbus cluster)]
@@@ -146,8 -155,10 +158,10 @@@
(defn topology-executors [cluster storm-id]
(let [state (:storm-cluster-state cluster)
- assignment (clojurify-assignment (.assignmentInfo state storm-id nil))]
- (keys (:executor->node+port assignment))
- assignment (.assignment-info state storm-id nil)
- ret-keys (keys (:executor->node+port assignment))
++ assignment (clojurify-assignment (.assignmentInfo state storm-id nil))
++ ret-keys (keys (:executor->node+port assignment))
+ _ (log-message "ret-keys: " (pr-str ret-keys)) ]
+ ret-keys
))
(defn check-distribution [items distribution]
@@@ -1350,23 -1399,27 +1402,27 @@@
NIMBUS-THRIFT-PORT 6666})
expected-acls nimbus/NIMBUS-ZK-ACLS
fake-inimbus (reify INimbus (getForcedScheduler [this] nil))
+ fake-cu (proxy [ConfigUtils] []
- (nimbusTopoHistoryStateImpl [conf] nil))
++ (nimbusTopoHistoryStateImpl [conf] nil))
+ fake-utils (proxy [Utils] []
+ (newInstanceImpl [_])
+ (makeUptimeComputer [] (proxy [Utils$UptimeComputer] []
- (upTime [] 0))))]
++ (upTime [] 0))))
+ cluster-utils (Mockito/mock ClusterUtils)]
- (with-open [_ (proxy [MockedConfigUtils] []
- (nimbusTopoHistoryStateImpl [conf] nil))
+ (with-open [_ (ConfigUtilsInstaller. fake-cu)
+ _ (UtilsInstaller. fake-utils)
zk-le (MockedZookeeper. (proxy [Zookeeper] []
- (zkLeaderElectorImpl [conf] nil)))]
+ (zkLeaderElectorImpl [conf] nil)))
+ mocked-cluster (MockedCluster. cluster-utils)]
(stubbing [mk-authorization-handler nil
- cluster/mk-storm-cluster-state nil
- nimbus/file-cache-map nil
- nimbus/mk-blob-cache-map nil
- nimbus/mk-bloblist-cache-map nil
- mk-timer nil
- nimbus/mk-scheduler nil]
- (nimbus/nimbus-data auth-conf fake-inimbus)
- (verify-call-times-for cluster/mk-storm-cluster-state 1)
- (verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]
- expected-acls))))))
+ nimbus/file-cache-map nil
+ nimbus/mk-blob-cache-map nil
+ nimbus/mk-bloblist-cache-map nil
- uptime-computer nil
- new-instance nil
+ mk-timer nil
+ nimbus/mk-scheduler nil]
+ (nimbus/nimbus-data auth-conf fake-inimbus)
+ (.mkStormClusterStateImpl (Mockito/verify cluster-utils (Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any))
+ )))))
(deftest test-file-bogus-download
(with-local-cluster [cluster :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
@@@ -1397,9 -1450,9 +1453,9 @@@
STORM-CLUSTER-MODE "local"
STORM-ZOOKEEPER-PORT zk-port
STORM-LOCAL-DIR nimbus-dir}))
- (bind cluster-state (cluster/mk-storm-cluster-state conf))
+ (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
(bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
- (sleep-secs 1)
+ (Time/sleepSecs 1)
(bind topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
{}))
@@@ -1429,10 -1482,10 +1485,10 @@@
STORM-ZOOKEEPER-PORT zk-port
STORM-LOCAL-DIR nimbus-dir
NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN (.getName InMemoryTopologyActionNotifier)}))
- (bind cluster-state (cluster/mk-storm-cluster-state conf))
+ (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
(bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
(bind notifier (InMemoryTopologyActionNotifier.))
- (sleep-secs 1)
+ (Time/sleepSecs 1)
(bind topology (thrift/mk-topology
{"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
{}))
http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/test/clj/org/apache/storm/pacemaker_state_factory_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/pacemaker_state_factory_test.clj
index 1a7bd2c,0925237..1c45266
--- a/storm-core/test/clj/org/apache/storm/pacemaker_state_factory_test.clj
+++ b/storm-core/test/clj/org/apache/storm/pacemaker_state_factory_test.clj
@@@ -19,10 -20,8 +19,11 @@@
(:import [org.apache.storm.generated
HBExecutionException HBNodes HBRecords
HBServerMessageType HBMessage HBMessageData HBPulse]
- [org.apache.storm.cluster ClusterStateContext PaceMakerStateStorageFactory]
- [org.apache.storm.cluster ClusterStateContext]
- [org.mockito Mockito Matchers]))
++ [org.apache.storm.cluster ClusterStateContext PaceMakerStateStorageFactory PaceMakerStateStorage]
+ [org.mockito Mockito Matchers])
+(:import [org.mockito.exceptions.base MockitoAssertionError])
++(:import [org.apache.storm.pacemaker PacemakerClient])
+(:import [org.apache.storm.testing.staticmocking MockedPaceMakerStateStorageFactory]))
(defn- string-to-bytes [string]
(byte-array (map int string)))
@@@ -30,26 -29,24 +31,23 @@@
(defn- bytes-to-string [bytez]
(apply str (map char bytez)))
--(defprotocol send-capture
-- (send [this something])
-- (check-captured [this]))
--
(defn- make-send-capture [response]
(let [captured (atom nil)]
-- (reify send-capture
-- (send [this something] (reset! captured something) response)
-- (check-captured [this] @captured))))
-
-(defmacro with-mock-pacemaker-client-and-state [client state response & body]
- `(let [~client (make-send-capture ~response)]
- (stubbing [psf/makeZKState nil
- psf/makeClient ~client]
- (let [~state (psf/-mkState nil nil nil nil (ClusterStateContext.))]
++ (proxy [PacemakerClient] []
++ (send [m] (reset! captured m) response)
++ (checkCaptured [] @captured))))
+
+(defmacro with-mock-pacemaker-client-and-state [client state pacefactory mock response & body]
+ `(let [~client (make-send-capture ~response)
+ ~pacefactory (Mockito/mock PaceMakerStateStorageFactory)]
+
+ (with-open [~mock (MockedPaceMakerStateStorageFactory. ~pacefactory)]
+ (. (Mockito/when (.initZKstateImpl ~pacefactory (Mockito/any) (Mockito/any) (Mockito/anyList) (Mockito/any))) (thenReturn nil))
+ (. (Mockito/when (.initMakeClientImpl ~pacefactory (Mockito/any))) (thenReturn ~client))
- (let [~state (.mkStore ~pacefactory nil nil nil (ClusterStateContext.))]
++ (let [~state (PaceMakerStateStorage. (PaceMakerStateStorageFactory/initMakeClient nil)
++ (PaceMakerStateStorageFactory/initZKstate nil nil nil nil))]
~@body))))
-
(deftest pacemaker_state_set_worker_hb
(testing "set_worker_hb"
(with-mock-pacemaker-client-and-state
@@@ -57,7 -54,7 +55,7 @@@
(HBMessage. HBServerMessageType/SEND_PULSE_RESPONSE nil)
(.set_worker_hb state "/foo" (string-to-bytes "data") nil)
-- (let [sent (.check-captured client)
++ (let [sent (.checkCaptured client)
pulse (.get_pulse (.get_data sent))]
(is (= (.get_type sent) HBServerMessageType/SEND_PULSE))
(is (= (.get_id pulse) "/foo"))
@@@ -65,13 -62,13 +63,12 @@@
(testing "set_worker_hb"
(with-mock-pacemaker-client-and-state
- client state
+ client state pacefactory mock
(HBMessage. HBServerMessageType/SEND_PULSE nil)
- (is (thrown? RuntimeException
- (is (thrown? HBExecutionException
-- (.set_worker_hb state "/foo" (string-to-bytes "data") nil))))))
++ (is (thrown? RuntimeException
++ (.set_worker_hb state "/foo" (string-to-bytes "data") nil))))))
--
(deftest pacemaker_state_delete_worker_hb
(testing "delete_worker_hb"
@@@ -80,74 -77,74 +77,75 @@@
(HBMessage. HBServerMessageType/DELETE_PATH_RESPONSE nil)
(.delete_worker_hb state "/foo/bar")
-- (let [sent (.check-captured client)]
++ (let [sent (.checkCaptured client)]
(is (= (.get_type sent) HBServerMessageType/DELETE_PATH))
(is (= (.get_path (.get_data sent)) "/foo/bar")))))
-- (testing "delete_worker_hb"
-- (with-mock-pacemaker-client-and-state
- client state pacefactory mock
- client state
-- (HBMessage. HBServerMessageType/DELETE_PATH nil)
--
- (is (thrown? RuntimeException
- (is (thrown? HBExecutionException
-- (.delete_worker_hb state "/foo/bar"))))))
++ (testing "delete_worker_hb"
++ (with-mock-pacemaker-client-and-state
++ client state pacefactory mock
++ (HBMessage. HBServerMessageType/DELETE_PATH nil)
++
++ (is (thrown? RuntimeException
++ (.delete_worker_hb state "/foo/bar"))))))
(deftest pacemaker_state_get_worker_hb
(testing "get_worker_hb"
(with-mock-pacemaker-client-and-state
- client state
+ client state pacefactory mock
(HBMessage. HBServerMessageType/GET_PULSE_RESPONSE
-- (HBMessageData/pulse
-- (doto (HBPulse.)
-- (.set_id "/foo")
-- (.set_details (string-to-bytes "some data")))))
++ (HBMessageData/pulse
++ (doto (HBPulse.)
++ (.set_id "/foo")
++ (.set_details (string-to-bytes "some data")))))
(.get_worker_hb state "/foo" false)
-- (let [sent (.check-captured client)]
++ (let [sent (.checkCaptured client)]
(is (= (.get_type sent) HBServerMessageType/GET_PULSE))
(is (= (.get_path (.get_data sent)) "/foo")))))
(testing "get_worker_hb - fail (bad response)"
(with-mock-pacemaker-client-and-state
- client state
+ client state pacefactory mock
(HBMessage. HBServerMessageType/GET_PULSE nil)
--
- (is (thrown? HBExecutionException
- (.get_worker_hb state "/foo" false)))))
-
++
+ (is (thrown? RuntimeException
- (.get_worker_hb state "/foo" false)))))
-
++ (.get_worker_hb state "/foo" false)))))
++
(testing "get_worker_hb - fail (bad data)"
(with-mock-pacemaker-client-and-state
- client state
+ client state pacefactory mock
(HBMessage. HBServerMessageType/GET_PULSE_RESPONSE nil)
--
- (is (thrown? HBExecutionException
- (.get_worker_hb state "/foo" false))))))
++
+ (is (thrown? RuntimeException
- (.get_worker_hb state "/foo" false))))))
++ (.get_worker_hb state "/foo" false))))))
(deftest pacemaker_state_get_worker_hb_children
(testing "get_worker_hb_children"
(with-mock-pacemaker-client-and-state
- client state
+ client state pacefactory mock
(HBMessage. HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE
-- (HBMessageData/nodes
-- (HBNodes. [])))
++ (HBMessageData/nodes
++ (HBNodes. [])))
(.get_worker_hb_children state "/foo" false)
-- (let [sent (.check-captured client)]
++ (let [sent (.checkCaptured client)]
(is (= (.get_type sent) HBServerMessageType/GET_ALL_NODES_FOR_PATH))
(is (= (.get_path (.get_data sent)) "/foo")))))
(testing "get_worker_hb_children - fail (bad response)"
(with-mock-pacemaker-client-and-state
- client state
+ client state pacefactory mock
(HBMessage. HBServerMessageType/DELETE_PATH nil)
- (is (thrown? HBExecutionException
- (.get_worker_hb_children state "/foo" false)))))
+ (is (thrown? RuntimeException
- (.get_worker_hb_children state "/foo" false)))))
++ (.get_worker_hb_children state "/foo" false)))))
-- (testing "get_worker_hb_children - fail (bad data)"
++ (testing "get_worker_hb_children - fail (bad data)"
(with-mock-pacemaker-client-and-state
- client state
+ client state pacefactory mock
(HBMessage. HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE nil)
- ;need been update due to HBExecutionException
-
- (is (thrown? HBExecutionException
- (.get_worker_hb_children state "/foo" false))))))
++
+ (is (thrown? RuntimeException
- (.get_worker_hb_children state "/foo" false))))))
++ (.get_worker_hb_children state "/foo" false))))))
++
http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/test/clj/org/apache/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/supervisor_test.clj
index b89b7bb,9c31ddf..3ebdbcd
--- a/storm-core/test/clj/org/apache/storm/supervisor_test.clj
+++ b/storm-core/test/clj/org/apache/storm/supervisor_test.clj
@@@ -21,20 -21,20 +21,23 @@@
(:require [clojure [string :as string] [set :as set]])
(:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestPlannerSpout])
(:import [org.apache.storm.scheduler ISupervisor])
- (:import [org.apache.storm.utils ConfigUtils])
+ (:import [org.apache.storm.utils Time Utils$UptimeComputer ConfigUtils])
(:import [org.apache.storm.generated RebalanceOptions])
- (:import [org.apache.storm.testing.staticmocking MockedConfigUtils MockedCluster])
- (:import [org.mockito Matchers Mockito])
++ (:import [org.apache.storm.testing.staticmocking MockedCluster])
(:import [java.util UUID])
- (:import [org.mockito Mockito])
++ (:import [org.mockito Mockito Matchers])
+ (:import [org.mockito.exceptions.base MockitoAssertionError])
(:import [java.io File])
(:import [java.nio.file Files])
- (:import [org.apache.storm.cluster StormClusterStateImpl ClusterStateContext ClusterUtils])
- (:import [org.apache.storm.utils Utils IPredicate]
++ (:import [org.apache.storm.utils Utils IPredicate])
++ (:import [org.apache.storm.cluster StormClusterStateImpl ClusterStateContext ClusterUtils]
+ [org.apache.storm.utils.staticmocking ConfigUtilsInstaller
+ UtilsInstaller])
(:import [java.nio.file.attribute FileAttribute])
- (:use [org.apache.storm config testing util timer log])
+ (:use [org.apache.storm config testing util timer log converter])
(:use [org.apache.storm.daemon common])
(:require [org.apache.storm.daemon [worker :as worker] [supervisor :as supervisor]]
- [org.apache.storm [thrift :as thrift] [cluster :as cluster]])
+ [org.apache.storm [thrift :as thrift]])
(:use [conjure core])
(:require [clojure.java.io :as io]))
@@@ -43,9 -43,10 +46,10 @@@
[cluster supervisor-id port]
(let [state (:storm-cluster-state cluster)
slot-assigns (for [storm-id (.assignments state nil)]
- (let [executors (-> (.assignment-info state storm-id nil)
+ (let [executors (-> (clojurify-assignment (.assignmentInfo state storm-id nil))
:executor->node+port
- reverse-map
+ (Utils/reverseMap)
+ clojurify-structure
(get [supervisor-id port] ))]
(when executors [storm-id executors])
))
@@@ -565,198 -632,203 +635,201 @@@
fake-isupervisor (reify ISupervisor
(getSupervisorId [this] nil)
(getAssignmentId [this] nil))
+ fake-cu (proxy [ConfigUtils] []
+ (supervisorStateImpl [conf] nil)
+ (supervisorLocalDirImpl [conf] nil))
+ fake-utils (proxy [Utils] []
+ (localHostnameImpl [] nil)
+ (makeUptimeComputer [] (proxy [Utils$UptimeComputer] []
- (upTime [] 0))))]
++ (upTime [] 0))))
+ cluster-utils (Mockito/mock ClusterUtils)]
- (with-open [_ (proxy [MockedConfigUtils] []
- (supervisorStateImpl [conf] nil)
- (supervisorLocalDirImpl [conf] nil))
+ (with-open [_ (ConfigUtilsInstaller. fake-cu)
- _ (UtilsInstaller. fake-utils)]
- (stubbing [cluster/mk-storm-cluster-state nil
- mk-timer nil]
++ _ (UtilsInstaller. fake-utils)
+ mocked-cluster (MockedCluster. cluster-utils)]
- (stubbing [uptime-computer nil
- ; cluster/mk-storm-cluster-state nil
- local-hostname nil
- mk-timer nil]
(supervisor/supervisor-data auth-conf nil fake-isupervisor)
- (.mkStormClusterStateImpl (Mockito/verify cluster-utils (Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any))
- ; (verify-call-times-for cluster/mk-storm-cluster-state 1)
- ; (verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]
- ; expected-acls)
- )))))
- (verify-call-times-for cluster/mk-storm-cluster-state 1)
- (verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]
- expected-acls)))))
++ (.mkStormClusterStateImpl (Mockito/verify cluster-utils (Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any))))))
- (deftest test-write-log-metadata
- (testing "supervisor writes correct data to logs metadata file"
- (let [exp-owner "alice"
- exp-worker-id "42"
- exp-storm-id "0123456789"
- exp-port 4242
- exp-logs-users ["bob" "charlie" "daryl"]
- exp-logs-groups ["read-only-group" "special-group"]
- storm-conf {TOPOLOGY-SUBMITTER-USER "alice"
- TOPOLOGY-USERS ["charlie" "bob"]
- TOPOLOGY-GROUPS ["special-group"]
- LOGS-GROUPS ["read-only-group"]
- LOGS-USERS ["daryl"]}
- exp-data {TOPOLOGY-SUBMITTER-USER exp-owner
- "worker-id" exp-worker-id
- LOGS-USERS exp-logs-users
- LOGS-GROUPS exp-logs-groups}
- conf {}]
- (mocking [supervisor/write-log-metadata-to-yaml-file!]
- (supervisor/write-log-metadata! storm-conf exp-owner exp-worker-id
- exp-storm-id exp-port conf)
- (verify-called-once-with-args supervisor/write-log-metadata-to-yaml-file!
- exp-storm-id exp-port exp-data conf)))))
+ (deftest test-write-log-metadata
+ (testing "supervisor writes correct data to logs metadata file"
+ (let [exp-owner "alice"
+ exp-worker-id "42"
+ exp-storm-id "0123456789"
+ exp-port 4242
+ exp-logs-users ["bob" "charlie" "daryl"]
+ exp-logs-groups ["read-only-group" "special-group"]
+ storm-conf {TOPOLOGY-SUBMITTER-USER "alice"
+ TOPOLOGY-USERS ["charlie" "bob"]
+ TOPOLOGY-GROUPS ["special-group"]
+ LOGS-GROUPS ["read-only-group"]
+ LOGS-USERS ["daryl"]}
+ exp-data {TOPOLOGY-SUBMITTER-USER exp-owner
+ "worker-id" exp-worker-id
+ LOGS-USERS exp-logs-users
+ LOGS-GROUPS exp-logs-groups}
+ conf {}]
+ (mocking [supervisor/write-log-metadata-to-yaml-file!]
+ (supervisor/write-log-metadata! storm-conf exp-owner exp-worker-id
+ exp-storm-id exp-port conf)
+ (verify-called-once-with-args supervisor/write-log-metadata-to-yaml-file!
+ exp-storm-id exp-port exp-data conf)))))
- (deftest test-worker-launcher-requires-user
- (testing "worker-launcher throws on blank user"
- (mocking [launch-process]
- (is (thrown-cause-with-msg? java.lang.IllegalArgumentException
- #"(?i).*user cannot be blank.*"
- (supervisor/worker-launcher {} nil ""))))))
+ (deftest test-worker-launcher-requires-user
+ (testing "worker-launcher throws on blank user"
+ (let [utils-proxy (proxy [Utils] []
+ (launchProcessImpl [& _] nil))]
+ (with-open [_ (UtilsInstaller. utils-proxy)]
+ (is (try
+ (supervisor/worker-launcher {} nil "")
+ false
+ (catch Throwable t
+ (and (re-matches #"(?i).*user cannot be blank.*" (.getMessage t))
+ (Utils/exceptionCauseIsInstanceOf java.lang.IllegalArgumentException t)))))))))
- (defn found? [sub-str input-str]
- (if (string? input-str)
- (contrib-str/substring? sub-str (str input-str))
- (boolean (some #(contrib-str/substring? sub-str %) input-str))))
+ (defn found? [sub-str input-str]
+ (if (string? input-str)
+ (contrib-str/substring? sub-str (str input-str))
+ (boolean (some #(contrib-str/substring? sub-str %) input-str))))
- (defn not-found? [sub-str input-str]
+ (defn not-found? [sub-str input-str]
(complement (found? sub-str input-str)))
- (deftest test-substitute-childopts-happy-path-string
- (testing "worker-launcher replaces ids in childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m"
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
+ (deftest test-substitute-childopts-happy-path-string
+ (testing "worker-launcher replaces ids in childopts"
+ (let [worker-id "w-01"
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m"
+ expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
- (deftest test-substitute-childopts-happy-path-list
- (testing "worker-launcher replaces ids in childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m")
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
+ (deftest test-substitute-childopts-happy-path-list
+ (testing "worker-launcher replaces ids in childopts"
+ (let [worker-id "w-01"
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m")
+ expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
- (deftest test-substitute-childopts-happy-path-list-arraylist
- (testing "worker-launcher replaces ids in childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts '["-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m"]
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
+ (deftest test-substitute-childopts-happy-path-list-arraylist
+ (testing "worker-launcher replaces ids in childopts"
+ (let [worker-id "w-01"
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts '["-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log" "-Xms256m" "-Xmx%HEAP-MEM%m"]
+ expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01-w-01-9999.log" "-Xms256m" "-Xmx512m")
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
- (deftest test-substitute-childopts-topology-id-alone
- (testing "worker-launcher replaces ids in childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%TOPOLOGY-ID%.log"
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-s-01.log")
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
+ (deftest test-substitute-childopts-topology-id-alone
+ (testing "worker-launcher replaces ids in childopts"
+ (let [worker-id "w-01"
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%TOPOLOGY-ID%.log"
+ expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-s-01.log")
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
- (deftest test-substitute-childopts-no-keys
- (testing "worker-launcher has no ids to replace in childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log")
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
+ (deftest test-substitute-childopts-no-keys
+ (testing "worker-launcher has no ids to replace in childopts"
+ (let [worker-id "w-01"
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"
+ expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log")
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
- (deftest test-substitute-childopts-nil-childopts
- (testing "worker-launcher has nil childopts"
- (let [worker-id "w-01"
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts nil
- expected-childopts nil
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
+ (deftest test-substitute-childopts-nil-childopts
+ (testing "worker-launcher has nil childopts"
+ (let [worker-id "w-01"
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts nil
+ expected-childopts nil
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
- (deftest test-substitute-childopts-nil-ids
- (testing "worker-launcher has nil ids"
- (let [worker-id nil
- topology-id "s-01"
- port 9999
- mem-onheap 512
- childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log"
- expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01--9999.log")
- childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
- (is (= expected-childopts childopts-with-ids)))))
+ (deftest test-substitute-childopts-nil-ids
+ (testing "worker-launcher has nil ids"
+ (let [worker-id nil
+ topology-id "s-01"
+ port 9999
+ mem-onheap 512
+ childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log"
+ expected-childopts '("-Xloggc:/home/y/lib/storm/current/logs/gc.worker-9999-s-01--9999.log")
+ childopts-with-ids (supervisor/substitute-childopts childopts worker-id topology-id port mem-onheap)]
+ (is (= expected-childopts childopts-with-ids)))))
- (deftest test-retry-read-assignments
- (with-simulated-time-local-cluster [cluster
- :supervisors 0
- :ports-per-supervisor 2
- :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
- NIMBUS-MONITOR-FREQ-SECS 10
- TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
- TOPOLOGY-ACKER-EXECUTORS 0}]
- (letlocals
- (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
- (bind topology1 (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
- {}))
- (bind topology2 (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
- {}))
- (bind state (:storm-cluster-state cluster))
- (bind changed (capture-changed-workers
- (submit-mocked-assignment
- (:nimbus cluster)
- (:storm-cluster-state cluster)
- "topology1"
- {TOPOLOGY-WORKERS 2}
- topology1
- {1 "1"
- 2 "1"}
- {[1 1] ["sup1" 1]
- [2 2] ["sup1" 2]}
- {["sup1" 1] [0.0 0.0 0.0]
- ["sup1" 2] [0.0 0.0 0.0]
- })
- (submit-mocked-assignment
- (:nimbus cluster)
- (:storm-cluster-state cluster)
- "topology2"
- {TOPOLOGY-WORKERS 2}
- topology2
- {1 "1"
- 2 "1"}
- {[1 1] ["sup1" 1]
- [2 2] ["sup1" 2]}
- {["sup1" 1] [0.0 0.0 0.0]
- ["sup1" 2] [0.0 0.0 0.0]
- })
- ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
- (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
- ))
- (is (empty? (:launched changed)))
- (bind options (RebalanceOptions.))
- (.set_wait_secs options 0)
- (bind changed (capture-changed-workers
- (.rebalance (:nimbus cluster) "topology2" options)
- (advance-cluster-time cluster 10)
- (heartbeat-workers cluster "sup1" [1 2 3 4])
- (advance-cluster-time cluster 10)
- ))
- (validate-launched-once (:launched changed)
- {"sup1" [1 2]}
- (get-storm-id (:storm-cluster-state cluster) "topology1"))
- (validate-launched-once (:launched changed)
- {"sup1" [3 4]}
- (get-storm-id (:storm-cluster-state cluster) "topology2"))
- )))
+ (deftest test-retry-read-assignments
+ (with-simulated-time-local-cluster [cluster
+ :supervisors 0
+ :ports-per-supervisor 2
+ :daemon-conf {ConfigUtils/NIMBUS_DO_NOT_REASSIGN true
+ NIMBUS-MONITOR-FREQ-SECS 10
+ TOPOLOGY-MESSAGE-TIMEOUT-SECS 30
+ TOPOLOGY-ACKER-EXECUTORS 0}]
+ (letlocals
+ (bind sup1 (add-supervisor cluster :id "sup1" :ports [1 2 3 4]))
+ (bind topology1 (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
+ {}))
+ (bind topology2 (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 2)}
+ {}))
+ (bind state (:storm-cluster-state cluster))
+ (bind changed (capture-changed-workers
+ (submit-mocked-assignment
+ (:nimbus cluster)
+ (:storm-cluster-state cluster)
+ "topology1"
+ {TOPOLOGY-WORKERS 2}
+ topology1
+ {1 "1"
+ 2 "1"}
+ {[1 1] ["sup1" 1]
+ [2 2] ["sup1" 2]}
+ {["sup1" 1] [0.0 0.0 0.0]
+ ["sup1" 2] [0.0 0.0 0.0]
+ })
+ (submit-mocked-assignment
+ (:nimbus cluster)
+ (:storm-cluster-state cluster)
+ "topology2"
+ {TOPOLOGY-WORKERS 2}
+ topology2
+ {1 "1"
+ 2 "1"}
+ {[1 1] ["sup1" 1]
+ [2 2] ["sup1" 2]}
+ {["sup1" 1] [0.0 0.0 0.0]
+ ["sup1" 2] [0.0 0.0 0.0]
+ })
+ ;; Instead of sleeping until topology is scheduled, rebalance topology so mk-assignments is called.
+ (.rebalance (:nimbus cluster) "topology1" (doto (RebalanceOptions.) (.set_wait_secs 0)))
+ ))
+ (is (empty? (:launched changed)))
+ (bind options (RebalanceOptions.))
+ (.set_wait_secs options 0)
+ (bind changed (capture-changed-workers
+ (.rebalance (:nimbus cluster) "topology2" options)
+ (advance-cluster-time cluster 10)
+ (heartbeat-workers cluster "sup1" [1 2 3 4])
+ (advance-cluster-time cluster 10)
+ ))
+ (validate-launched-once (:launched changed)
+ {"sup1" [1 2]}
+ (get-storm-id (:storm-cluster-state cluster) "topology1"))
+ (validate-launched-once (:launched changed)
+ {"sup1" [3 4]}
+ (get-storm-id (:storm-cluster-state cluster) "topology2"))
- ))))
++ )))