You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:58 UTC
[20/51] [partial] storm git commit: Update JStorm to latest release
2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricMetaParser.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricMetaParser.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricMetaParser.java
new file mode 100644
index 0000000..c0a220f
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricMetaParser.java
@@ -0,0 +1,76 @@
+package com.alibaba.jstorm.common.metric;
+
+import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable;
+import com.alibaba.jstorm.metric.MetaType;
+import com.alibaba.jstorm.metric.MetricType;
+import com.alibaba.jstorm.metric.MetricUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Parses delimited metric names into {@link MetricMeta} objects.
+ *
+ * @author wange
+ * @since 15/7/14
+ */
+public class MetricMetaParser {
+    // Bound to this class so that parse errors are reported under the parser's own log category.
+    private static final Logger logger = LoggerFactory.getLogger(MetricMetaParser.class);
+
+    /**
+     * Splits {@code name} on {@link MetricUtils#DELIM} and fills a {@link MetricMeta}.
+     * Names starting with 'W', 'N' or 'P' are parsed with the worker layout; every
+     * other name is parsed with the task layout.
+     *
+     * @param name the full metric name
+     * @return the parsed meta, or {@code null} when parsing fails (the error is logged)
+     */
+    public static MetricMeta fromMetricName(String name) {
+        try {
+            String[] parts = name.split(MetricUtils.DELIM);
+            char ch = parts[0].charAt(0);
+            if (ch == 'W' || ch == 'N' || ch == 'P') {
+                return parseWorkerMetricMeta(parts);
+            } else {
+                return parseTaskMetricMeta(parts);
+            }
+        } catch (Exception ex) {
+            // Covers null/empty names, too few parts and non-numeric ids.
+            logger.error("Error parsing metric meta, name:{}", name, ex);
+        }
+        return null;
+    }
+
+    /**
+     * Task layout: type prefix, topology id, component, task id, stream id, group, name.
+     */
+    private static MetricMeta parseTaskMetricMeta(String[] parts) {
+        MetricMeta meta = new MetricMeta();
+        meta.setMetaType(MetaType.parse(parts[0].charAt(0)).getT());
+        meta.setMetricType(MetricType.parse(parts[0].charAt(1)).getT());
+        meta.setTopologyId(parts[1]);
+        meta.setComponent(parts[2]);
+        meta.setTaskId(Integer.valueOf(parts[3]));
+        meta.setStreamId(parts[4]);
+        meta.setMetricGroup(parts[5]);
+        meta.setMetricName(parts[6]);
+
+        return meta;
+    }
+
+    /**
+     * Worker layout: type prefix, topology id, host, port, group, name.
+     */
+    private static MetricMeta parseWorkerMetricMeta(String[] parts) {
+        MetricMeta meta = new MetricMeta();
+        meta.setMetaType(MetaType.parse(parts[0].charAt(0)).getT());
+        meta.setMetricType(MetricType.parse(parts[0].charAt(1)).getT());
+        meta.setTopologyId(parts[1]);
+        meta.setHost(parts[2]);
+        meta.setPort(Integer.valueOf(parts[3]));
+        meta.setMetricGroup(parts[4]);
+        meta.setMetricName(parts[5]);
+
+        return meta;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricRegistry.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricRegistry.java
deleted file mode 100755
index 982c5f6..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricRegistry.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.jstorm.common.metric.window.Metric;
-
-public class MetricRegistry implements MetricSet {
- private static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class);
-
- private static final long serialVersionUID = 8184106900230111064L;
- public static final String NAME_SEPERATOR = ".";
-
- /**
- * Concatenates elements to form a dotted name, eliding any null values or
- * empty strings.
- *
- * @param name the first element of the name
- * @param names the remaining elements of the name
- * @return {@code name} and {@code names} concatenated by periods
- */
- public static String name(String name, String... names) {
- final StringBuilder builder = new StringBuilder();
- append(builder, name);
- if (names != null) {
- for (String s : names) {
- append(builder, s);
- }
- }
- return builder.toString();
- }
-
- /**
- * Concatenates a class name and elements to form a dotted name, eliding any
- * null values or empty strings.
- *
- * @param klass the first element of the name
- * @param names the remaining elements of the name
- * @return {@code klass} and {@code names} concatenated by periods
- */
- public static String name(Class<?> klass, String... names) {
- return name(klass.getName(), names);
- }
-
- private static void append(StringBuilder builder, String part) {
- if (part != null && !part.isEmpty()) {
- if (builder.length() > 0) {
- builder.append(NAME_SEPERATOR);
- }
- builder.append(part);
- }
- }
-
- protected final ConcurrentMap<String, Metric> metrics;
-
- /**
- * Creates a new {@link MetricRegistry}.
- */
- public MetricRegistry() {
- this.metrics = buildMap();
- }
-
- /**
- * Creates a new {@link ConcurrentMap} implementation for use inside the
- * registry. Override this to create a {@link MetricRegistry} with space- or
- * time-bounded metric lifecycles, for example.
- *
- * @return a new {@link ConcurrentMap}
- */
- protected ConcurrentMap<String, Metric> buildMap() {
- return new ConcurrentHashMap<String, Metric>();
- }
-
- /**
- * Given a {@link Metric}, registers it under the given name.
- *
- * @param name the name of the metric
- * @param metric the metric
- * @param <T> the type of the metric
- * @return {@code metric}
- * @throws IllegalArgumentException if the name is already registered
- */
- @SuppressWarnings("unchecked")
- public <T extends Metric> T register(String name, T metric)
- throws IllegalArgumentException {
- if (metric instanceof MetricSet) {
- registerAll(name, (MetricSet) metric);
- } else {
- final Metric existing = metrics.putIfAbsent(name, metric);
- if (existing == null) {
- // add one listener to notify
- LOG.info("Successfully register metric of {}", name);
- } else {
- throw new IllegalArgumentException("A metric named " + name
- + " already exists");
- }
- }
- return metric;
- }
-
- /**
- * Given a metric set, registers them.
- *
- * @param metrics a set of metrics
- * @throws IllegalArgumentException if any of the names are already
- * registered
- */
- public void registerAll(MetricSet metrics) throws IllegalArgumentException {
- registerAll(null, metrics);
- }
-
- /**
- * Removes the metric with the given name.
- *
- * @param name the name of the metric
- * @return whether or not the metric was removed
- */
- public boolean remove(String name) {
- final Metric metric = metrics.remove(name);
- if (metric != null) {
- // call listener to notify remove
- LOG.info("Successfully unregister metric of {}", name);
- return true;
- }
- return false;
- }
-
- /**
- * Removes all metrics which match the given filter.
- *
- * @param filter a filter
- */
- public void removeMatching(MetricFilter filter) {
- for (Map.Entry<String, Metric> entry : metrics.entrySet()) {
- if (filter.matches(entry.getKey(), entry.getValue())) {
- remove(entry.getKey());
- }
- }
- }
-
- /**
- * Returns a set of the names of all the metrics in the registry.
- *
- * @return the names of all the metrics
- */
- public SortedSet<String> getNames() {
- return Collections.unmodifiableSortedSet(new TreeSet<String>(metrics
- .keySet()));
- }
-
- /**
- * Returns a map of all the gauges in the registry and their names.
- *
- * @return all the gauges in the registry
- */
- public SortedMap<String, Gauge> getGauges() {
- return getGauges(MetricFilter.ALL);
- }
-
- /**
- * Returns a map of all the gauges in the registry and their names which
- * match the given filter.
- *
- * @param filter the metric filter to match
- * @return all the gauges in the registry
- */
- public SortedMap<String, Gauge> getGauges(MetricFilter filter) {
- return getMetrics(Gauge.class, filter);
- }
-
- /**
- * Returns a map of all the counters in the registry and their names.
- *
- * @return all the counters in the registry
- */
- public SortedMap<String, Counter> getCounters() {
- return getCounters(MetricFilter.ALL);
- }
-
- /**
- * Returns a map of all the counters in the registry and their names which
- * match the given filter.
- *
- * @param filter the metric filter to match
- * @return all the counters in the registry
- */
- public SortedMap<String, Counter> getCounters(MetricFilter filter) {
- return getMetrics(Counter.class, filter);
- }
-
- /**
- * Returns a map of all the histograms in the registry and their names.
- *
- * @return all the histograms in the registry
- */
- public SortedMap<String, Histogram> getHistograms() {
- return getHistograms(MetricFilter.ALL);
- }
-
- /**
- * Returns a map of all the histograms in the registry and their names which
- * match the given filter.
- *
- * @param filter the metric filter to match
- * @return all the histograms in the registry
- */
- public SortedMap<String, Histogram> getHistograms(MetricFilter filter) {
- return getMetrics(Histogram.class, filter);
- }
-
- /**
- * Returns a map of all the meters in the registry and their names.
- *
- * @return all the meters in the registry
- */
- public SortedMap<String, Meter> getMeters() {
- return getMeters(MetricFilter.ALL);
- }
-
- /**
- * Returns a map of all the meters in the registry and their names which
- * match the given filter.
- *
- * @param filter the metric filter to match
- * @return all the meters in the registry
- */
- public SortedMap<String, Meter> getMeters(MetricFilter filter) {
- return getMetrics(Meter.class, filter);
- }
-
- /**
- * Returns a map of all the timers in the registry and their names.
- *
- * @return all the timers in the registry
- */
- public SortedMap<String, Timer> getTimers() {
- return getTimers(MetricFilter.ALL);
- }
-
- /**
- * Returns a map of all the timers in the registry and their names which
- * match the given filter.
- *
- * @param filter the metric filter to match
- * @return all the timers in the registry
- */
- public SortedMap<String, Timer> getTimers(MetricFilter filter) {
- return getMetrics(Timer.class, filter);
- }
-
- @SuppressWarnings("unchecked")
- private <T extends Metric> SortedMap<String, T> getMetrics(Class<T> klass,
- MetricFilter filter) {
- final TreeMap<String, T> timers = new TreeMap<String, T>();
- for (Map.Entry<String, Metric> entry : metrics.entrySet()) {
- if (klass.isInstance(entry.getValue())
- && filter.matches(entry.getKey(), entry.getValue())) {
- timers.put(entry.getKey(), (T) entry.getValue());
- }
- }
- return Collections.unmodifiableSortedMap(timers);
- }
-
- private void registerAll(String prefix, MetricSet metrics)
- throws IllegalArgumentException {
- for (Map.Entry<String, Metric> entry : metrics.getMetrics().entrySet()) {
- if (entry.getValue() instanceof MetricSet) {
- registerAll(name(prefix, entry.getKey()),
- (MetricSet) entry.getValue());
- } else {
- register(name(prefix, entry.getKey()), entry.getValue());
- }
- }
- }
-
- @Override
- public Map<String, Metric> getMetrics() {
- return Collections.unmodifiableMap(metrics);
- }
-
- /**
- * Expose metrics is to improve performance
- *
- * @return
- */
- public Metric getMetric(String name) {
- return metrics.get(name);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricSet.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricSet.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricSet.java
deleted file mode 100755
index 243f9b8..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricSet.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric;
-
-import java.io.Serializable;
-import java.util.Map;
-
-import com.alibaba.jstorm.common.metric.window.Metric;
-
-public interface MetricSet extends Serializable {
- Map<String, Metric> getMetrics();
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/QueueGauge.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/QueueGauge.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/QueueGauge.java
index 0ff964e..114eeb2 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/QueueGauge.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/QueueGauge.java
@@ -17,6 +17,7 @@
*/
package com.alibaba.jstorm.common.metric;
+import com.google.common.base.Joiner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,10 +35,10 @@ public class QueueGauge extends HealthCheck implements Gauge<Double> {
String name;
Result healthy;
- public QueueGauge(String name, DisruptorQueue queue) {
+ public QueueGauge(DisruptorQueue queue, String... names) {
this.queue = queue;
- this.name = name;
- this.healthy = HealthCheck.Result.healthy();
+ this.name = Joiner.on("-").join(names);
+ this.healthy = Result.healthy();
}
@Override
@@ -52,7 +53,7 @@ public class QueueGauge extends HealthCheck implements Gauge<Double> {
// TODO Auto-generated method stub
Double ret = (double) queue.pctFull();
if (ret > 0.9) {
- return HealthCheck.Result.unhealthy(name + QUEUE_IS_FULL);
+ return Result.unhealthy(name + QUEUE_IS_FULL);
} else {
return healthy;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TaskTrack.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TaskTrack.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TaskTrack.java
new file mode 100644
index 0000000..0b49ecf
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TaskTrack.java
@@ -0,0 +1,215 @@
+package com.alibaba.jstorm.common.metric;
+
+import com.alibaba.jstorm.metric.KVSerializable;
+import com.alibaba.jstorm.metric.MetricUtils;
+import com.alibaba.jstorm.utils.JStormUtils;
+
+import java.util.Date;
+
+/**
+ * Records when a task started or ended on a given host and port, and converts
+ * that record to and from a key/value byte pair.
+ *
+ * @author wange
+ * @since 15/7/16
+ */
+public class TaskTrack implements KVSerializable {
+
+    private long id;
+    private String clusterName;
+    private String topologyId;
+    private String component;
+    private int taskId;
+    private String host;
+    private int port;
+    private Date start;
+    private Date end;
+
+    public TaskTrack() {
+    }
+
+    public TaskTrack(String clusterName, String topologyId) {
+        this.clusterName = clusterName;
+        this.topologyId = topologyId;
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public String getTopologyId() {
+        return topologyId;
+    }
+
+    public void setTopologyId(String topologyId) {
+        this.topologyId = topologyId;
+    }
+
+    public String getComponent() {
+        return component;
+    }
+
+    public void setComponent(String component) {
+        this.component = component;
+    }
+
+    public int getTaskId() {
+        return taskId;
+    }
+
+    public void setTaskId(int taskId) {
+        this.taskId = taskId;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    public Date getStart() {
+        return start;
+    }
+
+    public void setStart(Date start) {
+        this.start = start;
+    }
+
+    public Date getEnd() {
+        return end;
+    }
+
+    public void setEnd(Date end) {
+        this.end = end;
+    }
+
+    /**
+     * key: clusterName + topologyId + taskId + time, joined with {@link MetricUtils#AT}.
+     * The time is the start time when set, otherwise the end time.
+     *
+     * @return the key bytes, encoded with the platform default charset
+     * @throws IllegalStateException if neither start nor end has been set
+     */
+    @Override
+    public byte[] getKey() {
+        Date time = getTime();
+        if (time == null) {
+            // Fail with a descriptive message instead of a bare NullPointerException.
+            throw new IllegalStateException("TaskTrack has neither start nor end time, topologyId:"
+                    + topologyId + ", taskId:" + taskId);
+        }
+        StringBuilder sb = new StringBuilder(128);
+        sb.append(clusterName).append(MetricUtils.AT).append(topologyId).append(MetricUtils.AT)
+                .append(taskId).append(MetricUtils.AT).append(time.getTime());
+        return sb.toString().getBytes();
+    }
+
+    /**
+     * value: type + host + port, joined with {@link MetricUtils#AT}.
+     * type is {@link KVSerializable#START} when a start time is set, otherwise
+     * {@link KVSerializable#END}.
+     */
+    @Override
+    public byte[] getValue() {
+        StringBuilder sb = new StringBuilder(32);
+        if (start != null) {
+            sb.append(KVSerializable.START);
+        } else {
+            sb.append(KVSerializable.END);
+        }
+        sb.append(MetricUtils.AT).append(host).append(MetricUtils.AT).append(port);
+        return sb.toString().getBytes();
+    }
+
+    /**
+     * Populates this instance from a key/value pair and returns it.
+     * Value fields are assigned only when the value has at least 3 parts, and key
+     * fields only when the key has at least 4 parts.
+     * NOTE(review): parsing splits on MetricUtils.DELIM while getKey/getValue
+     * join with MetricUtils.AT - confirm both constants hold the same delimiter.
+     *
+     * @param key   bytes in the layout written by {@link #getKey()}
+     * @param value bytes in the layout written by {@link #getValue()}
+     * @return this instance
+     */
+    @Override
+    public Object fromKV(byte[] key, byte[] value) {
+        String[] keyParts = new String(key).split(MetricUtils.DELIM);
+
+        String[] valueParts = new String(value).split(MetricUtils.DELIM);
+        boolean isStart = false;
+        if (valueParts.length >= 3) {
+            isStart = valueParts[0].equals(KVSerializable.START);
+            host = valueParts[1];
+            port = JStormUtils.parseInt(valueParts[2]);
+        }
+
+        if (keyParts.length >= 4) {
+            clusterName = keyParts[0];
+            topologyId = keyParts[1];
+            taskId = JStormUtils.parseInt(keyParts[2]);
+            long ts = JStormUtils.parseLong(keyParts[3]);
+            if (isStart) {
+                start = new Date(ts);
+            } else {
+                end = new Date(ts);
+            }
+        }
+
+        return this;
+    }
+
+    /**
+     * Returns the start time when set, otherwise the end time (possibly {@code null}).
+     */
+    public Date getTime() {
+        return start != null ? start : end;
+    }
+
+    /**
+     * Builds an identity string from clusterName, topologyId, taskId, host and port.
+     */
+    public String getIdentity() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(clusterName).append(MetricUtils.AT).append(topologyId).append(MetricUtils.AT)
+                .append(taskId).append(MetricUtils.AT).append(host).append(MetricUtils.AT).append(port);
+        return sb.toString();
+    }
+
+    /**
+     * Copies the start and/or end time from {@code taskTrack} for whichever of
+     * them is still unset on this instance.
+     *
+     * @param taskTrack the track to take missing times from
+     */
+    public void merge(TaskTrack taskTrack) {
+        if (taskTrack.start != null && this.start == null) {
+            this.start = taskTrack.start;
+        }
+        if (taskTrack.end != null && this.end == null) {
+            this.end = taskTrack.end;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Timer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Timer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Timer.java
deleted file mode 100755
index daf5633..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Timer.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric;
-
-import java.io.Closeable;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Use com.codahale.metrics's interface
- *
- * @author zhongyan.feng
- *
- */
-public class Timer extends Histogram {
- private static final long serialVersionUID = 5915881891513771108L;
-
- /**
- * A timing context.
- *
- * @see Timer#time()
- */
- public static class Context implements Closeable {
- private final Timer timer;
- private final long startTime;
-
- private Context(Timer timer) {
- this.timer = timer;
- this.startTime = System.currentTimeMillis();
- }
-
- /**
- * Stops recording the elapsed time, updates the timer and returns the
- * elapsed time in nanoseconds.
- */
- public long stop() {
- final long elapsed = System.currentTimeMillis() - startTime;
- timer.update(elapsed, TimeUnit.MILLISECONDS);
- return elapsed;
- }
-
- @Override
- public void close() {
- stop();
- }
- }
-
- public Timer() {
- init();
- }
-
- /**
- * Adds a recorded duration.
- *
- * @param duration the length of the duration
- * @param unit the scale unit of {@code duration}
- */
- public void update(long duration, TimeUnit unit) {
- update(unit.toMillis(duration));
- }
-
- /**
- * Times and records the duration of event.
- *
- * @param event a {@link Callable} whose {@link Callable#call()} method
- * implements a process whose duration should be timed
- * @param <T> the type of the value returned by {@code event}
- * @return the value returned by {@code event}
- * @throws Exception if {@code event} throws an {@link Exception}
- */
- public <T> T time(Callable<T> event) throws Exception {
- final long startTime = System.currentTimeMillis();
- try {
- return event.call();
- } finally {
- update(System.currentTimeMillis() - startTime);
- }
- }
-
- /**
- * Returns a new {@link Context}.
- *
- * @return a new {@link Context}
- * @see Context
- */
- public Context time() {
- return new Context(this);
- }
-
- public long getCount() {
- return allWindow.getSnapshot().getTimes();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerData.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerData.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerData.java
new file mode 100644
index 0000000..3f2be9b
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerData.java
@@ -0,0 +1,197 @@
+package com.alibaba.jstorm.common.metric;
+
+
+import com.alibaba.jstorm.metric.Bytes;
+import com.alibaba.jstorm.metric.KVSerializable;
+
+/**
+ * Snapshot of a timer metric (min/max, mean, percentiles, stddev and the
+ * m1/m5/m15 values) that can be stored as a key/value byte pair.
+ *
+ * @author wange
+ * @since 15/6/23
+ */
+public class TimerData extends MetricBaseData implements KVSerializable {
+    private long min;
+    private long max;
+    private double mean;
+    private double p50;
+    private double p75;
+    private double p95;
+    private double p98;
+    private double p99;
+    private double p999;
+    private double stddev;
+
+    private double m1;
+    private double m5;
+    private double m15;
+
+    public long getMin() {
+        return min;
+    }
+
+    public void setMin(long min) {
+        this.min = min;
+    }
+
+    public long getMax() {
+        return max;
+    }
+
+    public void setMax(long max) {
+        this.max = max;
+    }
+
+    public double getMean() {
+        return mean;
+    }
+
+    public void setMean(double mean) {
+        this.mean = mean;
+    }
+
+    public double getP50() {
+        return p50;
+    }
+
+    public void setP50(double p50) {
+        this.p50 = p50;
+    }
+
+    public double getP75() {
+        return p75;
+    }
+
+    public void setP75(double p75) {
+        this.p75 = p75;
+    }
+
+    public double getP95() {
+        return p95;
+    }
+
+    public void setP95(double p95) {
+        this.p95 = p95;
+    }
+
+    public double getP98() {
+        return p98;
+    }
+
+    public void setP98(double p98) {
+        this.p98 = p98;
+    }
+
+    public double getP99() {
+        return p99;
+    }
+
+    public void setP99(double p99) {
+        this.p99 = p99;
+    }
+
+    public double getP999() {
+        return p999;
+    }
+
+    public void setP999(double p999) {
+        this.p999 = p999;
+    }
+
+    public double getStddev() {
+        return stddev;
+    }
+
+    public void setStddev(double stddev) {
+        this.stddev = stddev;
+    }
+
+    public double getM1() {
+        return m1;
+    }
+
+    public void setM1(double m1) {
+        this.m1 = m1;
+    }
+
+    public double getM5() {
+        return m5;
+    }
+
+    public void setM5(double m5) {
+        this.m5 = m5;
+    }
+
+    public double getM15() {
+        return m15;
+    }
+
+    public void setM15(double m15) {
+        this.m15 = m15;
+    }
+
+    /**
+     * Packs min and max as longs followed by p50, p75, p95, p98, p99, p999, mean,
+     * m1, m5 and m15 as doubles, 8 bytes each (96 bytes total). stddev is not
+     * part of the encoded value.
+     *
+     * @return the encoded value
+     */
+    @Override
+    public byte[] getValue() {
+        final byte[] buf = new byte[8 * 12];
+        Bytes.putLong(buf, 0, min);
+        Bytes.putLong(buf, 8, max);
+
+        // The ten doubles follow the two longs, in this fixed order.
+        final double[] doubles = {p50, p75, p95, p98, p99, p999, mean, m1, m5, m15};
+        int offset = 16;
+        for (double d : doubles) {
+            Bytes.putDouble(buf, offset, d);
+            offset += 8;
+        }
+        return buf;
+    }
+
+    /**
+     * Restores this instance from a stored pair: the key is handed to
+     * {@code parseKey} and the value is decoded in the layout written by
+     * {@link #getValue()}.
+     *
+     * @param key   the stored key
+     * @param value the stored value
+     * @return this instance
+     */
+    @Override
+    public Object fromKV(byte[] key, byte[] value) {
+        parseKey(key);
+
+        int pos = 0;
+        this.min = Bytes.toLong(value, pos, KVSerializable.LONG_SIZE);
+        pos += 8;
+        this.max = Bytes.toLong(value, pos, KVSerializable.LONG_SIZE);
+        pos += 8;
+        this.p50 = Bytes.toDouble(value, pos);
+        pos += 8;
+        this.p75 = Bytes.toDouble(value, pos);
+        pos += 8;
+        this.p95 = Bytes.toDouble(value, pos);
+        pos += 8;
+        this.p98 = Bytes.toDouble(value, pos);
+        pos += 8;
+        this.p99 = Bytes.toDouble(value, pos);
+        pos += 8;
+        this.p999 = Bytes.toDouble(value, pos);
+        pos += 8;
+        this.mean = Bytes.toDouble(value, pos);
+        pos += 8;
+        this.m1 = Bytes.toDouble(value, pos);
+        pos += 8;
+        this.m5 = Bytes.toDouble(value, pos);
+        pos += 8;
+        this.m15 = Bytes.toDouble(value, pos);
+
+        return this;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerRatio.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerRatio.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerRatio.java
index 0a0e7e2..495ec4f 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerRatio.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerRatio.java
@@ -28,11 +28,7 @@ public class TimerRatio implements Gauge<Double> {
private long lastUpdateTime = 0;
private long sum = 0;
- private long lastGaugeTime;
-
- public void init() {
- lastGaugeTime = System.nanoTime();
- }
+ private long lastGaugeTime = 0;
public synchronized void start() {
if (lastUpdateTime == 0) {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Top.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Top.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Top.java
deleted file mode 100755
index 00ccc98..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Top.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.List;
-import java.util.TreeSet;
-
-import com.alibaba.jstorm.common.metric.operator.convert.Convertor;
-import com.alibaba.jstorm.common.metric.operator.merger.Merger;
-import com.alibaba.jstorm.common.metric.operator.updater.Updater;
-import com.alibaba.jstorm.common.metric.window.Metric;
-
-public class Top<T> extends Metric<List<T>, TreeSet<T>> {
- private static final long serialVersionUID = 4990212679365713831L;
-
- final protected Comparator<T> comparator;
- final protected int n;
-
- public Top(Comparator<T> comparator, int n) {
- this.comparator = comparator;
- this.n = n;
-
- this.defaultValue = new TreeSet<T>(comparator);
- this.updater = new Top.TopUpdator<T>(comparator, n);
- this.merger = new Top.TopMerger<T>(comparator, n);
- this.convertor = new Top.SetToList<T>();
-
- init();
- }
-
- public static class TopUpdator<T> implements Updater<TreeSet<T>> {
- private static final long serialVersionUID = -3940041101182079146L;
-
- final protected Comparator<T> comparator;
- final protected int n;
-
- public TopUpdator(Comparator<T> comparator, int n) {
- this.comparator = comparator;
- this.n = n;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public TreeSet<T> update(Number object, TreeSet<T> cache,
- Object... others) {
- // TODO Auto-generated method stub
- if (cache == null) {
- cache = new TreeSet<T>(comparator);
- }
-
- cache.add((T) object);
-
- if (cache.size() > n) {
- cache.remove(cache.last());
- }
-
- return cache;
- }
-
- @Override
- public TreeSet<T> updateBatch(TreeSet<T> object, TreeSet<T> cache,
- Object... objects) {
- // TODO Auto-generated method stub
- if (cache == null) {
- cache = new TreeSet<T>(comparator);
- }
-
- cache.addAll(object);
-
- while (cache.size() > n) {
- cache.remove(cache.last());
- }
-
- return cache;
- }
-
- }
-
- public static class TopMerger<T> implements Merger<TreeSet<T>> {
-
- private static final long serialVersionUID = 4478867986986581638L;
- final protected Comparator<T> comparator;
- final protected int n;
-
- public TopMerger(Comparator<T> comparator, int n) {
- this.comparator = comparator;
- this.n = n;
- }
-
- @Override
- public TreeSet<T> merge(Collection<TreeSet<T>> objs,
- TreeSet<T> unflushed, Object... others) {
- // TODO Auto-generated method stub
- TreeSet<T> temp = new TreeSet<T>(comparator);
- if (unflushed != null) {
- temp.addAll(unflushed);
- }
-
- for (TreeSet<T> set : objs) {
- temp.addAll(set);
- }
-
- if (temp.size() <= n) {
- return temp;
- }
-
- TreeSet<T> ret = new TreeSet<T>(comparator);
- int i = 0;
- for (T item : temp) {
- if (i < n) {
- ret.add(item);
- i++;
- } else {
- break;
- }
- }
- return ret;
- }
-
- }
-
- public static class SetToList<T> implements Convertor<TreeSet<T>, List<T>> {
- private static final long serialVersionUID = 4968816655779625255L;
-
- @Override
- public List<T> convert(TreeSet<T> set) {
- // TODO Auto-generated method stub
- List<T> ret = new ArrayList<T>();
- if (set != null) {
- for (T item : set) {
- ret.add(item);
- }
- }
- return ret;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TopologyHistory.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TopologyHistory.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TopologyHistory.java
new file mode 100644
index 0000000..186e2be
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TopologyHistory.java
@@ -0,0 +1,153 @@
+package com.alibaba.jstorm.common.metric;
+
+import com.alibaba.jstorm.metric.KVSerializable;
+import com.alibaba.jstorm.metric.MetricUtils;
+import com.alibaba.jstorm.utils.JStormUtils;
+
+import java.util.Date;
+
+/**
+ * @author wange
+ * @since 15/7/16
+ */
+public class TopologyHistory implements KVSerializable {
+
+ private long id;
+ private String clusterName;
+ private String topologyName;
+ private String topologyId;
+ private double sampleRate;
+ private Date start;
+ private Date end;
+
+ public TopologyHistory() {
+ }
+
+ public TopologyHistory(String clusterName, String topologyId) {
+ this.clusterName = clusterName;
+ this.topologyId = topologyId;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ public String getTopologyName() {
+ return topologyName;
+ }
+
+ public void setTopologyName(String topologyName) {
+ this.topologyName = topologyName;
+ }
+
+ public String getTopologyId() {
+ return topologyId;
+ }
+
+ public void setTopologyId(String topologyId) {
+ this.topologyId = topologyId;
+ }
+
+ public Date getStart() {
+ return start;
+ }
+
+ public void setStart(Date start) {
+ this.start = start;
+ }
+
+ public Date getEnd() {
+ return end;
+ }
+
+ public void setEnd(Date end) {
+ this.end = end;
+ }
+
+ public Date getTime() {
+ return start != null ? start : end;
+ }
+
+ public String getTag() {
+ return start != null ? KVSerializable.START : KVSerializable.END;
+ }
+
+ public double getSampleRate() {
+ return sampleRate;
+ }
+
+ public void setSampleRate(Double sampleRate) {
+ if (sampleRate == null) {
+ this.sampleRate = 1.0d;
+ } else {
+ this.sampleRate = sampleRate;
+ }
+ }
+
+ /**
+ * key: clusterName + topologyName + time
+ */
+ @Override
+ public byte[] getKey() {
+ return MetricUtils.concat2(clusterName, topologyName, getTime().getTime()).getBytes();
+
+ }
+
+ /**
+ * value: topologyId + type: S/E + sampleRate
+ */
+ @Override
+ public byte[] getValue() {
+ return MetricUtils.concat2(topologyId, getTag(), sampleRate).getBytes();
+ }
+
+ @Override
+ public Object fromKV(byte[] key, byte[] value) {
+ String[] keyParts = new String(key).split(MetricUtils.DELIM);
+ long time = 0;
+ if (keyParts.length >= 3) {
+ this.clusterName = keyParts[0];
+ this.topologyName = keyParts[1];
+ time = Long.valueOf(keyParts[2]);
+ }
+
+ String[] valueParts = new String(value).split(MetricUtils.DELIM);
+ if (valueParts.length >= 3) {
+ this.topologyId = valueParts[0];
+ String tag = valueParts[1];
+ if (tag.equals(KVSerializable.START)) {
+ this.start = new Date(time);
+ } else {
+ this.end = new Date(time);
+ }
+ this.sampleRate = JStormUtils.parseDouble(valueParts[2], 0.1d);
+ }
+
+ return this;
+ }
+
+ public String getIdentity(){
+ return MetricUtils.concat2(clusterName, topologyId);
+ }
+
+ public void merge(TopologyHistory history){
+ if (history.start != null && this.start == null){
+ this.start = history.start;
+ }
+ if (history.end != null && this.end == null){
+ this.end = history.end;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Counter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Counter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Counter.java
new file mode 100644
index 0000000..6745f14
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Counter.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old;
+
+import com.alibaba.jstorm.common.metric.old.operator.convert.DefaultConvertor;
+import com.alibaba.jstorm.common.metric.old.operator.merger.SumMerger;
+import com.alibaba.jstorm.common.metric.old.operator.updater.AddUpdater;
+import com.alibaba.jstorm.common.metric.old.window.Metric;
+
+/**
+ * The class is similar to com.codahale.metrics.Counter
+ *
+ * Sums the values of all windows.
+ *
+ * For how to use Counter, please refer to the Sampling interface.
+ *
+ * @author zhongyan.feng
+ *
+ * @param <T>
+ */
+public class Counter<T extends Number> extends Metric<T, T> {
+ private static final long serialVersionUID = -1362345159511508074L;
+
+ public Counter(T zero) {
+ updater = new AddUpdater<T>();
+ merger = new SumMerger<T>();
+ convertor = new DefaultConvertor<T>();
+ defaultValue = zero;
+
+ init();
+ }
+
+ public static void main(String[] args) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Gauge.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Gauge.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Gauge.java
new file mode 100644
index 0000000..b323df8
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Gauge.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old;
+
+import com.alibaba.jstorm.common.metric.old.window.Metric;
+import com.alibaba.jstorm.common.metric.old.window.StatBuckets;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+public class Gauge<T extends Number> extends Metric<Number, Number> {
+ private static final long serialVersionUID = 1985614006717750790L;
+
+ protected com.codahale.metrics.Gauge<T> gauge;
+
+ public Gauge(com.codahale.metrics.Gauge<T> gauge) {
+ this.gauge = gauge;
+
+ init();
+ }
+
+ @Override
+ public void init() {
+
+ }
+
+ @Override
+ public void update(Number obj) {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public Map<Integer, Number> getSnapshot() {
+ // TODO Auto-generated method stub
+ Number value = gauge.getValue();
+
+ Map<Integer, Number> ret = new TreeMap<Integer, Number>();
+ for (Integer timeKey : windowSeconds) {
+ ret.put(timeKey, value);
+ }
+ ret.put(StatBuckets.ALL_TIME_WINDOW, value);
+
+ return ret;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Histogram.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Histogram.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Histogram.java
new file mode 100644
index 0000000..478de4e
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Histogram.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old;
+
+import com.alibaba.jstorm.common.metric.old.operator.convert.Convertor;
+import com.alibaba.jstorm.common.metric.old.operator.merger.AvgMerger;
+import com.alibaba.jstorm.common.metric.old.operator.updater.AvgUpdater;
+import com.alibaba.jstorm.common.metric.old.window.Metric;
+
+/**
+ * Histogram is used to compute the average (sum / times) of the updated values
+ *
+ * Attention: 1.
+ *
+ * @author zhongyan.feng
+ *
+ */
+public class Histogram extends Metric<Double, Histogram.HistorgramPair> {
+ private static final long serialVersionUID = -1362345159511508074L;
+
+ public Histogram() {
+ defaultValue = new HistorgramPair();
+ updater = new AvgUpdater();
+ merger = new AvgMerger();
+ convertor = new HistogramConvertor();
+
+ init();
+ }
+
+ public static class HistogramConvertor implements Convertor<HistorgramPair, Double> {
+ private static final long serialVersionUID = -1569170826785657226L;
+
+ @Override
+ public Double convert(HistorgramPair from) {
+ // TODO Auto-generated method stub
+ if (from == null) {
+ return 0.0d;
+ }
+
+ if (from.getTimes() == 0) {
+ return 0.0d;
+ } else {
+ return from.getSum() / from.getTimes();
+ }
+ }
+
+ }
+
+ public static class HistorgramPair {
+ private double sum;
+ private long times;
+
+ public HistorgramPair() {
+
+ }
+
+ public HistorgramPair(double sum, long times) {
+ this.sum = sum;
+ this.times = times;
+ }
+
+ public double getSum() {
+ return sum;
+ }
+
+ public void setSum(double sum) {
+ this.sum = sum;
+ }
+
+ public void addValue(double value) {
+ sum += value;
+ }
+
+ public long getTimes() {
+ return times;
+ }
+
+ public void setTimes(long times) {
+ this.times = times;
+ }
+
+ public void addTimes(long time) {
+ times += time;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/LongCounter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/LongCounter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/LongCounter.java
new file mode 100644
index 0000000..cd64e62
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/LongCounter.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old;
+
+import com.alibaba.jstorm.common.metric.old.operator.convert.AtomicLongToLong;
+import com.alibaba.jstorm.common.metric.old.operator.merger.LongSumMerger;
+import com.alibaba.jstorm.common.metric.old.operator.updater.LongAddUpdater;
+import com.alibaba.jstorm.common.metric.old.window.Metric;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class LongCounter extends Metric<Long, AtomicLong> {
+ private static final long serialVersionUID = -1362345159511508074L;
+
+ public LongCounter() {
+ super.defaultValue = new AtomicLong(0);
+ super.updater = new LongAddUpdater();
+ super.merger = new LongSumMerger();
+ super.convertor = new AtomicLongToLong();
+
+ init();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Meter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Meter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Meter.java
new file mode 100644
index 0000000..cde66fd
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Meter.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old;
+
+import com.alibaba.jstorm.common.metric.old.operator.convert.DefaultConvertor;
+import com.alibaba.jstorm.common.metric.old.operator.merger.TpsMerger;
+import com.alibaba.jstorm.common.metric.old.operator.updater.AddUpdater;
+import com.alibaba.jstorm.common.metric.old.window.Metric;
+
+/**
+ * Meter is used to compute tps
+ *
+ * Attention: 1.
+ *
+ * @author zhongyan.feng
+ *
+ */
+public class Meter extends Metric<Double, Double> {
+ private static final long serialVersionUID = -1362345159511508074L;
+
+ public Meter() {
+ defaultValue = 0.0d;
+ updater = new AddUpdater<Double>();
+ merger = new TpsMerger();
+ convertor = new DefaultConvertor<Double>();
+
+ init();
+ }
+
+ public void update() {
+ update(Double.valueOf(1));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricFilter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricFilter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricFilter.java
new file mode 100644
index 0000000..a91b925
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricFilter.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old;
+
+import com.alibaba.jstorm.common.metric.old.window.Metric;
+
+import java.io.Serializable;
+
+public interface MetricFilter extends Serializable {
+ /**
+ * Matches all metrics, regardless of type or name.
+ */
+ MetricFilter ALL = new MetricFilter() {
+ private static final long serialVersionUID = 7089987006352295530L;
+
+ @Override
+ public boolean matches(String name, Metric metric) {
+ return true;
+ }
+ };
+
+ /**
+ * Returns {@code true} if the metric matches the filter; {@code false} otherwise.
+ *
+ * @param name the metric's name
+ * @param metric the metric
+ * @return {@code true} if the metric matches the filter
+ */
+ boolean matches(String name, Metric metric);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricSet.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricSet.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricSet.java
new file mode 100644
index 0000000..1cce913
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricSet.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old;
+
+import com.alibaba.jstorm.common.metric.old.window.Metric;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public interface MetricSet extends Serializable {
+ Map<String, Metric> getMetrics();
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricThrift.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricThrift.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricThrift.java
new file mode 100644
index 0000000..8de6f6d
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricThrift.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old;
+
+import backtype.storm.generated.MetricInfo;
+import backtype.storm.generated.MetricWindow;
+import com.alibaba.jstorm.utils.JStormUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class MetricThrift {
+ private static final Logger LOG = LoggerFactory.getLogger(MetricThrift.class);
+
+ public static MetricInfo mkMetricInfo() {
+ return new MetricInfo();
+ }
+
+ public static void insert(MetricInfo metricInfo, String key, Map<Integer, Double> windowSet) {
+ }
+
+ public static MetricWindow merge(Map<String, MetricWindow> details) {
+ Map<Integer, Double> merge = new HashMap<Integer, Double>();
+
+ for (Entry<String, MetricWindow> entry : details.entrySet()) {
+ MetricWindow metricWindow = entry.getValue();
+ Map<Integer, Double> metric = metricWindow.get_metricWindow();
+
+ for (Entry<Integer, Double> metricEntry : metric.entrySet()) {
+ Integer key = metricEntry.getKey();
+ try {
+ Double value = ((Number) JStormUtils.add(metricEntry.getValue(), merge.get(key))).doubleValue();
+ merge.put(key, value);
+ } catch (Exception e) {
+ LOG.error("Invalid type of " + entry.getKey() + ":" + key, e);
+ }
+ }
+ }
+
+ MetricWindow ret = new MetricWindow();
+
+ ret.set_metricWindow(merge);
+ return ret;
+ }
+
+ public static void merge(MetricInfo metricInfo, Map<String, Map<String, MetricWindow>> extraMap) {
+ for (Entry<String, Map<String, MetricWindow>> entry : extraMap.entrySet()) {
+ String metricName = entry.getKey();
+ // metricInfo.put_to_baseMetric(metricName, merge(entry.getValue()));
+ }
+ }
+
+ public static MetricWindow mergeMetricWindow(MetricWindow fromMetric, MetricWindow toMetric) {
+ if (toMetric == null) {
+ toMetric = new MetricWindow(new HashMap<Integer, Double>());
+ }
+
+ if (fromMetric == null) {
+ return toMetric;
+ }
+
+ List<Map<Integer, Double>> list = new ArrayList<Map<Integer, Double>>();
+ list.add(fromMetric.get_metricWindow());
+ list.add(toMetric.get_metricWindow());
+ Map<Integer, Double> merged = JStormUtils.mergeMapList(list);
+
+ toMetric.set_metricWindow(merged);
+
+ return toMetric;
+ }
+
+ public static MetricInfo mergeMetricInfo(MetricInfo from, MetricInfo to) {
+ if (to == null) {
+ to = mkMetricInfo();
+ }
+
+ if (from == null) {
+ return to;
+ }
+ // to.get_baseMetric().putAll(from.get_baseMetric());
+
+ return to;
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/RegistryType.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/RegistryType.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/RegistryType.java
new file mode 100644
index 0000000..6e8a020
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/RegistryType.java
@@ -0,0 +1,9 @@
+package com.alibaba.jstorm.common.metric.old;
+
+/**
+ * @author wange
+ * @since 15/6/11
+ */
+public enum RegistryType {
+ STREAM, TASK, COMPONENT, WORKER, SYS
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/StaticsType.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/StaticsType.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/StaticsType.java
new file mode 100644
index 0000000..2094b9a
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/StaticsType.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old;
+
+public enum StaticsType {
+ emitted, send_tps, recv_tps, acked, failed, transferred, process_latencies;
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Timer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Timer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Timer.java
new file mode 100644
index 0000000..675c237
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Timer.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old;
+
+import java.io.Closeable;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Use com.codahale.metrics's interface
+ *
+ * @author zhongyan.feng
+ *
+ */
+public class Timer extends Histogram {
+ private static final long serialVersionUID = 5915881891513771108L;
+
+ /**
+ * A timing context.
+ *
+ * @see Timer#time()
+ */
+ public static class Context implements Closeable {
+ private final Timer timer;
+ private final long startTime;
+
+ private Context(Timer timer) {
+ this.timer = timer;
+ this.startTime = System.currentTimeMillis();
+ }
+
+ /**
+ * Stops recording the elapsed time, updates the timer and returns the elapsed time in milliseconds.
+ */
+ public long stop() {
+ final long elapsed = System.currentTimeMillis() - startTime;
+ timer.update(elapsed, TimeUnit.MILLISECONDS);
+ return elapsed;
+ }
+
+ @Override
+ public void close() {
+ stop();
+ }
+ }
+
+ public Timer() {
+ init();
+ }
+
+ /**
+ * Adds a recorded duration.
+ *
+ * @param duration the length of the duration
+ * @param unit the scale unit of {@code duration}
+ */
+ public void update(long duration, TimeUnit unit) {
+ update(unit.toMillis(duration));
+ }
+
+ /**
+ * Times and records the duration of event.
+ *
+ * @param event a {@link Callable} whose {@link Callable#call()} method implements a process whose duration should be timed
+ * @param <T> the type of the value returned by {@code event}
+ * @return the value returned by {@code event}
+ * @throws Exception if {@code event} throws an {@link Exception}
+ */
+ public <T> T time(Callable<T> event) throws Exception {
+ final long startTime = System.currentTimeMillis();
+ try {
+ return event.call();
+ } finally {
+ update(System.currentTimeMillis() - startTime);
+ }
+ }
+
+ /**
+ * Returns a new {@link Context}.
+ *
+ * @return a new {@link Context}
+ * @see Context
+ */
+ public Context time() {
+ return new Context(this);
+ }
+
+ public long getCount() {
+ return allWindow.getSnapshot().getTimes();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Top.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Top.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Top.java
new file mode 100644
index 0000000..e3fdbdd
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Top.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.TreeSet;
+
+import com.alibaba.jstorm.common.metric.old.operator.convert.Convertor;
+import com.alibaba.jstorm.common.metric.old.operator.merger.Merger;
+import com.alibaba.jstorm.common.metric.old.operator.updater.Updater;
+import com.alibaba.jstorm.common.metric.old.window.Metric;
+
+public class Top<T> extends Metric<List<T>, TreeSet<T>> {
+ private static final long serialVersionUID = 4990212679365713831L;
+
+ final protected Comparator<T> comparator;
+ final protected int n;
+
+ public Top(Comparator<T> comparator, int n) {
+ this.comparator = comparator;
+ this.n = n;
+
+ this.defaultValue = new TreeSet<T>(comparator);
+ this.updater = new TopUpdator<T>(comparator, n);
+ this.merger = new TopMerger<T>(comparator, n);
+ this.convertor = new SetToList<T>();
+
+ init();
+ }
+
+ public static class TopUpdator<T> implements Updater<TreeSet<T>> {
+ private static final long serialVersionUID = -3940041101182079146L;
+
+ final protected Comparator<T> comparator;
+ final protected int n;
+
+ public TopUpdator(Comparator<T> comparator, int n) {
+ this.comparator = comparator;
+ this.n = n;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public TreeSet<T> update(Number object, TreeSet<T> cache, Object... others) {
+ // TODO Auto-generated method stub
+ if (cache == null) {
+ cache = new TreeSet<T>(comparator);
+ }
+
+ cache.add((T) object);
+
+ if (cache.size() > n) {
+ cache.remove(cache.last());
+ }
+
+ return cache;
+ }
+
+ @Override
+ public TreeSet<T> updateBatch(TreeSet<T> object, TreeSet<T> cache, Object... objects) {
+ // TODO Auto-generated method stub
+ if (cache == null) {
+ cache = new TreeSet<T>(comparator);
+ }
+
+ cache.addAll(object);
+
+ while (cache.size() > n) {
+ cache.remove(cache.last());
+ }
+
+ return cache;
+ }
+
+ }
+
+ public static class TopMerger<T> implements Merger<TreeSet<T>> {
+
+ private static final long serialVersionUID = 4478867986986581638L;
+ final protected Comparator<T> comparator;
+ final protected int n;
+
+ public TopMerger(Comparator<T> comparator, int n) {
+ this.comparator = comparator;
+ this.n = n;
+ }
+
+ @Override
+ public TreeSet<T> merge(Collection<TreeSet<T>> objs, TreeSet<T> unflushed, Object... others) {
+ // TODO Auto-generated method stub
+ TreeSet<T> temp = new TreeSet<T>(comparator);
+ if (unflushed != null) {
+ temp.addAll(unflushed);
+ }
+
+ for (TreeSet<T> set : objs) {
+ temp.addAll(set);
+ }
+
+ if (temp.size() <= n) {
+ return temp;
+ }
+
+ TreeSet<T> ret = new TreeSet<T>(comparator);
+ int i = 0;
+ for (T item : temp) {
+ if (i < n) {
+ ret.add(item);
+ i++;
+ } else {
+ break;
+ }
+ }
+ return ret;
+ }
+
+ }
+
+ public static class SetToList<T> implements Convertor<TreeSet<T>, List<T>> {
+ private static final long serialVersionUID = 4968816655779625255L;
+
+ @Override
+ public List<T> convert(TreeSet<T> set) {
+ // TODO Auto-generated method stub
+ List<T> ret = new ArrayList<T>();
+ if (set != null) {
+ for (T item : set) {
+ ret.add(item);
+ }
+ }
+ return ret;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/Sampling.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/Sampling.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/Sampling.java
new file mode 100644
index 0000000..8f3053b
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/Sampling.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator;
+
+import java.io.Serializable;
+
+/**
+ * Contract for a metric that accepts numeric updates and can report a
+ * snapshot of type {@code V}.
+ *
+ * @param <V> type returned by {@link #getSnapshot()}
+ */
+public interface Sampling<V> extends Serializable {
+
+ /**
+ * Feeds a new numeric value into this metric.
+ *
+ * @param obj the value to record
+ */
+ void update(Number obj);
+
+ /**
+ * Returns a snapshot of this metric.
+ *
+ * @return the snapshot; its exact meaning is defined by the implementation
+ */
+ V getSnapshot();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/StartTime.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/StartTime.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/StartTime.java
new file mode 100644
index 0000000..f87ae5a
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/StartTime.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator;
+
+/**
+ * Implemented by objects that can report a start time.
+ */
+public interface StartTime {
+ /**
+ * @return the start time as a {@code long}; unit and epoch are not defined
+ * by this interface (presumably milliseconds; verify against implementations)
+ */
+ long getStartTime();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/AtomicLongToLong.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/AtomicLongToLong.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/AtomicLongToLong.java
new file mode 100644
index 0000000..c9a8b24
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/AtomicLongToLong.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.convert;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * {@link Convertor} that reads the current value of an {@link AtomicLong}
+ * and returns it as a boxed {@link Long}.
+ */
+public class AtomicLongToLong implements Convertor<AtomicLong, Long> {
+    private static final long serialVersionUID = -2755066621494409063L;
+
+    /**
+     * @param obj counter to read, may be {@code null}
+     * @return the counter's current value, or {@code null} when {@code obj} is {@code null}
+     */
+    @Override
+    public Long convert(AtomicLong obj) {
+        return (obj != null) ? Long.valueOf(obj.get()) : null;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/Convertor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/Convertor.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/Convertor.java
new file mode 100644
index 0000000..713c1df
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/Convertor.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.convert;
+
+import java.io.Serializable;
+
+/**
+ * Serializable strategy that turns a value of type {@code From} into a value
+ * of type {@code To}.
+ *
+ * @param <From> input type
+ * @param <To> output type
+ */
+public interface Convertor<From, To> extends Serializable {
+
+ /**
+ * Converts the given value.
+ *
+ * @param obj value to convert
+ * @return the converted value
+ */
+ To convert(From obj);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/DefaultConvertor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/DefaultConvertor.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/DefaultConvertor.java
new file mode 100644
index 0000000..2cad206
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/DefaultConvertor.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.convert;
+
+/**
+ * Identity {@link Convertor}: hands its input back unchanged.
+ *
+ * @param <T> type of both the input and the output
+ */
+public class DefaultConvertor<T> implements Convertor<T, T> {
+    private static final long serialVersionUID = -647209923903679727L;
+
+    /**
+     * @param obj value to pass through, may be {@code null}
+     * @return the very same reference that was passed in
+     */
+    @Override
+    public T convert(final T obj) {
+        return obj;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/SetToList.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/SetToList.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/SetToList.java
new file mode 100644
index 0000000..2569387
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/SetToList.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.convert;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * {@link Convertor} that copies the elements of a {@link Set} into a new
+ * {@link List}, in the set's iteration order.
+ *
+ * @param <T> element type
+ */
+public class SetToList<T> implements Convertor<Set<T>, List<T>> {
+    private static final long serialVersionUID = 4968816655779625255L;
+
+    /**
+     * @param set source set, may be {@code null}
+     * @return a new list with the set's elements; empty when {@code set} is {@code null}
+     */
+    @Override
+    public List<T> convert(Set<T> set) {
+        if (set == null) {
+            return new ArrayList<T>();
+        }
+        return new ArrayList<T>(set);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/AvgMerger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/AvgMerger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/AvgMerger.java
new file mode 100644
index 0000000..815bb33
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/AvgMerger.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.merger;
+
+import com.alibaba.jstorm.common.metric.old.Histogram;
+
+import java.util.Collection;
+
+/**
+ * {@link Merger} that adds up the sums and the times of several
+ * {@code Histogram.HistorgramPair} values. The result is returned as a pair
+ * of totals; no division is performed here.
+ */
+public class AvgMerger implements Merger<Histogram.HistorgramPair> {
+    private static final long serialVersionUID = -3892281208959055221L;
+
+    /**
+     * @param objs      pairs to combine; {@code null} entries are ignored
+     * @param unflushed extra pair to include, may be {@code null}
+     * @param others    unused by this implementation
+     * @return a new pair holding the total sum and the total times
+     */
+    @Override
+    public Histogram.HistorgramPair merge(Collection<Histogram.HistorgramPair> objs,
+            Histogram.HistorgramPair unflushed, Object... others) {
+        double totalSum = 0.0d;
+        long totalTimes = 0L;
+
+        if (unflushed != null) {
+            totalSum += unflushed.getSum();
+            totalTimes += unflushed.getTimes();
+        }
+
+        for (Histogram.HistorgramPair pair : objs) {
+            if (pair != null) {
+                totalSum += pair.getSum();
+                totalTimes += pair.getTimes();
+            }
+        }
+
+        return new Histogram.HistorgramPair(totalSum, totalTimes);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/LongSumMerger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/LongSumMerger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/LongSumMerger.java
new file mode 100644
index 0000000..1151718
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/LongSumMerger.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.merger;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class LongSumMerger implements Merger<AtomicLong> {
+ private static final long serialVersionUID = -3500779273677666691L;
+
+ @Override
+ public AtomicLong merge(Collection<AtomicLong> objs, AtomicLong unflushed, Object... others) {
+ AtomicLong ret = new AtomicLong(0);
+ if (unflushed != null) {
+ ret.addAndGet(unflushed.get());
+ }
+
+ for (AtomicLong item : objs) {
+ if (item == null) {
+ continue;
+ }
+ ret.addAndGet(item.get());
+ }
+ return ret;
+ }
+
+}