You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:41:26 UTC
[47/60] [abbrv] [partial] storm git commit: Release 2.0.4-SNAPSHOT
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/spout/IFailValueSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/spout/IFailValueSpout.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/spout/IFailValueSpout.java
deleted file mode 100644
index 913709d..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/client/spout/IFailValueSpout.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package com.alibaba.jstorm.client.spout;
-
-import java.util.List;
-
-/**
- * This interface will list emit values when tuple fails
- *
- * if spout implement this interface,
- * spout won't call ISpout.fail() when tuple fail
- *
- * @author longda
- */
-public interface IFailValueSpout {
- void fail(Object msgId, List<Object> values);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/cluster/ClusterState.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/cluster/ClusterState.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/cluster/ClusterState.java
deleted file mode 100644
index de64e2f..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/cluster/ClusterState.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package com.alibaba.jstorm.cluster;
-
-import java.util.List;
-import java.util.UUID;
-
-import com.alibaba.jstorm.callback.ClusterStateCallback;
-
-
-/**
- * All ZK interface
- *
- * @author yannian
- *
- */
-public interface ClusterState {
- public void set_ephemeral_node(String path, byte[] data) throws Exception;
-
- public void delete_node(String path) throws Exception;
-
- public void set_data(String path, byte[] data) throws Exception;
-
- public byte[] get_data(String path, boolean watch) throws Exception;
-
- public List<String> get_children(String path, boolean watch)
- throws Exception;
-
- public void mkdirs(String path) throws Exception;
-
- public void tryToBeLeader(String path, byte[] host) throws Exception;
-
- public void close();
-
- public UUID register(ClusterStateCallback callback);
-
- public ClusterStateCallback unregister(UUID id);
-
- public boolean node_existed(String path, boolean watch) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/cluster/DistributedClusterState.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/cluster/DistributedClusterState.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/cluster/DistributedClusterState.java
deleted file mode 100644
index 95224f0..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/cluster/DistributedClusterState.java
+++ /dev/null
@@ -1,175 +0,0 @@
-package com.alibaba.jstorm.cluster;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.log4j.Logger;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-
-import backtype.storm.Config;
-
-import com.alibaba.jstorm.callback.ClusterStateCallback;
-import com.alibaba.jstorm.callback.WatcherCallBack;
-import com.alibaba.jstorm.utils.JStormUtils;
-import com.alibaba.jstorm.utils.PathUtils;
-import com.alibaba.jstorm.zk.Zookeeper;
-
-/**
- * All ZK interface implementation
- *
- * @author yannian.mu
- *
- */
-public class DistributedClusterState implements ClusterState {
-
- private static Logger LOG = Logger.getLogger(DistributedClusterState.class);
-
- private Zookeeper zkobj = new Zookeeper();
- private CuratorFramework zk;
- private WatcherCallBack watcher;
-
- /**
- * why run all callbacks, when receive one event
- */
- private ConcurrentHashMap<UUID, ClusterStateCallback> callbacks = new ConcurrentHashMap<UUID, ClusterStateCallback>();
-
- private Map<Object, Object> conf;
- private AtomicBoolean active;
-
- public DistributedClusterState(Map<Object, Object> _conf) throws Exception {
- conf = _conf;
-
- // just mkdir STORM_ZOOKEEPER_ROOT dir
- CuratorFramework _zk = mkZk();
- String path = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT));
- zkobj.mkdirs(_zk, path);
- _zk.close();
-
- active = new AtomicBoolean(true);
-
- watcher = new WatcherCallBack() {
- @Override
- public void execute(KeeperState state, EventType type, String path) {
- if (active.get()) {
- if (!(state.equals(KeeperState.SyncConnected))) {
- LOG.warn("Received event " + state + ":" + type + ":"
- + path + " with disconnected Zookeeper.");
- } else {
- LOG.info("Received event " + state + ":" + type + ":"
- + path);
- }
-
- if (!type.equals(EventType.None)) {
- for (Entry<UUID, ClusterStateCallback> e : callbacks
- .entrySet()) {
- ClusterStateCallback fn = e.getValue();
- fn.execute(type, path);
- }
- }
- }
- }
- };
- zk = null;
- zk = mkZk(watcher);
-
- }
-
- @SuppressWarnings("unchecked")
- private CuratorFramework mkZk() throws IOException {
- return zkobj.mkClient(conf,
- (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS),
- conf.get(Config.STORM_ZOOKEEPER_PORT), "");
- }
-
- @SuppressWarnings("unchecked")
- private CuratorFramework mkZk(WatcherCallBack watcher)
- throws NumberFormatException, IOException {
- return zkobj.mkClient(conf,
- (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS),
- conf.get(Config.STORM_ZOOKEEPER_PORT),
- String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT)), watcher);
- }
-
- @Override
- public void close() {
- this.active.set(false);
- zk.close();
- }
-
- @Override
- public void delete_node(String path) throws Exception {
- zkobj.deletereRcursive(zk, path);
- }
-
- @Override
- public List<String> get_children(String path, boolean watch)
- throws Exception {
- return zkobj.getChildren(zk, path, watch);
- }
-
- @Override
- public byte[] get_data(String path, boolean watch) throws Exception {
- return zkobj.getData(zk, path, watch);
- }
-
- @Override
- public void mkdirs(String path) throws Exception {
- zkobj.mkdirs(zk, path);
-
- }
-
- @Override
- public void set_data(String path, byte[] data) throws Exception {
- if (data.length > (JStormUtils.SIZE_1_K * 800))
- throw new Exception("Writing 800k+ data into ZK is not allowed!");
- if (zkobj.exists(zk, path, false)) {
- zkobj.setData(zk, path, data);
- } else {
- zkobj.mkdirs(zk, PathUtils.parent_path(path));
- zkobj.createNode(zk, path, data, CreateMode.PERSISTENT);
- }
-
- }
-
- @Override
- public void set_ephemeral_node(String path, byte[] data) throws Exception {
- zkobj.mkdirs(zk, PathUtils.parent_path(path));
- if (zkobj.exists(zk, path, false)) {
- zkobj.setData(zk, path, data);
- } else {
- zkobj.createNode(zk, path, data, CreateMode.EPHEMERAL);
- }
- }
-
- @Override
- public UUID register(ClusterStateCallback callback) {
- UUID id = UUID.randomUUID();
- this.callbacks.put(id, callback);
- return id;
- }
-
- @Override
- public ClusterStateCallback unregister(UUID id) {
- return this.callbacks.remove(id);
- }
-
- @Override
- public boolean node_existed(String path, boolean watch) throws Exception {
- // TODO Auto-generated method stub
- return zkobj.existsNode(zk, path, watch);
- }
-
- @Override
- public void tryToBeLeader(String path, byte[] host) throws Exception {
- // TODO Auto-generated method stub
- zkobj.createNode(zk, path, host, CreateMode.EPHEMERAL);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/common/stats/StatBuckets.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/common/stats/StatBuckets.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/common/stats/StatBuckets.java
deleted file mode 100644
index 26701fd..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/common/stats/StatBuckets.java
+++ /dev/null
@@ -1,124 +0,0 @@
-package com.alibaba.jstorm.common.stats;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class StatBuckets {
-
- public static final Integer NUM_STAT_BUCKETS = 20;
-
- public static final Integer MINUTE_WINDOW = 600;
- public static final Integer HOUR_WINDOW = 10800;
- public static final Integer DAY_WINDOW = 86400;
-
- public static final String MINUTE_WINDOW_STR = "0d0h10m0s";
- public static final String HOUR_WINDOW_STR = "0d3h0m0s";
- public static final String DAY_WINDOW_STR = "1d0h0m0s";
- public static final String ALL_WINDOW_STR = "All-time";
-
- public static Integer[] STAT_BUCKETS = { MINUTE_WINDOW / NUM_STAT_BUCKETS,
- HOUR_WINDOW / NUM_STAT_BUCKETS, DAY_WINDOW / NUM_STAT_BUCKETS };
-
- private static final String[][] PRETTYSECDIVIDERS = {
- new String[] { "s", "60" }, new String[] { "m", "60" },
- new String[] { "h", "24" }, new String[] { "d", null } };
-
- /**
- * Service b
- *
- * @param key
- * @return
- */
- public static String parseTimeKey(Integer key) {
- if (key == 0) {
- return ALL_WINDOW_STR;
- } else {
- return String.valueOf(key);
- }
- }
-
- /**
- *
- * Default is the latest result
- *
- * @param showKey
- * @return
- */
- public static String getTimeKey(String showKey) {
- String window = null;
- if (showKey == null) {
- window = String.valueOf(MINUTE_WINDOW);
- } else if (showKey.equals(MINUTE_WINDOW_STR)) {
- window = String.valueOf(MINUTE_WINDOW);
- } else if (showKey.equals(HOUR_WINDOW_STR)) {
- window = String.valueOf(HOUR_WINDOW);
- } else if (showKey.equals(DAY_WINDOW_STR)) {
- window = String.valueOf(DAY_WINDOW);
- } else if (showKey.equals(ALL_WINDOW_STR)) {
- window = ALL_WINDOW_STR;
- } else {
- window = String.valueOf(MINUTE_WINDOW);
- }
-
- return window;
- }
-
- /**
- * Default is the latest result
- *
- * @param showStr
- * @return
- */
- public static String getShowTimeStr(String showStr) {
- if (showStr == null) {
- return MINUTE_WINDOW_STR;
- } else if (showStr.equals(MINUTE_WINDOW_STR)
- || showStr.equals(HOUR_WINDOW_STR)
- || showStr.equals(DAY_WINDOW_STR)
- || showStr.equals(ALL_WINDOW_STR)) {
- return showStr;
-
- } else {
- return MINUTE_WINDOW_STR;
- }
-
- }
-
- /**
- * seconds to string like 1d20h30m40s
- *
- * @param secs
- * @return
- */
- public static String prettyUptimeStr(int secs) {
- int diversize = PRETTYSECDIVIDERS.length;
-
- List<String> tmp = new ArrayList<String>();
- int div = secs;
- for (int i = 0; i < diversize; i++) {
- if (PRETTYSECDIVIDERS[i][1] != null) {
- Integer d = Integer.parseInt(PRETTYSECDIVIDERS[i][1]);
- tmp.add(div % d + PRETTYSECDIVIDERS[i][0]);
- div = div / d;
- } else {
- tmp.add(div + PRETTYSECDIVIDERS[i][0]);
- }
- }
-
- String rtn = "";
- int tmpSzie = tmp.size();
- for (int j = tmpSzie - 1; j > -1; j--) {
- rtn += tmp.get(j);
- }
- return rtn;
- }
-
- /**
- * @param args
- */
- public static void main(String[] args) {
- // TODO Auto-generated method stub
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/common/stats/StaticsType.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/common/stats/StaticsType.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/common/stats/StaticsType.java
deleted file mode 100644
index e9b76b3..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/common/stats/StaticsType.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package com.alibaba.jstorm.common.stats;
-
-public enum StaticsType {
- emitted, send_tps, recv_tps, acked, failed, transferred, process_latencies;
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/JStormHistogram.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/JStormHistogram.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/JStormHistogram.java
deleted file mode 100644
index 863deaa..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/JStormHistogram.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package com.alibaba.jstorm.metric;
-
-import com.codahale.metrics.Histogram;
-
-public class JStormHistogram {
- private static boolean isEnable = true;
-
- public static boolean isEnable() {
- return isEnable;
- }
-
- public static void setEnable(boolean isEnable) {
- JStormHistogram.isEnable = isEnable;
- }
-
- private Histogram instance;
- private String name;
-
- public JStormHistogram(String name, Histogram instance) {
- this.name = name;
- this.instance = instance;
- }
-
- public void update(int value) {
- if (isEnable == true) {
- instance.update(value);
- }
- }
-
- public void update(long value) {
- if (isEnable == true) {
- instance.update(value);
- }
- }
-
- public Histogram getInstance() {
- return instance;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/JStormTimer.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/JStormTimer.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/JStormTimer.java
deleted file mode 100644
index dac94db..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/JStormTimer.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package com.alibaba.jstorm.metric;
-
-
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.log4j.Logger;
-
-import com.codahale.metrics.Timer;
-
-public class JStormTimer {
- private static final Logger LOG = Logger.getLogger(JStormTimer.class);
- private static boolean isEnable = true;
-
- public static boolean isEnable() {
- return isEnable;
- }
-
- public static void setEnable(boolean isEnable) {
- JStormTimer.isEnable = isEnable;
- }
-
-
- private Timer instance;
- private String name;
- public JStormTimer(String name, Timer instance) {
- this.name = name;
- this.instance = instance;
- this.timerContext = new AtomicReference<Timer.Context>();
- }
-
- /**
- * This logic isn't perfect, it will miss metrics when it is called
- * in the same time. But this method performance is better than
- * create a new instance wrapper Timer.Context
- */
- private AtomicReference<Timer.Context> timerContext = null;
- public void start() {
- if (JStormTimer.isEnable == false) {
- return ;
- }
-
- if (timerContext.compareAndSet(null, instance.time()) == false) {
- LOG.warn("Already start timer " + name);
- return ;
- }
-
- }
-
- public void stop() {
- Timer.Context context = timerContext.getAndSet(null);
- if (context != null) {
- context.stop();
- }
- }
-
- public Timer getInstance() {
- return instance;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/MetricDef.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/MetricDef.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/MetricDef.java
deleted file mode 100644
index 882057e..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/MetricDef.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package com.alibaba.jstorm.metric;
-
-public class MetricDef {
- // metric name for task
- public static final String DESERIALIZE_QUEUE = "Deserialize_Queue";
- public static final String DESERIALIZE_TIME = "Deserialize_Time";
- public static final String SERIALIZE_QUEUE = "Serialize_Queue";
- public static final String SERIALIZE_TIME = "Serialize_Time";
- public static final String EXECUTE_QUEUE = "Executor_Queue";
- public static final String EXECUTE_TIME = "Execute_Time";
- public static final String ACKER_TIME = "Acker_Time";
- public static final String EMPTY_CPU_RATIO = "Empty_Cpu_Ratio";
- public static final String PENDING_MAP = "Pending_Num";
- public static final String EMIT_TIME = "Emit_Time";
-
- // metric name for worker
- public static final String NETWORK_MSG_TRANS_TIME = "Network_Transmit_Time";
- public static final String NETTY_SERV_DECODE_TIME = "Netty_Server_Decode_Time";
- public static final String DISPATCH_TIME = "Virtual_Port_Dispatch_Time";
- public static final String DISPATCH_QUEUE = "Virtual_Port_Dispatch_Queue";
- public static final String BATCH_TUPLE_TIME = "Batch_Tuple_Time";
- public static final String BATCH_TUPLE_QUEUE = "Batch_Tuple_Queue";
- public static final String DRAINER_TIME = "Drainer_Time";
- public static final String DRAINER_QUEUE = "Drainer_Queue";
- public static final String NETTY_CLI_SEND_TIME = "Netty_Client_Send_Time";
- public static final String NETTY_CLI_BATCH_SIZE = "Netty_Client_Send_Batch_Size";
- public static final String NETTY_CLI_SEND_PENDING = "Netty_Client_Send_Pendings";
- public static final String NETTY_CLI_SYNC_BATCH_QUEUE = "Netty_Client_Sync_BatchQueue";
- public static final String NETTY_CLI_SYNC_DISR_QUEUE = "Netty_Client_Sync_DisrQueue";
-
- public static final String ZMQ_SEND_TIME = "ZMQ_Send_Time";
- public static final String ZMQ_SEND_MSG_SIZE = "ZMQ_Send_MSG_Size";
-
- public static final String CPU_USED_RATIO = "Used_Cpu";
- public static final String MEMORY_USED = "Used_Memory";
-
- public static final String REMOTE_CLI_ADDR = "Remote_Client_Address";
- public static final String REMOTE_SERV_ADDR = "Remote_Server_Address";
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/MetricInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/MetricInfo.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/MetricInfo.java
deleted file mode 100644
index 09a2a10..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/MetricInfo.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package com.alibaba.jstorm.metric;
-
-import com.codahale.metrics.Metric;
-
-public class MetricInfo {
- private Metric metric;
- private String prefix;
- private String name;
-
- public MetricInfo(String prefix, String name, Metric metric) {
- this.prefix = prefix;
- this.name = name;
- this.metric = metric;
- }
-
- public String getPrefix() {
- return prefix;
- }
-
- public String getName() {
- return name;
- }
-
- public Metric getMetric() {
- return metric;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/MetricJstack.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/MetricJstack.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/MetricJstack.java
deleted file mode 100644
index c60525a..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/MetricJstack.java
+++ /dev/null
@@ -1,123 +0,0 @@
-package com.alibaba.jstorm.metric;
-
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadInfo;
-import java.lang.management.ThreadMXBean;
-
-import com.codahale.metrics.Gauge;
-
-public class MetricJstack implements Gauge<String> {
-
- private String getTaskName(long id, String name) {
- if (name == null) {
- return Long.toString(id);
- }
- return id + " (" + name + ")";
- }
-
- public String dumpThread() throws Exception {
- StringBuilder writer = new StringBuilder();
-
- ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
-
- boolean contention = threadMXBean.isThreadContentionMonitoringEnabled();
-
- long[] threadIds = threadMXBean.getAllThreadIds();
- writer.append(threadIds.length + " active threads:");
- for (long tid : threadIds) {
- writer.append(tid).append(" ");
- }
- writer.append("\n");
-
- long[] deadLockTids = threadMXBean.findDeadlockedThreads();
- if (deadLockTids != null) {
- writer.append(threadIds.length + " deadlocked threads:");
- for (long tid : deadLockTids) {
- writer.append(tid).append(" ");
- }
- writer.append("\n");
- }
-
- long[] deadLockMonitorTids = threadMXBean
- .findMonitorDeadlockedThreads();
- if (deadLockMonitorTids != null) {
- writer.append(threadIds.length + " deadlocked monitor threads:");
- for (long tid : deadLockMonitorTids) {
- writer.append(tid).append(" ");
- }
- writer.append("\n");
- }
-
- for (long tid : threadIds) {
- ThreadInfo info = threadMXBean
- .getThreadInfo(tid, Integer.MAX_VALUE);
- if (info == null) {
- writer.append(" Inactive").append("\n");
- continue;
- }
- writer.append(
- "Thread "
- + getTaskName(info.getThreadId(),
- info.getThreadName()) + ":").append("\n");
- Thread.State state = info.getThreadState();
- writer.append(" State: " + state).append("\n");
- writer.append(" Blocked count: " + info.getBlockedCount()).append(
- "\n");
- writer.append(" Waited count: " + info.getWaitedCount()).append(
- "\n");
- writer.append(" Cpu time:")
- .append(threadMXBean.getThreadCpuTime(tid) / 1000000)
- .append("ms").append("\n");
- writer.append(" User time:")
- .append(threadMXBean.getThreadUserTime(tid) / 1000000)
- .append("ms").append("\n");
- if (contention) {
- writer.append(" Blocked time: " + info.getBlockedTime())
- .append("\n");
- writer.append(" Waited time: " + info.getWaitedTime()).append(
- "\n");
- }
- if (state == Thread.State.WAITING) {
- writer.append(" Waiting on " + info.getLockName())
- .append("\n");
- } else if (state == Thread.State.BLOCKED) {
- writer.append(" Blocked on " + info.getLockName())
- .append("\n");
- writer.append(
- " Blocked by "
- + getTaskName(info.getLockOwnerId(),
- info.getLockOwnerName())).append("\n");
- }
-
- }
- for (long tid : threadIds) {
- ThreadInfo info = threadMXBean
- .getThreadInfo(tid, Integer.MAX_VALUE);
- if (info == null) {
- writer.append(" Inactive").append("\n");
- continue;
- }
-
- writer.append(
- "Thread "
- + getTaskName(info.getThreadId(),
- info.getThreadName()) + ": Stack").append(
- "\n");
- for (StackTraceElement frame : info.getStackTrace()) {
- writer.append(" " + frame.toString()).append("\n");
- }
- }
-
- return writer.toString();
- }
-
- @Override
- public String getValue() {
- try {
- return dumpThread();
- } catch (Exception e) {
- return "Failed to get jstack thread info";
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/Metrics.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/Metrics.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/Metrics.java
deleted file mode 100644
index 3e50c0a..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/Metrics.java
+++ /dev/null
@@ -1,330 +0,0 @@
-package com.alibaba.jstorm.metric;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.log4j.Logger;
-
-import backtype.storm.utils.DisruptorQueue;
-
-import com.alibaba.jstorm.client.metric.MetricCallback;
-//import com.alibaba.jstorm.daemon.worker.Worker;
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Metric;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.MetricSet;
-import com.codahale.metrics.Snapshot;
-import com.codahale.metrics.Timer;
-import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
-import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
-import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
-
-public class Metrics {
-
- public enum MetricType {
- TASK, WORKER
- }
-
- private static final Logger LOG = Logger.getLogger(Metrics.class);
- //private static final Logger DEFAULT_LOG = Logger.getLogger(Worker.class);
-
- private static final MetricRegistry metrics = new MetricRegistry();
-
- private static final MetricRegistry jstack = new MetricRegistry();
-
- private static Map<String, List<MetricInfo>> taskMetricMap = new ConcurrentHashMap<String, List<MetricInfo>>();
- private static List<MetricInfo> workerMetricList = new ArrayList<MetricInfo>();
- private static UserDefMetric userDefMetric = new UserDefMetric();
-
- static {
- try {
- registerAll("jvm-thread-state", new ThreadStatesGaugeSet());
- registerAll("jvm-mem", new MemoryUsageGaugeSet());
- registerAll("jvm-gc", new GarbageCollectorMetricSet());
-
- jstack.register("jstack", new MetricJstack());
- } catch (Exception e) {
- LOG.warn("Failed to regist jvm metrics");
- }
- }
-
- public static MetricRegistry getMetrics() {
- return metrics;
- }
-
- public static MetricRegistry getJstack() {
- return jstack;
- }
-
- public static UserDefMetric getUserDefMetric() {
- return userDefMetric;
- }
-
- public static boolean unregister(String name) {
- LOG.info("Unregister metric " + name);
- return metrics.remove(name);
- }
-
- public static boolean unregister(String prefix, String name, String id, Metrics.MetricType type) {
- String MetricName;
- if (prefix == null)
- MetricName = name;
- else
- MetricName = prefix + "-" + name;
- boolean ret = unregister(MetricName);
-
- if (ret == true) {
- List<MetricInfo> metricList = null;
- if (type == MetricType.WORKER) {
- metricList = workerMetricList;
- } else {
- metricList = taskMetricMap.get(id);
- }
-
- boolean found = false;
- if (metricList != null) {
- for (MetricInfo metric : metricList) {
- if(metric.getName().equals(name)) {
- if (prefix != null) {
- if (metric.getPrefix().equals(prefix)) {
- metricList.remove(metric);
- found = true;
- break;
- }
- } else {
- if (metric.getPrefix() == null) {
- metricList.remove(metric);
- found = true;
- break;
- }
- }
- }
- }
- }
- if (found != true)
- LOG.warn("Name " + name + " is not found when unregister from metricList");
- }
- return ret;
- }
-
- public static boolean unregisterUserDefine(String name) {
- boolean ret = unregister(name);
-
- if (ret == true) {
- userDefMetric.remove(name);
- userDefMetric.unregisterCallback(name);
- }
-
- return ret;
- }
-
- public static <T extends Metric> T register(String name, T metric)
- throws IllegalArgumentException {
- LOG.info("Register Metric " + name);
- return metrics.register(name, metric);
- }
-
- public static <T extends Metric> T register(String prefix, String name, T metric,
- String idStr, MetricType metricType) throws IllegalArgumentException {
- String metricName;
- if (prefix == null)
- metricName = name;
- else
- metricName = prefix + "-" + name;
- T ret = register(metricName, metric);
- updateMetric(prefix, name, metricType, ret, idStr);
- return ret;
- }
-
- public static void registerUserDefine(String name, Object metric, MetricCallback callback) {
- if(metric instanceof Gauge<?>) {
- userDefMetric.addToGauge(name, (Gauge<?>)metric);
- } else if (metric instanceof Timer) {
- userDefMetric.addToTimer(name, (Timer)metric);
- } else if (metric instanceof Counter) {
- userDefMetric.addToCounter(name, (Counter)metric);
- } else if (metric instanceof Meter) {
- userDefMetric.addToMeter(name, (Meter)metric);
- } else if (metric instanceof Histogram) {
- userDefMetric.addToHistogram(name, (Histogram)metric);
- } else if (metric instanceof JStormTimer) {
- userDefMetric.addToTimer(name, ((JStormTimer)metric).getInstance());
- } else if (metric instanceof JStormHistogram) {
- userDefMetric.addToHistogram(name, ((JStormHistogram)metric).getInstance());
- } else {
- LOG.warn("registerUserDefine, unknow Metric type, name=" + name);
- }
-
- if (callback != null) {
- userDefMetric.registerCallback(callback, name);
- }
- }
-
-
- // copy from MetricRegistry
- public static void registerAll(String prefix, MetricSet metrics)
- throws IllegalArgumentException {
- for (Map.Entry<String, Metric> entry : metrics.getMetrics().entrySet()) {
- if (entry.getValue() instanceof MetricSet) {
- registerAll(MetricRegistry.name(prefix, entry.getKey()),
- (MetricSet) entry.getValue());
- } else {
- register(MetricRegistry.name(prefix, entry.getKey()),
- entry.getValue());
- }
- }
- }
-
- private static void updateMetric(String prefix, String name, MetricType metricType,
- Metric metric, String idStr) {
- Map<String, List<MetricInfo>> metricMap;
- List<MetricInfo> metricList;
- if (metricType == MetricType.TASK) {
- metricMap = taskMetricMap;
- metricList = metricMap.get(idStr);
- if (null == metricList) {
- metricList = new ArrayList<MetricInfo>();
- metricMap.put(idStr, metricList);
- }
- } else if (metricType == MetricType.WORKER) {
- metricList = workerMetricList;
- } else {
- LOG.error("updateMetricMap: unknown metric type");
- return;
- }
-
- MetricInfo metricInfo = new MetricInfo(prefix, name, metric);
- metricList.add(metricInfo);
-
- }
-
- public static Map<String, List<MetricInfo>> getTaskMetricMap() {
- return taskMetricMap;
- }
-
- public static List<MetricInfo> getWorkerMetricList() {
- return workerMetricList;
- }
-
- public static class QueueGauge implements Gauge<Float> {
- DisruptorQueue queue;
- String name;
-
- public QueueGauge(String name, DisruptorQueue queue) {
- this.queue = queue;
- this.name = name;
- }
-
- @Override
- public Float getValue() {
- Float ret = queue.pctFull();
- if (ret > 0.8) {
- //DEFAULT_LOG.info("Queue " + name + "is full " + ret);
- }
-
- return ret;
- }
-
- }
-
- public static Gauge<Float> registerQueue(String name, DisruptorQueue queue) {
- LOG.info("Register Metric " + name);
- return metrics.register(name, new QueueGauge(name, queue));
- }
-
- public static Gauge<Float> registerQueue(String prefix, String name, DisruptorQueue queue,
- String idStr, MetricType metricType) {
- String metricName;
- if (prefix == null)
- metricName = name;
- else
- metricName = prefix + "-" + name;
- Gauge<Float> ret = registerQueue(metricName, queue);
- updateMetric(prefix, name, metricType, ret, idStr);
- return ret;
- }
-
- public static Gauge<?> registerGauge(String name, Gauge<?> gauge) {
- LOG.info("Register Metric " + name);
- return metrics.register(name, gauge);
- }
-
- public static Counter registerCounter(String name) {
- LOG.info("Register Metric " + name);
- return metrics.counter(name);
- }
-
- public static Counter registerCounter(String prefix, String name,
- String idStr, MetricType metricType) {
- String metricName;
- if (prefix == null)
- metricName = name;
- else
- metricName = prefix + "-" + name;
- Counter ret = registerCounter(metricName);
- updateMetric(prefix, name, metricType, ret, idStr);
- return ret;
- }
-
- public static Meter registerMeter(String name) {
- LOG.info("Register Metric " + name);
- return metrics.meter(name);
- }
-
- public static Meter registerMeter(String prefix, String name,
- String idStr, MetricType metricType) {
- String metricName;
- if (prefix == null)
- metricName = name;
- else
- metricName = prefix + "-" + name;
- Meter ret = registerMeter(metricName);
- updateMetric(prefix, name, metricType, ret, idStr);
- return ret;
- }
-
- public static JStormHistogram registerHistograms(String name) {
- LOG.info("Register Metric " + name);
- Histogram instance = metrics.histogram(name);
-
- return new JStormHistogram(name, instance);
- }
-
- public static JStormHistogram registerHistograms(String prefix, String name,
- String idStr, MetricType metricType) {
- String metricName;
- if (prefix == null)
- metricName = name;
- else
- metricName = prefix + "-" + name;
- JStormHistogram ret = registerHistograms(metricName);
- updateMetric(prefix, name, metricType, ret.getInstance(), idStr);
- return ret;
- }
-
- public static JStormTimer registerTimer(String name) {
- LOG.info("Register Metric " + name);
-
- Timer instance = metrics.timer(name);
- return new JStormTimer(name, instance);
- }
-
- public static JStormTimer registerTimer(String prefix, String name,
- String idStr, MetricType metricType) {
- String metricName;
- if (prefix == null)
- metricName = name;
- else
- metricName = prefix + "-" + name;
- JStormTimer ret = registerTimer(metricName);
- updateMetric(prefix, name, metricType, ret.getInstance(), idStr);
- return ret;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/UserDefMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/UserDefMetric.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/UserDefMetric.java
deleted file mode 100644
index 51f787e..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/UserDefMetric.java
+++ /dev/null
@@ -1,106 +0,0 @@
-package com.alibaba.jstorm.metric;
-
-import java.util.Map;
-import java.util.HashMap;
-import java.util.Map.Entry;
-import java.io.Serializable;
-
-import com.codahale.metrics.Metric;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Sampling;
-import com.codahale.metrics.Snapshot;
-import com.codahale.metrics.Timer;
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Meter;
-import com.alibaba.jstorm.client.metric.MetricCallback;
-import com.alibaba.jstorm.metric.MetricInfo;
-
-
-/**
- * /storm-zk-root/Monitor/{topologyid}/UserDefMetrics/{workerid} data
- */
-public class UserDefMetric {
-
- private static final long serialVersionUID = 4547327064057659279L;
-
- private Map<String, Gauge<?>> gaugeMap = new HashMap<String, Gauge<?>>();
- private Map<String, Counter> counterMap = new HashMap<String, Counter>();
- private Map<String, Histogram> histogramMap = new HashMap<String, Histogram>();
- private Map<String, Timer> timerMap = new HashMap<String, Timer>();
- private Map<String, Meter> meterMap = new HashMap<String, Meter>();
- private Map<String, MetricCallback> callbacks = new HashMap<String, MetricCallback>();
-
- public UserDefMetric() {
- }
-
- public Map<String, Gauge<?>> getGauge() {
- return this.gaugeMap;
- }
- public void registerCallback(MetricCallback callback, String name) {
- if (callbacks.containsKey(name) != true) {
- callbacks.put(name, callback);
- }
- }
- public void unregisterCallback(String name) {
- callbacks.remove(name);
- }
- public Map<String, MetricCallback> getCallbacks() {
- return callbacks;
- }
- public void addToGauge(String name, Gauge<?> gauge) {
- gaugeMap.put(name, gauge);
- }
-
- public Map<String, Counter> getCounter() {
- return this.counterMap;
- }
-
- public void addToCounter(String name, Counter counter) {
- counterMap.put(name, counter);
- }
-
- public Map<String, Histogram> getHistogram() {
- return this.histogramMap;
- }
-
- public void addToHistogram(String name, Histogram histogram) {
- histogramMap.put(name, histogram);
- }
-
-
- public Map<String, Timer> getTimer() {
- return this.timerMap;
- }
-
- public void addToTimer(String name, Timer timer) {
- timerMap.put(name, timer);
- }
-
- public Map<String, Meter> getMeter() {
- return this.meterMap;
- }
-
- public void addToMeter(String name, Meter meter) {
- meterMap.put(name, meter);
- }
-
- public void remove(String name) {
- if (gaugeMap.containsKey(name)) {
- gaugeMap.remove(name);
- } else if (counterMap.containsKey(name)) {
- counterMap.remove(name);
- } else if (histogramMap.containsKey(name)) {
- histogramMap.remove(name);
- } else if (timerMap.containsKey(name)) {
- timerMap.remove(name);
- } else if (meterMap.containsKey(name)) {
- meterMap.remove(name);
- }
-
- if (callbacks.containsKey(name)) {
- callbacks.remove(name);
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/UserDefMetricData.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/UserDefMetricData.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/UserDefMetricData.java
deleted file mode 100644
index 7ca0860..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/UserDefMetricData.java
+++ /dev/null
@@ -1,138 +0,0 @@
-package com.alibaba.jstorm.metric;
-
-import java.util.Map;
-import java.util.HashMap;
-import java.util.Map.Entry;
-import java.io.Serializable;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-import org.apache.log4j.Logger;
-
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Timer;
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Meter;
-import com.alibaba.jstorm.metric.metrdata.*;
-import com.alibaba.jstorm.utils.JStormUtils;
-
-
-/**
- * /storm-zk-root/Monitor/{topologyid}/user/{workerid} data
- */
-public class UserDefMetricData implements Serializable {
- private static final Logger LOG = Logger.getLogger(UserDefMetricData.class);
-
- private static final long serialVersionUID = 954727168057659270L;
-
- private Map<String, GaugeData> gaugeDataMap = new HashMap<String, GaugeData>();
- private Map<String, CounterData> counterDataMap = new HashMap<String, CounterData>();
- private Map<String, TimerData> timerDataMap = new HashMap<String, TimerData>();
- private Map<String, MeterData> meterDataMap = new HashMap<String, MeterData>();
- private Map<String, HistogramData> histogramDataMap = new HashMap<String, HistogramData>();
-
- public UserDefMetricData() {
- }
-
- public Map<String, GaugeData> getGaugeDataMap() {
- return gaugeDataMap;
- }
-
- public Map<String, CounterData> getCounterDataMap() {
- return counterDataMap;
- }
-
- public Map<String, TimerData> getTimerDataMap() {
- return timerDataMap;
- }
-
- public Map<String, MeterData> getMeterDataMap() {
- return meterDataMap;
- }
-
- public Map<String, HistogramData> getHistogramDataMap() {
- return histogramDataMap;
- }
-
- public void updateFromGauge(Map<String, Gauge<?>> gaugeMap) {
- for(Entry<String, Gauge<?>> entry : gaugeMap.entrySet()) {
- try {
- GaugeData gaugeData = new GaugeData();
- gaugeData.setValue(JStormUtils.parseDouble(entry.getValue().getValue()));
- gaugeDataMap.put(entry.getKey(), gaugeData);
- } catch (Throwable e) {
- LOG.error("updateFromGauge exception ", e);
- }
- }
- }
-
- public void updateFromCounter(Map<String, Counter> counterMap) {
- for(Entry<String, Counter> entry : counterMap.entrySet()) {
- CounterData counterData = new CounterData();
- counterData.setValue(entry.getValue().getCount());
- counterDataMap.put(entry.getKey(), counterData);
- }
- }
-
- public void updateFromMeterData(Map<String, Meter> meterMap) {
- for(Entry<String, Meter> entry : meterMap.entrySet()) {
- Meter meter = entry.getValue();
- MeterData meterData = new MeterData();
- meterData.setCount(meter.getCount());
- meterData.setMeanRate(meter.getMeanRate());
- meterData.setOneMinuteRate(meter.getOneMinuteRate());
- meterData.setFiveMinuteRate(meter.getFiveMinuteRate());
- meterData.setFifteenMinuteRate(meter.getFifteenMinuteRate());
- meterDataMap.put(entry.getKey(), meterData);
- }
- }
-
- public void updateFromHistogramData(Map<String, Histogram> histogramMap) {
- for(Entry<String, Histogram> entry : histogramMap.entrySet()) {
- Histogram histogram = entry.getValue();
- HistogramData histogramData = new HistogramData();
- histogramData.setCount(histogram.getCount());
- histogramData.setMax(histogram.getSnapshot().getMax());
- histogramData.setMin(histogram.getSnapshot().getMin());
- histogramData.setMean(histogram.getSnapshot().getMean());
- histogramData.setMedian(histogram.getSnapshot().getMedian());
- histogramData.setStdDev(histogram.getSnapshot().getStdDev());
- histogramData.setPercent75th(histogram.getSnapshot().get75thPercentile());
- histogramData.setPercent95th(histogram.getSnapshot().get95thPercentile());
- histogramData.setPercent98th(histogram.getSnapshot().get98thPercentile());
- histogramData.setPercent99th(histogram.getSnapshot().get99thPercentile());
- histogramData.setPercent999th(histogram.getSnapshot().get999thPercentile());
- histogramDataMap.put(entry.getKey(), histogramData);
- }
- }
-
- public void updateFromTimerData(Map<String, Timer> timerMap) {
- for(Entry<String, Timer> entry : timerMap.entrySet()) {
- Timer timer = entry.getValue();
- TimerData timerData = new TimerData();
- timerData.setCount(timer.getCount());
- timerData.setMax(timer.getSnapshot().getMax());
- timerData.setMin(timer.getSnapshot().getMin());
- timerData.setMean(timer.getSnapshot().getMean());
- timerData.setMedian(timer.getSnapshot().getMedian());
- timerData.setStdDev(timer.getSnapshot().getStdDev());
- timerData.setPercent75th(timer.getSnapshot().get75thPercentile());
- timerData.setPercent95th(timer.getSnapshot().get95thPercentile());
- timerData.setPercent98th(timer.getSnapshot().get98thPercentile());
- timerData.setPercent99th(timer.getSnapshot().get99thPercentile());
- timerData.setPercent999th(timer.getSnapshot().get999thPercentile());
- timerData.setMeanRate(timer.getMeanRate());
- timerData.setOneMinuteRate(timer.getOneMinuteRate());
- timerData.setFiveMinuteRate(timer.getFiveMinuteRate());
- timerData.setFifteenMinuteRate(timer.getFifteenMinuteRate());
- timerDataMap.put(entry.getKey(), timerData);
- }
- }
-
- @Override
- public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/metrdata/CounterData.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/metrdata/CounterData.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/metrdata/CounterData.java
deleted file mode 100644
index 727cb9d..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/metrdata/CounterData.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.alibaba.jstorm.metric.metrdata;
-
-import java.io.Serializable;
-
-
-public class CounterData implements Serializable {
-
- private static final long serialVersionUID = 954627168057659219L;
-
- private long value;
-
- public CounterData () {
- value = 0l;
- }
-
- public long getValue() {
- return value;
- }
-
- public void setValue(long value) {
- this.value = value;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/metrdata/GaugeData.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/metrdata/GaugeData.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/metrdata/GaugeData.java
deleted file mode 100644
index 9f64bf3..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/metrdata/GaugeData.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.alibaba.jstorm.metric.metrdata;
-
-import java.io.Serializable;
-
-
-public class GaugeData implements Serializable {
-
- private static final long serialVersionUID = 954627168057659279L;
-
- private double value;
-
- public GaugeData () {
- value = 0.0;
- }
-
- public double getValue() {
- return value;
- }
-
- public void setValue(double value) {
- this.value = value;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/metrdata/HistogramData.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/metrdata/HistogramData.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/metrdata/HistogramData.java
deleted file mode 100644
index ec39851..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/metrdata/HistogramData.java
+++ /dev/null
@@ -1,112 +0,0 @@
-package com.alibaba.jstorm.metric.metrdata;
-
-import java.io.Serializable;
-
-
-public class HistogramData implements Serializable {
-
- private static final long serialVersionUID = 954627168057639289L;
-
- private long count;
- private long min;
- private long max;
- private double mean;
- private double stdDev;
- private double median;
- private double percent75th;
- private double percent95th;
- private double percent98th;
- private double percent99th;
- private double percent999th;
-
- public HistogramData() {
- }
-
- public long getCount() {
- return count;
- }
-
- public void setCount(long count) {
- this.count = count;
- }
-
- public long getMin() {
- return min;
- }
-
- public void setMin(long min) {
- this.min = min;
- }
-
- public long getMax() {
- return max;
- }
-
- public void setMax(long max) {
- this.max = max;
- }
-
- public double getMean() {
- return mean;
- }
-
- public void setMean(double mean) {
- this.mean = mean;
- }
-
- public double getStdDev() {
- return stdDev;
- }
-
- public void setStdDev(double stdDev) {
- this.stdDev = stdDev;
- }
-
- public double getMedian() {
- return median;
- }
-
- public void setMedian(double median) {
- this.median = median;
- }
-
- public double getPercent75th() {
- return percent75th;
- }
-
- public void setPercent75th(double percent75th) {
- this.percent75th = percent75th;
- }
-
- public double getPercent95th() {
- return percent95th;
- }
-
- public void setPercent95th(double percent95th) {
- this.percent95th = percent95th;
- }
-
- public double getPercent98th() {
- return percent98th;
- }
-
- public void setPercent98th(double percent98th) {
- this.percent98th = percent98th;
- }
-
- public double getPercent99th() {
- return percent99th;
- }
-
- public void setPercent99th(double percent99th) {
- this.percent99th = percent99th;
- }
-
- public double getPercent999th() {
- return percent999th;
- }
-
- public void setPercent999th(double percent999th) {
- this.percent999th = percent999th;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/metrdata/MeterData.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/metrdata/MeterData.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/metrdata/MeterData.java
deleted file mode 100644
index 865a3c4..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/metrdata/MeterData.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package com.alibaba.jstorm.metric.metrdata;
-
-import java.io.Serializable;
-
-
-public class MeterData implements Serializable {
-
- private static final long serialVersionUID = 954627168057659269L;
-
- private long count;
- private double meanRate;
- private double oneMinuteRate;
- private double fiveMinuteRate;
- private double fifteenMinuteRate;
-
- public MeterData() {
- }
-
- public void setCount(long count) {
- this.count = count;
- }
-
- public long getCount() {
- return this.count;
- }
-
- public void setMeanRate(double meanRate) {
- this.meanRate = meanRate;
- }
-
- public double getMeanRate() {
- return this.meanRate;
- }
-
- public void setOneMinuteRate(double oneMinuteRate) {
- this.oneMinuteRate = oneMinuteRate;
- }
-
- public double getOneMinuteRate() {
- return this.oneMinuteRate;
- }
-
- public void setFiveMinuteRate(double fiveMinuteRate) {
- this.fiveMinuteRate = fiveMinuteRate;
- }
-
- public double getFiveMinuteRate() {
- return this.fiveMinuteRate;
- }
-
- public void setFifteenMinuteRate(double fifteenMinuteRate) {
- this.fifteenMinuteRate = fifteenMinuteRate;
- }
-
- public double getFifteenMinuteRate() {
- return this.fifteenMinuteRate;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/metrdata/TimerData.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/metrdata/TimerData.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/metrdata/TimerData.java
deleted file mode 100644
index 5aaab01..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/metric/metrdata/TimerData.java
+++ /dev/null
@@ -1,149 +0,0 @@
-package com.alibaba.jstorm.metric.metrdata;
-
-import java.io.Serializable;
-
-
-public class TimerData implements Serializable {
-
- private static final long serialVersionUID = 954627168057659239L;
-
- private long count;
- private double meanRate;
- private double oneMinuteRate;
- private double fiveMinuteRate;
- private double fifteenMinuteRate;
- private long min;
- private long max;
- private double mean;
- private double stdDev;
- private double median;
- private double percent75th;
- private double percent95th;
- private double percent98th;
- private double percent99th;
- private double percent999th;
-
- public TimerData() {
-
- }
-
- public long getCount() {
- return count;
- }
-
- public void setCount(long count) {
- this.count = count;
- }
-
- public long getMin() {
- return min;
- }
-
- public void setMin(long min) {
- this.min = min;
- }
-
- public long getMax() {
- return max;
- }
-
- public void setMax(long max) {
- this.max = max;
- }
-
- public double getMean() {
- return mean;
- }
-
- public void setMean(double mean) {
- this.mean = mean;
- }
-
- public double getStdDev() {
- return stdDev;
- }
-
- public void setStdDev(double stdDev) {
- this.stdDev = stdDev;
- }
-
- public double getMedian() {
- return median;
- }
-
- public void setMedian(double median) {
- this.median = median;
- }
-
- public double getPercent75th() {
- return percent75th;
- }
-
- public void setPercent75th(double percent75th) {
- this.percent75th = percent75th;
- }
-
- public double getPercent95th() {
- return percent95th;
- }
-
- public void setPercent95th(double percent95th) {
- this.percent95th = percent95th;
- }
-
- public double getPercent98th() {
- return percent98th;
- }
-
- public void setPercent98th(double percent98th) {
- this.percent98th = percent98th;
- }
-
- public double getPercent99th() {
- return percent99th;
- }
-
- public void setPercent99th(double percent99th) {
- this.percent99th = percent99th;
- }
-
- public double getPercent999th() {
- return percent999th;
- }
-
- public void setPercent999th(double percent999th) {
- this.percent999th = percent999th;
- }
-
- public void setMeanRate(double meanRate) {
- this.meanRate = meanRate;
- }
-
- public double getMeanRate() {
- return this.meanRate;
- }
-
- public void setOneMinuteRate(double oneMinuteRate) {
- this.oneMinuteRate = oneMinuteRate;
- }
-
- public double getOneMinuteRate() {
- return this.oneMinuteRate;
- }
-
- public void setFiveMinuteRate(double fiveMinuteRate) {
- this.fiveMinuteRate = fiveMinuteRate;
- }
-
- public double getFiveMinuteRate() {
- return this.fiveMinuteRate;
- }
-
- public void setFifteenMinuteRate(double fifteenMinuteRate) {
- this.fifteenMinuteRate = fifteenMinuteRate;
- }
-
- public double getFifteenMinuteRate() {
- return this.fifteenMinuteRate;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/DisruptorQueue.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/DisruptorQueue.java
deleted file mode 100644
index b127095..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/DisruptorQueue.java
+++ /dev/null
@@ -1,151 +0,0 @@
-//package com.alibaba.jstorm.utils;
-//
-//import java.util.ArrayList;
-//import java.util.List;
-//import java.util.concurrent.Executor;
-//import java.util.concurrent.atomic.AtomicBoolean;
-//
-//import org.apache.commons.lang.mutable.MutableObject;
-//
-//import com.lmax.disruptor.EventFactory;
-//import com.lmax.disruptor.ExceptionHandler;
-//import com.lmax.disruptor.FatalExceptionHandler;
-//import com.lmax.disruptor.RingBuffer;
-//import com.lmax.disruptor.Sequence;
-//import com.lmax.disruptor.SequenceBarrier;
-//import com.lmax.disruptor.Sequencer;
-//import com.lmax.disruptor.WaitStrategy;
-//import com.lmax.disruptor.WorkHandler;
-//import com.lmax.disruptor.WorkProcessor;
-//import com.lmax.disruptor.util.Util;
-//
-//public class DisruptorQueue<T> {
-// private final RingBuffer<MutableObject> ringBuffer;
-// private final SequenceBarrier sequenceBarrier;
-// private final ExceptionHandler exceptionHandler;
-// private final List<WorkProcessor> workProcessors;
-// private final Sequence workSequence;
-// private final AtomicBoolean started = new AtomicBoolean(false);
-//
-// public DisruptorQueue(boolean isMultiProducer, int bufferSize,
-// WaitStrategy waitStrategy) {
-// if (isMultiProducer) {
-// ringBuffer = RingBuffer.createMultiProducer(
-// new ObjectEventFactory(), bufferSize, waitStrategy);
-// } else {
-// ringBuffer = RingBuffer.createSingleProducer(
-// new ObjectEventFactory(), bufferSize, waitStrategy);
-// }
-//
-// sequenceBarrier = ringBuffer.newBarrier();
-// exceptionHandler = new FatalExceptionHandler();
-// workProcessors = new ArrayList<WorkProcessor>();
-// workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
-// }
-//
-// public void register(WorkHandler<T> handler) {
-// WorkProcessor workProcessor = new WorkProcessor(ringBuffer,
-// sequenceBarrier, new HandleWraper(handler), exceptionHandler,
-// workSequence);
-//
-// ringBuffer.addGatingSequences(workProcessor.getSequence());
-//
-// workProcessors.add(workProcessor);
-// }
-//
-// void cleanup() {
-//
-// }
-//
-// /**
-// * Start the worker pool processing events in sequence.
-// *
-// * @param executor
-// * providing threads for running the workers.
-// * @return the {@link RingBuffer} used for the work queue.
-// * @throws IllegalStateException
-// * if the pool has already been started and not halted yet
-// */
-// public void start() {
-// if (!started.compareAndSet(false, true)) {
-// throw new IllegalStateException(
-// "WorkerPool has already been started and cannot be restarted until halted.");
-// }
-//
-// final long cursor = ringBuffer.getCursor();
-// workSequence.set(cursor);
-//
-// for (WorkProcessor<T> processor : workProcessors) {
-// processor.getSequence().set(cursor);
-// new Thread(processor).start();
-// }
-//
-// return;
-// }
-//
-// public Sequence[] getWorkerSequences() {
-// final Sequence[] sequences = new Sequence[workProcessors.size()];
-// for (int i = 0, size = workProcessors.size(); i < size; i++) {
-// sequences[i] = workProcessors.get(i).getSequence();
-// }
-//
-// return sequences;
-// }
-//
-// /**
-// * Wait for the {@link RingBuffer} to drain of published events then halt
-// * the workers.
-// */
-// public void drainAndHalt() {
-// Sequence[] workerSequences = getWorkerSequences();
-// while (ringBuffer.getCursor() > Util
-// .getMinimumSequence(workerSequences)) {
-// Thread.yield();
-// }
-//
-// for (WorkProcessor<?> processor : workProcessors) {
-// processor.halt();
-// }
-//
-// started.set(false);
-// }
-//
-// /**
-// * Halt all workers immediately at the end of their current cycle.
-// */
-// public void halt() {
-// for (WorkProcessor<?> processor : workProcessors) {
-// processor.halt();
-// }
-//
-// started.set(false);
-// }
-//
-// public void offer(T o) {
-// long sequence = ringBuffer.next();
-// ringBuffer.get(sequence).setValue(o);
-// ringBuffer.publish(sequence);
-// }
-//
-// public static class ObjectEventFactory implements
-// EventFactory<MutableObject> {
-//
-// public MutableObject newInstance() {
-// return new MutableObject();
-// }
-// }
-//
-// public static class HandleWraper<T> implements WorkHandler<MutableObject> {
-// private WorkHandler<T> handler;
-//
-// public HandleWraper(WorkHandler<T> handler) {
-// this.handler = handler;
-// }
-//
-// public void onEvent(MutableObject event) throws Exception {
-// // TODO Auto-generated method stub
-// handler.onEvent((T) event.getValue());
-// }
-//
-// }
-// }
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/EventSampler.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/EventSampler.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/EventSampler.java
deleted file mode 100644
index c0bff67..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/EventSampler.java
+++ /dev/null
@@ -1,100 +0,0 @@
-package com.alibaba.jstorm.utils;
-
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- *
- * statistics tuples: sampling event
- *
- *
- * @author yannian/Longda
- *
- */
-public class EventSampler {
- private volatile int freq;
- private AtomicInteger i = new AtomicInteger(0);
- private volatile int target;
- private Random r = new Random();
-
- public EventSampler(int freq) {
- this.freq = freq;
- this.target = r.nextInt(freq);
-
- if (freq / 4 > 1) {
- intervalCheck.setInterval(freq / 4);
- }
- }
-
- /**
- * select 1/freq
- *
- * @return
- */
- public boolean countCheck() {
- i.incrementAndGet();
- if (i.get() > freq) {
- target = r.nextInt(freq);
- i.set(0);
- }
- if (i.get() == target) {
- return true;
- }
- return false;
- }
-
- private AtomicInteger counter = new AtomicInteger(0);
- private AtomicLong sum = new AtomicLong(0);
- private IntervalCheck intervalCheck = new IntervalCheck();
-
- public Integer tpsCheck() {
- int send = counter.incrementAndGet();
-
- Double pastSeconds = intervalCheck.checkAndGet();
- if (pastSeconds != null) {
- counter.set(0);
-
- return Integer.valueOf((int) (send / pastSeconds));
-
- }
-
- return null;
- }
-
- public Integer timesCheck() {
- int send = counter.incrementAndGet();
-
- Double pastSeconds = intervalCheck.checkAndGet();
- if (pastSeconds != null) {
- counter.set(0);
-
- return send;
-
- }
-
- return null;
- }
-
- public Pair<Integer, Double> avgCheck(long one) {
- int send = counter.incrementAndGet();
- long total = sum.addAndGet(one);
-
- Double pastSeconds = intervalCheck.checkAndGet();
- if (pastSeconds != null) {
- counter.set(0);
- sum.set(0);
-
- Double avg = Double.valueOf(0);
- if (send != 0) {
- avg = ((double)total)/send;
- }
-
-
- return new Pair<Integer, Double>(send, avg);
-
- }
-
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/ExpiredCallback.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/ExpiredCallback.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/ExpiredCallback.java
deleted file mode 100644
index 4ee450b..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/ExpiredCallback.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package com.alibaba.jstorm.utils;
-
-public interface ExpiredCallback<K, V> {
- public void expire(K key, V val);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/FileAttribute.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/FileAttribute.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/FileAttribute.java
deleted file mode 100644
index 46582b2..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/FileAttribute.java
+++ /dev/null
@@ -1,118 +0,0 @@
-package com.alibaba.jstorm.utils;
-
-import java.io.Serializable;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-import org.json.simple.JSONAware;
-
-//import com.alibaba.fastjson.JSONAware;
-
-public class FileAttribute implements Serializable, JSONAware {
-
- /** */
- private static final long serialVersionUID = -5131640995402822835L;
-
- private String fileName;
- private String isDir;
- private String modifyTime;
- private String size;
-
- private static final String FILE_NAME_FIELD = "fileName";
- private static final String IS_DIR_FIELD = "isDir";
- private static final String MODIFY_TIME_FIELD = "modifyTime";
- private static final String SIZE_FIELD = "size";
-
- public String getFileName() {
- return fileName;
- }
-
- public void setFileName(String fileName) {
- this.fileName = fileName;
- }
-
- public String getIsDir() {
- return isDir;
- }
-
- public void setIsDir(String isDir) {
- this.isDir = isDir;
- }
-
- public String getModifyTime() {
- return modifyTime;
- }
-
- public void setModifyTime(String modifyTime) {
- this.modifyTime = modifyTime;
- }
-
- public String getSize() {
- return size;
- }
-
- public void setSize(String size) {
- this.size = size;
- }
-
- @Override
- public String toString() {
- return ToStringBuilder.reflectionToString(this,
- ToStringStyle.SHORT_PREFIX_STYLE);
- }
-
- @Override
- public String toJSONString() {
- Map<String, String> map = new HashMap<String, String>();
-
- map.put(FILE_NAME_FIELD, fileName);
- map.put(IS_DIR_FIELD, isDir);
- map.put(MODIFY_TIME_FIELD, modifyTime);
- map.put(SIZE_FIELD, size);
- return JStormUtils.to_json(map);
- }
-
- public static FileAttribute fromJSONObject(Map jobj) {
- if (jobj == null) {
- return null;
- }
-
- FileAttribute attribute = new FileAttribute();
-
- attribute.setFileName((String) jobj.get(FILE_NAME_FIELD));
- attribute.setIsDir((String) jobj.get(IS_DIR_FIELD));
- attribute.setModifyTime((String) jobj.get(MODIFY_TIME_FIELD));
- attribute.setSize((String) jobj.get(SIZE_FIELD));
-
- return attribute;
- }
-
- public static void main(String[] args) {
- Map<String, FileAttribute> map = new HashMap<String, FileAttribute>();
-
- FileAttribute attribute = new FileAttribute();
- attribute.setFileName("test");
- attribute.setIsDir("true");
- attribute.setModifyTime(new Date().toString());
- attribute.setSize("4096");
-
- map.put("test", attribute);
-
- System.out.println("Before:" + map);
-
- String jsonString = JStormUtils.to_json(map);
-
- Map<String, Map> map2 = (Map<String, Map>) JStormUtils
- .from_json(jsonString);
-
- Map jObject = map2.get("test");
-
- FileAttribute attribute2 = FileAttribute.fromJSONObject(jObject);
-
- System.out.println("attribute2:" + attribute2);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/HttpserverUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/HttpserverUtils.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/HttpserverUtils.java
deleted file mode 100644
index ee7376d..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/HttpserverUtils.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package com.alibaba.jstorm.utils;
-
-public class HttpserverUtils {
-
- public static final String HTTPSERVER_CONTEXT_PATH_LOGVIEW = "/logview";
-
- public static final String HTTPSERVER_LOGVIEW_PARAM_CMD = "cmd";
-
- public static final String HTTPSERVER_LOGVIEW_PARAM_CMD_LIST = "listDir";
-
- public static final String HTTPSERVER_LOGVIEW_PARAM_CMD_SHOW = "showLog";
-
- public static final String HTTPSERVER_LOGVIEW_PARAM_CMD_JSTACK = "jstack";
-
- public static final String HTTPSERVER_LOGVIEW_PARAM_CMD_SHOW_CONF = "showConf";
-
- public static final String HTTPSERVER_LOGVIEW_PARAM_LOGFILE = "log";
-
- public static final String HTTPSERVER_LOGVIEW_PARAM_POS = "pos";
-
- public static final String HTTPSERVER_LOGVIEW_PARAM_DIR = "dir";
-
- public static final String HTTPSERVER_LOGVIEW_PARAM_WORKER_PORT = "workerPort";
-
- public static final long HTTPSERVER_LOGVIEW_PAGESIZE = 16384;
-
- public static final String HTTPSERVER_LOGVIEW_PARAM_SIZE_FORMAT = "%016d\n";
-
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/IntervalCheck.java
----------------------------------------------------------------------
diff --git a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/IntervalCheck.java b/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/IntervalCheck.java
deleted file mode 100644
index 6d0acc2..0000000
--- a/jstorm-client-extension/src/main/java/com/alibaba/jstorm/utils/IntervalCheck.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package com.alibaba.jstorm.utils;
-
-import java.io.Serializable;
-
-public class IntervalCheck implements Serializable {
-
- /**
- *
- */
- private static final long serialVersionUID = 8952971673547362883L;
-
- long lastCheck = System.currentTimeMillis();
-
- // default interval is 1 second
- long interval = 1;
-
- /*
- * if last check time is before interval seconds, return true, otherwise
- * return false
- */
- public boolean check() {
- return checkAndGet() != null;
- }
-
- /**
- *
- * @return
- */
- public Double checkAndGet() {
- long now = System.currentTimeMillis();
-
- synchronized (this) {
- if (now >= interval * 1000 + lastCheck) {
- double pastSecond = ((double) (now - lastCheck)) / 1000;
- lastCheck = now;
- return pastSecond;
- }
- }
-
- return null;
- }
-
- public long getInterval() {
- return interval;
- }
-
- public void setInterval(long interval) {
- this.interval = interval;
- }
-
- public void adjust(long addTimeMillis) {
- lastCheck += addTimeMillis;
- }
-
- public void start() {
- lastCheck = System.currentTimeMillis();
- }
-}