You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:04:52 UTC
[14/51] [partial] storm git commit: Update JStorm to latest release
2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable.java
index b8eeb20..7af67c2 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyMetricsRunnable.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p/>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,711 +17,895 @@
*/
package com.alibaba.jstorm.daemon.nimbus;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.jstorm.cache.JStormCache;
+import backtype.storm.generated.MetricInfo;
+import backtype.storm.generated.MetricSnapshot;
+import backtype.storm.generated.TopologyMetric;
+import backtype.storm.utils.Utils;
+import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.cluster.Cluster;
-import com.alibaba.jstorm.cluster.Common;
import com.alibaba.jstorm.cluster.StormClusterState;
-import com.alibaba.jstorm.cluster.StormConfig;
-import com.alibaba.jstorm.common.metric.Histogram;
-import com.alibaba.jstorm.metric.AlimonitorClient;
-import com.alibaba.jstorm.metric.MetricDef;
-import com.alibaba.jstorm.metric.MetricSendClient;
-import com.alibaba.jstorm.metric.MetricThrift;
-import com.alibaba.jstorm.metric.SimpleJStormMetric;
+import com.alibaba.jstorm.common.metric.AsmGauge;
+import com.alibaba.jstorm.common.metric.MetricMeta;
+import com.alibaba.jstorm.daemon.nimbus.metric.uploader.DefaultMetricUploader;
+import com.alibaba.jstorm.daemon.nimbus.metric.uploader.MetricUploader;
+import com.alibaba.jstorm.metric.*;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.schedule.default_assign.ResourceWorkerSlot;
-import com.alibaba.jstorm.utils.TimeCacheMap;
+import com.alibaba.jstorm.utils.JStormUtils;
+import com.alibaba.jstorm.utils.TimeUtils;
import com.codahale.metrics.Gauge;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import backtype.storm.generated.MetricInfo;
-import backtype.storm.generated.MetricWindow;
-import backtype.storm.generated.TopologyMetric;
-import backtype.storm.generated.WorkerUploadMetrics;
+import java.io.Serializable;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicIntegerArray;
-public class TopologyMetricsRunnable extends RunnableCallback {
+/**
+ * Topology metrics thread which resides in nimbus.
+ * This class is responsible for generating metrics IDs and uploading metrics to the underlying storage system.
+ *
+ * @author Cody (weiyue.wy@alibaba-inc.com)
+ * @since 2.0.5
+ */
+public class TopologyMetricsRunnable extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(TopologyMetricsRunnable.class);
- private static final String DEAD_SUPERVISOR_HEAD = "DeadSupervisor-";
-
- public static interface Event {
-
- }
-
- public static class Update implements Event {
- public WorkerUploadMetrics workerMetrics;
- }
-
- public static class Remove implements Event {
- public String topologyId;
- }
-
- public static class Upload implements Event {
- public long timeStamp;
- }
-
- public static final String CACHE_NAMESPACE_METRIC = "cache_namespace_metric";
- public static final String CACHE_NAMESPACE_NETTY = "cache_namespace_netty";
- protected NimbusCache nimbusCache;
- protected JStormCache dbCache;
-
+
+ protected JStormMetricCache metricCache;
+
/**
- * cache all worker metrics will waste a little memory
- *
+ * map<topologyId, map<worker, metricInfo>>, local memory cache, keeps only one snapshot of metrics.
*/
- protected Map<String, Set<String>> topologyWorkers;
- protected TimeCacheMap<String, Long> removing;
-
- protected BlockingDeque<TopologyMetricsRunnable.Event> queue;
+ protected final ConcurrentMap<String, TopologyMetricContext> topologyMetricContexts =
+ new ConcurrentHashMap<>();
+
+ protected final BlockingDeque<TopologyMetricsRunnable.Event> queue = new LinkedBlockingDeque<>();
+
+ private static final String PENDING_UPLOAD_METRIC_DATA = "__pending.upload.metrics__";
+ private static final String PENDING_UPLOAD_METRIC_DATA_INFO = "__pending.upload.metrics.info__";
+
+ // the slot is empty
+ private static final int UNSET = 0;
+ // the slot is ready for uploading
+ private static final int SET = 1;
+ // the slot is being uploaded
+ private static final int UPLOADING = 2;
+ // the slot will be set ready for uploading
+ private static final int PRE_SET = 3;
+
+ protected final AtomicIntegerArray metricStat;
+
protected StormClusterState stormClusterState;
-
- protected MetricSendClient metricSendClient;
- protected TopologyMetric emptyTopologyMetric = mkTopologyMetric();
- protected TreeMap<String, MetricInfo> emptyNettyMetric = new TreeMap<String, MetricInfo>();
+
+ protected MetricUploader metricUploader;
+
protected AtomicBoolean isShutdown;
- protected boolean localMode;
- protected TopologyNettyMgr topologyNettyMgr;
-
- protected Histogram updateHistogram;
- protected AtomicBoolean isUploading = new AtomicBoolean(false);
- protected Histogram uploadHistogram;
-
- public TopologyMetricsRunnable(NimbusData nimbusData) {
-
- this.nimbusCache = nimbusData.getNimbusCache();
- this.dbCache = nimbusCache.getDbCache();
- this.topologyWorkers = new ConcurrentHashMap<String, Set<String>>();
- this.removing = new TimeCacheMap<String, Long>(600);
- this.queue = new LinkedBlockingDeque<TopologyMetricsRunnable.Event>();
+ protected String clusterName;
+ protected int maxPendingUploadMetrics;
+
+ private final boolean localMode;
+ private final NimbusData nimbusData;
+ private MetricQueryClient metricQueryClient;
+
+ private ScheduledExecutorService clusterMetricsUpdateExecutor;
+
+ /**
+ * refreshes alive topologies every min or on startup.
+ */
+ protected AsyncLoopThread refreshTopologiesThread;
+
+ /**
+ * the thread for metric sending, checks every second.
+ */
+ private final Thread uploadThread = new MetricsUploadThread();
+
+ /**
+ * async flush metric meta
+ */
+ private final Thread flushMetricMetaThread = new FlushMetricMetaThread();
+
+ /**
+ * use default UUID generator
+ */
+ private final MetricIDGenerator metricIDGenerator = new DefaultMetricIDGenerator();
+
+ public TopologyMetricsRunnable(final NimbusData nimbusData) {
+ setName(getClass().getSimpleName());
+
+ this.nimbusData = nimbusData;
+
+ this.localMode = nimbusData.isLocalMode();
+ if (localMode) {
+ this.metricStat = new AtomicIntegerArray(1);
+ return;
+ }
+
+ LOG.info("create topology metrics runnable.");
+ this.metricCache = nimbusData.getMetricCache();
this.stormClusterState = nimbusData.getStormClusterState();
this.isShutdown = nimbusData.getIsShutdown();
- this.topologyNettyMgr = nimbusData.getTopologyNettyMgr();
-
- if (ConfigExtension.isAlimonitorMetricsPost(nimbusData.getConf())) {
- metricSendClient = new AlimonitorClient(AlimonitorClient.DEFAUT_ADDR, AlimonitorClient.DEFAULT_PORT, true);
- } else {
- metricSendClient = new MetricSendClient();
- }
- localMode = StormConfig.local_mode(nimbusData.getConf());
-
- updateHistogram = SimpleJStormMetric.registerHistorgram("TopologyMetricsRunnable_Update");
- uploadHistogram = SimpleJStormMetric.registerHistorgram("TopologyMetricsRunnable_Upload");
-
- SimpleJStormMetric.registerWorkerGauge(new Gauge<Double>() {
-
+
+ clusterName = ConfigExtension.getClusterName(nimbusData.getConf());
+ if (clusterName == null) {
+ throw new RuntimeException("cluster.name property must be set in storm.yaml!");
+ }
+
+ this.maxPendingUploadMetrics = ConfigExtension.getMaxPendingMetricNum(nimbusData.getConf());
+ this.metricStat = new AtomicIntegerArray(this.maxPendingUploadMetrics);
+
+ int cnt = 0;
+ for (int i = 0; i < maxPendingUploadMetrics; i++) {
+ TopologyMetricDataInfo obj = getMetricDataInfoFromCache(i);
+ if (obj != null) {
+ this.metricStat.set(i, SET);
+ cnt++;
+ }
+ }
+ LOG.info("pending upload metrics: {}", cnt);
+
+ // init alive topologies from zk
+ this.refreshTopologies();
+ this.refreshTopologiesThread = new AsyncLoopThread(new RefreshTopologiesThread());
+
+ this.clusterMetricsUpdateExecutor = Executors.newSingleThreadScheduledExecutor();
+ this.clusterMetricsUpdateExecutor.scheduleAtFixedRate(new Runnable() {
@Override
- public Double getValue() {
- // TODO Auto-generated method stub
- return (double) queue.size();
+ public void run() {
+ int secOffset = TimeUtils.secOffset();
+ int offset = 55;
+ if (secOffset < offset) {
+ JStormUtils.sleepMs((offset - secOffset) * 1000);
+ } else if (secOffset == offset) {
+ // do nothing
+ } else {
+ JStormUtils.sleepMs((60 - secOffset + offset) * 1000);
+ }
+
+ LOG.info("cluster metrics force upload.");
+ mergeAndUploadClusterMetrics();
}
- }, "TopologyMetricsRunnable_Queue");
+ }, 5, 60, TimeUnit.SECONDS);
+
+ // track nimbus JVM heap
+ JStormMetrics.registerWorkerGauge(JStormMetrics.NIMBUS_METRIC_KEY, MetricDef.MEMORY_USED,
+ new AsmGauge(new Gauge<Double>() {
+ @Override
+ public Double getValue() {
+ MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+ MemoryUsage memoryUsage = memoryMXBean.getHeapMemoryUsage();
+ return (double) memoryUsage.getUsed();
+ }
+ }));
}
-
- public void pushEvent(TopologyMetricsRunnable.Event cmd) {
- queue.offer(cmd);
+
+ /**
+ * init metric uploader
+ */
+ public void init() {
+ String metricUploadClass = ConfigExtension.getMetricUploaderClass(nimbusData.getConf());
+ if (StringUtils.isBlank(metricUploadClass)) {
+ metricUploadClass = DefaultMetricUploader.class.getName();
+ }
+ // init metric uploader
+ LOG.info("metric uploader class:{}", metricUploadClass);
+ Object instance = Utils.newInstance(metricUploadClass);
+ if (!(instance instanceof MetricUploader)) {
+ throw new RuntimeException(metricUploadClass + " isn't MetricUploader class ");
+ }
+ this.metricUploader = (MetricUploader) instance;
+ try {
+ metricUploader.init(nimbusData);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ LOG.info("Successfully init {}", metricUploadClass);
+
+ // init metric query client
+ String metricQueryClientClass = ConfigExtension.getMetricQueryClientClass(nimbusData.getConf());
+ if (!StringUtils.isBlank(metricQueryClientClass)) {
+ LOG.info("metric query client class:{}", metricQueryClientClass);
+ this.metricQueryClient = (MetricQueryClient) Utils.newInstance(metricQueryClientClass);
+ } else {
+ LOG.warn("use default metric query client class.");
+ this.metricQueryClient = new DefaultMetricQueryClient();
+ }
+ try {
+ metricQueryClient.init(nimbusData.getConf());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ this.uploadThread.start();
+ this.flushMetricMetaThread.start();
+
+ LOG.info("init topology metric runnable done.");
+ }
+
+ public void shutdown() {
+ LOG.info("Begin to shutdown");
+ metricUploader.cleanup();
+
+ LOG.info("Successfully shutdown");
}
-
- public TopologyMetric mkTopologyMetric() {
- TopologyMetric emptyTopologyMetric = new TopologyMetric();
-
- MetricInfo topologyMetricInfo = MetricThrift.mkMetricInfo();
- emptyTopologyMetric.set_topologyMetric(topologyMetricInfo);
-
- emptyTopologyMetric.set_componentMetric(new HashMap<String, MetricInfo>());
- emptyTopologyMetric.set_workerMetric(new HashMap<String, MetricInfo>());
- emptyTopologyMetric.set_taskMetric(new HashMap<Integer, MetricInfo>());
- return emptyTopologyMetric;
- }
-
+
@Override
public void run() {
- try {
- TopologyMetricsRunnable.Event event = queue.take();
-
- if (event instanceof Remove) {
-
- handleRemoveEvent((Remove) event);
- return;
- } else if (event instanceof Update) {
- handleUpdateEvent((Update) event);
- return;
- } else if (event instanceof Upload) {
- handleUploadEvent((Upload) event);
- return;
- } else {
- LOG.error("Unknow event type");
+ while (!isShutdown.get()) {
+ if (localMode) {
return;
}
-
- } catch (Exception e) {
- if (isShutdown.get() == false) {
- LOG.error(e.getMessage(), e);
+
+ try {
+ // wait for metricUploader to be ready, for some external plugin like database, it'll take a few seconds
+ if (this.metricUploader != null) {
+ Event event = queue.poll();
+ if (event == null) {
+ continue;
+ }
+
+ if (event instanceof Remove) {
+ handleRemoveEvent((Remove) event);
+ } else if (event instanceof Update) {
+ handleUpdateEvent((Update) event);
+ } else if (event instanceof Refresh) {
+ handleRefreshEvent((Refresh) event);
+ } else if (event instanceof KillTopologyEvent) {
+ handleKillTopologyEvent((KillTopologyEvent) event);
+ } else if (event instanceof StartTopologyEvent) {
+ handleStartTopologyEvent((StartTopologyEvent) event);
+ } else if (event instanceof TaskDeadEvent) {
+ handleTaskDeadEvent((TaskDeadEvent) event);
+ } else if (event instanceof TaskStartEvent) {
+ handleTaskStartEvent((TaskStartEvent) event);
+ } else {
+ LOG.error("Unknown event type:{}", event.getClass());
+ }
+ }
+ } catch (Exception e) {
+ if (!isShutdown.get()) {
+ LOG.error(e.getMessage(), e);
+ }
}
}
}
-
- public void handleRemoveEvent(Remove event) {
- String topologyId = event.topologyId;
- TopologyMetric topologyMetric = (TopologyMetric) dbCache.get(getTopologyKey(topologyId));
- if (topologyMetric == null) {
- LOG.warn("No TopologyMetric of " + topologyId);
- return;
+
+
+ public boolean isTopologyAlive(String topologyId) {
+ return topologyMetricContexts.containsKey(topologyId);
+ }
+
+ private int getAndPresetFirstEmptyIndex() {
+ for (int i = 0; i < maxPendingUploadMetrics; i++) {
+ if (metricStat.get(i) == UNSET) {
+ if (metricStat.compareAndSet(i, UNSET, PRE_SET)) {
+ return i;
+ }
+ }
}
-
- removing.put(topologyId, System.currentTimeMillis());
- dbCache.remove(getTopologyKey(topologyId));
- dbCache.remove(getNettyTopologyKey(topologyId));
- topologyNettyMgr.rmTopology(topologyId);
- LOG.info("Successfully remove TopologyMetric of " + topologyId);
- return;
-
- }
-
- public void cleanDeadSupervisorWorker(TopologyMetric metric) {
- List<String> removeList = new ArrayList<String>();
-
- Map<String, MetricInfo> workerMetric = metric.get_workerMetric();
- if (workerMetric == null) {
- return;
+ return -1;
+ }
+
+ private int getFirstPendingUploadIndex() {
+ for (int i = 0; i < maxPendingUploadMetrics; i++) {
+ if (metricStat.get(i) == SET) {
+ return i;
+ }
}
- for (String hostPort : workerMetric.keySet()) {
- if (hostPort.startsWith(DEAD_SUPERVISOR_HEAD)) {
- removeList.add(hostPort);
+ return -1;
+ }
+
+ public void markUploaded(int idx) {
+ this.metricCache.remove(PENDING_UPLOAD_METRIC_DATA + idx);
+ this.metricCache.remove(PENDING_UPLOAD_METRIC_DATA_INFO + idx);
+ this.metricStat.set(idx, UNSET);
+ }
+
+ public void markUploading(int idx) {
+ this.metricStat.set(idx, UPLOADING);
+ }
+
+ public void markSet(int idx) {
+ this.metricStat.set(idx, SET);
+ }
+
+ public TopologyMetric getMetricDataFromCache(int idx) {
+ return (TopologyMetric) metricCache.get(PENDING_UPLOAD_METRIC_DATA + idx);
+ }
+
+ public TopologyMetricDataInfo getMetricDataInfoFromCache(int idx) {
+ return (TopologyMetricDataInfo) metricCache.get(PENDING_UPLOAD_METRIC_DATA_INFO + idx);
+ }
+
+ public void pushEvent(Event cmd) {
+ queue.offer(cmd);
+ }
+
+ public Map<String, Long> registerMetrics(String topologyId, Set<String> metricNames) {
+ TimeTicker ticker = new TimeTicker(TimeUnit.MILLISECONDS, true);
+
+ ConcurrentMap<String, Long> memMeta = topologyMetricContexts.get(topologyId).getMemMeta();
+ Map<String, Long> ret = new HashMap<>();
+ for (String metricName : metricNames) {
+ Long id = memMeta.get(metricName);
+ if (id != null && MetricUtils.isValidId(id)) {
+ ret.put(metricName, id);
+ } else {
+ id = metricIDGenerator.genMetricId(metricName);
+ Long old = memMeta.putIfAbsent(metricName, id);
+ if (old == null) {
+ ret.put(metricName, id);
+ } else {
+ ret.put(metricName, old);
+ }
}
}
-
- for (String removed : removeList) {
- workerMetric.remove(removed);
+
+ long cost = ticker.stop();
+ LOG.info("register metrics, topology:{}, size:{}, cost:{}", topologyId, metricNames.size(), cost);
+
+ return ret;
+ }
+
+ public void handleRemoveEvent(Remove event) {
+ String topologyId = event.topologyId;
+ if (topologyId != null) {
+ removeTopology(topologyId);
}
+ LOG.info("remove topology:{}.", topologyId);
+
}
-
- public void cleanTopology() {
- Map<String, Long> removingMap = removing.buildMap();
-
- Map<String, Assignment> assignMap = null;
+
+ private void removeTopology(String topologyId) {
+ metricCache.removeTopology(topologyId);
+ metricCache.removeSampleRate(topologyId);
+
+ topologyMetricContexts.remove(topologyId);
+ }
+
+
+ public void refreshTopologies() {
+ if (!topologyMetricContexts.containsKey(JStormMetrics.NIMBUS_METRIC_KEY)) {
+ LOG.info("adding __nimbus__ to metric context.");
+ Set<ResourceWorkerSlot> workerSlot = Sets.newHashSet(new ResourceWorkerSlot());
+ TopologyMetricContext metricContext = new TopologyMetricContext(workerSlot);
+ topologyMetricContexts.putIfAbsent(JStormMetrics.NIMBUS_METRIC_KEY, metricContext);
+ syncMetaFromCache(JStormMetrics.NIMBUS_METRIC_KEY, topologyMetricContexts.get(JStormMetrics.NIMBUS_METRIC_KEY));
+ }
+ if (!topologyMetricContexts.containsKey(JStormMetrics.CLUSTER_METRIC_KEY)) {
+ LOG.info("adding __cluster__ to metric context.");
+ Set<ResourceWorkerSlot> workerSlot = Sets.newHashSet(new ResourceWorkerSlot());
+ Map conf = new HashMap();
+ //there's no need to consider sample rate when cluster metrics merge
+ conf.put(ConfigExtension.TOPOLOGY_METRIC_SAMPLE_RATE, 1.0);
+ TopologyMetricContext metricContext = new TopologyMetricContext(
+ JStormMetrics.CLUSTER_METRIC_KEY, workerSlot, conf);
+ topologyMetricContexts.putIfAbsent(JStormMetrics.CLUSTER_METRIC_KEY, metricContext);
+ syncMetaFromCache(JStormMetrics.CLUSTER_METRIC_KEY, topologyMetricContexts.get(JStormMetrics.CLUSTER_METRIC_KEY));
+ }
+
+ Map<String, Assignment> assignMap;
try {
assignMap = Cluster.get_all_assignment(stormClusterState, null);
+ for (String topologyId : assignMap.keySet()) {
+ if (!topologyMetricContexts.containsKey(topologyId)) {
+ Assignment assignment = assignMap.get(topologyId);
+ TopologyMetricContext metricContext =
+ new TopologyMetricContext(assignment.getWorkers());
+ metricContext.setTaskNum(NimbusUtils.getTopologyTaskNum(assignment));
+ syncMetaFromCache(topologyId, metricContext);
+
+ LOG.info("adding {} to metric context.", topologyId);
+ topologyMetricContexts.put(topologyId, metricContext);
+ }
+ }
} catch (Exception e1) {
- // TODO Auto-generated catch block
- LOG.info("Failed to get Assignments");
+ LOG.warn("failed to get assignments");
+ return;
}
-
- for (String topologyId : topologyWorkers.keySet()) {
- if (assignMap.containsKey(topologyId) == false) {
- removingMap.put(topologyId, System.currentTimeMillis());
+
+ List<String> removing = new ArrayList<>();
+ for (String topologyId : topologyMetricContexts.keySet()) {
+ if (!JStormMetrics.NIMBUS_METRIC_KEY.equals(topologyId)
+ && !JStormMetrics.CLUSTER_METRIC_KEY.equals(topologyId)
+ && !assignMap.containsKey(topologyId)) {
+ removing.add(topologyId);
}
}
-
- for (String topologyId : removingMap.keySet()) {
- dbCache.remove(getTopologyKey(topologyId));
-
- Set<String> workers = topologyWorkers.get(topologyId);
- if (workers != null) {
- for (String workerSlot : workers) {
- dbCache.remove(getWorkerKey(topologyId, workerSlot));
+
+ for (String topologyId : removing) {
+ LOG.info("removing topology:{}", topologyId);
+ removeTopology(topologyId);
+ }
+ }
+
+ /**
+ * sync topology metric meta from external storage like TDDL/OTS.
+ * nimbus server will skip syncing, only followers do this
+ */
+ public void syncTopologyMeta() {
+ String nimbus = JStormMetrics.NIMBUS_METRIC_KEY;
+ if (topologyMetricContexts.containsKey(nimbus)) {
+ syncMetaFromRemote(nimbus, topologyMetricContexts.get(nimbus));
+ }
+ String cluster = JStormMetrics.CLUSTER_METRIC_KEY;
+ if (topologyMetricContexts.containsKey(cluster)) {
+ syncMetaFromRemote(cluster, topologyMetricContexts.get(cluster));
+ }
+
+ Map<String, Assignment> assignMap;
+ try {
+ assignMap = Cluster.get_all_assignment(stormClusterState, null);
+ for (String topologyId : assignMap.keySet()) {
+ if (topologyMetricContexts.containsKey(topologyId)) {
+ Assignment assignment = assignMap.get(topologyId);
+ TopologyMetricContext metricContext =
+ new TopologyMetricContext(assignment.getWorkers());
+ metricContext.setTaskNum(NimbusUtils.getTopologyTaskNum(assignment));
+
+ syncMetaFromCache(topologyId, metricContext);
+ syncMetaFromRemote(topologyId, metricContext);
}
- topologyWorkers.remove(topologyId);
- }
-
- }
-
- for (Entry<String, Set<String>> entry : topologyWorkers.entrySet()) {
- String topologyId = entry.getKey();
- Set<String> metricWorkers = entry.getValue();
-
- Set<String> workerSlots = new HashSet<String>();
-
- Assignment assignment = assignMap.get(topologyId);
- if (assignment == null) {
- LOG.error("Assignment disappear of " + topologyId);
- continue;
}
-
- for (ResourceWorkerSlot worker : assignment.getWorkers()) {
- String slot = getWorkerSlotName(worker.getNodeId(), worker.getPort());
- workerSlots.add(slot);
+ } catch (Exception e1) {
+ LOG.warn("failed to get assignments");
+ }
+ }
+
+ /**
+ * sync metric meta from rocks db into mem cache on startup
+ */
+ private void syncMetaFromCache(String topologyId, TopologyMetricContext context) {
+ if (!context.syncMeta()) {
+ Map<String, Long> meta = metricCache.getMeta(topologyId);
+ if (meta != null) {
+ context.getMemMeta().putAll(meta);
}
-
- Set<String> removes = new HashSet<String>();
- for (String slot : metricWorkers) {
- if (workerSlots.contains(slot) == false) {
- LOG.info("Remove worker metrics of {}:{}", topologyId, slot);
- removes.add(slot);
+ context.setSyncMeta(true);
+ }
+ }
+
+ private void syncMetaFromRemote(String topologyId, TopologyMetricContext context) {
+ try {
+ int memSize = context.getMemMeta().size();
+ int zkSize = (Integer) stormClusterState.get_topology_metric(topologyId);
+
+ if (memSize != zkSize) {
+ ConcurrentMap<String, Long> memMeta = context.getMemMeta();
+ for (MetaType metaType : MetaType.values()) {
+ List<MetricMeta> metaList = metricQueryClient.getMetricMeta(clusterName, topologyId, metaType);
+ if (metaList != null) {
+ LOG.info("get remote metric meta, topology:{}, metaType:{}, mem:{}, zk:{}, new size:{}",
+ topologyId, metaType, memSize, zkSize, metaList.size());
+ for (MetricMeta meta : metaList) {
+ memMeta.putIfAbsent(meta.getFQN(), meta.getId());
+ }
+ }
}
+ metricCache.putMeta(topologyId, memMeta);
}
-
- for (String slot : removes) {
- metricWorkers.remove(slot);
- dbCache.remove(getWorkerKey(topologyId, slot));
- }
+ } catch (Exception ex) {
+ LOG.error("failed to sync remote meta", ex);
}
}
-
+
/**
- * Upload metric to ZK
- *
- * @param event
+ * send topology track to jstorm monitor
*/
- public void handleUploadEvent(Upload event) {
- if (isUploading.getAndSet(true) == true) {
- LOG.info("Nimbus is alread uploading");
- return ;
- }
-
- long start = System.currentTimeMillis();
-
- cleanTopology();
-
- render();
-
- isUploading.set(false);
-
- long end = System.currentTimeMillis();
- uploadHistogram.update(end - start);
-
-
- }
-
- public String getWorkerHostname(WorkerUploadMetrics workerMetrics) {
-
- String hostname = null;
- String supervisorId = workerMetrics.get_supervisor_id();
- try {
- hostname = Cluster.get_supervisor_hostname(stormClusterState, supervisorId);
- } catch (Exception e) {
- // TODO Auto-generated catch block
- LOG.warn("Failed to get hostname of " + supervisorId);
+ protected void handleKillTopologyEvent(KillTopologyEvent event) {
+ metricUploader.sendEvent(this.clusterName, event);
+ removeTopology(event.topologyId);
+ }
+
+ private void handleStartTopologyEvent(StartTopologyEvent event) {
+ this.metricCache.putSampleRate(event.topologyId, event.sampleRate);
+ metricUploader.sendEvent(this.clusterName, event);
+ if (!topologyMetricContexts.containsKey(event.topologyId)) {
+ TopologyMetricContext metricContext = new TopologyMetricContext();
+ // note that workerNum is not set here.
+ this.topologyMetricContexts.put(event.topologyId, metricContext);
}
- if (hostname == null) {
- hostname = DEAD_SUPERVISOR_HEAD + supervisorId;
+ }
+
+ private void handleTaskDeadEvent(TaskDeadEvent event) {
+ metricUploader.sendEvent(this.clusterName, event);
+
+ // unregister dead workers
+ Set<ResourceWorkerSlot> workers = new HashSet<>();
+ workers.addAll(event.deadTasks.values());
+ for (ResourceWorkerSlot worker : workers) {
+ metricCache.unregisterWorker(event.topologyId, worker.getHostname(), worker.getPort());
}
-
- return hostname;
}
-
- public void avgMetricWindow(MetricWindow metric, int parallel) {
- if (parallel == 0) {
- return;
+
+ private void handleTaskStartEvent(final TaskStartEvent event) {
+ Assignment assignment = event.newAssignment;
+ TopologyMetricContext metricContext = topologyMetricContexts.get(event.topologyId);
+ if (metricContext != null) {
+ metricContext.setWorkerSet(assignment.getWorkers());
+ } else {
+ metricContext = new TopologyMetricContext();
+ metricContext.setWorkerSet(assignment.getWorkers());
+ topologyMetricContexts.put(event.topologyId, metricContext);
}
- Map<Integer, Double> map = metric.get_metricWindow();
- Map<Integer, Double> newMap = new HashMap<Integer, Double>();
- if (map != null) {
- for (Entry<Integer, Double> entry : map.entrySet()) {
- newMap.put(entry.getKey(), entry.getValue() / parallel);
+ metricUploader.sendEvent(this.clusterName, event);
+ }
+
+ /**
+ * merge and send all metric data.
+ */
+ public void handleRefreshEvent(Refresh dummy) {
+ TimeTicker ticker = new TimeTicker(TimeUnit.MILLISECONDS, true);
+ try {
+ refreshTopologies();
+ LOG.info("refresh topologies, cost:{}", ticker.stopAndRestart());
+ if (!nimbusData.isLeader()) {
+ syncTopologyMeta();
+ LOG.info("sync topology meta, cost:{}", ticker.stop());
}
+ } catch (Exception ex) {
+ LOG.error("handleRefreshEvent error:", ex);
}
-
- metric.set_metricWindow(newMap);
- }
-
- public MetricInfo mergeMetricInfo(MetricInfo from, MetricInfo to, Set<String> tags) {
- if (to == null) {
- to = MetricThrift.mkMetricInfo();
- }
-
- if (from.get_baseMetric() == null) {
- LOG.warn("No base Metric ");
- return to;
- }
-
- for (String tag : tags) {
-
- MetricWindow fromMetric = from.get_baseMetric().get(tag);
- Map<String, MetricWindow> toMetricMap = to.get_baseMetric();
- if (toMetricMap == null) {
- toMetricMap = new HashMap<String, MetricWindow>();
- to.set_baseMetric(toMetricMap);
- }
-
- MetricWindow toMetric = toMetricMap.get(tag);
-
- toMetric = MetricThrift.mergeMetricWindow(fromMetric, toMetric);
-
- toMetricMap.put(tag, toMetric);
-
- }
-
- return to;
- }
-
- public Map<String, Map<String, MetricWindow>> mergeTaskStreams(
- Map<String, Map<String, MetricWindow>> componentStreams,
- Map<String, Map<String, MetricWindow>> taskStreams,
- Map<String, Map<String, AtomicInteger>> componentStreamParallel) {
-
- if (taskStreams == null || taskStreams.size() == 0) {
- return componentStreams;
- }
-
- if (componentStreams == null) {
- componentStreams = new HashMap<String, Map<String, MetricWindow>>();
- }
-
- for (Entry<String, Map<String, MetricWindow>> entry : taskStreams.entrySet()) {
+ }
+
+ private TopologyMetricContext getClusterTopologyMetricContext() {
+ return topologyMetricContexts.get(JStormMetrics.CLUSTER_METRIC_KEY);
+ }
+
+ private void mergeAndUploadClusterMetrics() {
+ TopologyMetricContext context = getClusterTopologyMetricContext();
+ TopologyMetric tpMetric = context.mergeMetrics();
+ if (tpMetric == null) {
+ tpMetric = MetricUtils.mkTopologyMetric();
+ tpMetric.set_topologyMetric(MetricUtils.mkMetricInfo());
+ }
+
+ //reset snapshots metric id
+ MetricInfo clusterMetrics = tpMetric.get_topologyMetric();
+ Map<String, Long> metricNames = context.getMemMeta();
+ for (Map.Entry<String, Map<Integer, MetricSnapshot>> entry : clusterMetrics.get_metrics().entrySet()) {
String metricName = entry.getKey();
- Map<String, MetricWindow> streamMetricWindows = entry.getValue();
-
- if (streamMetricWindows == null) {
- continue;
- }
-
- Map<String, AtomicInteger> streamCounters = componentStreamParallel.get(metricName);
- if (streamCounters == null) {
- streamCounters = new HashMap<String, AtomicInteger>();
- componentStreamParallel.put(metricName, streamCounters);
+ MetricType metricType = MetricUtils.metricType(metricName);
+ Long metricId = metricNames.get(metricName);
+ for (Map.Entry<Integer, MetricSnapshot> metric : entry.getValue().entrySet()) {
+ MetricSnapshot snapshot = metric.getValue();
+ snapshot.set_metricId(metricId);
+ if (metricType == MetricType.HISTOGRAM || metricType == MetricType.TIMER) {
+ snapshot.set_points(new ArrayList<Long>(0));
+ }
+// entry.getValue().put(metric.getKey(), snapshot);
}
-
- Map<String, MetricWindow> componentStreamMetricWindows = componentStreams.get(metricName);
- if (componentStreamMetricWindows == null) {
- componentStreamMetricWindows = new HashMap<String, MetricWindow>();
- componentStreams.put(metricName, componentStreamMetricWindows);
+ }
+
+ //fill the unacquired metrics with zero
+ long ts = System.currentTimeMillis();
+ for (Map.Entry<String, Long> entry : metricNames.entrySet()) {
+ String name = entry.getKey();
+ if (!clusterMetrics.get_metrics().containsKey(name)) {
+ Map<Integer, MetricSnapshot> metric = new HashMap<>();
+ MetricType type = MetricUtils.metricType(name);
+ metric.put(AsmWindow.M1_WINDOW, new MetricSnapshot(entry.getValue(), ts, type.getT()));
+ clusterMetrics.put_to_metrics(name, metric);
}
-
- for (Entry<String, MetricWindow> streamEntry : streamMetricWindows.entrySet()) {
- String streamName = streamEntry.getKey();
- MetricWindow taskMetricWindow = streamEntry.getValue();
-
- MetricWindow componentMetricWindow = componentStreamMetricWindows.get(streamName);
-
- componentMetricWindow = MetricThrift.mergeMetricWindow(taskMetricWindow, componentMetricWindow);
-
- componentStreamMetricWindows.put(streamName, componentMetricWindow);
-
- AtomicInteger counter = streamCounters.get(streamName);
- if (counter == null) {
- counter = new AtomicInteger(0);
- streamCounters.put(streamName, counter);
+ }
+
+ //upload to cache
+ Update event = new Update();
+ event.timestamp = System.currentTimeMillis();
+ event.topologyMetrics = tpMetric;
+ event.topologyId = JStormMetrics.CLUSTER_METRIC_KEY;
+ pushEvent(event);
+
+ LOG.info("send update event for cluster metrics, size : {}", clusterMetrics.get_metrics_size());
+ }
+
+ //update cluster metrics local cache
+ private void updateClusterMetrics(String topologyId, TopologyMetric tpMetric) {
+ if (tpMetric.get_topologyMetric().get_metrics_size() > 0) {
+ TopologyMetricContext context = getClusterTopologyMetricContext();
+ MetricInfo topologyMetrics = tpMetric.get_topologyMetric();
+ // make a new MetricInfo to save the topologyId's metric
+ MetricInfo clusterMetrics = MetricUtils.mkMetricInfo();
+ Set<String> metricNames = new HashSet<>();
+ for (Map.Entry<String, Map<Integer, MetricSnapshot>> entry : topologyMetrics.get_metrics().entrySet()) {
+ String metricName = MetricUtils.topo2clusterName(entry.getKey());
+ MetricType metricType = MetricUtils.metricType(metricName);
+ Map<Integer, MetricSnapshot> winData = new HashMap<>();
+ for (Map.Entry<Integer, MetricSnapshot> entryData : entry.getValue().entrySet()) {
+ MetricSnapshot snapshot = entryData.getValue().deepCopy();
+ winData.put(entryData.getKey(), snapshot);
+ if (metricType == MetricType.HISTOGRAM || metricType == MetricType.TIMER) {
+ // reset topology metric points
+ entryData.getValue().set_points(new ArrayList<Long>(0));
+ }
}
- counter.incrementAndGet();
+ clusterMetrics.put_to_metrics(metricName, winData);
+ metricNames.add(metricName);
}
+ // save to local cache, waiting for merging
+ context.addToMemCache(topologyId, clusterMetrics);
+ registerMetrics(JStormMetrics.CLUSTER_METRIC_KEY, metricNames);
}
-
- return componentStreams;
}
-
- public void avgStreams(Map<String, Map<String, MetricWindow>> tagStreamsMetrics, Map<String, Map<String, AtomicInteger>> counters, String tag) {
- if (tagStreamsMetrics == null) {
- return;
- }
-
- Map<String, MetricWindow> streamMetrics = tagStreamsMetrics.get(tag);
- if (streamMetrics == null) {
- return;
+
+ /**
+ * put metric data to metric cache.
+ */
+ public void handleUpdateEvent(Update event) {
+ TopologyMetric topologyMetrics = event.topologyMetrics;
+ final String topologyId = event.topologyId;
+
+ if (this.topologyMetricContexts.containsKey(topologyId)) {
+ if (!JStormMetrics.CLUSTER_METRIC_KEY.equals(topologyId)) {
+ updateClusterMetrics(topologyId, topologyMetrics);
+ }
+
+ // overwrite
+ metricCache.putMetricData(topologyId, topologyMetrics);
+
+ // below process is kind of a transaction, first we lock an empty slot, mark it as PRE_SET
+ // by this time the slot is not yet ready for uploading as the upload thread looks for SET slots only
+ // after all metrics data has been saved, we mark it as SET, then it's ready for uploading.
+ int idx = getAndPresetFirstEmptyIndex();
+ if (idx >= 0) {
+ TopologyMetricDataInfo summary = new TopologyMetricDataInfo();
+ summary.topologyId = topologyId;
+ summary.timestamp = event.timestamp;
+ if (topologyId.equals(JStormMetrics.NIMBUS_METRIC_KEY) ||
+ topologyId.equals(JStormMetrics.CLUSTER_METRIC_KEY)) {
+ summary.type = MetricUploader.METRIC_TYPE_TOPLOGY;
+ } else {
+ if (topologyMetrics.get_topologyMetric().get_metrics_size() > 0 ||
+ topologyMetrics.get_componentMetric().get_metrics_size() > 0) {
+ if (topologyMetrics.get_taskMetric().get_metrics_size() +
+ topologyMetrics.get_workerMetric().get_metrics_size() +
+ topologyMetrics.get_nettyMetric().get_metrics_size() +
+ topologyMetrics.get_streamMetric().get_metrics_size() > 0) {
+ summary.type = MetricUploader.METRIC_TYPE_ALL;
+ } else {
+ summary.type = MetricUploader.METRIC_TYPE_TOPLOGY;
+ }
+ } else {
+ summary.type = MetricUploader.METRIC_TYPE_TASK;
+ }
+ }
+
+ metricCache.put(PENDING_UPLOAD_METRIC_DATA_INFO + idx, summary);
+ metricCache.put(PENDING_UPLOAD_METRIC_DATA + idx, topologyMetrics);
+ markSet(idx);
+ LOG.info("put metric data to local cache, topology:{}, idx:{}", topologyId, idx);
+ } else {
+ LOG.error("exceeding maxPendingUploadMetrics, skip metrics data for topology:{}", topologyId);
+ }
+ } else {
+ LOG.warn("topology {} has been killed or has not started, skip update.", topologyId);
}
-
- for (Entry<String, MetricWindow> entry : streamMetrics.entrySet()) {
- String streamName = entry.getKey();
- MetricWindow metric = entry.getValue();
-
- AtomicInteger counter = counters.get(tag).get(streamName);
- if (counter == null) {
- continue;
-
+ }
+
+ /**
+ * get topology metrics, note that only topology & component & worker metrics are returned
+ */
+ public TopologyMetric getTopologyMetric(String topologyId) {
+ long start = System.nanoTime();
+ try {
+ TopologyMetric ret = new TopologyMetric();
+ List<MetricInfo> topologyMetrics = metricCache.getMetricData(topologyId, MetaType.TOPOLOGY);
+ List<MetricInfo> componentMetrics = metricCache.getMetricData(topologyId, MetaType.COMPONENT);
+ List<MetricInfo> workerMetrics = metricCache.getMetricData(topologyId, MetaType.WORKER);
+
+ MetricInfo dummy = MetricUtils.mkMetricInfo();
+ if (topologyMetrics.size() > 0) {
+ // get the last min topology metric
+ ret.set_topologyMetric(topologyMetrics.get(topologyMetrics.size() - 1));
+ } else {
+ ret.set_topologyMetric(dummy);
}
-
- avgMetricWindow(metric, counter.get());
- }
- }
-
- public void mergeTasks(TopologyMetric topologyMetric, String topologyId) {
- Map<Integer, MetricInfo> taskMetrics = topologyMetric.get_taskMetric();
-
- Map<Integer, String> taskToComponent = null;
- try {
- taskToComponent = Cluster.get_all_task_component(stormClusterState, topologyId, null);
- } catch (Exception e) {
- // TODO Auto-generated catch block
- LOG.error("Failed to get taskToComponent");
- return ;
- }
- if (taskToComponent == null) {
- LOG.error("Failed to get taskToComponent");
- return ;
- }
-
- Map<String, MetricInfo> componentMetrics = topologyMetric.get_componentMetric();
- if (componentMetrics == null) {
- componentMetrics = new HashMap<String, MetricInfo>();
- topologyMetric.set_componentMetric(componentMetrics);
- }
-
- Map<String, AtomicInteger> componentTaskParallel = new HashMap<String, AtomicInteger>();
- Map<String, Map<String, AtomicInteger>> componentStreamParallel = new HashMap<String, Map<String, AtomicInteger>>();
-
- for (Entry<Integer, MetricInfo> entry : taskMetrics.entrySet()) {
- Integer taskId = entry.getKey();
- MetricInfo taskMetric = entry.getValue();
-
- String component = taskToComponent.get(taskId);
- if (component == null) {
- LOG.error("Failed to get component of task " + taskId);
- continue;
+ if (componentMetrics.size() > 0) {
+ ret.set_componentMetric(componentMetrics.get(0));
+ } else {
+ ret.set_componentMetric(dummy);
}
-
- MetricInfo componentMetric = componentMetrics.get(component);
-
- componentMetric = mergeMetricInfo(taskMetric, componentMetric, MetricDef.MERGE_SUM_TAG);
- componentMetric = mergeMetricInfo(taskMetric, componentMetric, MetricDef.MERGE_AVG_TAG);
-
- Map<String, Map<String, MetricWindow>> input = mergeTaskStreams(componentMetric.get_inputMetric(), taskMetric.get_inputMetric(), componentStreamParallel);
- componentMetric.set_inputMetric(input);
-
- Map<String, Map<String, MetricWindow>> output = mergeTaskStreams(componentMetric.get_outputMetric(), taskMetric.get_outputMetric(), componentStreamParallel);
- componentMetric.set_outputMetric(output);
-
- componentMetrics.put(component, componentMetric);
-
- AtomicInteger counter = componentTaskParallel.get(component);
- if (counter == null) {
- counter = new AtomicInteger(0);
- componentTaskParallel.put(component, counter);
+ if (workerMetrics.size() > 0) {
+ ret.set_workerMetric(workerMetrics.get(0));
+ } else {
+ ret.set_workerMetric(dummy);
}
-
- counter.incrementAndGet();
- }
-
- for (Entry<String, MetricInfo> entry : componentMetrics.entrySet()) {
- String componentName = entry.getKey();
- MetricInfo metricInfo = entry.getValue();
-
- AtomicInteger counter = componentTaskParallel.get(componentName);
-
- for (String tag : MetricDef.MERGE_AVG_TAG) {
- MetricWindow metricWindow = metricInfo.get_baseMetric().get(tag);
-
- avgMetricWindow(metricWindow, counter.get());
-
- avgStreams(metricInfo.get_inputMetric(), componentStreamParallel, tag);
- avgStreams(metricInfo.get_outputMetric(), componentStreamParallel, tag);
+ ret.set_taskMetric(dummy);
+ ret.set_streamMetric(dummy);
+ ret.set_nettyMetric(dummy);
+
+ return ret;
+ } finally {
+ long end = System.nanoTime();
+ SimpleJStormMetric.updateNimbusHistogram("getTopologyMetric", (end - start) / TimeUtils.NS_PER_US);
+ }
+ }
+
+ public static String getWorkerSlotName(String hostname, Integer port) {
+ return hostname + ":" + port;
+ }
+
+ class RefreshTopologiesThread extends RunnableCallback {
+ @Override
+ public void run() {
+ if (!isShutdown.get()) {
+ pushEvent(new Refresh());
}
}
+
+ @Override
+ public Object getResult() {
+ return TimeUtils.SEC_PER_MIN;
+ }
+
+ @Override
+ public String getThreadName() {
+ return "RefreshThread";
+ }
}
-
- public void mergeComponent(TopologyMetric topologyMetric) {
- MetricInfo topologyMetricInfo = MetricThrift.mkMetricInfo();
- topologyMetric.set_topologyMetric(topologyMetricInfo);
- Map<String, MetricInfo> componentMetrics = topologyMetric.get_componentMetric();
- if (componentMetrics == null) {
- return;
+
+ class MetricsUploadThread extends Thread {
+ public MetricsUploadThread() {
+ setName("main-upload-thread");
}
-
- for (MetricInfo componentMetric : componentMetrics.values()) {
- topologyMetricInfo = mergeMetricInfo(componentMetric, topologyMetricInfo, MetricDef.MERGE_SUM_TAG);
- }
-
- topologyMetric.set_topologyMetric(topologyMetricInfo);
- }
-
- public void mergeTopology(TopologyMetric topologyMetric, WorkerUploadMetrics workerMetrics) {
- String topologyId = workerMetrics.get_topology_id();
-
- Map<Integer, MetricInfo> taskMetrics = topologyMetric.get_taskMetric();
- if (taskMetrics == null) {
- taskMetrics = new HashMap<Integer, MetricInfo>();
- topologyMetric.set_taskMetric(taskMetrics);
- }
- taskMetrics.putAll(workerMetrics.get_taskMetric());
-
- String hostname = getWorkerHostname(workerMetrics);
- topologyMetric.put_to_workerMetric(getWorkerSlotName(hostname, workerMetrics.get_port()), workerMetrics.get_workerMetric());
-
- }
-
- public void mergeNetty(WorkerUploadMetrics workerMetric, String topologyId, Set<String> connections) {
-
- if (topologyNettyMgr.getTopology(topologyId) == false) {
- return ;
- }
- Map<String, MetricInfo> connectionMetrics = workerMetric.get_nettyMetric().get_connections();
- for (Entry<String, MetricInfo> entry : connectionMetrics.entrySet()) {
- String connectionName = entry.getKey();
- MetricInfo metric = entry.getValue();
-
- MetricInfo cacheMetric = (MetricInfo)dbCache.get(getNettyConnectionKey(topologyId, connectionName));
- cacheMetric = MetricThrift.mergeMetricInfo(metric, cacheMetric);
-
- connections.add(connectionName);
-
- dbCache.put(getNettyConnectionKey(topologyId, connectionName), cacheMetric);
- }
- }
-
- public void mergeNetty(String topologyId, Set<String> connections) {
- if (topologyNettyMgr.getTopology(topologyId) == false) {
- LOG.info("Skip merge netty detail metrics");
- return ;
- }
- // @@@
- // this function will cost much memory when worker number is more than 200
- Map<String, MetricInfo> metricMap = new TreeMap<String, MetricInfo>();
-
- for (String connection : connections) {
- MetricInfo cacheMetric = (MetricInfo)dbCache.get(getNettyConnectionKey(topologyId, connection));
- if (cacheMetric == null) {
- LOG.warn("Failed to get cacheMetric of {}:{}", topologyId, connection );
- continue;
- }
-
- metricMap.put(connection, cacheMetric);
- dbCache.remove(getNettyConnectionKey(topologyId, connection));
- }
-
- dbCache.put(getNettyTopologyKey(topologyId), metricMap);
- // accelerate free memory
- metricMap.clear();
- }
-
- public void render() {
- for (Entry<String, Set<String>> entry : topologyWorkers.entrySet()) {
- String topologyId = entry.getKey();
- Set<String> workers = entry.getValue();
- Set<String> connections = new TreeSet<String>();
-
- TopologyMetric topologyMetric = new TopologyMetric();
-
- boolean isExistWorker = false;
- for (String workerId : workers) {
- WorkerUploadMetrics workerMetric = (WorkerUploadMetrics) dbCache.get(getWorkerKey(topologyId, workerId));
- if (workerMetric == null) {
- LOG.warn("Failed to get WorkerUploadMetrics of " + getWorkerKey(topologyId, workerId));
- continue;
+
+ @Override
+ public void run() {
+ while (!isShutdown.get()) {
+ try {
+ if (metricUploader != null && nimbusData.isLeader()) {
+ final int idx = getFirstPendingUploadIndex();
+ if (idx >= 0) {
+ markUploading(idx);
+ upload(clusterName, idx);
+ }
+ }
+ JStormUtils.sleepMs(5);
+ } catch (Exception ex) {
+ LOG.error("Error", ex);
}
- isExistWorker = true;
- mergeTopology(topologyMetric, workerMetric);
-
- mergeNetty(workerMetric, topologyId, connections);
}
- if (isExistWorker == false) {
- LOG.info("No worker metrics of {}", topologyId);
- continue;
+ }
+
+ public boolean upload(final String clusterName, final int idx) {
+ final TopologyMetricDataInfo summary = getMetricDataInfoFromCache(idx);
+ if (summary == null) {
+ LOG.warn("metric summary is null from cache idx:{}", idx);
+ markUploaded(idx);
+ return true;
}
-
- mergeTasks(topologyMetric, topologyId);
-
- mergeComponent(topologyMetric);
-
-
- dbCache.put(getTopologyKey(topologyId), topologyMetric);
-
- mergeNetty(topologyId, connections);
-
- LOG.info("Successfully render topologyId of " + topologyId);
-
- uploadToAlimonitor(topologyMetric, topologyId);
-
- cleanDeadSupervisorWorker(topologyMetric);
-
-
- try {
-
- //LOG.info(topologyId + " metrics is :\n" + Utils.toPrettyJsonString(topologyMetric));
- LOG.info(topologyId + " finish metric");
- stormClusterState.set_topology_metric(topologyId, topologyMetric);
- LOG.info("Successfully uploaded toplogy metrics: " + topologyId);
- } catch (Exception e) {
- // TODO Auto-generated catch block
- LOG.info("Failed to upload toplogy metrics: " + topologyId, e);
- continue;
+
+ final String topologyId = summary.topologyId;
+ if (!isTopologyAlive(topologyId)) {
+ LOG.warn("topology {} is not alive, skip sending metrics.", topologyId);
+ markUploaded(idx);
+ return true;
}
-
+
+ return metricUploader.upload(clusterName, topologyId, idx, summary.toMap());
}
}
-
- public void handleUpdateEvent(Update event) {
- long start = System.currentTimeMillis();
-
- WorkerUploadMetrics workerMetrics = event.workerMetrics;
-
- String topologyId = workerMetrics.get_topology_id();
- if (removing.containsKey(topologyId) == true) {
- LOG.info("Topology " + topologyId + " has been removed, skip update");
- return;
+
+ class FlushMetricMetaThread extends Thread {
+
+ public FlushMetricMetaThread() {
+ setName("FlushMetricMetaThread");
}
-
- Set<String> workers = topologyWorkers.get(topologyId);
- if (workers == null) {
- workers = new HashSet<String>();
- topologyWorkers.put(topologyId, workers);
- }
-
- String workerSlot = getWorkerSlotName(workerMetrics.get_supervisor_id(), workerMetrics.get_port());
-
- workers.add(workerSlot);
- dbCache.put(getWorkerKey(topologyId, workerSlot), workerMetrics);
-
- long end = System.currentTimeMillis();
-
- updateHistogram.update((end - start));
- }
-
- public void uploadToAlimonitor(TopologyMetric topologyMetric, String topologyId) {
- // @@@ TODO
- }
-
-
- public TopologyMetric getTopologyMetric(String topologyId) {
- long start = System.nanoTime();
- try {
- TopologyMetric ret = (TopologyMetric) dbCache.get(getTopologyKey(topologyId));
- if (ret == null) {
- return emptyTopologyMetric;
- } else {
- return ret;
+
+ @Override
+ public void run() {
+ while (!isShutdown.get()) {
+ long start = System.currentTimeMillis();
+ try {
+ // if metricUploader is not fully initialized, return directly
+ if (nimbusData.isLeader() && metricUploader != null) {
+ for (Map.Entry<String, TopologyMetricContext> entry : topologyMetricContexts.entrySet()) {
+ String topologyId = entry.getKey();
+ TopologyMetricContext metricContext = entry.getValue();
+
+ Map<String, Long> cachedMeta = metricCache.getMeta(topologyId);
+ if (cachedMeta == null) {
+ cachedMeta = new HashMap<>();
+ }
+ Map<String, Long> memMeta = metricContext.getMemMeta();
+ if (memMeta.size() > cachedMeta.size()) {
+ cachedMeta.putAll(memMeta);
+ }
+ metricCache.putMeta(topologyId, cachedMeta);
+
+ int curSize = cachedMeta.size();
+ if (curSize != metricContext.getFlushedMetaNum()) {
+ metricContext.setFlushedMetaNum(curSize);
+
+ metricUploader.registerMetrics(clusterName, topologyId, cachedMeta);
+ LOG.info("flush metric meta, topology:{}, total:{}, cost:{}.",
+ topologyId, curSize, System.currentTimeMillis() - start);
+ }
+ stormClusterState.set_topology_metric(topologyId, curSize);
+ }
+ }
+
+ JStormUtils.sleepMs(15000);
+ } catch (Exception ex) {
+ LOG.error("Error", ex);
+ }
}
- }finally {
- long end = System.nanoTime();
-
- SimpleJStormMetric.updateHistorgram("getTopologyMetric", (end - start)/1000000.0d);
}
}
-
- public SortedMap<String, MetricInfo> getNettyMetric(String topologyId) {
- TreeMap<String, MetricInfo> ret = (TreeMap<String, MetricInfo>)dbCache.get(getNettyTopologyKey(topologyId));
- if (ret == null) {
- return emptyNettyMetric;
- }else {
+
+ public static class TopologyMetricDataInfo implements Serializable {
+ private static final long serialVersionUID = 1303262512351757610L;
+
+ public String topologyId;
+ public String type; // "tp" for tp/comp metrics OR "task" for task/stream/worker/netty metrics
+ public long timestamp; // metrics report time
+
+ public Map<String, Object> toMap() {
+ Map<String, Object> ret = new HashMap<String, Object>();
+ ret.put(MetricUploader.METRIC_TIME, timestamp);
+ ret.put(MetricUploader.METRIC_TYPE, type);
+
return ret;
}
+
+ @Override
+ public String toString() {
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
+ }
}
-
- public static String getWorkerSlotName(String hostname, Integer port) {
- return hostname + ":" + port;
+
+ // ==============================================
+ // =================== events ===================
+ // ==============================================
+ public static class Event {
+ protected Event() {
+ }
+
+ public String clusterName;
+ public String topologyId;
+ public long timestamp;
+
+ @Override
+ public String toString() {
+ return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
+ }
+ }
+
+ public static class Update extends Event {
+ public TopologyMetric topologyMetrics;
}
-
- public static String getWorkerKey(String topologyId, String workerSlot) {
- return CACHE_NAMESPACE_METRIC + "@" + topologyId + "@" + workerSlot;
+
+ public static class Remove extends Event {
}
-
- public static String getTopologyKey(String topologyId) {
- return CACHE_NAMESPACE_METRIC + "@" + topologyId;
+
+ public static class Refresh extends Event {
}
-
- public static String getNettyConnectionKey(String topologyId, String connection) {
- return CACHE_NAMESPACE_NETTY + "@" + topologyId + "@" + connection;
+
+
+ public static class KillTopologyEvent extends Event {
}
-
- public static String getNettyTopologyKey(String topologyId) {
- return CACHE_NAMESPACE_NETTY + "@" + topologyId;
+
+ public static class StartTopologyEvent extends Event {
+ public double sampleRate;
+ }
+
+ public static class TaskDeadEvent extends Event {
+ public Map<Integer, ResourceWorkerSlot> deadTasks;
+ }
+
+ public static class TaskStartEvent extends Event {
+ public Assignment oldAssignment;
+ public Assignment newAssignment;
+ public Map<Integer, String> task2Component;
}
-
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyNettyMgr.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyNettyMgr.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyNettyMgr.java
index 7eaccab..6e55049 100644
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyNettyMgr.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/TopologyNettyMgr.java
@@ -1,105 +1,78 @@
package com.alibaba.jstorm.daemon.nimbus;
-import java.util.Map;
-
+import backtype.storm.Config;
+import backtype.storm.generated.InvalidTopologyException;
+import com.alibaba.jstorm.cluster.Common;
+import com.alibaba.jstorm.cluster.StormConfig;
+import com.alibaba.jstorm.metric.MetricUtils;
import org.jboss.netty.util.internal.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.cluster.Common;
-import com.alibaba.jstorm.cluster.StormConfig;
-import com.alibaba.jstorm.utils.JStormUtils;
-
-import backtype.storm.Config;
-import backtype.storm.generated.InvalidTopologyException;
+import java.util.Map;
public class TopologyNettyMgr {
- private static Logger LOG = LoggerFactory.getLogger(TopologyNettyMgr.class);
- private boolean defaultEnable = true;
- private Map nimbusConf;
- private ConcurrentHashMap<String, Boolean> setting = new ConcurrentHashMap<String, Boolean>();
- private static final int WORKER_DISABLE_THREADHOLD = 200;
-
- public TopologyNettyMgr(Map conf) {
- nimbusConf = conf;
-
- Boolean isEnable = ConfigExtension.isEnableTopologyNettyMetrics(conf);
- if (isEnable != null) {
- defaultEnable = isEnable;
- }
-
- LOG.info("Default netty metrics setting is " + defaultEnable);
- }
-
- protected boolean getTopology(Map conf) {
- Boolean isEnable = ConfigExtension.isEnableTopologyNettyMetrics(conf);
- if (isEnable != null) {
- return isEnable;
- }
-
- int workerNum = JStormUtils.parseInt(conf.get(Config.TOPOLOGY_WORKERS), 1);
- if (workerNum <= WORKER_DISABLE_THREADHOLD) {
- isEnable = Boolean.TRUE;
- }else {
- isEnable = Boolean.FALSE;
- }
-
- return isEnable;
- }
-
- public boolean getTopology(String topologyId) {
- try {
- String topologyName = Common.topologyIdToName(topologyId);
-
- Boolean isEnable = setting.get(topologyName);
- if (isEnable != null) {
- return isEnable;
- }
-
- Map topologyConf =
- StormConfig.read_nimbus_topology_conf(nimbusConf, topologyId);
-
- isEnable = getTopology(topologyConf);
- setting.put(topologyName, isEnable);
- LOG.info("{} netty metrics setting is {}", topologyName, isEnable);
- return isEnable;
-
- }catch(Exception e) {
- LOG.info("Failed to get {} netty metrics setting ", topologyId);
- return defaultEnable;
- }
-
-
- }
-
- public void setTopology(Map conf) {
- String topologyName = (String)conf.get(Config.TOPOLOGY_NAME);
- if (topologyName == null) {
- LOG.info("No topologyName setting");
- return ;
- }
-
- boolean isEnable = getTopology(conf);
-
- setting.put(topologyName, isEnable);
-
- LOG.info("{} netty metrics setting is {}", topologyName, isEnable);
- return ;
-
- }
-
- public void rmTopology(String topologyId) {
- String topologyName;
- try {
- topologyName = Common.topologyIdToName(topologyId);
- setting.remove(topologyName);
- LOG.info("Remove {} netty metrics setting ", topologyName);
- } catch (InvalidTopologyException e) {
- // TODO Auto-generated catch block
-
- }
-
- }
+ private static Logger LOG = LoggerFactory.getLogger(TopologyNettyMgr.class);
+ private Map nimbusConf;
+ private ConcurrentHashMap<String, Boolean> setting = new ConcurrentHashMap<String, Boolean>();
+
+ public TopologyNettyMgr(Map conf) {
+ nimbusConf = conf;
+
+ }
+
+ protected boolean getTopology(Map conf) {
+ return MetricUtils.isEnableNettyMetrics(conf);
+ }
+
+ public boolean getTopology(String topologyId) {
+ try {
+ String topologyName = Common.topologyIdToName(topologyId);
+
+ Boolean isEnable = setting.get(topologyName);
+ if (isEnable != null) {
+ return isEnable;
+ }
+
+ Map topologyConf = StormConfig.read_nimbus_topology_conf(nimbusConf, topologyId);
+
+ isEnable = getTopology(topologyConf);
+ setting.put(topologyName, isEnable);
+ LOG.info("{} netty metrics setting is {}", topologyName, isEnable);
+ return isEnable;
+
+ } catch (Exception e) {
+ LOG.info("Failed to get {} netty metrics setting ", topologyId);
+ return true;
+ }
+
+ }
+
+ public void setTopology(Map conf) {
+ String topologyName = (String) conf.get(Config.TOPOLOGY_NAME);
+ if (topologyName == null) {
+ LOG.info("No topologyName setting");
+ return;
+ }
+
+ boolean isEnable = getTopology(conf);
+
+ setting.put(topologyName, isEnable);
+
+ LOG.info("{} netty metrics setting is {}", topologyName, isEnable);
+ return;
+
+ }
+
+ public void rmTopology(String topologyId) {
+ String topologyName;
+ try {
+ topologyName = Common.topologyIdToName(topologyId);
+ setting.remove(topologyName);
+ LOG.info("Remove {} netty metrics setting ", topologyName);
+ } catch (InvalidTopologyException ignored) {
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/AlimonitorClient.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/AlimonitorClient.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/AlimonitorClient.java
new file mode 100644
index 0000000..78bb1d2
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/AlimonitorClient.java
@@ -0,0 +1,226 @@
+package com.alibaba.jstorm.daemon.nimbus.metric.uploader;
+
+import backtype.storm.generated.TopologyMetric;
+import org.apache.http.HttpEntity;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AlimonitorClient extends DefaultMetricUploader {
+
+ public static Logger LOG = LoggerFactory.getLogger(AlimonitorClient.class);
+
+ // Send to localhost:15776 by default
+ public static final String DEFAUT_ADDR = "127.0.0.1";
+ public static final String DEFAULT_PORT = "15776";
+ public static final int DEFAUTL_FLAG = 0;
+ public static final String DEFAULT_ERROR_INFO = "";
+
+ private final String COLLECTION_FLAG = "collection_flag";
+ private final String ERROR_INFO = "error_info";
+ private final String MSG = "MSG";
+
+ private String port;
+ private String requestIP;
+ private String monitorName;
+ private int collectionFlag;
+ private String errorInfo;
+
+ private boolean post;
+
+ public AlimonitorClient() {
+ }
+
+ public AlimonitorClient(String requestIP, String port, boolean post) {
+ this.requestIP = requestIP;
+ this.port = port;
+ this.post = post;
+ this.monitorName = null;
+ this.collectionFlag = 0;
+ this.errorInfo = null;
+ }
+
+ public void setIpAddr(String ipAddr) {
+ this.requestIP = ipAddr;
+ }
+
+ public void setPort(String port) {
+ this.port = port;
+ }
+
+ public void setMonitorName(String monitorName) {
+ this.monitorName = monitorName;
+ }
+
+ public void setCollectionFlag(int flag) {
+ this.collectionFlag = flag;
+ }
+
+ public void setErrorInfo(String msg) {
+ this.errorInfo = msg;
+ }
+
+ public void setPostFlag(boolean post) {
+ this.post = post;
+ }
+
+ public String buildURL() {
+ return "http://" + requestIP + ":" + port + "/passive";
+ }
+
+ public String buildRqstAddr() {
+ return "http://" + requestIP + ":" + port + "/passive?name=" + monitorName + "&msg=";
+ }
+
+
+ public Map buildAliMonitorMsg(int collection_flag, String error_message) {
+ // Json format of the message sent to Alimonitor
+ // {
+ // "collection_flag":int,
+ // "error_info":string,
+ // "MSG": object | array
+ // }
+ Map ret = new HashMap();
+ ret.put(COLLECTION_FLAG, collection_flag);
+ ret.put(ERROR_INFO, error_message);
+ ret.put(MSG, null);
+
+ return ret;
+ }
+
+ private void addMsgData(Map jsonObj, Map<String, Object> map) {
+ jsonObj.put(MSG, map);
+ }
+
+ private boolean sendRequest(int collection_flag, String error_message, Map<String, Object> msg) throws Exception {
+ boolean ret = false;
+
+ if (msg.size() == 0)
+ return ret;
+
+ Map jsonObj = buildAliMonitorMsg(collection_flag, error_message);
+ addMsgData(jsonObj, msg);
+ String jsonMsg = jsonObj.toString();
+ LOG.info(jsonMsg);
+
+ if (post == true) {
+ String url = buildURL();
+ ret = httpPost(url, jsonMsg);
+ } else {
+ String request = buildRqstAddr();
+ StringBuilder postAddr = new StringBuilder();
+ postAddr.append(request);
+ postAddr.append(URLEncoder.encode(jsonMsg));
+
+ ret = httpGet(postAddr);
+ }
+
+ return ret;
+ }
+
+ private boolean httpGet(StringBuilder postAddr) {
+ boolean ret = false;
+
+ CloseableHttpClient httpClient = HttpClientBuilder.create().build();
+ CloseableHttpResponse response = null;
+
+ try {
+ HttpGet request = new HttpGet(postAddr.toString());
+ response = httpClient.execute(request);
+ HttpEntity entity = response.getEntity();
+ if (entity != null) {
+ LOG.info(EntityUtils.toString(entity));
+ }
+ EntityUtils.consume(entity);
+ ret = true;
+ } catch (Exception e) {
+ LOG.error("Exception when sending http request to alimonitor", e);
+ } finally {
+ try {
+ if (response != null)
+ response.close();
+ httpClient.close();
+ } catch (Exception e) {
+ LOG.error("Exception when closing httpclient", e);
+ }
+ }
+
+ return ret;
+ }
+
+ private boolean httpPost(String url, String msg) {
+ boolean ret = false;
+
+ CloseableHttpClient httpClient = HttpClientBuilder.create().build();
+ CloseableHttpResponse response = null;
+
+ try {
+ HttpPost request = new HttpPost(url);
+ List<NameValuePair> nvps = new ArrayList<NameValuePair>();
+ nvps.add(new BasicNameValuePair("name", monitorName));
+ nvps.add(new BasicNameValuePair("msg", msg));
+ request.setEntity(new UrlEncodedFormEntity(nvps));
+ response = httpClient.execute(request);
+ HttpEntity entity = response.getEntity();
+ if (entity != null) {
+ LOG.info(EntityUtils.toString(entity));
+ }
+ EntityUtils.consume(entity);
+ ret = true;
+ } catch (Exception e) {
+ LOG.error("Exception when sending http request to alimonitor", e);
+ } finally {
+ try {
+ if (response != null)
+ response.close();
+ httpClient.close();
+ } catch (Exception e) {
+ LOG.error("Exception when closing httpclient", e);
+ }
+ }
+
+ return ret;
+ }
+
+
+ protected Map<String, Object> convertMap(String clusterName, String topologyId, TopologyMetric tpMetric) {
+ /**
+ * @@@ Todo
+ */
+ return null;
+ }
+
+ @Override
+ public boolean upload(String clusterName, String topologyId, TopologyMetric tpMetric, Map<String, Object> metricContext) {
+ // TODO Auto-generated method stub
+ Map<String, Object> metricMap = convertMap(clusterName, topologyId, tpMetric);
+ if (metricMap == null || metricMap.isEmpty() == true) {
+ return false;
+ }
+
+ try {
+ sendRequest(collectionFlag, errorInfo, metricMap);
+ return true;
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ LOG.error("Failed upload metric to Alimonitor", e);
+ return false;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/DefaultMetricUploader.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/DefaultMetricUploader.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/DefaultMetricUploader.java
new file mode 100644
index 0000000..58e4e7d
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/DefaultMetricUploader.java
@@ -0,0 +1,71 @@
+package com.alibaba.jstorm.daemon.nimbus.metric.uploader;
+
+import backtype.storm.generated.TopologyMetric;
+import com.alibaba.jstorm.daemon.nimbus.NimbusData;
+import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable;
+import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Fallback {@link MetricUploader} that ships metrics nowhere: each callback
+ * only logs what it was asked to do and acknowledges with {@code true}, so
+ * nimbus keeps running when no external metrics backend is configured.
+ */
+public class DefaultMetricUploader implements MetricUploader {
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+    protected NimbusData nimbusData;
+    protected TopologyMetricsRunnable metricsRunnable;
+
+    public DefaultMetricUploader() {
+    }
+
+    /** Remembers nimbus data and its metrics runnable for later callbacks. */
+    @Override
+    public void init(NimbusData nimbusData) throws Exception {
+        this.nimbusData = nimbusData;
+        this.metricsRunnable = nimbusData.getMetricRunnable();
+    }
+
+    /** Nothing to release. */
+    @Override
+    public void cleanup() {
+    }
+
+    /** Logs the registration request and accepts it unconditionally. */
+    @Override
+    public boolean registerMetrics(String clusterName, String topologyId,
+                                   Map<String, Long> metrics) {
+        if (!metrics.isEmpty()) {
+            logger.info("register metrics, topology:{}, total:{}", topologyId, metrics.size());
+        }
+        return true;
+    }
+
+    /** Logs a size summary of the metrics instead of uploading them anywhere. */
+    @Override
+    public boolean upload(String clusterName, String topologyId, TopologyMetric tpMetric, Map<String, Object> metricContext) {
+        if (tpMetric == null) {
+            logger.info("No metric of {}", topologyId);
+            return true;
+        }
+
+        // NOTE(review): assumes every per-level metric container below is
+        // non-null on a populated TopologyMetric -- confirm against the
+        // thrift definition before relying on this.
+        int totalSize = 0;
+        totalSize += tpMetric.get_topologyMetric().get_metrics_size();
+        totalSize += tpMetric.get_componentMetric().get_metrics_size();
+        totalSize += tpMetric.get_taskMetric().get_metrics_size();
+        totalSize += tpMetric.get_streamMetric().get_metrics_size();
+        totalSize += tpMetric.get_workerMetric().get_metrics_size();
+        totalSize += tpMetric.get_nettyMetric().get_metrics_size();
+
+        logger.info("send metrics, cluster:{}, topology:{}, metric size:{}, metricContext:{}",
+                clusterName, topologyId, totalSize, metricContext);
+
+        return true;
+    }
+
+    /** Marks the batch identified by {@code key} as uploaded without sending it. */
+    @Override
+    public boolean upload(String clusterName, String topologyId, Object key, Map<String, Object> metricContext) {
+        metricsRunnable.markUploaded((Integer) key);
+        return true;
+    }
+
+    /** Accepts every event; there is no downstream handler to forward to. */
+    @Override
+    public boolean sendEvent(String clusterName, Event event) {
+        logger.info("Successfully sendEvent {} of {}", event, clusterName);
+        return true;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/MetricUploader.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/MetricUploader.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/MetricUploader.java
new file mode 100644
index 0000000..9b7c745
--- /dev/null
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/nimbus/metric/uploader/MetricUploader.java
@@ -0,0 +1,46 @@
+package com.alibaba.jstorm.daemon.nimbus.metric.uploader;
+
+import backtype.storm.generated.TopologyMetric;
+import com.alibaba.jstorm.daemon.nimbus.NimbusData;
+import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable;
+
+import java.util.Map;
+
+/**
+ * Pluggable sink for topology metrics collected by nimbus. Implementations
+ * forward metrics to an external system (database, monitor, ...); see
+ * DefaultMetricUploader for a log-only no-op implementation.
+ */
+public interface MetricUploader {
+ /**
+ * Set NimbusData to MetricUploader; called once before any other method.
+ */
+ void init(NimbusData nimbusData) throws Exception;
+
+ /**
+ * Release any resources held by the uploader.
+ */
+ void cleanup();
+
+ /**
+ * register metrics to external metric plugin
+ *
+ * @param metrics metric name to a numeric value (presumably a metric id --
+ *        confirm with the caller in TopologyMetricsRunnable)
+ * @return true on success, false on failure
+ */
+ boolean registerMetrics(String clusterName, String topologyId,
+ Map<String, Long> metrics) throws Exception;
+
+ // Well-known keys/values for the metricContext map passed to upload().
+ String METRIC_TYPE = "metric.type";
+ // NOTE(review): "TOPLOGY" is a typo for "TOPOLOGY", but the constant name
+ // is part of this public interface -- renaming it would break implementors,
+ // so it is documented here instead.
+ String METRIC_TYPE_TOPLOGY = "TP";
+ String METRIC_TYPE_TASK = "TASK";
+ String METRIC_TYPE_ALL = "ALL";
+ String METRIC_TIME = "metric.timestamp";
+
+ /**
+ * upload topologyMetric to external metric plugin (such as database plugin)
+ *
+ * @return true means success, false means failure
+ */
+ boolean upload(String clusterName, String topologyId, TopologyMetric tpMetric, Map<String, Object> metricContext);
+
+ /**
+ * upload metrics with given key and metric context. the implementation can retrieve metric data from rocks db
+ * in the handler thread, which is kind of lazy-init, making it more GC-friendly
+ *
+ * @param key opaque handle for the stored metric batch; DefaultMetricUploader
+ *        casts it to Integer and passes it to markUploaded
+ * @return true means success, false means failure
+ */
+ boolean upload(String clusterName, String topologyId, Object key, Map<String, Object> metricContext);
+
+ /**
+ * Send an event to underlying handler
+ *
+ * @return true means the event was accepted
+ */
+ boolean sendEvent(String clusterName, TopologyMetricsRunnable.Event event);
+}