Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:47 UTC

[09/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricSet.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricSet.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricSet.java
new file mode 100644
index 0000000..709db27
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmMetricSet.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.metric;
+
+import com.alibaba.jstorm.common.metric.AsmMetric;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * @author Cody (weiyue.wy@alibaba-inc.com)
+ * @since 2.0.5
+ */
+public interface AsmMetricSet extends Serializable {
+    Map<String, AsmMetric> getMetrics();
+}
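
For readers following the new metrics API: a minimal sketch of a map-backed implementation of this interface. The class name and the add() helper are illustrative only, not part of this commit.

    package com.alibaba.jstorm.metric;

    import com.alibaba.jstorm.common.metric.AsmMetric;

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative map-backed implementation; not part of this commit.
    public class SimpleAsmMetricSet implements AsmMetricSet {
        private static final long serialVersionUID = 1L;

        private final Map<String, AsmMetric> metrics = new ConcurrentHashMap<String, AsmMetric>();

        // register a metric under its fully qualified name
        public void add(String name, AsmMetric metric) {
            metrics.put(name, metric);
        }

        @Override
        public Map<String, AsmMetric> getMetrics() {
            return metrics;
        }
    }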

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmWindow.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmWindow.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmWindow.java
new file mode 100644
index 0000000..ecb69d6
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/AsmWindow.java
@@ -0,0 +1,41 @@
+package com.alibaba.jstorm.metric;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * @author Cody (weiyue.wy@alibaba-inc.com)
+ * @since 2.0.5
+ */
+public class AsmWindow {
+    public static final Integer M1_WINDOW = 60;
+    public static final Integer M10_WINDOW = 600;
+    public static final Integer H2_WINDOW = 7200;
+    public static final Integer D1_WINDOW = 86400;
+
+    public static final String M1_WINDOW_STR = "0d0h1m0s";
+    public static final String M10_WINDOW_STR = "0d0h10m0s";
+    public static final String H2_WINDOW_STR = "0d2h0m0s";
+    public static final String D1_WINDOW_STR = "1d0h0m0s";
+
+    public static final Set<Integer> TIME_WINDOWS = new TreeSet<Integer>();
+    private static final Map<Integer, String> WIN_TO_STR = new HashMap<Integer, String>();
+
+    static {
+        TIME_WINDOWS.add(M1_WINDOW);
+        TIME_WINDOWS.add(M10_WINDOW);
+        TIME_WINDOWS.add(H2_WINDOW);
+        TIME_WINDOWS.add(D1_WINDOW);
+
+        WIN_TO_STR.put(M1_WINDOW, M1_WINDOW_STR);
+        WIN_TO_STR.put(M10_WINDOW, M10_WINDOW_STR);
+        WIN_TO_STR.put(H2_WINDOW, H2_WINDOW_STR);
+        WIN_TO_STR.put(D1_WINDOW, D1_WINDOW_STR);
+    }
+
+    public static String win2str(Integer win) {
+        return WIN_TO_STR.get(win);
+    }
+}
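
A quick usage sketch of the window constants defined above; the commented output follows directly from the static initializer. The demo class is standalone and not part of the commit.

    import com.alibaba.jstorm.metric.AsmWindow;

    public class AsmWindowDemo {
        public static void main(String[] args) {
            // TIME_WINDOWS is a TreeSet, so the windows print in ascending order of seconds
            System.out.println(AsmWindow.TIME_WINDOWS);                   // [60, 600, 7200, 86400]

            // win2str maps a window length in seconds to its display string
            System.out.println(AsmWindow.win2str(AsmWindow.M1_WINDOW));   // 0d0h1m0s
            System.out.println(AsmWindow.win2str(AsmWindow.H2_WINDOW));   // 0d2h0m0s
            System.out.println(AsmWindow.win2str(AsmWindow.D1_WINDOW));   // 1d0h0m0s
        }
    }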

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/Bytes.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/Bytes.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/Bytes.java
new file mode 100644
index 0000000..290f813
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/Bytes.java
@@ -0,0 +1,842 @@
+package com.alibaba.jstorm.metric;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+public class Bytes {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Bytes.class);
+
+    /**
+     * Size of boolean in bytes
+     */
+    public static final int SIZEOF_BOOLEAN = Byte.SIZE / Byte.SIZE;
+
+    /**
+     * Size of byte in bytes
+     */
+    public static final int SIZEOF_BYTE = SIZEOF_BOOLEAN;
+
+    /**
+     * Size of char in bytes
+     */
+    public static final int SIZEOF_CHAR = Character.SIZE / Byte.SIZE;
+
+    /**
+     * Size of double in bytes
+     */
+    public static final int SIZEOF_DOUBLE = Double.SIZE / Byte.SIZE;
+
+    /**
+     * Size of float in bytes
+     */
+    public static final int SIZEOF_FLOAT = Float.SIZE / Byte.SIZE;
+
+    /**
+     * Size of int in bytes
+     */
+    public static final int SIZEOF_INT = Integer.SIZE / Byte.SIZE;
+
+    /**
+     * Size of long in bytes
+     */
+    public static final int SIZEOF_LONG = Long.SIZE / Byte.SIZE;
+
+    /**
+     * Size of short in bytes
+     */
+    public static final int SIZEOF_SHORT = Short.SIZE / Byte.SIZE;
+
+
+    /**
+     * Estimate of size cost to pay beyond payload in jvm for instance of byte [].
+     * Estimate based on study of jhat and jprofiler numbers.
+     */
+    // JHat says BU is 56 bytes.
+    // SizeOf which uses java.lang.instrument says 24 bytes. (3 longs?)
+    public static final int ESTIMATED_HEAP_TAX = 16;
+
+
+    /**
+     * Put bytes at the specified byte array position.
+     *
+     * @param tgtBytes  the byte array
+     * @param tgtOffset position in the array
+     * @param srcBytes  array to write out
+     * @param srcOffset source offset
+     * @param srcLength source length
+     * @return incremented offset
+     */
+    public static int putBytes(byte[] tgtBytes, int tgtOffset, byte[] srcBytes,
+                               int srcOffset, int srcLength) {
+        System.arraycopy(srcBytes, srcOffset, tgtBytes, tgtOffset, srcLength);
+        return tgtOffset + srcLength;
+    }
+
+    /**
+     * Write a single byte out to the specified byte array position.
+     *
+     * @param bytes  the byte array
+     * @param offset position in the array
+     * @param b      byte to write out
+     * @return incremented offset
+     */
+    public static int putByte(byte[] bytes, int offset, byte b) {
+        bytes[offset] = b;
+        return offset + 1;
+    }
+
+    /**
+     * Returns a new byte array, copied from the passed ByteBuffer.
+     *
+     * @param bb A ByteBuffer
+     * @return the byte array
+     */
+    public static byte[] toBytes(ByteBuffer bb) {
+        int length = bb.limit();
+        byte[] result = new byte[length];
+        System.arraycopy(bb.array(), bb.arrayOffset(), result, 0, length);
+        return result;
+    }
+
+    public static byte[] copyBytes(final byte[] bytes, int offset, int length) {
+        if (offset + length > bytes.length) {
+            throw explainWrongLengthOrOffset(bytes, offset, length, length);
+        }
+        byte[] result = new byte[length];
+        System.arraycopy(bytes, offset, result, 0, length);
+        return result;
+    }
+
+    /**
+     * Write a printable representation of a byte array.
+     *
+     * @param b byte array
+     * @return string
+     * @see #toStringBinary(byte[], int, int)
+     */
+    public static String toStringBinary(final byte[] b) {
+        if (b == null)
+            return "null";
+        return toStringBinary(b, 0, b.length);
+    }
+
+    /**
+     * Converts the given byte buffer, from its array offset to its limit, to
+     * a string. The position and the mark are ignored.
+     *
+     * @param buf a byte buffer
+     * @return a string representation of the buffer's binary contents
+     */
+    public static String toStringBinary(ByteBuffer buf) {
+        if (buf == null)
+            return "null";
+        return toStringBinary(buf.array(), buf.arrayOffset(), buf.limit());
+    }
+
+    /**
+     * Write a printable representation of a byte array. Non-printable
+     * characters are hex escaped in the format \\x%02X, e.g.
+     * \x00 \x05 etc.
+     *
+     * @param b   array to write out
+     * @param off offset to start at
+     * @param len length to write
+     * @return string output
+     */
+    public static String toStringBinary(final byte[] b, int off, int len) {
+        StringBuilder result = new StringBuilder();
+        try {
+            String first = new String(b, off, len, "ISO-8859-1");
+            for (int i = 0; i < first.length(); ++i) {
+                int ch = first.charAt(i) & 0xFF;
+                if ((ch >= '0' && ch <= '9')
+                        || (ch >= 'A' && ch <= 'Z')
+                        || (ch >= 'a' && ch <= 'z')
+                        || " `~!@#$%^&*()-_=+[]{}\\|;:'\",.<>/?".indexOf(ch) >= 0) {
+                    result.append(first.charAt(i));
+                } else {
+                    result.append(String.format("\\x%02X", ch));
+                }
+            }
+        } catch (UnsupportedEncodingException e) {
+            LOG.error("ISO-8859-1 not supported?", e);
+        }
+        return result.toString();
+    }
+
+    private static boolean isHexDigit(char c) {
+        return
+                (c >= 'A' && c <= 'F') ||
+                        (c >= '0' && c <= '9');
+    }
+
+    /**
+     * Takes an ASCII hex digit in the range A-F or 0-9 and returns
+     * the corresponding integer/ordinal value.
+     *
+     * @param ch The hex digit.
+     * @return The converted hex value as a byte.
+     */
+    public static byte toBinaryFromHex(byte ch) {
+        if (ch >= 'A' && ch <= 'F')
+            return (byte) ((byte) 10 + (byte) (ch - 'A'));
+        // else
+        return (byte) (ch - '0');
+    }
+
+    public static byte[] toBytesBinary(String in) {
+        // this may be bigger than we need, but lets be safe.
+        byte[] b = new byte[in.length()];
+        int size = 0;
+        for (int i = 0; i < in.length(); ++i) {
+            char ch = in.charAt(i);
+            if (ch == '\\' && in.length() > i + 1 && in.charAt(i + 1) == 'x') {
+                // ok, take next 2 hex digits.
+                char hd1 = in.charAt(i + 2);
+                char hd2 = in.charAt(i + 3);
+
+                // they need to be A-F0-9:
+                if (!isHexDigit(hd1) ||
+                        !isHexDigit(hd2)) {
+                    // bogus escape code, ignore:
+                    continue;
+                }
+                // turn hex ASCII digit -> number
+                byte d = (byte) ((toBinaryFromHex((byte) hd1) << 4) + toBinaryFromHex((byte) hd2));
+
+                b[size++] = d;
+                i += 3; // skip 3
+            } else {
+                b[size++] = (byte) ch;
+            }
+        }
+        // resize:
+        byte[] b2 = new byte[size];
+        System.arraycopy(b, 0, b2, 0, size);
+        return b2;
+    }
+
+    /**
+     * Convert a boolean to a byte array. True becomes -1
+     * and false becomes 0.
+     *
+     * @param b value
+     * @return <code>b</code> encoded in a byte array.
+     */
+    public static byte[] toBytes(final boolean b) {
+        return new byte[]{b ? (byte) -1 : (byte) 0};
+    }
+
+    /**
+     * Reverses {@link #toBytes(boolean)}
+     *
+     * @param b array
+     * @return True or false.
+     */
+    public static boolean toBoolean(final byte[] b) {
+        if (b.length != 1) {
+            throw new IllegalArgumentException("Array has wrong size: " + b.length);
+        }
+        return b[0] != (byte) 0;
+    }
+
+    public static boolean toBoolean(final byte[] bytes, int offset, int length) {
+        if (length != SIZEOF_BOOLEAN || offset + length > bytes.length) {
+            throw explainWrongLengthOrOffset(bytes, offset, length, SIZEOF_BOOLEAN);
+        }
+        return bytes[offset] != (byte) 0;
+    }
+
+    /**
+     * Convert a long value to a byte array using big-endian.
+     *
+     * @param val value to convert
+     * @return the byte array
+     */
+    public static byte[] toBytes(long val) {
+        byte[] b = new byte[8];
+        for (int i = 7; i > 0; i--) {
+            b[i] = (byte) val;
+            val >>>= 8;
+        }
+        b[0] = (byte) val;
+        return b;
+    }
+
+    /**
+     * Converts a byte array to a long value. Reverses
+     * {@link #toBytes(long)}
+     *
+     * @param bytes array
+     * @return the long value
+     */
+    public static long toLong(byte[] bytes) {
+        return toLong(bytes, 0, SIZEOF_LONG);
+    }
+
+    /**
+     * Converts a byte array to a long value. Assumes there will be
+     * {@link #SIZEOF_LONG} bytes available.
+     *
+     * @param bytes  bytes
+     * @param offset offset
+     * @return the long value
+     */
+    public static long toLong(byte[] bytes, int offset) {
+        return toLong(bytes, offset, SIZEOF_LONG);
+    }
+
+    /**
+     * Converts a byte array to a long value.
+     *
+     * @param bytes  array of bytes
+     * @param offset offset into array
+     * @param length length of data (must be {@link #SIZEOF_LONG})
+     * @return the long value
+     * @throws IllegalArgumentException if length is not {@link #SIZEOF_LONG} or
+     *                                  if there's not enough room in the array at the offset indicated.
+     */
+    public static long toLong(byte[] bytes, int offset, final int length) {
+        if (length != SIZEOF_LONG || offset + length > bytes.length) {
+            throw explainWrongLengthOrOffset(bytes, offset, length, SIZEOF_LONG);
+        }
+        long l = 0;
+        for (int i = offset; i < offset + length; i++) {
+            l <<= 8;
+            l ^= bytes[i] & 0xFF;
+        }
+        return l;
+    }
+
+    private static IllegalArgumentException
+    explainWrongLengthOrOffset(final byte[] bytes,
+                               final int offset,
+                               final int length,
+                               final int expectedLength) {
+        String reason;
+        if (length != expectedLength) {
+            reason = "Wrong length: " + length + ", expected " + expectedLength;
+        } else {
+            reason = "offset (" + offset + ") + length (" + length + ") exceed the"
+                    + " capacity of the array: " + bytes.length;
+        }
+        return new IllegalArgumentException(reason);
+    }
+
+    /**
+     * Put a long value out to the specified byte array position.
+     *
+     * @param bytes  the byte array
+     * @param offset position in the array
+     * @param val    long to write out
+     * @return incremented offset
+     * @throws IllegalArgumentException if the byte array given doesn't have
+     *                                  enough room at the offset specified.
+     */
+    public static int putLong(byte[] bytes, int offset, long val) {
+        if (bytes.length - offset < SIZEOF_LONG) {
+            throw new IllegalArgumentException("Not enough room to put a long at"
+                    + " offset " + offset + " in a " + bytes.length + " byte array");
+        }
+        for (int i = offset + 7; i > offset; i--) {
+            bytes[i] = (byte) val;
+            val >>>= 8;
+        }
+        bytes[offset] = (byte) val;
+        return offset + SIZEOF_LONG;
+    }
+
+    /**
+     * Presumes float encoded as IEEE 754 floating-point "single format"
+     *
+     * @param bytes byte array
+     * @return Float made from passed byte array.
+     */
+    public static float toFloat(byte[] bytes) {
+        return toFloat(bytes, 0);
+    }
+
+    /**
+     * Presumes float encoded as IEEE 754 floating-point "single format"
+     *
+     * @param bytes  array to convert
+     * @param offset offset into array
+     * @return Float made from passed byte array.
+     */
+    public static float toFloat(byte[] bytes, int offset) {
+        return Float.intBitsToFloat(toInt(bytes, offset, SIZEOF_INT));
+    }
+
+    /**
+     * @param bytes  byte array
+     * @param offset offset to write to
+     * @param f      float value
+     * @return New offset in <code>bytes</code>
+     */
+    public static int putFloat(byte[] bytes, int offset, float f) {
+        return putInt(bytes, offset, Float.floatToRawIntBits(f));
+    }
+
+    /**
+     * @param f float value
+     * @return the float represented as byte []
+     */
+    public static byte[] toBytes(final float f) {
+        // Encode it as int
+        return Bytes.toBytes(Float.floatToRawIntBits(f));
+    }
+
+    /**
+     * @param bytes byte array
+     * @return Return double made from passed bytes.
+     */
+    public static double toDouble(final byte[] bytes) {
+        return toDouble(bytes, 0);
+    }
+
+    /**
+     * @param bytes  byte array
+     * @param offset offset where double is
+     * @return Return double made from passed bytes.
+     */
+    public static double toDouble(final byte[] bytes, final int offset) {
+        return Double.longBitsToDouble(toLong(bytes, offset, SIZEOF_LONG));
+    }
+
+    /**
+     * @param bytes  byte array
+     * @param offset offset to write to
+     * @param d      value
+     * @return New offset into array <code>bytes</code>
+     */
+    public static int putDouble(byte[] bytes, int offset, double d) {
+        return putLong(bytes, offset, Double.doubleToLongBits(d));
+    }
+
+    /**
+     * Serialize a double as the IEEE 754 double format output. The resultant
+     * array will be 8 bytes long.
+     *
+     * @param d value
+     * @return the double represented as byte []
+     */
+    public static byte[] toBytes(final double d) {
+        // Encode it as a long
+        return Bytes.toBytes(Double.doubleToRawLongBits(d));
+    }
+
+    /**
+     * Convert an int value to a byte array
+     *
+     * @param val value
+     * @return the byte array
+     */
+    public static byte[] toBytes(int val) {
+        byte[] b = new byte[4];
+        for (int i = 3; i > 0; i--) {
+            b[i] = (byte) val;
+            val >>>= 8;
+        }
+        b[0] = (byte) val;
+        return b;
+    }
+
+    /**
+     * Converts a byte array to an int value
+     *
+     * @param bytes byte array
+     * @return the int value
+     */
+    public static int toInt(byte[] bytes) {
+        return toInt(bytes, 0, SIZEOF_INT);
+    }
+
+    /**
+     * Converts a byte array to an int value
+     *
+     * @param bytes  byte array
+     * @param offset offset into array
+     * @return the int value
+     */
+    public static int toInt(byte[] bytes, int offset) {
+        return toInt(bytes, offset, SIZEOF_INT);
+    }
+
+    /**
+     * Converts a byte array to an int value
+     *
+     * @param bytes  byte array
+     * @param offset offset into array
+     * @param length length of int (has to be {@link #SIZEOF_INT})
+     * @return the int value
+     * @throws IllegalArgumentException if length is not {@link #SIZEOF_INT} or
+     *                                  if there's not enough room in the array at the offset indicated.
+     */
+    public static int toInt(byte[] bytes, int offset, final int length) {
+        if (length != SIZEOF_INT || offset + length > bytes.length) {
+            throw explainWrongLengthOrOffset(bytes, offset, length, SIZEOF_INT);
+        }
+        int n = 0;
+        for (int i = offset; i < (offset + length); i++) {
+            n <<= 8;
+            n ^= bytes[i] & 0xFF;
+        }
+        return n;
+    }
+
+    /**
+     * Put an int value out to the specified byte array position.
+     *
+     * @param bytes  the byte array
+     * @param offset position in the array
+     * @param val    int to write out
+     * @return incremented offset
+     * @throws IllegalArgumentException if the byte array given doesn't have
+     *                                  enough room at the offset specified.
+     */
+    public static int putInt(byte[] bytes, int offset, int val) {
+        if (bytes.length - offset < SIZEOF_INT) {
+            throw new IllegalArgumentException("Not enough room to put an int at"
+                    + " offset " + offset + " in a " + bytes.length + " byte array");
+        }
+        for (int i = offset + 3; i > offset; i--) {
+            bytes[i] = (byte) val;
+            val >>>= 8;
+        }
+        bytes[offset] = (byte) val;
+        return offset + SIZEOF_INT;
+    }
+
+    /**
+     * Convert a short value to a byte array of {@link #SIZEOF_SHORT} bytes long.
+     *
+     * @param val value
+     * @return the byte array
+     */
+    public static byte[] toBytes(short val) {
+        byte[] b = new byte[SIZEOF_SHORT];
+        b[1] = (byte) val;
+        val >>= 8;
+        b[0] = (byte) val;
+        return b;
+    }
+
+    /**
+     * Converts a byte array to a short value
+     *
+     * @param bytes byte array
+     * @return the short value
+     */
+    public static short toShort(byte[] bytes) {
+        return toShort(bytes, 0, SIZEOF_SHORT);
+    }
+
+    /**
+     * Converts a byte array to a short value
+     *
+     * @param bytes  byte array
+     * @param offset offset into array
+     * @return the short value
+     */
+    public static short toShort(byte[] bytes, int offset) {
+        return toShort(bytes, offset, SIZEOF_SHORT);
+    }
+
+    /**
+     * Converts a byte array to a short value
+     *
+     * @param bytes  byte array
+     * @param offset offset into array
+     * @param length length, has to be {@link #SIZEOF_SHORT}
+     * @return the short value
+     * @throws IllegalArgumentException if length is not {@link #SIZEOF_SHORT}
+     *                                  or if there's not enough room in the array at the offset indicated.
+     */
+    public static short toShort(byte[] bytes, int offset, final int length) {
+        if (length != SIZEOF_SHORT || offset + length > bytes.length) {
+            throw explainWrongLengthOrOffset(bytes, offset, length, SIZEOF_SHORT);
+        }
+        short n = 0;
+        n ^= bytes[offset] & 0xFF;
+        n <<= 8;
+        n ^= bytes[offset + 1] & 0xFF;
+        return n;
+    }
+
+    /**
+     * Returns the bytes from the buffer's current position to its limit;
+     * the position is restored afterwards.
+     *
+     * @param buf a byte buffer
+     * @return byte array
+     */
+    public static byte[] getBytes(ByteBuffer buf) {
+        int savedPos = buf.position();
+        byte[] newBytes = new byte[buf.remaining()];
+        buf.get(newBytes);
+        buf.position(savedPos);
+        return newBytes;
+    }
+
+    /**
+     * Put a short value out to the specified byte array position.
+     *
+     * @param bytes  the byte array
+     * @param offset position in the array
+     * @param val    short to write out
+     * @return incremented offset
+     * @throws IllegalArgumentException if the byte array given doesn't have
+     *                                  enough room at the offset specified.
+     */
+    public static int putShort(byte[] bytes, int offset, short val) {
+        if (bytes.length - offset < SIZEOF_SHORT) {
+            throw new IllegalArgumentException("Not enough room to put a short at"
+                    + " offset " + offset + " in a " + bytes.length + " byte array");
+        }
+        bytes[offset + 1] = (byte) val;
+        val >>= 8;
+        bytes[offset] = (byte) val;
+        return offset + SIZEOF_SHORT;
+    }
+
+    public static byte toByte(byte[] bytes, int offset, int length) {
+        if (length != SIZEOF_BYTE || offset + length > bytes.length) {
+            throw explainWrongLengthOrOffset(bytes, offset, length, SIZEOF_BYTE);
+        }
+        return bytes[offset];
+    }
+
+
+    /**
+     * Convert a BigDecimal value to a byte array
+     *
+     * @param val value
+     * @return the byte array
+     */
+    public static byte[] toBytes(BigDecimal val) {
+        byte[] valueBytes = val.unscaledValue().toByteArray();
+        byte[] result = new byte[valueBytes.length + SIZEOF_INT];
+        int offset = putInt(result, 0, val.scale());
+        putBytes(result, offset, valueBytes, 0, valueBytes.length);
+        return result;
+    }
+
+
+    /**
+     * Converts a byte array to a BigDecimal
+     *
+     * @param bytes byte array
+     * @return the BigDecimal value
+     */
+    public static BigDecimal toBigDecimal(byte[] bytes) {
+        return toBigDecimal(bytes, 0, bytes.length);
+    }
+
+    /**
+     * Converts a byte array to a BigDecimal value
+     *
+     * @param bytes  byte array
+     * @param offset offset into array
+     * @param length length of the encoded value
+     * @return the BigDecimal value
+     */
+    public static BigDecimal toBigDecimal(byte[] bytes, int offset, final int length) {
+        if (bytes == null || length < SIZEOF_INT + 1 ||
+                (offset + length > bytes.length)) {
+            return null;
+        }
+
+        int scale = toInt(bytes, offset);
+        byte[] tcBytes = new byte[length - SIZEOF_INT];
+        System.arraycopy(bytes, offset + SIZEOF_INT, tcBytes, 0, length - SIZEOF_INT);
+        return new BigDecimal(new BigInteger(tcBytes), scale);
+    }
+
+    /**
+     * Put a BigDecimal value out to the specified byte array position.
+     *
+     * @param bytes  the byte array
+     * @param offset position in the array
+     * @param val    BigDecimal to write out
+     * @return incremented offset
+     */
+    public static int putBigDecimal(byte[] bytes, int offset, BigDecimal val) {
+        if (bytes == null) {
+            return offset;
+        }
+
+        byte[] valueBytes = val.unscaledValue().toByteArray();
+        byte[] result = new byte[valueBytes.length + SIZEOF_INT];
+        offset = putInt(result, offset, val.scale());
+        return putBytes(result, offset, valueBytes, 0, valueBytes.length);
+    }
+
+    /**
+     * @param left  left operand
+     * @param right right operand
+     * @return 0 if equal, < 0 if left is less than right, etc.
+     */
+    public static int compareTo(final byte[] left, final byte[] right) {
+        return compareByteArrayInLexOrder(left, 0, left.length, right, 0, right.length);
+    }
+
+    /**
+     * Lexicographically compare two arrays.
+     *
+     * @param buffer1 left operand
+     * @param buffer2 right operand
+     * @param offset1 Where to start comparing in the left buffer
+     * @param offset2 Where to start comparing in the right buffer
+     * @param length1 How much to compare from the left buffer
+     * @param length2 How much to compare from the right buffer
+     * @return 0 if equal, < 0 if left is less than right, etc.
+     */
+    public static int compareTo(byte[] buffer1, int offset1, int length1,
+                                byte[] buffer2, int offset2, int length2) {
+        return compareByteArrayInLexOrder(buffer1, offset1, length1, buffer2, offset2, length2);
+    }
+
+    public static int compareByteArrayInLexOrder(byte[] buffer1, int offset1, int length1,
+                                                 byte[] buffer2, int offset2, int length2) {
+        // Short circuit equal case
+        if (buffer1 == buffer2 &&
+                offset1 == offset2 &&
+                length1 == length2) {
+            return 0;
+        }
+        // Bring WritableComparator code local
+        int end1 = offset1 + length1;
+        int end2 = offset2 + length2;
+        for (int i = offset1, j = offset2; i < end1 && j < end2; i++, j++) {
+            int a = (buffer1[i] & 0xff);
+            int b = (buffer2[j] & 0xff);
+            if (a != b) {
+                return a - b;
+            }
+        }
+        return length1 - length2;
+    }
+
+    /**
+     * @param left  left operand
+     * @param right right operand
+     * @return True if equal
+     */
+    public static boolean equals(final byte[] left, final byte[] right) {
+        // Could use Arrays.equals?
+        //noinspection SimplifiableConditionalExpression
+        if (left == right) return true;
+        if (left == null || right == null) return false;
+        if (left.length != right.length) return false;
+        if (left.length == 0) return true;
+
+        // Since we're often comparing adjacent sorted data,
+        // it's usual to have equal arrays except for the very last byte
+        // so check that first
+        if (left[left.length - 1] != right[right.length - 1]) return false;
+
+        return compareTo(left, right) == 0;
+    }
+
+    public static boolean equals(final byte[] left, int leftOffset, int leftLen,
+                                 final byte[] right, int rightOffset, int rightLen) {
+        // short circuit case
+        if (left == right &&
+                leftOffset == rightOffset &&
+                leftLen == rightLen) {
+            return true;
+        }
+        // different lengths fast check
+        if (leftLen != rightLen) {
+            return false;
+        }
+        if (leftLen == 0) {
+            return true;
+        }
+
+        // Since we're often comparing adjacent sorted data,
+        // it's usual to have equal arrays except for the very last byte
+        // so check that first
+        if (left[leftOffset + leftLen - 1] != right[rightOffset + rightLen - 1]) return false;
+
+        return compareByteArrayInLexOrder(left, leftOffset, leftLen, right, rightOffset, rightLen) == 0;
+    }
+
+
+    /**
+     * Return true if the byte array on the right is a prefix of the byte
+     * array on the left.
+     */
+    public static boolean startsWith(byte[] bytes, byte[] prefix) {
+        return bytes != null && prefix != null &&
+                bytes.length >= prefix.length &&
+                compareByteArrayInLexOrder(bytes, 0, prefix.length, prefix, 0, prefix.length) == 0;
+    }
+
+    public static int hashCode(final byte[] b) {
+        return hashCode(b, b.length);
+    }
+
+    public static int hashCode(final byte[] b, final int length) {
+        return hashBytes(b, length);
+    }
+
+    /**
+     * Compute hash for binary data.
+     */
+    public static int hashBytes(byte[] bytes, int offset, int length) {
+        int hash = 1;
+        for (int i = offset; i < offset + length; i++)
+            hash = (31 * hash) + (int) bytes[i];
+        return hash;
+    }
+
+    /**
+     * Compute hash for binary data.
+     */
+    public static int hashBytes(byte[] bytes, int length) {
+        return hashBytes(bytes, 0, length);
+    }
+
+    /**
+     * @param bytes  array to hash
+     * @param offset offset to start from
+     * @param length length to hash
+     */
+    public static int hashCode(byte[] bytes, int offset, int length) {
+        int hash = 1;
+        for (int i = offset; i < offset + length; i++)
+            hash = (31 * hash) + (int) bytes[i];
+        return hash;
+    }
+
+    /**
+     * Encodes a string into the given buffer as UTF-8 (http://tools.ietf.org/html/rfc3629),
+     * writing 1-3 bytes per char; the buffer must be large enough to hold the result.
+     *
+     * @return the number of bytes written
+     */
+    public static int stringtoUTF8Bytes(String str, byte[] buffer) {
+        int index = 0;
+        for (int i = 0; i < str.length(); i++) {
+            char strChar = str.charAt(i);
+            if ((strChar & 0xFF80) == 0) {
+                // (00000000 00000000 - 00000000 01111111) -> 0xxxxxxx
+                buffer[index++] = (byte) (strChar & 0x00FF);
+            } else if ((strChar & 0xF800) == 0) {
+                // (00000000 10000000 - 00000111 11111111) -> 110xxxxx 10xxxxxx
+                buffer[index++] = (byte) ((strChar >> 6) | 0x00c0);
+                buffer[index++] = (byte) ((strChar & 0x003F) | 0x0080);
+            } else {
+                // (00001000 00000000 - 11111111 11111111) -> 1110xxxx 10xxxxxx 10xxxxxx
+                buffer[index++] = (byte) ((strChar >> 12) | 0x00e0);
+                buffer[index++] = (byte) (((strChar >> 6) & 0x003F) | 0x0080);
+                buffer[index++] = (byte) ((strChar & 0x003F) | 0x0080);
+            }
+        }
+        return index;
+    }
+}
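
A short round-trip sketch of the helpers above. The demo is standalone and not part of the commit; the commented outputs follow from the big-endian encodings and the \xNN escaping described in toStringBinary.

    import com.alibaba.jstorm.metric.Bytes;

    public class BytesDemo {
        public static void main(String[] args) {
            // Fixed-width big-endian encodings round-trip through the matching to* methods.
            byte[] l = Bytes.toBytes(1234567890123L);
            System.out.println(Bytes.toLong(l));                      // 1234567890123

            byte[] d = Bytes.toBytes(3.14d);
            System.out.println(Bytes.toDouble(d));                    // 3.14

            // putInt/putLong write into an existing buffer and return the advanced offset.
            byte[] buf = new byte[Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG];
            int off = Bytes.putInt(buf, 0, 42);
            Bytes.putLong(buf, off, 99L);
            System.out.println(Bytes.toInt(buf, 0));                  // 42
            System.out.println(Bytes.toLong(buf, Bytes.SIZEOF_INT));  // 99

            // Non-printable bytes are hex escaped; toBytesBinary reverses the escaping.
            byte[] raw = new byte[]{0x00, 'a', (byte) 0xFF};
            String printable = Bytes.toStringBinary(raw);
            System.out.println(printable);                            // \x00a\xFF
            System.out.println(Bytes.equals(raw, Bytes.toBytesBinary(printable))); // true

            // Lexicographic comparison of the encoded forms.
            System.out.println(Bytes.compareTo(Bytes.toBytes(1), Bytes.toBytes(2)) < 0); // true
        }
    }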

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/DefaultMetricIDGenerator.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/DefaultMetricIDGenerator.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/DefaultMetricIDGenerator.java
new file mode 100644
index 0000000..44fd4bb
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/DefaultMetricIDGenerator.java
@@ -0,0 +1,15 @@
+package com.alibaba.jstorm.metric;
+
+import java.util.UUID;
+
+/**
+ * @author Cody (weiyue.wy@alibaba-inc.com)
+ * @since 2.0.5
+ */
+public class DefaultMetricIDGenerator implements MetricIDGenerator {
+
+    @Override
+    public long genMetricId(String metricName) {
+        return UUID.randomUUID().getLeastSignificantBits();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/DefaultMetricQueryClient.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/DefaultMetricQueryClient.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/DefaultMetricQueryClient.java
new file mode 100644
index 0000000..5de2b8d
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/DefaultMetricQueryClient.java
@@ -0,0 +1,84 @@
+package com.alibaba.jstorm.metric;
+
+import com.alibaba.jstorm.common.metric.MetricMeta;
+import com.alibaba.jstorm.common.metric.TaskTrack;
+import com.alibaba.jstorm.common.metric.TopologyHistory;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * a dummy metric query client implementation
+ *
+ * @author Cody (weiyue.wy@alibaba-inc.com)
+ * @since 2.0.5
+ */
+public class DefaultMetricQueryClient implements MetricQueryClient {
+    @Override
+    public void init(Map conf) {
+    }
+
+    @Override
+    public List<MetricMeta> getMetricMeta(String clusterName, String topologyId, MetaType type, MetaFilter filter, Object arg) {
+        return Lists.newArrayList();
+    }
+
+    @Override
+    public List<MetricMeta> getMetricMeta(String clusterName, String topologyId, MetaType type) {
+        return Lists.newArrayList();
+    }
+
+    @Override
+    public List<MetricMeta> getWorkerMeta(String clusterName, String topologyId) {
+        return Lists.newArrayList();
+    }
+
+    @Override
+    public List<MetricMeta> getNettyMeta(String clusterName, String topologyId) {
+        return Lists.newArrayList();
+    }
+
+    @Override
+    public List<MetricMeta> getTaskMeta(String clusterName, String topologyId, int taskId) {
+        return Lists.newArrayList();
+    }
+
+    @Override
+    public List<MetricMeta> getComponentMeta(String clusterName, String topologyId, String componentId) {
+        return Lists.newArrayList();
+    }
+
+    @Override
+    public MetricMeta getMetricMeta(String clusterName, String topologyId, MetaType metaType, long metricId) {
+        return null;
+    }
+
+    @Override
+    public List<Object> getMetricData(long metricId, MetricType metricType, int win, long start, long end) {
+        return Lists.newArrayList();
+    }
+
+    @Override
+    public List<TaskTrack> getTaskTrack(String clusterName, String topologyId) {
+        return Lists.newArrayList();
+    }
+
+    @Override
+    public List<TaskTrack> getTaskTrack(String clusterName, String topologyId, int taskId) {
+        return Lists.newArrayList();
+    }
+
+    @Override
+    public List<TopologyHistory> getTopologyHistory(String clusterName, String topologyName, int size) {
+        return Lists.newArrayList();
+    }
+
+    @Override
+    public void deleteMeta(MetricMeta meta) {
+    }
+
+    @Override
+    public void deleteMeta(List<MetricMeta> metaList) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormHealthCheck.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormHealthCheck.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormHealthCheck.java
index 631c38b..85e7f15 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormHealthCheck.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormHealthCheck.java
@@ -27,19 +27,14 @@ import com.codahale.metrics.health.HealthCheck;
 import com.codahale.metrics.health.HealthCheckRegistry;
 
 public class JStormHealthCheck {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(JStormHealthCheck.class);
+    private static final Logger LOG = LoggerFactory.getLogger(JStormHealthCheck.class);
 
-    private final static Map<Integer, HealthCheckRegistry> taskHealthCheckMap =
-            new ConcurrentHashMap<Integer, HealthCheckRegistry>();
+    private final static Map<Integer, HealthCheckRegistry> taskHealthCheckMap = new ConcurrentHashMap<Integer, HealthCheckRegistry>();
 
-    private final static HealthCheckRegistry workerHealthCheck =
-            new HealthCheckRegistry();
+    private final static HealthCheckRegistry workerHealthCheck = new HealthCheckRegistry();
 
-    public static void registerTaskHealthCheck(int taskId, String name,
-            HealthCheck healthCheck) {
-        HealthCheckRegistry healthCheckRegister =
-                taskHealthCheckMap.get(taskId);
+    public static void registerTaskHealthCheck(int taskId, String name, HealthCheck healthCheck) {
+        HealthCheckRegistry healthCheckRegister = taskHealthCheckMap.get(taskId);
 
         if (healthCheckRegister == null) {
             healthCheckRegister = new HealthCheckRegistry();
@@ -49,14 +44,12 @@ public class JStormHealthCheck {
         healthCheckRegister.register(name, healthCheck);
     }
 
-    public static void registerWorkerHealthCheck(String name,
-            HealthCheck healthCheck) {
+    public static void registerWorkerHealthCheck(String name, HealthCheck healthCheck) {
         workerHealthCheck.register(name, healthCheck);
     }
 
     public static void unregisterTaskHealthCheck(int taskId, String name) {
-        HealthCheckRegistry healthCheckRegister =
-                taskHealthCheckMap.get(taskId);
+        HealthCheckRegistry healthCheckRegister = taskHealthCheckMap.get(taskId);
 
         if (healthCheckRegister != null) {
             healthCheckRegister.unregister(name);
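
A minimal sketch of registering a check through this helper, using the standard Codahale HealthCheck API that the registry wraps; the queue-size check, task id, and threshold below are made up for illustration.

    import com.alibaba.jstorm.metric.JStormHealthCheck;
    import com.codahale.metrics.health.HealthCheck;

    import java.util.ArrayDeque;
    import java.util.Queue;

    public class QueueHealthDemo {
        // Hypothetical check: flag the task unhealthy when a pending queue grows too large.
        static HealthCheck queueCheck(final Queue<?> queue, final int maxSize) {
            return new HealthCheck() {
                @Override
                protected Result check() {
                    if (queue.size() > maxSize) {
                        return Result.unhealthy("queue size " + queue.size() + " exceeds " + maxSize);
                    }
                    return Result.healthy();
                }
            };
        }

        public static void main(String[] args) {
            Queue<Object> pending = new ArrayDeque<Object>();
            // Register under a task id; JStormHealthReporter (below) periodically runs all registered checks.
            JStormHealthCheck.registerTaskHealthCheck(7, "pending-queue", queueCheck(pending, 1024));
            // ... and unregister when the task shuts down.
            JStormHealthCheck.unregisterTaskHealthCheck(7, "pending-queue");
        }
    }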

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormHealthReporter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormHealthReporter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormHealthReporter.java
new file mode 100644
index 0000000..e344bfd
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormHealthReporter.java
@@ -0,0 +1,59 @@
+package com.alibaba.jstorm.metric;
+
+import com.alibaba.jstorm.callback.RunnableCallback;
+import com.alibaba.jstorm.cluster.StormClusterState;
+import com.alibaba.jstorm.daemon.worker.WorkerData;
+import com.codahale.metrics.health.HealthCheckRegistry;
+import com.codahale.metrics.health.HealthCheck.Result;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Created by wuchong on 15/9/17.
+ */
+public class JStormHealthReporter extends RunnableCallback {
+    private static final Logger LOG = LoggerFactory.getLogger(JStormHealthReporter.class);
+    private static final int THREAD_CYCLE = 60;   //report every minute
+    private WorkerData workerData;
+
+    public JStormHealthReporter(WorkerData workerData) {
+        this.workerData = workerData;
+    }
+
+    @Override
+    public void run() {
+        StormClusterState clusterState = workerData.getZkCluster();
+        String topologyId = workerData.getTopologyId();
+
+        Map<Integer, HealthCheckRegistry> taskHealthCheckMap = JStormHealthCheck.getTaskhealthcheckmap();
+        int cnt = 0;
+        for (Map.Entry<Integer, HealthCheckRegistry> entry : taskHealthCheckMap.entrySet()) {
+            Integer taskId = entry.getKey();
+            Map<String, Result> results = entry.getValue().runHealthChecks();
+
+            for (Map.Entry<String, Result> result : results.entrySet()) {
+                if (!result.getValue().isHealthy()) {
+                    try {
+                        clusterState.report_task_error(topologyId, taskId, result.getValue().getMessage(), null);
+                        cnt++;
+                    } catch (Exception e) {
+                        LOG.error("Failed to update health data in ZK for topo-{} task-{}.", topologyId, taskId, e);
+                    }
+                }
+            }
+        }
+        LOG.info("Successfully updated {} health data to ZK for topology:{}", cnt, topologyId);
+    }
+
+    @Override
+    public Object getResult() {
+        return THREAD_CYCLE;
+    }
+
+    @Override
+    public String getThreadName() {
+        return "HealthReporterThread";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetricCache.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetricCache.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetricCache.java
new file mode 100644
index 0000000..3a85b73
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetricCache.java
@@ -0,0 +1,351 @@
+package com.alibaba.jstorm.metric;
+
+import backtype.storm.generated.MetricInfo;
+import backtype.storm.generated.TopologyMetric;
+import backtype.storm.utils.Utils;
+import com.alibaba.jstorm.cache.JStormCache;
+import com.alibaba.jstorm.cache.RocksDBCache;
+import com.alibaba.jstorm.cache.TimeoutMemCache;
+import com.alibaba.jstorm.client.ConfigExtension;
+import com.alibaba.jstorm.cluster.StormClusterState;
+import com.alibaba.jstorm.cluster.StormConfig;
+import com.alibaba.jstorm.utils.OSInfo;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Metrics cache. We maintain the following data in the RocksDB cache:
+ * 1. all topology ids
+ * 2. topology id ==> all metrics meta (map<metric_name, metric_id>)
+ * 3. topology id ==> all metrics data
+ *
+ * @author Cody (weiyue.wy@alibaba-inc.com)
+ * @since 2.0.5
+ */
+@SuppressWarnings("unchecked")
+public class JStormMetricCache {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JStormMetricCache.class);
+
+    public static final String TIMEOUT_MEM_CACHE_CLASS = TimeoutMemCache.class.getName();
+    public static final String ROCKS_DB_CACHE_CLASS = RocksDBCache.class.getName();
+
+    protected final Object lock = new Object();
+
+    protected JStormCache cache = null;
+
+    protected static final String METRIC_META_PREFIX = "__metric.meta__";
+    protected static final String SENT_METRIC_META_PREFIX = "__saved.metric.meta__";
+    protected static final String ALL_TOPOLOGIES_KEY = "__all.topologies__";
+    protected static final String TOPOLOGY_SAMPLE_RATE = "__topology.sample.rate__";
+
+    protected static final String METRIC_DATA_PREFIX = "__metric.data__";
+    protected static final String METRIC_DATA_30M_COMPONENT = "__metric.data.comp__";
+    protected static final String METRIC_DATA_30M_TASK = "__metric.data.task__";
+    protected static final String METRIC_DATA_30M_STREAM = "__metric.data.stream__";
+    protected static final String METRIC_DATA_30M_WORKER = "__metric.data.worker__";
+    protected static final String METRIC_DATA_30M_NETTY = "__metric.data.netty__";
+    protected static final String METRIC_DATA_30M_TOPOLOGY = "__metric.data.topology__";
+
+    protected final StormClusterState zkCluster;
+
+    public String getNimbusCacheClass(Map conf) {
+        boolean isLinux = OSInfo.isLinux();
+        boolean isMac = OSInfo.isMac();
+        boolean isLocal = StormConfig.local_mode(conf);
+
+        if (isLocal) {
+            return TIMEOUT_MEM_CACHE_CLASS;
+        }
+
+        if (!isLinux && !isMac) {
+            return TIMEOUT_MEM_CACHE_CLASS;
+        }
+
+        String nimbusCacheClass = ConfigExtension.getNimbusCacheClass(conf);
+        if (!StringUtils.isBlank(nimbusCacheClass)) {
+            return nimbusCacheClass;
+        }
+
+        return ROCKS_DB_CACHE_CLASS;
+    }
+
+    public JStormMetricCache(Map conf, StormClusterState zkCluster) {
+        String dbCacheClass = getNimbusCacheClass(conf);
+        LOG.info("JStorm metrics cache will use {}", dbCacheClass);
+
+        boolean reset = ConfigExtension.getMetricCacheReset(conf);
+        try {
+            cache = (JStormCache) Utils.newInstance(dbCacheClass);
+
+            String dbDir = StormConfig.metricDbDir(conf);
+            conf.put(RocksDBCache.ROCKSDB_ROOT_DIR, dbDir);
+            conf.put(RocksDBCache.ROCKSDB_RESET, reset);
+            cache.init(conf);
+        } catch (Exception e) {
+            if (!reset && cache != null) {
+                LOG.error("Failed to init rocks db, will reset and try to re-init...");
+                conf.put(RocksDBCache.ROCKSDB_RESET, true);
+                try {
+                    cache.init(conf);
+                } catch (Exception ex) {
+                    LOG.error("Error", ex);
+                }
+            } else {
+                LOG.error("Failed to create metrics cache!", e);
+                throw new RuntimeException(e);
+            }
+        }
+
+        this.zkCluster = zkCluster;
+    }
+
+    public JStormCache getCache() {
+        return cache;
+    }
+
+    public JStormCache put(String k, Object v) {
+        cache.put(k, v);
+        return cache;
+    }
+
+    /**
+     * Stores 30 minutes of metric data. Topology metrics are kept in a 30-slot ring;
+     * other meta types are stored under a single key per topology (task/stream/worker/netty
+     * snapshots are merged with the existing entry via tryCombineMetricInfo).
+     */
+    public JStormCache putMetricData(String topologyId, TopologyMetric tpMetric) {
+        // map<key, [ts, metric_info]>
+        Map<String, Object> batchData = new HashMap<String, Object>();
+        long ts = System.currentTimeMillis();
+        int tp = 0, comp = 0, task = 0, stream = 0, worker = 0, netty = 0;
+        if (tpMetric.get_componentMetric().get_metrics_size() > 0) {
+            batchData.put(METRIC_DATA_30M_COMPONENT + topologyId, new Object[]{ts, tpMetric.get_componentMetric()});
+            comp += tpMetric.get_componentMetric().get_metrics_size();
+        }
+        if (tpMetric.get_taskMetric().get_metrics_size() > 0) {
+            tryCombineMetricInfo(METRIC_DATA_30M_TASK + topologyId, tpMetric.get_taskMetric(), MetaType.TASK, ts);
+            task += tpMetric.get_taskMetric().get_metrics_size();
+        }
+        if (tpMetric.get_streamMetric().get_metrics_size() > 0) {
+            tryCombineMetricInfo(METRIC_DATA_30M_STREAM + topologyId, tpMetric.get_streamMetric(), MetaType.STREAM, ts);
+            stream += tpMetric.get_streamMetric().get_metrics_size();
+        }
+        if (tpMetric.get_workerMetric().get_metrics_size() > 0) {
+            tryCombineMetricInfo(METRIC_DATA_30M_WORKER + topologyId, tpMetric.get_workerMetric(), MetaType.WORKER, ts);
+            worker += tpMetric.get_workerMetric().get_metrics_size();
+        }
+        if (tpMetric.get_nettyMetric().get_metrics_size() > 0) {
+            tryCombineMetricInfo(METRIC_DATA_30M_NETTY + topologyId, tpMetric.get_nettyMetric(), MetaType.NETTY, ts);
+            netty += tpMetric.get_nettyMetric().get_metrics_size();
+        }
+
+        // store 30 snapshots of topology metrics
+        if (tpMetric.get_topologyMetric().get_metrics_size() > 0) {
+            String keyPrefix = METRIC_DATA_30M_TOPOLOGY + topologyId + "-";
+            int page = getRingAvailableIndex(keyPrefix);
+
+            batchData.put(keyPrefix + page, new Object[]{ts, tpMetric.get_topologyMetric()});
+            tp += tpMetric.get_topologyMetric().get_metrics_size();
+        }
+        LOG.info("caching metric data for topology:{},tp:{},comp:{},task:{},stream:{},worker:{},netty:{},cost:{}",
+                topologyId, tp, comp, task, stream, worker, netty, System.currentTimeMillis() - ts);
+
+        return putBatch(batchData);
+    }
+
+    private int getRingAvailableIndex(String keyPrefix) {
+        int page = 0;
+        // backward check
+        long last_ts = 0;
+        for (int idx = 1; idx <= 30; idx++) {
+            String key = keyPrefix + idx;
+            if (cache.get(key) != null) {
+                long timestamp = (long) ((Object[]) cache.get(key))[0];
+                if (timestamp > last_ts) {
+                    last_ts = timestamp;
+                    page = idx;
+                }
+            }
+        }
+        if (page < 30) {
+            page += 1;
+        } else {
+            page = 1;
+        }
+        return page;
+    }
+
+    private void tryCombineMetricInfo(String key, MetricInfo incoming, MetaType metaType, long ts) {
+        Object data = cache.get(key);
+        if (data != null) {
+            try {
+                Object[] parts = (Object[]) data;
+                MetricInfo old = (MetricInfo) parts[1];
+
+                LOG.info("combine {} metrics, old:{}, new:{}",
+                        metaType, old.get_metrics_size(), incoming.get_metrics_size());
+                old.get_metrics().putAll(incoming.get_metrics());
+                // remove dead worker
+                cache.put(key, new Object[]{ts, old});
+            } catch (Exception ignored) {
+                cache.remove(key);
+                cache.put(key, new Object[]{ts, incoming});
+            }
+        } else {
+            cache.put(key, new Object[]{ts, incoming});
+        }
+    }
+
+    public List<MetricInfo> getMetricData(String topologyId, MetaType metaType) {
+        Map<Long, MetricInfo> retMap = new TreeMap<Long, MetricInfo>();
+
+        String key = null;
+        if (metaType == MetaType.COMPONENT) {
+            key = METRIC_DATA_30M_COMPONENT + topologyId;
+        } else if (metaType == MetaType.TASK) {
+            key = METRIC_DATA_30M_TASK + topologyId;
+        } else if (metaType == MetaType.STREAM) {
+            key = METRIC_DATA_30M_STREAM + topologyId;
+        } else if (metaType == MetaType.WORKER) {
+            key = METRIC_DATA_30M_WORKER + topologyId;
+        } else if (metaType == MetaType.NETTY) {
+            key = METRIC_DATA_30M_NETTY + topologyId;
+        } else if (metaType == MetaType.TOPOLOGY) {
+            String keyPrefix = METRIC_DATA_30M_TOPOLOGY + topologyId + "-";
+            for (int i = 1; i <= 30; i++) {
+                Object obj = cache.get(keyPrefix + i);
+                if (obj != null) {
+                    Object[] objects = (Object[]) obj;
+                    retMap.put((Long) objects[0], (MetricInfo) objects[1]);
+                }
+            }
+        }
+        if (key != null) {
+            Object obj = cache.get(key);
+            if (obj != null) {
+                Object[] objects = (Object[]) obj;
+                retMap.put((Long) objects[0], (MetricInfo) objects[1]);
+            }
+        }
+        List<MetricInfo> ret = Lists.newArrayList(retMap.values());
+        int cnt = 0;
+        for (MetricInfo metricInfo : ret) {
+            cnt += metricInfo.get_metrics_size();
+        }
+        LOG.info("getMetricData, topology:{}, meta type:{}, metric info size:{}, total metric size:{}",
+                topologyId, metaType, ret.size(), cnt);
+        return ret;
+    }
+
+    public JStormCache putBatch(Map<String, Object> kv) {
+        if (kv.size() > 0) {
+            cache.putBatch(kv);
+        }
+        return cache;
+    }
+
+    public Object get(String k) {
+        return cache.get(k);
+    }
+
+    public void remove(String k) {
+        cache.remove(k);
+    }
+
+    public void removeTopology(String topologyId) {
+        removeTopologyMeta(topologyId);
+        removeTopologyData(topologyId);
+    }
+
+    protected void removeTopologyMeta(String topologyId) {
+        cache.remove(METRIC_META_PREFIX + topologyId);
+    }
+
+    protected void removeTopologyData(String topologyId) {
+        long start = System.currentTimeMillis();
+        cache.remove(METRIC_DATA_PREFIX + topologyId);
+
+        Set<String> metricDataKeys = new HashSet<>();
+        for (int i = 1; i <= 30; i++) {
+            String metricDataKeySuffix = topologyId + "-" + i;
+            metricDataKeys.add(METRIC_DATA_30M_TOPOLOGY + metricDataKeySuffix);
+        }
+        metricDataKeys.add(METRIC_DATA_30M_COMPONENT + topologyId);
+        metricDataKeys.add(METRIC_DATA_30M_TASK + topologyId);
+        metricDataKeys.add(METRIC_DATA_30M_STREAM + topologyId);
+        metricDataKeys.add(METRIC_DATA_30M_WORKER + topologyId);
+        metricDataKeys.add(METRIC_DATA_30M_NETTY + topologyId);
+
+        cache.removeBatch(metricDataKeys);
+        LOG.info("removing metric cache of topology:{}, cost:{}", topologyId, System.currentTimeMillis() - start);
+    }
+
+    public void unregisterWorker(String topologyId, String host, int port) {
+        String prefix = MetricUtils.workerMetricPrefix(topologyId, host, port);
+        synchronized (lock) {
+            //remove dead worker meta info in METRIC_META_PREFIX
+            Map<String, Long> nodes = (Map<String, Long>) cache.get(METRIC_META_PREFIX + topologyId);
+            if (nodes != null) {
+                Iterator<String> keyIterator = nodes.keySet().iterator();
+                while (keyIterator.hasNext()) {
+                    String metricName = keyIterator.next();
+                    // remove metric type
+                    metricName = metricName.charAt(0) + metricName.substring(2, metricName.length());
+                    if (metricName.startsWith(prefix)) {
+                        keyIterator.remove();
+                    }
+                }
+                cache.put(METRIC_META_PREFIX + topologyId, nodes);
+            }
+            //remove dead worker in METRIC_DATA_30M_WORKER
+            Object data = cache.get(METRIC_DATA_30M_WORKER + topologyId);
+            if (data != null) {
+                Object[] parts = (Object[]) data;
+                MetricInfo old = (MetricInfo) parts[1];
+                Iterator<String> oldKeys = old.get_metrics().keySet().iterator();
+                while (oldKeys.hasNext()) {
+                    String metricName = oldKeys.next();
+                    metricName = metricName.charAt(0) + metricName.substring(2, metricName.length());
+                    if (metricName.startsWith(prefix)) {
+                        oldKeys.remove();
+                        LOG.info("remove dead worker metric : {}", metricName);
+                    }
+                }
+                cache.put(METRIC_DATA_30M_WORKER + topologyId, data);
+            }
+        }
+    }
+
+    public Map<String, Long> getMeta(String topologyId) {
+        return (Map<String, Long>) cache.get(METRIC_META_PREFIX + topologyId);
+    }
+
+    public void putMeta(String topologyId, Object v) {
+        cache.put(METRIC_META_PREFIX + topologyId, v);
+    }
+
+    public void putSampleRate(String topologyId, double sampleRate) {
+        cache.put(TOPOLOGY_SAMPLE_RATE + topologyId, sampleRate);
+    }
+
+    public void removeSampleRate(String topologyId) {
+        cache.remove(TOPOLOGY_SAMPLE_RATE + topologyId);
+    }
+
+    public double getSampleRate(String topologyId) {
+        String rate = (String) cache.get(TOPOLOGY_SAMPLE_RATE + topologyId);
+        if (rate == null) {
+            return ConfigExtension.DEFAULT_METRIC_SAMPLE_RATE;
+        }
+        return Double.parseDouble(rate);
+    }
+
+    public Map<String, Long> getSentMeta(String topologyId) {
+        return (Map<String, Long>) cache.get(SENT_METRIC_META_PREFIX + topologyId);
+    }
+
+    public void putSentMeta(String topologyId, Object allMetricMeta) {
+        cache.put(SENT_METRIC_META_PREFIX + topologyId, allMetricMeta);
+    }
+}
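
The ring mentioned in putMetricData/getRingAvailableIndex keeps the 30 most recent topology-metric snapshots under keys keyPrefix + 1 .. keyPrefix + 30; each write goes to the slot after the one holding the newest timestamp, wrapping from 30 back to 1. Below is a standalone sketch of that index selection, using a plain HashMap in place of the JStormCache backend; the demo class and topology id are illustrative only.

    import java.util.HashMap;
    import java.util.Map;

    public class RingIndexDemo {
        // Mirrors getRingAvailableIndex: scan slots 1..30, find the newest timestamp,
        // and return the following slot, wrapping 30 -> 1.
        static int nextSlot(Map<String, Object[]> cache, String keyPrefix) {
            int page = 0;
            long lastTs = 0;
            for (int idx = 1; idx <= 30; idx++) {
                Object[] entry = cache.get(keyPrefix + idx);
                if (entry != null && (Long) entry[0] > lastTs) {
                    lastTs = (Long) entry[0];
                    page = idx;
                }
            }
            return page < 30 ? page + 1 : 1;
        }

        public static void main(String[] args) {
            Map<String, Object[]> cache = new HashMap<String, Object[]>();
            String prefix = "__metric.data.topology__demo-topo-1-";   // illustrative topology id

            System.out.println(nextSlot(cache, prefix));              // 1 (empty ring)

            cache.put(prefix + 1, new Object[]{1000L, "snapshot@1000"});
            cache.put(prefix + 2, new Object[]{2000L, "snapshot@2000"});
            System.out.println(nextSlot(cache, prefix));              // 3 (slot 2 holds the newest snapshot)

            cache.put(prefix + 30, new Object[]{9000L, "snapshot@9000"});
            System.out.println(nextSlot(cache, prefix));              // 1 (wraps after slot 30)
        }
    }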

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetrics.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetrics.java b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetrics.java
index 8221cd8..6531c9c 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetrics.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/metric/JStormMetrics.java
@@ -15,267 +15,441 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package com.alibaba.jstorm.metric;
 
-import java.io.Serializable;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.lang.management.MemoryUsage;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+package com.alibaba.jstorm.metric;
 
+import backtype.storm.generated.MetricInfo;
+import com.alibaba.jstorm.common.metric.*;
+import com.alibaba.jstorm.common.metric.snapshot.AsmSnapshot;
+import com.alibaba.jstorm.utils.NetWorkUtils;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.generated.MetricInfo;
-
-import com.alibaba.jstorm.common.metric.Counter;
-import com.alibaba.jstorm.common.metric.Gauge;
-import com.alibaba.jstorm.common.metric.Histogram;
-import com.alibaba.jstorm.common.metric.Meter;
-import com.alibaba.jstorm.common.metric.MetricRegistry;
-import com.alibaba.jstorm.common.metric.Timer;
-import com.alibaba.jstorm.common.metric.window.Metric;
-import com.alibaba.jstorm.utils.JStormUtils;
+import java.io.Serializable;
+import java.util.*;
 
+/**
+ * @author Cody (weiyue.wy@alibaba-inc.com)
+ * @since 2.0.5
+ */
 public class JStormMetrics implements Serializable {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(JStormMetrics.class);
-    private static final long serialVersionUID = 2046603514943797241L;
+    private static final long serialVersionUID = -2580242512743243267L;
+
+    public static final String NIMBUS_METRIC_KEY = "__NIMBUS__";
+    public static final String CLUSTER_METRIC_KEY = "__CLUSTER__";
+    public static final String SUPERVISOR_METRIC_KEY = "__SUPERVISOR__";
+
+    protected static final Logger LOG = LoggerFactory.getLogger(JStormMetrics.class);
 
     /**
      * Metrics in this object will be uploaded to nimbus
      */
-    static MetricRegistry workerMetrics = new MetricRegistry();
-    static Map<Integer, MetricRegistry> taskMetrics =
-            new ConcurrentHashMap<Integer, MetricRegistry>();
-    /**
-     * Metrics in this object will be just be output to log, won't be uploaded
-     * to nimbus
-     */
-    static MetricRegistry skipMetrics = new MetricRegistry();
+    protected static final AsmMetricRegistry workerMetrics = new AsmMetricRegistry();
+    protected static final AsmMetricRegistry nettyMetrics = new AsmMetricRegistry();
+    protected static final AsmMetricRegistry componentMetrics = new AsmMetricRegistry();
+    protected static final AsmMetricRegistry taskMetrics = new AsmMetricRegistry();
+    protected static final AsmMetricRegistry streamMetrics = new AsmMetricRegistry();
+    protected static final AsmMetricRegistry topologyMetrics = new AsmMetricRegistry();
 
-    protected static MetricInfo exposeWorkerMetrics;
-    protected static Map<String, MetricInfo> exposeNettyMetrics;
-    protected static Map<Integer, MetricInfo> exposeTaskMetrics;
+    protected static final AsmMetricRegistry[] allRegistries = {
+            streamMetrics, taskMetrics, componentMetrics, workerMetrics, nettyMetrics, topologyMetrics};
 
-    static {
-        registerWorkerGauge(new com.codahale.metrics.Gauge<Double>() {
+    protected static String topologyId;
+    protected static String host;
+    protected static int port;
+    protected static boolean debug;
 
-            @Override
-            public Double getValue() {
-                // TODO Auto-generated method stub
-                return JStormUtils.getCpuUsage();
-            }
+    public static final String DEFAULT_GROUP = "sys";
+    public static final String NETTY_GROUP = "netty";
 
-        }, MetricDef.CPU_USED_RATIO);
+    protected static Set<String> debugMetricNames = new HashSet<String>();
 
-        registerWorkerGauge(new com.codahale.metrics.Gauge<Double>() {
+    static {
+        host = NetWorkUtils.ip();
+    }
 
-            @Override
-            public Double getValue() {
-                // TODO Auto-generated method stub
-                return JStormUtils.getMemUsage();
-            }
+    private static boolean enabled = true;
 
-        }, MetricDef.MEMORY_USED);
+    public static int getPort() {
+        return port;
     }
 
-    public static MetricRegistry registerTask(int taskId) {
-        MetricRegistry ret = taskMetrics.get(taskId);
-        if (ret == null) {
-            ret = new MetricRegistry();
-            taskMetrics.put(taskId, ret);
-            LOG.info("Register task MetricRegistry " + taskId);
-        }
+    public static void setPort(int port) {
+        JStormMetrics.port = port;
+    }
 
-        return ret;
+    public static String getHost() {
+        return host;
     }
 
-    public static void unregisterTask(int taskId) {
-    	taskMetrics.remove(taskId);
+    public static void setHost(String host) {
+        JStormMetrics.host = host;
     }
 
-    // the Metric should be one of metrics of task
-    // if register this metric through this function,
-    // the web UI would do sum operation for the metric
-    // the metric will display in component/topology level in web UI
-    public static void registerSumMetric(String name) {
-        MetricDef.MERGE_SUM_TAG.add(name);
+    public static String getTopologyId() {
+        return topologyId;
     }
 
-    public static void unregisterSumMetric(String name) {
-        MetricDef.MERGE_SUM_TAG.remove(name);
+    public static void setTopologyId(String topologyId) {
+        JStormMetrics.topologyId = topologyId;
     }
 
-    // the Metric should be one of metrics of task
-    // if register this metric through this function,
-    // the web UI would do sum operation for the metric
-    // the metric will display in component/topology level in web UI
-    public static void registerAvgMetric(String name) {
-        MetricDef.MERGE_AVG_TAG.add(name);
+    public static boolean isDebug() {
+        return debug;
     }
 
-    public static void unregisterAvgMetric(String name) {
-        MetricDef.MERGE_AVG_TAG.remove(name);
+    public static void setDebug(boolean debug) {
+        JStormMetrics.debug = debug;
+        LOG.info("topology metrics debug enabled:{}", debug);
     }
 
-    public static <T extends Metric> T registerWorkerMetric(T metric,
-            String name, String... args) throws IllegalArgumentException {
-        String registerName = MetricRegistry.name(name, args);
+    public static void setEnabled(boolean enabled) {
+        JStormMetrics.enabled = enabled;
+    }
 
-        return workerMetrics.register(registerName, metric);
+    public static boolean isEnabled() {
+        return enabled;
     }
 
-    public static void unregisterWorkerMetric(String name, String... args) {
-        String registerName = MetricRegistry.name(name, args);
+    public static String workerMetricName(String name, MetricType type) {
+        return MetricUtils.workerMetricName(topologyId, host, port, name, type);
+    }
 
-        workerMetrics.remove(registerName);
+    public static void addDebugMetrics(String names) {
+        String[] metrics = names.split(",");
+        for (String metric : metrics) {
+            metric = metric.trim();
+            if (!StringUtils.isBlank(metric)) {
+                debugMetricNames.add(metric);
+            }
+        }
+        LOG.info("debug metric names:{}", Joiner.on(",").join(debugMetricNames));
     }
 
-    public static <T extends Metric> T registerTaskMetric(T metric, int taskId,
-            String name, String... args) throws IllegalArgumentException {
-        MetricRegistry metrics = taskMetrics.get(taskId);
-        if (metrics == null) {
-            throw new IllegalArgumentException("Invalid taskId " + taskId);
+    /**
+     * reserved for debugging purposes
+     */
+    public static AsmMetric find(String name) {
+        for (AsmMetricRegistry registry : allRegistries) {
+            AsmMetric metric = registry.getMetric(name);
+            if (metric != null) {
+                return metric;
+            }
         }
+        return null;
+    }
 
-        String registerName = MetricRegistry.name(name, args);
+    public static AsmMetric registerStreamMetric(String name, AsmMetric metric, boolean mergeTopology) {
+        name = fixNameIfPossible(name);
+        LOG.info("register stream metric:{}", name);
+        AsmMetric ret = streamMetrics.register(name, metric);
 
-        return metrics.register(registerName, metric);
-    }
+        if (metric.isAggregate()) {
+            List<AsmMetric> assocMetrics = new ArrayList<>();
+
+            String taskMetricName = MetricUtils.stream2taskName(name);
+            AsmMetric taskMetric = taskMetrics.register(taskMetricName, metric.clone());
+            assocMetrics.add(taskMetric);
+
+            String compMetricName = MetricUtils.task2compName(taskMetricName);
+            AsmMetric componentMetric = componentMetrics.register(compMetricName, taskMetric.clone());
+            assocMetrics.add(componentMetric);
+
+            String metricName = MetricUtils.getMetricName(name);
+            if (metricName.contains(".")) {
+                compMetricName = MetricUtils.task2MergeCompName(taskMetricName);
+                AsmMetric mergeCompMetric = componentMetrics.register(compMetricName, taskMetric.clone());
+                assocMetrics.add(mergeCompMetric);
+            }
+
+            if (mergeTopology) {
+                String topologyMetricName = MetricUtils.comp2topologyName(compMetricName);
+                AsmMetric topologyMetric = topologyMetrics.register(topologyMetricName, ret.clone());
+                assocMetrics.add(topologyMetric);
+            }
 
-    public static void unregisterTaskMetric(int taskId, String name,
-            String... args) throws IllegalArgumentException {
-        String registerName = MetricRegistry.name(name, args);
-        MetricRegistry metrics = taskMetrics.get(taskId);
-        if (metrics == null) {
-            throw new IllegalArgumentException("Invalid taskId");
+            ret.addAssocMetrics(assocMetrics.toArray(new AsmMetric[assocMetrics.size()]));
         }
-        metrics.remove(registerName);
-    }
 
-    public static Gauge<Double> registerWorkerGauge(
-            com.codahale.metrics.Gauge<Double> rawGauge, String name,
-            String... args) {
-        Gauge<Double> ret = new Gauge<Double>(rawGauge);
-        registerWorkerMetric(ret, name, args);
         return ret;
     }
 
-    public static Gauge<Double> registerTaskGauge(
-            com.codahale.metrics.Gauge<Double> rawGauge, int taskId,
-            String name, String... args) {
-        Gauge<Double> ret = new Gauge<Double>(rawGauge);
-        registerTaskMetric(ret, taskId, name, args);
+    public static AsmMetric registerTaskMetric(String name, AsmMetric metric) {
+        name = fixNameIfPossible(name);
+        AsmMetric ret = taskMetrics.register(name, metric);
+
+        if (metric.isAggregate()) {
+            String compMetricName = MetricUtils.task2compName(name);
+            AsmMetric componentMetric = componentMetrics.register(compMetricName, ret.clone());
+
+            ret.addAssocMetrics(componentMetric);
+        }
+
         return ret;
     }
 
-    public static Counter<Double> registerWorkerCounter(String name,
-            String... args) throws IllegalArgumentException {
-        Counter<Double> ret =
-                (Counter<Double>) Builder.mkInstance(Builder.COUNTER);
-        registerWorkerMetric(ret, name, args);
-        return ret;
+//    public static AsmMetric registerStreamTopologyMetric(String name, AsmMetric metric) {
+//        name = fixNameIfPossible(name);
+//        LOG.info("register stream metric:{}", name);
+//        AsmMetric ret = streamMetrics.register(name, metric);
+//
+//        if (metric.isAggregate()) {
+//            String taskMetricName = MetricUtils.stream2taskName(name);
+//            AsmMetric taskMetric = taskMetrics.register(taskMetricName, ret.clone());
+//
+//            String compMetricName = MetricUtils.task2compName(taskMetricName);
+//            AsmMetric componentMetric = componentMetrics.register(compMetricName, ret.clone());
+//
+//            String topologyMetricName = MetricUtils.comp2topologyName(compMetricName);
+//            AsmMetric topologyMetric = topologyMetrics.register(topologyMetricName, ret.clone());
+//
+//            ret.addAssocMetrics(taskMetric, componentMetric, topologyMetric);
+//        }
+//
+//        return ret;
+//    }
+
+    public static AsmMetric registerWorkerMetric(String name, AsmMetric metric) {
+        name = fixNameIfPossible(name);
+        return workerMetrics.register(name, metric);
     }
 
-    public static Counter<Double> registerTaskCounter(int taskId, String name,
-            String... args) {
-        Counter<Double> ret =
-                (Counter<Double>) Builder.mkInstance(Builder.COUNTER);
-        registerTaskMetric(ret, taskId, name, args);
+    public static AsmMetric registerWorkerTopologyMetric(String name, AsmMetric metric) {
+        name = fixNameIfPossible(name);
+        AsmMetric ret = workerMetrics.register(name, metric);
+
+        String topologyMetricName = MetricUtils.worker2topologyName(name);
+        AsmMetric topologyMetric = topologyMetrics.register(topologyMetricName, ret.clone());
+
+        ret.addAssocMetrics(topologyMetric);
+
         return ret;
     }
 
-    public static Meter registerWorkerMeter(String name, String... args)
-            throws IllegalArgumentException {
-        Meter ret = (Meter) Builder.mkInstance(Builder.METER);
-        registerWorkerMetric(ret, name, args);
-        return ret;
+    public static AsmMetric registerNettyMetric(String name, AsmMetric metric) {
+        name = fixNameIfPossible(name, NETTY_GROUP);
+        return nettyMetrics.register(name, metric);
     }
 
-    public static Meter registerTaskMeter(int taskId, String name,
-            String... args) {
-        Meter ret = (Meter) Builder.mkInstance(Builder.METER);
-        registerTaskMetric(ret, taskId, name, args);
-        return ret;
+    /**
+     * simplified helper method to register a worker histogram
+     *
+     * @param topologyId topology id
+     * @param name       metric name; NOTE: this is not a fully-qualified name.
+     * @param histogram  histogram
+     * @return registered histogram
+     */
+    public static AsmHistogram registerWorkerHistogram(String topologyId, String name, AsmHistogram histogram) {
+        return (AsmHistogram) registerWorkerMetric(
+                MetricUtils.workerMetricName(topologyId, host, 0, name, MetricType.HISTOGRAM), histogram);
     }
 
-    public static Histogram registerWorkerHistogram(String name, String... args)
-            throws IllegalArgumentException {
-        Histogram ret = (Histogram) Builder.mkInstance(Builder.HISTOGRAM);
-        registerWorkerMetric(ret, name, args);
-        return ret;
+    /**
+     * simplified helper method to register a worker gauge
+     */
+    public static AsmGauge registerWorkerGauge(String topologyId, String name, AsmGauge gauge) {
+        return (AsmGauge) registerWorkerMetric(
+                MetricUtils.workerMetricName(topologyId, host, 0, name, MetricType.GAUGE), gauge);
     }
 
-    public static Histogram registerTaskHistogram(int taskId, String name,
-            String... args) {
-        Histogram ret = (Histogram) Builder.mkInstance(Builder.HISTOGRAM);
-        registerTaskMetric(ret, taskId, name, args);
-        return ret;
+    /**
+     * simplified helper method to register a worker meter
+     */
+    public static AsmMeter registerWorkerMeter(String topologyId, String name, AsmMeter meter) {
+        return (AsmMeter) registerWorkerMetric(
+                MetricUtils.workerMetricName(topologyId, host, 0, name, MetricType.METER), meter);
     }
 
-    public static Timer registerWorkerTimer(String name, String... args)
-            throws IllegalArgumentException {
-        Timer ret = (Timer) Builder.mkInstance(Builder.TIMER);
-        registerWorkerMetric(ret, name, args);
-        return ret;
+    /**
+     * simplified helper method to register a worker counter
+     */
+    public static AsmCounter registerWorkerCounter(String topologyId, String name, AsmCounter counter) {
+        return (AsmCounter) registerWorkerMetric(
+                MetricUtils.workerMetricName(topologyId, host, 0, name, MetricType.COUNTER), counter);
     }
 
-    public static Timer registerTaskTimer(int taskId, String name,
-            String... args) {
-        Timer ret = (Timer) Builder.mkInstance(Builder.TIMER);
-        registerTaskMetric(ret, taskId, name, args);
-        return ret;
+    /**
+     * simplified helper method to register a worker timer
+     */
+    public static AsmTimer registerWorkerTimer(String topologyId, String name, AsmTimer timer) {
+        return (AsmTimer) registerWorkerMetric(
+                MetricUtils.workerMetricName(topologyId, host, 0, name, MetricType.TIMER), timer);
     }
 
-    public static class Builder {
-        public static final int COUNTER = 1;
-        public static final int METER = 2;
-        public static final int HISTOGRAM = 3;
-        public static final int TIMER = 4;
-
-        public static Metric mkInstance(int type) {
-            if (type == COUNTER) {
-                return new Counter<Double>(Double.valueOf(0));
-            } else if (type == METER) {
-                return new Meter();
-            } else if (type == HISTOGRAM) {
-                return new Histogram();
-            } else if (type == TIMER) {
-                return new Timer();
-            } else {
-                throw new IllegalArgumentException();
-            }
-        }
+    public static AsmMetric getStreamMetric(String name) {
+        name = fixNameIfPossible(name);
+        return streamMetrics.getMetric(name);
+    }
+
+    public static AsmMetric getTaskMetric(String name) {
+        name = fixNameIfPossible(name);
+        return taskMetrics.getMetric(name);
+    }
+
+    public static AsmMetric getComponentMetric(String name) {
+        name = fixNameIfPossible(name);
+        return componentMetrics.getMetric(name);
+    }
+
+    public static AsmMetric getWorkerMetric(String name) {
+        name = fixNameIfPossible(name);
+        return workerMetrics.getMetric(name);
     }
 
-    public static MetricInfo getExposeWorkerMetrics() {
-        return exposeWorkerMetrics;
+    public static void unregisterWorkerMetric(String name) {
+        name = fixNameIfPossible(name);
+        workerMetrics.remove(name);
     }
 
-    public static void setExposeWorkerMetrics(MetricInfo exposeWorkerMetrics) {
-        JStormMetrics.exposeWorkerMetrics = exposeWorkerMetrics;
+    public static void unregisterNettyMetric(String name) {
+        name = fixNameIfPossible(name, NETTY_GROUP);
+        nettyMetrics.remove(name);
     }
 
-    public static Map<Integer, MetricInfo> getExposeTaskMetrics() {
-        return exposeTaskMetrics;
+    public static void unregisterTaskMetric(String name) {
+        name = fixNameIfPossible(name);
+        taskMetrics.remove(name);
     }
 
-    public static void setExposeTaskMetrics(
-            Map<Integer, MetricInfo> exposeTaskMetrics) {
-        JStormMetrics.exposeTaskMetrics = exposeTaskMetrics;
+    public static AsmMetricRegistry getNettyMetrics() {
+        return nettyMetrics;
     }
 
-    public static Map<String, MetricInfo> getExposeNettyMetrics() {
-        return exposeNettyMetrics;
+    public static AsmMetricRegistry getWorkerMetrics() {
+        return workerMetrics;
     }
 
-    public static void setExposeNettyMetrics(Map<String, MetricInfo> exposeNettyMetrics) {
-        JStormMetrics.exposeNettyMetrics = exposeNettyMetrics;
+    public static AsmMetricRegistry getComponentMetrics() {
+        return componentMetrics;
     }
 
-    
+    public static AsmMetricRegistry getTaskMetrics() {
+        return taskMetrics;
+    }
+
+    public static AsmMetricRegistry getStreamMetrics() {
+        return streamMetrics;
+    }
+
+    /**
+     * Converts snapshots to thrift objects. Note that timestamps are aligned to the minute during the
+     * conversion, so the nimbus server receives snapshots with aligned timestamps (still in ms, as TDDL uses them).
+     */
+    public static MetricInfo computeAllMetrics() {
+        long start = System.currentTimeMillis();
+        MetricInfo metricInfo = MetricUtils.mkMetricInfo();
+
+        List<Map.Entry<String, AsmMetric>> entries = Lists.newArrayList();
+        entries.addAll(streamMetrics.metrics.entrySet());
+        entries.addAll(taskMetrics.metrics.entrySet());
+        entries.addAll(componentMetrics.metrics.entrySet());
+        entries.addAll(workerMetrics.metrics.entrySet());
+        entries.addAll(nettyMetrics.metrics.entrySet());
+        entries.addAll(topologyMetrics.metrics.entrySet());
+
+        for (Map.Entry<String, AsmMetric> entry : entries) {
+            String name = entry.getKey();
+            AsmMetric metric = entry.getValue();
+            Map<Integer, AsmSnapshot> snapshots = metric.getSnapshots();
+
+            int op = metric.getOp();
+            if ((op & AsmMetric.MetricOp.LOG) == AsmMetric.MetricOp.LOG) {
+                MetricUtils.printMetricSnapshot(metric, snapshots);
+            }
+
+            if ((op & AsmMetric.MetricOp.REPORT) == AsmMetric.MetricOp.REPORT) {
+                MetaType metaType = MetricUtils.metaType(metric.getMetricName());
+                try {
+                    if (metric instanceof AsmCounter) {
+                        Map data = MetricUtils.toThriftCounterSnapshots(snapshots);
+                        putIfNotEmpty(metricInfo.get_metrics(), name, data);
+                    } else if (metric instanceof AsmGauge) {
+                        Map data = MetricUtils.toThriftGaugeSnapshots(snapshots);
+                        putIfNotEmpty(metricInfo.get_metrics(), name, data);
+                    } else if (metric instanceof AsmMeter) {
+                        Map data = MetricUtils.toThriftMeterSnapshots(snapshots);
+                        putIfNotEmpty(metricInfo.get_metrics(), name, data);
+                    } else if (metric instanceof AsmHistogram) {
+                        Map data = MetricUtils.toThriftHistoSnapshots(metaType, snapshots);
+                        putIfNotEmpty(metricInfo.get_metrics(), name, data);
+                    } else if (metric instanceof AsmTimer) {
+                        Map data = MetricUtils.toThriftTimerSnapshots(metaType, snapshots);
+                        putIfNotEmpty(metricInfo.get_metrics(), name, data);
+                    }
+                } catch (Exception ex) {
+                    LOG.error("Error", ex);
+                }
+            }
+        }
+
+        if (debug) {
+            MetricUtils.printMetricInfo(metricInfo, debugMetricNames);
+        }
+        LOG.info("compute all metrics, cost:{}", System.currentTimeMillis() - start);
+
+        return metricInfo;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T extends Map> void putIfNotEmpty(Map base, String name, T data) {
+        if (data != null && data.size() > 0) {
+            base.put(name, data);
+        }
+    }
+
+    public static String fixNameIfPossible(String name) {
+        return fixNameIfPossible(name, DEFAULT_GROUP);
+    }
+
+    public static String fixNameIfPossible(String name, String group) {
+        MetaType type = MetricUtils.metaType(name);
+        String[] parts = name.split(MetricUtils.DELIM);
+        if (parts[1].equals("")) {
+            parts[1] = topologyId;
+        }
+        if (type != MetaType.WORKER && parts[5].equals("")) {
+            parts[5] = group;
+        } else if (parts[2].equals("")) {
+            parts[2] = host;
+            parts[3] = port + "";
+            if (parts[4].equals("")) {
+                parts[4] = group;
+            }
+        }
+        return MetricUtils.concat(parts);
+    }
+
+    public static void main(String[] args) throws Exception {
+        JStormMetrics.topologyId = "topologyId";
+        JStormMetrics.host = "127.0.0.1";
+        JStormMetrics.port = 6800;
+
+        String tpId = "test";
+        String compName = "bolt";
+        int taskId = 1;
+        String streamId = "defaultStream";
+        String type = MetaType.STREAM.getV() + MetricType.COUNTER.getV();
+        String metricName = "counter1";
+        String group = "udf";
+
+        String name = MetricUtils.metricName(type, tpId, compName, taskId, streamId, group, metricName);
+        System.out.println(name);
+
+        AsmCounter counter = new AsmCounter();
+        AsmMetric ret1 = JStormMetrics.registerStreamMetric(name, counter, false);
+        AsmMetric ret2 = JStormMetrics.registerStreamMetric(name, counter, false);
+        System.out.println(ret1 == ret2);
+
+        counter.update(1L);
+
+        metricName = MetricUtils.workerMetricName("metric1", MetricType.COUNTER);
+        System.out.println(metricName);
+        metricName = fixNameIfPossible(metricName);
+        System.out.println(metricName);
+        System.out.println(fixNameIfPossible(metricName));
+    }
 
-    
 }
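
As a rough usage sketch of the worker-level helpers added above (assuming the MetricUtils plumbing not shown in this hunk): the topology/host/port identity is set once per worker, metrics are registered through the simplified helpers, and the heartbeat path would later call computeAllMetrics() to build the thrift payload for nimbus. The class name and metric names below are made up for illustration, and the printed key set may be empty until the underlying metric windows have been flushed.

    import backtype.storm.generated.MetricInfo;
    import com.alibaba.jstorm.common.metric.AsmCounter;
    import com.alibaba.jstorm.metric.JStormMetrics;

    public class WorkerMetricUsageSketch {
        public static void main(String[] args) {
            // identity fields that get folded into every worker metric name
            JStormMetrics.setTopologyId("demo-topology");
            JStormMetrics.setHost("127.0.0.1");
            JStormMetrics.setPort(6800);

            // register a worker-level counter via the simplified helper and bump it
            AsmCounter emitted = JStormMetrics.registerWorkerCounter(
                    "demo-topology", "EmittedRecords", new AsmCounter());
            emitted.update(1L);

            // snapshot all registries into a thrift MetricInfo, as the upload path would
            MetricInfo info = JStormMetrics.computeAllMetrics();
            System.out.println(info.get_metrics().keySet());
        }
    }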