You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:59 UTC
[21/51] [partial] storm git commit: Update JStorm to latest release
2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormStatus.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormStatus.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormStatus.java
index 5ad70cb..c2e07ee 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormStatus.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormStatus.java
@@ -29,10 +29,8 @@ import com.alibaba.jstorm.daemon.nimbus.StatusType;
*
* Dedicate Topology status
*
- * Topology status: active/inactive/killed/rebalancing killTimeSecs: when status
- * isn't killed, it is -1 and useless. when status is killed, do kill operation
- * after killTimeSecs seconds when status is rebalancing, do rebalancing opation
- * after delaySecs seconds restore oldStatus as current status
+ * Topology status: active/inactive/killed/rebalancing killTimeSecs: when status isn't killed, it is -1 and useless. when status is killed, do kill operation
+ * after killTimeSecs seconds when status is rebalancing, do rebalancing opation after delaySecs seconds restore oldStatus as current status
*/
public class StormStatus implements Serializable {
@@ -99,9 +97,7 @@ public class StormStatus implements Serializable {
}
StormStatus check = (StormStatus) base;
- if (check.getStatusType().equals(getStatusType())
- && check.getKillTimeSecs() == getKillTimeSecs()
- && check.getDelaySecs().equals(getDelaySecs())) {
+ if (check.getStatusType().equals(getStatusType()) && check.getKillTimeSecs() == getKillTimeSecs() && check.getDelaySecs().equals(getDelaySecs())) {
return true;
}
return false;
@@ -109,15 +105,12 @@ public class StormStatus implements Serializable {
@Override
public int hashCode() {
- return this.getStatusType().hashCode()
- + this.getKillTimeSecs().hashCode()
- + this.getDelaySecs().hashCode();
+ return this.getStatusType().hashCode() + this.getKillTimeSecs().hashCode() + this.getDelaySecs().hashCode();
}
@Override
public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java
index bd60d45..1550c7e 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cluster/StormZkClusterState.java
@@ -17,18 +17,8 @@
*/
package com.alibaba.jstorm.cluster;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import backtype.storm.generated.TopologyTaskHbInfo;
import backtype.storm.utils.Utils;
-
import com.alibaba.jstorm.cache.JStormCache;
import com.alibaba.jstorm.callback.ClusterStateCallback;
import com.alibaba.jstorm.callback.RunnableCallback;
@@ -38,14 +28,22 @@ import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.schedule.AssignmentBak;
import com.alibaba.jstorm.task.TaskInfo;
import com.alibaba.jstorm.task.error.TaskError;
-import com.alibaba.jstorm.task.heartbeat.TaskHeartbeat;
+import com.alibaba.jstorm.task.backpressure.SourceBackpressureInfo;
import com.alibaba.jstorm.utils.JStormUtils;
import com.alibaba.jstorm.utils.PathUtils;
import com.alibaba.jstorm.utils.TimeUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
public class StormZkClusterState implements StormClusterState {
- private static Logger LOG = LoggerFactory
- .getLogger(StormZkClusterState.class);
+ private static Logger LOG = LoggerFactory.getLogger(StormZkClusterState.class);
private ClusterState cluster_state;
@@ -67,12 +65,10 @@ public class StormZkClusterState implements StormClusterState {
} else {
solo = true;
- cluster_state =
- new DistributedClusterState((Map) cluster_state_spec);
+ cluster_state = new DistributedClusterState((Map) cluster_state_spec);
}
- assignment_info_callback =
- new ConcurrentHashMap<String, RunnableCallback>();
+ assignment_info_callback = new ConcurrentHashMap<String, RunnableCallback>();
supervisors_callback = new AtomicReference<RunnableCallback>(null);
assignments_callback = new AtomicReference<RunnableCallback>(null);
storm_base_callback = new ConcurrentHashMap<String, RunnableCallback>();
@@ -85,8 +81,7 @@ public class StormZkClusterState implements StormClusterState {
LOG.warn("Input args is null");
return null;
} else if (args.length < 2) {
- LOG.warn("Input args is invalid, args length:"
- + args.length);
+ LOG.warn("Input args is invalid, args length:" + args.length);
return null;
}
@@ -132,11 +127,8 @@ public class StormZkClusterState implements StormClusterState {
});
String[] pathlist =
- JStormUtils.mk_arr(Cluster.SUPERVISORS_SUBTREE,
- Cluster.STORMS_SUBTREE, Cluster.ASSIGNMENTS_SUBTREE,
- Cluster.ASSIGNMENTS_BAK_SUBTREE, Cluster.TASKS_SUBTREE,
- Cluster.TASKBEATS_SUBTREE, Cluster.TASKERRORS_SUBTREE,
- Cluster.METRIC_SUBTREE);
+ JStormUtils.mk_arr(Cluster.SUPERVISORS_SUBTREE, Cluster.STORMS_SUBTREE, Cluster.ASSIGNMENTS_SUBTREE, Cluster.ASSIGNMENTS_BAK_SUBTREE,
+ Cluster.TASKS_SUBTREE, Cluster.TASKBEATS_SUBTREE, Cluster.TASKERRORS_SUBTREE, Cluster.METRIC_SUBTREE, Cluster.BACKPRESSURE_SUBTREE);
for (String path : pathlist) {
cluster_state.mkdirs(path);
}
@@ -146,8 +138,7 @@ public class StormZkClusterState implements StormClusterState {
/**
* @@@ TODO
*
- * Just add cache in lower ZK level In fact, for some Object
- * Assignment/TaskInfo/StormBase These object can be cache for long time
+ * Just add cache in lower ZK level In fact, for some Object Assignment/TaskInfo/StormBase These object can be cache for long time
*
* @param simpleCache
*/
@@ -221,10 +212,10 @@ public class StormZkClusterState implements StormClusterState {
deleteObject(Cluster.storm_task_root(topologyId));
teardown_heartbeats(topologyId);
teardown_task_errors(topologyId);
+ teardown_backpressure(topologyId);
deleteObject(Cluster.metric_path(topologyId));
} catch (Exception e) {
- LOG.warn("Failed to delete task root and monitor root for"
- + topologyId);
+ LOG.warn("Failed to delete task root and monitor root for" + topologyId);
}
remove_storm_base(topologyId);
}
@@ -240,8 +231,7 @@ public class StormZkClusterState implements StormClusterState {
}
@Override
- public Assignment assignment_info(String topologyId,
- RunnableCallback callback) throws Exception {
+ public Assignment assignment_info(String topologyId, RunnableCallback callback) throws Exception {
if (callback != null) {
assignment_info_callback.put(topologyId, callback);
}
@@ -257,13 +247,11 @@ public class StormZkClusterState implements StormClusterState {
if (callback != null) {
assignments_callback.set(callback);
}
- return cluster_state.get_children(Cluster.ASSIGNMENTS_SUBTREE,
- callback != null);
+ return cluster_state.get_children(Cluster.ASSIGNMENTS_SUBTREE, callback != null);
}
@Override
- public void set_assignment(String topologyId, Assignment info)
- throws Exception {
+ public void set_assignment(String topologyId, Assignment info) throws Exception {
setObject(Cluster.assignment_path(topologyId), info);
}
@@ -276,26 +264,22 @@ public class StormZkClusterState implements StormClusterState {
}
@Override
- public void backup_assignment(String topologyName, AssignmentBak info)
- throws Exception {
+ public void backup_assignment(String topologyName, AssignmentBak info) throws Exception {
setObject(Cluster.assignment_bak_path(topologyName), info);
}
@Override
- public StormBase storm_base(String topologyId, RunnableCallback callback)
- throws Exception {
+ public StormBase storm_base(String topologyId, RunnableCallback callback) throws Exception {
if (callback != null) {
storm_base_callback.put(topologyId, callback);
}
- return (StormBase) getObject(Cluster.storm_path(topologyId),
- callback != null);
+ return (StormBase) getObject(Cluster.storm_path(topologyId), callback != null);
}
@Override
- public void activate_storm(String topologyId, StormBase stormBase)
- throws Exception {
+ public void activate_storm(String topologyId, StormBase stormBase) throws Exception {
String stormPath = Cluster.storm_path(topologyId);
setObject(stormPath, stormBase);
@@ -307,8 +291,7 @@ public class StormZkClusterState implements StormClusterState {
}
@Override
- public void update_storm(String topologyId, StormStatus newElems)
- throws Exception {
+ public void update_storm(String topologyId, StormStatus newElems) throws Exception {
/**
* FIXME, maybe overwrite old callback
*/
@@ -323,8 +306,7 @@ public class StormZkClusterState implements StormClusterState {
}
@Override
- public void set_storm_monitor(String topologyId, boolean isEnable)
- throws Exception {
+ public void set_storm_monitor(String topologyId, boolean isEnable) throws Exception {
// TODO Auto-generated method stub
StormBase base = this.storm_base(topologyId, null);
@@ -340,30 +322,20 @@ public class StormZkClusterState implements StormClusterState {
}
@Override
- public void setup_heartbeats(String topologyId) throws Exception {
- String taskbeatPath = Cluster.taskbeat_storm_root(topologyId);
-
- cluster_state.mkdirs(taskbeatPath);
- }
-
- @Override
- public List<String> heartbeat_storms() throws Exception {
- return cluster_state.get_children(Cluster.TASKBEATS_SUBTREE, false);
+ public void topology_heartbeat(String topologyId, TopologyTaskHbInfo info) throws Exception {
+ String taskPath = Cluster.taskbeat_storm_root(topologyId);
+ setObject(taskPath, info);
}
@Override
- public List<String> heartbeat_tasks(String topologyId) throws Exception {
- String taskbeatPath = Cluster.taskbeat_storm_root(topologyId);
-
- return cluster_state.get_children(taskbeatPath, false);
+ public TopologyTaskHbInfo topology_heartbeat(String topologyId) throws Exception {
+ String taskPath = Cluster.taskbeat_storm_root(topologyId);
+ return (TopologyTaskHbInfo) getObject(taskPath, false);
}
@Override
- public void remove_task_heartbeat(String topologyId, int taskId)
- throws Exception {
- String taskbeatPath = Cluster.taskbeat_path(topologyId, taskId);
-
- deleteObject(taskbeatPath);
+ public List<String> heartbeat_storms() throws Exception {
+ return cluster_state.get_children(Cluster.TASKBEATS_SUBTREE, false);
}
@Override
@@ -379,14 +351,11 @@ public class StormZkClusterState implements StormClusterState {
}
@Override
- public void report_task_error(String topologyId, int taskId, Throwable error)
- throws Exception {
- report_task_error(topologyId, taskId,
- new String(JStormUtils.getErrorInfo(error)));
+ public void report_task_error(String topologyId, int taskId, Throwable error) throws Exception {
+ report_task_error(topologyId, taskId, new String(JStormUtils.getErrorInfo(error)), null);
}
- public void report_task_error(String topologyId, int taskId, String error)
- throws Exception {
+ public void report_task_error(String topologyId, int taskId, String error, String tag) throws Exception {
boolean found = false;
String path = Cluster.taskerror_path(topologyId, taskId);
cluster_state.mkdirs(path);
@@ -403,9 +372,10 @@ public class StormZkClusterState implements StormClusterState {
deleteObject(errorPath);
continue;
}
- if (errorInfo.equals(error)) {
- deleteObject(errorPath);
- setObject(timestampPath, error);
+ if (errorInfo.equals(error)
+ || (tag != null && errorInfo.startsWith(tag))) {
+ cluster_state.delete_node(errorPath);
+ cluster_state.set_data(timestampPath, error.getBytes());
found = true;
break;
}
@@ -429,8 +399,7 @@ public class StormZkClusterState implements StormClusterState {
private static final String TASK_IS_DEAD = "is dead on"; // Full string is
// "task-id is dead on hostname:port"
- private void setLastErrInfo(String topologyId, String error,
- String timeStamp) throws Exception {
+ private void setLastErrInfo(String topologyId, String error, String timeStamp) throws Exception {
// Set error information in task error topology patch
// Last Error information format in ZK: map<report_duration, timestamp>
// report_duration means only the errors will presented in web ui if the
@@ -440,13 +409,10 @@ public class StormZkClusterState implements StormClusterState {
String lastErrTopoPath = Cluster.lasterror_path(topologyId);
Map<Integer, String> lastErrInfo = null;
try {
- lastErrInfo =
- (Map<Integer, String>) getObject(lastErrTopoPath, false);
+ lastErrInfo = (Map<Integer, String>) getObject(lastErrTopoPath, false);
} catch (Exception e) {
- LOG.error(
- "Failed to get last error time. Remove the corrupt node for "
- + topologyId, e);
+ LOG.error("Failed to get last error time. Remove the corrupt node for " + topologyId, e);
remove_lastErr_time(topologyId);
lastErrInfo = null;
}
@@ -466,15 +432,13 @@ public class StormZkClusterState implements StormClusterState {
}
@Override
- public void remove_task_error(String topologyId, int taskId)
- throws Exception {
+ public void remove_task_error(String topologyId, int taskId) throws Exception {
String path = Cluster.taskerror_path(topologyId, taskId);
cluster_state.delete_node(path);
}
@Override
- public Map<Integer, String> topo_lastErr_time(String topologyId)
- throws Exception {
+ public Map<Integer, String> topo_lastErr_time(String topologyId) throws Exception {
String path = Cluster.lasterror_path(topologyId);
return (Map<Integer, String>) getObject(path, false);
@@ -490,17 +454,18 @@ public class StormZkClusterState implements StormClusterState {
public List<String> task_error_storms() throws Exception {
return cluster_state.get_children(Cluster.TASKERRORS_SUBTREE, false);
}
-
+
@Override
public List<String> task_error_ids(String topologyId) throws Exception {
- return cluster_state.get_children(Cluster.taskerror_storm_root(topologyId), false);
+ return cluster_state.get_children(Cluster.taskerror_storm_root(topologyId), false);
}
@Override
- public List<String> task_error_time(String topologyId, int taskId)
- throws Exception {
+ public List<String> task_error_time(String topologyId, int taskId) throws Exception {
String path = Cluster.taskerror_path(topologyId, taskId);
- cluster_state.mkdirs(path);
+ if (cluster_state.node_existed(path, false) == false) {
+ return new ArrayList<String>();
+ }
return cluster_state.get_children(path, false);
}
@@ -509,38 +474,37 @@ public class StormZkClusterState implements StormClusterState {
String tasksPath = Cluster.storm_task_root(topologyId);
Object data = getObject(tasksPath, false);
if (data != null) {
- Map<Integer, TaskInfo> taskInfoMap = ((Map<Integer, TaskInfo>)data);
- for (Integer taskId : taskIds){
+ Map<Integer, TaskInfo> taskInfoMap = ((Map<Integer, TaskInfo>) data);
+ for (Integer taskId : taskIds) {
taskInfoMap.remove(taskId);
}
- //update zk node of tasks
+ // update zk node of tasks
setObject(tasksPath, taskInfoMap);
}
}
@Override
- public String task_error_info(String topologyId, int taskId, long timeStamp)
- throws Exception {
+ public String task_error_info(String topologyId, int taskId, long timeStamp) throws Exception {
String path = Cluster.taskerror_path(topologyId, taskId);
- cluster_state.mkdirs(path);
path = path + "/" + timeStamp;
return getString(path, false);
}
@Override
- public List<TaskError> task_errors(String topologyId, int taskId)
- throws Exception {
- String path = Cluster.taskerror_path(topologyId, taskId);
- cluster_state.mkdirs(path);
+ public List<TaskError> task_errors(String topologyId, int taskId) throws Exception {
+ List<TaskError> errors = new ArrayList<TaskError>();
+ String path = Cluster.taskerror_path(topologyId, taskId);
+ if (cluster_state.node_existed(path, false) == false) {
+ return errors;
+ }
List<String> children = cluster_state.get_children(path, false);
- List<TaskError> errors = new ArrayList<TaskError>();
+
for (String str : children) {
byte[] v = cluster_state.get_data(path + "/" + str, false);
if (v != null) {
- TaskError error =
- new TaskError(new String(v), Integer.parseInt(str));
+ TaskError error = new TaskError(new String(v), Integer.parseInt(str));
errors.add(error);
}
}
@@ -572,45 +536,28 @@ public class StormZkClusterState implements StormClusterState {
LOG.error("Could not teardown errors for " + topologyId, e);
}
}
+
@Override
- public void set_task(String topologyId, Map<Integer, TaskInfo> taskInfoMap)
- throws Exception {
+ public void set_task(String topologyId, Map<Integer, TaskInfo> taskInfoMap) throws Exception {
String stormTaskPath = Cluster.storm_task_root(topologyId);
- if (taskInfoMap != null){
- //reupdate zk node of tasks
+ if (taskInfoMap != null) {
+ // reupdate zk node of tasks
setObject(stormTaskPath, taskInfoMap);
}
}
+
@Override
- public void add_task(String topologyId, Map<Integer, TaskInfo> taskInfoMap)
- throws Exception {
+ public void add_task(String topologyId, Map<Integer, TaskInfo> taskInfoMap) throws Exception {
String stormTaskPath = Cluster.storm_task_root(topologyId);
Object data = getObject(stormTaskPath, false);
- if (data != null){
- ((Map<Integer, TaskInfo>)data).putAll(taskInfoMap);
- //reupdate zk node of tasks
+ if (data != null) {
+ ((Map<Integer, TaskInfo>) data).putAll(taskInfoMap);
+ // reupdate zk node of tasks
setObject(stormTaskPath, data);
}
}
@Override
- public TaskHeartbeat task_heartbeat(String topologyId, int taskId)
- throws Exception {
- String taskbeatPath = Cluster.taskbeat_path(topologyId, taskId);
-
- return (TaskHeartbeat) getObjectSync(taskbeatPath, false);
-
- }
-
- @Override
- public void task_heartbeat(String topologyId, int taskId, TaskHeartbeat info)
- throws Exception {
- String taskPath = Cluster.taskbeat_path(topologyId, taskId);
-
- setObject(taskPath, info);
- }
-
- @Override
public List<String> task_storms() throws Exception {
return cluster_state.get_children(Cluster.TASKS_SUBTREE, false);
}
@@ -623,23 +570,22 @@ public class StormZkClusterState implements StormClusterState {
if (data == null) {
return null;
}
- return ((Map<Integer, TaskInfo>)data).keySet();
+ return ((Map<Integer, TaskInfo>) data).keySet();
}
@Override
- public Set<Integer> task_ids_by_componentId(String topologyId,
- String componentId) throws Exception {
+ public Set<Integer> task_ids_by_componentId(String topologyId, String componentId) throws Exception {
String stormTaskPath = Cluster.storm_task_root(topologyId);
Object data = getObject(stormTaskPath, false);
if (data == null) {
return null;
}
- Map<Integer, TaskInfo> taskInfoMap = (Map<Integer, TaskInfo>)data;
+ Map<Integer, TaskInfo> taskInfoMap = (Map<Integer, TaskInfo>) data;
Set<Integer> rtn = new HashSet<Integer>();
Set<Integer> taskIds = taskInfoMap.keySet();
- for(Integer taskId : taskIds){
+ for (Integer taskId : taskIds) {
TaskInfo taskInfo = taskInfoMap.get(taskId);
- if (taskInfo != null){
+ if (taskInfo != null) {
if (taskInfo.getComponentId().equalsIgnoreCase(componentId))
rtn.add(taskId);
}
@@ -672,13 +618,11 @@ public class StormZkClusterState implements StormClusterState {
if (callback != null) {
supervisors_callback.set(callback);
}
- return cluster_state.get_children(Cluster.SUPERVISORS_SUBTREE,
- callback != null);
+ return cluster_state.get_children(Cluster.SUPERVISORS_SUBTREE, callback != null);
}
@Override
- public void supervisor_heartbeat(String supervisorId, SupervisorInfo info)
- throws Exception {
+ public void supervisor_heartbeat(String supervisorId, SupervisorInfo info) throws Exception {
String supervisorPath = Cluster.supervisor_path(supervisorId);
@@ -703,15 +647,13 @@ public class StormZkClusterState implements StormClusterState {
}
public String get_nimbus_slave_time(String host) throws Exception {
- String path =
- Cluster.NIMBUS_SLAVE_SUBTREE + Cluster.ZK_SEPERATOR + host;
- return (String) getObject(path, false);
+ String path = Cluster.NIMBUS_SLAVE_SUBTREE + Cluster.ZK_SEPERATOR + host;
+ return getString(path, false);
}
@Override
public void update_nimbus_slave(String host, int time) throws Exception {
- setTempObject(Cluster.NIMBUS_SLAVE_SUBTREE + Cluster.ZK_SEPERATOR
- + host, String.valueOf(time));
+ setTempObject(Cluster.NIMBUS_SLAVE_SUBTREE + Cluster.ZK_SEPERATOR + host, String.valueOf(time));
}
@Override
@@ -720,8 +662,24 @@ public class StormZkClusterState implements StormClusterState {
}
@Override
- public boolean try_to_be_leader(String path, String host,
- RunnableCallback callback) throws Exception {
+ public void update_nimbus_detail(String hostPort, Map map) throws Exception {
+ // TODO Auto-generated method stub
+ cluster_state.set_ephemeral_node(Cluster.NIMBUS_SLAVE_DETAIL_SUBTREE + Cluster.ZK_SEPERATOR + hostPort, Utils.serialize(map));
+ }
+
+ @Override
+ public Map get_nimbus_detail(String hostPort, boolean watch) throws Exception {
+ byte[] data = cluster_state.get_data(Cluster.NIMBUS_SLAVE_DETAIL_SUBTREE + Cluster.ZK_SEPERATOR + hostPort, watch);
+ return (Map) Utils.maybe_deserialize(data);
+ }
+ @Override
+ public void unregister_nimbus_detail(String hostPort) throws Exception {
+ cluster_state.delete_node(Cluster.NIMBUS_SLAVE_DETAIL_SUBTREE + Cluster.ZK_SEPERATOR + hostPort);
+ }
+
+
+ @Override
+ public boolean try_to_be_leader(String path, String host, RunnableCallback callback) throws Exception {
// TODO Auto-generated method stub
if (callback != null)
this.master_callback.set(callback);
@@ -736,24 +694,53 @@ public class StormZkClusterState implements StormClusterState {
}
@Override
- public void set_topology_metric(String topologyId, Object metric)
- throws Exception {
- // TODO Auto-generated method stub
+ public void set_topology_metric(String topologyId, Object metric) throws Exception {
String path = Cluster.metric_path(topologyId);
-
setObject(path, metric);
}
@Override
public Object get_topology_metric(String topologyId) throws Exception {
- // TODO Auto-generated method stub
return getObject(Cluster.metric_path(topologyId), false);
}
- @Override
- public List<String> get_metrics() throws Exception {
- // TODO Auto-generated method stub
- return cluster_state.get_children(Cluster.METRIC_SUBTREE, false);
- }
+ @Override
+ public List<String> get_metrics() throws Exception {
+ return cluster_state.get_children(Cluster.METRIC_SUBTREE, false);
+ }
+ @Override
+ public List<String> list_dirs(String path, boolean watch) throws Exception {
+ List<String> subDirs = null;
+ subDirs = cluster_state.get_children(path, watch);
+ return subDirs;
+ }
+ @Override
+ public List<String> backpressureInfos() throws Exception {
+ return cluster_state.get_children(Cluster.BACKPRESSURE_SUBTREE, false);
+ }
+
+ @Override
+ public void set_backpressure_info(String topologyId, Map<String, SourceBackpressureInfo> sourceToBackpressureInfo) throws Exception {
+ String path = Cluster.backpressure_path(topologyId);
+ cluster_state.set_data(path, Utils.serialize(sourceToBackpressureInfo));
+ }
+
+ @Override
+ public Map<String, SourceBackpressureInfo> get_backpressure_info(String topologyId) throws Exception {
+ String path = Cluster.backpressure_path(topologyId);
+ byte[] data = cluster_state.get_data(path, false);
+ return (Map<String, SourceBackpressureInfo>) Utils.maybe_deserialize(data);
+ }
+
+ @Override
+ public void teardown_backpressure(String topologyId) {
+ try {
+ String backpressurePath = Cluster.backpressure_path(topologyId);
+
+ cluster_state.delete_node(backpressurePath);
+ } catch (Exception e) {
+ LOG.warn("Could not teardown backpressure info for " + topologyId, e);
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmCounter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmCounter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmCounter.java
new file mode 100644
index 0000000..120bbfb
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmCounter.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric;
+
+
+import com.alibaba.jstorm.common.metric.snapshot.AsmCounterSnapshot;
+import com.alibaba.jstorm.common.metric.snapshot.AsmSnapshot;
+import com.codahale.metrics.Counter;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * counter wrapper. note that counter is a little special, every snapshot we only return the delta value instead of
+ * total value, which prevents data loss if certain tasks are killed.
+ */
+public class AsmCounter extends AsmMetric<Counter> {
+
+ private final Map<Integer, Counter> counterMap = new ConcurrentHashMap<>();
+ private Counter unFlushed = new Counter();
+
+ public AsmCounter() {
+ super();
+ for (int win : windowSeconds) {
+ counterMap.put(win, new Counter());
+ }
+ }
+
+ public void inc() {
+ update(1);
+ }
+
+ @Override
+ public void update(Number val) {
+ this.unFlushed.inc(val.longValue());
+ }
+
+ /**
+ * flush temp counter data to all windows & assoc metrics.
+ */
+ protected void doFlush() {
+ long v = unFlushed.getCount();
+ for (Counter counter : counterMap.values()) {
+ counter.inc(v);
+ }
+ for (AsmMetric assocMetric : assocMetrics) {
+ assocMetric.updateDirectly(v);
+ }
+
+ this.unFlushed.dec(v);
+ }
+
+ @Override
+ public Map<Integer, Counter> getWindowMetricMap() {
+ return counterMap;
+ }
+
+ @Override
+ public Counter mkInstance() {
+ return new Counter();
+ }
+
+ @Override
+ protected void updateSnapshot(int window) {
+ Counter counter = counterMap.get(window);
+ if (counter != null) {
+ AsmSnapshot snapshot = new AsmCounterSnapshot().setValue(counter.getCount())
+ .setTs(System.currentTimeMillis()).setMetricId(metricId);
+ snapshots.put(window, snapshot);
+ }
+ }
+
+ @Override
+ public AsmMetric clone() {
+ return new AsmCounter();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmGauge.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmGauge.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmGauge.java
new file mode 100644
index 0000000..4bc255a
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmGauge.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric;
+
+import com.alibaba.jstorm.common.metric.snapshot.AsmGaugeSnapshot;
+import com.alibaba.jstorm.common.metric.snapshot.AsmSnapshot;
+import com.codahale.metrics.Gauge;
+
+import java.util.Map;
+
+/**
+ * gauges cannot be aggregated.
+ */
+public class AsmGauge extends AsmMetric<Gauge> {
+
+ private Gauge gauge;
+
+ public AsmGauge(Gauge<Double> gauge) {
+ this.aggregate = false;
+ this.gauge = gauge;
+ }
+
+ @Override
+ public void update(Number obj) {
+ // nothing to do for gauges.
+ }
+
+ @Override
+ public AsmMetric clone() {
+ AsmMetric metric = new AsmGauge(this.gauge);
+ metric.setMetricName(this.getMetricName());
+ return metric;
+ }
+
+ @Override
+ public Map<Integer, Gauge> getWindowMetricMap() {
+ return null;
+ }
+
+ @Override
+ public Gauge mkInstance() {
+ return null;
+ }
+
+ @Override
+ protected void doFlush() {
+ // nothing to do for gauges.
+ }
+
+ @Override
+ protected void updateSnapshot(int window) {
+ double v = (Double) gauge.getValue();
+ AsmSnapshot snapshot = new AsmGaugeSnapshot().setValue(v)
+ .setTs(System.currentTimeMillis()).setMetricId(metricId);
+ snapshots.put(window, snapshot);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmHistogram.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmHistogram.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmHistogram.java
new file mode 100644
index 0000000..43c8dbc
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmHistogram.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric;
+
+import com.alibaba.jstorm.common.metric.snapshot.AsmHistogramSnapshot;
+import com.alibaba.jstorm.common.metric.snapshot.AsmSnapshot;
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.Histogram;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * each window has a separate histogram, which is recreated after the window cycle.
+ */
+public class AsmHistogram extends AsmMetric<Histogram> {
+
+ private final Map<Integer, Histogram> histogramMap = new ConcurrentHashMap<Integer, Histogram>();
+ private Histogram unFlushed = newHistogram();
+
+ public AsmHistogram() {
+ super();
+ for (int win : windowSeconds) {
+ histogramMap.put(win, newHistogram());
+ }
+ }
+
+ @Override
+ public void update(Number obj) {
+ if (sample()) {
+ this.unFlushed.update(obj.longValue());
+ }
+ }
+
+ @Override
+ public void updateDirectly(Number obj) {
+ this.unFlushed.update(obj.longValue());
+ }
+
+ @Override
+ public Map<Integer, Histogram> getWindowMetricMap() {
+ return histogramMap;
+ }
+
+ @Override
+ public Histogram mkInstance() {
+ return newHistogram();
+ }
+
+ @Override
+ protected void updateSnapshot(int window) {
+ Histogram histogram = histogramMap.get(window);
+ if (histogram != null) {
+ AsmSnapshot snapshot = new AsmHistogramSnapshot().setSnapshot(histogram.getSnapshot())
+ .setTs(System.currentTimeMillis()).setMetricId(metricId);
+ snapshots.put(window, snapshot);
+ }
+ }
+
+ /**
+ * flush temp histogram data to all windows & assoc metrics.
+ */
+ protected void doFlush() {
+ long[] values = unFlushed.getSnapshot().getValues();
+ for (Histogram histogram : histogramMap.values()) {
+ for (long val : values) {
+ histogram.update(val);
+ }
+ }
+ for (long val : values) {
+ for (AsmMetric metric : this.assocMetrics) {
+ metric.updateDirectly(val);
+ }
+ }
+ this.unFlushed = newHistogram();
+ }
+
+ @Override
+ public AsmMetric clone() {
+ return new AsmHistogram();
+ }
+
+ private Histogram newHistogram() {
+ return new Histogram(new ExponentiallyDecayingReservoir());
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMeter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMeter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMeter.java
new file mode 100644
index 0000000..8959800
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMeter.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric;
+
+import com.alibaba.jstorm.common.metric.snapshot.AsmMeterSnapshot;
+import com.alibaba.jstorm.common.metric.snapshot.AsmSnapshot;
+import com.codahale.metrics.Meter;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * one meter & one snapshot for all windows. since meter is window-sliding, there's no need to recreate new ones.
+ */
+public class AsmMeter extends AsmMetric<Meter> {
+ private final Meter meter = new Meter();
+
+ public void mark() {
+ meter.mark(1l);
+ }
+
+ @Override
+ public void update(Number obj) {
+ meter.mark(obj.longValue());
+ for (AsmMetric metric : this.assocMetrics) {
+ metric.update(obj);
+ }
+ }
+
+
+ @Override
+ public AsmMetric clone() {
+ return new AsmMeter();
+ }
+
+ @Override
+ public Map<Integer, Meter> getWindowMetricMap() {
+ return null;
+ }
+
+ @Override
+ protected void doFlush() {
+ // nothing to do for meters.
+ }
+
+ @Override
+ protected void updateSnapshot(int window) {
+ AsmMeterSnapshot meterSnapshot = new AsmMeterSnapshot();
+ meterSnapshot.setM1(meter.getOneMinuteRate()).setM5(meter.getFiveMinuteRate()).setM15(meter.getFifteenMinuteRate()).setMean(meter.getMeanRate())
+ .setTs(System.currentTimeMillis()).setMetricId(metricId);
+ snapshots.put(window, meterSnapshot);
+ }
+
+ @Override
+ public Meter mkInstance() {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMetric.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMetric.java
new file mode 100644
index 0000000..d399e12
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmMetric.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric;
+
+import com.alibaba.jstorm.client.ConfigExtension;
+import com.alibaba.jstorm.common.metric.snapshot.AsmSnapshot;
+import com.alibaba.jstorm.metric.AsmWindow;
+import com.alibaba.jstorm.metric.MetaType;
+import com.alibaba.jstorm.metric.MetricType;
+import com.alibaba.jstorm.utils.TimeUtils;
+import com.codahale.metrics.Metric;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+public abstract class AsmMetric<T extends Metric> {
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+ private static final Joiner JOINER = Joiner.on(".");
+
+ protected static final List<Integer> windowSeconds = Lists
+ .newArrayList(AsmWindow.M1_WINDOW, AsmWindow.M10_WINDOW, AsmWindow.H2_WINDOW, AsmWindow.D1_WINDOW);
+ protected static final List<Integer> nettyWindows = Lists.newArrayList(AsmWindow.M1_WINDOW);
+
+ protected static int minWindow = AsmWindow.M1_WINDOW;
+ protected static final List<Integer> EMPTY_WIN = Lists.newArrayListWithCapacity(0);
+ /**
+ * sample rate for meter, histogram and timer, note that counter & gauge are not sampled.
+ */
+ private static double sampleRate = ConfigExtension.DEFAULT_METRIC_SAMPLE_RATE;
+
+ protected int op = MetricOp.REPORT;
+ protected volatile long metricId = 0L;
+ protected String metricName;
+ protected boolean aggregate = true;
+ protected volatile long lastFlushTime = TimeUtils.current_time_secs() - AsmWindow.M1_WINDOW;
+ protected Map<Integer, Long> rollingTimeMap = new ConcurrentHashMap<>();
+ protected Map<Integer, Boolean> rollingDirtyMap = new ConcurrentHashMap<>();
+
+ protected final Map<Integer, AsmSnapshot> snapshots = new ConcurrentHashMap<Integer, AsmSnapshot>();
+
+ protected Set<AsmMetric> assocMetrics = new HashSet<AsmMetric>();
+
+ public AsmMetric() {
+ for (Integer win : windowSeconds) {
+ rollingTimeMap.put(win, lastFlushTime);
+ rollingDirtyMap.put(win, false);
+ }
+ }
+
+ /**
+ * keep a random for each instance to avoid competition (although it's thread-safe).
+ */
+ private final Random rand = new Random();
+
+ protected boolean sample() {
+ return rand.nextDouble() <= sampleRate;
+ }
+
+ public static void setSampleRate(double sampleRate) {
+ AsmMetric.sampleRate = sampleRate;
+ }
+
+ /**
+ * In order to improve performance
+ */
+ public abstract void update(Number obj);
+
+
+ public void updateDirectly(Number obj) {
+ update(obj);
+ }
+
+ public abstract AsmMetric clone();
+
+ public AsmMetric setOp(int op) {
+ this.op = op;
+ return this;
+ }
+
+ public int getOp() {
+ return this.op;
+ }
+
+ /**
+ * for test
+ */
+ public static void setWindowSeconds(List<Integer> windows) {
+ synchronized (windowSeconds) {
+ windowSeconds.clear();
+ windowSeconds.addAll(windows);
+
+ minWindow = getMinWindow(windows);
+ }
+ }
+
+ public static int getMinWindow(List<Integer> windows) {
+ int min = Integer.MAX_VALUE;
+ for (int win : windows) {
+ if (win < min) {
+ min = win;
+ }
+ }
+ return min;
+ }
+
+ public void addAssocMetrics(AsmMetric... metrics) {
+ Collections.addAll(assocMetrics, metrics);
+ }
+
+ public long getMetricId() {
+ return metricId;
+ }
+
+ public void setMetricId(long metricId) {
+ this.metricId = metricId;
+ }
+
+ public String getMetricName() {
+ return metricName;
+ }
+
+ public void setMetricName(String metricName) {
+ this.metricName = metricName;
+ }
+
+ public void flush() {
+ long time = TimeUtils.current_time_secs();
+ List<Integer> windows = getValidWindows();
+ if (windows.size() == 0) {
+ return;
+ }
+
+ doFlush();
+
+ List<Integer> rollwindows = rollWindows(time, windows);
+
+ for (int win : windows) {
+ if (rollwindows.contains(win)) {
+ updateSnapshot(win);
+
+ Map<Integer, T> metricMap = getWindowMetricMap();
+ if (metricMap != null) {
+ metricMap.put(win, mkInstance());
+ }
+ } else if (!rollingDirtyMap.get(win)) {
+ //if this window has never been passed, we still update this window snapshot
+ updateSnapshot(win);
+ }
+ }
+ this.lastFlushTime = TimeUtils.current_time_secs();
+ }
+
+ public List<Integer> rollWindows(long time, List<Integer> windows) {
+ List<Integer> rolling = new ArrayList<>();
+ for (Integer win : windows) {
+ long rollingTime = rollingTimeMap.get(win);
+ // might delay somehow, so add extra 5 sec bias
+ if (time - rollingTime >= win - 5) {
+ rolling.add(win);
+ rollingDirtyMap.put(win, true); //mark this window has been passed
+ rollingTimeMap.put(win, (long) TimeUtils.current_time_secs());
+ }
+ }
+ return rolling;
+ }
+
+ /**
+ * flush temp data to all windows & assoc metrics.
+ */
+ protected abstract void doFlush();
+
+ public abstract Map<Integer, T> getWindowMetricMap();
+
+ public abstract T mkInstance();
+
+ protected abstract void updateSnapshot(int window);
+
+ public Map<Integer, AsmSnapshot> getSnapshots() {
+ return snapshots;
+ }
+
+ /**
+ * DO NOT judge whether to flush by 60sec because there might be nuance by the alignment of time(maybe less than 1 sec?)
+ * so we subtract 5 sec from a min flush window.
+ */
+ public List<Integer> getValidWindows() {
+ long diff = TimeUtils.current_time_secs() - this.lastFlushTime + 5;
+ if (diff < minWindow) {
+ // logger.warn("no valid windows for metric:{}, diff:{}", this.metricName, diff);
+ return EMPTY_WIN;
+ }
+ // for netty metrics, use only 1min window
+ if (this.metricName.startsWith(MetaType.NETTY.getV())) {
+ return nettyWindows;
+ }
+
+ return windowSeconds;
+ }
+
+ public boolean isAggregate() {
+ return aggregate;
+ }
+
+ public void setAggregate(boolean aggregate) {
+ this.aggregate = aggregate;
+ }
+
+ public static String mkName(Object... parts) {
+ return JOINER.join(parts);
+ }
+
+ public static class MetricOp {
+ public static final int LOG = 1;
+ public static final int REPORT = 2;
+ }
+
+ public static class Builder {
+ public static AsmMetric build(MetricType metricType) {
+ AsmMetric metric;
+ if (metricType == MetricType.COUNTER) {
+ metric = new AsmCounter();
+ } else if (metricType == MetricType.METER) {
+ metric = new AsmMeter();
+ } else if (metricType == MetricType.HISTOGRAM) {
+ metric = new AsmHistogram();
+ } else if (metricType == MetricType.TIMER) {
+ metric = new AsmTimer();
+ } else {
+ throw new IllegalArgumentException("invalid metric type:" + metricType);
+ }
+ return metric;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ AsmMeter meter = new AsmMeter();
+ int t = 0, f = 0;
+ for (int i = 0; i < 100; i++) {
+ if (meter.sample()) {
+ t++;
+ } else {
+ f++;
+ }
+ }
+ System.out.println(t + "," + f);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmTimer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmTimer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmTimer.java
new file mode 100644
index 0000000..d9f46e1
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/AsmTimer.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric;
+
+import com.alibaba.jstorm.common.metric.snapshot.AsmMeterSnapshot;
+import com.alibaba.jstorm.common.metric.snapshot.AsmTimerSnapshot;
+import com.codahale.metrics.*;
+import com.codahale.metrics.Timer;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * same as histogram, each window has a separate timer, which is recreated after the window cycle. note that all data in a timer are measured by nanoseconds. so
+ * for most cases, you can replace with histograms.
+ */
+public class AsmTimer extends AsmMetric<Timer> {
+ private final Map<Integer, Timer> timerMap = new ConcurrentHashMap<Integer, Timer>();
+ private Timer unFlushed = newTimer();
+
+ public AsmTimer() {
+ super();
+ for (int win : windowSeconds) {
+ timerMap.put(win, newTimer());
+ }
+ }
+
+ @Override
+ public void update(Number obj) {
+ if (sample()) {
+ this.unFlushed.update(obj.longValue(), TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @Override
+ public void updateDirectly(Number obj) {
+ this.unFlushed.update(obj.longValue(), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public Map<Integer, Timer> getWindowMetricMap() {
+ return timerMap;
+ }
+
+ @Override
+ public Timer mkInstance() {
+ return newTimer();
+ }
+
+ @Override
+ protected void updateSnapshot(int window) {
+ Timer timer = timerMap.get(window);
+ if (timer != null){
+ AsmTimerSnapshot timerSnapshot = new AsmTimerSnapshot();
+ timerSnapshot.setHistogram(timer.getSnapshot());
+ timerSnapshot.setMeter(new AsmMeterSnapshot().setM1(timer.getOneMinuteRate()).setM5(timer.getFiveMinuteRate())
+ .setM15(timer.getFifteenMinuteRate()).setMean(timer.getMeanRate()));
+ if (metricId > 0) {
+ timerSnapshot.setMetricId(metricId);
+ }
+ timerSnapshot.setTs(System.currentTimeMillis());
+ snapshots.put(window, timerSnapshot);
+ }
+ }
+
+ /**
+ * flush temp timer data to all windows & assoc metrics.
+ */
+ protected void doFlush() {
+ long[] values = unFlushed.getSnapshot().getValues();
+ for (Timer timer : timerMap.values()) {
+ for (long val : values) {
+ timer.update(val, TimeUnit.MILLISECONDS);
+
+ for (AsmMetric metric : this.assocMetrics) {
+ metric.updateDirectly(val);
+ }
+ }
+ }
+ this.unFlushed = newTimer();
+ }
+
+ @Override
+ public AsmMetric clone() {
+ return new AsmTimer();
+ }
+
+ private Timer newTimer() {
+ return new Timer(new ExponentiallyDecayingReservoir());
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Counter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Counter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Counter.java
deleted file mode 100755
index f9e97dd..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Counter.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric;
-
-import com.alibaba.jstorm.common.metric.operator.convert.DefaultConvertor;
-import com.alibaba.jstorm.common.metric.operator.merger.SumMerger;
-import com.alibaba.jstorm.common.metric.operator.updater.AddUpdater;
-import com.alibaba.jstorm.common.metric.window.Metric;
-
-/**
- * The class is similar to com.codahale.metrics.Counter
- *
- * Sum all window's value
- *
- * how to use Counter , please refer to Sampling Interface
- *
- * @author zhongyan.feng
- *
- * @param <T>
- */
-public class Counter<T extends Number> extends Metric<T, T> {
- private static final long serialVersionUID = -1362345159511508074L;
-
- /**
- *
- * @param defaultValue
- */
- public Counter(T zero) {
- updater = new AddUpdater<T>();
- merger = new SumMerger<T>();
- convertor = new DefaultConvertor<T>();
- defaultValue = zero;
-
- init();
- }
-
- public static void main(String[] args) {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/CounterData.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/CounterData.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/CounterData.java
new file mode 100644
index 0000000..03d13be
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/CounterData.java
@@ -0,0 +1,34 @@
+package com.alibaba.jstorm.common.metric;
+
+
+import com.alibaba.jstorm.metric.Bytes;
+import com.alibaba.jstorm.metric.KVSerializable;
+
+/**
+ * @author wange
+ * @since 15/6/23
+ */
+public class CounterData extends MetricBaseData implements KVSerializable {
+ private long v;
+
+ public long getV() {
+ return v;
+ }
+
+ public void setV(long v) {
+ this.v = v;
+ }
+
+ @Override
+ public byte[] getValue() {
+ return Bytes.toBytes(v);
+ }
+
+ @Override
+ public Object fromKV(byte[] key, byte[] value) {
+ parseKey(key);
+ this.v = Bytes.toLong(value);
+
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Gauge.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Gauge.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Gauge.java
deleted file mode 100755
index 30fa110..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Gauge.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-import com.alibaba.jstorm.common.metric.window.Metric;
-import com.alibaba.jstorm.common.metric.window.StatBuckets;
-
-public class Gauge<T extends Number> extends Metric<Number, Number> {
- private static final long serialVersionUID = 1985614006717750790L;
-
- protected com.codahale.metrics.Gauge<T> gauge;
-
- public Gauge(com.codahale.metrics.Gauge<T> gauge) {
- this.gauge = gauge;
-
- init();
- }
-
- @Override
- public void init() {
-
- }
-
- @Override
- public void update(Number obj) {
- // TODO Auto-generated method stub
- }
-
- @Override
- public Map<Integer, Number> getSnapshot() {
- // TODO Auto-generated method stub
- Number value = gauge.getValue();
-
- Map<Integer, Number> ret = new TreeMap<Integer, Number>();
- for (Integer timeKey : windowSeconds) {
- ret.put(timeKey, value);
- }
- ret.put(StatBuckets.ALL_TIME_WINDOW, value);
-
- return ret;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/GaugeData.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/GaugeData.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/GaugeData.java
new file mode 100644
index 0000000..134a194
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/GaugeData.java
@@ -0,0 +1,34 @@
+package com.alibaba.jstorm.common.metric;
+
+
+import com.alibaba.jstorm.metric.Bytes;
+import com.alibaba.jstorm.metric.KVSerializable;
+
+/**
+ * @author wange
+ * @since 15/6/23
+ */
+public class GaugeData extends MetricBaseData implements KVSerializable {
+ private double v;
+
+ public double getV() {
+ return v;
+ }
+
+ public void setV(double v) {
+ this.v = v;
+ }
+
+ @Override
+ public byte[] getValue() {
+ return Bytes.toBytes(v);
+ }
+
+ @Override
+ public Object fromKV(byte[] key, byte[] value) {
+ parseKey(key);
+ this.v = Bytes.toDouble(value);
+
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Histogram.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Histogram.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Histogram.java
deleted file mode 100755
index 7276fdf..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Histogram.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric;
-
-import com.alibaba.jstorm.common.metric.operator.convert.Convertor;
-import com.alibaba.jstorm.common.metric.operator.merger.AvgMerger;
-import com.alibaba.jstorm.common.metric.operator.updater.AvgUpdater;
-import com.alibaba.jstorm.common.metric.window.Metric;
-
-/**
- * Meter is used to compute tps
- *
- * Attention: 1.
- *
- * @author zhongyan.feng
- *
- */
-public class Histogram extends Metric<Double, Histogram.HistorgramPair> {
- private static final long serialVersionUID = -1362345159511508074L;
-
- public Histogram() {
- defaultValue =
- new HistorgramPair();
- updater = new AvgUpdater();
- merger = new AvgMerger();
- convertor = new HistogramConvertor();
-
- init();
- }
-
- public static class HistogramConvertor implements
- Convertor<HistorgramPair, Double> {
- private static final long serialVersionUID = -1569170826785657226L;
-
- @Override
- public Double convert(HistorgramPair from) {
- // TODO Auto-generated method stub
- if (from == null) {
- return 0.0d;
- }
-
- if (from.getTimes() == 0) {
- return 0.0d;
- } else {
- return from.getSum()/ from.getTimes();
- }
- }
-
- }
-
- public static class HistorgramPair {
- private double sum;
- private long times;
-
- public HistorgramPair() {
-
- }
-
- public HistorgramPair(double sum, long times){
- this.sum = sum;
- this.times = times;
- }
-
- public double getSum() {
- return sum;
- }
-
- public void setSum(double sum) {
- this.sum = sum;
- }
-
- public void addValue(double value) {
- sum += value;
- }
-
- public long getTimes() {
- return times;
- }
-
- public void setTimes(long times) {
- this.times = times;
- }
-
- public void addTimes(long time) {
- times += time;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Histogram.java.bak
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Histogram.java.bak b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Histogram.java.bak
deleted file mode 100755
index b830789..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Histogram.java.bak
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.alibaba.jstorm.common.metric.operator.convert.Convertor;
-import com.alibaba.jstorm.common.metric.operator.merger.AvgMerger2;
-import com.alibaba.jstorm.common.metric.operator.updater.AvgUpdater2;
-import com.alibaba.jstorm.common.metric.window.Metric;
-import com.alibaba.jstorm.utils.Pair;
-import com.google.common.util.concurrent.AtomicDouble;
-
-/**
- * Meter is used to compute tps
- *
- * Attention: 1.
- *
- * @author zhongyan.feng
- *
- */
-public class Histogram extends Metric<Double, Pair<AtomicDouble, AtomicLong>> {
- private static final long serialVersionUID = -1362345159511508074L;
-
- public Histogram() {
- defaultValue =
- new Pair<AtomicDouble, AtomicLong>(new AtomicDouble(0.0),
- new AtomicLong(0));
- updater = new AvgUpdater2();
- merger = new AvgMerger2();
- convertor = new HistogramConvertor();
-
- init();
- }
-
- public static class HistogramConvertor implements
- Convertor<Pair<AtomicDouble, AtomicLong>, Double> {
- private static final long serialVersionUID = -1569170826785657226L;
-
- @Override
- public Double convert(Pair<AtomicDouble, AtomicLong> from) {
- // TODO Auto-generated method stub
- if (from == null) {
- return 0.0d;
- }
-
- if (from.getSecond().get() == 0) {
- return 0.0d;
- } else {
- return from.getFirst().get() / from.getSecond().get();
- }
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/HistogramData.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/HistogramData.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/HistogramData.java
new file mode 100644
index 0000000..5f5ae97
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/HistogramData.java
@@ -0,0 +1,135 @@
+package com.alibaba.jstorm.common.metric;
+
+
+import com.alibaba.jstorm.metric.Bytes;
+import com.alibaba.jstorm.metric.KVSerializable;
+
+/**
+ * @author wange
+ * @since 15/6/23
+ */
+public class HistogramData extends MetricBaseData implements KVSerializable {
+ private long min;
+ private long max;
+ private double mean;
+ private double p50;
+ private double p75;
+ private double p95;
+ private double p98;
+ private double p99;
+ private double p999;
+ private double stddev;
+
+ public long getMin() {
+ return min;
+ }
+
+ public void setMin(long min) {
+ this.min = min;
+ }
+
+ public long getMax() {
+ return max;
+ }
+
+ public void setMax(long max) {
+ this.max = max;
+ }
+
+ public double getMean() {
+ return mean;
+ }
+
+ public void setMean(double mean) {
+ this.mean = mean;
+ }
+
+ public double getP50() {
+ return p50;
+ }
+
+ public void setP50(double p50) {
+ this.p50 = p50;
+ }
+
+ public double getP75() {
+ return p75;
+ }
+
+ public void setP75(double p75) {
+ this.p75 = p75;
+ }
+
+ public double getP95() {
+ return p95;
+ }
+
+ public void setP95(double p95) {
+ this.p95 = p95;
+ }
+
+ public double getP98() {
+ return p98;
+ }
+
+ public void setP98(double p98) {
+ this.p98 = p98;
+ }
+
+ public double getP99() {
+ return p99;
+ }
+
+ public void setP99(double p99) {
+ this.p99 = p99;
+ }
+
+ public double getP999() {
+ return p999;
+ }
+
+ public void setP999(double p999) {
+ this.p999 = p999;
+ }
+
+ public double getStddev() {
+ return stddev;
+ }
+
+ public void setStddev(double stddev) {
+ this.stddev = stddev;
+ }
+
+ @Override
+ public byte[] getValue() {
+ byte[] ret = new byte[8 * 9];
+ Bytes.putLong(ret, 0, min);
+ Bytes.putLong(ret, 8, max);
+ Bytes.putDouble(ret, 16, p50);
+ Bytes.putDouble(ret, 24, p75);
+ Bytes.putDouble(ret, 32, p95);
+ Bytes.putDouble(ret, 40, p98);
+ Bytes.putDouble(ret, 48, p99);
+ Bytes.putDouble(ret, 56, p999);
+ Bytes.putDouble(ret, 64, mean);
+
+ return ret;
+ }
+
+ @Override
+ public Object fromKV(byte[] key, byte[] value) {
+ parseKey(key);
+
+ this.min = Bytes.toLong(value, 0, KVSerializable.LONG_SIZE);
+ this.max = Bytes.toLong(value, 8, KVSerializable.LONG_SIZE);
+ this.p50 = Bytes.toDouble(value, 16);
+ this.p75 = Bytes.toDouble(value, 24);
+ this.p95 = Bytes.toDouble(value, 32);
+ this.p98 = Bytes.toDouble(value, 40);
+ this.p99 = Bytes.toDouble(value, 48);
+ this.p999 = Bytes.toDouble(value, 56);
+ this.mean = Bytes.toDouble(value, 64);
+
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/LongCounter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/LongCounter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/LongCounter.java
deleted file mode 100755
index ac58912..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/LongCounter.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.alibaba.jstorm.common.metric.operator.convert.AtomicLongToLong;
-import com.alibaba.jstorm.common.metric.operator.merger.LongSumMerger;
-import com.alibaba.jstorm.common.metric.operator.updater.LongAddUpdater;
-import com.alibaba.jstorm.common.metric.window.Metric;
-
-public class LongCounter extends Metric<Long, AtomicLong> {
- private static final long serialVersionUID = -1362345159511508074L;
-
- public LongCounter() {
- super.defaultValue = new AtomicLong(0);
- super.updater = new LongAddUpdater();
- super.merger = new LongSumMerger();
- super.convertor = new AtomicLongToLong();
-
- init();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Meter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Meter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Meter.java
deleted file mode 100755
index e56d025..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Meter.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric;
-
-import com.alibaba.jstorm.common.metric.operator.convert.DefaultConvertor;
-import com.alibaba.jstorm.common.metric.operator.merger.TpsMerger;
-import com.alibaba.jstorm.common.metric.operator.updater.AddUpdater;
-import com.alibaba.jstorm.common.metric.window.Metric;
-import com.alibaba.jstorm.common.metric.window.RollingWindow;
-
-/**
- * Meter is used to compute tps
- *
- * Attention: 1.
- *
- * @author zhongyan.feng
- *
- */
-public class Meter extends Metric<Double, Double> {
- private static final long serialVersionUID = -1362345159511508074L;
-
- public Meter() {
- defaultValue = 0.0d;
- updater = new AddUpdater<Double>();
- merger = new TpsMerger();
- convertor = new DefaultConvertor<Double>();
-
- init();
- }
-
- public void update() {
- update(Double.valueOf(1));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MeterData.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MeterData.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MeterData.java
new file mode 100644
index 0000000..2df87aa
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MeterData.java
@@ -0,0 +1,71 @@
+package com.alibaba.jstorm.common.metric;
+
+
+import com.alibaba.jstorm.metric.Bytes;
+import com.alibaba.jstorm.metric.KVSerializable;
+
+/**
+ * @author wange
+ * @since 15/6/23
+ */
+public class MeterData extends MetricBaseData implements KVSerializable {
+ private double m1;
+ private double m5;
+ private double m15;
+ private double mean;
+
+ public double getM1() {
+ return m1;
+ }
+
+ public void setM1(double m1) {
+ this.m1 = m1;
+ }
+
+ public double getM5() {
+ return m5;
+ }
+
+ public void setM5(double m5) {
+ this.m5 = m5;
+ }
+
+ public double getM15() {
+ return m15;
+ }
+
+ public void setM15(double m15) {
+ this.m15 = m15;
+ }
+
+ public double getMean() {
+ return mean;
+ }
+
+ public void setMean(double mean) {
+ this.mean = mean;
+ }
+
+ @Override
+ public byte[] getValue() {
+ byte[] ret = new byte[8 * 4];
+ Bytes.putDouble(ret, 0, m1);
+ Bytes.putDouble(ret, 8, m5);
+ Bytes.putDouble(ret, 16, m15);
+ Bytes.putDouble(ret, 24, mean);
+
+ return ret;
+ }
+
+ @Override
+ public Object fromKV(byte[] key, byte[] value) {
+ parseKey(key);
+
+ this.m1 = Bytes.toDouble(value, 0);
+ this.m5 = Bytes.toDouble(value, 8);
+ this.m15 = Bytes.toDouble(value, 16);
+ this.mean = Bytes.toDouble(value, 24);
+
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricBaseData.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricBaseData.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricBaseData.java
new file mode 100644
index 0000000..907762d
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricBaseData.java
@@ -0,0 +1,59 @@
+package com.alibaba.jstorm.common.metric;
+
+import com.alibaba.jstorm.metric.Bytes;
+import com.alibaba.jstorm.metric.KVSerializable;
+
+import java.util.Date;
+
+/**
+ * @author wange
+ * @since 15/7/22
+ */
+public abstract class MetricBaseData implements KVSerializable {
+ protected long metricId;
+ protected int win;
+ protected Date ts;
+
+ public long getMetricId() {
+ return metricId;
+ }
+
+ public void setMetricId(long metricId) {
+ this.metricId = metricId;
+ }
+
+ public Date getTs() {
+ return ts;
+ }
+
+ public void setTs(Date ts) {
+ this.ts = ts;
+ }
+
+ public int getWin() {
+ return win;
+ }
+
+ public void setWin(int win) {
+ this.win = win;
+ }
+
+ @Override
+ public byte[] getKey() {
+ return makeKey(metricId, win, ts.getTime());
+ }
+
+ public static byte[] makeKey(long metricId, int win, long ts) {
+ byte[] ret = new byte[8 + 4 + 8];
+ Bytes.putLong(ret, 0, metricId);
+ Bytes.putInt(ret, 8, win);
+ Bytes.putLong(ret, 12, ts);
+ return ret;
+ }
+
+ protected void parseKey(byte[] key) {
+ this.metricId = Bytes.toLong(key, 0, KVSerializable.LONG_SIZE);
+ this.win = Bytes.toInt(key, 8, KVSerializable.INT_SIZE);
+ this.ts = new Date(Bytes.toLong(key, 12, KVSerializable.LONG_SIZE));
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricFilter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricFilter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricFilter.java
deleted file mode 100755
index 92b1f6b..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricFilter.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric;
-
-import java.io.Serializable;
-
-import com.alibaba.jstorm.common.metric.window.Metric;
-
-public interface MetricFilter extends Serializable {
- /**
- * Matches all metrics, regardless of type or name.
- */
- MetricFilter ALL = new MetricFilter() {
- private static final long serialVersionUID = 7089987006352295530L;
-
- @Override
- public boolean matches(String name, Metric metric) {
- return true;
- }
- };
-
- /**
- * Returns {@code true} if the metric matches the filter; {@code false}
- * otherwise.
- *
- * @param name the metric's name
- * @param metric the metric
- * @return {@code true} if the metric matches the filter
- */
- boolean matches(String name, Metric metric);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricMeta.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricMeta.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricMeta.java
new file mode 100644
index 0000000..da6b4dd
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricMeta.java
@@ -0,0 +1,213 @@
+package com.alibaba.jstorm.common.metric;
+
+import com.alibaba.jstorm.metric.KVSerializable;
+import com.alibaba.jstorm.metric.MetaType;
+import com.alibaba.jstorm.metric.MetricType;
+import com.alibaba.jstorm.metric.MetricUtils;
+import com.alibaba.jstorm.utils.JStormUtils;
+
+import java.util.Date;
+
+/**
+ * @author wange
+ * @since 15/6/18
+ */
+public class MetricMeta implements KVSerializable {
+ // common
+ private long id;
+ // string id
+ private String sid;
+ private String clusterName;
+ private String topologyId;
+ private int metricType;
+ private String metricGroup = MetricUtils.DEFAULT_GROUP;//sys group
+ private String metricName;
+ private Date gmtCreate = new Date();
+
+ // task meta
+ private String component = MetricUtils.EMPTY;
+ private int taskId = 0;
+ private String streamId = MetricUtils.EMPTY;
+ private int metaType;
+
+ // worker meta
+ private String host = MetricUtils.EMPTY;
+ private int port = 0;
+
+ public long getId() {
+ return id;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ this.sid = id + "";
+ }
+
+ public String getSid() {
+ return sid;
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ public String getTopologyId() {
+ return topologyId;
+ }
+
+ public void setTopologyId(String topologyId) {
+ this.topologyId = topologyId;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ public int getMetricType() {
+ return metricType;
+ }
+
+ public void setMetricType(int metricType) {
+ this.metricType = metricType;
+ }
+
+ public String getMetricGroup() {
+ return metricGroup;
+ }
+
+ public void setMetricGroup(String metricGroup) {
+ this.metricGroup = metricGroup;
+ }
+
+ public String getMetricName() {
+ return metricName;
+ }
+
+ public void setMetricName(String metricName) {
+ this.metricName = metricName;
+ }
+
+ public Date getGmtCreate() {
+ return gmtCreate;
+ }
+
+ public void setGmtCreate(Date gmtCreate) {
+ this.gmtCreate = gmtCreate;
+ }
+
+ public String getComponent() {
+ return component;
+ }
+
+ public void setComponent(String component) {
+ this.component = component;
+ }
+
+ public int getTaskId() {
+ return taskId;
+ }
+
+ public void setTaskId(int taskId) {
+ this.taskId = taskId;
+ }
+
+ public String getStreamId() {
+ return streamId;
+ }
+
+ public void setStreamId(String streamId) {
+ this.streamId = streamId;
+ }
+
+ public int getMetaType() {
+ return metaType;
+ }
+
+ public void setMetaType(int metaType) {
+ this.metaType = metaType;
+ }
+
+ public boolean isWorkerMetric() {
+ return this.metaType == MetaType.NETTY.getT() || this.getMetaType() == MetaType.WORKER.getT() ||
+ this.metaType == MetaType.TOPOLOGY.getT();
+ }
+
+ public String getFQN() {
+ MetaType meta = MetaType.parse(metaType);
+ MetricType metric = MetricType.parse(metricType);
+ String types = meta.getV() + metric.getV();
+ if (isWorkerMetric()) {
+ return MetricUtils.concat2(types, topologyId, host, port, metricGroup, metricName);
+ }
+ return MetricUtils.concat2(types, topologyId, component, taskId, streamId, metricGroup, metricName);
+ }
+
+ /**
+ * key: clusterName + topologyId + metaType + id
+ */
+ @Override
+ public byte[] getKey() {
+ StringBuilder sb = new StringBuilder(64);
+ sb.append(clusterName).append(MetricUtils.AT).append(topologyId).append(MetricUtils.AT)
+ .append(metaType).append(MetricUtils.AT).append(id);
+ return sb.toString().getBytes();
+ }
+
+ /**
+ * value: component + taskId + streamId + metricType + host + port + metricGroup + metricName
+ */
+ @Override
+ public byte[] getValue() {
+ StringBuilder sb = new StringBuilder(64);
+ sb.append(component).append(MetricUtils.AT).append(taskId).append(MetricUtils.AT)
+ .append(streamId).append(MetricUtils.AT).append(metricType).append(MetricUtils.AT)
+ .append(host).append(MetricUtils.AT).append(port).append(MetricUtils.AT)
+ .append(metricGroup).append(MetricUtils.AT).append(metricName);
+ return sb.toString().getBytes();
+ }
+
+ @Override
+ public Object fromKV(byte[] key, byte[] value) {
+ String[] keyParts = new String(key).split(MetricUtils.DELIM);
+ if (keyParts.length >= 4) {
+ this.clusterName = keyParts[0];
+ this.topologyId = keyParts[1];
+ this.metaType = Integer.valueOf(keyParts[2]);
+ this.id = Long.valueOf(keyParts[3]);
+ this.sid = this.id + "";
+ }
+ String[] valueParts = new String(value).split(MetricUtils.DELIM);
+ if (valueParts.length >= 8) {
+ this.component = valueParts[0];
+ this.taskId = JStormUtils.parseInt(valueParts[1], 0);
+ this.streamId = valueParts[2];
+ this.metricType = JStormUtils.parseInt(valueParts[3], 0);
+ this.host = valueParts[4];
+ this.port = JStormUtils.parseInt(valueParts[5], 0);
+ this.metricGroup = valueParts[6];
+ this.metricName = valueParts[7];
+ }
+ return this;
+ }
+
+ public static MetricMeta parse(String name) {
+ return MetricMetaParser.fromMetricName(name);
+ }
+
+}