You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:57 UTC

[19/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/Merger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/Merger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/Merger.java
new file mode 100644
index 0000000..4e31019
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/Merger.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.merger;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * Strategy for collapsing the values held by a metric window into one result.
+ *
+ * @param <V> type of the values being merged and of the merged result
+ */
+public interface Merger<V> extends Serializable {
+    /**
+     * Combines a collection of stored values with the not-yet-stored value.
+     *
+     * @param objs      values to combine
+     * @param unflushed pending value that is not part of {@code objs}; implementations in this package tolerate or pass through {@code null}
+     * @param others    optional implementation-specific context supplied by the caller
+     * @return the merged value
+     */
+    V merge(Collection<V> objs, V unflushed, Object... others);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/SumMerger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/SumMerger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/SumMerger.java
new file mode 100644
index 0000000..b4d65e6
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/SumMerger.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.merger;
+
+import java.util.Collection;
+
+import com.alibaba.jstorm.utils.JStormUtils;
+
+/**
+ * {@link Merger} that folds every value into a running total using {@link JStormUtils#add}.
+ *
+ * @param <T> numeric type being summed
+ */
+public class SumMerger<T extends Number> implements Merger<T> {
+    private static final long serialVersionUID = -7026523452570138433L;
+
+    /**
+     * Starts from {@code unflushed} and adds each element of {@code objs} in iteration order.
+     *
+     * @param objs      values to add to the total
+     * @param unflushed initial value of the total
+     * @param others    ignored
+     * @return the accumulated total; {@code unflushed} itself when {@code objs} is empty
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public T merge(Collection<T> objs, T unflushed, Object... others) {
+        T total = unflushed;
+        for (T value : objs) {
+            // JStormUtils.add is not declared to return T, hence the unchecked cast.
+            total = (T) JStormUtils.add(total, value);
+        }
+        return total;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/TpsMerger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/TpsMerger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/TpsMerger.java
new file mode 100644
index 0000000..4d770d6
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/TpsMerger.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.merger;
+
+import java.util.Collection;
+
+import com.alibaba.jstorm.common.metric.old.operator.StartTime;
+
+/**
+ * {@link Merger} that turns summed counts into a per-second rate.
+ *
+ * The rate is the sum of all values multiplied by 1000 and divided by the number of
+ * milliseconds elapsed since a start time (see {@link #getRunMillis(Object...)}).
+ */
+public class TpsMerger implements Merger<Double> {
+    private static final long serialVersionUID = -4534840881635955942L;
+
+    /** Wall-clock time (ms) at which this merger was constructed; fallback start time. */
+    protected final long createTime;
+
+    public TpsMerger() {
+        createTime = System.currentTimeMillis();
+    }
+
+    /**
+     * Computes the milliseconds elapsed since the start time.
+     *
+     * @param args optional context; when the first element is a {@link StartTime} its
+     *             start time is used, otherwise this merger's creation time is used
+     * @return current time minus the chosen start time, in milliseconds
+     */
+    public long getRunMillis(Object... args) {
+        long startTime = createTime;
+
+        // A varargs call with no extra arguments yields an empty array, so the length
+        // must be checked before touching args[0]. instanceof already rejects null.
+        if (args != null && args.length > 0 && args[0] instanceof StartTime) {
+            startTime = ((StartTime) args[0]).getStartTime();
+        }
+
+        return System.currentTimeMillis() - startTime;
+    }
+
+    /**
+     * Sums {@code unflushed} and every non-null element of {@code objs}, then converts
+     * the sum to a per-second rate.
+     *
+     * @param objs      values to sum; {@code null} elements are skipped
+     * @param unflushed pending value added to the sum; may be {@code null}
+     * @param others    forwarded to {@link #getRunMillis(Object...)}
+     * @return the rate per second, or {@code 0.0} when no time has elapsed
+     */
+    @Override
+    public Double merge(Collection<Double> objs, Double unflushed, Object... others) {
+        double sum = 0.0d;
+        if (unflushed != null) {
+            sum += unflushed;
+        }
+
+        for (Double item : objs) {
+            if (item != null) {
+                sum += item;
+            }
+        }
+
+        long runMillis = getRunMillis(others);
+        if (runMillis <= 0) {
+            // Dividing by a zero or negative elapsed time would report NaN, Infinity
+            // or a negative rate; report no throughput instead.
+            return 0.0d;
+        }
+
+        return (sum * 1000) / runMillis;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/AddUpdater.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/AddUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/AddUpdater.java
new file mode 100644
index 0000000..a6c06f6
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/AddUpdater.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.updater;
+
+import com.alibaba.jstorm.utils.JStormUtils;
+
+public class AddUpdater<T extends Number> implements Updater<T> {
+    private static final long serialVersionUID = -7955740095421752763L;
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public T update(Number object, T cache, Object... others) {
+        // TODO Auto-generated method stub
+        return (T) JStormUtils.add(cache, object);
+    }
+
+    @Override
+    public T updateBatch(T object, T cache, Object... objects) {
+        // TODO Auto-generated method stub
+        return (T) JStormUtils.add(cache, object);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/AvgUpdater.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/AvgUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/AvgUpdater.java
new file mode 100644
index 0000000..a0b7aa1
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/AvgUpdater.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.updater;
+
+import com.alibaba.jstorm.common.metric.old.Histogram;
+
+public class AvgUpdater implements Updater<Histogram.HistorgramPair> {
+    private static final long serialVersionUID = 2562836921724586449L;
+
+    @Override
+    public Histogram.HistorgramPair update(Number object, Histogram.HistorgramPair cache, Object... others) {
+        // TODO Auto-generated method stub
+        if (object == null) {
+            return cache;
+        }
+        if (cache == null) {
+            cache = new Histogram.HistorgramPair();
+        }
+
+        cache.addValue(object.doubleValue());
+        cache.addTimes(1l);
+
+        return cache;
+    }
+
+    @Override
+    public Histogram.HistorgramPair updateBatch(Histogram.HistorgramPair object, Histogram.HistorgramPair cache, Object... objects) {
+        // TODO Auto-generated method stub
+        if (object == null) {
+            return cache;
+        }
+        if (cache == null) {
+            cache = new Histogram.HistorgramPair();
+        }
+
+        cache.addValue(object.getSum());
+        cache.addTimes(object.getTimes());
+
+        return cache;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/DoubleAddUpdater.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/DoubleAddUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/DoubleAddUpdater.java
new file mode 100644
index 0000000..b25f97b
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/DoubleAddUpdater.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.updater;
+
+import com.google.common.util.concurrent.AtomicDouble;
+
+public class DoubleAddUpdater implements Updater<AtomicDouble> {
+    private static final long serialVersionUID = -1293565961076552462L;
+
+    @Override
+    public AtomicDouble update(Number object, AtomicDouble cache, Object... others) {
+        // TODO Auto-generated method stub
+        if (cache == null) {
+            cache = new AtomicDouble(0.0);
+        }
+        if (object != null) {
+            cache.addAndGet(object.doubleValue());
+        }
+        return cache;
+    }
+
+    @Override
+    public AtomicDouble updateBatch(AtomicDouble object, AtomicDouble cache, Object... objects) {
+        // TODO Auto-generated method stub
+        return update(object, cache, objects);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/LongAddUpdater.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/LongAddUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/LongAddUpdater.java
new file mode 100644
index 0000000..8db6f21
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/LongAddUpdater.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.updater;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class LongAddUpdater implements Updater<AtomicLong> {
+    private static final long serialVersionUID = -2185639264737912405L;
+
+    @Override
+    public AtomicLong update(Number object, AtomicLong cache, Object... others) {
+        // TODO Auto-generated method stub
+        if (cache == null) {
+            cache = new AtomicLong(0);
+        }
+
+        if (object != null) {
+            cache.addAndGet(object.longValue());
+        }
+        return cache;
+    }
+
+    @Override
+    public AtomicLong updateBatch(AtomicLong object, AtomicLong cache, Object... objects) {
+        // TODO Auto-generated method stub
+        return update(object, cache, objects);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/Updater.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/Updater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/Updater.java
new file mode 100644
index 0000000..42d7b58
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/updater/Updater.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.operator.updater;
+
+import java.io.Serializable;
+
+/**
+ * Strategy for folding new data into a cached aggregate value.
+ *
+ * @param <V> type of the cached aggregate
+ */
+public interface Updater<V> extends Serializable {
+    /**
+     * Folds a single raw value into the cache.
+     *
+     * @param object raw value to fold in
+     * @param cache  current aggregate
+     * @param others optional implementation-specific context
+     * @return the aggregate after the update
+     */
+    V update(Number object, V cache, Object... others);
+
+    /**
+     * Folds an already aggregated value into the cache.
+     *
+     * @param object  aggregated value to fold in
+     * @param cache   current aggregate
+     * @param objects optional implementation-specific context
+     * @return the aggregate after the update
+     */
+    V updateBatch(V object, V cache, Object... objects);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/AllWindow.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/AllWindow.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/AllWindow.java
new file mode 100644
index 0000000..244db74
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/AllWindow.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.window;
+
+import com.alibaba.jstorm.common.metric.old.operator.Sampling;
+import com.alibaba.jstorm.common.metric.old.operator.StartTime;
+import com.alibaba.jstorm.common.metric.old.operator.merger.Merger;
+import com.alibaba.jstorm.common.metric.old.operator.updater.Updater;
+
+import java.util.ArrayList;
+
+/**
+ * Window that aggregates every value it has received since it was created.
+ *
+ * All values are folded into a single pending aggregate by the {@link Updater};
+ * {@link #getSnapshot()} hands that aggregate to the {@link Merger}.
+ *
+ * @param <V> type of the aggregate
+ */
+public class AllWindow<V> implements Sampling<V>, StartTime {
+
+    private static final long serialVersionUID = -8523514907315740812L;
+
+    /** Aggregate of everything received so far; guarded by {@code this}. */
+    protected V unflushed;
+    /** Value returned by {@link #getSnapshot()} when the merger yields {@code null}. */
+    protected V defaultValue;
+
+    protected Updater<V> updater;
+    protected Merger<V> merger;
+    /** Wall-clock time (ms) at which this window was constructed. */
+    protected long startTime;
+
+    AllWindow(V defaultValue, Updater<V> updater, Merger<V> merger) {
+
+        this.updater = updater;
+        this.merger = merger;
+
+        this.defaultValue = defaultValue;
+        this.startTime = System.currentTimeMillis();
+    }
+
+    /**
+     * Folds one raw value into the aggregate.
+     *
+     * @param obj value to record
+     */
+    @Override
+    public void update(Number obj) {
+        synchronized (this) {
+            unflushed = updater.update(obj, unflushed);
+        }
+    }
+
+    /**
+     * Folds an already aggregated batch into the aggregate.
+     *
+     * @param batch aggregated value to record
+     */
+    public void updateBatch(V batch) {
+        synchronized (this) {
+            unflushed = updater.updateBatch(batch, unflushed);
+        }
+    }
+
+    /**
+     * Merges the current aggregate and returns the result.
+     *
+     * @return the merged value, or {@code defaultValue} when the merger returns {@code null}
+     */
+    @Override
+    public V getSnapshot() {
+        V pending;
+        // Writers publish unflushed while holding this monitor; read it under the
+        // same monitor so the latest value is guaranteed to be visible here.
+        synchronized (this) {
+            pending = unflushed;
+        }
+
+        // This window passes itself as context so the merger can query getStartTime().
+        V merged = merger.merge(new ArrayList<V>(), pending, this);
+        return (merged == null) ? defaultValue : merged;
+    }
+
+    /**
+     * @return the construction time of this window in milliseconds
+     */
+    @Override
+    public long getStartTime() {
+        return startTime;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/Metric.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/Metric.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/Metric.java
new file mode 100644
index 0000000..e505a1f
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/Metric.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.window;
+
+import com.alibaba.jstorm.callback.Callback;
+import com.alibaba.jstorm.common.metric.old.operator.Sampling;
+import com.alibaba.jstorm.common.metric.old.operator.convert.Convertor;
+import com.alibaba.jstorm.common.metric.old.operator.merger.Merger;
+import com.alibaba.jstorm.common.metric.old.operator.updater.Updater;
+import com.alibaba.jstorm.utils.IntervalCheck;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Metric that tracks a value over several rolling time windows plus an all-time window.
+ *
+ * Incoming values are first folded into a pending aggregate; {@link #flush()} pushes
+ * that aggregate into every {@link RollingWindow} and the {@link AllWindow}.
+ * {@link #init()} must be called after the setters and before the first update.
+ *
+ * @param <T> type exposed to callers in snapshots (output of the convertor)
+ * @param <V> type of the internal aggregate
+ */
+public class Metric<T, V> implements Sampling<Map<Integer, T>> {
+    private static final long serialVersionUID = -1362345159511508074L;
+    private static final Logger LOG = LoggerFactory.getLogger(Metric.class);
+
+    /** Global switch; while {@code false}, {@link #update(Number)} drops every value. */
+    protected static boolean enable;
+
+    public static void setEnable(boolean e) {
+        enable = e;
+    }
+
+    protected List<RollingWindow<V>> rollingWindows;
+    protected AllWindow<V> allWindow;
+
+    /** Length of each rolling window, in seconds. */
+    protected int[] windowSeconds = { StatBuckets.MINUTE_WINDOW, StatBuckets.HOUR_WINDOW, StatBuckets.DAY_WINDOW };
+    /** Number of buckets each rolling window is divided into. */
+    protected int bucketSize = StatBuckets.NUM_STAT_BUCKETS;
+    protected V defaultValue;
+    protected Updater<V> updater;
+    protected Merger<V> merger;
+    protected Convertor<V, T> convertor;
+    /** Optional hook executed at the end of {@link #getSnapshot()}. */
+    protected Callback callback;
+
+    protected int interval; // unit is second
+    protected IntervalCheck intervalCheck;
+    /** Aggregate of values received since the last flush; guarded by {@code this}. */
+    protected V unflushed;
+
+    public Metric() {
+    }
+
+    /**
+     * Computes the flush interval shared by all windows.
+     *
+     * Each window's bucket interval is {@code windowSeconds[i] / bucketSize}. The result
+     * is the largest value greater than 1 that evenly divides every bucket interval and
+     * does not exceed the smallest one, or 1 when no such value exists.
+     *
+     * @return the interval in seconds; {@code StatBuckets.NUM_STAT_BUCKETS} when no
+     *         windows are configured
+     */
+    public int getInterval() {
+        if (windowSeconds == null || windowSeconds.length == 0) {
+            return StatBuckets.NUM_STAT_BUCKETS;
+        }
+
+        int[] bucketIntervals = new int[windowSeconds.length];
+        int smallest = Integer.MAX_VALUE;
+        for (int i = 0; i < windowSeconds.length; i++) {
+            int bucketInterval = windowSeconds[i] / bucketSize;
+            bucketIntervals[i] = bucketInterval;
+            if (bucketInterval < smallest) {
+                smallest = bucketInterval;
+            }
+        }
+
+        // Try candidates from the largest possible downwards; the first common divisor wins.
+        for (int candidate = smallest; candidate > 1; candidate--) {
+            boolean dividesAll = true;
+            for (int bucketInterval : bucketIntervals) {
+                if (bucketInterval % candidate != 0) {
+                    dividesAll = false;
+                    break;
+                }
+            }
+
+            if (dividesAll) {
+                return candidate;
+            }
+        }
+
+        return 1;
+    }
+
+    /**
+     * Builds the windows and the flush timer from the configured properties.
+     *
+     * @throws IllegalArgumentException if the default value, updater, merger or
+     *                                  convertor has not been set
+     */
+    public void init() {
+        if (defaultValue == null || updater == null || merger == null || convertor == null) {
+            throw new IllegalArgumentException("Invalid arguments");
+        }
+
+        rollingWindows = new ArrayList<RollingWindow<V>>();
+        if (windowSeconds != null) {
+            for (int windowSize : windowSeconds) {
+                RollingWindow<V> rollingWindow = new RollingWindow<V>(defaultValue, windowSize / bucketSize, windowSize, updater, merger);
+
+                rollingWindows.add(rollingWindow);
+            }
+        }
+        allWindow = new AllWindow<V>(defaultValue, updater, merger);
+
+        this.interval = getInterval();
+        this.intervalCheck = new IntervalCheck();
+        this.intervalCheck.setInterval(interval);
+    }
+
+    /**
+     * Records one value. To keep the hot path cheap the value is only folded into the
+     * pending aggregate; the windows are updated in batches by {@link #flush()}, which
+     * runs here whenever the interval check fires.
+     *
+     * @param obj value to record; ignored while the metric is disabled
+     */
+    @Override
+    public void update(Number obj) {
+        if (!enable) {
+            return;
+        }
+
+        if (intervalCheck.check()) {
+            flush();
+        }
+        synchronized (this) {
+            unflushed = updater.update(obj, unflushed);
+        }
+    }
+
+    /**
+     * Pushes the pending aggregate into every window and resets it. Does nothing when
+     * there is no pending aggregate.
+     */
+    public synchronized void flush() {
+        if (unflushed == null) {
+            return;
+        }
+        for (RollingWindow<V> rollingWindow : rollingWindows) {
+            rollingWindow.updateBatch(unflushed);
+        }
+        allWindow.updateBatch(unflushed);
+        unflushed = null;
+    }
+
+    /**
+     * Flushes pending data and collects the converted snapshot of every window.
+     *
+     * @return map from window length in seconds to the converted value; the all-time
+     *         window is stored under {@code StatBuckets.ALL_TIME_WINDOW}
+     */
+    @Override
+    public Map<Integer, T> getSnapshot() {
+        flush();
+
+        Map<Integer, T> ret = new TreeMap<Integer, T>();
+        for (RollingWindow<V> rollingWindow : rollingWindows) {
+            V value = rollingWindow.getSnapshot();
+
+            ret.put(rollingWindow.getWindowSecond(), convertor.convert(value));
+        }
+
+        ret.put(StatBuckets.ALL_TIME_WINDOW, convertor.convert(allWindow.getSnapshot()));
+
+        if (callback != null) {
+            callback.execute(this);
+        }
+        return ret;
+    }
+
+    /**
+     * @return the converted snapshot of the all-time window (pending data is not flushed first)
+     */
+    public T getAllTimeValue() {
+        return convertor.convert(allWindow.getSnapshot());
+    }
+
+    public int[] getWindowSeconds() {
+        return windowSeconds;
+    }
+
+    public void setWindowSeconds(int[] windowSeconds) {
+        this.windowSeconds = windowSeconds;
+    }
+
+    public int getBucketSize() {
+        return bucketSize;
+    }
+
+    public void setBucketSize(int bucketSize) {
+        this.bucketSize = bucketSize;
+    }
+
+    public V getDefaultValue() {
+        return defaultValue;
+    }
+
+    public void setDefaultValue(V defaultValue) {
+        this.defaultValue = defaultValue;
+    }
+
+    public Updater<V> getUpdater() {
+        return updater;
+    }
+
+    public void setUpdater(Updater<V> updater) {
+        this.updater = updater;
+    }
+
+    public Merger<V> getMerger() {
+        return merger;
+    }
+
+    public void setMerger(Merger<V> merger) {
+        this.merger = merger;
+    }
+
+    public Convertor<V, T> getConvertor() {
+        return convertor;
+    }
+
+    public void setConvertor(Convertor<V, T> convertor) {
+        this.convertor = convertor;
+    }
+
+    public Callback getCallback() {
+        return callback;
+    }
+
+    public void setCallback(Callback callback) {
+        this.callback = callback;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/RollingWindow.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/RollingWindow.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/RollingWindow.java
new file mode 100644
index 0000000..5963951
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/RollingWindow.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.window;
+
+import com.alibaba.jstorm.common.metric.old.operator.Sampling;
+import com.alibaba.jstorm.common.metric.old.operator.StartTime;
+import com.alibaba.jstorm.common.metric.old.operator.merger.Merger;
+import com.alibaba.jstorm.common.metric.old.operator.updater.Updater;
+import com.alibaba.jstorm.utils.IntervalCheck;
+import com.alibaba.jstorm.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.TreeMap;
+
+/**
+ * Sliding time window made of fixed-length buckets keyed by their start second.
+ *
+ * New values are folded into a pending aggregate. When the interval check fires the
+ * aggregate is stored as the bucket for the current bucket time ({@link #rolling()}).
+ * Buckets older than the window are dropped when a snapshot is taken.
+ *
+ * @param <V> type of the aggregate stored per bucket
+ */
+public class RollingWindow<V> implements Sampling<V>, StartTime {
+    private static final long serialVersionUID = 3794478417380003279L;
+    private static final Logger LOG = LoggerFactory.getLogger(RollingWindow.class);
+
+    /** Start of the data currently covered by the window, in milliseconds. */
+    protected long startTime;
+    /** Start second of the bucket the pending aggregate belongs to. */
+    protected Integer currBucketTime;
+    protected int interval; // unit is second
+    protected int windowSecond;
+    protected IntervalCheck intervalCheck;
+
+    /** Stored buckets keyed by bucket start second; guarded by {@code this}. */
+    protected TreeMap<Integer, V> buckets;
+    protected Integer bucketNum;
+    /** Aggregate not yet stored in {@link #buckets}; guarded by {@code this}. */
+    protected V unflushed;
+    protected V defaultValue;
+
+    protected Updater<V> updater;
+    protected Merger<V> merger;
+
+    /**
+     * @param defaultValue value returned by {@link #getSnapshot()} when the merger yields {@code null}
+     * @param interval     bucket length in seconds
+     * @param windowSecond requested window length in seconds; rounded down to a multiple of {@code interval}
+     * @param updater      folds new values into the pending aggregate
+     * @param merger       combines the buckets and the pending aggregate into a snapshot
+     */
+    RollingWindow(V defaultValue, int interval, int windowSecond, Updater<V> updater, Merger<V> merger) {
+        this.startTime = System.currentTimeMillis();
+        this.interval = interval;
+        this.intervalCheck = new IntervalCheck();
+        this.intervalCheck.setInterval(interval);
+        this.currBucketTime = getCurrBucketTime();
+
+        this.bucketNum = windowSecond / interval;
+        this.windowSecond = (bucketNum) * interval;
+
+        this.buckets = new TreeMap<Integer, V>();
+
+        this.updater = updater;
+        this.merger = merger;
+
+        this.defaultValue = defaultValue;
+
+    }
+
+    /**
+     * Folds one raw value into the pending aggregate, rolling to a new bucket first
+     * when the interval check fires.
+     *
+     * @param obj value to record
+     */
+    @Override
+    public void update(Number obj) {
+        if (intervalCheck.check()) {
+            rolling();
+        }
+        synchronized (this) {
+            unflushed = updater.update(obj, unflushed);
+        }
+    }
+
+    /**
+     * Folds an already aggregated batch into the pending aggregate, rolling to a new
+     * bucket first when the interval check fires. Batching keeps per-value cost low.
+     *
+     * @param batch aggregated value to record
+     */
+    public void updateBatch(V batch) {
+        if (intervalCheck.check()) {
+            rolling();
+        }
+        synchronized (this) {
+            unflushed = updater.updateBatch(batch, unflushed);
+        }
+    }
+
+    /**
+     * Drops expired buckets and merges the remaining buckets with the pending aggregate.
+     *
+     * @return the merged value, or {@code defaultValue} when the merger returns {@code null}
+     */
+    @Override
+    public V getSnapshot() {
+        if (intervalCheck.check()) {
+            rolling();
+        }
+
+        Collection<V> values;
+        V pending;
+        // rolling() mutates the TreeMap under this monitor. Clean it and copy its
+        // values under the same monitor so the merger never iterates a map that
+        // another thread is modifying.
+        synchronized (this) {
+            cleanExpiredBuckets();
+            values = new ArrayList<V>(buckets.values());
+            pending = unflushed;
+        }
+
+        // This window passes itself as context so the merger can query getStartTime().
+        V ret = merger.merge(values, pending, this);
+        return (ret == null) ? defaultValue : ret;
+    }
+
+    /**
+     * Stores the pending aggregate (if any) as the bucket for the current bucket time,
+     * then advances the current bucket time to now.
+     */
+    protected void rolling() {
+        synchronized (this) {
+            if (unflushed != null) {
+                buckets.put(currBucketTime, unflushed);
+                unflushed = null;
+            }
+
+            currBucketTime = getCurrBucketTime();
+        }
+    }
+
+    /**
+     * Removes every bucket whose start second lies before the window, then moves
+     * {@link #startTime} to the oldest remaining bucket (if any).
+     */
+    protected void cleanExpiredBuckets() {
+        synchronized (this) {
+            int nowSec = TimeUtils.current_time_secs();
+            int startRemove = nowSec - (interval - 1) - windowSecond;
+
+            // headMap is a live view of the keys strictly below startRemove;
+            // clearing it removes exactly the expired buckets.
+            buckets.headMap(startRemove).clear();
+
+            if (!buckets.isEmpty()) {
+                Integer first = buckets.firstKey();
+                startTime = first.longValue() * 1000;
+            }
+        }
+    }
+
+    public int getWindowSecond() {
+        return windowSecond;
+    }
+
+    public long getStartTime() {
+        return startTime;
+    }
+
+    public int getInterval() {
+        return interval;
+    }
+
+    public Integer getBucketNum() {
+        return bucketNum;
+    }
+
+    public V getDefaultValue() {
+        return defaultValue;
+    }
+
+    /** Rounds the current time in seconds down to a multiple of {@code interval}. */
+    private Integer getCurrBucketTime() {
+        return (TimeUtils.current_time_secs() / interval) * interval;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/StatBuckets.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/StatBuckets.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/StatBuckets.java
new file mode 100644
index 0000000..30f5c64
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/window/StatBuckets.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.common.metric.old.window;
+
+import com.google.common.base.Joiner;
+
+import java.util.*;
+
+/**
+ * Constants and string helpers for the fixed statistics windows: 10 minutes,
+ * 3 hours, 1 day and all-time.
+ */
+public class StatBuckets {
+
+    public static final Integer NUM_STAT_BUCKETS = 20;
+
+    // Window lengths in seconds; they match the *_WINDOW_STR renderings below.
+    public static final Integer MINUTE_WINDOW = 600;
+    public static final Integer HOUR_WINDOW = 10800;
+    public static final Integer DAY_WINDOW = 86400;
+    // Key 0 stands for the all-time window.
+    public static final Integer ALL_TIME_WINDOW = 0;
+    public static Set<Integer> TIME_WINDOWS = new TreeSet<Integer>();
+    static {
+        TIME_WINDOWS.add(ALL_TIME_WINDOW);
+        TIME_WINDOWS.add(MINUTE_WINDOW);
+        TIME_WINDOWS.add(HOUR_WINDOW);
+        TIME_WINDOWS.add(DAY_WINDOW);
+    }
+
+    public static final String MINUTE_WINDOW_STR = "0d0h10m0s";
+    public static final String HOUR_WINDOW_STR = "0d3h0m0s";
+    public static final String DAY_WINDOW_STR = "1d0h0m0s";
+    public static final String ALL_WINDOW_STR = "All-time";
+
+    // Each finite window divided by NUM_STAT_BUCKETS.
+    public static Integer[] STAT_BUCKETS = { MINUTE_WINDOW / NUM_STAT_BUCKETS, HOUR_WINDOW / NUM_STAT_BUCKETS, DAY_WINDOW / NUM_STAT_BUCKETS };
+
+    // { unit suffix, how many of this unit make up the next larger unit },
+    // smallest unit first; a null divisor marks the largest unit.
+    private static final String[][] PRETTYSECDIVIDERS = { new String[] { "s", "60" }, new String[] { "m", "60" }, new String[] { "h", "24" },
+            new String[] { "d", null } };
+
+    /**
+     * Renders a window key as text.
+     *
+     * @param key window key
+     * @return {@link #ALL_WINDOW_STR} for key 0, otherwise the decimal form of
+     *         the key
+     * @throws NullPointerException if {@code key} is null (it is unboxed for
+     *         the comparison)
+     */
+    public static String parseTimeKey(Integer key) {
+        return key == 0 ? ALL_WINDOW_STR : String.valueOf(key);
+    }
+
+    /**
+     * Maps a display string back to its window key.
+     *
+     * @param showKey one of the {@code *_WINDOW_STR} constants; may be null
+     * @return the matching window, or {@link #MINUTE_WINDOW} for null or any
+     *         unrecognised string
+     */
+    public static Integer getTimeKey(String showKey) {
+        // constant.equals(arg) is null-safe, so null falls through to the
+        // default together with unknown strings.
+        if (HOUR_WINDOW_STR.equals(showKey)) {
+            return HOUR_WINDOW;
+        }
+        if (DAY_WINDOW_STR.equals(showKey)) {
+            return DAY_WINDOW;
+        }
+        if (ALL_WINDOW_STR.equals(showKey)) {
+            return ALL_TIME_WINDOW;
+        }
+        return MINUTE_WINDOW;
+    }
+
+    /**
+     * Maps a window key to its display string.
+     *
+     * @param time window key; may be null
+     * @return the matching {@code *_WINDOW_STR} constant, or
+     *         {@link #MINUTE_WINDOW_STR} for null or any unrecognised key
+     */
+    public static String getShowTimeStr(Integer time) {
+        if (HOUR_WINDOW.equals(time)) {
+            return HOUR_WINDOW_STR;
+        }
+        if (DAY_WINDOW.equals(time)) {
+            return DAY_WINDOW_STR;
+        }
+        if (ALL_TIME_WINDOW.equals(time)) {
+            return ALL_WINDOW_STR;
+        }
+        return MINUTE_WINDOW_STR;
+    }
+
+    /**
+     * Formats seconds with all four units and no separator, e.g.
+     * {@code 1d20h30m40s}; zero-valued units are kept ({@code 0d0h10m0s}).
+     *
+     * @param secs duration in seconds
+     * @return the formatted duration
+     */
+    public static String prettyUptimeStr(int secs) {
+        StringBuilder out = new StringBuilder();
+        for (String part : splitUnits(secs, false)) {
+            out.append(part);
+        }
+        return out.toString();
+    }
+
+    /**
+     * Formats seconds as space-separated units, leaving out the larger units
+     * once nothing remains for them, e.g. {@code 30m 40s} or
+     * {@code 1d 20h 30m 40s}.
+     *
+     * @param secs duration in seconds
+     * @return the formatted duration
+     */
+    public static String prettyUptime(int secs) {
+        return Joiner.on(" ").join(splitUnits(secs, true));
+    }
+
+    /**
+     * Splits a duration into "valueUnit" parts, largest unit first.
+     *
+     * @param secs duration in seconds
+     * @param stopWhenExhausted when true, stop as soon as the amount carried
+     *        over to the next larger unit is zero or negative
+     * @return the parts, largest unit first
+     */
+    private static LinkedList<String> splitUnits(int secs, boolean stopWhenExhausted) {
+        LinkedList<String> parts = new LinkedList<String>();
+        int remaining = secs;
+        for (String[] divider : PRETTYSECDIVIDERS) {
+            String unit = divider[0];
+            if (divider[1] == null) {
+                // Largest unit: takes whatever is left.
+                parts.addFirst(remaining + unit);
+            } else {
+                int base = Integer.parseInt(divider[1]);
+                parts.addFirst(remaining % base + unit);
+                remaining = remaining / base;
+            }
+            if (stopWhenExhausted && remaining <= 0) {
+                break;
+            }
+        }
+        return parts;
+    }
+
+    /**
+     * Empty entry point; does nothing.
+     *
+     * @param args ignored
+     */
+    public static void main(String[] args) {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/Sampling.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/Sampling.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/Sampling.java
deleted file mode 100755
index 3d32cc9..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/Sampling.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator;
-
-import java.io.Serializable;
-
-/**
- * A metric that accepts numeric samples and exposes its state as a snapshot
- * of type {@code V}.
- */
-public interface Sampling<V> extends Serializable {
-
-    /**
-     * Feeds one value into the metric.
-     *
-     * @param obj the value to record
-     */
-    void update(Number obj);
-
-    /**
-     * Returns a snapshot of the metric.
-     *
-     * @return the snapshot
-     */
-    V getSnapshot();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/StartTime.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/StartTime.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/StartTime.java
deleted file mode 100755
index 0b6173f..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/StartTime.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator;
-
-/**
- * Implemented by objects that can report when they started.
- */
-public interface StartTime {
-    /**
-     * @return the start time; {@code TpsMerger.getRunMillis} subtracts it from
-     *         {@code System.currentTimeMillis()}, so it is presumably expected
-     *         in epoch milliseconds
-     */
-    long getStartTime();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/AtomicLongToLong.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/AtomicLongToLong.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/AtomicLongToLong.java
deleted file mode 100755
index 8f142f1..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/AtomicLongToLong.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.convert;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Converts an {@link AtomicLong} to the {@link Long} it currently holds.
- */
-public class AtomicLongToLong implements Convertor<AtomicLong, Long> {
-    private static final long serialVersionUID = -2755066621494409063L;
-
-    /**
-     * @param obj the counter to read; may be null
-     * @return the current value of {@code obj}, or null when {@code obj} is null
-     */
-    @Override
-    public Long convert(AtomicLong obj) {
-        return (obj != null) ? Long.valueOf(obj.get()) : null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/Convertor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/Convertor.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/Convertor.java
deleted file mode 100755
index 73cdceb..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/Convertor.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.convert;
-
-import java.io.Serializable;
-
-/**
- * Serializable conversion from a {@code From} value to a {@code To} value.
- */
-public interface Convertor<From, To> extends Serializable {
-
-    /**
-     * @param obj the value to convert
-     * @return the converted value
-     */
-    To convert(From obj);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/DefaultConvertor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/DefaultConvertor.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/DefaultConvertor.java
deleted file mode 100755
index 47065d0..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/DefaultConvertor.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.convert;
-
-/**
- * Identity {@link Convertor}: hands back its argument unchanged.
- */
-public class DefaultConvertor<T> implements Convertor<T, T> {
-    private static final long serialVersionUID = -647209923903679727L;
-
-    /**
-     * @param obj any value, including null
-     * @return {@code obj} itself
-     */
-    @Override
-    public T convert(T obj) {
-        return obj;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/SetToList.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/SetToList.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/SetToList.java
deleted file mode 100755
index 4891222..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/convert/SetToList.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.convert;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Copies the elements of a {@link Set} into a new {@link ArrayList}.
- */
-public class SetToList<T> implements Convertor<Set<T>, List<T>> {
-    private static final long serialVersionUID = 4968816655779625255L;
-
-    /**
-     * @param set the source set; may be null
-     * @return a new list holding the set's elements in the set's iteration
-     *         order; empty when {@code set} is null
-     */
-    @Override
-    public List<T> convert(Set<T> set) {
-        if (set == null) {
-            return new ArrayList<T>();
-        }
-        return new ArrayList<T>(set);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/AvgMerger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/AvgMerger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/AvgMerger.java
deleted file mode 100755
index 3ad94f2..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/AvgMerger.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.merger;
-
-import java.util.Collection;
-
-import com.alibaba.jstorm.common.metric.Histogram;
-import com.alibaba.jstorm.utils.Pair;
-
-/**
- * Merges {@code Histogram.HistorgramPair} values by adding up their sums and
- * their times.
- */
-public class AvgMerger implements Merger<Histogram.HistorgramPair> {
-    private static final long serialVersionUID = -3892281208959055221L;
-
-    /**
-     * @param objs pairs to merge; null elements are skipped
-     * @param unflushed extra pair that is added first; may be null
-     * @param others not used
-     * @return a new pair holding the total sum and the total times
-     */
-    @Override
-    public Histogram.HistorgramPair merge(
-            Collection<Histogram.HistorgramPair> objs,
-            Histogram.HistorgramPair unflushed, Object... others) {
-        double totalSum = 0.0d;
-        long totalTimes = 0l;
-
-        // The unflushed pair goes in first, so the floating-point additions
-        // happen in a fixed order.
-        if (unflushed != null) {
-            totalSum += unflushed.getSum();
-            totalTimes += unflushed.getTimes();
-        }
-
-        for (Histogram.HistorgramPair pair : objs) {
-            if (pair != null) {
-                totalSum += pair.getSum();
-                totalTimes += pair.getTimes();
-            }
-        }
-
-        return new Histogram.HistorgramPair(totalSum, totalTimes);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/AvgMerger.java.bak
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/AvgMerger.java.bak b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/AvgMerger.java.bak
deleted file mode 100755
index 6f82888..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/AvgMerger.java.bak
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.merger;
-
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.alibaba.jstorm.utils.Pair;
-import com.google.common.util.concurrent.AtomicDouble;
-
-/**
- * Merges (sum, times) pairs by adding up the first and the second components.
- */
-public class AvgMerger2 implements Merger<Pair<AtomicDouble, AtomicLong>> {
-    private static final long serialVersionUID = -3892281208959055221L;
-
-    /**
-     * @param objs pairs to merge; null elements are skipped
-     * @param unflushed extra pair that is added first; may be null
-     * @param others not used
-     * @return a new pair wrapping the total of the first components and the
-     *         total of the second components
-     */
-    @Override
-    public Pair<AtomicDouble, AtomicLong> merge(
-            Collection<Pair<AtomicDouble, AtomicLong>> objs,
-            Pair<AtomicDouble, AtomicLong> unflushed, Object... others) {
-        double total = 0.0;
-        long count = 0;
-
-        if (unflushed != null) {
-            total += unflushed.getFirst().get();
-            count += unflushed.getSecond().get();
-        }
-
-        for (Pair<AtomicDouble, AtomicLong> pair : objs) {
-            if (pair != null) {
-                total += pair.getFirst().get();
-                count += pair.getSecond().get();
-            }
-        }
-
-        return new Pair<AtomicDouble, AtomicLong>(new AtomicDouble(total), new AtomicLong(count));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/LongSumMerger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/LongSumMerger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/LongSumMerger.java
deleted file mode 100755
index 30ded34..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/LongSumMerger.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.merger;
-
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Merges {@link AtomicLong} counters by adding up their current values.
- */
-public class LongSumMerger implements Merger<AtomicLong> {
-    private static final long serialVersionUID = -3500779273677666691L;
-
-    /**
-     * @param objs counters to add; null elements are skipped
-     * @param unflushed extra counter to include; may be null
-     * @param others not used
-     * @return a new counter holding the total
-     */
-    @Override
-    public AtomicLong merge(Collection<AtomicLong> objs, AtomicLong unflushed,
-            Object... others) {
-        long total = (unflushed != null) ? unflushed.get() : 0L;
-
-        for (AtomicLong counter : objs) {
-            if (counter != null) {
-                total += counter.get();
-            }
-        }
-        return new AtomicLong(total);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/Merger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/Merger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/Merger.java
deleted file mode 100755
index 0483458..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/Merger.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.merger;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-/**
- * Serializable strategy that folds several values of type {@code V} into one.
- */
-public interface Merger<V> extends Serializable {
-    /**
-     * @param objs the values to merge
-     * @param unflushed an additional value that is merged together with
-     *        {@code objs}
-     * @param others optional extra arguments; their meaning is up to the
-     *        implementation
-     * @return the merged value
-     */
-    V merge(Collection<V> objs, V unflushed, Object... others);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/SumMerger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/SumMerger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/SumMerger.java
deleted file mode 100755
index ead3c53..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/SumMerger.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.merger;
-
-import java.util.Collection;
-
-import com.alibaba.jstorm.utils.JStormUtils;
-
-/**
- * Merges numbers by folding them together with {@code JStormUtils.add}.
- */
-public class SumMerger<T extends Number> implements Merger<T> {
-    private static final long serialVersionUID = -7026523452570138433L;
-
-    /**
-     * @param objs the values to add
-     * @param unflushed the starting value of the fold
-     * @param others not used
-     * @return the result of applying {@code JStormUtils.add} to the running
-     *         total and each element in turn
-     */
-    @SuppressWarnings("unchecked")
-    @Override
-    public T merge(Collection<T> objs, T unflushed, Object... others) {
-        T total = unflushed;
-        for (T addend : objs) {
-            total = (T) JStormUtils.add(total, addend);
-        }
-        return total;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/TpsMerger.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/TpsMerger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/TpsMerger.java
deleted file mode 100755
index 859f642..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/merger/TpsMerger.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.merger;
-
-import java.util.Collection;
-
-import com.alibaba.jstorm.common.metric.operator.StartTime;
-
-/**
- * Merges counts into a per-second rate: the total of all values, times 1000,
- * divided by the elapsed milliseconds.
- */
-public class TpsMerger implements Merger<Double> {
-    private static final long serialVersionUID = -4534840881635955942L;
-    protected final long createTime;
-
-    public TpsMerger() {
-        createTime = System.currentTimeMillis();
-    }
-
-    /**
-     * Computes the elapsed time used as the rate's denominator.
-     *
-     * @param args optional; when the first element is a {@link StartTime}, its
-     *        start time is used instead of this merger's creation time
-     * @return milliseconds between the chosen start time and now
-     */
-    public long getRunMillis(Object... args) {
-        long startTime = createTime;
-
-        // A varargs call with no extra arguments passes an empty array, so the
-        // length must be checked before args[0] is read. instanceof is false
-        // for null, which covers a null first element.
-        if (args != null && args.length > 0 && args[0] instanceof StartTime) {
-            startTime = ((StartTime) args[0]).getStartTime();
-        }
-
-        return System.currentTimeMillis() - startTime;
-    }
-
-    /**
-     * @param objs counts to add; null elements are skipped
-     * @param unflushed extra count to include; may be null
-     * @param others forwarded to {@link #getRunMillis(Object...)}
-     * @return {@code sum * 1000 / elapsedMillis}
-     */
-    @Override
-    public Double merge(Collection<Double> objs, Double unflushed,
-            Object... others) {
-        double sum = 0.0d;
-        if (unflushed != null) {
-            sum += unflushed;
-        }
-
-        for (Double item : objs) {
-            if (item != null) {
-                sum += item;
-            }
-        }
-
-        // NOTE(review): when no time has elapsed this is a floating-point
-        // division by zero and yields Infinity or NaN; left as is because the
-        // desired fallback value is a caller decision.
-        return (sum * 1000) / getRunMillis(others);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AddUpdater.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AddUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AddUpdater.java
deleted file mode 100755
index 4fdf813..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AddUpdater.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.updater;
-
-import com.alibaba.jstorm.utils.JStormUtils;
-
-/**
- * Updater that combines the cached value with the incoming one through
- * {@code JStormUtils.add}.
- */
-public class AddUpdater<T extends Number> implements Updater<T> {
-    private static final long serialVersionUID = -7955740095421752763L;
-
-    /**
-     * @param object the incoming value
-     * @param cache the value accumulated so far
-     * @param others not used
-     * @return the result of {@code JStormUtils.add(cache, object)}
-     */
-    @SuppressWarnings("unchecked")
-    @Override
-    public T update(Number object, T cache, Object... others) {
-        return (T) JStormUtils.add(cache, object);
-    }
-
-    /**
-     * @param object the incoming, already aggregated value
-     * @param cache the value accumulated so far
-     * @param objects not used
-     * @return the result of {@code JStormUtils.add(cache, object)}
-     */
-    @SuppressWarnings("unchecked")
-    @Override
-    public T updateBatch(T object, T cache, Object... objects) {
-        return (T) JStormUtils.add(cache, object);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AvgUpdater.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AvgUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AvgUpdater.java
deleted file mode 100755
index 30ae46c..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AvgUpdater.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.updater;
-
-import com.alibaba.jstorm.common.metric.Histogram;
-
-public class AvgUpdater implements Updater<Histogram.HistorgramPair> {
-    private static final long serialVersionUID = 2562836921724586449L;
-
-    @Override
-    public Histogram.HistorgramPair update(Number object,
-            Histogram.HistorgramPair cache, Object... others) {
-        // TODO Auto-generated method stub
-        if (object == null) {
-            return cache;
-        }
-        if (cache == null) {
-            cache =
-                    new Histogram.HistorgramPair();
-        }
-
-        cache.addValue(object.doubleValue());
-        cache.addTimes(1l);
-
-        return cache;
-    }
-
-    @Override
-    public Histogram.HistorgramPair updateBatch(
-            Histogram.HistorgramPair object,
-            Histogram.HistorgramPair cache, Object... objects) {
-        // TODO Auto-generated method stub
-        if (object == null) {
-            return cache;
-        }
-        if (cache == null) {
-            cache =
-                    new Histogram.HistorgramPair();
-        }
-
-        cache.addValue(object.getSum());
-        cache.addTimes(object.getTimes());
-
-        return cache;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AvgUpdater.java.bak
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AvgUpdater.java.bak b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AvgUpdater.java.bak
deleted file mode 100755
index 44cc70d..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/AvgUpdater.java.bak
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.updater;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.alibaba.jstorm.utils.Pair;
-import com.google.common.util.concurrent.AtomicDouble;
-
-public class AvgUpdater2 implements Updater<Pair<AtomicDouble, AtomicLong>> {
-    private static final long serialVersionUID = 2562836921724586449L;
-
-    @Override
-    public Pair<AtomicDouble, AtomicLong> update(Number object,
-            Pair<AtomicDouble, AtomicLong> cache, Object... others) {
-        // TODO Auto-generated method stub
-        if (object == null) {
-            return cache;
-        }
-        if (cache == null) {
-            cache =
-                    new Pair<AtomicDouble, AtomicLong>(new AtomicDouble(0.0),
-                            new AtomicLong(0));
-        }
-
-        AtomicDouble sum = cache.getFirst();
-        AtomicLong times = cache.getSecond();
-
-        sum.addAndGet(object.doubleValue());
-        times.incrementAndGet();
-
-        return cache;
-    }
-
-    @Override
-    public Pair<AtomicDouble, AtomicLong> updateBatch(
-            Pair<AtomicDouble, AtomicLong> object,
-            Pair<AtomicDouble, AtomicLong> cache, Object... objects) {
-        // TODO Auto-generated method stub
-        if (object == null) {
-            return cache;
-        }
-        if (cache == null) {
-            cache =
-                    new Pair<AtomicDouble, AtomicLong>(new AtomicDouble(0.0),
-                            new AtomicLong(0));
-        }
-
-        AtomicDouble sum = cache.getFirst();
-        AtomicLong times = cache.getSecond();
-
-        sum.addAndGet(object.getFirst().get());
-        times.addAndGet(object.getSecond().get());
-
-        return cache;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/DoubleAddUpdater.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/DoubleAddUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/DoubleAddUpdater.java
deleted file mode 100755
index e3b640a..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/DoubleAddUpdater.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.updater;
-
-import com.google.common.util.concurrent.AtomicDouble;
-
-public class DoubleAddUpdater implements Updater<AtomicDouble> {
-    private static final long serialVersionUID = -1293565961076552462L;
-
-    @Override
-    public AtomicDouble update(Number object, AtomicDouble cache,
-            Object... others) {
-        // TODO Auto-generated method stub
-        if (cache == null) {
-            cache = new AtomicDouble(0.0);
-        }
-        if (object != null) {
-            cache.addAndGet(object.doubleValue());
-        }
-        return cache;
-    }
-
-    @Override
-    public AtomicDouble updateBatch(AtomicDouble object, AtomicDouble cache,
-            Object... objects) {
-        // TODO Auto-generated method stub
-        return update(object, cache, objects);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/LongAddUpdater.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/LongAddUpdater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/LongAddUpdater.java
deleted file mode 100755
index 4986146..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/LongAddUpdater.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.updater;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-public class LongAddUpdater implements Updater<AtomicLong> {
-    private static final long serialVersionUID = -2185639264737912405L;
-
-    @Override
-    public AtomicLong update(Number object, AtomicLong cache, Object... others) {
-        // TODO Auto-generated method stub
-        if (cache == null) {
-            cache = new AtomicLong(0);
-        }
-
-        if (object != null) {
-            cache.addAndGet(object.longValue());
-        }
-        return cache;
-    }
-
-    @Override
-    public AtomicLong updateBatch(AtomicLong object, AtomicLong cache,
-            Object... objects) {
-        // TODO Auto-generated method stub
-        return update(object, cache, objects);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/Updater.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/Updater.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/Updater.java
deleted file mode 100755
index cb22c4c..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/operator/updater/Updater.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.operator.updater;
-
-import java.io.Serializable;
-
-public interface Updater<V> extends Serializable {
-    V update(Number object, V cache, Object... others);
-    V updateBatch(V object, V cache, Object... objects );
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmCounterSnapshot.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmCounterSnapshot.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmCounterSnapshot.java
new file mode 100644
index 0000000..2f79141
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmCounterSnapshot.java
@@ -0,0 +1,20 @@
+package com.alibaba.jstorm.common.metric.snapshot;
+
+/**
+ * @author wange
+ * @since 15/6/5
+ */
+public class AsmCounterSnapshot extends AsmSnapshot {
+    private static final long serialVersionUID = -7574994037947802582L;
+
+    private long v;
+
+    public long getV() {
+        return v;
+    }
+
+    public AsmSnapshot setValue(long value) {
+        this.v = value;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmGaugeSnapshot.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmGaugeSnapshot.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmGaugeSnapshot.java
new file mode 100644
index 0000000..221b5b1
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmGaugeSnapshot.java
@@ -0,0 +1,20 @@
+package com.alibaba.jstorm.common.metric.snapshot;
+
+/**
+ * @author wange
+ * @since 15/6/5
+ */
+public class AsmGaugeSnapshot extends AsmSnapshot {
+    private static final long serialVersionUID = 3216517772824794848L;
+
+    private double v;
+
+    public double getV() {
+        return v;
+    }
+
+    public AsmSnapshot setValue(double value) {
+        this.v = value;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmHistogramSnapshot.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmHistogramSnapshot.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmHistogramSnapshot.java
new file mode 100644
index 0000000..51ac3f5
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmHistogramSnapshot.java
@@ -0,0 +1,22 @@
+package com.alibaba.jstorm.common.metric.snapshot;
+
+import com.codahale.metrics.Snapshot;
+
+/**
+ * @author wange
+ * @since 15/6/5
+ */
+public class AsmHistogramSnapshot extends AsmSnapshot {
+    private static final long serialVersionUID = 7284437562594156565L;
+
+    private Snapshot snapshot;
+
+    public Snapshot getSnapshot() {
+        return snapshot;
+    }
+
+    public AsmSnapshot setSnapshot(Snapshot snapshot) {
+        this.snapshot = snapshot;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmMeterSnapshot.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmMeterSnapshot.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmMeterSnapshot.java
new file mode 100644
index 0000000..e255e6b
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmMeterSnapshot.java
@@ -0,0 +1,50 @@
+package com.alibaba.jstorm.common.metric.snapshot;
+
+/**
+ * @author wange
+ * @since 15/6/5
+ */
+public class AsmMeterSnapshot extends AsmSnapshot {
+    private static final long serialVersionUID = -1754325312045025810L;
+
+    private double m1;
+    private double m5;
+    private double m15;
+    private double mean;
+
+    public double getM1() {
+        return m1;
+    }
+
+    public AsmMeterSnapshot setM1(double m1) {
+        this.m1 = m1;
+        return this;
+    }
+
+    public double getM5() {
+        return m5;
+    }
+
+    public AsmMeterSnapshot setM5(double m5) {
+        this.m5 = m5;
+        return this;
+    }
+
+    public double getM15() {
+        return m15;
+    }
+
+    public AsmMeterSnapshot setM15(double m15) {
+        this.m15 = m15;
+        return this;
+    }
+
+    public double getMean() {
+        return mean;
+    }
+
+    public AsmMeterSnapshot setMean(double mean) {
+        this.mean = mean;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmSnapshot.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmSnapshot.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmSnapshot.java
new file mode 100644
index 0000000..4c71fe9
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmSnapshot.java
@@ -0,0 +1,32 @@
+package com.alibaba.jstorm.common.metric.snapshot;
+
+import java.io.Serializable;
+
+/**
+ * @author wange
+ * @since 15/6/5
+ */
+public abstract class AsmSnapshot implements Serializable {
+    private static final long serialVersionUID = 1945719653840917619L;
+
+    private long metricId;
+    private long ts;
+
+    public long getTs() {
+        return ts;
+    }
+
+    public AsmSnapshot setTs(long ts) {
+        this.ts = ts;
+        return this;
+    }
+
+    public long getMetricId() {
+        return metricId;
+    }
+
+    public AsmSnapshot setMetricId(long metricId) {
+        this.metricId = metricId;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmTimerSnapshot.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmTimerSnapshot.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmTimerSnapshot.java
new file mode 100644
index 0000000..6fec50c
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/snapshot/AsmTimerSnapshot.java
@@ -0,0 +1,32 @@
+package com.alibaba.jstorm.common.metric.snapshot;
+
+import com.codahale.metrics.Snapshot;
+
+/**
+ * @author wange
+ * @since 15/6/5
+ */
+public class AsmTimerSnapshot extends AsmSnapshot {
+    private static final long serialVersionUID = 7784062881728741781L;
+
+    private Snapshot histogram;
+    private AsmMeterSnapshot meter;
+
+    public Snapshot getHistogram() {
+        return histogram;
+    }
+
+    public AsmTimerSnapshot setHistogram(Snapshot snapshot) {
+        this.histogram = snapshot;
+        return this;
+    }
+
+    public AsmMeterSnapshot getMeter() {
+        return meter;
+    }
+
+    public AsmTimerSnapshot setMeter(AsmMeterSnapshot meter) {
+        this.meter = meter;
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/AllWindow.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/AllWindow.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/AllWindow.java
deleted file mode 100755
index 8475e4c..0000000
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/window/AllWindow.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.common.metric.window;
-
-import java.util.ArrayList;
-
-import com.alibaba.jstorm.common.metric.operator.Sampling;
-import com.alibaba.jstorm.common.metric.operator.StartTime;
-import com.alibaba.jstorm.common.metric.operator.merger.Merger;
-import com.alibaba.jstorm.common.metric.operator.updater.Updater;
-
-public class AllWindow<V> implements Sampling<V>, StartTime {
-
-    private static final long serialVersionUID = -8523514907315740812L;
-
-    protected V unflushed;
-    protected V defaultValue;
-
-    protected Updater<V> updater;
-    protected Merger<V> merger;
-    protected long startTime;
-
-    AllWindow(V defaultValue, Updater<V> updater, Merger<V> merger) {
-
-        this.updater = updater;
-        this.merger = merger;
-
-        this.defaultValue = defaultValue;
-        this.startTime = System.currentTimeMillis();
-    }
-
-    @Override
-    public void update(Number obj) {
-        // TODO Auto-generated method stub
-        synchronized (this) {
-            unflushed = updater.update(obj, unflushed);
-        }
-    }
-    
-    public void updateBatch(V batch) {
-        synchronized (this) {
-            unflushed = updater.updateBatch(batch, unflushed);
-        }
-    }
-
-    @Override
-    public V getSnapshot() {
-        // TODO Auto-generated method stub
-        V ret = merger.merge(new ArrayList<V>(), unflushed, this);
-        if (ret == null) {
-            return defaultValue;
-        } else {
-            return ret;
-        }
-    }
-
-    @Override
-    public long getStartTime() {
-        // TODO Auto-generated method stub
-        return startTime;
-    }
-
-}