You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:59 UTC

[21/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormStatus.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormStatus.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormStatus.java
index 5ad70cb..c2e07ee 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormStatus.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormStatus.java
@@ -29,10 +29,8 @@ import com.alibaba.jstorm.daemon.nimbus.StatusType;
  * 
  * Dedicate Topology status
  * 
- * Topology status: active/inactive/killed/rebalancing killTimeSecs: when status
- * isn't killed, it is -1 and useless. when status is killed, do kill operation
- * after killTimeSecs seconds when status is rebalancing, do rebalancing opation
- * after delaySecs seconds restore oldStatus as current status
+ * Topology status: active/inactive/killed/rebalancing. killTimeSecs: when status isn't killed, it is -1 and unused; when status is killed, do the kill
+ * operation after killTimeSecs seconds. When status is rebalancing, do the rebalancing operation after delaySecs seconds; restore oldStatus as current status.
  */
 public class StormStatus implements Serializable {
 
@@ -99,9 +97,7 @@ public class StormStatus implements Serializable {
         }
 
         StormStatus check = (StormStatus) base;
-        if (check.getStatusType().equals(getStatusType())
-                && check.getKillTimeSecs() == getKillTimeSecs()
-                && check.getDelaySecs().equals(getDelaySecs())) {
+        if (check.getStatusType().equals(getStatusType()) && check.getKillTimeSecs() == getKillTimeSecs() && check.getDelaySecs().equals(getDelaySecs())) {
             return true;
         }
         return false;
@@ -109,15 +105,12 @@ public class StormStatus implements Serializable {
 
     @Override
     public int hashCode() {
-        return this.getStatusType().hashCode()
-                + this.getKillTimeSecs().hashCode()
-                + this.getDelaySecs().hashCode();
+        return this.getStatusType().hashCode() + this.getKillTimeSecs().hashCode() + this.getDelaySecs().hashCode();
     }
 
     @Override
     public String toString() {
-        return ToStringBuilder.reflectionToString(this,
-                ToStringStyle.SHORT_PREFIX_STYLE);
+        return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java
index bd60d45..1550c7e 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java
@@ -17,18 +17,8 @@
  */
 package com.alibaba.jstorm.cluster;
 
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import backtype.storm.generated.TopologyTaskHbInfo;
 import backtype.storm.utils.Utils;
-
 import com.alibaba.jstorm.cache.JStormCache;
 import com.alibaba.jstorm.callback.ClusterStateCallback;
 import com.alibaba.jstorm.callback.RunnableCallback;
@@ -38,14 +28,22 @@ import com.alibaba.jstorm.schedule.Assignment;
 import com.alibaba.jstorm.schedule.AssignmentBak;
 import com.alibaba.jstorm.task.TaskInfo;
 import com.alibaba.jstorm.task.error.TaskError;
-import com.alibaba.jstorm.task.heartbeat.TaskHeartbeat;
+import com.alibaba.jstorm.task.backpressure.SourceBackpressureInfo;
 import com.alibaba.jstorm.utils.JStormUtils;
 import com.alibaba.jstorm.utils.PathUtils;
 import com.alibaba.jstorm.utils.TimeUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class StormZkClusterState implements StormClusterState {
-    private static Logger LOG = LoggerFactory
-            .getLogger(StormZkClusterState.class);
+    private static Logger LOG = LoggerFactory.getLogger(StormZkClusterState.class);
 
     private ClusterState cluster_state;
 
@@ -67,12 +65,10 @@ public class StormZkClusterState implements StormClusterState {
         } else {
 
             solo = true;
-            cluster_state =
-                    new DistributedClusterState((Map) cluster_state_spec);
+            cluster_state = new DistributedClusterState((Map) cluster_state_spec);
         }
 
-        assignment_info_callback =
-                new ConcurrentHashMap<String, RunnableCallback>();
+        assignment_info_callback = new ConcurrentHashMap<String, RunnableCallback>();
         supervisors_callback = new AtomicReference<RunnableCallback>(null);
         assignments_callback = new AtomicReference<RunnableCallback>(null);
         storm_base_callback = new ConcurrentHashMap<String, RunnableCallback>();
@@ -85,8 +81,7 @@ public class StormZkClusterState implements StormClusterState {
                     LOG.warn("Input args is null");
                     return null;
                 } else if (args.length < 2) {
-                    LOG.warn("Input args is invalid, args length:"
-                            + args.length);
+                    LOG.warn("Input args is invalid, args length:" + args.length);
                     return null;
                 }
 
@@ -132,11 +127,8 @@ public class StormZkClusterState implements StormClusterState {
         });
 
         String[] pathlist =
-                JStormUtils.mk_arr(Cluster.SUPERVISORS_SUBTREE,
-                        Cluster.STORMS_SUBTREE, Cluster.ASSIGNMENTS_SUBTREE,
-                        Cluster.ASSIGNMENTS_BAK_SUBTREE, Cluster.TASKS_SUBTREE,
-                        Cluster.TASKBEATS_SUBTREE, Cluster.TASKERRORS_SUBTREE,
-                        Cluster.METRIC_SUBTREE);
+                JStormUtils.mk_arr(Cluster.SUPERVISORS_SUBTREE, Cluster.STORMS_SUBTREE, Cluster.ASSIGNMENTS_SUBTREE, Cluster.ASSIGNMENTS_BAK_SUBTREE,
+                        Cluster.TASKS_SUBTREE, Cluster.TASKBEATS_SUBTREE, Cluster.TASKERRORS_SUBTREE, Cluster.METRIC_SUBTREE, Cluster.BACKPRESSURE_SUBTREE);
         for (String path : pathlist) {
             cluster_state.mkdirs(path);
         }
@@ -146,8 +138,7 @@ public class StormZkClusterState implements StormClusterState {
     /**
      * @@@ TODO
      * 
-     *     Just add cache in lower ZK level In fact, for some Object
-     *     Assignment/TaskInfo/StormBase These object can be cache for long time
+     *     Just add a cache at the lower ZK level. In fact, some objects (Assignment/TaskInfo/StormBase) can be cached for a long time.
      * 
      * @param simpleCache
      */
@@ -221,10 +212,10 @@ public class StormZkClusterState implements StormClusterState {
             deleteObject(Cluster.storm_task_root(topologyId));
             teardown_heartbeats(topologyId);
             teardown_task_errors(topologyId);
+			teardown_backpressure(topologyId);
             deleteObject(Cluster.metric_path(topologyId));
         } catch (Exception e) {
-            LOG.warn("Failed to delete task root and monitor root for" 
-                    + topologyId);
+            LOG.warn("Failed to delete task root and monitor root for" + topologyId);
         }
         remove_storm_base(topologyId);
     }
@@ -240,8 +231,7 @@ public class StormZkClusterState implements StormClusterState {
     }
 
     @Override
-    public Assignment assignment_info(String topologyId,
-            RunnableCallback callback) throws Exception {
+    public Assignment assignment_info(String topologyId, RunnableCallback callback) throws Exception {
         if (callback != null) {
             assignment_info_callback.put(topologyId, callback);
         }
@@ -257,13 +247,11 @@ public class StormZkClusterState implements StormClusterState {
         if (callback != null) {
             assignments_callback.set(callback);
         }
-        return cluster_state.get_children(Cluster.ASSIGNMENTS_SUBTREE,
-                callback != null);
+        return cluster_state.get_children(Cluster.ASSIGNMENTS_SUBTREE, callback != null);
     }
 
     @Override
-    public void set_assignment(String topologyId, Assignment info)
-            throws Exception {
+    public void set_assignment(String topologyId, Assignment info) throws Exception {
         setObject(Cluster.assignment_path(topologyId), info);
     }
 
@@ -276,26 +264,22 @@ public class StormZkClusterState implements StormClusterState {
     }
 
     @Override
-    public void backup_assignment(String topologyName, AssignmentBak info)
-            throws Exception {
+    public void backup_assignment(String topologyName, AssignmentBak info) throws Exception {
         setObject(Cluster.assignment_bak_path(topologyName), info);
     }
 
     @Override
-    public StormBase storm_base(String topologyId, RunnableCallback callback)
-            throws Exception {
+    public StormBase storm_base(String topologyId, RunnableCallback callback) throws Exception {
         if (callback != null) {
             storm_base_callback.put(topologyId, callback);
         }
 
-        return (StormBase) getObject(Cluster.storm_path(topologyId),
-                callback != null);
+        return (StormBase) getObject(Cluster.storm_path(topologyId), callback != null);
 
     }
 
     @Override
-    public void activate_storm(String topologyId, StormBase stormBase)
-            throws Exception {
+    public void activate_storm(String topologyId, StormBase stormBase) throws Exception {
         String stormPath = Cluster.storm_path(topologyId);
 
         setObject(stormPath, stormBase);
@@ -307,8 +291,7 @@ public class StormZkClusterState implements StormClusterState {
     }
 
     @Override
-    public void update_storm(String topologyId, StormStatus newElems)
-            throws Exception {
+    public void update_storm(String topologyId, StormStatus newElems) throws Exception {
         /**
          * FIXME, maybe overwrite old callback
          */
@@ -323,8 +306,7 @@ public class StormZkClusterState implements StormClusterState {
     }
 
     @Override
-    public void set_storm_monitor(String topologyId, boolean isEnable)
-            throws Exception {
+    public void set_storm_monitor(String topologyId, boolean isEnable) throws Exception {
         // TODO Auto-generated method stub
         StormBase base = this.storm_base(topologyId, null);
 
@@ -340,30 +322,20 @@ public class StormZkClusterState implements StormClusterState {
     }
 
     @Override
-    public void setup_heartbeats(String topologyId) throws Exception {
-        String taskbeatPath = Cluster.taskbeat_storm_root(topologyId);
-
-        cluster_state.mkdirs(taskbeatPath);
-    }
-
-    @Override
-    public List<String> heartbeat_storms() throws Exception {
-        return cluster_state.get_children(Cluster.TASKBEATS_SUBTREE, false);
+    public void topology_heartbeat(String topologyId, TopologyTaskHbInfo info) throws Exception {
+        String taskPath = Cluster.taskbeat_storm_root(topologyId);
+        setObject(taskPath, info);
     }
 
     @Override
-    public List<String> heartbeat_tasks(String topologyId) throws Exception {
-        String taskbeatPath = Cluster.taskbeat_storm_root(topologyId);
-
-        return cluster_state.get_children(taskbeatPath, false);
+    public TopologyTaskHbInfo topology_heartbeat(String topologyId) throws Exception {
+        String taskPath = Cluster.taskbeat_storm_root(topologyId);
+        return (TopologyTaskHbInfo) getObject(taskPath, false);
     }
 
     @Override
-    public void remove_task_heartbeat(String topologyId, int taskId)
-            throws Exception {
-        String taskbeatPath = Cluster.taskbeat_path(topologyId, taskId);
-
-        deleteObject(taskbeatPath);
+    public List<String> heartbeat_storms() throws Exception {
+        return cluster_state.get_children(Cluster.TASKBEATS_SUBTREE, false);
     }
 
     @Override
@@ -379,14 +351,11 @@ public class StormZkClusterState implements StormClusterState {
     }
 
     @Override
-    public void report_task_error(String topologyId, int taskId, Throwable error)
-            throws Exception {
-        report_task_error(topologyId, taskId,
-                new String(JStormUtils.getErrorInfo(error)));
+    public void report_task_error(String topologyId, int taskId, Throwable error) throws Exception {
+        report_task_error(topologyId, taskId, new String(JStormUtils.getErrorInfo(error)), null);
     }
 
-    public void report_task_error(String topologyId, int taskId, String error)
-            throws Exception {
+    public void report_task_error(String topologyId, int taskId, String error, String tag) throws Exception {
         boolean found = false;
         String path = Cluster.taskerror_path(topologyId, taskId);
         cluster_state.mkdirs(path);
@@ -403,9 +372,10 @@ public class StormZkClusterState implements StormClusterState {
                 deleteObject(errorPath);
                 continue;
             }
-            if (errorInfo.equals(error)) {
-                deleteObject(errorPath);
-                setObject(timestampPath, error);
+            if (errorInfo.equals(error)
+                    || (tag != null && errorInfo.startsWith(tag))) {
+                cluster_state.delete_node(errorPath);
+                cluster_state.set_data(timestampPath, error.getBytes());
                 found = true;
                 break;
             }
@@ -429,8 +399,7 @@ public class StormZkClusterState implements StormClusterState {
     private static final String TASK_IS_DEAD = "is dead on"; // Full string is
                                                              // "task-id is dead on hostname:port"
 
-    private void setLastErrInfo(String topologyId, String error,
-            String timeStamp) throws Exception {
+    private void setLastErrInfo(String topologyId, String error, String timeStamp) throws Exception {
         // Set error information in task error topology patch
         // Last Error information format in ZK: map<report_duration, timestamp>
         // report_duration means only the errors will presented in web ui if the
@@ -440,13 +409,10 @@ public class StormZkClusterState implements StormClusterState {
         String lastErrTopoPath = Cluster.lasterror_path(topologyId);
         Map<Integer, String> lastErrInfo = null;
         try {
-            lastErrInfo =
-                    (Map<Integer, String>) getObject(lastErrTopoPath, false);
+            lastErrInfo = (Map<Integer, String>) getObject(lastErrTopoPath, false);
 
         } catch (Exception e) {
-            LOG.error(
-                    "Failed to get last error time. Remove the corrupt node for "
-                            + topologyId, e);
+            LOG.error("Failed to get last error time. Remove the corrupt node for " + topologyId, e);
             remove_lastErr_time(topologyId);
             lastErrInfo = null;
         }
@@ -466,15 +432,13 @@ public class StormZkClusterState implements StormClusterState {
     }
 
     @Override
-    public void remove_task_error(String topologyId, int taskId)
-            throws Exception {
+    public void remove_task_error(String topologyId, int taskId) throws Exception {
         String path = Cluster.taskerror_path(topologyId, taskId);
         cluster_state.delete_node(path);
     }
 
     @Override
-    public Map<Integer, String> topo_lastErr_time(String topologyId)
-            throws Exception {
+    public Map<Integer, String> topo_lastErr_time(String topologyId) throws Exception {
         String path = Cluster.lasterror_path(topologyId);
 
         return (Map<Integer, String>) getObject(path, false);
@@ -490,17 +454,18 @@ public class StormZkClusterState implements StormClusterState {
     public List<String> task_error_storms() throws Exception {
         return cluster_state.get_children(Cluster.TASKERRORS_SUBTREE, false);
     }
-    
+
     @Override
     public List<String> task_error_ids(String topologyId) throws Exception {
-    	return cluster_state.get_children(Cluster.taskerror_storm_root(topologyId), false);
+        return cluster_state.get_children(Cluster.taskerror_storm_root(topologyId), false);
     }
 
     @Override
-    public List<String> task_error_time(String topologyId, int taskId)
-            throws Exception {
+    public List<String> task_error_time(String topologyId, int taskId) throws Exception {
         String path = Cluster.taskerror_path(topologyId, taskId);
-        cluster_state.mkdirs(path);
+        if (cluster_state.node_existed(path, false) == false) {
+        	return new ArrayList<String>();
+        }
         return cluster_state.get_children(path, false);
     }
 
@@ -509,38 +474,37 @@ public class StormZkClusterState implements StormClusterState {
         String tasksPath = Cluster.storm_task_root(topologyId);
         Object data = getObject(tasksPath, false);
         if (data != null) {
-            Map<Integer, TaskInfo> taskInfoMap = ((Map<Integer, TaskInfo>)data);
-            for (Integer taskId : taskIds){
+            Map<Integer, TaskInfo> taskInfoMap = ((Map<Integer, TaskInfo>) data);
+            for (Integer taskId : taskIds) {
                 taskInfoMap.remove(taskId);
             }
-            //update zk node of tasks
+            // update zk node of tasks
             setObject(tasksPath, taskInfoMap);
         }
     }
 
     @Override
-    public String task_error_info(String topologyId, int taskId, long timeStamp)
-            throws Exception {
+    public String task_error_info(String topologyId, int taskId, long timeStamp) throws Exception {
         String path = Cluster.taskerror_path(topologyId, taskId);
-        cluster_state.mkdirs(path);
         path = path + "/" + timeStamp;
         return getString(path, false);
     }
 
     @Override
-    public List<TaskError> task_errors(String topologyId, int taskId)
-            throws Exception {
-        String path = Cluster.taskerror_path(topologyId, taskId);
-        cluster_state.mkdirs(path);
+    public List<TaskError> task_errors(String topologyId, int taskId) throws Exception {
+    	List<TaskError> errors = new ArrayList<TaskError>();
+    	String path = Cluster.taskerror_path(topologyId, taskId);
+    	if (cluster_state.node_existed(path, false) == false) {
+        	return errors;
+        }
 
         List<String> children = cluster_state.get_children(path, false);
-        List<TaskError> errors = new ArrayList<TaskError>();
+        
 
         for (String str : children) {
             byte[] v = cluster_state.get_data(path + "/" + str, false);
             if (v != null) {
-                TaskError error =
-                        new TaskError(new String(v), Integer.parseInt(str));
+                TaskError error = new TaskError(new String(v), Integer.parseInt(str));
                 errors.add(error);
             }
         }
@@ -572,45 +536,28 @@ public class StormZkClusterState implements StormClusterState {
             LOG.error("Could not teardown errors for " + topologyId, e);
         }
     }
+
     @Override
-    public void set_task(String topologyId, Map<Integer, TaskInfo>  taskInfoMap)
-            throws Exception {
+    public void set_task(String topologyId, Map<Integer, TaskInfo> taskInfoMap) throws Exception {
         String stormTaskPath = Cluster.storm_task_root(topologyId);
-        if (taskInfoMap != null){
-            //reupdate zk node of tasks
+        if (taskInfoMap != null) {
+            // reupdate zk node of tasks
             setObject(stormTaskPath, taskInfoMap);
         }
     }
+
     @Override
-    public void add_task(String topologyId, Map<Integer, TaskInfo> taskInfoMap)
-            throws Exception {
+    public void add_task(String topologyId, Map<Integer, TaskInfo> taskInfoMap) throws Exception {
         String stormTaskPath = Cluster.storm_task_root(topologyId);
         Object data = getObject(stormTaskPath, false);
-        if (data != null){
-            ((Map<Integer, TaskInfo>)data).putAll(taskInfoMap);
-            //reupdate zk node of tasks
+        if (data != null) {
+            ((Map<Integer, TaskInfo>) data).putAll(taskInfoMap);
+            // reupdate zk node of tasks
             setObject(stormTaskPath, data);
         }
     }
 
     @Override
-    public TaskHeartbeat task_heartbeat(String topologyId, int taskId)
-            throws Exception {
-        String taskbeatPath = Cluster.taskbeat_path(topologyId, taskId);
-
-        return (TaskHeartbeat) getObjectSync(taskbeatPath, false);
-
-    }
-
-    @Override
-    public void task_heartbeat(String topologyId, int taskId, TaskHeartbeat info)
-            throws Exception {
-        String taskPath = Cluster.taskbeat_path(topologyId, taskId);
-
-        setObject(taskPath, info);
-    }
-
-    @Override
     public List<String> task_storms() throws Exception {
         return cluster_state.get_children(Cluster.TASKS_SUBTREE, false);
     }
@@ -623,23 +570,22 @@ public class StormZkClusterState implements StormClusterState {
         if (data == null) {
             return null;
         }
-        return ((Map<Integer, TaskInfo>)data).keySet();
+        return ((Map<Integer, TaskInfo>) data).keySet();
     }
 
     @Override
-    public Set<Integer> task_ids_by_componentId(String topologyId,
-            String componentId) throws Exception {
+    public Set<Integer> task_ids_by_componentId(String topologyId, String componentId) throws Exception {
         String stormTaskPath = Cluster.storm_task_root(topologyId);
         Object data = getObject(stormTaskPath, false);
         if (data == null) {
             return null;
         }
-        Map<Integer, TaskInfo> taskInfoMap = (Map<Integer, TaskInfo>)data;
+        Map<Integer, TaskInfo> taskInfoMap = (Map<Integer, TaskInfo>) data;
         Set<Integer> rtn = new HashSet<Integer>();
         Set<Integer> taskIds = taskInfoMap.keySet();
-        for(Integer taskId : taskIds){
+        for (Integer taskId : taskIds) {
             TaskInfo taskInfo = taskInfoMap.get(taskId);
-            if (taskInfo != null){
+            if (taskInfo != null) {
                 if (taskInfo.getComponentId().equalsIgnoreCase(componentId))
                     rtn.add(taskId);
             }
@@ -672,13 +618,11 @@ public class StormZkClusterState implements StormClusterState {
         if (callback != null) {
             supervisors_callback.set(callback);
         }
-        return cluster_state.get_children(Cluster.SUPERVISORS_SUBTREE,
-                callback != null);
+        return cluster_state.get_children(Cluster.SUPERVISORS_SUBTREE, callback != null);
     }
 
     @Override
-    public void supervisor_heartbeat(String supervisorId, SupervisorInfo info)
-            throws Exception {
+    public void supervisor_heartbeat(String supervisorId, SupervisorInfo info) throws Exception {
 
         String supervisorPath = Cluster.supervisor_path(supervisorId);
 
@@ -703,15 +647,13 @@ public class StormZkClusterState implements StormClusterState {
     }
 
     public String get_nimbus_slave_time(String host) throws Exception {
-        String path =
-                Cluster.NIMBUS_SLAVE_SUBTREE + Cluster.ZK_SEPERATOR + host;
-        return (String) getObject(path, false);
+        String path = Cluster.NIMBUS_SLAVE_SUBTREE + Cluster.ZK_SEPERATOR + host;
+        return getString(path, false);
     }
 
     @Override
     public void update_nimbus_slave(String host, int time) throws Exception {
-        setTempObject(Cluster.NIMBUS_SLAVE_SUBTREE + Cluster.ZK_SEPERATOR
-                + host, String.valueOf(time));
+        setTempObject(Cluster.NIMBUS_SLAVE_SUBTREE + Cluster.ZK_SEPERATOR + host, String.valueOf(time));
     }
 
     @Override
@@ -720,8 +662,24 @@ public class StormZkClusterState implements StormClusterState {
     }
 
     @Override
-    public boolean try_to_be_leader(String path, String host,
-            RunnableCallback callback) throws Exception {
+    public void update_nimbus_detail(String hostPort, Map map) throws Exception {
+        // TODO Auto-generated method stub
+        cluster_state.set_ephemeral_node(Cluster.NIMBUS_SLAVE_DETAIL_SUBTREE + Cluster.ZK_SEPERATOR + hostPort, Utils.serialize(map));
+    }
+
+    @Override
+    public Map get_nimbus_detail(String hostPort, boolean watch) throws Exception {
+        byte[] data = cluster_state.get_data(Cluster.NIMBUS_SLAVE_DETAIL_SUBTREE + Cluster.ZK_SEPERATOR + hostPort, watch);
+        return (Map) Utils.maybe_deserialize(data);
+    }
+    @Override
+    public void unregister_nimbus_detail(String hostPort) throws Exception {
+        cluster_state.delete_node(Cluster.NIMBUS_SLAVE_DETAIL_SUBTREE + Cluster.ZK_SEPERATOR + hostPort);
+    }
+
+
+    @Override
+    public boolean try_to_be_leader(String path, String host, RunnableCallback callback) throws Exception {
         // TODO Auto-generated method stub
         if (callback != null)
             this.master_callback.set(callback);
@@ -736,24 +694,53 @@ public class StormZkClusterState implements StormClusterState {
     }
 
     @Override
-    public void set_topology_metric(String topologyId, Object metric)
-            throws Exception {
-        // TODO Auto-generated method stub
+    public void set_topology_metric(String topologyId, Object metric) throws Exception {
         String path = Cluster.metric_path(topologyId);
-
         setObject(path, metric);
     }
 
     @Override
     public Object get_topology_metric(String topologyId) throws Exception {
-        // TODO Auto-generated method stub
         return getObject(Cluster.metric_path(topologyId), false);
     }
 
-	@Override
-	public List<String> get_metrics() throws Exception {
-		// TODO Auto-generated method stub
-		return cluster_state.get_children(Cluster.METRIC_SUBTREE, false);
-	}
+    @Override
+    public List<String> get_metrics() throws Exception {
+        return cluster_state.get_children(Cluster.METRIC_SUBTREE, false);
+    }
+    @Override
+    public List<String> list_dirs(String path, boolean watch) throws  Exception {
+        List<String> subDirs = null;
+        subDirs = cluster_state.get_children(path, watch);
+        return subDirs;
+    }
 
+    @Override
+    public List<String> backpressureInfos() throws Exception {
+        return cluster_state.get_children(Cluster.BACKPRESSURE_SUBTREE, false);
+    }
+
+    @Override
+    public void set_backpressure_info(String topologyId, Map<String, SourceBackpressureInfo> sourceToBackpressureInfo) throws Exception {
+        String path = Cluster.backpressure_path(topologyId);
+        cluster_state.set_data(path, Utils.serialize(sourceToBackpressureInfo));
+    }
+
+    @Override
+    public Map<String, SourceBackpressureInfo> get_backpressure_info(String topologyId) throws Exception {
+        String path = Cluster.backpressure_path(topologyId);
+        byte[] data = cluster_state.get_data(path, false);
+        return (Map<String, SourceBackpressureInfo>) Utils.maybe_deserialize(data);
+    }
+
+    @Override
+    public void teardown_backpressure(String topologyId) {
+        try {
+            String backpressurePath = Cluster.backpressure_path(topologyId);
+
+            cluster_state.delete_node(backpressurePath);
+        } catch (Exception e) {
+            LOG.warn("Could not teardown backpressure info for " + topologyId, e);
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmCounter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmCounter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmCounter.java
new file mode 100644
index 0000000..120bbfb
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmCounter.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric;
+
+
+import com.alibaba.jstorm.common.metric.snapshot.AsmCounterSnapshot;
+import com.alibaba.jstorm.common.metric.snapshot.AsmSnapshot;
+import com.codahale.metrics.Counter;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Counter wrapper. Note that a counter is a little special: for every snapshot we return only the delta value instead of
+ * the total value, which prevents data loss if certain tasks are killed.
+ */
+public class AsmCounter extends AsmMetric<Counter> {
+
+    private final Map<Integer, Counter> counterMap = new ConcurrentHashMap<>();
+    private Counter unFlushed = new Counter();
+
+    public AsmCounter() {
+        super();
+        for (int win : windowSeconds) {
+            counterMap.put(win, new Counter());
+        }
+    }
+
+    public void inc() {
+        update(1);
+    }
+
+    @Override
+    public void update(Number val) {
+        this.unFlushed.inc(val.longValue());
+    }
+
+    /**
+     * Flush the unflushed counter value to all window counters and associated metrics.
+     */
+    protected void doFlush() {
+        long v = unFlushed.getCount();
+        for (Counter counter : counterMap.values()) {
+            counter.inc(v);
+        }
+        for (AsmMetric assocMetric : assocMetrics) {
+            assocMetric.updateDirectly(v);
+        }
+
+        this.unFlushed.dec(v);
+    }
+
+    @Override
+    public Map<Integer, Counter> getWindowMetricMap() {
+        return counterMap;
+    }
+
+    @Override
+    public Counter mkInstance() {
+        return new Counter();
+    }
+
+    @Override
+    protected void updateSnapshot(int window) {
+        Counter counter = counterMap.get(window);
+        if (counter != null) {
+            AsmSnapshot snapshot = new AsmCounterSnapshot().setValue(counter.getCount())
+                    .setTs(System.currentTimeMillis()).setMetricId(metricId);
+            snapshots.put(window, snapshot);
+        }
+    }
+
+    @Override
+    public AsmMetric clone() {
+        return new AsmCounter();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmGauge.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmGauge.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmGauge.java
new file mode 100644
index 0000000..4bc255a
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmGauge.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric;
+
+import com.alibaba.jstorm.common.metric.snapshot.AsmGaugeSnapshot;
+import com.alibaba.jstorm.common.metric.snapshot.AsmSnapshot;
+import com.codahale.metrics.Gauge;
+
+import java.util.Map;
+
+/**
+ * gauges cannot be aggregated.
+ */
+public class AsmGauge extends AsmMetric<Gauge> {
+
+    private Gauge gauge;
+
+    public AsmGauge(Gauge<Double> gauge) {
+        this.aggregate = false;
+        this.gauge = gauge;
+    }
+
+    @Override
+    public void update(Number obj) {
+        // nothing to do for gauges.
+    }
+
+    @Override
+    public AsmMetric clone() {
+        AsmMetric metric = new AsmGauge(this.gauge);
+        metric.setMetricName(this.getMetricName());
+        return metric;
+    }
+
+    @Override
+    public Map<Integer, Gauge> getWindowMetricMap() {
+        return null;
+    }
+
+    @Override
+    public Gauge mkInstance() {
+        return null;
+    }
+
+    @Override
+    protected void doFlush() {
+        // nothing to do for gauges.
+    }
+
+    @Override
+    protected void updateSnapshot(int window) {
+        double v = (Double) gauge.getValue();
+        AsmSnapshot snapshot =  new AsmGaugeSnapshot().setValue(v)
+                .setTs(System.currentTimeMillis()).setMetricId(metricId);
+        snapshots.put(window, snapshot);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmHistogram.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmHistogram.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmHistogram.java
new file mode 100644
index 0000000..43c8dbc
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmHistogram.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric;
+
+import com.alibaba.jstorm.common.metric.snapshot.AsmHistogramSnapshot;
+import com.alibaba.jstorm.common.metric.snapshot.AsmSnapshot;
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.Histogram;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * each window has a separate histogram, which is recreated after the window cycle.
+ */
+public class AsmHistogram extends AsmMetric<Histogram> {
+
+    private final Map<Integer, Histogram> histogramMap = new ConcurrentHashMap<Integer, Histogram>();
+    private Histogram unFlushed = newHistogram();
+
+    public AsmHistogram() {
+        super();
+        for (int win : windowSeconds) {
+            histogramMap.put(win, newHistogram());
+        }
+    }
+
+    @Override
+    public void update(Number obj) {
+        if (sample()) {
+            this.unFlushed.update(obj.longValue());
+        }
+    }
+
+    @Override
+    public void updateDirectly(Number obj) {
+        this.unFlushed.update(obj.longValue());
+    }
+
+    @Override
+    public Map<Integer, Histogram> getWindowMetricMap() {
+        return histogramMap;
+    }
+
+    @Override
+    public Histogram mkInstance() {
+        return newHistogram();
+    }
+
+    @Override
+    protected void updateSnapshot(int window) {
+        Histogram histogram = histogramMap.get(window);
+        if (histogram != null) {
+            AsmSnapshot snapshot = new AsmHistogramSnapshot().setSnapshot(histogram.getSnapshot())
+                    .setTs(System.currentTimeMillis()).setMetricId(metricId);
+            snapshots.put(window, snapshot);
+        }
+    }
+
+    /**
+     * flush temp histogram data to all windows & assoc metrics.
+     */
+    protected void doFlush() {
+        long[] values = unFlushed.getSnapshot().getValues();
+        for (Histogram histogram : histogramMap.values()) {
+            for (long val : values) {
+                histogram.update(val);
+            }
+        }
+        for (long val : values) {
+            for (AsmMetric metric : this.assocMetrics) {
+                metric.updateDirectly(val);
+            }
+        }
+        this.unFlushed = newHistogram();
+    }
+
+    @Override
+    public AsmMetric clone() {
+        return new AsmHistogram();
+    }
+
+    private Histogram newHistogram() {
+        return new Histogram(new ExponentiallyDecayingReservoir());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMeter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMeter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMeter.java
new file mode 100644
index 0000000..8959800
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMeter.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric;
+
+import com.alibaba.jstorm.common.metric.snapshot.AsmMeterSnapshot;
+import com.alibaba.jstorm.common.metric.snapshot.AsmSnapshot;
+import com.codahale.metrics.Meter;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * one meter shared by all windows (every window's snapshot is taken from it). since a meter is window-sliding by itself, there's no need to recreate it.
+ */
+public class AsmMeter extends AsmMetric<Meter> {
+    private final Meter meter = new Meter();
+
+    public void mark() {
+        meter.mark(1l);
+    }
+
+    @Override
+    public void update(Number obj) {
+        meter.mark(obj.longValue());
+        for (AsmMetric metric : this.assocMetrics) {
+            metric.update(obj);
+        }
+    }
+
+
+    @Override
+    public AsmMetric clone() {
+        return new AsmMeter();
+    }
+
+    @Override
+    public Map<Integer, Meter> getWindowMetricMap() {
+        return null;
+    }
+
+    @Override
+    protected void doFlush() {
+        // nothing to do for meters.
+    }
+
+    @Override
+    protected void updateSnapshot(int window) {
+        AsmMeterSnapshot meterSnapshot = new AsmMeterSnapshot();
+        meterSnapshot.setM1(meter.getOneMinuteRate()).setM5(meter.getFiveMinuteRate()).setM15(meter.getFifteenMinuteRate()).setMean(meter.getMeanRate())
+                .setTs(System.currentTimeMillis()).setMetricId(metricId);
+        snapshots.put(window, meterSnapshot);
+    }
+
+    @Override
+    public Meter mkInstance() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMetric.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMetric.java
new file mode 100644
index 0000000..d399e12
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMetric.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric;
+
+import com.alibaba.jstorm.client.ConfigExtension;
+import com.alibaba.jstorm.common.metric.snapshot.AsmSnapshot;
+import com.alibaba.jstorm.metric.AsmWindow;
+import com.alibaba.jstorm.metric.MetaType;
+import com.alibaba.jstorm.metric.MetricType;
+import com.alibaba.jstorm.utils.TimeUtils;
+import com.codahale.metrics.Metric;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+public abstract class AsmMetric<T extends Metric> {
+    protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private static final Joiner JOINER = Joiner.on(".");
+
+    protected static final List<Integer> windowSeconds = Lists
+            .newArrayList(AsmWindow.M1_WINDOW, AsmWindow.M10_WINDOW, AsmWindow.H2_WINDOW, AsmWindow.D1_WINDOW);
+    protected static final List<Integer> nettyWindows = Lists.newArrayList(AsmWindow.M1_WINDOW);
+
+    protected static int minWindow = AsmWindow.M1_WINDOW;
+    protected static final List<Integer> EMPTY_WIN = Lists.newArrayListWithCapacity(0);
+    /**
+     * sample rate for meter, histogram and timer, note that counter & gauge are not sampled.
+     */
+    private static double sampleRate = ConfigExtension.DEFAULT_METRIC_SAMPLE_RATE;
+
+    protected int op = MetricOp.REPORT;
+    protected volatile long metricId = 0L;
+    protected String metricName;
+    protected boolean aggregate = true;
+    protected volatile long lastFlushTime = TimeUtils.current_time_secs() - AsmWindow.M1_WINDOW;
+    protected Map<Integer, Long> rollingTimeMap = new ConcurrentHashMap<>();
+    protected Map<Integer, Boolean> rollingDirtyMap = new ConcurrentHashMap<>();
+
+    protected final Map<Integer, AsmSnapshot> snapshots = new ConcurrentHashMap<Integer, AsmSnapshot>();
+
+    protected Set<AsmMetric> assocMetrics = new HashSet<AsmMetric>();
+
+    public AsmMetric() {
+        for (Integer win : windowSeconds) {
+            rollingTimeMap.put(win, lastFlushTime);
+            rollingDirtyMap.put(win, false);
+        }
+    }
+
+    /**
+     * keep a separate Random for each instance to avoid contention between metrics (although Random itself is thread-safe).
+     */
+    private final Random rand = new Random();
+
+    protected boolean sample() {
+        return rand.nextDouble() <= sampleRate;
+    }
+
+    public static void setSampleRate(double sampleRate) {
+        AsmMetric.sampleRate = sampleRate;
+    }
+
+    /**
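+     * record a new value. in order to improve performance, implementations may sample the values (see sample()) instead of recording every one.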
+     */
+    public abstract void update(Number obj);
+
+
+    public void updateDirectly(Number obj) {
+        update(obj);
+    }
+
+    public abstract AsmMetric clone();
+
+    public AsmMetric setOp(int op) {
+        this.op = op;
+        return this;
+    }
+
+    public int getOp() {
+        return this.op;
+    }
+
+    /**
+     * for test
+     */
+    public static void setWindowSeconds(List<Integer> windows) {
+        synchronized (windowSeconds) {
+            windowSeconds.clear();
+            windowSeconds.addAll(windows);
+
+            minWindow = getMinWindow(windows);
+        }
+    }
+
+    public static int getMinWindow(List<Integer> windows) {
+        int min = Integer.MAX_VALUE;
+        for (int win : windows) {
+            if (win < min) {
+                min = win;
+            }
+        }
+        return min;
+    }
+
+    public void addAssocMetrics(AsmMetric... metrics) {
+        Collections.addAll(assocMetrics, metrics);
+    }
+
+    public long getMetricId() {
+        return metricId;
+    }
+
+    public void setMetricId(long metricId) {
+        this.metricId = metricId;
+    }
+
+    public String getMetricName() {
+        return metricName;
+    }
+
+    public void setMetricName(String metricName) {
+        this.metricName = metricName;
+    }
+
+    public void flush() {
+        long time = TimeUtils.current_time_secs();
+        List<Integer> windows = getValidWindows();
+        if (windows.size() == 0) {
+            return;
+        }
+
+        doFlush();
+
+        List<Integer> rollwindows = rollWindows(time, windows);
+
+        for (int win : windows) {
+            if (rollwindows.contains(win)) {
+                updateSnapshot(win);
+
+                Map<Integer, T> metricMap = getWindowMetricMap();
+                if (metricMap != null) {
+                    metricMap.put(win, mkInstance());
+                }
+            } else if (!rollingDirtyMap.get(win)) {
+                // this window has never rolled (not completed a full cycle yet), so still refresh its snapshot with the data so far
+                updateSnapshot(win);
+            }
+        }
+        this.lastFlushTime = TimeUtils.current_time_secs();
+    }
+
+    public List<Integer> rollWindows(long time, List<Integer> windows) {
+        List<Integer> rolling = new ArrayList<>();
+        for (Integer win : windows) {
+            long rollingTime = rollingTimeMap.get(win);
+            // the flush may not arrive exactly on the window boundary, so allow a 5 sec tolerance: roll once (win - 5) sec have elapsed
+            if (time - rollingTime >= win - 5) {
+                rolling.add(win);
+                rollingDirtyMap.put(win, true);     //mark this window has been passed
+                rollingTimeMap.put(win, (long) TimeUtils.current_time_secs());
+            }
+        }
+        return rolling;
+    }
+
+    /**
+     * flush temp data to all windows & assoc metrics.
+     */
+    protected abstract void doFlush();
+
+    public abstract Map<Integer, T> getWindowMetricMap();
+
+    public abstract T mkInstance();
+
+    protected abstract void updateSnapshot(int window);
+
+    public Map<Integer, AsmSnapshot> getSnapshots() {
+        return snapshots;
+    }
+
+    /**
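+     * DO NOT compare the elapsed time against the exact min window (e.g. 60 sec): the alignment of time can make a flush arrive slightly early
+     * (maybe by less than 1 sec?), so we allow a 5 sec tolerance, i.e. windows are valid once (min window - 5) sec have passed since the last flush.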
+     */
+    public List<Integer> getValidWindows() {
+        long diff = TimeUtils.current_time_secs() - this.lastFlushTime + 5;
+        if (diff < minWindow) {
+            // logger.warn("no valid windows for metric:{}, diff:{}", this.metricName, diff);
+            return EMPTY_WIN;
+        }
+        // for netty metrics, use only 1min window
+        if (this.metricName.startsWith(MetaType.NETTY.getV())) {
+            return nettyWindows;
+        }
+
+        return windowSeconds;
+    }
+
+    public boolean isAggregate() {
+        return aggregate;
+    }
+
+    public void setAggregate(boolean aggregate) {
+        this.aggregate = aggregate;
+    }
+
+    public static String mkName(Object... parts) {
+        return JOINER.join(parts);
+    }
+
+    public static class MetricOp {
+        public static final int LOG = 1;
+        public static final int REPORT = 2;
+    }
+
+    public static class Builder {
+        public static AsmMetric build(MetricType metricType) {
+            AsmMetric metric;
+            if (metricType == MetricType.COUNTER) {
+                metric = new AsmCounter();
+            } else if (metricType == MetricType.METER) {
+                metric = new AsmMeter();
+            } else if (metricType == MetricType.HISTOGRAM) {
+                metric = new AsmHistogram();
+            } else if (metricType == MetricType.TIMER) {
+                metric = new AsmTimer();
+            } else {
+                throw new IllegalArgumentException("invalid metric type:" + metricType);
+            }
+            return metric;
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        AsmMeter meter = new AsmMeter();
+        int t = 0, f = 0;
+        for (int i = 0; i < 100; i++) {
+            if (meter.sample()) {
+                t++;
+            } else {
+                f++;
+            }
+        }
+        System.out.println(t + "," + f);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmTimer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmTimer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmTimer.java
new file mode 100644
index 0000000..d9f46e1
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmTimer.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric;
+
+import com.alibaba.jstorm.common.metric.snapshot.AsmMeterSnapshot;
+import com.alibaba.jstorm.common.metric.snapshot.AsmTimerSnapshot;
+import com.codahale.metrics.*;
+import com.codahale.metrics.Timer;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * same as histogram, each window has a separate timer, which is recreated after the window cycle. note that all data in a timer is measured in nanoseconds, so
+ * in most cases you can use a histogram instead.
+ */
+public class AsmTimer extends AsmMetric<Timer> {
+    private final Map<Integer, Timer> timerMap = new ConcurrentHashMap<Integer, Timer>();
+    private Timer unFlushed = newTimer();
+
+    public AsmTimer() {
+        super();
+        for (int win : windowSeconds) {
+            timerMap.put(win, newTimer());
+        }
+    }
+
+    @Override
+    public void update(Number obj) {
+        if (sample()) {
+            this.unFlushed.update(obj.longValue(), TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Override
+    public void updateDirectly(Number obj) {
+        this.unFlushed.update(obj.longValue(), TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public Map<Integer, Timer> getWindowMetricMap() {
+        return timerMap;
+    }
+
+    @Override
+    public Timer mkInstance() {
+        return newTimer();
+    }
+
+    @Override
+    protected void updateSnapshot(int window) {
+        Timer timer = timerMap.get(window);
+        if (timer != null){
+            AsmTimerSnapshot timerSnapshot = new AsmTimerSnapshot();
+            timerSnapshot.setHistogram(timer.getSnapshot());
+            timerSnapshot.setMeter(new AsmMeterSnapshot().setM1(timer.getOneMinuteRate()).setM5(timer.getFiveMinuteRate())
+                    .setM15(timer.getFifteenMinuteRate()).setMean(timer.getMeanRate()));
+            if (metricId > 0) {
+                timerSnapshot.setMetricId(metricId);
+            }
+            timerSnapshot.setTs(System.currentTimeMillis());
+            snapshots.put(window, timerSnapshot);
+        }
+    }
+
+    /**
+     * flush temp timer data to all windows & assoc metrics.
+     */
+    protected void doFlush() {
+        long[] values = unFlushed.getSnapshot().getValues();
+        for (Timer timer : timerMap.values()) {
+            for (long val : values) {
+                timer.update(val, TimeUnit.MILLISECONDS);
+
+                for (AsmMetric metric : this.assocMetrics) {
+                    metric.updateDirectly(val);
+                }
+            }
+        }
+        this.unFlushed = newTimer();
+    }
+
+    @Override
+    public AsmMetric clone() {
+        return new AsmTimer();
+    }
+
+    private Timer newTimer() {
+        return new Timer(new ExponentiallyDecayingReservoir());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Counter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Counter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Counter.java
deleted file mode 100755
index f9e97dd..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Counter.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric;
-
-import com.alibaba.jstorm.common.metric.operator.convert.DefaultConvertor;
-import com.alibaba.jstorm.common.metric.operator.merger.SumMerger;
-import com.alibaba.jstorm.common.metric.operator.updater.AddUpdater;
-import com.alibaba.jstorm.common.metric.window.Metric;
-
-/**
- * The class is similar to com.codahale.metrics.Counter
- * 
- * Sum all window's value
- * 
- * how to use Counter , please refer to Sampling Interface
- * 
- * @author zhongyan.feng
- *
- * @param <T>
- */
-public class Counter<T extends Number> extends Metric<T, T> {
-    private static final long serialVersionUID = -1362345159511508074L;
-
-    /**
-     * 
-     * @param defaultValue
-     */
-    public Counter(T zero) {
-        updater = new AddUpdater<T>();
-        merger = new SumMerger<T>();
-        convertor = new DefaultConvertor<T>();
-        defaultValue = zero;
-
-        init();
-    }
-
-    public static void main(String[] args) {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/CounterData.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/CounterData.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/CounterData.java
new file mode 100644
index 0000000..03d13be
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/CounterData.java
@@ -0,0 +1,34 @@
+package com.alibaba.jstorm.common.metric;
+
+
+import com.alibaba.jstorm.metric.Bytes;
+import com.alibaba.jstorm.metric.KVSerializable;
+
+/**
+ * @author wange
+ * @since 15/6/23
+ */
+public class CounterData extends MetricBaseData implements KVSerializable {
+    private long v;
+
+    public long getV() {
+        return v;
+    }
+
+    public void setV(long v) {
+        this.v = v;
+    }
+
+    @Override
+    public byte[] getValue() {
+        return Bytes.toBytes(v);
+    }
+
+    @Override
+    public Object fromKV(byte[] key, byte[] value) {
+        parseKey(key);
+        this.v = Bytes.toLong(value);
+
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Gauge.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Gauge.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Gauge.java
deleted file mode 100755
index 30fa110..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Gauge.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-import com.alibaba.jstorm.common.metric.window.Metric;
-import com.alibaba.jstorm.common.metric.window.StatBuckets;
-
-public class Gauge<T extends Number> extends Metric<Number, Number> {
-    private static final long serialVersionUID = 1985614006717750790L;
-
-    protected com.codahale.metrics.Gauge<T> gauge;
-
-    public Gauge(com.codahale.metrics.Gauge<T> gauge) {
-        this.gauge = gauge;
-
-        init();
-    }
-
-    @Override
-    public void init() {
-
-    }
-
-    @Override
-    public void update(Number obj) {
-        // TODO Auto-generated method stub
-    }
-
-    @Override
-    public Map<Integer, Number> getSnapshot() {
-        // TODO Auto-generated method stub
-        Number value = gauge.getValue();
-
-        Map<Integer, Number> ret = new TreeMap<Integer, Number>();
-        for (Integer timeKey : windowSeconds) {
-            ret.put(timeKey, value);
-        }
-        ret.put(StatBuckets.ALL_TIME_WINDOW, value);
-
-        return ret;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/GaugeData.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/GaugeData.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/GaugeData.java
new file mode 100644
index 0000000..134a194
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/GaugeData.java
@@ -0,0 +1,34 @@
+package com.alibaba.jstorm.common.metric;
+
+
+import com.alibaba.jstorm.metric.Bytes;
+import com.alibaba.jstorm.metric.KVSerializable;
+
+/**
+ * @author wange
+ * @since 15/6/23
+ */
+public class GaugeData extends MetricBaseData implements KVSerializable {
+    private double v;
+
+    public double getV() {
+        return v;
+    }
+
+    public void setV(double v) {
+        this.v = v;
+    }
+
+    @Override
+    public byte[] getValue() {
+        return Bytes.toBytes(v);
+    }
+
+    @Override
+    public Object fromKV(byte[] key, byte[] value) {
+        parseKey(key);
+        this.v = Bytes.toDouble(value);
+
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Histogram.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Histogram.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Histogram.java
deleted file mode 100755
index 7276fdf..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Histogram.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric;
-
-import com.alibaba.jstorm.common.metric.operator.convert.Convertor;
-import com.alibaba.jstorm.common.metric.operator.merger.AvgMerger;
-import com.alibaba.jstorm.common.metric.operator.updater.AvgUpdater;
-import com.alibaba.jstorm.common.metric.window.Metric;
-
-/**
- * Meter is used to compute tps
- * 
- * Attention: 1.
- * 
- * @author zhongyan.feng
- * 
- */
-public class Histogram extends Metric<Double, Histogram.HistorgramPair> {
-    private static final long serialVersionUID = -1362345159511508074L;
-
-    public Histogram() {
-        defaultValue =
-                new HistorgramPair();
-        updater = new AvgUpdater();
-        merger = new AvgMerger();
-        convertor = new HistogramConvertor();
-
-        init();
-    }
-
-    public static class HistogramConvertor implements
-            Convertor<HistorgramPair, Double> {
-        private static final long serialVersionUID = -1569170826785657226L;
-
-        @Override
-        public Double convert(HistorgramPair from) {
-            // TODO Auto-generated method stub
-            if (from == null) {
-                return 0.0d;
-            }
-
-            if (from.getTimes() == 0) {
-                return 0.0d;
-            } else {
-                return from.getSum()/ from.getTimes();
-            }
-        }
-
-    }
-    
-    public static class HistorgramPair {
-        private double sum;
-        private long times;
-        
-        public HistorgramPair() {
-            
-        }
-        
-        public HistorgramPair(double sum, long times){
-            this.sum = sum;
-            this.times = times;
-        }
-
-        public double getSum() {
-            return sum;
-        }
-
-        public void setSum(double sum) {
-            this.sum = sum;
-        }
-        
-        public void addValue(double value) {
-            sum += value;
-        }
-
-        public long getTimes() {
-            return times;
-        }
-
-        public void setTimes(long times) {
-            this.times = times;
-        }
-
-        public void addTimes(long time) {
-            times += time;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Histogram.java.bak
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Histogram.java.bak b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Histogram.java.bak
deleted file mode 100755
index b830789..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Histogram.java.bak
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.alibaba.jstorm.common.metric.operator.convert.Convertor;
-import com.alibaba.jstorm.common.metric.operator.merger.AvgMerger2;
-import com.alibaba.jstorm.common.metric.operator.updater.AvgUpdater2;
-import com.alibaba.jstorm.common.metric.window.Metric;
-import com.alibaba.jstorm.utils.Pair;
-import com.google.common.util.concurrent.AtomicDouble;
-
-/**
- * Meter is used to compute tps
- * 
- * Attention: 1.
- * 
- * @author zhongyan.feng
- * 
- */
-public class Histogram extends Metric<Double, Pair<AtomicDouble, AtomicLong>> {
-    private static final long serialVersionUID = -1362345159511508074L;
-
-    public Histogram() {
-        defaultValue =
-                new Pair<AtomicDouble, AtomicLong>(new AtomicDouble(0.0),
-                        new AtomicLong(0));
-        updater = new AvgUpdater2();
-        merger = new AvgMerger2();
-        convertor = new HistogramConvertor();
-
-        init();
-    }
-
-    public static class HistogramConvertor implements
-            Convertor<Pair<AtomicDouble, AtomicLong>, Double> {
-        private static final long serialVersionUID = -1569170826785657226L;
-
-        @Override
-        public Double convert(Pair<AtomicDouble, AtomicLong> from) {
-            // TODO Auto-generated method stub
-            if (from == null) {
-                return 0.0d;
-            }
-
-            if (from.getSecond().get() == 0) {
-                return 0.0d;
-            } else {
-                return from.getFirst().get() / from.getSecond().get();
-            }
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/HistogramData.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/HistogramData.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/HistogramData.java
new file mode 100644
index 0000000..5f5ae97
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/HistogramData.java
@@ -0,0 +1,135 @@
+package com.alibaba.jstorm.common.metric;
+
+
+import com.alibaba.jstorm.metric.Bytes;
+import com.alibaba.jstorm.metric.KVSerializable;
+
+/** Key/value-serializable histogram record: min/max, mean, six percentile fields and stddev.
+ * @author wange
+ * @since 15/6/23
+ */
+public class HistogramData extends MetricBaseData implements KVSerializable {
+    private long min;
+    private long max;
+    private double mean;
+    private double p50;
+    private double p75;
+    private double p95;
+    private double p98;
+    private double p99;
+    private double p999;
+    private double stddev; // NOTE(review): not written by getValue() nor restored by fromKV() — confirm the omission is intended
+
+    public long getMin() {
+        return min;
+    }
+
+    public void setMin(long min) {
+        this.min = min;
+    }
+
+    public long getMax() {
+        return max;
+    }
+
+    public void setMax(long max) {
+        this.max = max;
+    }
+
+    public double getMean() {
+        return mean;
+    }
+
+    public void setMean(double mean) {
+        this.mean = mean;
+    }
+
+    public double getP50() {
+        return p50;
+    }
+
+    public void setP50(double p50) {
+        this.p50 = p50;
+    }
+
+    public double getP75() {
+        return p75;
+    }
+
+    public void setP75(double p75) {
+        this.p75 = p75;
+    }
+
+    public double getP95() {
+        return p95;
+    }
+
+    public void setP95(double p95) {
+        this.p95 = p95;
+    }
+
+    public double getP98() {
+        return p98;
+    }
+
+    public void setP98(double p98) {
+        this.p98 = p98;
+    }
+
+    public double getP99() {
+        return p99;
+    }
+
+    public void setP99(double p99) {
+        this.p99 = p99;
+    }
+
+    public double getP999() {
+        return p999;
+    }
+
+    public void setP999(double p999) {
+        this.p999 = p999;
+    }
+
+    public double getStddev() {
+        return stddev;
+    }
+
+    public void setStddev(double stddev) {
+        this.stddev = stddev;
+    }
+
+    @Override
+    public byte[] getValue() { // packs nine fields into a 72-byte array at fixed 8-byte offsets
+        byte[] ret = new byte[8 * 9]; // slot order: min, max, p50, p75, p95, p98, p99, p999, mean (stddev is not stored)
+        Bytes.putLong(ret, 0, min);
+        Bytes.putLong(ret, 8, max);
+        Bytes.putDouble(ret, 16, p50);
+        Bytes.putDouble(ret, 24, p75);
+        Bytes.putDouble(ret, 32, p95);
+        Bytes.putDouble(ret, 40, p98);
+        Bytes.putDouble(ret, 48, p99);
+        Bytes.putDouble(ret, 56, p999);
+        Bytes.putDouble(ret, 64, mean); // mean is last in the byte layout although it is declared third among the fields
+
+        return ret;
+    }
+
+    @Override
+    public Object fromKV(byte[] key, byte[] value) { // repopulates this instance in place and returns it
+        parseKey(key); // inherited from MetricBaseData: restores metricId, win and ts from the key bytes
+
+        this.min = Bytes.toLong(value, 0, KVSerializable.LONG_SIZE); // offsets below mirror getValue()
+        this.max = Bytes.toLong(value, 8, KVSerializable.LONG_SIZE);
+        this.p50 = Bytes.toDouble(value, 16);
+        this.p75 = Bytes.toDouble(value, 24);
+        this.p95 = Bytes.toDouble(value, 32);
+        this.p98 = Bytes.toDouble(value, 40);
+        this.p99 = Bytes.toDouble(value, 48);
+        this.p999 = Bytes.toDouble(value, 56);
+        this.mean = Bytes.toDouble(value, 64); // stddev keeps its previous value; it is not part of the stored bytes
+
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/LongCounter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/LongCounter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/LongCounter.java
deleted file mode 100755
index ac58912..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/LongCounter.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.alibaba.jstorm.common.metric.operator.convert.AtomicLongToLong;
-import com.alibaba.jstorm.common.metric.operator.merger.LongSumMerger;
-import com.alibaba.jstorm.common.metric.operator.updater.LongAddUpdater;
-import com.alibaba.jstorm.common.metric.window.Metric;
-
-public class LongCounter extends Metric<Long, AtomicLong> {
-    private static final long serialVersionUID = -1362345159511508074L;
-
-    public LongCounter() {
-        super.defaultValue = new AtomicLong(0);
-        super.updater = new LongAddUpdater();
-        super.merger = new LongSumMerger();
-        super.convertor = new AtomicLongToLong();
-
-        init();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Meter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Meter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Meter.java
deleted file mode 100755
index e56d025..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Meter.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric;
-
-import com.alibaba.jstorm.common.metric.operator.convert.DefaultConvertor;
-import com.alibaba.jstorm.common.metric.operator.merger.TpsMerger;
-import com.alibaba.jstorm.common.metric.operator.updater.AddUpdater;
-import com.alibaba.jstorm.common.metric.window.Metric;
-import com.alibaba.jstorm.common.metric.window.RollingWindow;
-
-/**
- * Meter is used to compute tps
- * 
- * Attention: 1.
- * 
- * @author zhongyan.feng
- * 
- */
-public class Meter extends Metric<Double, Double> {
-    private static final long serialVersionUID = -1362345159511508074L;
-
-    public Meter() {
-        defaultValue = 0.0d;
-        updater = new AddUpdater<Double>();
-        merger = new TpsMerger();
-        convertor = new DefaultConvertor<Double>();
-
-        init();
-    }
-
-    public void update() {
-        update(Double.valueOf(1));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MeterData.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MeterData.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MeterData.java
new file mode 100644
index 0000000..2df87aa
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MeterData.java
@@ -0,0 +1,71 @@
+package com.alibaba.jstorm.common.metric;
+
+
+import com.alibaba.jstorm.metric.Bytes;
+import com.alibaba.jstorm.metric.KVSerializable;
+
+/** Key/value-serializable meter record holding four doubles: m1, m5, m15 and mean.
+ * @author wange
+ * @since 15/6/23
+ */
+public class MeterData extends MetricBaseData implements KVSerializable {
+    private double m1; // NOTE(review): presumably the 1-minute rate (m5/m15 likewise 5/15-minute) — confirm against the producer
+    private double m5;
+    private double m15;
+    private double mean;
+
+    public double getM1() {
+        return m1;
+    }
+
+    public void setM1(double m1) {
+        this.m1 = m1;
+    }
+
+    public double getM5() {
+        return m5;
+    }
+
+    public void setM5(double m5) {
+        this.m5 = m5;
+    }
+
+    public double getM15() {
+        return m15;
+    }
+
+    public void setM15(double m15) {
+        this.m15 = m15;
+    }
+
+    public double getMean() {
+        return mean;
+    }
+
+    public void setMean(double mean) {
+        this.mean = mean;
+    }
+
+    @Override
+    public byte[] getValue() { // packs the four doubles into a 32-byte array at fixed 8-byte offsets
+        byte[] ret = new byte[8 * 4]; // slot order: m1, m5, m15, mean
+        Bytes.putDouble(ret, 0, m1);
+        Bytes.putDouble(ret, 8, m5);
+        Bytes.putDouble(ret, 16, m15);
+        Bytes.putDouble(ret, 24, mean);
+
+        return ret;
+    }
+
+    @Override
+    public Object fromKV(byte[] key, byte[] value) { // repopulates this instance in place and returns it
+        parseKey(key); // inherited from MetricBaseData: restores metricId, win and ts from the key bytes
+
+        this.m1 = Bytes.toDouble(value, 0); // offsets below mirror getValue()
+        this.m5 = Bytes.toDouble(value, 8);
+        this.m15 = Bytes.toDouble(value, 16);
+        this.mean = Bytes.toDouble(value, 24);
+
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricBaseData.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricBaseData.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricBaseData.java
new file mode 100644
index 0000000..907762d
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricBaseData.java
@@ -0,0 +1,59 @@
+package com.alibaba.jstorm.common.metric;
+
+import com.alibaba.jstorm.metric.Bytes;
+import com.alibaba.jstorm.metric.KVSerializable;
+
+import java.util.Date;
+
+/** Base for metric data records: owns the (metricId, win, ts) triple and its 20-byte key encoding.
+ * @author wange
+ * @since 15/7/22
+ */
+public abstract class MetricBaseData implements KVSerializable {
+    protected long metricId;
+    protected int win; // NOTE(review): presumably the metric window identifier/length — confirm units with callers
+    protected Date ts; // no initializer: stays null until setTs() or parseKey() assigns it
+
+    public long getMetricId() {
+        return metricId;
+    }
+
+    public void setMetricId(long metricId) {
+        this.metricId = metricId;
+    }
+
+    public Date getTs() { // returns the internal Date reference itself, not a copy
+        return ts;
+    }
+
+    public void setTs(Date ts) { // stores the caller's Date reference itself, not a copy
+        this.ts = ts;
+    }
+
+    public int getWin() {
+        return win;
+    }
+
+    public void setWin(int win) {
+        this.win = win;
+    }
+
+    @Override
+    public byte[] getKey() { // throws NullPointerException when ts is still null, because of ts.getTime()
+        return makeKey(metricId, win, ts.getTime());
+    }
+
+    public static byte[] makeKey(long metricId, int win, long ts) { // builds the key without needing an instance
+        byte[] ret = new byte[8 + 4 + 8]; // layout: metricId (long) at 0, win (int) at 8, ts (long) at 12
+        Bytes.putLong(ret, 0, metricId);
+        Bytes.putInt(ret, 8, win);
+        Bytes.putLong(ret, 12, ts);
+        return ret;
+    }
+
+    protected void parseKey(byte[] key) { // inverse of makeKey: reads the same three offsets back into the fields
+        this.metricId = Bytes.toLong(key, 0, KVSerializable.LONG_SIZE);
+        this.win = Bytes.toInt(key, 8, KVSerializable.INT_SIZE);
+        this.ts = new Date(Bytes.toLong(key, 12, KVSerializable.LONG_SIZE)); // the stored long is passed to new Date(long)
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricFilter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricFilter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricFilter.java
deleted file mode 100755
index 92b1f6b..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricFilter.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric;
-
-import java.io.Serializable;
-
-import com.alibaba.jstorm.common.metric.window.Metric;
-
-public interface MetricFilter extends Serializable {
-    /**
-     * Matches all metrics, regardless of type or name.
-     */
-    MetricFilter ALL = new MetricFilter() {
-        private static final long serialVersionUID = 7089987006352295530L;
-
-        @Override
-        public boolean matches(String name, Metric metric) {
-            return true;
-        }
-    };
-
-    /**
-     * Returns {@code true} if the metric matches the filter; {@code false}
-     * otherwise.
-     *
-     * @param name the metric's name
-     * @param metric the metric
-     * @return {@code true} if the metric matches the filter
-     */
-    boolean matches(String name, Metric metric);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricMeta.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricMeta.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricMeta.java
new file mode 100644
index 0000000..da6b4dd
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricMeta.java
@@ -0,0 +1,213 @@
+package com.alibaba.jstorm.common.metric;
+
+import com.alibaba.jstorm.metric.KVSerializable;
+import com.alibaba.jstorm.metric.MetaType;
+import com.alibaba.jstorm.metric.MetricType;
+import com.alibaba.jstorm.metric.MetricUtils;
+import com.alibaba.jstorm.utils.JStormUtils;
+
+import java.util.Date;
+
+/** Metadata describing one metric (identity, type, task/worker location), serializable as a text key/value pair.
+ * @author wange
+ * @since 15/6/18
+ */
+public class MetricMeta implements KVSerializable {
+    // common fields, used for every metric
+    private long id;
+    // string form of id; kept in sync by setId() and fromKV(), there is no separate setter
+    private String sid;
+    private String clusterName;
+    private String topologyId;
+    private int metricType; // integer code, resolved via MetricType.parse() in getFQN()
+    private String metricGroup = MetricUtils.DEFAULT_GROUP;//sys group
+    private String metricName;
+    private Date gmtCreate = new Date(); // set to the construction time; not part of getKey()/getValue()
+
+    // task meta
+    private String component = MetricUtils.EMPTY;
+    private int taskId = 0;
+    private String streamId = MetricUtils.EMPTY;
+    private int metaType; // integer code, compared against MetaType.*.getT() and resolved via MetaType.parse()
+
+    // worker meta
+    private String host = MetricUtils.EMPTY;
+    private int port = 0;
+
+    public long getId() {
+        return id;
+    }
+
+    public void setId(long id) { // also refreshes sid so both representations stay in sync
+        this.id = id;
+        this.sid = id + "";
+    }
+
+    public String getSid() {
+        return sid;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public String getTopologyId() {
+        return topologyId;
+    }
+
+    public void setTopologyId(String topologyId) {
+        this.topologyId = topologyId;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    public int getMetricType() {
+        return metricType;
+    }
+
+    public void setMetricType(int metricType) {
+        this.metricType = metricType;
+    }
+
+    public String getMetricGroup() {
+        return metricGroup;
+    }
+
+    public void setMetricGroup(String metricGroup) {
+        this.metricGroup = metricGroup;
+    }
+
+    public String getMetricName() {
+        return metricName;
+    }
+
+    public void setMetricName(String metricName) {
+        this.metricName = metricName;
+    }
+
+    public Date getGmtCreate() {
+        return gmtCreate;
+    }
+
+    public void setGmtCreate(Date gmtCreate) {
+        this.gmtCreate = gmtCreate;
+    }
+
+    public String getComponent() {
+        return component;
+    }
+
+    public void setComponent(String component) {
+        this.component = component;
+    }
+
+    public int getTaskId() {
+        return taskId;
+    }
+
+    public void setTaskId(int taskId) {
+        this.taskId = taskId;
+    }
+
+    public String getStreamId() {
+        return streamId;
+    }
+
+    public void setStreamId(String streamId) {
+        this.streamId = streamId;
+    }
+
+    public int getMetaType() {
+        return metaType;
+    }
+
+    public void setMetaType(int metaType) {
+        this.metaType = metaType;
+    }
+
+    public boolean isWorkerMetric() { // true when metaType is NETTY, WORKER or TOPOLOGY
+        return this.metaType == MetaType.NETTY.getT() || this.getMetaType() == MetaType.WORKER.getT() ||
+                this.metaType == MetaType.TOPOLOGY.getT();
+    }
+
+    public String getFQN() { // builds the metric's full name from its type codes and location fields
+        MetaType meta = MetaType.parse(metaType);
+        MetricType metric = MetricType.parse(metricType);
+        String types = meta.getV() + metric.getV(); // NOTE(review): dereferences parse() results unchecked — confirm parse() never returns null
+        if (isWorkerMetric()) { // worker-level names use host/port instead of component/taskId/streamId
+            return MetricUtils.concat2(types, topologyId, host, port, metricGroup, metricName);
+        }
+        return MetricUtils.concat2(types, topologyId, component, taskId, streamId, metricGroup, metricName);
+    }
+
+    /**
+     * key: clusterName, topologyId, metaType and id, in that order, joined with MetricUtils.AT
+     */
+    @Override
+    public byte[] getKey() {
+        StringBuilder sb = new StringBuilder(64);
+        sb.append(clusterName).append(MetricUtils.AT).append(topologyId).append(MetricUtils.AT)
+                .append(metaType).append(MetricUtils.AT).append(id);
+        return sb.toString().getBytes(); // NOTE(review): no charset given, so the platform default encoding is used
+    }
+
+    /**
+     * value: component, taskId, streamId, metricType, host, port, metricGroup and metricName, joined with MetricUtils.AT
+     */
+    @Override
+    public byte[] getValue() {
+        StringBuilder sb = new StringBuilder(64);
+        sb.append(component).append(MetricUtils.AT).append(taskId).append(MetricUtils.AT)
+                .append(streamId).append(MetricUtils.AT).append(metricType).append(MetricUtils.AT)
+                .append(host).append(MetricUtils.AT).append(port).append(MetricUtils.AT)
+                .append(metricGroup).append(MetricUtils.AT).append(metricName);
+        return sb.toString().getBytes(); // NOTE(review): no charset given, so the platform default encoding is used
+    }
+
+    @Override
+    public Object fromKV(byte[] key, byte[] value) { // repopulates this instance in place and returns it
+        String[] keyParts = new String(key).split(MetricUtils.DELIM); // NOTE(review): writers join with MetricUtils.AT — confirm DELIM is the same separator; split() treats it as a regex
+        if (keyParts.length >= 4) { // with fewer parts the key fields are silently left unchanged
+            this.clusterName = keyParts[0];
+            this.topologyId = keyParts[1];
+            this.metaType = Integer.valueOf(keyParts[2]); // throws NumberFormatException on non-numeric text
+            this.id = Long.valueOf(keyParts[3]); // throws NumberFormatException on non-numeric text
+            this.sid = this.id + "";
+        }
+        String[] valueParts = new String(value).split(MetricUtils.DELIM); // decoded with the platform default charset, like the key
+        if (valueParts.length >= 8) { // with fewer parts the value fields are silently left unchanged
+            this.component = valueParts[0];
+            this.taskId = JStormUtils.parseInt(valueParts[1], 0); // NOTE(review): presumably falls back to 0 on bad input, unlike the strict key parsing above — confirm
+            this.streamId = valueParts[2];
+            this.metricType = JStormUtils.parseInt(valueParts[3], 0);
+            this.host = valueParts[4];
+            this.port = JStormUtils.parseInt(valueParts[5], 0);
+            this.metricGroup = valueParts[6];
+            this.metricName = valueParts[7];
+        }
+        return this;
+    }
+
+    public static MetricMeta parse(String name) { // thin delegate to MetricMetaParser.fromMetricName(name)
+        return MetricMetaParser.fromMetricName(name);
+    }
+
+}