You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/06 17:33:33 UTC
[44/52] [partial] storm git commit: STORM-2441 Break down
'storm-core' to extract client (worker) artifacts
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/ClusterStateContext.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ClusterStateContext.java b/storm-client/src/jvm/org/apache/storm/cluster/ClusterStateContext.java
new file mode 100644
index 0000000..9ad6a92
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ClusterStateContext.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.cluster;
+
+/**
+ * This class is intended to provide runtime-context to StateStorageFactory
+ * implementors, giving information such as what daemon is creating it.
+ */
+public class ClusterStateContext {
+
+ private DaemonType daemonType;
+
+ public ClusterStateContext() {
+ daemonType = DaemonType.UNKNOWN;
+ }
+
+ public ClusterStateContext(DaemonType daemonType) {
+ this.daemonType = daemonType;
+ }
+
+ public DaemonType getDaemonType() {
+ return daemonType;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/ClusterStateListener.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ClusterStateListener.java b/storm-client/src/jvm/org/apache/storm/cluster/ClusterStateListener.java
new file mode 100644
index 0000000..30e467e
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ClusterStateListener.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+public interface ClusterStateListener {
+ void stateChanged(ConnectionState newState);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
new file mode 100644
index 0000000..a800b07
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.ClusterWorkerHeartbeat;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.ExecutorStats;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ClusterUtils {
+
+ public static final String ZK_SEPERATOR = "/";
+
+ public static final String ASSIGNMENTS_ROOT = "assignments";
+ public static final String CODE_ROOT = "code";
+ public static final String STORMS_ROOT = "storms";
+ public static final String SUPERVISORS_ROOT = "supervisors";
+ public static final String WORKERBEATS_ROOT = "workerbeats";
+ public static final String BACKPRESSURE_ROOT = "backpressure";
+ public static final String ERRORS_ROOT = "errors";
+ public static final String BLOBSTORE_ROOT = "blobstore";
+ public static final String BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_ROOT = "blobstoremaxkeysequencenumber";
+ public static final String NIMBUSES_ROOT = "nimbuses";
+ public static final String CREDENTIALS_ROOT = "credentials";
+ public static final String LOGCONFIG_ROOT = "logconfigs";
+ public static final String PROFILERCONFIG_ROOT = "profilerconfigs";
+
+ public static final String ASSIGNMENTS_SUBTREE = ZK_SEPERATOR + ASSIGNMENTS_ROOT;
+ public static final String STORMS_SUBTREE = ZK_SEPERATOR + STORMS_ROOT;
+ public static final String SUPERVISORS_SUBTREE = ZK_SEPERATOR + SUPERVISORS_ROOT;
+ public static final String WORKERBEATS_SUBTREE = ZK_SEPERATOR + WORKERBEATS_ROOT;
+ public static final String BACKPRESSURE_SUBTREE = ZK_SEPERATOR + BACKPRESSURE_ROOT;
+ public static final String ERRORS_SUBTREE = ZK_SEPERATOR + ERRORS_ROOT;
+ public static final String BLOBSTORE_SUBTREE = ZK_SEPERATOR + BLOBSTORE_ROOT;
+ public static final String BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE = ZK_SEPERATOR + BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_ROOT;
+ public static final String NIMBUSES_SUBTREE = ZK_SEPERATOR + NIMBUSES_ROOT;
+ public static final String CREDENTIALS_SUBTREE = ZK_SEPERATOR + CREDENTIALS_ROOT;
+ public static final String LOGCONFIG_SUBTREE = ZK_SEPERATOR + LOGCONFIG_ROOT;
+ public static final String PROFILERCONFIG_SUBTREE = ZK_SEPERATOR + PROFILERCONFIG_ROOT;
+
+ // A singleton instance allows us to mock delegated static methods in our
+ // tests by subclassing.
+ private static final ClusterUtils INSTANCE = new ClusterUtils();
+ private static ClusterUtils _instance = INSTANCE;
+
+ /**
+ * Provide an instance of this class for delegates to use. To mock out delegated methods, provide an instance of a subclass that overrides the
+ * implementation of the delegated method.
+ *
+ * @param u a Cluster instance
+ */
+ public static void setInstance(ClusterUtils u) {
+ _instance = u;
+ }
+
+ /**
+ * Resets the singleton instance to the default. This is helpful to reset the class to its original functionality when mocking is no longer desired.
+ */
+ public static void resetInstance() {
+ _instance = INSTANCE;
+ }
+
+ public static List<ACL> mkTopoOnlyAcls(Map topoConf) throws NoSuchAlgorithmException {
+ List<ACL> aclList = null;
+ String payload = (String) topoConf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
+ if (Utils.isZkAuthenticationConfiguredTopology(topoConf)) {
+ aclList = new ArrayList<>();
+ ACL acl1 = ZooDefs.Ids.CREATOR_ALL_ACL.get(0);
+ aclList.add(acl1);
+ ACL acl2 = new ACL(ZooDefs.Perms.READ, new Id("digest", DigestAuthenticationProvider.generateDigest(payload)));
+ aclList.add(acl2);
+ }
+ return aclList;
+ }
+
+ public static String supervisorPath(String id) {
+ return SUPERVISORS_SUBTREE + ZK_SEPERATOR + id;
+ }
+
+ public static String assignmentPath(String id) {
+ return ASSIGNMENTS_SUBTREE + ZK_SEPERATOR + id;
+ }
+
+ public static String blobstorePath(String key) {
+ return BLOBSTORE_SUBTREE + ZK_SEPERATOR + key;
+ }
+
+ public static String blobstoreMaxKeySequenceNumberPath(String key) {
+ return BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE + ZK_SEPERATOR + key;
+ }
+
+ public static String nimbusPath(String id) {
+ return NIMBUSES_SUBTREE + ZK_SEPERATOR + id;
+ }
+
+ public static String stormPath(String id) {
+ return STORMS_SUBTREE + ZK_SEPERATOR + id;
+ }
+
+ public static String workerbeatStormRoot(String stormId) {
+ return WORKERBEATS_SUBTREE + ZK_SEPERATOR + stormId;
+ }
+
+ public static String workerbeatPath(String stormId, String node, Long port) {
+ return workerbeatStormRoot(stormId) + ZK_SEPERATOR + node + "-" + port;
+ }
+
+ public static String backpressureStormRoot(String stormId) {
+ return BACKPRESSURE_SUBTREE + ZK_SEPERATOR + stormId;
+ }
+
+ public static String backpressurePath(String stormId, String node, Long port) {
+ return backpressureStormRoot(stormId) + ZK_SEPERATOR + node + "-" + port;
+ }
+
+ public static String errorStormRoot(String stormId) {
+ return ERRORS_SUBTREE + ZK_SEPERATOR + stormId;
+ }
+
+ public static String errorPath(String stormId, String componentId) {
+ try {
+ return errorStormRoot(stormId) + ZK_SEPERATOR + URLEncoder.encode(componentId, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ }
+
+ public static String lastErrorPath(String stormId, String componentId) {
+ return errorPath(stormId, componentId) + "-last-error";
+ }
+
+ public static String credentialsPath(String stormId) {
+ return CREDENTIALS_SUBTREE + ZK_SEPERATOR + stormId;
+ }
+
+ public static String logConfigPath(String stormId) {
+ return LOGCONFIG_SUBTREE + ZK_SEPERATOR + stormId;
+ }
+
+ public static String profilerConfigPath(String stormId) {
+ return PROFILERCONFIG_SUBTREE + ZK_SEPERATOR + stormId;
+ }
+
+ public static String profilerConfigPath(String stormId, String host, Long port, ProfileAction requestType) {
+ return profilerConfigPath(stormId) + ZK_SEPERATOR + host + "_" + port + "_" + requestType;
+ }
+
+ public static <T> T maybeDeserialize(byte[] serialized, Class<T> clazz) {
+ if (serialized != null) {
+ return Utils.deserialize(serialized, clazz);
+ }
+ return null;
+ }
+
+ /**
+ * Ensures that we only return heartbeats for executors assigned to this worker
+ * @param executors
+ * @param workerHeartbeat
+ * @return
+ */
+ public static Map<ExecutorInfo, ExecutorBeat> convertExecutorBeats(List<ExecutorInfo> executors, ClusterWorkerHeartbeat workerHeartbeat) {
+ Map<ExecutorInfo, ExecutorBeat> executorWhb = new HashMap<>();
+ Map<ExecutorInfo, ExecutorStats> executorStatsMap = workerHeartbeat.get_executor_stats();
+ for (ExecutorInfo executor : executors) {
+ if (executorStatsMap.containsKey(executor)) {
+ int time = workerHeartbeat.get_time_secs();
+ int uptime = workerHeartbeat.get_uptime_secs();
+ ExecutorStats executorStats = workerHeartbeat.get_executor_stats().get(executor);
+ ExecutorBeat executorBeat = new ExecutorBeat(time, uptime, executorStats);
+ executorWhb.put(executor, executorBeat);
+ }
+ }
+ return executorWhb;
+ }
+
+ public IStormClusterState mkStormClusterStateImpl(Object stateStorage, List<ACL> acls, ClusterStateContext context) throws Exception {
+ if (stateStorage instanceof IStateStorage) {
+ return new StormClusterStateImpl((IStateStorage) stateStorage, acls, context, false);
+ } else {
+ IStateStorage Storage = _instance.mkStateStorageImpl((Map) stateStorage, (Map) stateStorage, acls, context);
+ return new StormClusterStateImpl(Storage, acls, context, true);
+ }
+ }
+
+ public IStateStorage mkStateStorageImpl(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context) throws Exception {
+ String className = null;
+ IStateStorage stateStorage = null;
+ if (config.get(Config.STORM_CLUSTER_STATE_STORE) != null) {
+ className = (String) config.get(Config.STORM_CLUSTER_STATE_STORE);
+ } else {
+ className = "org.apache.storm.cluster.ZKStateStorageFactory";
+ }
+ Class clazz = Class.forName(className);
+ StateStorageFactory storageFactory = (StateStorageFactory) clazz.newInstance();
+ stateStorage = storageFactory.mkStore(config, auth_conf, acls, context);
+ return stateStorage;
+ }
+
+ public static IStateStorage mkStateStorage(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context) throws Exception {
+ return _instance.mkStateStorageImpl(config, auth_conf, acls, context);
+ }
+
+ public static IStormClusterState mkStormClusterState(Object StateStorage, List<ACL> acls, ClusterStateContext context) throws Exception {
+ return _instance.mkStormClusterStateImpl(StateStorage, acls, context);
+ }
+
+ public static String stringifyError(Throwable error) {
+ StringWriter result = new StringWriter();
+ PrintWriter printWriter = new PrintWriter(result);
+ error.printStackTrace(printWriter);
+ return result.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/ConnectionState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ConnectionState.java b/storm-client/src/jvm/org/apache/storm/cluster/ConnectionState.java
new file mode 100644
index 0000000..ec5e97d
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ConnectionState.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+public enum ConnectionState {
+ CONNECTED,
+ RECONNECTED,
+ LOST
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/DaemonType.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/DaemonType.java b/storm-client/src/jvm/org/apache/storm/cluster/DaemonType.java
new file mode 100644
index 0000000..4589be3
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/DaemonType.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.cluster;
+
+public enum DaemonType {
+ SUPERVISOR,
+ NIMBUS,
+ WORKER,
+ PACEMAKER,
+ UNKNOWN
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/ExecutorBeat.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ExecutorBeat.java b/storm-client/src/jvm/org/apache/storm/cluster/ExecutorBeat.java
new file mode 100644
index 0000000..b32615e
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/ExecutorBeat.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import org.apache.storm.generated.ExecutorStats;
+
+public class ExecutorBeat {
+ private final int timeSecs;
+ private final int uptime;
+ private final ExecutorStats stats;
+
+ public ExecutorBeat(int timeSecs, int uptime, ExecutorStats stats) {
+ this.timeSecs = timeSecs;
+ this.uptime = uptime;
+ this.stats = stats;
+ }
+
+ public int getTimeSecs() {
+ return timeSecs;
+ }
+
+ public int getUptime() {
+ return uptime;
+ }
+
+ public ExecutorStats getStats() {
+ return stats;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java b/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java
new file mode 100644
index 0000000..aa731ff
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/IStateStorage.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.storm.callback.ZKStateChangedCallback;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * StateStorage provides the API for the pluggable state store used by the
+ * Storm daemons. Data is stored in path/value format, and the store supports
+ * listing sub-paths at a given path.
+ * All data should be available across all nodes with eventual consistency.
+ *
+ * IMPORTANT NOTE:
+ * Heartbeats have different api calls used to interact with them. The root
+ * path (/) may or may not be the same as the root path for the other api calls.
+ *
+ * For example, performing these two calls:
+ * set_data("/path", data, acls);
+ * void set_worker_hb("/path", heartbeat, acls);
+ * may or may not cause a collision in "/path".
+ * Never use the same paths with the *_hb* methods as you do with the others.
+ */
+public interface IStateStorage {
+
+ /**
+ * Registers a callback function that gets called when CuratorEvents happen.
+ * @param callback is a clojure IFn that accepts the type - translated to
+ * clojure keyword as in zookeeper - and the path: (callback type path)
+ * @return is an id that can be passed to unregister(...) to unregister the
+ * callback.
+ */
+ String register(ZKStateChangedCallback callback);
+
+ /**
+ * Unregisters a callback function that was registered with register(...).
+ * @param id is the String id that was returned from register(...).
+ */
+ void unregister(String id);
+
+ /**
+ * Path will be appended with a monotonically increasing integer, a new node
+ * will be created there, and data will be put at that node.
+ * @param path The path that the monotonically increasing integer suffix will
+ * be added to.
+ * @param data The data that will be written at the suffixed path's node.
+ * @param acls The acls to apply to the path. May be null.
+ * @return The path with the integer suffix appended.
+ */
+ String create_sequential(String path, byte[] data, List<ACL> acls);
+
+ /**
+ * Creates nodes for path and all its parents. Path elements are separated by
+ * a "/", as in *nix filesystem notation. Equivalent to mkdir -p in *nix.
+ * @param path The path to create, along with all its parents.
+ * @param acls The acls to apply to the path. May be null.
+ * @return path
+ */
+ void mkdirs(String path, List<ACL> acls);
+
+ /**
+ * Deletes the node at a given path, and any child nodes that may exist.
+ * @param path The path to delete
+ */
+ void delete_node(String path);
+
+ /**
+ * Creates an ephemeral node at path. Ephemeral nodes are destroyed
+ * by the store when the client disconnects.
+ * @param path The path where a node will be created.
+ * @param data The data to be written at the node.
+ * @param acls The acls to apply to the path. May be null.
+ */
+ void set_ephemeral_node(String path, byte[] data, List<ACL> acls);
+
+ /**
+ * Gets the 'version' of the node at a path. Optionally sets a watch
+ * on that node. The version should increase whenever a write happens.
+ * @param path The path to get the version of.
+ * @param watch Whether or not to set a watch on the path. Watched paths
+ * emit events which are consumed by functions registered with the
+ * register method. Very useful for catching updates to nodes.
+ * @return The integer version of this node.
+ */
+ Integer get_version(String path, boolean watch) throws Exception;
+
+ /**
+ * Check if a node exists and optionally set a watch on the path.
+ * @param path The path to check for the existence of a node.
+ * @param watch Whether or not to set a watch on the path. Watched paths
+ * emit events which are consumed by functions registered with the
+ * register method. Very useful for catching updates to nodes.
+ * @return Whether or not a node exists at path.
+ */
+ boolean node_exists(String path, boolean watch);
+
+ /**
+ * Get a list of paths of all the child nodes which exist immediately
+ * under path.
+ * @param path The path to look under
+ * @param watch Whether or not to set a watch on the path. Watched paths
+ * emit events which are consumed by functions registered with the
+ * register method. Very useful for catching updates to nodes.
+ * @return list of string paths under path.
+ */
+ List<String> get_children(String path, boolean watch);
+
+ /**
+ * Close the connection to the data store.
+ */
+ void close();
+
+ /**
+ * Set the value of the node at path to data.
+ * @param path The path whose node we want to set.
+ * @param data The data to put in the node.
+ * @param acls The acls to apply to the path. May be null.
+ */
+ void set_data(String path, byte[] data, List<ACL> acls);
+
+ /**
+ * Get the data from the node at path
+ * @param path The path to look under
+ * @param watch Whether or not to set a watch on the path. Watched paths
+ * emit events which are consumed by functions registered with the
+ * register method. Very useful for catching updates to nodes.
+ * @return The data at the node.
+ */
+ byte[] get_data(String path, boolean watch);
+
+ /**
+ * Get the data at the node along with its version. Data is returned
+ * in an Map with the keys data and version.
+ * @param path The path to look under
+ * @param watch Whether or not to set a watch on the path. Watched paths
+ * emit events which are consumed by functions registered with the
+ * register method. Very useful for catching updates to nodes.
+ * @return the data with a version
+ */
+ VersionedData<byte[]> get_data_with_version(String path, boolean watch);
+
+ /**
+ * Write a worker heartbeat at the path.
+ * @param path The path whose node we want to set.
+ * @param data The data to put in the node.
+ * @param acls The acls to apply to the path. May be null.
+ */
+ void set_worker_hb(String path, byte[] data, List<ACL> acls);
+
+ /**
+ * Get the heartbeat from the node at path
+ * @param path The path to look under
+ * @param watch Whether or not to set a watch on the path. Watched paths
+ * emit events which are consumed by functions registered with the
+ * register method. Very useful for catching updates to nodes.
+ * @return The heartbeat at the node.
+ */
+ byte[] get_worker_hb(String path, boolean watch);
+
+ /**
+ * Get a list of paths of all the child nodes which exist immediately
+ * under path. This is similar to get_children, but must be used for
+ * any nodes
+ * @param path The path to look under
+ * @param watch Whether or not to set a watch on the path. Watched paths
+ * emit events which are consumed by functions registered with the
+ * register method. Very useful for catching updates to nodes.
+ * @return list of string paths under path.
+ */
+ List<String> get_worker_hb_children(String path, boolean watch);
+
+ /**
+ * Deletes the heartbeat at a given path, and any child nodes that may exist.
+ * @param path The path to delete.
+ */
+ void delete_worker_hb(String path);
+
+ /**
+ * Add a StateStorageListener to the connection.
+ * @param listener A StateStorageListener to handle changing cluster state
+ * events.
+ */
+ void add_listener(final ConnectionStateListener listener);
+
+ /**
+ * Force consistency on a path. Any writes committed on the path before
+ * this call will be completely propagated when it returns.
+ * @param path The path to synchronize.
+ */
+ void sync_path(String path);
+
+ /**
+ * Allows us to delete the znodes within /storm/blobstore/key_name
+ * whose znodes start with the corresponding nimbusHostPortInfo
+ * @param path /storm/blobstore/key_name
+ * @param nimbusHostPortInfo Contains the host port information of
+ * a nimbus node.
+ */
+ void delete_node_blobstore(String path, String nimbusHostPortInfo);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
new file mode 100644
index 0000000..704c9e5
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ClusterWorkerHeartbeat;
+import org.apache.storm.generated.Credentials;
+import org.apache.storm.generated.ErrorInfo;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LogConfig;
+import org.apache.storm.generated.NimbusSummary;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.StormBase;
+import org.apache.storm.generated.SupervisorInfo;
+import org.apache.storm.nimbus.NimbusInfo;
+
+public interface IStormClusterState {
+ public List<String> assignments(Runnable callback);
+
+ public Assignment assignmentInfo(String stormId, Runnable callback);
+
+ public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback);
+
+ public Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
+
+ public List<String> blobstoreInfo(String blobKey);
+
+ public List<NimbusSummary> nimbuses();
+
+ public void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
+
+ public List<String> activeStorms();
+
+ /**
+ * Get a storm base for a topology
+ * @param stormId the id of the topology
+ * @param callback something to call if the data changes (best effort)
+ * @return the StormBase or null if it is not alive.
+ */
+ public StormBase stormBase(String stormId, Runnable callback);
+
+ public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
+
+ public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo);
+
+ public List<ProfileRequest> getTopologyProfileRequests(String stormId);
+
+ public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest);
+
+ public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest);
+
+ public Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);
+
+ public List<String> supervisors(Runnable callback);
+
+ public SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist
+
+ public void setupHeatbeats(String stormId);
+
+ public void teardownHeartbeats(String stormId);
+
+ public void teardownTopologyErrors(String stormId);
+
+ public List<String> heartbeatStorms();
+
+ public List<String> errorTopologies();
+
+ public List<String> backpressureTopologies();
+
+ public void setTopologyLogConfig(String stormId, LogConfig logConfig);
+
+ public LogConfig topologyLogConfig(String stormId, Runnable cb);
+
+ public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info);
+
+ public void removeWorkerHeartbeat(String stormId, String node, Long port);
+
+ public void supervisorHeartbeat(String supervisorId, SupervisorInfo info);
+
+ public void workerBackpressure(String stormId, String node, Long port, boolean on);
+
+ public boolean topologyBackpressure(String stormId, Runnable callback);
+
+ public void setupBackpressure(String stormId);
+
+ public void removeBackpressure(String stormId);
+
+ public void removeWorkerBackpressure(String stormId, String node, Long port);
+
+ public void activateStorm(String stormId, StormBase stormBase);
+
+ public void updateStorm(String stormId, StormBase newElems);
+
+ public void removeStormBase(String stormId);
+
+ public void setAssignment(String stormId, Assignment info);
+
+ public void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer versionInfo);
+
+ public List<String> activeKeys();
+
+ public List<String> blobstore(Runnable callback);
+
+ public void removeStorm(String stormId);
+
+ public void removeBlobstoreKey(String blobKey);
+
+ public void removeKeyVersion(String blobKey);
+
+ public void reportError(String stormId, String componentId, String node, Long port, Throwable error);
+
+ public List<ErrorInfo> errors(String stormId, String componentId);
+
+ public ErrorInfo lastError(String stormId, String componentId);
+
+ public void setCredentials(String stormId, Credentials creds, Map<String, Object> topoConf) throws NoSuchAlgorithmException;
+
+ public Credentials credentials(String stormId, Runnable callback);
+
+ public void disconnect();
+
+ /**
+ * @return All of the supervisors with the ID as the key
+ */
+ default Map<String, SupervisorInfo> allSupervisorInfo() {
+ return allSupervisorInfo(null);
+ }
+
+ /**
+ * @param callback be alerted if the list of supervisors change
+ * @return All of the supervisors with the ID as the key
+ */
+ default Map<String, SupervisorInfo> allSupervisorInfo(Runnable callback) {
+ Map<String, SupervisorInfo> ret = new HashMap<>();
+ for (String id: supervisors(callback)) {
+ ret.put(id, supervisorInfo(id));
+ }
+ return ret;
+ }
+
+ /**
+ * Get a topology ID from the name of a topology
+ * @param topologyName the name of the topology to look for
+ * @return the id of the topology or null if it is not alive.
+ */
+ default Optional<String> getTopoId(final String topologyName) {
+ String ret = null;
+ for (String topoId: activeStorms()) {
+ String name = stormBase(topoId, null).get_name();
+ if (topologyName.equals(name)) {
+ ret = topoId;
+ break;
+ }
+ }
+ return Optional.ofNullable(ret);
+ }
+
+ default Map<String, Assignment> topologyAssignments() {
+ Map<String, Assignment> ret = new HashMap<>();
+ for (String topoId: assignments(null)) {
+ ret.put(topoId, assignmentInfo(topoId, null));
+ }
+ return ret;
+ }
+
+ default Map<String, StormBase> topologyBases() {
+ Map<String, StormBase> stormBases = new HashMap<>();
+ for (String topologyId : activeStorms()) {
+ StormBase base = stormBase(topologyId, null);
+ if (base != null) { //rece condition with delete
+ stormBases.put(topologyId, base);
+ }
+ }
+ return stormBases;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java b/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
new file mode 100644
index 0000000..9295347
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.storm.callback.ZKStateChangedCallback;
+import org.apache.storm.generated.*;
+import org.apache.storm.pacemaker.PacemakerClientPool;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PaceMakerStateStorage implements IStateStorage {
+
+ private static Logger LOG = LoggerFactory.getLogger(PaceMakerStateStorage.class);
+
+ private PacemakerClientPool pacemakerClientPool;
+ private IStateStorage stateStorage;
+ private static final int maxRetries = 10;
+
+ public PaceMakerStateStorage(PacemakerClientPool pacemakerClientPool, IStateStorage stateStorage) throws Exception {
+ this.pacemakerClientPool = pacemakerClientPool;
+ this.stateStorage = stateStorage;
+ }
+
+ @Override
+ public String register(ZKStateChangedCallback callback) {
+ return stateStorage.register(callback);
+ }
+
+ @Override
+ public void unregister(String id) {
+ stateStorage.unregister(id);
+ }
+
+ @Override
+ public String create_sequential(String path, byte[] data, List<ACL> acls) {
+ return stateStorage.create_sequential(path, data, acls);
+ }
+
+ @Override
+ public void mkdirs(String path, List<ACL> acls) {
+ stateStorage.mkdirs(path, acls);
+ }
+
+ @Override
+ public void delete_node(String path) {
+ stateStorage.delete_node(path);
+ }
+
+ @Override
+ public void set_ephemeral_node(String path, byte[] data, List<ACL> acls) {
+ stateStorage.set_ephemeral_node(path, data, acls);
+ }
+
+ @Override
+ public Integer get_version(String path, boolean watch) throws Exception {
+ return stateStorage.get_version(path, watch);
+ }
+
+ @Override
+ public boolean node_exists(String path, boolean watch) {
+ return stateStorage.node_exists(path, watch);
+ }
+
+ @Override
+ public List<String> get_children(String path, boolean watch) {
+ return stateStorage.get_children(path, watch);
+ }
+
+ @Override
+ public void close() {
+ stateStorage.close();
+ pacemakerClientPool.close();
+ }
+
+ @Override
+ public void set_data(String path, byte[] data, List<ACL> acls) {
+ stateStorage.set_data(path, data, acls);
+ }
+
+ @Override
+ public byte[] get_data(String path, boolean watch) {
+ return stateStorage.get_data(path, watch);
+ }
+
+ @Override
+ public VersionedData<byte[]> get_data_with_version(String path, boolean watch) {
+ return stateStorage.get_data_with_version(path, watch);
+ }
+
+ @Override
+ public void set_worker_hb(String path, byte[] data, List<ACL> acls) {
+ int retry = maxRetries;
+ while (true) {
+ try {
+ HBPulse hbPulse = new HBPulse();
+ hbPulse.set_id(path);
+ hbPulse.set_details(data);
+ HBMessage message = new HBMessage(HBServerMessageType.SEND_PULSE, HBMessageData.pulse(hbPulse));
+ HBMessage response = pacemakerClientPool.send(message);
+ if (response.get_type() != HBServerMessageType.SEND_PULSE_RESPONSE) {
+ throw new HBExecutionException("Invalid Response Type");
+ }
+ LOG.debug("Successful set_worker_hb");
+ break;
+ } catch (Exception e) {
+ if (retry <= 0) {
+ throw Utils.wrapInRuntime(e);
+ }
+ retry--;
+ LOG.error("{} Failed to set_worker_hb. Will make {} more attempts.", e.getMessage(), retry);
+ }
+ }
+ }
+
+ @Override
+ public byte[] get_worker_hb(String path, boolean watch) {
+ int retry = maxRetries;
+ while (true) {
+ try {
+ byte[] ret = null;
+ int latest_time_secs = 0;
+ boolean got_response = false;
+
+ HBMessage message = new HBMessage(HBServerMessageType.GET_PULSE, HBMessageData.path(path));
+ List<HBMessage> responses = pacemakerClientPool.sendAll(message);
+ for(HBMessage response : responses) {
+ if (response.get_type() != HBServerMessageType.GET_PULSE_RESPONSE) {
+ LOG.error("get_worker_hb: Invalid Response Type");
+ continue;
+ }
+ // We got at least one GET_PULSE_RESPONSE message.
+ got_response = true;
+ byte[] details = response.get_data().get_pulse().get_details();
+ if(details == null) {
+ continue;
+ }
+ ClusterWorkerHeartbeat cwh = Utils.deserialize(details, ClusterWorkerHeartbeat.class);
+ if(cwh != null && cwh.get_time_secs() > latest_time_secs) {
+ latest_time_secs = cwh.get_time_secs();
+ ret = details;
+ }
+ }
+ if(!got_response) {
+ throw new HBExecutionException("Failed to get a response.");
+ }
+ return ret;
+ } catch (Exception e) {
+ if (retry <= 0) {
+ throw Utils.wrapInRuntime(e);
+ }
+ retry--;
+ LOG.error("{} Failed to get_worker_hb. Will make {} more attempts.", e.getMessage(), retry);
+ }
+ }
+ }
+
+ @Override
+ public List<String> get_worker_hb_children(String path, boolean watch) {
+ int retry = maxRetries;
+ while (true) {
+ try {
+ HashSet<String> retSet = new HashSet<>();
+
+ HBMessage message = new HBMessage(HBServerMessageType.GET_ALL_NODES_FOR_PATH, HBMessageData.path(path));
+ List<HBMessage> responses = pacemakerClientPool.sendAll(message);
+ for(HBMessage response : responses) {
+ if (response.get_type() != HBServerMessageType.GET_ALL_NODES_FOR_PATH_RESPONSE) {
+ LOG.error("get_worker_hb_children: Invalid Response Type");
+ continue;
+ }
+ if(response.get_data().get_nodes().get_pulseIds() != null) {
+ retSet.addAll(response.get_data().get_nodes().get_pulseIds());
+ }
+ }
+
+ LOG.debug("Successful get_worker_hb");
+ return new ArrayList<>(retSet);
+ } catch (Exception e) {
+ if (retry <= 0) {
+ throw Utils.wrapInRuntime(e);
+ }
+ retry--;
+ LOG.error("{} Failed to get_worker_hb_children. Will make {} more attempts.", e.getMessage(), retry);
+ }
+ }
+ }
+
+ @Override
+ public void delete_worker_hb(String path) {
+ int retry = maxRetries;
+ boolean someSucceeded;
+ while (true) {
+ someSucceeded = false;
+ try {
+ HBMessage message = new HBMessage(HBServerMessageType.DELETE_PATH, HBMessageData.path(path));
+ List<HBMessage> responses = pacemakerClientPool.sendAll(message);
+ boolean allSucceeded = true;
+ for(HBMessage response : responses) {
+ if (response.get_type() != HBServerMessageType.DELETE_PATH_RESPONSE) {
+ LOG.debug("Failed to delete heartbeat {}", response);
+ allSucceeded = false;
+ }
+ else {
+ someSucceeded = true;
+ }
+ }
+ if(allSucceeded) {
+ break;
+ }
+ else {
+ throw new HBExecutionException("Failed to delete from all pacemakers.");
+ }
+ } catch (Exception e) {
+ if (retry <= 0) {
+ if(someSucceeded) {
+ LOG.warn("Unable to delete_worker_hb from every pacemaker.");
+ break;
+ }
+ else {
+ LOG.error("Unable to delete_worker_hb from any pacemaker.");
+ throw Utils.wrapInRuntime(e);
+ }
+ }
+ retry--;
+ LOG.debug("{} Failed to delete_worker_hb. Will make {} more attempts.", e.getMessage(), retry);
+ }
+ }
+ }
+
+ @Override
+ public void add_listener(ConnectionStateListener listener) {
+ stateStorage.add_listener(listener);
+ }
+
+ @Override
+ public void sync_path(String path) {
+ stateStorage.sync_path(path);
+ }
+
+ @Override
+ public void delete_node_blobstore(String path, String nimbusHostPortInfo) {
+ stateStorage.delete_node_blobstore(path, nimbusHostPortInfo);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java b/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java
new file mode 100644
index 0000000..3574506
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.pacemaker.PacemakerClientPool;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.data.ACL;
+
+public class PaceMakerStateStorageFactory implements StateStorageFactory {
+ @Override
+ public IStateStorage mkStore(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context) {
+ try {
+ ZKStateStorageFactory zkfact = new ZKStateStorageFactory();
+ IStateStorage zkState = zkfact.mkStore(config, auth_conf, acls, context);
+ return new PaceMakerStateStorage(new PacemakerClientPool(config), zkState);
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StateStorageFactory.java b/storm-client/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
new file mode 100644
index 0000000..0929750
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.zookeeper.data.ACL;
+
+public interface StateStorageFactory {
+
+ IStateStorage mkStore(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
new file mode 100644
index 0000000..51caad9
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -0,0 +1,735 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.*;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.storm.callback.ZKStateChangedCallback;
+import org.apache.storm.generated.*;
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.Time;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.NoSuchAlgorithmException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class StormClusterStateImpl implements IStormClusterState {
+
+ private static Logger LOG = LoggerFactory.getLogger(StormClusterStateImpl.class);
+
+ private IStateStorage stateStorage;
+
+ private ConcurrentHashMap<String, Runnable> assignmentInfoCallback;
+ private ConcurrentHashMap<String, Runnable> assignmentInfoWithVersionCallback;
+ private ConcurrentHashMap<String, Runnable> assignmentVersionCallback;
+ private AtomicReference<Runnable> supervisorsCallback;
+ // we want to reigister a topo directory getChildren callback for all workers of this dir
+ private ConcurrentHashMap<String, Runnable> backPressureCallback;
+ private AtomicReference<Runnable> assignmentsCallback;
+ private ConcurrentHashMap<String, Runnable> stormBaseCallback;
+ private AtomicReference<Runnable> blobstoreCallback;
+ private ConcurrentHashMap<String, Runnable> credentialsCallback;
+ private ConcurrentHashMap<String, Runnable> logConfigCallback;
+
+ private List<ACL> acls;
+ private String stateId;
+ private boolean solo;
+
+ public StormClusterStateImpl(IStateStorage StateStorage, List<ACL> acls, ClusterStateContext context, boolean solo) throws Exception {
+
+ this.stateStorage = StateStorage;
+ this.solo = solo;
+ this.acls = acls;
+
+ assignmentInfoCallback = new ConcurrentHashMap<>();
+ assignmentInfoWithVersionCallback = new ConcurrentHashMap<>();
+ assignmentVersionCallback = new ConcurrentHashMap<>();
+ supervisorsCallback = new AtomicReference<>();
+ backPressureCallback = new ConcurrentHashMap<>();
+ assignmentsCallback = new AtomicReference<>();
+ stormBaseCallback = new ConcurrentHashMap<>();
+ credentialsCallback = new ConcurrentHashMap<>();
+ logConfigCallback = new ConcurrentHashMap<>();
+ blobstoreCallback = new AtomicReference<>();
+
+ stateId = this.stateStorage.register(new ZKStateChangedCallback() {
+
+ public void changed(Watcher.Event.EventType type, String path) {
+ List<String> toks = tokenizePath(path);
+ int size = toks.size();
+ if (size >= 1) {
+ String root = toks.get(0);
+ if (root.equals(ClusterUtils.ASSIGNMENTS_ROOT)) {
+ if (size == 1) {
+ // set null and get the old value
+ issueCallback(assignmentsCallback);
+ } else {
+ issueMapCallback(assignmentInfoCallback, toks.get(1));
+ issueMapCallback(assignmentVersionCallback, toks.get(1));
+ issueMapCallback(assignmentInfoWithVersionCallback, toks.get(1));
+ }
+
+ } else if (root.equals(ClusterUtils.SUPERVISORS_ROOT)) {
+ issueCallback(supervisorsCallback);
+ } else if (root.equals(ClusterUtils.BLOBSTORE_ROOT)) {
+ issueCallback(blobstoreCallback);
+ } else if (root.equals(ClusterUtils.STORMS_ROOT) && size > 1) {
+ issueMapCallback(stormBaseCallback, toks.get(1));
+ } else if (root.equals(ClusterUtils.CREDENTIALS_ROOT) && size > 1) {
+ issueMapCallback(credentialsCallback, toks.get(1));
+ } else if (root.equals(ClusterUtils.LOGCONFIG_ROOT) && size > 1) {
+ issueMapCallback(logConfigCallback, toks.get(1));
+ } else if (root.equals(ClusterUtils.BACKPRESSURE_ROOT) && size > 1) {
+ issueMapCallback(backPressureCallback, toks.get(1));
+ } else {
+ LOG.error("{} Unknown callback for subtree {}", new RuntimeException("Unknown callback for this path"), path);
+ Runtime.getRuntime().exit(30);
+ }
+
+ }
+
+ return;
+ }
+
+ });
+
+ String[] pathlist = { ClusterUtils.ASSIGNMENTS_SUBTREE,
+ ClusterUtils.STORMS_SUBTREE,
+ ClusterUtils.SUPERVISORS_SUBTREE,
+ ClusterUtils.WORKERBEATS_SUBTREE,
+ ClusterUtils.ERRORS_SUBTREE,
+ ClusterUtils.BLOBSTORE_SUBTREE,
+ ClusterUtils.NIMBUSES_SUBTREE,
+ ClusterUtils.LOGCONFIG_SUBTREE,
+ ClusterUtils.BACKPRESSURE_SUBTREE };
+ for (String path : pathlist) {
+ this.stateStorage.mkdirs(path, acls);
+ }
+
+ }
+
+ protected void issueCallback(AtomicReference<Runnable> cb) {
+ Runnable callback = cb.getAndSet(null);
+ if (callback != null)
+ callback.run();
+ }
+
+ protected void issueMapCallback(ConcurrentHashMap<String, Runnable> callbackConcurrentHashMap, String key) {
+ Runnable callback = callbackConcurrentHashMap.remove(key);
+ if (callback != null)
+ callback.run();
+ }
+
+ @Override
+ public List<String> assignments(Runnable callback) {
+ if (callback != null) {
+ assignmentsCallback.set(callback);
+ }
+ return stateStorage.get_children(ClusterUtils.ASSIGNMENTS_SUBTREE, callback != null);
+ }
+
+ @Override
+ public Assignment assignmentInfo(String stormId, Runnable callback) {
+ if (callback != null) {
+ assignmentInfoCallback.put(stormId, callback);
+ }
+ byte[] serialized = stateStorage.get_data(ClusterUtils.assignmentPath(stormId), callback != null);
+ return ClusterUtils.maybeDeserialize(serialized, Assignment.class);
+ }
+
+ @Override
+ public VersionedData<Assignment> assignmentInfoWithVersion(String stormId, Runnable callback) {
+ if (callback != null) {
+ assignmentInfoWithVersionCallback.put(stormId, callback);
+ }
+ Assignment assignment = null;
+ Integer version = 0;
+ VersionedData<byte[]> dataWithVersion = stateStorage.get_data_with_version(ClusterUtils.assignmentPath(stormId), callback != null);
+ if (dataWithVersion != null) {
+ assignment = ClusterUtils.maybeDeserialize(dataWithVersion.getData(), Assignment.class);
+ version = dataWithVersion.getVersion();
+ }
+ return new VersionedData<Assignment>(version, assignment);
+ }
+
+ @Override
+ public Integer assignmentVersion(String stormId, Runnable callback) throws Exception {
+ if (callback != null) {
+ assignmentVersionCallback.put(stormId, callback);
+ }
+ return stateStorage.get_version(ClusterUtils.assignmentPath(stormId), callback != null);
+ }
+
+ // blobstore state
+ @Override
+ public List<String> blobstoreInfo(String blobKey) {
+ String path = ClusterUtils.blobstorePath(blobKey);
+ stateStorage.sync_path(path);
+ return stateStorage.get_children(path, false);
+ }
+
+ @Override
+ public List<NimbusSummary> nimbuses() {
+ List<NimbusSummary> nimbusSummaries = new ArrayList<>();
+ List<String> nimbusIds = stateStorage.get_children(ClusterUtils.NIMBUSES_SUBTREE, false);
+ for (String nimbusId : nimbusIds) {
+ byte[] serialized = stateStorage.get_data(ClusterUtils.nimbusPath(nimbusId), false);
+ // check for null which can exist because of a race condition in which nimbus nodes in zk may have been
+ // removed when connections are reconnected after getting children in the above line
+ if (serialized != null) {
+ NimbusSummary nimbusSummary = ClusterUtils.maybeDeserialize(serialized, NimbusSummary.class);
+ nimbusSummaries.add(nimbusSummary);
+ }
+ }
+ return nimbusSummaries;
+ }
+
+ @Override
+ public void addNimbusHost(final String nimbusId, final NimbusSummary nimbusSummary) {
+ // explicit delete for ephmeral node to ensure this session creates the entry.
+ stateStorage.delete_node(ClusterUtils.nimbusPath(nimbusId));
+ stateStorage.add_listener(new ConnectionStateListener() {
+ @Override
+ public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
+ LOG.info("Connection state listener invoked, zookeeper connection state has changed to {}", connectionState);
+ if (connectionState.equals(ConnectionState.RECONNECTED)) {
+ LOG.info("Connection state has changed to reconnected so setting nimbuses entry one more time");
+ // explicit delete for ephmeral node to ensure this session creates the entry.
+ stateStorage.delete_node(ClusterUtils.nimbusPath(nimbusId));
+ stateStorage.set_ephemeral_node(ClusterUtils.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls);
+ }
+
+ }
+ });
+
+ stateStorage.set_ephemeral_node(ClusterUtils.nimbusPath(nimbusId), Utils.serialize(nimbusSummary), acls);
+ }
+
+ @Override
+ public List<String> activeStorms() {
+ return stateStorage.get_children(ClusterUtils.STORMS_SUBTREE, false);
+ }
+
+ @Override
+ public StormBase stormBase(String stormId, Runnable callback) {
+ if (callback != null) {
+ stormBaseCallback.put(stormId, callback);
+ }
+ return ClusterUtils.maybeDeserialize(stateStorage.get_data(ClusterUtils.stormPath(stormId), callback != null), StormBase.class);
+ }
+
+ @Override
+ public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port) {
+ byte[] bytes = stateStorage.get_worker_hb(ClusterUtils.workerbeatPath(stormId, node, port), false);
+ return ClusterUtils.maybeDeserialize(bytes, ClusterWorkerHeartbeat.class);
+
+ }
+
+ @Override
+ public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo) {
+ List<ProfileRequest> requests = new ArrayList<>();
+ List<ProfileRequest> profileRequests = getTopologyProfileRequests(stormId);
+ for (ProfileRequest profileRequest : profileRequests) {
+ NodeInfo nodeInfo1 = profileRequest.get_nodeInfo();
+ if (nodeInfo1.equals(nodeInfo))
+ requests.add(profileRequest);
+ }
+ return requests;
+ }
+
+ @Override
+ public List<ProfileRequest> getTopologyProfileRequests(String stormId) {
+ List<ProfileRequest> profileRequests = new ArrayList<>();
+ String path = ClusterUtils.profilerConfigPath(stormId);
+ if (stateStorage.node_exists(path, false)) {
+ List<String> strs = stateStorage.get_children(path, false);
+ for (String str : strs) {
+ String childPath = path + ClusterUtils.ZK_SEPERATOR + str;
+ byte[] raw = stateStorage.get_data(childPath, false);
+ ProfileRequest request = ClusterUtils.maybeDeserialize(raw, ProfileRequest.class);
+ if (request != null)
+ profileRequests.add(request);
+ }
+ }
+ return profileRequests;
+ }
+
+ @Override
+ public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest) {
+ ProfileAction profileAction = profileRequest.get_action();
+ String host = profileRequest.get_nodeInfo().get_node();
+ Long port = profileRequest.get_nodeInfo().get_port_iterator().next();
+ String path = ClusterUtils.profilerConfigPath(stormId, host, port, profileAction);
+ stateStorage.set_data(path, Utils.serialize(profileRequest), acls);
+ }
+
+ @Override
+ public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest) {
+ ProfileAction profileAction = profileRequest.get_action();
+ String host = profileRequest.get_nodeInfo().get_node();
+ Long port = profileRequest.get_nodeInfo().get_port_iterator().next();
+ String path = ClusterUtils.profilerConfigPath(stormId, host, port, profileAction);
+ stateStorage.delete_node(path);
+ }
+
+ /**
+ * need to take executor->node+port in explicitly so that we don't run into a situation where a long dead worker with a skewed clock overrides all the
+ * timestamps. By only checking heartbeats with an assigned node+port, and only reading executors from that heartbeat that are actually assigned, we avoid
+ * situations like that
+ *
+ * @param stormId
+ * @param executorNodePort
+ * @return
+ */
+ @Override
+ public Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) {
+ Map<ExecutorInfo, ExecutorBeat> executorWhbs = new HashMap<>();
+
+ Map<NodeInfo, List<List<Long>>> nodePortExecutors = Utils.reverseMap(executorNodePort);
+
+ for (Map.Entry<NodeInfo, List<List<Long>>> entry : nodePortExecutors.entrySet()) {
+
+ String node = entry.getKey().get_node();
+ Long port = entry.getKey().get_port_iterator().next();
+ ClusterWorkerHeartbeat whb = getWorkerHeartbeat(stormId, node, port);
+ List<ExecutorInfo> executorInfoList = new ArrayList<>();
+ for (List<Long> list : entry.getValue()) {
+ executorInfoList.add(new ExecutorInfo(list.get(0).intValue(), list.get(list.size() - 1).intValue()));
+ }
+ if (whb != null)
+ executorWhbs.putAll(ClusterUtils.convertExecutorBeats(executorInfoList, whb));
+ }
+ return executorWhbs;
+ }
+
+ @Override
+ public List<String> supervisors(Runnable callback) {
+ if (callback != null) {
+ supervisorsCallback.set(callback);
+ }
+ return stateStorage.get_children(ClusterUtils.SUPERVISORS_SUBTREE, callback != null);
+ }
+
+ @Override
+ public SupervisorInfo supervisorInfo(String supervisorId) {
+ String path = ClusterUtils.supervisorPath(supervisorId);
+ return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, false), SupervisorInfo.class);
+ }
+
+ @Override
+ public void setupHeatbeats(String stormId) {
+ stateStorage.mkdirs(ClusterUtils.workerbeatStormRoot(stormId), acls);
+ }
+
+ @Override
+ public void teardownHeartbeats(String stormId) {
+ try {
+ stateStorage.delete_worker_hb(ClusterUtils.workerbeatStormRoot(stormId));
+ } catch (Exception e) {
+ if (Utils.exceptionCauseIsInstanceOf(KeeperException.class, e)) {
+ // do nothing
+ LOG.warn("Could not teardown heartbeats for {}.", stormId);
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public void teardownTopologyErrors(String stormId) {
+ try {
+ stateStorage.delete_node(ClusterUtils.errorStormRoot(stormId));
+ } catch (Exception e) {
+ if (Utils.exceptionCauseIsInstanceOf(KeeperException.class, e)) {
+ // do nothing
+ LOG.warn("Could not teardown errors for {}.", stormId);
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public List<String> heartbeatStorms() {
+ return stateStorage.get_worker_hb_children(ClusterUtils.WORKERBEATS_SUBTREE, false);
+ }
+
+ @Override
+ public List<String> errorTopologies() {
+ return stateStorage.get_children(ClusterUtils.ERRORS_SUBTREE, false);
+ }
+
+ @Override
+ public List<String> backpressureTopologies() {
+ return stateStorage.get_children(ClusterUtils.BACKPRESSURE_SUBTREE, false);
+ }
+
+ @Override
+ public void setTopologyLogConfig(String stormId, LogConfig logConfig) {
+ stateStorage.set_data(ClusterUtils.logConfigPath(stormId), Utils.serialize(logConfig), acls);
+ }
+
+ @Override
+ public LogConfig topologyLogConfig(String stormId, Runnable cb) {
+ if (cb != null){
+ logConfigCallback.put(stormId, cb);
+ }
+ String path = ClusterUtils.logConfigPath(stormId);
+ return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, cb != null), LogConfig.class);
+ }
+
+ @Override
+ public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info) {
+ if (info != null) {
+ String path = ClusterUtils.workerbeatPath(stormId, node, port);
+ stateStorage.set_worker_hb(path, Utils.serialize(info), acls);
+ }
+ }
+
+ @Override
+ public void removeWorkerHeartbeat(String stormId, String node, Long port) {
+ String path = ClusterUtils.workerbeatPath(stormId, node, port);
+ stateStorage.delete_worker_hb(path);
+ }
+
+ @Override
+ public void supervisorHeartbeat(String supervisorId, SupervisorInfo info) {
+ String path = ClusterUtils.supervisorPath(supervisorId);
+ stateStorage.set_ephemeral_node(path, Utils.serialize(info), acls);
+ }
+
+ /**
+ * if znode exists and to be not on?, delete; if exists and on?, do nothing; if not exists and to be on?, create; if not exists and not on?, do nothing;
+ *
+ * @param stormId
+ * @param node
+ * @param port
+ * @param on
+ */
+ @Override
+ public void workerBackpressure(String stormId, String node, Long port, boolean on) {
+ String path = ClusterUtils.backpressurePath(stormId, node, port);
+ boolean existed = stateStorage.node_exists(path, false);
+ if (existed) {
+ if (on == false)
+ stateStorage.delete_node(path);
+
+ } else {
+ if (on == true) {
+ stateStorage.set_ephemeral_node(path, null, acls);
+ }
+ }
+ }
+
+ /**
+ * Check whether a topology is in throttle-on status or not:
+ * if the backpresure/storm-id dir is not empty, this topology has throttle-on, otherwise throttle-off.
+ *
+ * @param stormId
+ * @param callback
+ * @return
+ */
+ @Override
+ public boolean topologyBackpressure(String stormId, Runnable callback) {
+ if (callback != null) {
+ backPressureCallback.put(stormId, callback);
+ }
+ String path = ClusterUtils.backpressureStormRoot(stormId);
+ List<String> childrens = null;
+ if(stateStorage.node_exists(path, false)) {
+ childrens = stateStorage.get_children(path, callback != null);
+ } else {
+ childrens = new ArrayList<>();
+ }
+ return childrens.size() > 0;
+
+ }
+
+ @Override
+ public void setupBackpressure(String stormId) {
+ stateStorage.mkdirs(ClusterUtils.backpressureStormRoot(stormId), acls);
+ }
+
+ @Override
+ public void removeBackpressure(String stormId) {
+ try {
+ stateStorage.delete_node(ClusterUtils.backpressureStormRoot(stormId));
+ } catch (Exception e) {
+ if (Utils.exceptionCauseIsInstanceOf(KeeperException.class, e)) {
+ // do nothing
+ LOG.warn("Could not teardown backpressure node for {}.", stormId);
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public void removeWorkerBackpressure(String stormId, String node, Long port) {
+ String path = ClusterUtils.backpressurePath(stormId, node, port);
+ boolean existed = stateStorage.node_exists(path, false);
+ if (existed) {
+ stateStorage.delete_node(path);
+ }
+ }
+
+ @Override
+ public void activateStorm(String stormId, StormBase stormBase) {
+ String path = ClusterUtils.stormPath(stormId);
+ stateStorage.set_data(path, Utils.serialize(stormBase), acls);
+ }
+
+ /**
+ * To update this function due to APersistentMap/APersistentSet is clojure's structure
+ *
+ * @param stormId
+ * @param newElems
+ */
+ @Override
+ public void updateStorm(String stormId, StormBase newElems) {
+
+ StormBase stormBase = stormBase(stormId, null);
+ if (stormBase.get_component_executors() != null) {
+
+ Map<String, Integer> newComponentExecutors = new HashMap<>();
+ Map<String, Integer> componentExecutors = newElems.get_component_executors();
+ // componentExecutors maybe be APersistentMap, which don't support "put"
+ for (Map.Entry<String, Integer> entry : componentExecutors.entrySet()) {
+ newComponentExecutors.put(entry.getKey(), entry.getValue());
+ }
+ for (Map.Entry<String, Integer> entry : stormBase.get_component_executors().entrySet()) {
+ if (!componentExecutors.containsKey(entry.getKey())) {
+ newComponentExecutors.put(entry.getKey(), entry.getValue());
+ }
+ }
+ if (newComponentExecutors.size() > 0)
+ newElems.set_component_executors(newComponentExecutors);
+ }
+
+ Map<String, DebugOptions> ComponentDebug = new HashMap<>();
+ Map<String, DebugOptions> oldComponentDebug = stormBase.get_component_debug();
+
+ Map<String, DebugOptions> newComponentDebug = newElems.get_component_debug();
+ /// oldComponentDebug.keySet()/ newComponentDebug.keySet() maybe be APersistentSet, which don't support addAll
+ Set<String> debugOptionsKeys = new HashSet<>();
+ debugOptionsKeys.addAll(oldComponentDebug.keySet());
+ debugOptionsKeys.addAll(newComponentDebug.keySet());
+ for (String key : debugOptionsKeys) {
+ boolean enable = false;
+ double samplingpct = 0;
+ if (oldComponentDebug.containsKey(key)) {
+ enable = oldComponentDebug.get(key).is_enable();
+ samplingpct = oldComponentDebug.get(key).get_samplingpct();
+ }
+ if (newComponentDebug.containsKey(key)) {
+ enable = newComponentDebug.get(key).is_enable();
+ samplingpct += newComponentDebug.get(key).get_samplingpct();
+ }
+ DebugOptions debugOptions = new DebugOptions();
+ debugOptions.set_enable(enable);
+ debugOptions.set_samplingpct(samplingpct);
+ ComponentDebug.put(key, debugOptions);
+ }
+ if (ComponentDebug.size() > 0) {
+ newElems.set_component_debug(ComponentDebug);
+ }
+
+ if (StringUtils.isBlank(newElems.get_name())) {
+ newElems.set_name(stormBase.get_name());
+ }
+ if (newElems.get_status() == null) {
+ newElems.set_status(stormBase.get_status());
+ }
+ if (newElems.get_num_workers() == 0) {
+ newElems.set_num_workers(stormBase.get_num_workers());
+ }
+ if (newElems.get_launch_time_secs() == 0) {
+ newElems.set_launch_time_secs(stormBase.get_launch_time_secs());
+ }
+ if (StringUtils.isBlank(newElems.get_owner())) {
+ newElems.set_owner(stormBase.get_owner());
+ }
+ if (newElems.get_topology_action_options() == null) {
+ newElems.set_topology_action_options(stormBase.get_topology_action_options());
+ }
+ if (newElems.get_status() == null) {
+ newElems.set_status(stormBase.get_status());
+ }
+ stateStorage.set_data(ClusterUtils.stormPath(stormId), Utils.serialize(newElems), acls);
+ }
+
+ @Override
+ public void removeStormBase(String stormId) {
+ stateStorage.delete_node(ClusterUtils.stormPath(stormId));
+ }
+
+ @Override
+ public void setAssignment(String stormId, Assignment info) {
+ stateStorage.set_data(ClusterUtils.assignmentPath(stormId), Utils.serialize(info), acls);
+ }
+
+ @Override
+ public void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer versionInfo) {
+ String path = ClusterUtils.blobstorePath(key) + ClusterUtils.ZK_SEPERATOR + nimbusInfo.toHostPortString() + "-" + versionInfo;
+ LOG.info("set-path: {}", path);
+ stateStorage.mkdirs(ClusterUtils.blobstorePath(key), acls);
+ stateStorage.delete_node_blobstore(ClusterUtils.blobstorePath(key), nimbusInfo.toHostPortString());
+ stateStorage.set_ephemeral_node(path, null, acls);
+ }
+
+ @Override
+ public List<String> activeKeys() {
+ return stateStorage.get_children(ClusterUtils.BLOBSTORE_SUBTREE, false);
+ }
+
+ // blobstore state
+ @Override
+ public List<String> blobstore(Runnable callback) {
+ if (callback != null) {
+ blobstoreCallback.set(callback);
+ }
+ stateStorage.sync_path(ClusterUtils.BLOBSTORE_SUBTREE);
+ return stateStorage.get_children(ClusterUtils.BLOBSTORE_SUBTREE, callback != null);
+
+ }
+
+ @Override
+ public void removeStorm(String stormId) {
+ stateStorage.delete_node(ClusterUtils.assignmentPath(stormId));
+ stateStorage.delete_node(ClusterUtils.credentialsPath(stormId));
+ stateStorage.delete_node(ClusterUtils.logConfigPath(stormId));
+ stateStorage.delete_node(ClusterUtils.profilerConfigPath(stormId));
+ removeStormBase(stormId);
+ }
+
+ @Override
+ public void removeBlobstoreKey(String blobKey) {
+ LOG.debug("remove key {}", blobKey);
+ stateStorage.delete_node(ClusterUtils.blobstorePath(blobKey));
+ }
+
+ @Override
+ public void removeKeyVersion(String blobKey) {
+ stateStorage.delete_node(ClusterUtils.blobstoreMaxKeySequenceNumberPath(blobKey));
+ }
+
+ @Override
+ public void reportError(String stormId, String componentId, String node, Long port, Throwable error) {
+
+ String path = ClusterUtils.errorPath(stormId, componentId);
+ String lastErrorPath = ClusterUtils.lastErrorPath(stormId, componentId);
+ ErrorInfo errorInfo = new ErrorInfo(ClusterUtils.stringifyError(error), Time.currentTimeSecs());
+ errorInfo.set_host(node);
+ errorInfo.set_port(port.intValue());
+ byte[] serData = Utils.serialize(errorInfo);
+ stateStorage.mkdirs(path, acls);
+ stateStorage.create_sequential(path + ClusterUtils.ZK_SEPERATOR + "e", serData, acls);
+ stateStorage.set_data(lastErrorPath, serData, acls);
+ List<String> childrens = stateStorage.get_children(path, false);
+
+ Collections.sort(childrens, new Comparator<String>() {
+ public int compare(String arg0, String arg1) {
+ return Long.compare(Long.parseLong(arg0.substring(1)), Long.parseLong(arg1.substring(1)));
+ }
+ });
+
+ while (childrens.size() > 10) {
+ stateStorage.delete_node(path + ClusterUtils.ZK_SEPERATOR + childrens.remove(0));
+ }
+ }
+
+ @Override
+ public List<ErrorInfo> errors(String stormId, String componentId) {
+ List<ErrorInfo> errorInfos = new ArrayList<>();
+ String path = ClusterUtils.errorPath(stormId, componentId);
+ if (stateStorage.node_exists(path, false)) {
+ List<String> childrens = stateStorage.get_children(path, false);
+ for (String child : childrens) {
+ String childPath = path + ClusterUtils.ZK_SEPERATOR + child;
+ ErrorInfo errorInfo = ClusterUtils.maybeDeserialize(stateStorage.get_data(childPath, false), ErrorInfo.class);
+ if (errorInfo != null)
+ errorInfos.add(errorInfo);
+ }
+ }
+ Collections.sort(errorInfos, new Comparator<ErrorInfo>() {
+ public int compare(ErrorInfo arg0, ErrorInfo arg1) {
+ return Integer.compare(arg1.get_error_time_secs(), arg0.get_error_time_secs());
+ }
+ });
+
+ return errorInfos;
+ }
+
+ @Override
+ public ErrorInfo lastError(String stormId, String componentId) {
+
+ String path = ClusterUtils.lastErrorPath(stormId, componentId);
+ if (stateStorage.node_exists(path, false)) {
+ ErrorInfo errorInfo = ClusterUtils.maybeDeserialize(stateStorage.get_data(path, false), ErrorInfo.class);
+ return errorInfo;
+ }
+
+ return null;
+ }
+
+ @Override
+ public void setCredentials(String stormId, Credentials creds, Map<String, Object> topoConf) throws NoSuchAlgorithmException {
+ List<ACL> aclList = ClusterUtils.mkTopoOnlyAcls(topoConf);
+ String path = ClusterUtils.credentialsPath(stormId);
+ stateStorage.set_data(path, Utils.serialize(creds), aclList);
+
+ }
+
+ @Override
+ public Credentials credentials(String stormId, Runnable callback) {
+ if (callback != null) {
+ credentialsCallback.put(stormId, callback);
+ }
+ String path = ClusterUtils.credentialsPath(stormId);
+ return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, callback != null), Credentials.class);
+
+ }
+
+ @Override
+ public void disconnect() {
+ stateStorage.unregister(stateId);
+ if (solo)
+ stateStorage.close();
+ }
+
+ private List<String> tokenizePath(String path) {
+ String[] toks = path.split("/");
+ java.util.ArrayList<String> rtn = new ArrayList<String>();
+ for (String str : toks) {
+ if (!str.isEmpty()) {
+ rtn.add(str);
+ }
+ }
+ return rtn;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/cluster/VersionedData.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/cluster/VersionedData.java b/storm-client/src/jvm/org/apache/storm/cluster/VersionedData.java
new file mode 100644
index 0000000..3de2a88
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/cluster/VersionedData.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+public class VersionedData<D> {
+ private final int version;
+ private final D data;
+
+ public VersionedData(int version, D data) {
+ this.version = version;
+ this.data = data;
+ }
+
+ public int getVersion() {
+ return version;
+ }
+
+ public D getData() {
+ return data;
+ }
+}