You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:58 UTC

[20/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricMetaParser.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricMetaParser.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricMetaParser.java
new file mode 100644
index 0000000..c0a220f
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricMetaParser.java
@@ -0,0 +1,58 @@
+package com.alibaba.jstorm.common.metric;
+
+import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable;
+import com.alibaba.jstorm.metric.MetaType;
+import com.alibaba.jstorm.metric.MetricType;
+import com.alibaba.jstorm.metric.MetricUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Rebuilds a {@link MetricMeta} from a delimited metric name.
+ * <p>
+ * The name is split on {@link MetricUtils#DELIM} (used as a regular expression by
+ * {@link String#split(String)}). The first character of the first segment selects the layout:
+ * <ul>
+ * <li>{@code 'W'}, {@code 'N'} or {@code 'P'}: worker layout -
+ * {@code type, topologyId, host, port, metricGroup, metricName}</li>
+ * <li>anything else: task layout -
+ * {@code type, topologyId, component, taskId, streamId, metricGroup, metricName}</li>
+ * </ul>
+ * In both layouts the first segment carries the meta type code at index 0 and the metric type
+ * code at index 1.
+ *
+ * @author wange
+ * @since 15/7/14
+ */
+public class MetricMetaParser {
+    // Bound to this class so that parse failures are reported under the parser's own logger name.
+    private static final Logger logger = LoggerFactory.getLogger(MetricMetaParser.class);
+
+    /**
+     * Parses a metric name into its meta description.
+     *
+     * @param name the delimited metric name
+     * @return the parsed meta, or {@code null} when the name cannot be parsed (null name, too few
+     *         segments, non-numeric task id or port, ...); the failure is logged at error level
+     */
+    public static MetricMeta fromMetricName(String name) {
+        try {
+            String[] parts = name.split(MetricUtils.DELIM);
+            char ch = parts[0].charAt(0);
+            if (ch == 'W' || ch == 'N' || ch == 'P') {
+                return parseWorkerMetricMeta(parts);
+            } else {
+                return parseTaskMetricMeta(parts);
+            }
+        } catch (Exception ex) {
+            // Deliberately best-effort: any malformed name is reported and mapped to null.
+            logger.error("Error parsing metric meta, name:{}", name, ex);
+        }
+        return null;
+    }
+
+    /**
+     * Creates a meta and fills the fields shared by both layouts: meta type, metric type and
+     * topology id.
+     */
+    private static MetricMeta newMetaWithCommonFields(String[] parts) {
+        MetricMeta meta = new MetricMeta();
+        meta.setMetaType(MetaType.parse(parts[0].charAt(0)).getT());
+        meta.setMetricType(MetricType.parse(parts[0].charAt(1)).getT());
+        meta.setTopologyId(parts[1]);
+        return meta;
+    }
+
+    /** Parses the seven-segment task layout. */
+    private static MetricMeta parseTaskMetricMeta(String[] parts) {
+        MetricMeta meta = newMetaWithCommonFields(parts);
+        meta.setComponent(parts[2]);
+        meta.setTaskId(Integer.valueOf(parts[3]));
+        meta.setStreamId(parts[4]);
+        meta.setMetricGroup(parts[5]);
+        meta.setMetricName(parts[6]);
+
+        return meta;
+    }
+
+    /** Parses the six-segment worker layout. */
+    private static MetricMeta parseWorkerMetricMeta(String[] parts) {
+        MetricMeta meta = newMetaWithCommonFields(parts);
+        meta.setHost(parts[2]);
+        meta.setPort(Integer.valueOf(parts[3]));
+        meta.setMetricGroup(parts[4]);
+        meta.setMetricName(parts[5]);
+
+        return meta;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricRegistry.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricRegistry.java
deleted file mode 100755
index 982c5f6..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricRegistry.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.jstorm.common.metric.window.Metric;
-
-public class MetricRegistry implements MetricSet {
-    private static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class);
-
-    private static final long serialVersionUID = 8184106900230111064L;
-    public static final String NAME_SEPERATOR = ".";
-
-    /**
-     * Concatenates elements to form a dotted name, eliding any null values or
-     * empty strings.
-     *
-     * @param name the first element of the name
-     * @param names the remaining elements of the name
-     * @return {@code name} and {@code names} concatenated by periods
-     */
-    public static String name(String name, String... names) {
-        final StringBuilder builder = new StringBuilder();
-        append(builder, name);
-        if (names != null) {
-            for (String s : names) {
-                append(builder, s);
-            }
-        }
-        return builder.toString();
-    }
-
-    /**
-     * Concatenates a class name and elements to form a dotted name, eliding any
-     * null values or empty strings.
-     *
-     * @param klass the first element of the name
-     * @param names the remaining elements of the name
-     * @return {@code klass} and {@code names} concatenated by periods
-     */
-    public static String name(Class<?> klass, String... names) {
-        return name(klass.getName(), names);
-    }
-
-    private static void append(StringBuilder builder, String part) {
-        if (part != null && !part.isEmpty()) {
-            if (builder.length() > 0) {
-                builder.append(NAME_SEPERATOR);
-            }
-            builder.append(part);
-        }
-    }
-
-    protected final ConcurrentMap<String, Metric> metrics;
-
-    /**
-     * Creates a new {@link MetricRegistry}.
-     */
-    public MetricRegistry() {
-        this.metrics = buildMap();
-    }
-
-    /**
-     * Creates a new {@link ConcurrentMap} implementation for use inside the
-     * registry. Override this to create a {@link MetricRegistry} with space- or
-     * time-bounded metric lifecycles, for example.
-     *
-     * @return a new {@link ConcurrentMap}
-     */
-    protected ConcurrentMap<String, Metric> buildMap() {
-        return new ConcurrentHashMap<String, Metric>();
-    }
-
-    /**
-     * Given a {@link Metric}, registers it under the given name.
-     *
-     * @param name the name of the metric
-     * @param metric the metric
-     * @param <T> the type of the metric
-     * @return {@code metric}
-     * @throws IllegalArgumentException if the name is already registered
-     */
-    @SuppressWarnings("unchecked")
-    public <T extends Metric> T register(String name, T metric)
-            throws IllegalArgumentException {
-        if (metric instanceof MetricSet) {
-            registerAll(name, (MetricSet) metric);
-        } else {
-            final Metric existing = metrics.putIfAbsent(name, metric);
-            if (existing == null) {
-                // add one listener to notify
-                LOG.info("Successfully register metric of {}", name);
-            } else {
-                throw new IllegalArgumentException("A metric named " + name
-                        + " already exists");
-            }
-        }
-        return metric;
-    }
-
-    /**
-     * Given a metric set, registers them.
-     *
-     * @param metrics a set of metrics
-     * @throws IllegalArgumentException if any of the names are already
-     *             registered
-     */
-    public void registerAll(MetricSet metrics) throws IllegalArgumentException {
-        registerAll(null, metrics);
-    }
-
-    /**
-     * Removes the metric with the given name.
-     *
-     * @param name the name of the metric
-     * @return whether or not the metric was removed
-     */
-    public boolean remove(String name) {
-        final Metric metric = metrics.remove(name);
-        if (metric != null) {
-            // call listener to notify remove
-            LOG.info("Successfully unregister metric of {}", name);
-            return true;
-        }
-        return false;
-    }
-
-    /**
-     * Removes all metrics which match the given filter.
-     *
-     * @param filter a filter
-     */
-    public void removeMatching(MetricFilter filter) {
-        for (Map.Entry<String, Metric> entry : metrics.entrySet()) {
-            if (filter.matches(entry.getKey(), entry.getValue())) {
-                remove(entry.getKey());
-            }
-        }
-    }
-
-    /**
-     * Returns a set of the names of all the metrics in the registry.
-     *
-     * @return the names of all the metrics
-     */
-    public SortedSet<String> getNames() {
-        return Collections.unmodifiableSortedSet(new TreeSet<String>(metrics
-                .keySet()));
-    }
-
-    /**
-     * Returns a map of all the gauges in the registry and their names.
-     *
-     * @return all the gauges in the registry
-     */
-    public SortedMap<String, Gauge> getGauges() {
-        return getGauges(MetricFilter.ALL);
-    }
-
-    /**
-     * Returns a map of all the gauges in the registry and their names which
-     * match the given filter.
-     *
-     * @param filter the metric filter to match
-     * @return all the gauges in the registry
-     */
-    public SortedMap<String, Gauge> getGauges(MetricFilter filter) {
-        return getMetrics(Gauge.class, filter);
-    }
-
-    /**
-     * Returns a map of all the counters in the registry and their names.
-     *
-     * @return all the counters in the registry
-     */
-    public SortedMap<String, Counter> getCounters() {
-        return getCounters(MetricFilter.ALL);
-    }
-
-    /**
-     * Returns a map of all the counters in the registry and their names which
-     * match the given filter.
-     *
-     * @param filter the metric filter to match
-     * @return all the counters in the registry
-     */
-    public SortedMap<String, Counter> getCounters(MetricFilter filter) {
-        return getMetrics(Counter.class, filter);
-    }
-
-    /**
-     * Returns a map of all the histograms in the registry and their names.
-     *
-     * @return all the histograms in the registry
-     */
-    public SortedMap<String, Histogram> getHistograms() {
-        return getHistograms(MetricFilter.ALL);
-    }
-
-    /**
-     * Returns a map of all the histograms in the registry and their names which
-     * match the given filter.
-     *
-     * @param filter the metric filter to match
-     * @return all the histograms in the registry
-     */
-    public SortedMap<String, Histogram> getHistograms(MetricFilter filter) {
-        return getMetrics(Histogram.class, filter);
-    }
-
-    /**
-     * Returns a map of all the meters in the registry and their names.
-     *
-     * @return all the meters in the registry
-     */
-    public SortedMap<String, Meter> getMeters() {
-        return getMeters(MetricFilter.ALL);
-    }
-
-    /**
-     * Returns a map of all the meters in the registry and their names which
-     * match the given filter.
-     *
-     * @param filter the metric filter to match
-     * @return all the meters in the registry
-     */
-    public SortedMap<String, Meter> getMeters(MetricFilter filter) {
-        return getMetrics(Meter.class, filter);
-    }
-
-    /**
-     * Returns a map of all the timers in the registry and their names.
-     *
-     * @return all the timers in the registry
-     */
-    public SortedMap<String, Timer> getTimers() {
-        return getTimers(MetricFilter.ALL);
-    }
-
-    /**
-     * Returns a map of all the timers in the registry and their names which
-     * match the given filter.
-     *
-     * @param filter the metric filter to match
-     * @return all the timers in the registry
-     */
-    public SortedMap<String, Timer> getTimers(MetricFilter filter) {
-        return getMetrics(Timer.class, filter);
-    }
-
-    @SuppressWarnings("unchecked")
-    private <T extends Metric> SortedMap<String, T> getMetrics(Class<T> klass,
-            MetricFilter filter) {
-        final TreeMap<String, T> timers = new TreeMap<String, T>();
-        for (Map.Entry<String, Metric> entry : metrics.entrySet()) {
-            if (klass.isInstance(entry.getValue())
-                    && filter.matches(entry.getKey(), entry.getValue())) {
-                timers.put(entry.getKey(), (T) entry.getValue());
-            }
-        }
-        return Collections.unmodifiableSortedMap(timers);
-    }
-
-    private void registerAll(String prefix, MetricSet metrics)
-            throws IllegalArgumentException {
-        for (Map.Entry<String, Metric> entry : metrics.getMetrics().entrySet()) {
-            if (entry.getValue() instanceof MetricSet) {
-                registerAll(name(prefix, entry.getKey()),
-                        (MetricSet) entry.getValue());
-            } else {
-                register(name(prefix, entry.getKey()), entry.getValue());
-            }
-        }
-    }
-
-    @Override
-    public Map<String, Metric> getMetrics() {
-        return Collections.unmodifiableMap(metrics);
-    }
-
-    /**
-     * Expose metrics is to improve performance
-     * 
-     * @return
-     */
-    public Metric getMetric(String name) {
-        return metrics.get(name);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricSet.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricSet.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricSet.java
deleted file mode 100755
index 243f9b8..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricSet.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric;
-
-import java.io.Serializable;
-import java.util.Map;
-
-import com.alibaba.jstorm.common.metric.window.Metric;
-
-public interface MetricSet extends Serializable {
-    Map<String, Metric> getMetrics();
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/QueueGauge.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/QueueGauge.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/QueueGauge.java
index 0ff964e..114eeb2 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/QueueGauge.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/QueueGauge.java
@@ -17,6 +17,7 @@
  */
 package com.alibaba.jstorm.common.metric;
 
+import com.google.common.base.Joiner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,10 +35,10 @@ public class QueueGauge extends HealthCheck implements Gauge<Double> {
     String name;
     Result healthy;
 
-    public QueueGauge(String name, DisruptorQueue queue) {
+    public QueueGauge(DisruptorQueue queue, String... names) {
         this.queue = queue;
-        this.name = name;
-        this.healthy = HealthCheck.Result.healthy();
+        this.name = Joiner.on("-").join(names);
+        this.healthy = Result.healthy();
     }
 
     @Override
@@ -52,7 +53,7 @@ public class QueueGauge extends HealthCheck implements Gauge<Double> {
         // TODO Auto-generated method stub
         Double ret = (double) queue.pctFull();
         if (ret > 0.9) {
-            return HealthCheck.Result.unhealthy(name + QUEUE_IS_FULL);
+            return Result.unhealthy(name + QUEUE_IS_FULL);
         } else {
             return healthy;
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TaskTrack.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TaskTrack.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TaskTrack.java
new file mode 100644
index 0000000..0b49ecf
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TaskTrack.java
@@ -0,0 +1,180 @@
+package com.alibaba.jstorm.common.metric;
+
+import com.alibaba.jstorm.metric.KVSerializable;
+import com.alibaba.jstorm.metric.MetricUtils;
+import com.alibaba.jstorm.utils.JStormUtils;
+
+import java.util.Date;
+
+/**
+ * Records where a task ran (cluster, topology, task id, host, port) together with a start or
+ * end time, and converts that record to and from a key/value byte pair.
+ * <p>
+ * Key layout: {@code clusterName, topologyId, taskId, time} joined by {@link MetricUtils#AT}.
+ * Value layout: {@code type, host, port} joined by {@link MetricUtils#AT}, where type is
+ * {@link KVSerializable#START} or {@link KVSerializable#END}.
+ * <p>
+ * NOTE(review): serialization joins with {@code MetricUtils.AT} while {@code fromKV} splits on
+ * {@code MetricUtils.DELIM}; a round trip relies on the two being equivalent - confirm against
+ * MetricUtils. Bytes are produced and read with the platform default charset.
+ *
+ * @author wange
+ * @since 15/7/16
+ */
+public class TaskTrack implements KVSerializable {
+
+    private long id;
+    private String clusterName;
+    private String topologyId;
+    private String component;
+    private int taskId;
+    private String host;
+    private int port;
+    private Date start;
+    private Date end;
+
+    public TaskTrack() {
+    }
+
+    public TaskTrack(String clusterName, String topologyId) {
+        this.clusterName = clusterName;
+        this.topologyId = topologyId;
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public String getTopologyId() {
+        return topologyId;
+    }
+
+    public void setTopologyId(String topologyId) {
+        this.topologyId = topologyId;
+    }
+
+    public String getComponent() {
+        return component;
+    }
+
+    public void setComponent(String component) {
+        this.component = component;
+    }
+
+    public int getTaskId() {
+        return taskId;
+    }
+
+    public void setTaskId(int taskId) {
+        this.taskId = taskId;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    public Date getStart() {
+        return start;
+    }
+
+    public void setStart(Date start) {
+        this.start = start;
+    }
+
+    public Date getEnd() {
+        return end;
+    }
+
+    public void setEnd(Date end) {
+        this.end = end;
+    }
+
+    /**
+     * key: clusterName + topologyId + taskId + time
+     * <p>
+     * The time is the start time when present, otherwise the end time.
+     *
+     * @throws NullPointerException if neither a start nor an end time has been set
+     */
+    @Override
+    public byte[] getKey() {
+        final Date time = start != null ? start : end;
+        if (time == null) {
+            // Fail before building anything, with a message that names the actual problem.
+            throw new NullPointerException("TaskTrack key requires a start or end time, but both are null");
+        }
+        StringBuilder sb = new StringBuilder(128);
+        sb.append(clusterName).append(MetricUtils.AT).append(topologyId).append(MetricUtils.AT)
+                .append(taskId).append(MetricUtils.AT).append(time.getTime());
+        return sb.toString().getBytes();
+    }
+
+    /**
+     * value: type + host + port
+     * type: S/E (start/end)
+     * <p>
+     * A track without a start time is written as an end record.
+     */
+    @Override
+    public byte[] getValue() {
+        StringBuilder sb = new StringBuilder(32);
+        if (start != null) {
+            sb.append(KVSerializable.START);
+        } else {
+            sb.append(KVSerializable.END);
+        }
+        sb.append(MetricUtils.AT).append(host).append(MetricUtils.AT).append(port);
+        return sb.toString().getBytes();
+    }
+
+    /**
+     * Populates this instance from a serialized key/value pair.
+     * <p>
+     * The value is read first because its type segment decides whether the timestamp in the key
+     * is stored as the start or the end time. A value with fewer than three segments, or a key
+     * with fewer than four, leaves the corresponding fields untouched.
+     *
+     * @param key the serialized key
+     * @param value the serialized value
+     * @return this instance
+     */
+    @Override
+    public Object fromKV(byte[] key, byte[] value) {
+        String[] keyParts = new String(key).split(MetricUtils.DELIM);
+
+        String[] valueParts = new String(value).split(MetricUtils.DELIM);
+        boolean isStart = false;
+        if (valueParts.length >= 3) {
+            isStart = valueParts[0].equals(KVSerializable.START);
+            host = valueParts[1];
+            port = JStormUtils.parseInt(valueParts[2]);
+        }
+
+        if (keyParts.length >= 4) {
+            clusterName = keyParts[0];
+            topologyId = keyParts[1];
+            taskId = JStormUtils.parseInt(keyParts[2]);
+            long ts = JStormUtils.parseLong(keyParts[3]);
+            if (isStart) {
+                start = new Date(ts);
+            } else {
+                end = new Date(ts);
+            }
+        }
+
+        return this;
+    }
+
+    /**
+     * @return the start time when set, otherwise the end time (which may be {@code null})
+     */
+    public Date getTime() {
+        return start != null ? start : end;
+    }
+
+    /**
+     * @return clusterName, topologyId, taskId, host and port joined by {@link MetricUtils#AT}
+     */
+    public String getIdentity() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(clusterName).append(MetricUtils.AT).append(topologyId).append(MetricUtils.AT)
+                .append(taskId).append(MetricUtils.AT).append(host).append(MetricUtils.AT).append(port);
+        return sb.toString();
+    }
+
+    /**
+     * Fills in a missing start and/or end time from another track. Times already set on this
+     * instance are never overwritten.
+     *
+     * @param taskTrack the track to take missing times from
+     */
+    public void merge(TaskTrack taskTrack) {
+        if (taskTrack.start != null && this.start == null) {
+            this.start = taskTrack.start;
+        }
+        if (taskTrack.end != null && this.end == null) {
+            this.end = taskTrack.end;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Timer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Timer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Timer.java
deleted file mode 100755
index daf5633..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Timer.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric;
-
-import java.io.Closeable;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Use com.codahale.metrics's interface
- * 
- * @author zhongyan.feng
- * 
- */
-public class Timer extends Histogram {
-    private static final long serialVersionUID = 5915881891513771108L;
-
-    /**
-     * A timing context.
-     * 
-     * @see Timer#time()
-     */
-    public static class Context implements Closeable {
-        private final Timer timer;
-        private final long startTime;
-
-        private Context(Timer timer) {
-            this.timer = timer;
-            this.startTime = System.currentTimeMillis();
-        }
-
-        /**
-         * Stops recording the elapsed time, updates the timer and returns the
-         * elapsed time in nanoseconds.
-         */
-        public long stop() {
-            final long elapsed = System.currentTimeMillis() - startTime;
-            timer.update(elapsed, TimeUnit.MILLISECONDS);
-            return elapsed;
-        }
-
-        @Override
-        public void close() {
-            stop();
-        }
-    }
-
-    public Timer() {
-        init();
-    }
-
-    /**
-     * Adds a recorded duration.
-     * 
-     * @param duration the length of the duration
-     * @param unit the scale unit of {@code duration}
-     */
-    public void update(long duration, TimeUnit unit) {
-        update(unit.toMillis(duration));
-    }
-
-    /**
-     * Times and records the duration of event.
-     * 
-     * @param event a {@link Callable} whose {@link Callable#call()} method
-     *            implements a process whose duration should be timed
-     * @param <T> the type of the value returned by {@code event}
-     * @return the value returned by {@code event}
-     * @throws Exception if {@code event} throws an {@link Exception}
-     */
-    public <T> T time(Callable<T> event) throws Exception {
-        final long startTime = System.currentTimeMillis();
-        try {
-            return event.call();
-        } finally {
-            update(System.currentTimeMillis() - startTime);
-        }
-    }
-
-    /**
-     * Returns a new {@link Context}.
-     * 
-     * @return a new {@link Context}
-     * @see Context
-     */
-    public Context time() {
-        return new Context(this);
-    }
-
-    public long getCount() {
-        return allWindow.getSnapshot().getTimes();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerData.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerData.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerData.java
new file mode 100644
index 0000000..3f2be9b
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerData.java
@@ -0,0 +1,169 @@
+package com.alibaba.jstorm.common.metric;
+
+
+import com.alibaba.jstorm.metric.Bytes;
+import com.alibaba.jstorm.metric.KVSerializable;
+
+/**
+ * Value holder for one timer snapshot, serializable to a fixed-size byte array.
+ * <p>
+ * The serialized value is twelve consecutive 8-byte slots in this order:
+ * {@code min, max, p50, p75, p95, p98, p99, p999, mean, m1, m5, m15}.
+ * {@code stddev} is not part of the serialized value: it is neither written by
+ * {@link #getValue()} nor restored by {@link #fromKV(byte[], byte[])}.
+ *
+ * @author wange
+ * @since 15/6/23
+ */
+public class TimerData extends MetricBaseData implements KVSerializable {
+    /** Width in bytes of one serialized slot. */
+    private static final int SLOT_SIZE = 8;
+    /** Number of slots in the serialized value. */
+    private static final int SLOT_COUNT = 12;
+
+    private long min;
+    private long max;
+    private double mean;
+    // presumably percentiles of the timed values - confirm against the producer
+    private double p50;
+    private double p75;
+    private double p95;
+    private double p98;
+    private double p99;
+    private double p999;
+    private double stddev;
+
+    // presumably 1/5/15-minute rates - confirm against the producer
+    private double m1;
+    private double m5;
+    private double m15;
+
+    public long getMin() {
+        return min;
+    }
+
+    public void setMin(long min) {
+        this.min = min;
+    }
+
+    public long getMax() {
+        return max;
+    }
+
+    public void setMax(long max) {
+        this.max = max;
+    }
+
+    public double getMean() {
+        return mean;
+    }
+
+    public void setMean(double mean) {
+        this.mean = mean;
+    }
+
+    public double getP50() {
+        return p50;
+    }
+
+    public void setP50(double p50) {
+        this.p50 = p50;
+    }
+
+    public double getP75() {
+        return p75;
+    }
+
+    public void setP75(double p75) {
+        this.p75 = p75;
+    }
+
+    public double getP95() {
+        return p95;
+    }
+
+    public void setP95(double p95) {
+        this.p95 = p95;
+    }
+
+    public double getP98() {
+        return p98;
+    }
+
+    public void setP98(double p98) {
+        this.p98 = p98;
+    }
+
+    public double getP99() {
+        return p99;
+    }
+
+    public void setP99(double p99) {
+        this.p99 = p99;
+    }
+
+    public double getP999() {
+        return p999;
+    }
+
+    public void setP999(double p999) {
+        this.p999 = p999;
+    }
+
+    public double getStddev() {
+        return stddev;
+    }
+
+    public void setStddev(double stddev) {
+        this.stddev = stddev;
+    }
+
+    public double getM1() {
+        return m1;
+    }
+
+    public void setM1(double m1) {
+        this.m1 = m1;
+    }
+
+    public double getM5() {
+        return m5;
+    }
+
+    public void setM5(double m5) {
+        this.m5 = m5;
+    }
+
+    public double getM15() {
+        return m15;
+    }
+
+    public void setM15(double m15) {
+        this.m15 = m15;
+    }
+
+    /**
+     * Serializes the snapshot into a 96-byte array: the two long slots ({@code min},
+     * {@code max}) followed by the ten double slots in the order listed on the class.
+     *
+     * @return a freshly allocated array holding the serialized slots
+     */
+    @Override
+    public byte[] getValue() {
+        final byte[] buf = new byte[SLOT_SIZE * SLOT_COUNT];
+        Bytes.putLong(buf, 0, min);
+        Bytes.putLong(buf, SLOT_SIZE, max);
+
+        // The double slots follow the two long slots, one per SLOT_SIZE bytes.
+        final double[] doubles = {p50, p75, p95, p98, p99, p999, mean, m1, m5, m15};
+        int offset = 2 * SLOT_SIZE;
+        for (double d : doubles) {
+            Bytes.putDouble(buf, offset, d);
+            offset += SLOT_SIZE;
+        }
+
+        return buf;
+    }
+
+    /**
+     * Populates this instance from a serialized key/value pair: the key is handed to
+     * {@code parseKey}, the value is read slot by slot in the order written by
+     * {@link #getValue()}.
+     *
+     * @param key the serialized key
+     * @param value the serialized value
+     * @return this instance
+     */
+    @Override
+    public Object fromKV(byte[] key, byte[] value) {
+        parseKey(key);
+
+        this.min = Bytes.toLong(value, 0, KVSerializable.LONG_SIZE);
+        this.max = Bytes.toLong(value, SLOT_SIZE, KVSerializable.LONG_SIZE);
+        this.p50 = Bytes.toDouble(value, 2 * SLOT_SIZE);
+        this.p75 = Bytes.toDouble(value, 3 * SLOT_SIZE);
+        this.p95 = Bytes.toDouble(value, 4 * SLOT_SIZE);
+        this.p98 = Bytes.toDouble(value, 5 * SLOT_SIZE);
+        this.p99 = Bytes.toDouble(value, 6 * SLOT_SIZE);
+        this.p999 = Bytes.toDouble(value, 7 * SLOT_SIZE);
+        this.mean = Bytes.toDouble(value, 8 * SLOT_SIZE);
+        this.m1 = Bytes.toDouble(value, 9 * SLOT_SIZE);
+        this.m5 = Bytes.toDouble(value, 10 * SLOT_SIZE);
+        this.m15 = Bytes.toDouble(value, 11 * SLOT_SIZE);
+
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerRatio.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerRatio.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerRatio.java
index 0a0e7e2..495ec4f 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerRatio.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerRatio.java
@@ -28,11 +28,7 @@ public class TimerRatio implements Gauge<Double> {
 
     private long lastUpdateTime = 0;
     private long sum = 0;
-    private long lastGaugeTime;
-
-    public void init() {
-        lastGaugeTime = System.nanoTime();
-    }
+    private long lastGaugeTime = 0;
 
     public synchronized void start() {
         if (lastUpdateTime == 0) {

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Top.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Top.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Top.java
deleted file mode 100755
index 00ccc98..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Top.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.List;
-import java.util.TreeSet;
-
-import com.alibaba.jstorm.common.metric.operator.convert.Convertor;
-import com.alibaba.jstorm.common.metric.operator.merger.Merger;
-import com.alibaba.jstorm.common.metric.operator.updater.Updater;
-import com.alibaba.jstorm.common.metric.window.Metric;
-
-public class Top<T> extends Metric<List<T>, TreeSet<T>> {
-    private static final long serialVersionUID = 4990212679365713831L;
-
-    final protected Comparator<T> comparator;
-    final protected int n;
-
-    public Top(Comparator<T> comparator, int n) {
-        this.comparator = comparator;
-        this.n = n;
-
-        this.defaultValue = new TreeSet<T>(comparator);
-        this.updater = new Top.TopUpdator<T>(comparator, n);
-        this.merger = new Top.TopMerger<T>(comparator, n);
-        this.convertor = new Top.SetToList<T>();
-
-        init();
-    }
-
-    public static class TopUpdator<T> implements Updater<TreeSet<T>> {
-        private static final long serialVersionUID = -3940041101182079146L;
-
-        final protected Comparator<T> comparator;
-        final protected int n;
-
-        public TopUpdator(Comparator<T> comparator, int n) {
-            this.comparator = comparator;
-            this.n = n;
-        }
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public TreeSet<T> update(Number object, TreeSet<T> cache,
-                Object... others) {
-            // TODO Auto-generated method stub
-            if (cache == null) {
-                cache = new TreeSet<T>(comparator);
-            }
-
-            cache.add((T) object);
-
-            if (cache.size() > n) {
-                cache.remove(cache.last());
-            }
-
-            return cache;
-        }
-
-        @Override
-        public TreeSet<T> updateBatch(TreeSet<T> object, TreeSet<T> cache,
-                Object... objects) {
-            // TODO Auto-generated method stub
-            if (cache == null) {
-                cache = new TreeSet<T>(comparator);
-            }
-
-            cache.addAll(object);
-
-            while (cache.size() > n) {
-                cache.remove(cache.last());
-            }
-
-            return cache;
-        }
-
-    }
-
-    public static class TopMerger<T> implements Merger<TreeSet<T>> {
-
-        private static final long serialVersionUID = 4478867986986581638L;
-        final protected Comparator<T> comparator;
-        final protected int n;
-
-        public TopMerger(Comparator<T> comparator, int n) {
-            this.comparator = comparator;
-            this.n = n;
-        }
-
-        @Override
-        public TreeSet<T> merge(Collection<TreeSet<T>> objs,
-                TreeSet<T> unflushed, Object... others) {
-            // TODO Auto-generated method stub
-            TreeSet<T> temp = new TreeSet<T>(comparator);
-            if (unflushed != null) {
-                temp.addAll(unflushed);
-            }
-
-            for (TreeSet<T> set : objs) {
-                temp.addAll(set);
-            }
-
-            if (temp.size() <= n) {
-                return temp;
-            }
-
-            TreeSet<T> ret = new TreeSet<T>(comparator);
-            int i = 0;
-            for (T item : temp) {
-                if (i < n) {
-                    ret.add(item);
-                    i++;
-                } else {
-                    break;
-                }
-            }
-            return ret;
-        }
-
-    }
-
-    public static class SetToList<T> implements Convertor<TreeSet<T>, List<T>> {
-        private static final long serialVersionUID = 4968816655779625255L;
-
-        @Override
-        public List<T> convert(TreeSet<T> set) {
-            // TODO Auto-generated method stub
-            List<T> ret = new ArrayList<T>();
-            if (set != null) {
-                for (T item : set) {
-                    ret.add(item);
-                }
-            }
-            return ret;
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TopologyHistory.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TopologyHistory.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TopologyHistory.java
new file mode 100644
index 0000000..186e2be
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TopologyHistory.java
@@ -0,0 +1,153 @@
+package com.alibaba.jstorm.common.metric;
+
+import com.alibaba.jstorm.metric.KVSerializable;
+import com.alibaba.jstorm.metric.MetricUtils;
+import com.alibaba.jstorm.utils.JStormUtils;
+
+import java.util.Date;
+
+/**
+ * @author wange
+ * @since 15/7/16
+ */
+public class TopologyHistory implements KVSerializable {
+
+    private long id;
+    private String clusterName;
+    private String topologyName;
+    private String topologyId;
+    private double sampleRate;
+    private Date start;
+    private Date end;
+
+    public TopologyHistory() {
+    }
+
+    public TopologyHistory(String clusterName, String topologyId) {
+        this.clusterName = clusterName;
+        this.topologyId = topologyId;
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    public String getTopologyName() {
+        return topologyName;
+    }
+
+    public void setTopologyName(String topologyName) {
+        this.topologyName = topologyName;
+    }
+
+    public String getTopologyId() {
+        return topologyId;
+    }
+
+    public void setTopologyId(String topologyId) {
+        this.topologyId = topologyId;
+    }
+
+    public Date getStart() {
+        return start;
+    }
+
+    public void setStart(Date start) {
+        this.start = start;
+    }
+
+    public Date getEnd() {
+        return end;
+    }
+
+    public void setEnd(Date end) {
+        this.end = end;
+    }
+
+    public Date getTime() {
+        return start != null ? start : end;
+    }
+
+    public String getTag() {
+        return start != null ? KVSerializable.START : KVSerializable.END;
+    }
+
+    public double getSampleRate() {
+        return sampleRate;
+    }
+
+    public void setSampleRate(Double sampleRate) {
+        if (sampleRate == null) {
+            this.sampleRate = 1.0d;
+        } else {
+            this.sampleRate = sampleRate;
+        }
+    }
+
+    /**
+     * key: clusterName + topologyName + time
+     */
+    @Override
+    public byte[] getKey() {
+        return MetricUtils.concat2(clusterName, topologyName, getTime().getTime()).getBytes();
+
+    }
+
+    /**
+     * value: topologyId + type: S/E
+     */
+    @Override
+    public byte[] getValue() {
+        return MetricUtils.concat2(topologyId, getTag(), sampleRate).getBytes();
+    }
+
+    @Override
+    public Object fromKV(byte[] key, byte[] value) {
+        String[] keyParts = new String(key).split(MetricUtils.DELIM);
+        long time = 0;
+        if (keyParts.length >= 3) {
+            this.clusterName = keyParts[0];
+            this.topologyName = keyParts[1];
+            time = Long.valueOf(keyParts[2]);
+        }
+
+        String[] valueParts = new String(value).split(MetricUtils.DELIM);
+        if (valueParts.length >= 3) {
+            this.topologyId = valueParts[0];
+            String tag = valueParts[1];
+            if (tag.equals(KVSerializable.START)) {
+                this.start = new Date(time);
+            } else {
+                this.end = new Date(time);
+            }
+            this.sampleRate = JStormUtils.parseDouble(valueParts[2], 0.1d);
+        }
+
+        return this;
+    }
+
+    public String getIdentity(){
+        return MetricUtils.concat2(clusterName, topologyId);
+    }
+
+    public void merge(TopologyHistory history){
+        if (history.start != null && this.start == null){
+            this.start = history.start;
+        }
+        if (history.end != null && this.end == null){
+            this.end = history.end;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Counter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Counter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Counter.java
new file mode 100644
index 0000000..6745f14
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Counter.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old;
+
+import com.alibaba.jstorm.common.metric.old.operator.convert.DefaultConvertor;
+import com.alibaba.jstorm.common.metric.old.operator.merger.SumMerger;
+import com.alibaba.jstorm.common.metric.old.operator.updater.AddUpdater;
+import com.alibaba.jstorm.common.metric.old.window.Metric;
+
+/**
+ * Windowed counter, comparable to com.codahale.metrics.Counter.
+ * 
+ * Values are combined with an {@link AddUpdater} and windows are merged with a {@link SumMerger},
+ * so the reported value is the sum over all windows.
+ * 
+ * For usage, refer to the Sampling interface.
+ * 
+ * @author zhongyan.feng
+ * 
+ * @param <T> numeric type being counted
+ */
+public class Counter<T extends Number> extends Metric<T, T> {
+    private static final long serialVersionUID = -1362345159511508074L;
+
+    /**
+     * @param zero value used as this metric's default value
+     */
+    public Counter(T zero) {
+        this.defaultValue = zero;
+        this.updater = new AddUpdater<T>();
+        this.merger = new SumMerger<T>();
+        this.convertor = new DefaultConvertor<T>();
+
+        init();
+    }
+
+    public static void main(String[] args) {
+        // empty entry point
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Gauge.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Gauge.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Gauge.java
new file mode 100644
index 0000000..b323df8
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Gauge.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old;
+
+import com.alibaba.jstorm.common.metric.old.window.Metric;
+import com.alibaba.jstorm.common.metric.old.window.StatBuckets;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Exposes a {@code com.codahale.metrics.Gauge} through the windowed {@link Metric} API.
+ * 
+ * The wrapped gauge is read each time a snapshot is requested; {@link #init()} and
+ * {@link #update(Number)} have empty bodies in this class.
+ * 
+ * @param <T> numeric type produced by the wrapped gauge
+ */
+public class Gauge<T extends Number> extends Metric<Number, Number> {
+    private static final long serialVersionUID = 1985614006717750790L;
+
+    protected com.codahale.metrics.Gauge<T> gauge;
+
+    /**
+     * @param gauge the gauge whose current value is reported by {@link #getSnapshot()}
+     */
+    public Gauge(com.codahale.metrics.Gauge<T> gauge) {
+        this.gauge = gauge;
+
+        init();
+    }
+
+    /** Empty override. */
+    @Override
+    public void init() {
+    }
+
+    /** Empty override: the supplied value is not recorded. */
+    @Override
+    public void update(Number obj) {
+    }
+
+    /**
+     * Reads the gauge once and reports that single value for every entry of
+     * {@code windowSeconds} as well as for {@code StatBuckets.ALL_TIME_WINDOW}.
+     * 
+     * @return a sorted map from window key to the current gauge value
+     */
+    @Override
+    public Map<Integer, Number> getSnapshot() {
+        final Number current = gauge.getValue();
+        final Map<Integer, Number> snapshot = new TreeMap<Integer, Number>();
+
+        snapshot.put(StatBuckets.ALL_TIME_WINDOW, current);
+        for (Integer window : windowSeconds) {
+            snapshot.put(window, current);
+        }
+
+        return snapshot;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Histogram.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Histogram.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Histogram.java
new file mode 100644
index 0000000..478de4e
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Histogram.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old;
+
+import com.alibaba.jstorm.common.metric.old.operator.convert.Convertor;
+import com.alibaba.jstorm.common.metric.old.operator.merger.AvgMerger;
+import com.alibaba.jstorm.common.metric.old.operator.updater.AvgUpdater;
+import com.alibaba.jstorm.common.metric.old.window.Metric;
+
+/**
+ * Windowed metric that reports an average value.
+ * 
+ * Each window holds a {@link HistorgramPair} (running sum plus number of samples), which
+ * {@link HistogramConvertor} turns into {@code sum / times}.
+ * 
+ * @author zhongyan.feng
+ * 
+ */
+public class Histogram extends Metric<Double, Histogram.HistorgramPair> {
+    private static final long serialVersionUID = -1362345159511508074L;
+
+    public Histogram() {
+        this.convertor = new HistogramConvertor();
+        this.merger = new AvgMerger();
+        this.updater = new AvgUpdater();
+        this.defaultValue = new HistorgramPair();
+
+        init();
+    }
+
+    /** Converts a sum/times pair into its average. */
+    public static class HistogramConvertor implements Convertor<HistorgramPair, Double> {
+        private static final long serialVersionUID = -1569170826785657226L;
+
+        /**
+         * @param from the pair to convert, may be {@code null}
+         * @return {@code sum / times}, or {@code 0.0} when {@code from} is {@code null} or has no samples
+         */
+        @Override
+        public Double convert(HistorgramPair from) {
+            if (from == null || from.getTimes() == 0) {
+                return 0.0d;
+            }
+            return from.getSum() / from.getTimes();
+        }
+
+    }
+
+    /** Mutable holder of a running sum and the number of samples that produced it. */
+    public static class HistorgramPair {
+        private double sum;
+        private long times;
+
+        public HistorgramPair() {
+
+        }
+
+        public HistorgramPair(double sum, long times) {
+            this.sum = sum;
+            this.times = times;
+        }
+
+        public double getSum() {
+            return sum;
+        }
+
+        public void setSum(double sum) {
+            this.sum = sum;
+        }
+
+        /** Adds {@code value} to the running sum. */
+        public void addValue(double value) {
+            this.sum = this.sum + value;
+        }
+
+        public long getTimes() {
+            return times;
+        }
+
+        public void setTimes(long times) {
+            this.times = times;
+        }
+
+        /** Adds {@code time} to the sample count. */
+        public void addTimes(long time) {
+            this.times = this.times + time;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/LongCounter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/LongCounter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/LongCounter.java
new file mode 100644
index 0000000..cd64e62
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/LongCounter.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old;
+
+import com.alibaba.jstorm.common.metric.old.operator.convert.AtomicLongToLong;
+import com.alibaba.jstorm.common.metric.old.operator.merger.LongSumMerger;
+import com.alibaba.jstorm.common.metric.old.operator.updater.LongAddUpdater;
+import com.alibaba.jstorm.common.metric.old.window.Metric;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Windowed counter for {@code long} values: each window is backed by an {@link AtomicLong}
+ * and exposed to readers as a {@link Long}.
+ */
+public class LongCounter extends Metric<Long, AtomicLong> {
+    private static final long serialVersionUID = -1362345159511508074L;
+
+    public LongCounter() {
+        this.convertor = new AtomicLongToLong();
+        this.merger = new LongSumMerger();
+        this.updater = new LongAddUpdater();
+        this.defaultValue = new AtomicLong(0);
+
+        init();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Meter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Meter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Meter.java
new file mode 100644
index 0000000..cde66fd
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Meter.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old;
+
+import com.alibaba.jstorm.common.metric.old.operator.convert.DefaultConvertor;
+import com.alibaba.jstorm.common.metric.old.operator.merger.TpsMerger;
+import com.alibaba.jstorm.common.metric.old.operator.updater.AddUpdater;
+import com.alibaba.jstorm.common.metric.old.window.Metric;
+
+/**
+ * Meter is used to compute tps.
+ * 
+ * Updates are added together by an {@link AddUpdater} and windows are merged by a
+ * {@link TpsMerger}.
+ * 
+ * @author zhongyan.feng
+ * 
+ */
+public class Meter extends Metric<Double, Double> {
+    private static final long serialVersionUID = -1362345159511508074L;
+
+    public Meter() {
+        this.convertor = new DefaultConvertor<Double>();
+        this.merger = new TpsMerger();
+        this.updater = new AddUpdater<Double>();
+        this.defaultValue = 0.0d;
+
+        init();
+    }
+
+    /** Records a single occurrence, i.e. updates the meter with the value 1. */
+    public void update() {
+        final Double one = Double.valueOf(1);
+        update(one);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricFilter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricFilter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricFilter.java
new file mode 100644
index 0000000..a91b925
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricFilter.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old;
+
+import com.alibaba.jstorm.common.metric.old.window.Metric;
+
+import java.io.Serializable;
+
+/**
+ * Serializable predicate used to select metrics by name and/or instance.
+ */
+public interface MetricFilter extends Serializable {
+    /**
+     * Decides whether a metric passes this filter.
+     * 
+     * @param name the metric's name
+     * @param metric the metric
+     * @return {@code true} if the metric matches the filter; {@code false} otherwise
+     */
+    boolean matches(String name, Metric metric);
+
+    /**
+     * Filter that accepts every metric, whatever its type or name.
+     */
+    MetricFilter ALL = new MetricFilter() {
+        private static final long serialVersionUID = 7089987006352295530L;
+
+        @Override
+        public boolean matches(String name, Metric metric) {
+            return true;
+        }
+    };
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricSet.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricSet.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricSet.java
new file mode 100644
index 0000000..1cce913
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricSet.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old;
+
+import com.alibaba.jstorm.common.metric.old.window.Metric;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * A serializable collection of metrics that can be retrieved as a map.
+ */
+public interface MetricSet extends Serializable {
+    /**
+     * @return the metrics of this set; presumably keyed by metric name - confirm against implementations
+     */
+    Map<String, Metric> getMetrics();
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricThrift.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricThrift.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricThrift.java
new file mode 100644
index 0000000..8de6f6d
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricThrift.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old;
+
+import backtype.storm.generated.MetricInfo;
+import backtype.storm.generated.MetricWindow;
+import com.alibaba.jstorm.utils.JStormUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Static helpers for building and merging the thrift metric structures
+ * {@link MetricInfo} and {@link MetricWindow}.
+ * <p>
+ * Several methods in this class are currently stubs (see the individual method docs).
+ */
+public class MetricThrift {
+    private static final Logger LOG = LoggerFactory.getLogger(MetricThrift.class);
+
+    /**
+     * @return a new, empty {@link MetricInfo}
+     */
+    public static MetricInfo mkMetricInfo() {
+        return new MetricInfo();
+    }
+
+    /**
+     * Stub: currently does nothing with its arguments.
+     */
+    public static void insert(MetricInfo metricInfo, String key, Map<Integer, Double> windowSet) {
+    }
+
+    /**
+     * Sums the per-window values of all given metric windows into a single {@link MetricWindow}.
+     * <p>
+     * Entries whose {@link MetricWindow} is {@code null} or carries no window map are skipped.
+     * A value that cannot be added is logged and left out of the result.
+     *
+     * @param details metric windows to combine; the map keys are only used in log messages
+     * @return a new {@link MetricWindow} holding the combined value per window key
+     */
+    public static MetricWindow merge(Map<String, MetricWindow> details) {
+        Map<Integer, Double> merge = new HashMap<Integer, Double>();
+
+        for (Entry<String, MetricWindow> entry : details.entrySet()) {
+            MetricWindow metricWindow = entry.getValue();
+            Map<Integer, Double> metric = metricWindow == null ? null : metricWindow.get_metricWindow();
+            if (metric == null) {
+                // Nothing to contribute; presumably an unset thrift field - skip instead of failing the whole merge.
+                continue;
+            }
+
+            for (Entry<Integer, Double> metricEntry : metric.entrySet()) {
+                Integer key = metricEntry.getKey();
+                try {
+                    Double value = ((Number) JStormUtils.add(metricEntry.getValue(), merge.get(key))).doubleValue();
+                    merge.put(key, value);
+                } catch (Exception e) {
+                    LOG.error("Invalid type of " + entry.getKey() + ":" + key, e);
+                }
+            }
+        }
+
+        MetricWindow ret = new MetricWindow();
+
+        ret.set_metricWindow(merge);
+        return ret;
+    }
+
+    /**
+     * Stub: {@code metricInfo} is not modified.
+     * <p>
+     * The disabled original statement was
+     * {@code metricInfo.put_to_baseMetric(metricName, merge(entry.getValue()))} for every entry
+     * of {@code extraMap}.
+     */
+    public static void merge(MetricInfo metricInfo, Map<String, Map<String, MetricWindow>> extraMap) {
+        // intentionally left without effect until the base-metric merge is re-enabled
+    }
+
+    /**
+     * Merges the window map of {@code fromMetric} into {@code toMetric} using
+     * {@code JStormUtils.mergeMapList}.
+     *
+     * @param fromMetric source window; if {@code null}, {@code toMetric} is returned unchanged
+     * @param toMetric   target window; if {@code null}, a new window with an empty map is created
+     * @return {@code toMetric} (or its replacement) carrying the merged window map
+     */
+    public static MetricWindow mergeMetricWindow(MetricWindow fromMetric, MetricWindow toMetric) {
+        if (toMetric == null) {
+            toMetric = new MetricWindow(new HashMap<Integer, Double>());
+        }
+
+        if (fromMetric == null) {
+            return toMetric;
+        }
+
+        List<Map<Integer, Double>> list = new ArrayList<Map<Integer, Double>>();
+        list.add(fromMetric.get_metricWindow());
+        list.add(toMetric.get_metricWindow());
+        Map<Integer, Double> merged = JStormUtils.mergeMapList(list);
+
+        toMetric.set_metricWindow(merged);
+
+        return toMetric;
+    }
+
+    /**
+     * Stub: guarantees a non-null result but does not copy anything from {@code from}.
+     * <p>
+     * The disabled original statement was {@code to.get_baseMetric().putAll(from.get_baseMetric())}.
+     *
+     * @param from source info (currently ignored)
+     * @param to   target info; if {@code null}, a new {@link MetricInfo} is created
+     * @return {@code to}, or a new {@link MetricInfo} when {@code to} was {@code null}
+     */
+    public static MetricInfo mergeMetricInfo(MetricInfo from, MetricInfo to) {
+        if (to == null) {
+            to = mkMetricInfo();
+        }
+
+        if (from == null) {
+            return to;
+        }
+
+        return to;
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/RegistryType.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/RegistryType.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/RegistryType.java
new file mode 100644
index 0000000..6e8a020
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/RegistryType.java
@@ -0,0 +1,9 @@
+package com.alibaba.jstorm.common.metric.old;
+
+/**
+ * Kinds of metric registry; presumably the scope at which a metric is registered
+ * (stream, task, component, worker or system) - confirm against usages.
+ *
+ * @author wange
+ * @since 15/6/11
+ */
+public enum RegistryType {
+    STREAM, TASK, COMPONENT, WORKER, SYS
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/StaticsType.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/StaticsType.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/StaticsType.java
new file mode 100644
index 0000000..2094b9a
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/StaticsType.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old;
+
+public enum StaticsType { // names of the statistics kinds known to the old metric package
+    emitted, send_tps, recv_tps, acked, failed, transferred, process_latencies; // NOTE(review): lower-case names are presumably used verbatim as keys -- confirm before renaming
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Timer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Timer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Timer.java
new file mode 100644
index 0000000..675c237
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Timer.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old;
+
+import java.io.Closeable;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Timer metric that extends {@link Histogram} and follows the interface of com.codahale.metrics's Timer; durations are recorded in milliseconds.
+ * 
+ * @author zhongyan.feng
+ * 
+ */
+public class Timer extends Histogram {
+    private static final long serialVersionUID = 5915881891513771108L;
+
+    /**
+     * A timing context: captures the wall-clock time (milliseconds) at creation and reports the elapsed time to its timer when stopped or closed.
+     * 
+     * @see Timer#time()
+     */
+    public static class Context implements Closeable {
+        private final Timer timer;
+        private final long startTime;
+
+        private Context(Timer timer) {
+            this.timer = timer;
+            this.startTime = System.currentTimeMillis();
+        }
+
+        /**
+         * Stops recording the elapsed time, updates the timer and returns the elapsed time in milliseconds.
+         */
+        public long stop() { // no "already stopped" guard: every call records another value
+            final long elapsed = System.currentTimeMillis() - startTime;
+            timer.update(elapsed, TimeUnit.MILLISECONDS);
+            return elapsed;
+        }
+
+        @Override
+        public void close() { // lets a Context be used in try-with-resources; equivalent to stop()
+            stop();
+        }
+    }
+
+    public Timer() {
+        init(); // not defined in this class; presumably inherited from Histogram -- confirm
+    }
+
+    /**
+     * Adds a recorded duration, converted to milliseconds before it is recorded.
+     * 
+     * @param duration the length of the duration
+     * @param unit the scale unit of {@code duration}
+     */
+    public void update(long duration, TimeUnit unit) {
+        update(unit.toMillis(duration)); // single-argument update is not defined here; presumably inherited -- confirm
+    }
+
+    /**
+     * Times and records the duration of event in milliseconds; the duration is recorded even when event throws.
+     * 
+     * @param event a {@link Callable} whose {@link Callable#call()} method implements a process whose duration should be timed
+     * @param <T> the type of the value returned by {@code event}
+     * @return the value returned by {@code event}
+     * @throws Exception if {@code event} throws an {@link Exception}
+     */
+    public <T> T time(Callable<T> event) throws Exception {
+        final long startTime = System.currentTimeMillis();
+        try {
+            return event.call();
+        } finally {
+            update(System.currentTimeMillis() - startTime);
+        }
+    }
+
+    /**
+     * Returns a new {@link Context} whose clock starts at the moment of this call.
+     * 
+     * @return a new {@link Context}
+     * @see Context
+     */
+    public Context time() {
+        return new Context(this);
+    }
+
+    public long getCount() { // delegates to getTimes() of the allWindow snapshot
+        return allWindow.getSnapshot().getTimes();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Top.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Top.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Top.java
new file mode 100644
index 0000000..e3fdbdd
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Top.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.TreeSet;
+
+import com.alibaba.jstorm.common.metric.old.operator.convert.Convertor;
+import com.alibaba.jstorm.common.metric.old.operator.merger.Merger;
+import com.alibaba.jstorm.common.metric.old.operator.updater.Updater;
+import com.alibaba.jstorm.common.metric.old.window.Metric;
+
+public class Top<T> extends Metric<List<T>, TreeSet<T>> { // metric that retains at most n elements: the first n in comparator order
+    private static final long serialVersionUID = 4990212679365713831L;
+
+    final protected Comparator<T> comparator; // ordering passed to every TreeSet created by this metric
+    final protected int n; // maximum number of elements retained
+
+    public Top(Comparator<T> comparator, int n) {
+        this.comparator = comparator;
+        this.n = n;
+
+        this.defaultValue = new TreeSet<T>(comparator);
+        this.updater = new TopUpdator<T>(comparator, n);
+        this.merger = new TopMerger<T>(comparator, n);
+        this.convertor = new SetToList<T>();
+
+        init(); // not defined in this class; presumably inherited from Metric -- confirm
+    }
+
+    public static class TopUpdator<T> implements Updater<TreeSet<T>> { // adds values to the cached set and trims it back to n elements
+        private static final long serialVersionUID = -3940041101182079146L;
+
+        final protected Comparator<T> comparator;
+        final protected int n;
+
+        public TopUpdator(Comparator<T> comparator, int n) {
+            this.comparator = comparator;
+            this.n = n;
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public TreeSet<T> update(Number object, TreeSet<T> cache, Object... others) {
+            // Create the set on first use, add the value, then drop the last (greatest per comparator) element if the size exceeds n.
+            if (cache == null) {
+                cache = new TreeSet<T>(comparator);
+            }
+
+            cache.add((T) object); // unchecked cast: the incoming Number is treated as a T
+
+            if (cache.size() > n) {
+                cache.remove(cache.last());
+            }
+
+            return cache;
+        }
+
+        @Override
+        public TreeSet<T> updateBatch(TreeSet<T> object, TreeSet<T> cache, Object... objects) {
+            // Batch variant: add a whole set, then evict the last element repeatedly until at most n remain.
+            if (cache == null) {
+                cache = new TreeSet<T>(comparator);
+            }
+
+            cache.addAll(object); // 'object' is not null-checked; addAll(null) throws NullPointerException
+
+            while (cache.size() > n) {
+                cache.remove(cache.last());
+            }
+
+            return cache;
+        }
+
+    }
+
+    public static class TopMerger<T> implements Merger<TreeSet<T>> { // unions several sets and keeps the first n elements
+
+        private static final long serialVersionUID = 4478867986986581638L;
+        final protected Comparator<T> comparator;
+        final protected int n;
+
+        public TopMerger(Comparator<T> comparator, int n) {
+            this.comparator = comparator;
+            this.n = n;
+        }
+
+        @Override
+        public TreeSet<T> merge(Collection<TreeSet<T>> objs, TreeSet<T> unflushed, Object... others) {
+            // Union unflushed (if any) with every set in objs into a fresh TreeSet; the inputs are not modified.
+            TreeSet<T> temp = new TreeSet<T>(comparator);
+            if (unflushed != null) {
+                temp.addAll(unflushed);
+            }
+
+            for (TreeSet<T> set : objs) { // NOTE(review): a null set here throws NullPointerException; AvgMerger/LongSumMerger skip null items -- confirm nulls cannot occur
+                temp.addAll(set);
+            }
+
+            if (temp.size() <= n) { // already within the cap: return the union as is
+                return temp;
+            }
+
+            TreeSet<T> ret = new TreeSet<T>(comparator);
+            int i = 0;
+            for (T item : temp) { // TreeSet iterates in comparator order, so this copies the first n elements
+                if (i < n) {
+                    ret.add(item);
+                    i++;
+                } else {
+                    break;
+                }
+            }
+            return ret;
+        }
+
+    }
+
+    public static class SetToList<T> implements Convertor<TreeSet<T>, List<T>> { // converts the internal TreeSet into the List exposed by the metric
+        private static final long serialVersionUID = 4968816655779625255L;
+
+        @Override
+        public List<T> convert(TreeSet<T> set) {
+            // Copy the elements into a new ArrayList in the set's iteration order; a null set yields an empty list.
+            List<T> ret = new ArrayList<T>();
+            if (set != null) {
+                for (T item : set) {
+                    ret.add(item);
+                }
+            }
+            return ret;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/Sampling.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/Sampling.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/Sampling.java
new file mode 100644
index 0000000..8f3053b
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/Sampling.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator;
+
+import java.io.Serializable;
+
+public interface Sampling<V> extends Serializable { // V is the snapshot type returned by getSnapshot()
+
+    /**
+     * Updates the metric with the given value.
+     * 
+     * @param obj the number to update the metric with
+     */
+    void update(Number obj);
+
+    /**
+     * Gets a snapshot of the metric.
+     * The snapshot type {@code V} is fixed by the implementing class.
+     * 
+     * @return the snapshot
+     */
+    V getSnapshot();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/StartTime.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/StartTime.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/StartTime.java
new file mode 100644
index 0000000..f87ae5a
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/StartTime.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator;
+
+public interface StartTime { // implemented by objects that can report when they started
+    long getStartTime(); // NOTE(review): unit and epoch are not specified here -- presumably epoch milliseconds; confirm with implementations
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/AtomicLongToLong.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/AtomicLongToLong.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/AtomicLongToLong.java
new file mode 100644
index 0000000..c9a8b24
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/AtomicLongToLong.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.convert;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class AtomicLongToLong implements Convertor<AtomicLong, Long> { // unwraps an AtomicLong into a boxed Long
+    private static final long serialVersionUID = -2755066621494409063L;
+
+    @Override
+    public Long convert(AtomicLong obj) {
+        // Return the current value of obj; a null input maps to a null result.
+        if (obj == null) {
+            return null;
+        } else {
+            return obj.get();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/Convertor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/Convertor.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/Convertor.java
new file mode 100644
index 0000000..713c1df
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/Convertor.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.convert;
+
+import java.io.Serializable;
+
+public interface Convertor<From, To> extends Serializable { // serializable strategy that maps a From value to a To value
+
+    To convert(From obj); // null handling is left to each implementation
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/DefaultConvertor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/DefaultConvertor.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/DefaultConvertor.java
new file mode 100644
index 0000000..2cad206
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/DefaultConvertor.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.convert;
+
+public class DefaultConvertor<T> implements Convertor<T, T> { // identity Convertor: source and target types are the same
+    private static final long serialVersionUID = -647209923903679727L;
+
+    @Override
+    public T convert(T obj) {
+        // Identity conversion: the input (including null) is returned as is.
+        return obj;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/SetToList.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/SetToList.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/SetToList.java
new file mode 100644
index 0000000..2569387
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/SetToList.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.convert;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+public class SetToList<T> implements Convertor<Set<T>, List<T>> { // copies a Set into a new List
+    private static final long serialVersionUID = 4968816655779625255L;
+
+    @Override
+    public List<T> convert(Set<T> set) {
+        // Copy the elements into a new ArrayList in the set's iteration order; a null set yields an empty list.
+        List<T> ret = new ArrayList<T>();
+        if (set != null) {
+            for (T item : set) {
+                ret.add(item);
+            }
+        }
+        return ret;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/AvgMerger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/AvgMerger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/AvgMerger.java
new file mode 100644
index 0000000..815bb33
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/AvgMerger.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.merger;
+
+import com.alibaba.jstorm.common.metric.old.Histogram;
+
+import java.util.Collection;
+
+public class AvgMerger implements Merger<Histogram.HistorgramPair> { // merges pairs by adding up their sums and their times
+    private static final long serialVersionUID = -3892281208959055221L;
+
+    @Override
+    public Histogram.HistorgramPair merge(Collection<Histogram.HistorgramPair> objs, Histogram.HistorgramPair unflushed, Object... others) {
+        // Accumulate sum and times over unflushed (if any) and every non-null pair in objs; no division happens here.
+        double sum = 0.0d;
+        long times = 0l;
+
+        if (unflushed != null) {
+            sum = sum + unflushed.getSum();
+            times = times + unflushed.getTimes();
+        }
+
+        for (Histogram.HistorgramPair item : objs) { // objs itself must be non-null; null elements are skipped
+            if (item == null) {
+                continue;
+            }
+            sum = sum + item.getSum();
+            times = times + item.getTimes();
+        }
+
+        return new Histogram.HistorgramPair(sum, times); // always a new pair, (0.0, 0) when nothing was merged
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/LongSumMerger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/LongSumMerger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/LongSumMerger.java
new file mode 100644
index 0000000..1151718
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/LongSumMerger.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.merger;
+
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class LongSumMerger implements Merger<AtomicLong> { // merges AtomicLong values by summing them
+    private static final long serialVersionUID = -3500779273677666691L;
+
+    @Override
+    public AtomicLong merge(Collection<AtomicLong> objs, AtomicLong unflushed, Object... others) { // 'others' is not used
+        AtomicLong ret = new AtomicLong(0); // fresh accumulator; the inputs are only read
+        if (unflushed != null) {
+            ret.addAndGet(unflushed.get());
+        }
+
+        for (AtomicLong item : objs) { // objs itself must be non-null; null elements are skipped
+            if (item == null) {
+                continue;
+            }
+            ret.addAndGet(item.get());
+        }
+        return ret;
+    }
+
+}