You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/02/24 17:17:28 UTC
[18/27] storm git commit: removed any clojure internals
removed any clojure internals
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/afcd0c6c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/afcd0c6c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/afcd0c6c
Branch: refs/heads/master
Commit: afcd0c6c53aca7f99d39f10b91a7b45fda424fe5
Parents: d546387
Author: xiaojian.fxj <xi...@alibaba-inc.com>
Authored: Fri Feb 19 15:21:48 2016 +0800
Committer: xiaojian.fxj <xi...@alibaba-inc.com>
Committed: Fri Feb 19 15:22:22 2016 +0800
----------------------------------------------------------------------
.../src/clj/org/apache/storm/converter.clj | 9 +++
.../src/clj/org/apache/storm/daemon/nimbus.clj | 4 +-
.../clj/org/apache/storm/daemon/supervisor.clj | 8 +-
.../src/clj/org/apache/storm/daemon/worker.clj | 6 +-
storm-core/src/clj/org/apache/storm/testing.clj | 2 +-
.../org/apache/storm/cluster/ClusterUtils.java | 41 +++-------
.../org/apache/storm/cluster/ExecutorBeat.java | 44 +++++++++++
.../org/apache/storm/cluster/IStateStorage.java | 11 ++-
.../storm/cluster/IStormClusterState.java | 24 +++---
.../storm/cluster/PaceMakerStateStorage.java | 4 +-
.../cluster/PaceMakerStateStorageFactory.java | 12 +--
.../storm/cluster/StateStorageFactory.java | 6 +-
.../storm/cluster/StormClusterStateImpl.java | 78 ++++++++++----------
.../apache/storm/cluster/ZKStateStorage.java | 3 +-
.../storm/cluster/ZKStateStorageFactory.java | 4 +-
.../org/apache/storm/zookeeper/Zookeeper.java | 46 +++---------
16 files changed, 156 insertions(+), 146 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/clj/org/apache/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/converter.clj b/storm-core/src/clj/org/apache/storm/converter.clj
index 18647b1..c845cd4 100644
--- a/storm-core/src/clj/org/apache/storm/converter.clj
+++ b/storm-core/src/clj/org/apache/storm/converter.clj
@@ -18,6 +18,7 @@
StormBase TopologyStatus ClusterWorkerHeartbeat ExecutorInfo ErrorInfo Credentials RebalanceOptions KillOptions
TopologyActionOptions DebugOptions ProfileRequest]
[org.apache.storm.utils Utils])
+ (:import [org.apache.storm.cluster ExecutorBeat])
(:use [org.apache.storm util stats log])
(:require [org.apache.storm.daemon [common :as common]]))
@@ -238,6 +239,14 @@
}
{}))
+(defn clojurify-zk-executor-hb [^ExecutorBeat executor-hb]
+ (if executor-hb
+ {:stats (.getStats executor-hb)
+ :uptime (.getUptime executor-hb)
+ :time-secs (.getTimeSecs executor-hb)
+ }
+ {}))
+
(defn thriftify-zk-worker-hb [worker-hb]
(if (not-empty (filter second (:executor-stats worker-hb)))
(doto (ClusterWorkerHeartbeat.)
http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 6bdbdc0..beb6639 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -591,8 +591,8 @@
(let [storm-cluster-state (:storm-cluster-state nimbus)
executor-beats (let [executor-stats-java-map (.executorBeats storm-cluster-state storm-id (.get_executor_node_port (thriftify-assignment existing-assignment)))
executor-stats-clojurify (clojurify-structure executor-stats-java-map)]
- (->> (dofor [[^ExecutorInfo executor-info executor-heartbeat] executor-stats-clojurify]
- {[(.get_task_start executor-info) (.get_task_end executor-info)] executor-heartbeat})
+ (->> (dofor [[^ExecutorInfo executor-info ^ExecutorBeat executor-heartbeat] executor-stats-clojurify]
+ {[(.get_task_start executor-info) (.get_task_end executor-info)] (clojurify-zk-executor-hb executor-heartbeat)})
(apply merge)))
cache (update-heartbeat-cache (@(:heartbeats-cache nimbus) storm-id)
http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index c1f058f..58f6291 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -20,7 +20,7 @@
ConfigUtils]
[org.apache.storm.daemon Shutdownable]
[org.apache.storm Constants]
- [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils]
+ [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils IStateStorage]
[java.net JarURLConnection]
[java.net URI URLDecoder]
[org.apache.commons.io FileUtils])
@@ -69,8 +69,8 @@
(if (= assignment-version recorded-version)
{sid (get assignment-versions sid)}
(let [thriftify-assignment-version (.assignmentInfoWithVersion storm-cluster-state sid callback)
- assignment (clojurify-assignment (:data thriftify-assignment-version))]
- {sid {:data assignment :version (:version thriftify-assignment-version)}}))
+ assignment (clojurify-assignment (.get thriftify-assignment-version (IStateStorage/DATA)))]
+ {sid {:data assignment :version (.get thriftify-assignment-version (IStateStorage/VERSION))}}))
{sid nil})))
(apply merge)
(filter-val not-nil?))
@@ -1184,7 +1184,7 @@
(.readBlobTo blob-store (ConfigUtils/masterStormConfKey storm-id) (FileOutputStream. (ConfigUtils/supervisorStormConfPath tmproot)) nil)
(finally
(.shutdown blob-store)))
- (try (FileUtils/moveDirectory (File. tmproot) (File. stormroot)) (catch Exception e))
+ (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
(setup-storm-code-dir conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id)) stormroot)
(let [classloader (.getContextClassLoader (Thread/currentThread))
http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index b80cd9e..395be23 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -38,7 +38,7 @@
(:import [org.apache.storm.task WorkerTopologyContext])
(:import [org.apache.storm Constants])
(:import [org.apache.storm.security.auth AuthUtils])
- (:import [org.apache.storm.cluster ClusterStateContext DaemonType ZKStateStorage StormClusterStateImpl ClusterUtils])
+ (:import [org.apache.storm.cluster ClusterStateContext DaemonType ZKStateStorage StormClusterStateImpl ClusterUtils IStateStorage])
(:import [javax.security.auth Subject])
(:import [java.security PrivilegedExceptionAction])
(:import [org.apache.logging.log4j LogManager])
@@ -381,8 +381,8 @@
(let [version (.assignmentVersion storm-cluster-state storm-id callback)
assignment (if (= version (:version (get @(:assignment-versions worker) storm-id)))
(:data (get @(:assignment-versions worker) storm-id))
- (let [java-assignment (.assignmentInfoWithVersion storm-cluster-state storm-id callback)
- new-assignment {:data (clojurify-assignment (:data java-assignment)) :version version}]
+ (let [thriftify-assignment-version (.assignmentInfoWithVersion storm-cluster-state storm-id callback)
+ new-assignment {:data (clojurify-assignment (.get thriftify-assignment-version (IStateStorage/DATA))) :version version}]
(swap! (:assignment-versions worker) assoc storm-id new-assignment)
(:data new-assignment)))
my-assignment (-> assignment
http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index eef7754..3dee54b 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -448,7 +448,7 @@
component->tasks)
task-ids (apply concat (vals component->tasks))
assignment (clojurify-assignment (.assignmentInfo state storm-id nil))
- taskbeats (.taskbeats state storm-id (:task->node+port assignment)) ;hava question?
+ taskbeats (.taskbeats state storm-id (:task->node+port assignment))
heartbeats (dofor [id task-ids] (get taskbeats id))
stats (dofor [hb heartbeats] (if hb (stat-key (:stats hb)) 0))]
(reduce + stats)))
http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java b/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
index 0c663f0..aae4231 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
@@ -17,9 +17,6 @@
*/
package org.apache.storm.cluster;
-import clojure.lang.APersistentMap;
-import clojure.lang.PersistentArrayMap;
-import clojure.lang.RT;
import org.apache.storm.Config;
import org.apache.storm.generated.ClusterWorkerHeartbeat;
import org.apache.storm.generated.ExecutorInfo;
@@ -192,14 +189,15 @@ public class ClusterUtils {
* @param workerHeartbeat
* @return
*/
- public static Map<ExecutorInfo, APersistentMap> convertExecutorBeats(List<ExecutorInfo> executors, ClusterWorkerHeartbeat workerHeartbeat) {
- Map<ExecutorInfo, APersistentMap> executorWhb = new HashMap<>();
+ public static Map<ExecutorInfo, ExecutorBeat> convertExecutorBeats(List<ExecutorInfo> executors, ClusterWorkerHeartbeat workerHeartbeat) {
+ Map<ExecutorInfo, ExecutorBeat> executorWhb = new HashMap<>();
Map<ExecutorInfo, ExecutorStats> executorStatsMap = workerHeartbeat.get_executor_stats();
for (ExecutorInfo executor : executors) {
if (executorStatsMap.containsKey(executor)) {
- APersistentMap executorBeat =
- new PersistentArrayMap(new Object[] { RT.keyword(null, "time-secs"), workerHeartbeat.get_time_secs(), RT.keyword(null, "uptime"),
- workerHeartbeat.get_uptime_secs(), RT.keyword(null, "stats"), workerHeartbeat.get_executor_stats().get(executor) });
+ int time = workerHeartbeat.get_time_secs();
+ int uptime = workerHeartbeat.get_uptime_secs();
+ ExecutorStats executorStats = workerHeartbeat.get_executor_stats().get(executor);
+ ExecutorBeat executorBeat = new ExecutorBeat(time, uptime, executorStats);
executorWhb.put(executor, executorBeat);
}
}
@@ -210,13 +208,13 @@ public class ClusterUtils {
if (stateStorage instanceof IStateStorage) {
return new StormClusterStateImpl((IStateStorage) stateStorage, acls, context, false);
} else {
- IStateStorage Storage = _instance.mkStateStorageImpl((APersistentMap) stateStorage, (APersistentMap) stateStorage, acls, context);
+ IStateStorage Storage = _instance.mkStateStorageImpl((Map) stateStorage, (Map) stateStorage, acls, context);
return new StormClusterStateImpl(Storage, acls, context, true);
}
}
- public IStateStorage mkStateStorageImpl(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context) throws Exception {
+ public IStateStorage mkStateStorageImpl(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context) throws Exception {
String className = null;
IStateStorage stateStorage = null;
if (config.get(Config.STORM_CLUSTER_STATE_STORE) != null) {
@@ -230,7 +228,7 @@ public class ClusterUtils {
return stateStorage;
}
- public static IStateStorage mkStateStorage(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context) throws Exception {
+ public static IStateStorage mkStateStorage(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context) throws Exception {
return _instance.mkStateStorageImpl(config, auth_conf, acls, context);
}
@@ -238,26 +236,7 @@ public class ClusterUtils {
return _instance.mkStormClusterStateImpl(StateStorage, acls, context);
}
- // TO be remove
- public static <K, V> HashMap<V, List<K>> reverseMap(Map<K, V> map) {
- HashMap<V, List<K>> rtn = new HashMap<V, List<K>>();
- if (map == null) {
- return rtn;
- }
- for (Map.Entry<K, V> entry : map.entrySet()) {
- K key = entry.getKey();
- V val = entry.getValue();
- List<K> list = rtn.get(val);
- if (list == null) {
- list = new ArrayList<K>();
- rtn.put(entry.getValue(), list);
- }
- list.add(key);
- }
- return rtn;
- }
-
- public static String StringifyError(Throwable error) {
+ public static String stringifyError(Throwable error) {
String errorString = null;
StringWriter result = null;
PrintWriter printWriter = null;
http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/jvm/org/apache/storm/cluster/ExecutorBeat.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ExecutorBeat.java b/storm-core/src/jvm/org/apache/storm/cluster/ExecutorBeat.java
new file mode 100644
index 0000000..b32615e
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ExecutorBeat.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.cluster;
+
+import org.apache.storm.generated.ExecutorStats;
+
+public class ExecutorBeat {
+ private final int timeSecs;
+ private final int uptime;
+ private final ExecutorStats stats;
+
+ public ExecutorBeat(int timeSecs, int uptime, ExecutorStats stats) {
+ this.timeSecs = timeSecs;
+ this.uptime = uptime;
+ this.stats = stats;
+ }
+
+ public int getTimeSecs() {
+ return timeSecs;
+ }
+
+ public int getUptime() {
+ return uptime;
+ }
+
+ public ExecutorStats getStats() {
+ return stats;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java b/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java
index 1a2b14f..0b6f043 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/IStateStorage.java
@@ -17,8 +17,8 @@
*/
package org.apache.storm.cluster;
-import clojure.lang.APersistentMap;
import java.util.List;
+import java.util.Map;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.storm.callback.ZKStateChangedCallback;
@@ -42,6 +42,9 @@ import org.apache.zookeeper.data.ACL;
*/
public interface IStateStorage {
+ public static final String DATA = "data";
+ public static final String VERSION = "version";
+
/**
* Registers a callback function that gets called when CuratorEvents happen.
* @param callback is a clojure IFn that accepts the type - translated to
@@ -149,14 +152,14 @@ public interface IStateStorage {
/**
* Get the data at the node along with its version. Data is returned
- * in an APersistentMap with clojure keyword keys :data and :version.
+ * in an Map with the keys data and version.
* @param path The path to look under
* @param watch Whether or not to set a watch on the path. Watched paths
* emit events which are consumed by functions registered with the
* register method. Very useful for catching updates to nodes.
- * @return An APersistentMap in the form {:data data :version version}
+ * @return An Map in the form {:data data :version version}
*/
- APersistentMap get_data_with_version(String path, boolean watch);
+ Map get_data_with_version(String path, boolean watch);
/**
* Write a worker heartbeat at the path.
http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index 01cf56a..c88935e 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -17,8 +17,6 @@
*/
package org.apache.storm.cluster;
-import clojure.lang.APersistentMap;
-import clojure.lang.IFn;
import org.apache.storm.generated.*;
import org.apache.storm.nimbus.NimbusInfo;
@@ -27,13 +25,13 @@ import java.util.List;
import java.util.Map;
public interface IStormClusterState {
- public List<String> assignments(IFn callback);
+ public List<String> assignments(Runnable callback);
- public Assignment assignmentInfo(String stormId, IFn callback);
+ public Assignment assignmentInfo(String stormId, Runnable callback);
- public APersistentMap assignmentInfoWithVersion(String stormId, IFn callback);
+ public Map assignmentInfoWithVersion(String stormId, Runnable callback);
- public Integer assignmentVersion(String stormId, IFn callback) throws Exception;
+ public Integer assignmentVersion(String stormId, Runnable callback) throws Exception;
public List<String> blobstoreInfo(String blobKey);
@@ -43,7 +41,7 @@ public interface IStormClusterState {
public List<String> activeStorms();
- public StormBase stormBase(String stormId, IFn callback);
+ public StormBase stormBase(String stormId, Runnable callback);
public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port);
@@ -55,9 +53,9 @@ public interface IStormClusterState {
public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest);
- public Map<ExecutorInfo, APersistentMap> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);
+ public Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort);
- public List<String> supervisors(IFn callback);
+ public List<String> supervisors(Runnable callback);
public SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist
@@ -73,7 +71,7 @@ public interface IStormClusterState {
public void setTopologyLogConfig(String stormId, LogConfig logConfig);
- public LogConfig topologyLogConfig(String stormId, IFn cb);
+ public LogConfig topologyLogConfig(String stormId, Runnable cb);
public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info);
@@ -83,7 +81,7 @@ public interface IStormClusterState {
public void workerBackpressure(String stormId, String node, Long port, boolean on);
- public boolean topologyBackpressure(String stormId, IFn callback);
+ public boolean topologyBackpressure(String stormId, Runnable callback);
public void setupBackpressure(String stormId);
@@ -101,7 +99,7 @@ public interface IStormClusterState {
public List<String> activeKeys();
- public List<String> blobstore(IFn callback);
+ public List<String> blobstore(Runnable callback);
public void removeStorm(String stormId);
@@ -117,7 +115,7 @@ public interface IStormClusterState {
public void setCredentials(String stormId, Credentials creds, Map topoConf) throws NoSuchAlgorithmException;
- public Credentials credentials(String stormId, IFn callback);
+ public Credentials credentials(String stormId, Runnable callback);
public void disconnect();
http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
index a9c4d89..c29078e 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
@@ -17,7 +17,6 @@
*/
package org.apache.storm.cluster;
-import clojure.lang.APersistentMap;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.storm.callback.ZKStateChangedCallback;
import org.apache.storm.generated.*;
@@ -28,6 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
+import java.util.Map;
public class PaceMakerStateStorage implements IStateStorage {
@@ -104,7 +104,7 @@ public class PaceMakerStateStorage implements IStateStorage {
}
@Override
- public APersistentMap get_data_with_version(String path, boolean watch) {
+ public Map get_data_with_version(String path, boolean watch) {
return stateStorage.get_data_with_version(path, watch);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java
index eafd2e7..3111e04 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorageFactory.java
@@ -17,12 +17,12 @@
*/
package org.apache.storm.cluster;
-import clojure.lang.APersistentMap;
import org.apache.storm.pacemaker.PacemakerClient;
import org.apache.storm.utils.Utils;
import org.apache.zookeeper.data.ACL;
import java.util.List;
+import java.util.Map;
public class PaceMakerStateStorageFactory implements StateStorageFactory {
@@ -38,7 +38,7 @@ public class PaceMakerStateStorageFactory implements StateStorageFactory {
}
@Override
- public IStateStorage mkStore(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context) {
+ public IStateStorage mkStore(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context) {
try {
return new PaceMakerStateStorage(initMakeClient(config), initZKstate(config, auth_conf, acls, context));
} catch (Exception e) {
@@ -46,19 +46,19 @@ public class PaceMakerStateStorageFactory implements StateStorageFactory {
}
}
- public static IStateStorage initZKstate(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context) throws Exception {
+ public static IStateStorage initZKstate(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context) throws Exception {
return _instance.initZKstateImpl(config, auth_conf, acls, context);
}
- public static PacemakerClient initMakeClient(APersistentMap config) {
+ public static PacemakerClient initMakeClient(Map config) {
return _instance.initMakeClientImpl(config);
}
- public IStateStorage initZKstateImpl(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context) throws Exception {
+ public IStateStorage initZKstateImpl(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context) throws Exception {
return ClusterUtils.mkStateStorage(config, auth_conf, acls, context);
}
- public PacemakerClient initMakeClientImpl(APersistentMap config) {
+ public PacemakerClient initMakeClientImpl(Map config) {
return new PacemakerClient(config);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/StateStorageFactory.java b/storm-core/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
index 110da41..0929750 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
@@ -17,12 +17,12 @@
*/
package org.apache.storm.cluster;
-import clojure.lang.APersistentMap;
import java.util.List;
+import java.util.Map;
+
import org.apache.zookeeper.data.ACL;
public interface StateStorageFactory {
- IStateStorage mkStore(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context);
-
+ IStateStorage mkStore(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 17c8641..5fa586a 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -34,8 +34,6 @@ import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.PrintWriter;
-import java.io.StringWriter;
import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -47,17 +45,17 @@ public class StormClusterStateImpl implements IStormClusterState {
private IStateStorage stateStorage;
- private ConcurrentHashMap<String, IFn> assignmentInfoCallback;
- private ConcurrentHashMap<String, IFn> assignmentInfoWithVersionCallback;
- private ConcurrentHashMap<String, IFn> assignmentVersionCallback;
- private AtomicReference<IFn> supervisorsCallback;
+ private ConcurrentHashMap<String, Runnable> assignmentInfoCallback;
+ private ConcurrentHashMap<String, Runnable> assignmentInfoWithVersionCallback;
+ private ConcurrentHashMap<String, Runnable> assignmentVersionCallback;
+ private AtomicReference<Runnable> supervisorsCallback;
// we want to reigister a topo directory getChildren callback for all workers of this dir
- private ConcurrentHashMap<String, IFn> backPressureCallback;
- private AtomicReference<IFn> assignmentsCallback;
- private ConcurrentHashMap<String, IFn> stormBaseCallback;
- private AtomicReference<IFn> blobstoreCallback;
- private ConcurrentHashMap<String, IFn> credentialsCallback;
- private ConcurrentHashMap<String, IFn> logConfigCallback;
+ private ConcurrentHashMap<String, Runnable> backPressureCallback;
+ private AtomicReference<Runnable> assignmentsCallback;
+ private ConcurrentHashMap<String, Runnable> stormBaseCallback;
+ private AtomicReference<Runnable> blobstoreCallback;
+ private ConcurrentHashMap<String, Runnable> credentialsCallback;
+ private ConcurrentHashMap<String, Runnable> logConfigCallback;
private List<ACL> acls;
private String stateId;
@@ -129,20 +127,20 @@ public class StormClusterStateImpl implements IStormClusterState {
}
- protected void issueCallback(AtomicReference<IFn> cb) {
- IFn callback = cb.getAndSet(null);
+ protected void issueCallback(AtomicReference<Runnable> cb) {
+ Runnable callback = cb.getAndSet(null);
if (callback != null)
- callback.invoke();
+ callback.run();
}
- protected void issueMapCallback(ConcurrentHashMap<String, IFn> callbackConcurrentHashMap, String key) {
- IFn callback = callbackConcurrentHashMap.remove(key);
+ protected void issueMapCallback(ConcurrentHashMap<String, Runnable> callbackConcurrentHashMap, String key) {
+ Runnable callback = callbackConcurrentHashMap.remove(key);
if (callback != null)
- callback.invoke();
+ callback.run();
}
@Override
- public List<String> assignments(IFn callback) {
+ public List<String> assignments(Runnable callback) {
if (callback != null) {
assignmentsCallback.set(callback);
}
@@ -150,7 +148,7 @@ public class StormClusterStateImpl implements IStormClusterState {
}
@Override
- public Assignment assignmentInfo(String stormId, IFn callback) {
+ public Assignment assignmentInfo(String stormId, Runnable callback) {
if (callback != null) {
assignmentInfoCallback.put(stormId, callback);
}
@@ -159,23 +157,25 @@ public class StormClusterStateImpl implements IStormClusterState {
}
@Override
- public APersistentMap assignmentInfoWithVersion(String stormId, IFn callback) {
+ public Map assignmentInfoWithVersion(String stormId, Runnable callback) {
+ Map map = new HashMap();
if (callback != null) {
assignmentInfoWithVersionCallback.put(stormId, callback);
}
Assignment assignment = null;
Integer version = 0;
- APersistentMap aPersistentMap = stateStorage.get_data_with_version(ClusterUtils.assignmentPath(stormId), callback != null);
- if (aPersistentMap != null) {
- assignment = ClusterUtils.maybeDeserialize((byte[]) aPersistentMap.get(RT.keyword(null, "data")), Assignment.class);
- version = (Integer) aPersistentMap.get(RT.keyword(null, "version"));
+ Map dataWithVersionMap = stateStorage.get_data_with_version(ClusterUtils.assignmentPath(stormId), callback != null);
+ if (dataWithVersionMap != null) {
+ assignment = ClusterUtils.maybeDeserialize((byte[]) dataWithVersionMap.get(IStateStorage.DATA), Assignment.class);
+ version = (Integer) dataWithVersionMap.get(IStateStorage.VERSION);
}
- APersistentMap map = new PersistentArrayMap(new Object[] { RT.keyword(null, "data"), assignment, RT.keyword(null, "version"), version });
+ map.put(IStateStorage.DATA, assignment);
+ map.put(IStateStorage.VERSION, version);
return map;
}
@Override
- public Integer assignmentVersion(String stormId, IFn callback) throws Exception {
+ public Integer assignmentVersion(String stormId, Runnable callback) throws Exception {
if (callback != null) {
assignmentVersionCallback.put(stormId, callback);
}
@@ -227,7 +227,7 @@ public class StormClusterStateImpl implements IStormClusterState {
}
@Override
- public StormBase stormBase(String stormId, IFn callback) {
+ public StormBase stormBase(String stormId, Runnable callback) {
if (callback != null) {
stormBaseCallback.put(stormId, callback);
}
@@ -298,10 +298,10 @@ public class StormClusterStateImpl implements IStormClusterState {
* @return
*/
@Override
- public Map<ExecutorInfo, APersistentMap> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) {
- Map<ExecutorInfo, APersistentMap> executorWhbs = new HashMap<>();
+ public Map<ExecutorInfo, ExecutorBeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort) {
+ Map<ExecutorInfo, ExecutorBeat> executorWhbs = new HashMap<>();
- Map<NodeInfo, List<List<Long>>> nodePortExecutors = ClusterUtils.reverseMap(executorNodePort);
+ Map<NodeInfo, List<List<Long>>> nodePortExecutors = Utils.reverseMap(executorNodePort);
for (Map.Entry<NodeInfo, List<List<Long>>> entry : nodePortExecutors.entrySet()) {
@@ -319,7 +319,7 @@ public class StormClusterStateImpl implements IStormClusterState {
}
@Override
- public List<String> supervisors(IFn callback) {
+ public List<String> supervisors(Runnable callback) {
if (callback != null) {
supervisorsCallback.set(callback);
}
@@ -342,7 +342,7 @@ public class StormClusterStateImpl implements IStormClusterState {
try {
stateStorage.delete_worker_hb(ClusterUtils.workerbeatStormRoot(stormId));
} catch (Exception e) {
- if (Zookeeper.exceptionCause(KeeperException.class, e)) {
+ if (Utils.exceptionCauseIsInstanceOf(KeeperException.class, e)) {
// do nothing
LOG.warn("Could not teardown heartbeats for {}.", stormId);
} else {
@@ -356,7 +356,7 @@ public class StormClusterStateImpl implements IStormClusterState {
try {
stateStorage.delete_node(ClusterUtils.errorStormRoot(stormId));
} catch (Exception e) {
- if (Zookeeper.exceptionCause(KeeperException.class, e)) {
+ if (Utils.exceptionCauseIsInstanceOf(KeeperException.class, e)) {
// do nothing
LOG.warn("Could not teardown errors for {}.", stormId);
} else {
@@ -381,7 +381,7 @@ public class StormClusterStateImpl implements IStormClusterState {
}
@Override
- public LogConfig topologyLogConfig(String stormId, IFn cb) {
+ public LogConfig topologyLogConfig(String stormId, Runnable cb) {
String path = ClusterUtils.logConfigPath(stormId);
return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, cb != null), LogConfig.class);
}
@@ -437,7 +437,7 @@ public class StormClusterStateImpl implements IStormClusterState {
* @return
*/
@Override
- public boolean topologyBackpressure(String stormId, IFn callback) {
+ public boolean topologyBackpressure(String stormId, Runnable callback) {
if (callback != null) {
backPressureCallback.put(stormId, callback);
}
@@ -568,7 +568,7 @@ public class StormClusterStateImpl implements IStormClusterState {
// blobstore state
@Override
- public List<String> blobstore(IFn callback) {
+ public List<String> blobstore(Runnable callback) {
if (callback != null) {
blobstoreCallback.set(callback);
}
@@ -602,7 +602,7 @@ public class StormClusterStateImpl implements IStormClusterState {
String path = ClusterUtils.errorPath(stormId, componentId);
String lastErrorPath = ClusterUtils.lastErrorPath(stormId, componentId);
- ErrorInfo errorInfo = new ErrorInfo(ClusterUtils.StringifyError(error), Time.currentTimeSecs());
+ ErrorInfo errorInfo = new ErrorInfo(ClusterUtils.stringifyError(error), Time.currentTimeSecs());
errorInfo.set_host(node);
errorInfo.set_port(port.intValue());
byte[] serData = Utils.serialize(errorInfo);
@@ -669,7 +669,7 @@ public class StormClusterStateImpl implements IStormClusterState {
}
@Override
- public Credentials credentials(String stormId, IFn callback) {
+ public Credentials credentials(String stormId, Runnable callback) {
if (callback != null) {
credentialsCallback.put(stormId, callback);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
index b277751..56115ce 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
@@ -17,7 +17,6 @@
*/
package org.apache.storm.cluster;
-import clojure.lang.APersistentMap;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.*;
import org.apache.curator.framework.state.ConnectionState;
@@ -220,7 +219,7 @@ public class ZKStateStorage implements IStateStorage {
}
@Override
- public APersistentMap get_data_with_version(String path, boolean watch) {
+ public Map get_data_with_version(String path, boolean watch) {
return Zookeeper.getDataWithVersion(zkReader, path, watch);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
index 956c20e..232488b 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorageFactory.java
@@ -17,16 +17,16 @@
*/
package org.apache.storm.cluster;
-import clojure.lang.APersistentMap;
import org.apache.storm.utils.Utils;
import org.apache.zookeeper.data.ACL;
import java.util.List;
+import java.util.Map;
public class ZKStateStorageFactory implements StateStorageFactory {
@Override
- public IStateStorage mkStore(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context) {
+ public IStateStorage mkStore(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context) {
try {
return new ZKStateStorage(config, auth_conf, acls, context);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/storm/blob/afcd0c6c/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
index f80b0a4..e5b2666 100644
--- a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
@@ -17,15 +17,11 @@
*/
package org.apache.storm.zookeeper;
-import clojure.lang.APersistentMap;
-import clojure.lang.PersistentArrayMap;
-import clojure.lang.RT;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.CuratorListener;
-import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.framework.recipes.leader.Participant;
@@ -33,6 +29,7 @@ import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.storm.Config;
import org.apache.storm.callback.DefaultWatcherCallBack;
import org.apache.storm.callback.WatcherCallBack;
+import org.apache.storm.cluster.IStateStorage;
import org.apache.storm.nimbus.ILeaderElector;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.utils.Utils;
@@ -47,17 +44,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.Vector;
+
public class Zookeeper {
private static Logger LOG = LoggerFactory.getLogger(Zookeeper.class);
@@ -169,7 +162,7 @@ public class Zookeeper {
zk.delete().deletingChildrenIfNeeded().forPath(normalizePath(path));
}
} catch (Exception e) {
- if (exceptionCause(KeeperException.NodeExistsException.class, e)) {
+ if (Utils.exceptionCauseIsInstanceOf(KeeperException.NodeExistsException.class, e)) {
// do nothing
LOG.info("delete {} failed.", path, e);
} else {
@@ -195,7 +188,7 @@ public class Zookeeper {
try {
createNode(zk, npath, byteArray, org.apache.zookeeper.CreateMode.PERSISTENT, acls);
} catch (Exception e) {
- if (exceptionCause(KeeperException.NodeExistsException.class, e)) {
+ if (Utils.exceptionCauseIsInstanceOf(KeeperException.NodeExistsException.class, e)) {
// this can happen when multiple clients doing mkdir at same time
}
}
@@ -224,7 +217,7 @@ public class Zookeeper {
}
}
} catch (Exception e) {
- if (exceptionCause(KeeperException.NoNodeException.class, e)) {
+ if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
// this is fine b/c we still have a watch from the successful exists call
} else {
throw Utils.wrapInRuntime(e);
@@ -312,7 +305,7 @@ public class Zookeeper {
}
LOG.info("Starting inprocess zookeeper at port {} and dir {}", report, localdir);
factory.startup(zk);
- return Arrays.asList((Object)new Long(report), (Object)factory);
+ return Arrays.asList((Object) new Long(report), (Object) factory);
}
public static void shutdownInprocessZookeeper(NIOServerCnxnFactory handle) {
@@ -361,9 +354,8 @@ public class Zookeeper {
return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference, leaderLatchListenerAtomicReference);
}
- // To update @return to be a Map
- public static APersistentMap getDataWithVersion(CuratorFramework zk, String path, boolean watch) {
- APersistentMap map = null;
+ public static Map getDataWithVersion(CuratorFramework zk, String path, boolean watch) {
+ Map map = new HashMap();
try {
byte[] bytes = null;
Stat stats = new Stat();
@@ -376,11 +368,12 @@ public class Zookeeper {
}
if (bytes != null) {
int version = stats.getVersion();
- map = new PersistentArrayMap(new Object[] { RT.keyword(null, "data"), bytes, RT.keyword(null, "version"), version });
+ map.put(IStateStorage.DATA, bytes);
+ map.put(IStateStorage.VERSION, version);
}
}
} catch (Exception e) {
- if (exceptionCause(KeeperException.NoNodeException.class, e)) {
+ if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
// this is fine b/c we still have a watch from the successful exists call
} else {
Utils.wrapInRuntime(e);
@@ -423,19 +416,4 @@ public class Zookeeper {
String rtn = toksToPath(tokenizePath(path));
return rtn;
}
-
- // To remove exceptionCause if port Utils.try-cause to java
- public static boolean exceptionCause(Class klass, Throwable t) {
- boolean ret = false;
- Throwable throwable = t;
- while (throwable != null) {
- if (throwable.getClass() == klass) {
- ret = true;
- break;
- }
- throwable = throwable.getCause();
- }
- return ret;
- }
-
}