You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/04/05 22:33:58 UTC
[02/14] storm git commit: STORM-2416 Release Packaging Improvements
http://git-wip-us.apache.org/repos/asf/storm/blob/2a369e40/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
----------------------------------------------------------------------
diff --git a/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java b/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
deleted file mode 100755
index a934120..0000000
--- a/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-
-package org.apache.storm.perf.utils;
-
-import org.apache.storm.LocalCluster;
-import org.apache.storm.generated.ClusterSummary;
-import org.apache.storm.generated.ExecutorSpecificStats;
-import org.apache.storm.generated.ExecutorStats;
-import org.apache.storm.generated.ExecutorSummary;
-import org.apache.storm.generated.Nimbus;
-import org.apache.storm.generated.SpoutStats;
-import org.apache.storm.generated.TopologyInfo;
-import org.apache.storm.generated.TopologySummary;
-import org.apache.storm.utils.Utils;
-
-import java.util.List;
-import java.util.Map;
-
-public class MetricsSample {
-
-    // Time the sample was created, in milliseconds as returned by
-    // System.currentTimeMillis(); stays -1 until a sample is built.
-    private long sampleTime = -1;
-
-    // Topology-wide ":all-time" counters.
-    private long totalTransferred = 0L;
-    // NOTE(review): no code in this class accumulates an emitted count, so this stays 0.
-    private long totalEmitted = 0L;
-    private long totalAcked = 0L;
-    private long totalFailed = 0L;
-
-    // Mean of the per-spout-executor complete-latency averages.
-    private double totalLatency;
-
-    // Spout-only counters.
-    // NOTE(review): spoutEmitted is never accumulated by this class and stays 0.
-    private long spoutEmitted = 0L;
-    private long spoutTransferred = 0L;
-    private int spoutExecutors = 0;
-
-    // Topology sizing. numSupervisors is never assigned by this class.
-    private int numSupervisors = 0;
-    private int numWorkers = 0;
-    private int numTasks = 0;
-    private int numExecutors = 0;
-
-    // Cluster slot usage; never assigned by this class.
-    private int totalSlots = 0;
-    private int usedSlots = 0;
-
-    /**
-     * Builds a metrics sample for a topology by querying Nimbus.
-     *
-     * <p>The topology is looked up by name in the cluster summary, its executor
-     * statistics are aggregated, and the worker/executor/task counts reported by
-     * the topology summary are copied onto the sample.
-     *
-     * @param client       Nimbus client used to fetch the cluster summary and topology info
-     * @param topologyName name of the topology to sample
-     * @return a sample populated with the aggregated statistics and topology sizing
-     * @throws IllegalArgumentException if no topology with the given name is in the cluster summary
-     * @throws Exception if a Nimbus call fails
-     */
-    public static MetricsSample factory(Nimbus.Client client, String topologyName) throws Exception {
-        ClusterSummary clusterSummary = client.getClusterInfo();
-
-        TopologySummary topSummary = getTopologySummary(clusterSummary, topologyName);
-        if (topSummary == null) {
-            // getTopologySummary returns null when nothing matches; fail with an
-            // actionable message instead of an anonymous NullPointerException.
-            throw new IllegalArgumentException(
-                    "Topology '" + topologyName + "' was not found in the cluster summary");
-        }
-
-        TopologyInfo topInfo = client.getTopologyInfo(topSummary.get_id());
-
-        MetricsSample sample = getMetricsSample(topInfo);
-        sample.numWorkers = topSummary.get_num_workers();
-        sample.numExecutors = topSummary.get_num_executors();
-        sample.numTasks = topSummary.get_num_tasks();
-        return sample;
-    }
-
-    /**
-     * Builds a metrics sample for a topology running on an in-process cluster.
-     *
-     * <p>Only the executor statistics are aggregated; this overload does not fill
-     * in the worker, executor and task counts, which therefore remain 0.
-     *
-     * <p>NOTE(review): {@code topologyName} is forwarded unchanged to
-     * {@code LocalCluster.getTopologyInfo}; confirm that method expects a
-     * topology name rather than a topology id.
-     *
-     * @param localCluster  the local cluster to query
-     * @param topologyName  identifier passed to {@code getTopologyInfo}
-     * @return a sample populated with the aggregated executor statistics
-     * @throws Exception if the topology info cannot be retrieved
-     */
-    public static MetricsSample factory(LocalCluster localCluster, String topologyName) throws Exception {
-        return getMetricsSample(localCluster.getTopologyInfo(topologyName));
-    }
-
-
- private static MetricsSample getMetricsSample(TopologyInfo topInfo) {
- List<ExecutorSummary> executorSummaries = topInfo.get_executors();
-
- // totals
- long totalTransferred = 0l;
- long totalEmitted = 0l;
- long totalAcked = 0l;
- long totalFailed = 0l;
-
- // number of spout executors
- int spoutExecCount = 0;
- double spoutLatencySum = 0.0;
-
- long spoutEmitted = 0l;
- long spoutTransferred = 0l;
-
- // Executor summaries
- for(ExecutorSummary executorSummary : executorSummaries){
- ExecutorStats execuatorStats = executorSummary.get_stats();
- if(execuatorStats == null){
- continue;
- }
-
- ExecutorSpecificStats executorSpecificStats = execuatorStats.get_specific();
- if(executorSpecificStats == null){
- // bail out
- continue;
- }
-
- // transferred totals
- Map<String,Map<String,Long>> transferred = execuatorStats.get_transferred();
- Map<String, Long> txMap = transferred.get(":all-time");
- if(txMap == null){
- continue;
- }
- for(String key : txMap.keySet()){
- // todo, ignore the master batch coordinator ?
- if(!Utils.isSystemId(key)){
- Long count = txMap.get(key);
- totalTransferred += count;
- if(executorSpecificStats.is_set_spout()){
- spoutTransferred += count;
- }
- }
- }
-
- // we found a spout
- if(executorSpecificStats.isSet(2)) { // spout
-
- SpoutStats spoutStats = executorSpecificStats.get_spout();
- Map<String, Long> acked = spoutStats.get_acked().get(":all-time");
- if(acked != null){
- for(String key : acked.keySet()) {
- totalAcked += acked.get(key);
- }
- }
-
- Map<String, Long> failed = spoutStats.get_failed().get(":all-time");
- if(failed != null){
- for(String key : failed.keySet()) {
- totalFailed += failed.get(key);
- }
- }
-
- Double total = 0d;
- Map<String, Double> vals = spoutStats.get_complete_ms_avg().get(":all-time");
- for(String key : vals.keySet()){
- total += vals.get(key);
- }
- Double latency = total / vals.size();
-
- spoutExecCount++;
- spoutLatencySum += latency;
- }
-
-
- } // end executor summary
-
- MetricsSample ret = new MetricsSample();
- ret.totalEmitted = totalEmitted;
- ret.totalTransferred = totalTransferred;
- ret.totalAcked = totalAcked;
- ret.totalFailed = totalFailed;
- ret.totalLatency = spoutLatencySum/spoutExecCount;
- ret.spoutEmitted = spoutEmitted;
- ret.spoutTransferred = spoutTransferred;
- ret.sampleTime = System.currentTimeMillis();
-// ret.numSupervisors = clusterSummary.get_supervisors_size();
- ret.numWorkers = 0;
- ret.numExecutors = 0;
- ret.numTasks = 0;
- ret.spoutExecutors = spoutExecCount;
- return ret;
- }
-
- public static TopologySummary getTopologySummary(ClusterSummary cs, String name) {
- for (TopologySummary ts : cs.get_topologies()) {
- if (name.equals(ts.get_name())) {
- return ts;
- }
- }
- return null;
- }
-
-
-
-    // ---- accessors ----
-
-    /** @return the sample creation time in milliseconds, or -1 if it was never set */
-    public long getSampleTime() {
-        return this.sampleTime;
-    }
-
-    /** @return the total transferred count recorded in this sample */
-    public long getTotalTransferred() {
-        return this.totalTransferred;
-    }
-
-    /** @return the total emitted count recorded in this sample */
-    public long getTotalEmitted() {
-        return this.totalEmitted;
-    }
-
-    /** @return the total acked count recorded in this sample */
-    public long getTotalAcked() {
-        return this.totalAcked;
-    }
-
-    /** @return the total failed count recorded in this sample */
-    public long getTotalFailed() {
-        return this.totalFailed;
-    }
-
-    /** @return the latency figure recorded in this sample */
-    public double getTotalLatency() {
-        return this.totalLatency;
-    }
-
-    /** @return the spout emitted count recorded in this sample */
-    public long getSpoutEmitted() {
-        return this.spoutEmitted;
-    }
-
-    /** @return the spout transferred count recorded in this sample */
-    public long getSpoutTransferred() {
-        return this.spoutTransferred;
-    }
-
-    /** @return the number of supervisors recorded in this sample */
-    public int getNumSupervisors() {
-        return this.numSupervisors;
-    }
-
-    /** @return the number of workers recorded in this sample */
-    public int getNumWorkers() {
-        return this.numWorkers;
-    }
-
-    /** @return the number of tasks recorded in this sample */
-    public int getNumTasks() {
-        return this.numTasks;
-    }
-
-    /** @return the total slot count recorded in this sample */
-    public int getTotalSlots() {
-        return this.totalSlots;
-    }
-
-    /** @return the number of spout executors recorded in this sample */
-    public int getSpoutExecutors() {
-        return this.spoutExecutors;
-    }
-
-    /** @return the number of executors recorded in this sample */
-    public int getNumExecutors() {
-        return this.numExecutors;
-    }
-
-    /** @return the used slot count recorded in this sample */
-    public int getUsedSlots() {
-        return this.usedSlots;
-    }
-
-}
\ No newline at end of file