Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:52 UTC

[14/51] [partial] storm git commit: Update JStorm to latest release 2.1.0

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable.java
index b8eeb20..7af67c2 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,711 +17,895 @@
  */
 package com.alibaba.jstorm.daemon.nimbus;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.jstorm.cache.JStormCache;
+import backtype.storm.generated.MetricInfo;
+import backtype.storm.generated.MetricSnapshot;
+import backtype.storm.generated.TopologyMetric;
+import backtype.storm.utils.Utils;
+import com.alibaba.jstorm.callback.AsyncLoopThread;
 import com.alibaba.jstorm.callback.RunnableCallback;
 import com.alibaba.jstorm.client.ConfigExtension;
 import com.alibaba.jstorm.cluster.Cluster;
-import com.alibaba.jstorm.cluster.Common;
 import com.alibaba.jstorm.cluster.StormClusterState;
-import com.alibaba.jstorm.cluster.StormConfig;
-import com.alibaba.jstorm.common.metric.Histogram;
-import com.alibaba.jstorm.metric.AlimonitorClient;
-import com.alibaba.jstorm.metric.MetricDef;
-import com.alibaba.jstorm.metric.MetricSendClient;
-import com.alibaba.jstorm.metric.MetricThrift;
-import com.alibaba.jstorm.metric.SimpleJStormMetric;
+import com.alibaba.jstorm.common.metric.AsmGauge;
+import com.alibaba.jstorm.common.metric.MetricMeta;
+import com.alibaba.jstorm.daemon.nimbus.metric.uploader.DefaultMetricUploader;
+import com.alibaba.jstorm.daemon.nimbus.metric.uploader.MetricUploader;
+import com.alibaba.jstorm.metric.*;
 import com.alibaba.jstorm.schedule.Assignment;
 import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
-import com.alibaba.jstorm.utils.TimeCacheMap;
+import com.alibaba.jstorm.utils.JStormUtils;
+import com.alibaba.jstorm.utils.TimeUtils;
 import com.codahale.metrics.Gauge;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import backtype.storm.generated.MetricInfo;
-import backtype.storm.generated.MetricWindow;
-import backtype.storm.generated.TopologyMetric;
-import backtype.storm.generated.WorkerUploadMetrics;
+import java.io.Serializable;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerArray;
 
-public class TopologyMetricsRunnable extends RunnableCallback {
+/**
+ * Topology metrics thread which resides in nimbus.
+ * This class is responsible for generating metrics IDs and uploading metrics to the underlying storage system.
+ *
+ * @author Cody (weiyue.wy@alibaba-inc.com)
+ * @since 2.0.5
+ */
+public class TopologyMetricsRunnable extends Thread {
     private static final Logger LOG = LoggerFactory.getLogger(TopologyMetricsRunnable.class);
-    private static final String DEAD_SUPERVISOR_HEAD = "DeadSupervisor-";
-    
-    public static interface Event {
-        
-    }
-    
-    public static class Update implements Event {
-        public WorkerUploadMetrics workerMetrics;
-    }
-    
-    public static class Remove implements Event {
-        public String topologyId;
-    }
-    
-    public static class Upload implements Event {
-        public long timeStamp;
-    }
-    
-    public static final String CACHE_NAMESPACE_METRIC = "cache_namespace_metric";
-    public static final String CACHE_NAMESPACE_NETTY = "cache_namespace_netty";
-    protected NimbusCache nimbusCache;
-    protected JStormCache dbCache;
-    
+
+    protected JStormMetricCache metricCache;
+
     /**
-     * cache all worker metrics will waste a little memory
-     * 
+     * map<topologyId, map<worker, metricInfo>>, local memory cache, keeps only one snapshot of metrics.
      */
-    protected Map<String, Set<String>> topologyWorkers;
-    protected TimeCacheMap<String, Long> removing;
-    
-    protected BlockingDeque<TopologyMetricsRunnable.Event> queue;
+    protected final ConcurrentMap<String, TopologyMetricContext> topologyMetricContexts =
+            new ConcurrentHashMap<>();
+
+    protected final BlockingDeque<TopologyMetricsRunnable.Event> queue = new LinkedBlockingDeque<>();
+
+    private static final String PENDING_UPLOAD_METRIC_DATA = "__pending.upload.metrics__";
+    private static final String PENDING_UPLOAD_METRIC_DATA_INFO = "__pending.upload.metrics.info__";
+
+    // the slot is empty
+    private static final int UNSET = 0;
+    // the slot is ready for uploading
+    private static final int SET = 1;
+    // the slot is being uploaded
+    private static final int UPLOADING = 2;
+    // the slot has been claimed and will shortly be marked ready for uploading
+    private static final int PRE_SET = 3;
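+    // slot lifecycle: UNSET -> PRE_SET (claimed) -> SET (ready) -> UPLOADING -> UNSET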
+
+    protected final AtomicIntegerArray metricStat;
+
     protected StormClusterState stormClusterState;
-    
-    protected MetricSendClient metricSendClient;
-    protected TopologyMetric emptyTopologyMetric = mkTopologyMetric();
-    protected TreeMap<String, MetricInfo>   emptyNettyMetric = new TreeMap<String, MetricInfo>();
+
+    protected MetricUploader metricUploader;
+
     protected AtomicBoolean isShutdown;
-    protected boolean localMode;
-    protected TopologyNettyMgr topologyNettyMgr;
-    
-    protected Histogram updateHistogram;
-    protected AtomicBoolean isUploading = new AtomicBoolean(false);
-    protected Histogram uploadHistogram;
-    
-    public TopologyMetricsRunnable(NimbusData nimbusData) {
-        
-        this.nimbusCache = nimbusData.getNimbusCache();
-        this.dbCache = nimbusCache.getDbCache();
-        this.topologyWorkers = new ConcurrentHashMap<String, Set<String>>();
-        this.removing = new TimeCacheMap<String, Long>(600);
-        this.queue = new LinkedBlockingDeque<TopologyMetricsRunnable.Event>();
+    protected String clusterName;
+    protected int maxPendingUploadMetrics;
+
+    private final boolean localMode;
+    private final NimbusData nimbusData;
+    private MetricQueryClient metricQueryClient;
+
+    private ScheduledExecutorService clusterMetricsUpdateExecutor;
+
+    /**
+     * refreshes alive topologies every minute and on startup.
+     */
+    protected AsyncLoopThread refreshTopologiesThread;
+
+    /**
+     * the thread for metric sending; polls pending upload slots in a loop.
+     */
+    private final Thread uploadThread = new MetricsUploadThread();
+
+    /**
+     * async flush metric meta
+     */
+    private final Thread flushMetricMetaThread = new FlushMetricMetaThread();
+
+    /**
+     * use default UUID generator
+     */
+    private final MetricIDGenerator metricIDGenerator = new DefaultMetricIDGenerator();
+
+    public TopologyMetricsRunnable(final NimbusData nimbusData) {
+        setName(getClass().getSimpleName());
+
+        this.nimbusData = nimbusData;
+
+        this.localMode = nimbusData.isLocalMode();
+        if (localMode) {
+            this.metricStat = new AtomicIntegerArray(1);
+            return;
+        }
+
+        LOG.info("create topology metrics runnable.");
+        this.metricCache = nimbusData.getMetricCache();
         this.stormClusterState = nimbusData.getStormClusterState();
         this.isShutdown = nimbusData.getIsShutdown();
-        this.topologyNettyMgr = nimbusData.getTopologyNettyMgr();
-        
-        if (ConfigExtension.isAlimonitorMetricsPost(nimbusData.getConf())) {
-            metricSendClient = new AlimonitorClient(AlimonitorClient.DEFAUT_ADDR, AlimonitorClient.DEFAULT_PORT, true);
-        } else {
-            metricSendClient = new MetricSendClient();
-        }
-        localMode = StormConfig.local_mode(nimbusData.getConf());
-        
-        updateHistogram = SimpleJStormMetric.registerHistorgram("TopologyMetricsRunnable_Update");
-        uploadHistogram = SimpleJStormMetric.registerHistorgram("TopologyMetricsRunnable_Upload");
-        
-        SimpleJStormMetric.registerWorkerGauge(new Gauge<Double>() {
-            
+
+        clusterName = ConfigExtension.getClusterName(nimbusData.getConf());
+        if (clusterName == null) {
+            throw new RuntimeException("cluster.name property must be set in storm.yaml!");
+        }
+
+        this.maxPendingUploadMetrics = ConfigExtension.getMaxPendingMetricNum(nimbusData.getConf());
+        this.metricStat = new AtomicIntegerArray(this.maxPendingUploadMetrics);
+
+        int cnt = 0;
+        for (int i = 0; i < maxPendingUploadMetrics; i++) {
+            TopologyMetricDataInfo obj = getMetricDataInfoFromCache(i);
+            if (obj != null) {
+                this.metricStat.set(i, SET);
+                cnt++;
+            }
+        }
+        LOG.info("pending upload metrics: {}", cnt);
+
+        // init alive topologies from zk
+        this.refreshTopologies();
+        this.refreshTopologiesThread = new AsyncLoopThread(new RefreshTopologiesThread());
+
+        this.clusterMetricsUpdateExecutor = Executors.newSingleThreadScheduledExecutor();
+        this.clusterMetricsUpdateExecutor.scheduleAtFixedRate(new Runnable() {
             @Override
-            public Double getValue() {
-                // TODO Auto-generated method stub
-                return (double) queue.size();
+            public void run() {
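+                // sleep until roughly second 55 of the current minute, so the
+                // merge below fires just before the minute boundary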
+                int secOffset = TimeUtils.secOffset();
+                int offset = 55;
+                if (secOffset < offset) {
+                    JStormUtils.sleepMs((offset - secOffset) * 1000);
+                } else if (secOffset == offset) {
+                    // do nothing
+                } else {
+                    JStormUtils.sleepMs((60 - secOffset + offset) * 1000);
+                }
+
+                LOG.info("cluster metrics force upload.");
+                mergeAndUploadClusterMetrics();
             }
-        }, "TopologyMetricsRunnable_Queue");
+        }, 5, 60, TimeUnit.SECONDS);
+
+        // track nimbus JVM heap
+        JStormMetrics.registerWorkerGauge(JStormMetrics.NIMBUS_METRIC_KEY, MetricDef.MEMORY_USED,
+                new AsmGauge(new Gauge<Double>() {
+                    @Override
+                    public Double getValue() {
+                        MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+                        MemoryUsage memoryUsage = memoryMXBean.getHeapMemoryUsage();
+                        return (double) memoryUsage.getUsed();
+                    }
+                }));
     }
-    
-    public void pushEvent(TopologyMetricsRunnable.Event cmd) {
-        queue.offer(cmd);
+
+    /**
+     * init metric uploader
+     */
+    public void init() {
+        String metricUploadClass = ConfigExtension.getMetricUploaderClass(nimbusData.getConf());
+        if (StringUtils.isBlank(metricUploadClass)) {
+            metricUploadClass = DefaultMetricUploader.class.getName();
+        }
+        // init metric uploader
+        LOG.info("metric uploader class:{}", metricUploadClass);
+        Object instance = Utils.newInstance(metricUploadClass);
+        if (!(instance instanceof MetricUploader)) {
+            throw new RuntimeException(metricUploadClass + " is not a MetricUploader class");
+        }
+        this.metricUploader = (MetricUploader) instance;
+        try {
+            metricUploader.init(nimbusData);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        LOG.info("Successfully init {}", metricUploadClass);
+
+        // init metric query client
+        String metricQueryClientClass = ConfigExtension.getMetricQueryClientClass(nimbusData.getConf());
+        if (!StringUtils.isBlank(metricQueryClientClass)) {
+            LOG.info("metric query client class:{}", metricQueryClientClass);
+            this.metricQueryClient = (MetricQueryClient) Utils.newInstance(metricQueryClientClass);
+        } else {
+            LOG.warn("use default metric query client class.");
+            this.metricQueryClient = new DefaultMetricQueryClient();
+        }
+        try {
+            metricQueryClient.init(nimbusData.getConf());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        this.uploadThread.start();
+        this.flushMetricMetaThread.start();
+
+        LOG.info("init topology metric runnable done.");
+    }
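
For illustration, a minimal custom uploader could be plugged in via the class name that
ConfigExtension.getMetricUploaderClass reads. The method set below (init, sendEvent, upload,
cleanup) and the signatures are inferred from the call sites visible in this diff, not from
the full MetricUploader interface, and the class name is made up:

    import com.alibaba.jstorm.daemon.nimbus.NimbusData;
    import com.alibaba.jstorm.daemon.nimbus.metric.uploader.MetricUploader;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.Map;

    public class LoggingMetricUploader implements MetricUploader {
        private static final Logger LOG = LoggerFactory.getLogger(LoggingMetricUploader.class);

        public void init(NimbusData nimbusData) throws Exception {
            // open connections to the external metric store here; nimbus waits
            // for this to finish before draining the event queue
        }

        public void sendEvent(String clusterName, Object event) {
            // event type is assumed; the diff passes topology lifecycle events here
            LOG.info("event from cluster {}: {}", clusterName, event);
        }

        public boolean upload(String clusterName, String topologyId, int idx, Map<String, Object> metricContext) {
            // persist the pending TopologyMetric stored at slot idx
            LOG.info("uploading metrics of {} from slot {}", topologyId, idx);
            return true;
        }

        public void cleanup() {
            // close connections; called from shutdown()
        }
    }
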
+
+    public void shutdown() {
+        LOG.info("Begin to shutdown");
+        metricUploader.cleanup();
+
+        LOG.info("Successfully shutdown");
     }
-    
-    public TopologyMetric mkTopologyMetric() {
-        TopologyMetric emptyTopologyMetric = new TopologyMetric();
-        
-        MetricInfo topologyMetricInfo = MetricThrift.mkMetricInfo();
-        emptyTopologyMetric.set_topologyMetric(topologyMetricInfo);
-        
-        emptyTopologyMetric.set_componentMetric(new HashMap<String, MetricInfo>());
-        emptyTopologyMetric.set_workerMetric(new HashMap<String, MetricInfo>());
-        emptyTopologyMetric.set_taskMetric(new HashMap<Integer, MetricInfo>());
-        return emptyTopologyMetric;
-    }
-    
+
     @Override
     public void run() {
-        try {
-            TopologyMetricsRunnable.Event event = queue.take();
-            
-            if (event instanceof Remove) {
-                
-                handleRemoveEvent((Remove) event);
-                return;
-            } else if (event instanceof Update) {
-                handleUpdateEvent((Update) event);
-                return;
-            } else if (event instanceof Upload) {
-                handleUploadEvent((Upload) event);
-                return;
-            } else {
-                LOG.error("Unknow event type");
+        while (!isShutdown.get()) {
+            if (localMode) {
                 return;
             }
-            
-        } catch (Exception e) {
-            if (isShutdown.get() == false) {
-                LOG.error(e.getMessage(), e);
+
+            try {
+                // wait for metricUploader to be ready; for external plugins backed by e.g. a database, initialization may take a few seconds
+                if (this.metricUploader != null) {
+                    Event event = queue.poll();
+                    if (event == null) {
+                        continue;
+                    }
+
+                    if (event instanceof Remove) {
+                        handleRemoveEvent((Remove) event);
+                    } else if (event instanceof Update) {
+                        handleUpdateEvent((Update) event);
+                    } else if (event instanceof Refresh) {
+                        handleRefreshEvent((Refresh) event);
+                    } else if (event instanceof KillTopologyEvent) {
+                        handleKillTopologyEvent((KillTopologyEvent) event);
+                    } else if (event instanceof StartTopologyEvent) {
+                        handleStartTopologyEvent((StartTopologyEvent) event);
+                    } else if (event instanceof TaskDeadEvent) {
+                        handleTaskDeadEvent((TaskDeadEvent) event);
+                    } else if (event instanceof TaskStartEvent) {
+                        handleTaskStartEvent((TaskStartEvent) event);
+                    } else {
+                        LOG.error("Unknown event type:{}", event.getClass());
+                    }
+                }
+            } catch (Exception e) {
+                if (!isShutdown.get()) {
+                    LOG.error(e.getMessage(), e);
+                }
             }
         }
     }
-    
-    public void handleRemoveEvent(Remove event) {
-        String topologyId = event.topologyId;
-        TopologyMetric topologyMetric = (TopologyMetric) dbCache.get(getTopologyKey(topologyId));
-        if (topologyMetric == null) {
-            LOG.warn("No TopologyMetric of  " + topologyId);
-            return;
+
+
+    public boolean isTopologyAlive(String topologyId) {
+        return topologyMetricContexts.containsKey(topologyId);
+    }
+
+    private int getAndPresetFirstEmptyIndex() {
+        for (int i = 0; i < maxPendingUploadMetrics; i++) {
+            if (metricStat.get(i) == UNSET) {
+                if (metricStat.compareAndSet(i, UNSET, PRE_SET)) {
+                    return i;
+                }
+            }
         }
-        
-        removing.put(topologyId, System.currentTimeMillis());
-        dbCache.remove(getTopologyKey(topologyId));
-        dbCache.remove(getNettyTopologyKey(topologyId));
-        topologyNettyMgr.rmTopology(topologyId);
-        LOG.info("Successfully remove TopologyMetric of " + topologyId);
-        return;
-        
-    }
-    
-    public void cleanDeadSupervisorWorker(TopologyMetric metric) {
-        List<String> removeList = new ArrayList<String>();
-        
-        Map<String, MetricInfo> workerMetric = metric.get_workerMetric();
-        if (workerMetric == null) {
-            return;
+        return -1;
+    }
+
+    private int getFirstPendingUploadIndex() {
+        for (int i = 0; i < maxPendingUploadMetrics; i++) {
+            if (metricStat.get(i) == SET) {
+                return i;
+            }
         }
-        for (String hostPort : workerMetric.keySet()) {
-            if (hostPort.startsWith(DEAD_SUPERVISOR_HEAD)) {
-                removeList.add(hostPort);
+        return -1;
+    }
+
+    public void markUploaded(int idx) {
+        this.metricCache.remove(PENDING_UPLOAD_METRIC_DATA + idx);
+        this.metricCache.remove(PENDING_UPLOAD_METRIC_DATA_INFO + idx);
+        this.metricStat.set(idx, UNSET);
+    }
+
+    public void markUploading(int idx) {
+        this.metricStat.set(idx, UPLOADING);
+    }
+
+    public void markSet(int idx) {
+        this.metricStat.set(idx, SET);
+    }
+
+    public TopologyMetric getMetricDataFromCache(int idx) {
+        return (TopologyMetric) metricCache.get(PENDING_UPLOAD_METRIC_DATA + idx);
+    }
+
+    public TopologyMetricDataInfo getMetricDataInfoFromCache(int idx) {
+        return (TopologyMetricDataInfo) metricCache.get(PENDING_UPLOAD_METRIC_DATA_INFO + idx);
+    }
+
+    public void pushEvent(Event cmd) {
+        queue.offer(cmd);
+    }
+
+    public Map<String, Long> registerMetrics(String topologyId, Set<String> metricNames) {
+        TimeTicker ticker = new TimeTicker(TimeUnit.MILLISECONDS, true);
+
+        ConcurrentMap<String, Long> memMeta = topologyMetricContexts.get(topologyId).getMemMeta();
+        Map<String, Long> ret = new HashMap<>();
+        for (String metricName : metricNames) {
+            Long id = memMeta.get(metricName);
+            if (id != null && MetricUtils.isValidId(id)) {
+                ret.put(metricName, id);
+            } else {
+                id = metricIDGenerator.genMetricId(metricName);
+                Long old = memMeta.putIfAbsent(metricName, id);
+                if (old == null) {
+                    ret.put(metricName, id);
+                } else {
+                    ret.put(metricName, old);
+                }
             }
         }
-        
-        for (String removed : removeList) {
-            workerMetric.remove(removed);
+
+        long cost = ticker.stop();
+        LOG.info("register metrics, topology:{}, size:{}, cost:{}", topologyId, metricNames.size(), cost);
+
+        return ret;
+    }
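
A hypothetical call (topology id and metric names are made up) gets back one stable id per
name; concurrent registrations of the same name are resolved by the putIfAbsent above:

    Map<String, Long> ids = metricsRunnable.registerMetrics(
            "demo-topology-1",                       // hypothetical topology id
            Sets.newHashSet("metricA", "metricB"));  // hypothetical metric names
    long idA = ids.get("metricA");  // identical on every later call for this name
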
+
+    public void handleRemoveEvent(Remove event) {
+        String topologyId = event.topologyId;
+        if (topologyId != null) {
+            removeTopology(topologyId);
         }
+        LOG.info("remove topology:{}.", topologyId);
+
     }
-    
-    public void cleanTopology() {
-        Map<String, Long> removingMap = removing.buildMap();
-        
-        Map<String, Assignment> assignMap = null;
+
+    private void removeTopology(String topologyId) {
+        metricCache.removeTopology(topologyId);
+        metricCache.removeSampleRate(topologyId);
+
+        topologyMetricContexts.remove(topologyId);
+    }
+
+
+    public void refreshTopologies() {
+        if (!topologyMetricContexts.containsKey(JStormMetrics.NIMBUS_METRIC_KEY)) {
+            LOG.info("adding __nimbus__ to metric context.");
+            Set<ResourceWorkerSlot> workerSlot = Sets.newHashSet(new ResourceWorkerSlot());
+            TopologyMetricContext metricContext = new TopologyMetricContext(workerSlot);
+            topologyMetricContexts.putIfAbsent(JStormMetrics.NIMBUS_METRIC_KEY, metricContext);
+            syncMetaFromCache(JStormMetrics.NIMBUS_METRIC_KEY, topologyMetricContexts.get(JStormMetrics.NIMBUS_METRIC_KEY));
+        }
+        if (!topologyMetricContexts.containsKey(JStormMetrics.CLUSTER_METRIC_KEY)) {
+            LOG.info("adding __cluster__ to metric context.");
+            Set<ResourceWorkerSlot> workerSlot = Sets.newHashSet(new ResourceWorkerSlot());
+            Map conf = new HashMap();
+            // there's no need to consider sample rate when merging cluster metrics
+            conf.put(ConfigExtension.TOPOLOGY_METRIC_SAMPLE_RATE, 1.0);
+            TopologyMetricContext metricContext = new TopologyMetricContext(
+                    JStormMetrics.CLUSTER_METRIC_KEY, workerSlot, conf);
+            topologyMetricContexts.putIfAbsent(JStormMetrics.CLUSTER_METRIC_KEY, metricContext);
+            syncMetaFromCache(JStormMetrics.CLUSTER_METRIC_KEY, topologyMetricContexts.get(JStormMetrics.CLUSTER_METRIC_KEY));
+        }
+
+        Map<String, Assignment> assignMap;
         try {
             assignMap = Cluster.get_all_assignment(stormClusterState, null);
+            for (String topologyId : assignMap.keySet()) {
+                if (!topologyMetricContexts.containsKey(topologyId)) {
+                    Assignment assignment = assignMap.get(topologyId);
+                    TopologyMetricContext metricContext =
+                            new TopologyMetricContext(assignment.getWorkers());
+                    metricContext.setTaskNum(NimbusUtils.getTopologyTaskNum(assignment));
+                    syncMetaFromCache(topologyId, metricContext);
+
+                    LOG.info("adding {} to metric context.", topologyId);
+                    topologyMetricContexts.put(topologyId, metricContext);
+                }
+            }
         } catch (Exception e1) {
-            // TODO Auto-generated catch block
-            LOG.info("Failed to get Assignments");
+            LOG.warn("failed to get assignments");
+            return;
         }
-        
-        for (String topologyId : topologyWorkers.keySet()) {
-            if (assignMap.containsKey(topologyId) == false) {
-                removingMap.put(topologyId, System.currentTimeMillis());
+
+        List<String> removing = new ArrayList<>();
+        for (String topologyId : topologyMetricContexts.keySet()) {
+            if (!JStormMetrics.NIMBUS_METRIC_KEY.equals(topologyId)
+                    && !JStormMetrics.CLUSTER_METRIC_KEY.equals(topologyId)
+                    && !assignMap.containsKey(topologyId)) {
+                removing.add(topologyId);
             }
         }
-        
-        for (String topologyId : removingMap.keySet()) {
-            dbCache.remove(getTopologyKey(topologyId));
-            
-            Set<String> workers = topologyWorkers.get(topologyId);
-            if (workers != null) {
-                for (String workerSlot : workers) {
-                    dbCache.remove(getWorkerKey(topologyId, workerSlot));
+
+        for (String topologyId : removing) {
+            LOG.info("removing topology:{}", topologyId);
+            removeTopology(topologyId);
+        }
+    }
+
+    /**
+     * sync topology metric meta from external storage like TDDL/OTS.
+     * the nimbus leader skips syncing; only followers do this
+     */
+    public void syncTopologyMeta() {
+        String nimbus = JStormMetrics.NIMBUS_METRIC_KEY;
+        if (topologyMetricContexts.containsKey(nimbus)) {
+            syncMetaFromRemote(nimbus, topologyMetricContexts.get(nimbus));
+        }
+        String cluster = JStormMetrics.CLUSTER_METRIC_KEY;
+        if (topologyMetricContexts.containsKey(cluster)) {
+            syncMetaFromRemote(cluster, topologyMetricContexts.get(cluster));
+        }
+
+        Map<String, Assignment> assignMap;
+        try {
+            assignMap = Cluster.get_all_assignment(stormClusterState, null);
+            for (String topologyId : assignMap.keySet()) {
+                if (topologyMetricContexts.containsKey(topologyId)) {
+                    Assignment assignment = assignMap.get(topologyId);
+                    TopologyMetricContext metricContext =
+                            new TopologyMetricContext(assignment.getWorkers());
+                    metricContext.setTaskNum(NimbusUtils.getTopologyTaskNum(assignment));
+
+                    syncMetaFromCache(topologyId, metricContext);
+                    syncMetaFromRemote(topologyId, metricContext);
                 }
-                topologyWorkers.remove(topologyId);
-            }
-            
-        }
-        
-        for (Entry<String, Set<String>> entry : topologyWorkers.entrySet()) {
-            String topologyId = entry.getKey();
-            Set<String> metricWorkers = entry.getValue();
-            
-            Set<String> workerSlots = new HashSet<String>();
-            
-            Assignment assignment = assignMap.get(topologyId);
-            if (assignment == null) {
-                LOG.error("Assignment disappear of " + topologyId);
-                continue;
             }
-            
-            for (ResourceWorkerSlot worker : assignment.getWorkers()) {
-                String slot = getWorkerSlotName(worker.getNodeId(), worker.getPort());
-                workerSlots.add(slot);
+        } catch (Exception e1) {
+            LOG.warn("failed to get assignments");
+        }
+    }
+
+    /**
+     * sync metric meta from rocks db into mem cache on startup
+     */
+    private void syncMetaFromCache(String topologyId, TopologyMetricContext context) {
+        if (!context.syncMeta()) {
+            Map<String, Long> meta = metricCache.getMeta(topologyId);
+            if (meta != null) {
+                context.getMemMeta().putAll(meta);
             }
-            
-            Set<String> removes = new HashSet<String>();
-            for (String slot : metricWorkers) {
-                if (workerSlots.contains(slot) == false) {
-                    LOG.info("Remove worker metrics of {}:{}", topologyId, slot);
-                    removes.add(slot);
+            context.setSyncMeta(true);
+        }
+    }
+
+    private void syncMetaFromRemote(String topologyId, TopologyMetricContext context) {
+        try {
+            int memSize = context.getMemMeta().size();
+            int zkSize = (Integer) stormClusterState.get_topology_metric(topologyId);
+
+            if (memSize != zkSize) {
+                ConcurrentMap<String, Long> memMeta = context.getMemMeta();
+                for (MetaType metaType : MetaType.values()) {
+                    List<MetricMeta> metaList = metricQueryClient.getMetricMeta(clusterName, topologyId, metaType);
+                    if (metaList != null) {
+                        LOG.info("get remote metric meta, topology:{}, metaType:{}, mem:{}, zk:{}, new size:{}",
+                                topologyId, metaType, memSize, zkSize, metaList.size());
+                        for (MetricMeta meta : metaList) {
+                            memMeta.putIfAbsent(meta.getFQN(), meta.getId());
+                        }
+                    }
                 }
+                metricCache.putMeta(topologyId, memMeta);
             }
-            
-            for (String slot : removes) {
-                metricWorkers.remove(slot);
-                dbCache.remove(getWorkerKey(topologyId, slot));
-            }
+        } catch (Exception ex) {
+            LOG.error("failed to sync remote meta", ex);
         }
     }
-    
+
     /**
-     * Upload metric to ZK
-     * 
-     * @param event
+     * sends topology lifecycle events to the metric uploader (jstorm monitor)
      */
-    public void handleUploadEvent(Upload event) {
-        if (isUploading.getAndSet(true) == true) {
-            LOG.info("Nimbus is alread uploading");
-            return ;
-        }
-        
-        long start = System.currentTimeMillis();
-        
-        cleanTopology();
-        
-        render();
-        
-        isUploading.set(false);
-        
-        long end = System.currentTimeMillis();
-        uploadHistogram.update(end - start);
-        
-        
-    }
-    
-    public String getWorkerHostname(WorkerUploadMetrics workerMetrics) {
-        
-        String hostname = null;
-        String supervisorId = workerMetrics.get_supervisor_id();
-        try {
-            hostname = Cluster.get_supervisor_hostname(stormClusterState, supervisorId);
-        } catch (Exception e) {
-            // TODO Auto-generated catch block
-            LOG.warn("Failed to get hostname of " + supervisorId);
+    protected void handleKillTopologyEvent(KillTopologyEvent event) {
+        metricUploader.sendEvent(this.clusterName, event);
+        removeTopology(event.topologyId);
+    }
+
+    private void handleStartTopologyEvent(StartTopologyEvent event) {
+        this.metricCache.putSampleRate(event.topologyId, event.sampleRate);
+        metricUploader.sendEvent(this.clusterName, event);
+        if (!topologyMetricContexts.containsKey(event.topologyId)) {
+            TopologyMetricContext metricContext = new TopologyMetricContext();
+            // note that workerNum is not set here.
+            this.topologyMetricContexts.put(event.topologyId, metricContext);
         }
-        if (hostname == null) {
-            hostname = DEAD_SUPERVISOR_HEAD + supervisorId;
+    }
+
+    private void handleTaskDeadEvent(TaskDeadEvent event) {
+        metricUploader.sendEvent(this.clusterName, event);
+
+        // unregister dead workers
+        Set<ResourceWorkerSlot> workers = new HashSet<>();
+        workers.addAll(event.deadTasks.values());
+        for (ResourceWorkerSlot worker : workers) {
+            metricCache.unregisterWorker(event.topologyId, worker.getHostname(), worker.getPort());
         }
-        
-        return hostname;
     }
-    
-    public void avgMetricWindow(MetricWindow metric, int parallel) {
-        if (parallel == 0) {
-            return;
+
+    private void handleTaskStartEvent(final TaskStartEvent event) {
+        Assignment assignment = event.newAssignment;
+        TopologyMetricContext metricContext = topologyMetricContexts.get(event.topologyId);
+        if (metricContext != null) {
+            metricContext.setWorkerSet(assignment.getWorkers());
+        } else {
+            metricContext = new TopologyMetricContext();
+            metricContext.setWorkerSet(assignment.getWorkers());
+            topologyMetricContexts.put(event.topologyId, metricContext);
         }
-        Map<Integer, Double> map = metric.get_metricWindow();
-        Map<Integer, Double> newMap = new HashMap<Integer, Double>();
-        if (map != null) {
-            for (Entry<Integer, Double> entry : map.entrySet()) {
-                newMap.put(entry.getKey(), entry.getValue() / parallel);
+        metricUploader.sendEvent(this.clusterName, event);
+    }
+
+    /**
+     * refreshes alive topologies and, on followers, syncs topology metric meta.
+     */
+    public void handleRefreshEvent(Refresh dummy) {
+        TimeTicker ticker = new TimeTicker(TimeUnit.MILLISECONDS, true);
+        try {
+            refreshTopologies();
+            LOG.info("refresh topologies, cost:{}", ticker.stopAndRestart());
+            if (!nimbusData.isLeader()) {
+                syncTopologyMeta();
+                LOG.info("sync topology meta, cost:{}", ticker.stop());
             }
+        } catch (Exception ex) {
+            LOG.error("handleRefreshEvent error:", ex);
         }
-        
-        metric.set_metricWindow(newMap);
-    }
-    
-    public MetricInfo mergeMetricInfo(MetricInfo from, MetricInfo to, Set<String> tags) {
-        if (to == null) {
-            to = MetricThrift.mkMetricInfo();
-        }
-        
-        if (from.get_baseMetric() == null) {
-            LOG.warn("No base Metric ");
-            return to;
-        }
-        
-        for (String tag : tags) {
-            
-            MetricWindow fromMetric = from.get_baseMetric().get(tag);
-            Map<String, MetricWindow> toMetricMap = to.get_baseMetric();
-            if (toMetricMap == null) {
-                toMetricMap = new HashMap<String, MetricWindow>();
-                to.set_baseMetric(toMetricMap);
-            }
-            
-            MetricWindow toMetric = toMetricMap.get(tag);
-            
-            toMetric = MetricThrift.mergeMetricWindow(fromMetric, toMetric);
-            
-            toMetricMap.put(tag, toMetric);
-            
-        }
-        
-        return to;
-    }
-    
-    public Map<String, Map<String, MetricWindow>> mergeTaskStreams(
-            Map<String, Map<String, MetricWindow>> componentStreams,
-            Map<String, Map<String, MetricWindow>> taskStreams,
-            Map<String, Map<String, AtomicInteger>> componentStreamParallel) {
-        
-        if (taskStreams == null || taskStreams.size() == 0) {
-            return componentStreams;
-        }
-        
-        if (componentStreams == null) {
-            componentStreams = new HashMap<String, Map<String, MetricWindow>>();
-        }
-        
-        for (Entry<String, Map<String, MetricWindow>> entry : taskStreams.entrySet()) {
+    }
+
+    private TopologyMetricContext getClusterTopologyMetricContext() {
+        return topologyMetricContexts.get(JStormMetrics.CLUSTER_METRIC_KEY);
+    }
+
+    private void mergeAndUploadClusterMetrics() {
+        TopologyMetricContext context = getClusterTopologyMetricContext();
+        TopologyMetric tpMetric = context.mergeMetrics();
+        if (tpMetric == null) {
+            tpMetric = MetricUtils.mkTopologyMetric();
+            tpMetric.set_topologyMetric(MetricUtils.mkMetricInfo());
+        }
+
+        // reset the metric ids of snapshots; strip points from histograms/timers
+        MetricInfo clusterMetrics = tpMetric.get_topologyMetric();
+        Map<String, Long> metricNames = context.getMemMeta();
+        for (Map.Entry<String, Map<Integer, MetricSnapshot>> entry : clusterMetrics.get_metrics().entrySet()) {
             String metricName = entry.getKey();
-            Map<String, MetricWindow> streamMetricWindows = entry.getValue();
-            
-            if (streamMetricWindows == null) {
-                continue;
-            }
-            
-            Map<String, AtomicInteger> streamCounters = componentStreamParallel.get(metricName);
-            if (streamCounters == null) {
-                streamCounters = new HashMap<String, AtomicInteger>();
-                componentStreamParallel.put(metricName, streamCounters);
+            MetricType metricType = MetricUtils.metricType(metricName);
+            Long metricId = metricNames.get(metricName);
+            for (Map.Entry<Integer, MetricSnapshot> metric : entry.getValue().entrySet()) {
+                MetricSnapshot snapshot = metric.getValue();
+                snapshot.set_metricId(metricId);
+                if (metricType == MetricType.HISTOGRAM || metricType == MetricType.TIMER) {
+                    snapshot.set_points(new ArrayList<Long>(0));
+                }
+//                entry.getValue().put(metric.getKey(), snapshot);
             }
-            
-            Map<String, MetricWindow> componentStreamMetricWindows = componentStreams.get(metricName);
-            if (componentStreamMetricWindows == null) {
-                componentStreamMetricWindows = new HashMap<String, MetricWindow>();
-                componentStreams.put(metricName, componentStreamMetricWindows);
+        }
+
+        // fill metrics that were not reported with zero-value snapshots
+        long ts = System.currentTimeMillis();
+        for (Map.Entry<String, Long> entry : metricNames.entrySet()) {
+            String name = entry.getKey();
+            if (!clusterMetrics.get_metrics().containsKey(name)) {
+                Map<Integer, MetricSnapshot> metric = new HashMap<>();
+                MetricType type = MetricUtils.metricType(name);
+                metric.put(AsmWindow.M1_WINDOW, new MetricSnapshot(entry.getValue(), ts, type.getT()));
+                clusterMetrics.put_to_metrics(name, metric);
             }
-            
-            for (Entry<String, MetricWindow> streamEntry : streamMetricWindows.entrySet()) {
-                String streamName = streamEntry.getKey();
-                MetricWindow taskMetricWindow = streamEntry.getValue();
-                
-                MetricWindow componentMetricWindow = componentStreamMetricWindows.get(streamName);
-                
-                componentMetricWindow = MetricThrift.mergeMetricWindow(taskMetricWindow, componentMetricWindow);
-                
-                componentStreamMetricWindows.put(streamName, componentMetricWindow);
-                
-                AtomicInteger counter = streamCounters.get(streamName);
-                if (counter == null) {
-                    counter = new AtomicInteger(0);
-                    streamCounters.put(streamName, counter);
+        }
+
+        // push an update event so the merged cluster metrics get cached and later uploaded
+        Update event = new Update();
+        event.timestamp = System.currentTimeMillis();
+        event.topologyMetrics = tpMetric;
+        event.topologyId = JStormMetrics.CLUSTER_METRIC_KEY;
+        pushEvent(event);
+
+        LOG.info("send update event for cluster metrics, size : {}", clusterMetrics.get_metrics_size());
+    }
+
+    //update cluster metrics local cache
+    private void updateClusterMetrics(String topologyId, TopologyMetric tpMetric) {
+        if (tpMetric.get_topologyMetric().get_metrics_size() > 0) {
+            TopologyMetricContext context = getClusterTopologyMetricContext();
+            MetricInfo topologyMetrics = tpMetric.get_topologyMetric();
+            // make a new MetricInfo to save the topologyId's metric
+            MetricInfo clusterMetrics = MetricUtils.mkMetricInfo();
+            Set<String> metricNames = new HashSet<>();
+            for (Map.Entry<String, Map<Integer, MetricSnapshot>> entry : topologyMetrics.get_metrics().entrySet()) {
+                String metricName = MetricUtils.topo2clusterName(entry.getKey());
+                MetricType metricType = MetricUtils.metricType(metricName);
+                Map<Integer, MetricSnapshot> winData = new HashMap<>();
+                for (Map.Entry<Integer, MetricSnapshot> entryData : entry.getValue().entrySet()) {
+                    MetricSnapshot snapshot = entryData.getValue().deepCopy();
+                    winData.put(entryData.getKey(), snapshot);
+                    if (metricType == MetricType.HISTOGRAM || metricType == MetricType.TIMER) {
+                        // reset topology metric points
+                        entryData.getValue().set_points(new ArrayList<Long>(0));
+                    }
                 }
-                counter.incrementAndGet();
+                clusterMetrics.put_to_metrics(metricName, winData);
+                metricNames.add(metricName);
             }
+            // save to local cache, waiting for merging
+            context.addToMemCache(topologyId, clusterMetrics);
+            registerMetrics(JStormMetrics.CLUSTER_METRIC_KEY, metricNames);
         }
-        
-        return componentStreams;
     }
-    
-    public void avgStreams(Map<String, Map<String, MetricWindow>> tagStreamsMetrics, Map<String, Map<String, AtomicInteger>> counters, String tag) {
-        if (tagStreamsMetrics == null) {
-            return;
-        }
-        
-        Map<String, MetricWindow> streamMetrics = tagStreamsMetrics.get(tag);
-        if (streamMetrics == null) {
-            return;
+
+    /**
+     * put metric data to metric cache.
+     */
+    public void handleUpdateEvent(Update event) {
+        TopologyMetric topologyMetrics = event.topologyMetrics;
+        final String topologyId = event.topologyId;
+
+        if (this.topologyMetricContexts.containsKey(topologyId)) {
+            if (!JStormMetrics.CLUSTER_METRIC_KEY.equals(topologyId)) {
+                updateClusterMetrics(topologyId, topologyMetrics);
+            }
+
+            // overwrite
+            metricCache.putMetricData(topologyId, topologyMetrics);
+
+            // the process below is kind of a transaction: first we lock an empty slot and mark it PRE_SET;
+            // at this point the slot is not yet ready for uploading, since the upload thread looks for SET slots only.
+            // once all metric data has been saved, we mark the slot SET and it becomes ready for uploading.
+            int idx = getAndPresetFirstEmptyIndex();
+            if (idx >= 0) {
+                TopologyMetricDataInfo summary = new TopologyMetricDataInfo();
+                summary.topologyId = topologyId;
+                summary.timestamp = event.timestamp;
+                if (topologyId.equals(JStormMetrics.NIMBUS_METRIC_KEY) ||
+                        topologyId.equals(JStormMetrics.CLUSTER_METRIC_KEY)) {
+                    summary.type = MetricUploader.METRIC_TYPE_TOPLOGY;
+                } else {
+                    if (topologyMetrics.get_topologyMetric().get_metrics_size() > 0 ||
+                            topologyMetrics.get_componentMetric().get_metrics_size() > 0) {
+                        if (topologyMetrics.get_taskMetric().get_metrics_size() +
+                                topologyMetrics.get_workerMetric().get_metrics_size() +
+                                topologyMetrics.get_nettyMetric().get_metrics_size() +
+                                topologyMetrics.get_streamMetric().get_metrics_size() > 0) {
+                            summary.type = MetricUploader.METRIC_TYPE_ALL;
+                        } else {
+                            summary.type = MetricUploader.METRIC_TYPE_TOPLOGY;
+                        }
+                    } else {
+                        summary.type = MetricUploader.METRIC_TYPE_TASK;
+                    }
+                }
+
+                metricCache.put(PENDING_UPLOAD_METRIC_DATA_INFO + idx, summary);
+                metricCache.put(PENDING_UPLOAD_METRIC_DATA + idx, topologyMetrics);
+                markSet(idx);
+                LOG.info("put metric data to local cache, topology:{}, idx:{}", topologyId, idx);
+            } else {
+                LOG.error("exceeding maxPendingUploadMetrics, skip metrics data for topology:{}", topologyId);
+            }
+        } else {
+            LOG.warn("topology {} has been killed or has not started, skip update.", topologyId);
         }
-        
-        for (Entry<String, MetricWindow> entry : streamMetrics.entrySet()) {
-            String streamName = entry.getKey();
-            MetricWindow metric = entry.getValue();
-            
-            AtomicInteger counter = counters.get(tag).get(streamName);
-            if (counter == null) {
-                continue;
-                
+    }
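
The PRE_SET/SET/UPLOADING protocol used above (and by the upload thread below) can be
condensed into a standalone sketch. The state constants match the ones declared at the top
of this class; everything else (class name, payload store, slot count) is made up. Note one
design difference: this sketch uses a CAS on the consumer side too, whereas the real class
can use a plain set in markUploading because there is only a single upload thread:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicIntegerArray;

    public class SlotProtocolSketch {
        static final int UNSET = 0, SET = 1, UPLOADING = 2, PRE_SET = 3;

        final AtomicIntegerArray stat = new AtomicIntegerArray(8);
        final ConcurrentMap<Integer, String> slots = new ConcurrentHashMap<>();

        // producer: claim an empty slot (UNSET -> PRE_SET), write the payload,
        // then publish it (PRE_SET -> SET)
        boolean produce(String payload) {
            for (int i = 0; i < stat.length(); i++) {
                if (stat.get(i) == UNSET && stat.compareAndSet(i, UNSET, PRE_SET)) {
                    slots.put(i, payload);
                    stat.set(i, SET);
                    return true;
                }
            }
            return false; // every slot is pending upload: drop, as handleUpdateEvent does
        }

        // consumer: take a published slot (SET -> UPLOADING), drain it, free it (-> UNSET)
        String consume() {
            for (int i = 0; i < stat.length(); i++) {
                if (stat.get(i) == SET && stat.compareAndSet(i, SET, UPLOADING)) {
                    String payload = slots.remove(i);
                    stat.set(i, UNSET);
                    return payload;
                }
            }
            return null;
        }

        public static void main(String[] args) {
            SlotProtocolSketch s = new SlotProtocolSketch();
            s.produce("metrics snapshot #1");
            System.out.println(s.consume()); // prints: metrics snapshot #1
        }
    }
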
+
+    /**
+     * get topology metrics, note that only topology & component & worker metrics are returned
+     */
+    public TopologyMetric getTopologyMetric(String topologyId) {
+        long start = System.nanoTime();
+        try {
+            TopologyMetric ret = new TopologyMetric();
+            List<MetricInfo> topologyMetrics = metricCache.getMetricData(topologyId, MetaType.TOPOLOGY);
+            List<MetricInfo> componentMetrics = metricCache.getMetricData(topologyId, MetaType.COMPONENT);
+            List<MetricInfo> workerMetrics = metricCache.getMetricData(topologyId, MetaType.WORKER);
+
+            MetricInfo dummy = MetricUtils.mkMetricInfo();
+            if (topologyMetrics.size() > 0) {
+                // get the most recent (last minute's) topology metric
+                ret.set_topologyMetric(topologyMetrics.get(topologyMetrics.size() - 1));
+            } else {
+                ret.set_topologyMetric(dummy);
             }
-            
-            avgMetricWindow(metric, counter.get());
-        }
-    }
-    
-    public void mergeTasks(TopologyMetric topologyMetric, String topologyId) {
-        Map<Integer, MetricInfo> taskMetrics = topologyMetric.get_taskMetric();
-        
-        Map<Integer, String> taskToComponent = null;
-		try {
-			taskToComponent = Cluster.get_all_task_component(stormClusterState, topologyId, null);
-		} catch (Exception e) {
-			// TODO Auto-generated catch block
-			LOG.error("Failed to get taskToComponent");
-            return ;
-		}
-        if (taskToComponent == null) {
-            LOG.error("Failed to get taskToComponent");
-            return ;
-        }
-        
-        Map<String, MetricInfo> componentMetrics = topologyMetric.get_componentMetric();
-        if (componentMetrics == null) {
-            componentMetrics = new HashMap<String, MetricInfo>();
-            topologyMetric.set_componentMetric(componentMetrics);
-        }
-        
-        Map<String, AtomicInteger> componentTaskParallel = new HashMap<String, AtomicInteger>();
-        Map<String, Map<String, AtomicInteger>> componentStreamParallel = new HashMap<String, Map<String, AtomicInteger>>();
-        
-        for (Entry<Integer, MetricInfo> entry : taskMetrics.entrySet()) {
-            Integer taskId = entry.getKey();
-            MetricInfo taskMetric = entry.getValue();
-            
-            String component = taskToComponent.get(taskId);
-            if (component == null) {
-                LOG.error("Failed to get component of task " + taskId);
-                continue;
+            if (componentMetrics.size() > 0) {
+                ret.set_componentMetric(componentMetrics.get(0));
+            } else {
+                ret.set_componentMetric(dummy);
             }
-            
-            MetricInfo componentMetric = componentMetrics.get(component);
-            
-            componentMetric = mergeMetricInfo(taskMetric, componentMetric, MetricDef.MERGE_SUM_TAG);
-            componentMetric = mergeMetricInfo(taskMetric, componentMetric, MetricDef.MERGE_AVG_TAG);
-            
-            Map<String, Map<String, MetricWindow>> input = mergeTaskStreams(componentMetric.get_inputMetric(), taskMetric.get_inputMetric(), componentStreamParallel);
-            componentMetric.set_inputMetric(input);
-            
-            Map<String, Map<String, MetricWindow>> output = mergeTaskStreams(componentMetric.get_outputMetric(), taskMetric.get_outputMetric(), componentStreamParallel);
-            componentMetric.set_outputMetric(output);
-            
-            componentMetrics.put(component, componentMetric);
-            
-            AtomicInteger counter = componentTaskParallel.get(component);
-            if (counter == null) {
-                counter = new AtomicInteger(0);
-                componentTaskParallel.put(component, counter);
+            if (workerMetrics.size() > 0) {
+                ret.set_workerMetric(workerMetrics.get(0));
+            } else {
+                ret.set_workerMetric(dummy);
             }
-            
-            counter.incrementAndGet();
-        }
-        
-        for (Entry<String, MetricInfo> entry : componentMetrics.entrySet()) {
-            String componentName = entry.getKey();
-            MetricInfo metricInfo = entry.getValue();
-            
-            AtomicInteger counter = componentTaskParallel.get(componentName);
-            
-            for (String tag : MetricDef.MERGE_AVG_TAG) {
-                MetricWindow metricWindow = metricInfo.get_baseMetric().get(tag);
-                
-                avgMetricWindow(metricWindow, counter.get());
-                
-                avgStreams(metricInfo.get_inputMetric(), componentStreamParallel, tag);
-                avgStreams(metricInfo.get_outputMetric(), componentStreamParallel, tag);
+            ret.set_taskMetric(dummy);
+            ret.set_streamMetric(dummy);
+            ret.set_nettyMetric(dummy);
+
+            return ret;
+        } finally {
+            long end = System.nanoTime();
+            SimpleJStormMetric.updateNimbusHistogram("getTopologyMetric", (end - start) / TimeUtils.NS_PER_US);
+        }
+    }
+
+    public static String getWorkerSlotName(String hostname, Integer port) {
+        return hostname + ":" + port;
+    }
+
+    class RefreshTopologiesThread extends RunnableCallback {
+        @Override
+        public void run() {
+            if (!isShutdown.get()) {
+                pushEvent(new Refresh());
             }
         }
+
+        @Override
+        public Object getResult() {
+            return TimeUtils.SEC_PER_MIN;
+        }
+
+        @Override
+        public String getThreadName() {
+            return "RefreshThread";
+        }
     }
-    
-    public void mergeComponent(TopologyMetric topologyMetric) {
-        MetricInfo topologyMetricInfo = MetricThrift.mkMetricInfo();
-        topologyMetric.set_topologyMetric(topologyMetricInfo);
-        Map<String, MetricInfo> componentMetrics = topologyMetric.get_componentMetric();
-        if (componentMetrics == null) {
-            return;
+
+    class MetricsUploadThread extends Thread {
+        public MetricsUploadThread() {
+            setName("main-upload-thread");
         }
-        
-        for (MetricInfo componentMetric : componentMetrics.values()) {
-            topologyMetricInfo = mergeMetricInfo(componentMetric, topologyMetricInfo, MetricDef.MERGE_SUM_TAG);
-        }
-        
-        topologyMetric.set_topologyMetric(topologyMetricInfo);
-    }
-    
-    public void mergeTopology(TopologyMetric topologyMetric, WorkerUploadMetrics workerMetrics) {
-        String topologyId = workerMetrics.get_topology_id();
-        
-        Map<Integer, MetricInfo> taskMetrics = topologyMetric.get_taskMetric();
-        if (taskMetrics == null) {
-            taskMetrics = new HashMap<Integer, MetricInfo>();
-            topologyMetric.set_taskMetric(taskMetrics);
-        }
-        taskMetrics.putAll(workerMetrics.get_taskMetric());
-        
-        String hostname = getWorkerHostname(workerMetrics);
-        topologyMetric.put_to_workerMetric(getWorkerSlotName(hostname, workerMetrics.get_port()), workerMetrics.get_workerMetric());
-        
-    }
-    
-    public void mergeNetty(WorkerUploadMetrics workerMetric, String topologyId, Set<String> connections) {
-        
-    	if (topologyNettyMgr.getTopology(topologyId) == false) {
-            return ;
-        }
-        Map<String, MetricInfo> connectionMetrics = workerMetric.get_nettyMetric().get_connections();
-        for (Entry<String, MetricInfo> entry : connectionMetrics.entrySet()) {
-            String connectionName = entry.getKey();
-            MetricInfo metric = entry.getValue();
-            
-            MetricInfo cacheMetric = (MetricInfo)dbCache.get(getNettyConnectionKey(topologyId, connectionName));
-            cacheMetric = MetricThrift.mergeMetricInfo(metric, cacheMetric);
-            
-            connections.add(connectionName);
-            
-            dbCache.put(getNettyConnectionKey(topologyId, connectionName), cacheMetric);
-        }
-    }
-    
-    public void mergeNetty(String topologyId, Set<String> connections) {
-    	if (topologyNettyMgr.getTopology(topologyId) == false) {
-    		LOG.info("Skip merge netty detail metrics");
-            return ;
-        }
-        // @@@
-        // this function will cost much memory when worker number is more than 200
-        Map<String, MetricInfo> metricMap = new TreeMap<String, MetricInfo>();
-        
-        for (String connection : connections) {
-            MetricInfo cacheMetric = (MetricInfo)dbCache.get(getNettyConnectionKey(topologyId, connection));
-            if (cacheMetric == null) {
-                LOG.warn("Failed to get cacheMetric of {}:{}", topologyId, connection );
-                continue;
-            }
-            
-            metricMap.put(connection, cacheMetric);
-            dbCache.remove(getNettyConnectionKey(topologyId, connection));
-        }
-        
-        dbCache.put(getNettyTopologyKey(topologyId), metricMap);
-        // accelerate free memory
-        metricMap.clear();
-    }
-    
-    public void render() {
-        for (Entry<String, Set<String>> entry : topologyWorkers.entrySet()) {
-            String topologyId = entry.getKey();
-            Set<String> workers = entry.getValue();
-            Set<String> connections = new TreeSet<String>();
-            
-            TopologyMetric topologyMetric = new TopologyMetric();
-            
-            boolean isExistWorker = false;
-            for (String workerId : workers) {
-                WorkerUploadMetrics workerMetric = (WorkerUploadMetrics) dbCache.get(getWorkerKey(topologyId, workerId));
-                if (workerMetric == null) {
-                    LOG.warn("Failed to get WorkerUploadMetrics of " + getWorkerKey(topologyId, workerId));
-                    continue;
+
+        @Override
+        public void run() {
+            while (!isShutdown.get()) {
+                try {
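+                    // only the nimbus leader uploads metrics; followers merely sync meta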
+                    if (metricUploader != null && nimbusData.isLeader()) {
+                        final int idx = getFirstPendingUploadIndex();
+                        if (idx >= 0) {
+                            markUploading(idx);
+                            upload(clusterName, idx);
+                        }
+                    }
+                    JStormUtils.sleepMs(5);
+                } catch (Exception ex) {
+                    LOG.error("Error", ex);
                 }
-                isExistWorker = true;
-                mergeTopology(topologyMetric, workerMetric);
-                
-                mergeNetty(workerMetric, topologyId, connections);
             }
-            if (isExistWorker == false) {
-            	LOG.info("No worker metrics of {}", topologyId);
-            	continue;
+        }
+
+        public boolean upload(final String clusterName, final int idx) {
+            final TopologyMetricDataInfo summary = getMetricDataInfoFromCache(idx);
+            if (summary == null) {
+                LOG.warn("metric summary is null from cache idx:{}", idx);
+                markUploaded(idx);
+                return true;
             }
-            
-            mergeTasks(topologyMetric, topologyId);
-            
-            mergeComponent(topologyMetric);
-            
-            
-            dbCache.put(getTopologyKey(topologyId), topologyMetric);
-            
-            mergeNetty(topologyId, connections);
-            
-            LOG.info("Successfully render topologyId of " + topologyId);
-            
-            uploadToAlimonitor(topologyMetric, topologyId);
-            
-            cleanDeadSupervisorWorker(topologyMetric);
-            
-            
-            try {
-                
-                //LOG.info(topologyId + " metrics is :\n" + Utils.toPrettyJsonString(topologyMetric));
-                LOG.info(topologyId + " finish metric");
-                stormClusterState.set_topology_metric(topologyId, topologyMetric);
-                LOG.info("Successfully uploaded toplogy metrics: " + topologyId);
-            } catch (Exception e) {
-                // TODO Auto-generated catch block
-                LOG.info("Failed to upload toplogy metrics: " + topologyId, e);
-                continue;
+
+            final String topologyId = summary.topologyId;
+            if (!isTopologyAlive(topologyId)) {
+                LOG.warn("topology {} is not alive, skip sending metrics.", topologyId);
+                markUploaded(idx);
+                return true;
             }
-            
+
+            return metricUploader.upload(clusterName, topologyId, idx, summary.toMap());
         }
     }
-    
-    public void handleUpdateEvent(Update event) {
-        long start = System.currentTimeMillis();
-        
-        WorkerUploadMetrics workerMetrics = event.workerMetrics;
-        
-        String topologyId = workerMetrics.get_topology_id();
-        if (removing.containsKey(topologyId) == true) {
-            LOG.info("Topology " + topologyId + " has been removed, skip update");
-            return;
+
+    class FlushMetricMetaThread extends Thread {
+
+        public FlushMetricMetaThread() {
+            setName("FlushMetricMetaThread");
         }
-        
-        Set<String> workers = topologyWorkers.get(topologyId);
-        if (workers == null) {
-            workers = new HashSet<String>();
-            topologyWorkers.put(topologyId, workers);
-        }
-        
-        String workerSlot = getWorkerSlotName(workerMetrics.get_supervisor_id(), workerMetrics.get_port());
-        
-        workers.add(workerSlot);
-        dbCache.put(getWorkerKey(topologyId, workerSlot), workerMetrics);
-        
-        long end = System.currentTimeMillis();
-        
-        updateHistogram.update((end - start));
-    }
-    
-    public void uploadToAlimonitor(TopologyMetric topologyMetric, String topologyId) {
-        // @@@ TODO
-    }
-    
-    
-    public TopologyMetric getTopologyMetric(String topologyId) {
-        long start = System.nanoTime();
-        try {
-            TopologyMetric ret = (TopologyMetric) dbCache.get(getTopologyKey(topologyId));
-            if (ret == null) {
-                return emptyTopologyMetric;
-            } else {
-                return ret;
+
+        @Override
+        public void run() {
+            while (!isShutdown.get()) {
+                long start = System.currentTimeMillis();
+                try {
+                    // if metricUploader is not fully initialized, skip this round
+                    if (nimbusData.isLeader() && metricUploader != null) {
+                        for (Map.Entry<String, TopologyMetricContext> entry : topologyMetricContexts.entrySet()) {
+                            String topologyId = entry.getKey();
+                            TopologyMetricContext metricContext = entry.getValue();
+
+                            Map<String, Long> cachedMeta = metricCache.getMeta(topologyId);
+                            if (cachedMeta == null) {
+                                cachedMeta = new HashMap<>();
+                            }
+                            Map<String, Long> memMeta = metricContext.getMemMeta();
+                            if (memMeta.size() > cachedMeta.size()) {
+                                cachedMeta.putAll(memMeta);
+                            }
+                            metricCache.putMeta(topologyId, cachedMeta);
+
+                            int curSize = cachedMeta.size();
+                            if (curSize != metricContext.getFlushedMetaNum()) {
+                                metricContext.setFlushedMetaNum(curSize);
+
+                                metricUploader.registerMetrics(clusterName, topologyId, cachedMeta);
+                                LOG.info("flush metric meta, topology:{}, total:{}, cost:{}.",
+                                        topologyId, curSize, System.currentTimeMillis() - start);
+                            }
+                            stormClusterState.set_topology_metric(topologyId, curSize);
+                        }
+                    }
+
+                    JStormUtils.sleepMs(15000);
+                } catch (Exception ex) {
+                    LOG.error("Failed to flush metric meta", ex);
+                }
             }
-        }finally {
-            long end = System.nanoTime();
-            
-            SimpleJStormMetric.updateHistorgram("getTopologyMetric", (end - start)/1000000.0d);
         }
     }
-    
-    public SortedMap<String, MetricInfo> getNettyMetric(String topologyId) {
-        TreeMap<String, MetricInfo> ret = (TreeMap<String, MetricInfo>)dbCache.get(getNettyTopologyKey(topologyId));
-        if (ret == null) {
-            return emptyNettyMetric;
-        }else {
+
+    public static class TopologyMetricDataInfo implements Serializable {
+        private static final long serialVersionUID = 1303262512351757610L;
+
+        public String topologyId;
+        public String type; // "tp" for tp/comp metrics OR "task" for task/stream/worker/netty metrics
+        public long timestamp;   // metrics report time
+
+        public Map<String, Object> toMap() {
+            Map<String, Object> ret = new HashMap<String, Object>();
+            ret.put(MetricUploader.METRIC_TIME, timestamp);
+            ret.put(MetricUploader.METRIC_TYPE, type);
+
             return ret;
         }
+
+        @Override
+        public String toString() {
+            return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
+        }
     }
-    
-    public static String getWorkerSlotName(String hostname, Integer port) {
-        return hostname + ":" + port;
+
+    // ==============================================
+    // =================== events ===================
+    // ==============================================
+    public static class Event {
+        protected Event() {
+        }
+
+        public String clusterName;
+        public String topologyId;
+        public long timestamp;
+
+        @Override
+        public String toString() {
+            return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
+        }
+    }
+
+    public static class Update extends Event {
+        public TopologyMetric topologyMetrics;
     }
-    
-    public static String getWorkerKey(String topologyId, String workerSlot) {
-        return CACHE_NAMESPACE_METRIC + "@" + topologyId + "@" + workerSlot;
+
+    public static class Remove extends Event {
     }
-    
-    public static String getTopologyKey(String topologyId) {
-        return CACHE_NAMESPACE_METRIC + "@" + topologyId;
+
+    public static class Refresh extends Event {
     }
-    
-    public static String getNettyConnectionKey(String topologyId, String connection) {
-        return CACHE_NAMESPACE_NETTY + "@" + topologyId + "@" + connection;
+
+
+    public static class KillTopologyEvent extends Event {
     }
-    
-    public static String getNettyTopologyKey(String topologyId) {
-        return CACHE_NAMESPACE_NETTY + "@" + topologyId;
+
+    public static class StartTopologyEvent extends Event {
+        public double sampleRate;
+    }
+
+    public static class TaskDeadEvent extends Event {
+        public Map<Integer, ResourceWorkerSlot> deadTasks;
+    }
+
+    public static class TaskStartEvent extends Event {
+        public Assignment oldAssignment;
+        public Assignment newAssignment;
+        public Map<Integer, String> task2Component;
     }
-    
-    
 }

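For reference, the claim-then-upload handshake in MetricsUploadThread above
(getFirstPendingUploadIndex / markUploading / markUploaded) can be sketched as a
small standalone class. This is a minimal illustration, not the actual
implementation: the fixed-size slot array and the EMPTY/PENDING/UPLOADING states
are assumptions mirroring the method names in the diff.

    import java.util.concurrent.atomic.AtomicIntegerArray;

    public class UploadSlots {
        private static final int EMPTY = 0, PENDING = 1, UPLOADING = 2;
        private final AtomicIntegerArray slots;

        public UploadSlots(int size) {
            this.slots = new AtomicIntegerArray(size); // all slots start EMPTY
        }

        public void markPending(int idx) {
            slots.set(idx, PENDING); // a fresh metric batch is ready to ship
        }

        /** Returns the first PENDING slot, or -1 if nothing is ready. */
        public int getFirstPendingUploadIndex() {
            for (int i = 0; i < slots.length(); i++) {
                if (slots.get(i) == PENDING) {
                    return i;
                }
            }
            return -1;
        }

        /** CAS so that only one uploader thread claims a given slot. */
        public boolean markUploading(int idx) {
            return slots.compareAndSet(idx, PENDING, UPLOADING);
        }

        public void markUploaded(int idx) {
            slots.set(idx, EMPTY); // slot can be reused for the next batch
        }
    }

The compareAndSet in markUploading is what prevents two uploader threads from
claiming the same slot between the scan and the actual upload.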
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyNettyMgr.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyNettyMgr.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyNettyMgr.java
index 7eaccab..6e55049 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyNettyMgr.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyNettyMgr.java
@@ -1,105 +1,78 @@
 package com.alibaba.jstorm.daemon.nimbus;
 
-import java.util.Map;
-
+import backtype.storm.Config;
+import backtype.storm.generated.InvalidTopologyException;
+import com.alibaba.jstorm.cluster.Common;
+import com.alibaba.jstorm.cluster.StormConfig;
+import com.alibaba.jstorm.metric.MetricUtils;
 import org.jboss.netty.util.internal.ConcurrentHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.cluster.Common;
-import com.alibaba.jstorm.cluster.StormConfig;
-import com.alibaba.jstorm.utils.JStormUtils;
-
-import backtype.storm.Config;
-import backtype.storm.generated.InvalidTopologyException;
+import java.util.Map;
 
 public class TopologyNettyMgr {
-	private static Logger LOG = LoggerFactory.getLogger(TopologyNettyMgr.class);
-	private boolean defaultEnable = true;
-	private Map nimbusConf;
-	private ConcurrentHashMap<String, Boolean> setting = new ConcurrentHashMap<String, Boolean>();
-	private static final int WORKER_DISABLE_THREADHOLD = 200;
-	
-	public TopologyNettyMgr(Map conf) {
-		nimbusConf = conf;
-		
-		Boolean isEnable = ConfigExtension.isEnableTopologyNettyMetrics(conf);
-		if (isEnable != null) {
-			defaultEnable = isEnable;
-		}
-		
-		LOG.info("Default netty metrics setting is " + defaultEnable);
-	}
-	
-	protected boolean getTopology(Map conf) {
-		Boolean isEnable = ConfigExtension.isEnableTopologyNettyMetrics(conf);
-		if (isEnable != null) {
-			return isEnable;
-		}
-		
-		int workerNum = JStormUtils.parseInt(conf.get(Config.TOPOLOGY_WORKERS), 1);
-		if (workerNum <=  WORKER_DISABLE_THREADHOLD) {
-			isEnable = Boolean.TRUE;
-		}else {
-			isEnable = Boolean.FALSE;
-		}
-		
-		return isEnable;
-	}
-	
-	public boolean getTopology(String topologyId) {
-		try {
-			String topologyName = Common.topologyIdToName(topologyId);
-			
-			Boolean isEnable = setting.get(topologyName);
-			if (isEnable != null) {
-				return isEnable;
-			}
-			
-			Map topologyConf =
-	                StormConfig.read_nimbus_topology_conf(nimbusConf, topologyId);
-			
-		    isEnable = getTopology(topologyConf);
-			setting.put(topologyName, isEnable);
-			LOG.info("{} netty metrics setting is {}", topologyName, isEnable);
-			return isEnable;
-		
-		}catch(Exception e) {
-			LOG.info("Failed to get {} netty metrics setting ", topologyId);
-			return defaultEnable;
-		}
-		
-		
-	}
-	
-	public void setTopology(Map conf) {
-		String topologyName = (String)conf.get(Config.TOPOLOGY_NAME);
-		if (topologyName == null) {
-			LOG.info("No topologyName setting");
-			return ;
-		}
-		
-		boolean isEnable = getTopology(conf);
-		
-		setting.put(topologyName, isEnable);
-		
-		LOG.info("{} netty metrics setting is {}", topologyName, isEnable);
-		return ;
-		
-	}
-	
-	public void rmTopology(String topologyId) {
-		String topologyName;
-		try {
-			topologyName = Common.topologyIdToName(topologyId);
-			setting.remove(topologyName);
-			LOG.info("Remove {} netty metrics setting ", topologyName);
-		} catch (InvalidTopologyException e) {
-			// TODO Auto-generated catch block
-			
-		}
-		
-	}
+    private static Logger LOG = LoggerFactory.getLogger(TopologyNettyMgr.class);
+    private Map nimbusConf;
+    private ConcurrentHashMap<String, Boolean> setting = new ConcurrentHashMap<String, Boolean>();
+
+    public TopologyNettyMgr(Map conf) {
+        nimbusConf = conf;
+
+    }
+
+    protected boolean getTopology(Map conf) {
+        return MetricUtils.isEnableNettyMetrics(conf);
+    }
+
+    public boolean getTopology(String topologyId) {
+        try {
+            String topologyName = Common.topologyIdToName(topologyId);
+
+            Boolean isEnable = setting.get(topologyName);
+            if (isEnable != null) {
+                return isEnable;
+            }
+
+            Map topologyConf = StormConfig.read_nimbus_topology_conf(nimbusConf, topologyId);
+
+            isEnable = getTopology(topologyConf);
+            setting.put(topologyName, isEnable);
+            LOG.info("{} netty metrics setting is {}", topologyName, isEnable);
+            return isEnable;
+
+        } catch (Exception e) {
+            LOG.warn("Failed to read netty metrics setting of {}, defaulting to enabled", topologyId, e);
+            return true;
+        }
+
+    }
+
+    public void setTopology(Map conf) {
+        String topologyName = (String) conf.get(Config.TOPOLOGY_NAME);
+        if (topologyName == null) {
+            LOG.info("No topologyName setting");
+            return;
+        }
+
+        boolean isEnable = getTopology(conf);
+
+        setting.put(topologyName, isEnable);
+
+        LOG.info("{} netty metrics setting is {}", topologyName, isEnable);
+        return;
+
+    }
+
+    public void rmTopology(String topologyId) {
+        String topologyName;
+        try {
+            topologyName = Common.topologyIdToName(topologyId);
+            setting.remove(topologyName);
+            LOG.info("Remove {} netty metrics setting ", topologyName);
+        } catch (InvalidTopologyException ignored) {
+        }
+
+    }
 
 }

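The simplified TopologyNettyMgr above resolves the per-topology netty-metrics
flag once, caches it by topology name, and falls back to enabled when the
topology conf cannot be read. A hedged sketch of that caching pattern follows;
readFlag() and its config key are stand-in assumptions for
StormConfig.read_nimbus_topology_conf plus MetricUtils.isEnableNettyMetrics.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class SettingCache {
        private final ConcurrentHashMap<String, Boolean> cache = new ConcurrentHashMap<String, Boolean>();

        public boolean isEnabled(String topologyName, Map<String, Object> conf) {
            Boolean cached = cache.get(topologyName);
            if (cached != null) {
                return cached;           // resolved once, served from cache afterwards
            }
            boolean enabled = readFlag(conf);
            cache.put(topologyName, enabled);
            return enabled;
        }

        public void remove(String topologyName) { // mirrors rmTopology()
            cache.remove(topologyName);
        }

        private boolean readFlag(Map<String, Object> conf) {
            Object v = conf.get("topology.enable.netty.metrics"); // assumed key name
            return v == null || Boolean.parseBoolean(String.valueOf(v)); // default: enabled
        }
    }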
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/AlimonitorClient.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/AlimonitorClient.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/AlimonitorClient.java
new file mode 100644
index 0000000..78bb1d2
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/AlimonitorClient.java
@@ -0,0 +1,226 @@
+package com.alibaba.jstorm.daemon.nimbus.metric.uploader;
+
+import backtype.storm.generated.TopologyMetric;
+import org.apache.http.HttpEntity;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AlimonitorClient extends DefaultMetricUploader {
+
+    public static Logger LOG = LoggerFactory.getLogger(AlimonitorClient.class);
+
+    // Send to localhost:15776 by default
+    public static final String DEFAULT_ADDR = "127.0.0.1";
+    public static final String DEFAULT_PORT = "15776";
+    public static final int DEFAULT_FLAG = 0;
+    public static final String DEFAULT_ERROR_INFO = "";
+
+    private final String COLLECTION_FLAG = "collection_flag";
+    private final String ERROR_INFO = "error_info";
+    private final String MSG = "MSG";
+
+    private String port;
+    private String requestIP;
+    private String monitorName;
+    private int collectionFlag;
+    private String errorInfo;
+
+    private boolean post;
+
+    public AlimonitorClient() {
+    }
+
+    public AlimonitorClient(String requestIP, String port, boolean post) {
+        this.requestIP = requestIP;
+        this.port = port;
+        this.post = post;
+        this.monitorName = null;
+        this.collectionFlag = 0;
+        this.errorInfo = null;
+    }
+
+    public void setIpAddr(String ipAddr) {
+        this.requestIP = ipAddr;
+    }
+
+    public void setPort(String port) {
+        this.port = port;
+    }
+
+    public void setMonitorName(String monitorName) {
+        this.monitorName = monitorName;
+    }
+
+    public void setCollectionFlag(int flag) {
+        this.collectionFlag = flag;
+    }
+
+    public void setErrorInfo(String msg) {
+        this.errorInfo = msg;
+    }
+
+    public void setPostFlag(boolean post) {
+        this.post = post;
+    }
+
+    public String buildURL() {
+        return "http://" + requestIP + ":" + port + "/passive";
+    }
+
+    public String buildRqstAddr() {
+        return "http://" + requestIP + ":" + port + "/passive?name=" + monitorName + "&msg=";
+    }
+
+
+    public Map<String, Object> buildAliMonitorMsg(int collection_flag, String error_message) {
+        // Json format of the message sent to Alimonitor
+        // {
+        // "collection_flag":int,
+        // "error_info":string,
+        // "MSG": ojbect | array
+        // }
+        Map<String, Object> ret = new HashMap<String, Object>();
+        ret.put(COLLECTION_FLAG, collection_flag);
+        ret.put(ERROR_INFO, error_message);
+        ret.put(MSG, null);
+
+        return ret;
+    }
+
+    private void addMsgData(Map<String, Object> jsonObj, Map<String, Object> map) {
+        jsonObj.put(MSG, map);
+    }
+
+    private boolean sendRequest(int collection_flag, String error_message, Map<String, Object> msg) throws Exception {
+        boolean ret = false;
+
+        if (msg.size() == 0)
+            return ret;
+
+        Map<String, Object> jsonObj = buildAliMonitorMsg(collection_flag, error_message);
+        addMsgData(jsonObj, msg);
+        // NOTE: Map.toString() does not emit strict JSON; plug in a JSON
+        // serializer here if the collector requires well-formed JSON.
+        String jsonMsg = jsonObj.toString();
+        LOG.info(jsonMsg);
+
+        if (post) {
+            String url = buildURL();
+            ret = httpPost(url, jsonMsg);
+        } else {
+            String request = buildRqstAddr();
+            StringBuilder postAddr = new StringBuilder();
+            postAddr.append(request);
+            postAddr.append(URLEncoder.encode(jsonMsg, "UTF-8"));
+
+            ret = httpGet(postAddr);
+        }
+
+        return ret;
+    }
+
+    private boolean httpGet(StringBuilder postAddr) {
+        boolean ret = false;
+
+        CloseableHttpClient httpClient = HttpClientBuilder.create().build();
+        CloseableHttpResponse response = null;
+
+        try {
+            HttpGet request = new HttpGet(postAddr.toString());
+            response = httpClient.execute(request);
+            HttpEntity entity = response.getEntity();
+            if (entity != null) {
+                LOG.info(EntityUtils.toString(entity));
+            }
+            EntityUtils.consume(entity);
+            ret = true;
+        } catch (Exception e) {
+            LOG.error("Exception when sending http request to alimonitor", e);
+        } finally {
+            try {
+                if (response != null)
+                    response.close();
+                httpClient.close();
+            } catch (Exception e) {
+                LOG.error("Exception when closing httpclient", e);
+            }
+        }
+
+        return ret;
+    }
+
+    private boolean httpPost(String url, String msg) {
+        boolean ret = false;
+
+        CloseableHttpClient httpClient = HttpClientBuilder.create().build();
+        CloseableHttpResponse response = null;
+
+        try {
+            HttpPost request = new HttpPost(url);
+            List<NameValuePair> nvps = new ArrayList<NameValuePair>();
+            nvps.add(new BasicNameValuePair("name", monitorName));
+            nvps.add(new BasicNameValuePair("msg", msg));
+            request.setEntity(new UrlEncodedFormEntity(nvps));
+            response = httpClient.execute(request);
+            HttpEntity entity = response.getEntity();
+            if (entity != null) {
+                LOG.info(EntityUtils.toString(entity));
+            }
+            EntityUtils.consume(entity);
+            ret = true;
+        } catch (Exception e) {
+            LOG.error("Exception when sending http request to alimonitor", e);
+        } finally {
+            try {
+                if (response != null)
+                    response.close();
+                httpClient.close();
+            } catch (Exception e) {
+                LOG.error("Exception when closing httpclient", e);
+            }
+        }
+
+        return ret;
+    }
+
+
+    protected Map<String, Object> convertMap(String clusterName, String topologyId, TopologyMetric tpMetric) {
+        // TODO: convert the TopologyMetric into the key/value MSG payload
+        return null;
+    }
+
+    @Override
+    public boolean upload(String clusterName, String topologyId, TopologyMetric tpMetric, Map<String, Object> metricContext) {
+        Map<String, Object> metricMap = convertMap(clusterName, topologyId, tpMetric);
+        if (metricMap == null || metricMap.isEmpty()) {
+            return false;
+        }
+
+        try {
+            sendRequest(collectionFlag, errorInfo, metricMap);
+            return true;
+        } catch (Exception e) {
+            LOG.error("Failed to upload metrics to Alimonitor", e);
+            return false;
+        }
+    }
+
+}

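A hypothetical wiring of the client above; the host, port and monitor name are
illustrative values, and the payload shape follows the buildAliMonitorMsg()
comment:

    public class AlimonitorClientDemo {
        public static void main(String[] args) {
            // POST mode: sends "name" and "msg" as form fields to http://host:port/passive
            AlimonitorClient client = new AlimonitorClient("127.0.0.1", "15776", true);
            client.setMonitorName("jstorm_metrics");              // illustrative monitor name
            client.setCollectionFlag(AlimonitorClient.DEFAULT_FLAG);
            client.setErrorInfo(AlimonitorClient.DEFAULT_ERROR_INFO);
            // upload() converts the TopologyMetric via convertMap() (still a TODO above)
            // and then ships {"collection_flag":0,"error_info":"","MSG":{...}}
        }
    }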
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/DefaultMetricUploader.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/DefaultMetricUploader.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/DefaultMetricUploader.java
new file mode 100644
index 0000000..58e4e7d
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/DefaultMetricUploader.java
@@ -0,0 +1,71 @@
+package com.alibaba.jstorm.daemon.nimbus.metric.uploader;
+
+import backtype.storm.generated.TopologyMetric;
+import com.alibaba.jstorm.daemon.nimbus.NimbusData;
+import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable;
+import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class DefaultMetricUploader implements MetricUploader {
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+    protected NimbusData nimbusData;
+    protected TopologyMetricsRunnable metricsRunnable;
+
+    public DefaultMetricUploader() {
+    }
+
+    @Override
+    public void init(NimbusData nimbusData) throws Exception {
+        this.nimbusData = nimbusData;
+        this.metricsRunnable = nimbusData.getMetricRunnable();
+    }
+
+    @Override
+    public void cleanup() {
+    }
+
+    @Override
+    public boolean registerMetrics(String clusterName, String topologyId,
+                                   Map<String, Long> metrics) {
+        if (metrics.size() > 0) {
+            logger.info("register metrics, topology:{}, total:{}", topologyId, metrics.size());
+        }
+        return true;
+    }
+
+    @Override
+    public boolean upload(String clusterName, String topologyId, TopologyMetric tpMetric, Map<String, Object> metricContext) {
+        if (tpMetric == null) {
+            logger.info("No metric of {}", topologyId);
+            return true;
+        }
+
+        int totalSize = tpMetric.get_topologyMetric().get_metrics_size() +
+                tpMetric.get_componentMetric().get_metrics_size() +
+                tpMetric.get_taskMetric().get_metrics_size() +
+                tpMetric.get_streamMetric().get_metrics_size() +
+                tpMetric.get_workerMetric().get_metrics_size() +
+                tpMetric.get_nettyMetric().get_metrics_size();
+
+        logger.info("send metrics, cluster:{}, topology:{}, metric size:{}, metricContext:{}",
+                clusterName, topologyId, totalSize, metricContext);
+
+        return true;
+    }
+
+    @Override
+    public boolean upload(String clusterName, String topologyId, Object key, Map<String, Object> metricContext) {
+        metricsRunnable.markUploaded((Integer) key);
+        return true;
+    }
+
+
+    @Override
+    public boolean sendEvent(String clusterName, Event event) {
+        logger.info("Successfully sendEvent {} of {}", event, clusterName);
+        return true;
+    }
+}
\ No newline at end of file

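DefaultMetricUploader is the no-op fallback: it logs, acknowledges, and marks
the slot uploaded. A real plugin is typically substituted when nimbus starts; a
minimal reflective-loading sketch is below, where the config key is purely an
assumption for illustration, not a documented JStorm option.

    import java.util.Map;

    public class UploaderFactory {
        private static final String UPLOADER_CLASS_KEY = "nimbus.metric.uploader.class"; // assumed key

        public static MetricUploader create(Map<String, Object> nimbusConf) throws Exception {
            String clazz = (String) nimbusConf.get(UPLOADER_CLASS_KEY);
            if (clazz == null) {
                return new DefaultMetricUploader(); // fall back to the logging stub above
            }
            return (MetricUploader) Class.forName(clazz).newInstance();
        }
    }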
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/MetricUploader.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/MetricUploader.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/MetricUploader.java
new file mode 100644
index 0000000..9b7c745
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/MetricUploader.java
@@ -0,0 +1,46 @@
+package com.alibaba.jstorm.daemon.nimbus.metric.uploader;
+
+import backtype.storm.generated.TopologyMetric;
+import com.alibaba.jstorm.daemon.nimbus.NimbusData;
+import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable;
+
+import java.util.Map;
+
+public interface MetricUploader {
+    /**
+     * Initialize the uploader with NimbusData.
+     */
+    void init(NimbusData nimbusData) throws Exception;
+
+    void cleanup();
+
+    /**
+     * Register metrics with the external metric plugin.
+     */
+    boolean registerMetrics(String clusterName, String topologyId,
+                            Map<String, Long> metrics) throws Exception;
+
+    String METRIC_TYPE = "metric.type";
+    String METRIC_TYPE_TOPOLOGY = "TP";
+    String METRIC_TYPE_TASK = "TASK";
+    String METRIC_TYPE_ALL = "ALL";
+    String METRIC_TIME = "metric.timestamp";
+
+    /**
+     * Upload topologyMetric to an external metric plugin (such as a database plugin).
+     *
+     * @return true means success, false means failure
+     */
+    boolean upload(String clusterName, String topologyId, TopologyMetric tpMetric, Map<String, Object> metricContext);
+
+    /**
+     * Upload metrics with the given key and metric context. The implementation can retrieve metric data from
+     * RocksDB lazily in the handler thread, which makes the upload more GC-friendly.
+     */
+    boolean upload(String clusterName, String topologyId, Object key, Map<String, Object> metricContext);
+
+    /**
+     * Send an event to the underlying handler.
+     */
+    boolean sendEvent(String clusterName, TopologyMetricsRunnable.Event event);
+}
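
To illustrate the contract of the key-based upload() above (fetch lazily, then
acknowledge), here is a minimal custom uploader sketch. Extending
DefaultMetricUploader reuses its markUploaded() bookkeeping; the class name and
log message are illustrative.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.Map;

    public class LoggingMetricUploader extends DefaultMetricUploader {
        private static final Logger LOG = LoggerFactory.getLogger(LoggingMetricUploader.class);

        @Override
        public boolean upload(String clusterName, String topologyId, Object key,
                              Map<String, Object> metricContext) {
            // metricContext carries METRIC_TIME / METRIC_TYPE (see TopologyMetricDataInfo.toMap());
            // the actual payload would be fetched lazily by key, per the javadoc above.
            LOG.info("would fetch and ship metrics of {}:{}, context: {}",
                    clusterName, topologyId, metricContext);
            return super.upload(clusterName, topologyId, key, metricContext); // marks the slot uploaded
        }
    }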