You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by GitBox <gi...@apache.org> on 2019/01/09 10:22:37 UTC

[GitHub] GabrielBrascher closed pull request #3078: Add influxdb to statscollector

GabrielBrascher closed pull request #3078: Add influxdb to statscollector
URL: https://github.com/apache/cloudstack/pull/3078
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/pom.xml b/core/pom.xml
index e95a72aab5b..a3ef70aeda2 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -37,6 +37,11 @@
             <artifactId>cloud-engine-api</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.cloudstack</groupId>
+            <artifactId>cloud-engine-schema</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.cloudstack</groupId>
             <artifactId>cloud-framework-security</artifactId>
diff --git a/core/src/main/java/com/cloud/agent/api/HostStatsEntry.java b/core/src/main/java/com/cloud/agent/api/HostStatsEntry.java
index 82041a604e8..5fd1c515b6e 100644
--- a/core/src/main/java/com/cloud/agent/api/HostStatsEntry.java
+++ b/core/src/main/java/com/cloud/agent/api/HostStatsEntry.java
@@ -20,10 +20,12 @@
 package com.cloud.agent.api;
 
 import com.cloud.host.HostStats;
+import com.cloud.host.HostVO;
 
 public class HostStatsEntry implements HostStats {
 
     long hostId;
+    HostVO hostVo;
     String entityType;
     double cpuUtilization;
     double networkReadKBs;
@@ -112,4 +114,16 @@ public HostStats getHostStats() {
     public void setHostId(long hostId) {
         this.hostId = hostId;
     }
+
+    public long getHostId() {
+        return hostId;
+    }
+
+    public HostVO getHostVo() {
+        return hostVo;
+    }
+
+    public void setHostVo(HostVO hostVo) {
+        this.hostVo = hostVo;
+    }
 }
diff --git a/core/src/main/java/com/cloud/agent/api/VmStatsEntry.java b/core/src/main/java/com/cloud/agent/api/VmStatsEntry.java
index e6063b9ab5b..9f8280898ee 100644
--- a/core/src/main/java/com/cloud/agent/api/VmStatsEntry.java
+++ b/core/src/main/java/com/cloud/agent/api/VmStatsEntry.java
@@ -19,22 +19,25 @@
 
 package com.cloud.agent.api;
 
+import com.cloud.vm.UserVmVO;
 import com.cloud.vm.VmStats;
 
 public class VmStatsEntry implements VmStats {
 
-    double cpuUtilization;
-    double networkReadKBs;
-    double networkWriteKBs;
-    double diskReadIOs;
-    double diskWriteIOs;
-    double diskReadKBs;
-    double diskWriteKBs;
-    double memoryKBs;
-    double intfreememoryKBs;
-    double targetmemoryKBs;
-    int numCPUs;
-    String entityType;
+    private long vmId;
+    private UserVmVO userVmVO;
+    private double cpuUtilization;
+    private double networkReadKBs;
+    private double networkWriteKBs;
+    private double diskReadIOs;
+    private double diskWriteIOs;
+    private double diskReadKBs;
+    private double diskWriteKBs;
+    private double memoryKBs;
+    private double intfreememoryKBs;
+    private double targetmemoryKBs;
+    private int numCPUs;
+    private String entityType;
 
     public VmStatsEntry() {
     }
@@ -50,14 +53,12 @@ public VmStatsEntry(double memoryKBs,double intfreememoryKBs,double targetmemory
         this.entityType = entityType;
     }
 
-    public VmStatsEntry(double cpuUtilization, double networkReadKBs, double networkWriteKBs, double diskReadKBs, double diskWriteKBs, int numCPUs, String entityType) {
-        this.cpuUtilization = cpuUtilization;
-        this.networkReadKBs = networkReadKBs;
-        this.networkWriteKBs = networkWriteKBs;
-        this.diskReadKBs = diskReadKBs;
-        this.diskWriteKBs = diskWriteKBs;
-        this.numCPUs = numCPUs;
-        this.entityType = entityType;
+    public long getVmId() {
+        return vmId;
+    }
+
+    public void setVmId(long vmId) {
+        this.vmId = vmId;
     }
 
     @Override
@@ -166,4 +167,12 @@ public void setEntityType(String entityType) {
         this.entityType = entityType;
     }
 
+    public UserVmVO getUserVmVO() {
+        return userVmVO;
+    }
+
+    public void setUserVmVO(UserVmVO userVmVO) {
+        this.userVmVO = userVmVO;
+    }
+
 }
diff --git a/server/pom.xml b/server/pom.xml
index e2461508932..19873835bd6 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -157,6 +157,11 @@
             <groupId>org.opensaml</groupId>
             <artifactId>opensaml</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.influxdb</groupId>
+            <artifactId>influxdb-java</artifactId>
+            <version>2.8</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/server/src/main/java/com/cloud/server/StatsCollector.java b/server/src/main/java/com/cloud/server/StatsCollector.java
index b66fa5f0600..8e2bc7e12a8 100644
--- a/server/src/main/java/com/cloud/server/StatsCollector.java
+++ b/server/src/main/java/com/cloud/server/StatsCollector.java
@@ -20,6 +20,7 @@
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -46,8 +47,15 @@
 import org.apache.cloudstack.utils.graphite.GraphiteClient;
 import org.apache.cloudstack.utils.graphite.GraphiteException;
 import org.apache.cloudstack.utils.usage.UsageUtils;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.BatchPoints;
+import org.influxdb.dto.Point;
+import org.influxdb.dto.Pong;
 import org.springframework.stereotype.Component;
 
 import com.cloud.agent.AgentManager;
@@ -121,6 +129,7 @@
 import com.cloud.utils.db.Transaction;
 import com.cloud.utils.db.TransactionCallbackNoReturn;
 import com.cloud.utils.db.TransactionStatus;
+import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.utils.net.MacAddress;
 import com.cloud.vm.NicVO;
 import com.cloud.vm.UserVmManager;
@@ -140,7 +149,7 @@
 public class StatsCollector extends ManagerBase implements ComponentMethodInterceptable, Configurable {
 
     public static enum ExternalStatsProtocol {
-        NONE("none"), GRAPHITE("graphite");
+        NONE("none"), GRAPHITE("graphite"), INFLUXDB("influxdb");
         String _type;
 
         ExternalStatsProtocol(String type) {
@@ -155,16 +164,52 @@ public String toString() {
 
     public static final Logger s_logger = Logger.getLogger(StatsCollector.class.getName());
 
-    static final ConfigKey<Integer> vmDiskStatsInterval = new ConfigKey<Integer>("Advanced", Integer.class, "vm.disk.stats.interval", "0",
+    private static final int UNDEFINED_PORT_VALUE = -1;
+
+    /**
+     * Default value for the Graphite connection port: {@value}
+     */
+    private static final int GRAPHITE_DEFAULT_PORT = 2003;
+
+    /**
+     * Default value for the InfluxDB connection port: {@value}
+     */
+    private static final int INFLUXDB_DEFAULT_PORT = 8086;
+
+    private static final String UUID_TAG = "uuid";
+
+    private static final String TOTAL_MEMORY_KBS_FIELD = "total_memory_kb";
+    private static final String FREE_MEMORY_KBS_FIELD = "free_memory_kb";
+    private static final String CPU_UTILIZATION_FIELD = "cpu_utilization";
+    private static final String CPUS_FIELD = "cpus";
+    private static final String CPU_SOCKETS_FIELD = "cpu_sockets";
+    private static final String NETWORK_READ_KBS_FIELD = "network_read_kbs";
+    private static final String NETWORK_WRITE_KBS_FIELD = "network_write_kbs";
+    private static final String MEMORY_TARGET_KBS_FIELD = "memory_target_kbs";
+    private static final String DISK_READ_IOPS_FIELD = "disk_read_iops";
+    private static final String DISK_READ_KBS_FIELD = "disk_read_kbs";
+    private static final String DISK_WRITE_IOPS_FIELD = "disk_write_iops";
+    private static final String DISK_WRITE_KBS_FIELD = "disk_write_kbs";
+
+    private static final String DEFAULT_DATABASE_NAME = "cloudstack";
+    private static final String INFLUXDB_HOST_MEASUREMENT = "host_stats";
+    private static final String INFLUXDB_VM_MEASUREMENT = "vm_stats";
+
+    private static final ConfigKey<Integer> vmDiskStatsInterval = new ConfigKey<Integer>("Advanced", Integer.class, "vm.disk.stats.interval", "0",
             "Interval (in seconds) to report vm disk statistics. Vm disk statistics will be disabled if this is set to 0 or less than 0.", false);
-    static final ConfigKey<Integer> vmDiskStatsIntervalMin = new ConfigKey<Integer>("Advanced", Integer.class, "vm.disk.stats.interval.min", "300",
+    private static final ConfigKey<Integer> vmDiskStatsIntervalMin = new ConfigKey<Integer>("Advanced", Integer.class, "vm.disk.stats.interval.min", "300",
             "Minimal interval (in seconds) to report vm disk statistics. If vm.disk.stats.interval is smaller than this, use this to report vm disk statistics.", false);
-    static final ConfigKey<Integer> vmNetworkStatsInterval = new ConfigKey<Integer>("Advanced", Integer.class, "vm.network.stats.interval", "0",
+    private static final ConfigKey<Integer> vmNetworkStatsInterval = new ConfigKey<Integer>("Advanced", Integer.class, "vm.network.stats.interval", "0",
             "Interval (in seconds) to report vm network statistics (for Shared networks). Vm network statistics will be disabled if this is set to 0 or less than 0.", false);
-    static final ConfigKey<Integer> vmNetworkStatsIntervalMin = new ConfigKey<Integer>("Advanced", Integer.class, "vm.network.stats.interval.min", "300",
-            "Minimal Interval (in seconds) to report vm network statistics (for Shared networks). If vm.network.stats.interval is smaller than this, use this to report vm network statistics.", false);
-    static final ConfigKey<Integer> StatsTimeout = new ConfigKey<Integer>("Advanced", Integer.class, "stats.timeout", "60000",
-            "The timeout for stats call in milli seconds.", true, ConfigKey.Scope.Cluster);
+    private static final ConfigKey<Integer> vmNetworkStatsIntervalMin = new ConfigKey<Integer>("Advanced", Integer.class, "vm.network.stats.interval.min", "300",
+            "Minimal Interval (in seconds) to report vm network statistics (for Shared networks). If vm.network.stats.interval is smaller than this, use this to report vm network statistics.",
+            false);
+    private static final ConfigKey<Integer> StatsTimeout = new ConfigKey<Integer>("Advanced", Integer.class, "stats.timeout", "60000",
+            "The timeout for stats call in milli seconds.", true,
+            ConfigKey.Scope.Cluster);
+    private static final ConfigKey<String> statsOutputUri = new ConfigKey<String>("Advanced", String.class, "stats.output.uri", "",
+            "URI to send StatsCollector statistics to. The collector is defined on the URI scheme. Example: graphite://graphite-hostaddress:port or influxdb://influxdb-hostaddress/dbname. Note that the port is optional, if not added the default port for the respective collector (graphite or influxdb) will be used. Additionally, the database name '/dbname' is  also optional; default db name is 'cloudstack'. You must create and configure the database if using influxdb.",
+            true);
 
     private static StatsCollector s_instance = null;
 
@@ -236,20 +281,20 @@ public String toString() {
     private ConcurrentHashMap<Long, StorageStats> _storageStats = new ConcurrentHashMap<Long, StorageStats>();
     private ConcurrentHashMap<Long, StorageStats> _storagePoolStats = new ConcurrentHashMap<Long, StorageStats>();
 
-    long hostStatsInterval = -1L;
-    long hostAndVmStatsInterval = -1L;
-    long storageStatsInterval = -1L;
-    long volumeStatsInterval = -1L;
-    long autoScaleStatsInterval = -1L;
+    private long hostStatsInterval = -1L;
+    private long hostAndVmStatsInterval = -1L;
+    private long storageStatsInterval = -1L;
+    private long volumeStatsInterval = -1L;
+    private long autoScaleStatsInterval = -1L;
 
-    List<Long> hostIds = null;
     private double _imageStoreCapacityThreshold = 0.90;
 
-    String externalStatsPrefix = "";
+    private String externalStatsPrefix = "";
     String externalStatsHost = null;
     int externalStatsPort = -1;
-    boolean externalStatsEnabled = false;
+    private String externalStatsScheme;
     ExternalStatsProtocol externalStatsType = ExternalStatsProtocol.NONE;
+    private String databaseName = DEFAULT_DATABASE_NAME;
 
     private ScheduledExecutorService _diskStatsUpdateExecutor;
     private int _usageAggregationRange = 1440;
@@ -279,7 +324,7 @@ public boolean start() {
         return true;
     }
 
-    private void init(Map<String, String> configs) {
+    protected void init(Map<String, String> configs) {
         _executor = Executors.newScheduledThreadPool(6, new NamedThreadFactory("StatsCollector"));
 
         hostStatsInterval = NumbersUtil.parseLong(configs.get("host.stats.interval"), 60000L);
@@ -288,24 +333,25 @@ private void init(Map<String, String> configs) {
         volumeStatsInterval = NumbersUtil.parseLong(configs.get("volume.stats.interval"), 600000L);
         autoScaleStatsInterval = NumbersUtil.parseLong(configs.get("autoscale.stats.interval"), 60000L);
 
-        /* URI to send statistics to. Currently only Graphite is supported */
-        String externalStatsUri = configs.get("stats.output.uri");
-        if (externalStatsUri != null && !externalStatsUri.equals("")) {
+        String statsUri = statsOutputUri.value();
+        if (StringUtils.isNotBlank(statsUri)) {
             try {
-                URI uri = new URI(externalStatsUri);
-                String scheme = uri.getScheme();
+                URI uri = new URI(statsUri);
+                externalStatsScheme = uri.getScheme();
 
                 try {
-                    externalStatsType = ExternalStatsProtocol.valueOf(scheme.toUpperCase());
+                    externalStatsType = ExternalStatsProtocol.valueOf(externalStatsScheme.toUpperCase());
                 } catch (IllegalArgumentException e) {
-                    s_logger.info(scheme + " is not a valid protocol for external statistics. No statistics will be send.");
+                    s_logger.error(externalStatsScheme + " is not a valid protocol for external statistics. No statistics will be send.");
                 }
 
-                if (!StringUtils.isEmpty(uri.getHost())) {
+                if (StringUtils.isNotEmpty(uri.getHost())) {
                     externalStatsHost = uri.getHost();
                 }
 
-                externalStatsPort = uri.getPort();
+                externalStatsPort = retrieveExternalStatsPortFromUri(uri);
+
+                databaseName = configureDatabaseName(uri);
 
                 if (!StringUtils.isEmpty(uri.getPath())) {
                     externalStatsPrefix = uri.getPath().substring(1);
@@ -318,9 +364,8 @@ private void init(Map<String, String> configs) {
                     externalStatsPrefix = "";
                 }
 
-                externalStatsEnabled = true;
             } catch (URISyntaxException e) {
-                s_logger.debug("Failed to parse external statistics URI: " + e.getMessage());
+                s_logger.error("Failed to parse external statistics URI: ", e);
             }
         }
 
@@ -342,7 +387,8 @@ private void init(Map<String, String> configs) {
 
         if (vmDiskStatsInterval.value() > 0) {
             if (vmDiskStatsInterval.value() < vmDiskStatsIntervalMin.value()) {
-                s_logger.debug("vm.disk.stats.interval - " + vmDiskStatsInterval.value() + " is smaller than vm.disk.stats.interval.min - " + vmDiskStatsIntervalMin.value() + ", so use vm.disk.stats.interval.min");
+                s_logger.debug("vm.disk.stats.interval - " + vmDiskStatsInterval.value() + " is smaller than vm.disk.stats.interval.min - " + vmDiskStatsIntervalMin.value()
+                        + ", so use vm.disk.stats.interval.min");
                 _executor.scheduleAtFixedRate(new VmDiskStatsTask(), vmDiskStatsIntervalMin.value(), vmDiskStatsIntervalMin.value(), TimeUnit.SECONDS);
             } else {
                 _executor.scheduleAtFixedRate(new VmDiskStatsTask(), vmDiskStatsInterval.value(), vmDiskStatsInterval.value(), TimeUnit.SECONDS);
@@ -353,7 +399,8 @@ private void init(Map<String, String> configs) {
 
         if (vmNetworkStatsInterval.value() > 0) {
             if (vmNetworkStatsInterval.value() < vmNetworkStatsIntervalMin.value()) {
-                s_logger.debug("vm.network.stats.interval - " + vmNetworkStatsInterval.value() + " is smaller than vm.network.stats.interval.min - " + vmNetworkStatsIntervalMin.value() + ", so use vm.network.stats.interval.min");
+                s_logger.debug("vm.network.stats.interval - " + vmNetworkStatsInterval.value() + " is smaller than vm.network.stats.interval.min - "
+                        + vmNetworkStatsIntervalMin.value() + ", so use vm.network.stats.interval.min");
                 _executor.scheduleAtFixedRate(new VmNetworkStatsTask(), vmNetworkStatsIntervalMin.value(), vmNetworkStatsIntervalMin.value(), TimeUnit.SECONDS);
             } else {
                 _executor.scheduleAtFixedRate(new VmNetworkStatsTask(), vmNetworkStatsInterval.value(), vmNetworkStatsInterval.value(), TimeUnit.SECONDS);
@@ -411,82 +458,111 @@ private void init(Map<String, String> configs) {
 
     }
 
-    class HostCollector extends ManagedContextRunnable {
+    /**
+     * Configures the database name according to the URI path. For instance, if the URI is as influxdb://address:port/dbname, the database name will be 'dbname'.
+     */
+    protected String configureDatabaseName(URI uri) {
+        String dbname = StringUtils.removeStart(uri.getPath(), "/");
+        if (StringUtils.isBlank(dbname)) {
+            return DEFAULT_DATABASE_NAME;
+        } else {
+            return dbname;
+        }
+    }
+
+    /**
+     * Configures the port to be used when connecting with the stats collector service.
+     * Default values are 8086 for influx DB and 2003 for GraphiteDB.
+     * Throws URISyntaxException in case of non configured port and external StatsType
+     */
+    protected int retrieveExternalStatsPortFromUri(URI uri) throws URISyntaxException {
+        int port = uri.getPort();
+        if (externalStatsType != ExternalStatsProtocol.NONE) {
+            if (port != UNDEFINED_PORT_VALUE) {
+                return port;
+            }
+            if (externalStatsType == ExternalStatsProtocol.GRAPHITE) {
+                return GRAPHITE_DEFAULT_PORT;
+            }
+            if (externalStatsType == ExternalStatsProtocol.INFLUXDB) {
+                return INFLUXDB_DEFAULT_PORT;
+            }
+        }
+        throw new URISyntaxException(uri.toString(), String.format(
+                "Cannot define a port for the Stats Collector host %s://%s:%s or URI scheme is incorrect. The configured URI in stats.output.uri is not supported. Please configure as the following examples: graphite://graphite-hostaddress:port, or influxdb://influxdb-hostaddress:port. Note that the port is optional, if not added the default port for the respective collector (graphite or influxdb) will be used.",
+                externalStatsPrefix, externalStatsHost, externalStatsPort));
+    }
+
+    class HostCollector extends AbstractStatsCollector {
         @Override
         protected void runInContext() {
             try {
                 s_logger.debug("HostStatsCollector is running...");
 
-                SearchCriteria<HostVO> sc = _hostDao.createSearchCriteria();
-                sc.addAnd("status", SearchCriteria.Op.EQ, Status.Up.toString());
-                sc.addAnd("resourceState", SearchCriteria.Op.NIN, ResourceState.Maintenance, ResourceState.PrepareForMaintenance, ResourceState.ErrorInMaintenance);
-                sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.Storage.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.ConsoleProxy.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.SecondaryStorage.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.LocalSecondaryStorage.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.TrafficMonitor.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.SecondaryStorageVM.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.ExternalFirewall.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.ExternalLoadBalancer.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.NetScalerControlCenter.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.L2Networking.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.BaremetalDhcp.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.BaremetalPxe.toString());
-                ConcurrentHashMap<Long, HostStats> hostStats = new ConcurrentHashMap<Long, HostStats>();
+                SearchCriteria<HostVO> sc = createSearchCriteriaForHostTypeRoutingStateUpAndNotInMaintenance();
+
+                Map<Object, Object> metrics = new HashMap<>();
                 List<HostVO> hosts = _hostDao.search(sc, null);
+
                 for (HostVO host : hosts) {
-                    HostStatsEntry stats = (HostStatsEntry)_resourceMgr.getHostStatistics(host.getId());
-                    if (stats != null) {
-                        hostStats.put(host.getId(), stats);
+                    HostStatsEntry hostStatsEntry = (HostStatsEntry)_resourceMgr.getHostStatistics(host.getId());
+                    if (hostStatsEntry != null) {
+                        hostStatsEntry.setHostVo(host);
+                        metrics.put(hostStatsEntry.getHostId(), hostStatsEntry);
+                        _hostStats.put(host.getId(), hostStatsEntry);
                     } else {
-                        s_logger.warn("Received invalid host stats for host: " + host.getId());
+                        s_logger.warn("The Host stats is null for host: " + host.getId());
                     }
                 }
-                _hostStats = hostStats;
-                // Get a subset of hosts with GPU support from the list of "hosts"
-                List<HostVO> gpuEnabledHosts = new ArrayList<HostVO>();
-                if (hostIds != null) {
-                    for (HostVO host : hosts) {
-                        if (hostIds.contains(host.getId())) {
-                            gpuEnabledHosts.add(host);
-                        }
-                    }
-                } else {
-                    // Check for all the hosts managed by CloudStack.
-                    gpuEnabledHosts = hosts;
-                }
-                for (HostVO host : gpuEnabledHosts) {
-                    HashMap<String, HashMap<String, VgpuTypesInfo>> groupDetails = _resourceMgr.getGPUStatistics(host);
-                    if (groupDetails != null) {
-                        _resourceMgr.updateGPUDetails(host.getId(), groupDetails);
-                    }
+
+                if (externalStatsType == ExternalStatsProtocol.INFLUXDB) {
+                    sendMetricsToInfluxdb(metrics);
                 }
-                hostIds = _hostGpuGroupsDao.listHostIds();
+
+                updateGpuEnabledHostsDetails(hosts);
             } catch (Throwable t) {
                 s_logger.error("Error trying to retrieve host stats", t);
             }
         }
+
+        /**
+         * Updates GPU details on hosts supporting GPU.
+         */
+        private void updateGpuEnabledHostsDetails(List<HostVO> hosts) {
+            List<HostVO> gpuEnabledHosts = new ArrayList<HostVO>();
+            List<Long> hostIds = _hostGpuGroupsDao.listHostIds();
+            if (CollectionUtils.isEmpty(hostIds)) {
+                return;
+            }
+            for (HostVO host : hosts) {
+                if (hostIds.contains(host.getId())) {
+                    gpuEnabledHosts.add(host);
+                }
+            }
+            for (HostVO host : gpuEnabledHosts) {
+                HashMap<String, HashMap<String, VgpuTypesInfo>> groupDetails = _resourceMgr.getGPUStatistics(host);
+                if (MapUtils.isEmpty(groupDetails)) {
+                    _resourceMgr.updateGPUDetails(host.getId(), groupDetails);
+                }
+            }
+        }
+
+        @Override
+        protected Point creteInfluxDbPoint(Object metricsObject) {
+            return createInfluxDbPointForHostMetrics(metricsObject);
+        }
     }
 
-    class VmStatsCollector extends ManagedContextRunnable {
+    class VmStatsCollector extends AbstractStatsCollector {
         @Override
         protected void runInContext() {
             try {
                 s_logger.trace("VmStatsCollector is running...");
 
-                SearchCriteria<HostVO> sc = _hostDao.createSearchCriteria();
-                sc.addAnd("status", SearchCriteria.Op.EQ, Status.Up.toString());
-                sc.addAnd("resourceState", SearchCriteria.Op.NIN, ResourceState.Maintenance, ResourceState.PrepareForMaintenance, ResourceState.ErrorInMaintenance);
-                sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.Storage.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.ConsoleProxy.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.SecondaryStorage.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.LocalSecondaryStorage.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.TrafficMonitor.toString());
-                sc.addAnd("type", SearchCriteria.Op.NEQ, Host.Type.SecondaryStorageVM.toString());
+                SearchCriteria<HostVO> sc = createSearchCriteriaForHostTypeRoutingStateUpAndNotInMaintenance();
                 List<HostVO> hosts = _hostDao.search(sc, null);
 
-                /* HashMap for metrics to be send to Graphite */
-                HashMap metrics = new HashMap<String, Integer>();
+                Map<Object, Object> metrics = new HashMap<>();
 
                 for (HostVO host : hosts) {
                     List<UserVmVO> vms = _userVmDao.listRunningByHostId(host.getId());
@@ -497,86 +573,35 @@ protected void runInContext() {
                     }
 
                     try {
-                        HashMap<Long, VmStatsEntry> vmStatsById = _userVmMgr.getVirtualMachineStatistics(host.getId(), host.getName(), vmIds);
+                        Map<Long, VmStatsEntry> vmStatsById = _userVmMgr.getVirtualMachineStatistics(host.getId(), host.getName(), vmIds);
 
                         if (vmStatsById != null) {
-                            VmStatsEntry statsInMemory = null;
-
                             Set<Long> vmIdSet = vmStatsById.keySet();
                             for (Long vmId : vmIdSet) {
                                 VmStatsEntry statsForCurrentIteration = vmStatsById.get(vmId);
-                                statsInMemory = (VmStatsEntry)_VmStats.get(vmId);
+                                statsForCurrentIteration.setVmId(vmId);
+                                UserVmVO userVmVo = _userVmDao.findById(vmId);
+                                statsForCurrentIteration.setUserVmVO(userVmVo);
 
-                                if (statsInMemory == null) {
-                                    //no stats exist for this vm, directly persist
-                                    _VmStats.put(vmId, statsForCurrentIteration);
-                                } else {
-                                    //update each field
-                                    statsInMemory.setCPUUtilization(statsForCurrentIteration.getCPUUtilization());
-                                    statsInMemory.setNumCPUs(statsForCurrentIteration.getNumCPUs());
-                                    statsInMemory.setNetworkReadKBs(statsInMemory.getNetworkReadKBs() + statsForCurrentIteration.getNetworkReadKBs());
-                                    statsInMemory.setNetworkWriteKBs(statsInMemory.getNetworkWriteKBs() + statsForCurrentIteration.getNetworkWriteKBs());
-                                    statsInMemory.setDiskWriteKBs(statsInMemory.getDiskWriteKBs() + statsForCurrentIteration.getDiskWriteKBs());
-                                    statsInMemory.setDiskReadIOs(statsInMemory.getDiskReadIOs() + statsForCurrentIteration.getDiskReadIOs());
-                                    statsInMemory.setDiskWriteIOs(statsInMemory.getDiskWriteIOs() + statsForCurrentIteration.getDiskWriteIOs());
-                                    statsInMemory.setDiskReadKBs(statsInMemory.getDiskReadKBs() + statsForCurrentIteration.getDiskReadKBs());
-                                    statsInMemory.setMemoryKBs(statsForCurrentIteration.getMemoryKBs());
-                                    statsInMemory.setIntFreeMemoryKBs(statsForCurrentIteration.getIntFreeMemoryKBs());
-                                    statsInMemory.setTargetMemoryKBs(statsForCurrentIteration.getTargetMemoryKBs());
-
-                                    _VmStats.put(vmId, statsInMemory);
-                                }
-
-                                /**
-                                 * Add statistics to HashMap only when they should be send to a external stats collector
-                                 * Performance wise it seems best to only append to the HashMap when needed
-                                 */
-                                if (externalStatsEnabled) {
-                                    VMInstanceVO vmVO = _vmInstance.findById(vmId);
-                                    String vmName = vmVO.getUuid();
-
-                                    metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".cpu.num", statsForCurrentIteration.getNumCPUs());
-                                    metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".cpu.utilization", statsForCurrentIteration.getCPUUtilization());
-                                    metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".network.read_kbs", statsForCurrentIteration.getNetworkReadKBs());
-                                    metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".network.write_kbs", statsForCurrentIteration.getNetworkWriteKBs());
-                                    metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".disk.write_kbs", statsForCurrentIteration.getDiskWriteKBs());
-                                    metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".disk.read_kbs", statsForCurrentIteration.getDiskReadKBs());
-                                    metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".disk.write_iops", statsForCurrentIteration.getDiskWriteIOs());
-                                    metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".disk.read_iops", statsForCurrentIteration.getDiskReadIOs());
-                                    metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".memory.total_kbs", statsForCurrentIteration.getMemoryKBs());
-                                    metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".memory.internalfree_kbs", statsForCurrentIteration.getIntFreeMemoryKBs());
-                                    metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".memory.target_kbs", statsForCurrentIteration.getTargetMemoryKBs());
+                                storeVirtualMachineStatsInMemory(statsForCurrentIteration);
 
+                                if (externalStatsType == ExternalStatsProtocol.GRAPHITE) {
+                                    prepareVmMetricsForGraphite(metrics, statsForCurrentIteration);
+                                } else {
+                                    metrics.put(statsForCurrentIteration.getVmId(), statsForCurrentIteration);
                                 }
-
                             }
 
-                            /**
-                             * Send the metrics to a external stats collector
-                             * We send it on a per-host basis to prevent that we flood the host
-                             * Currently only Graphite is supported
-                             */
                             if (!metrics.isEmpty()) {
-                                if (externalStatsType != null && externalStatsType == ExternalStatsProtocol.GRAPHITE) {
-
-                                    if (externalStatsPort == -1) {
-                                        externalStatsPort = 2003;
-                                    }
-
-                                    s_logger.debug("Sending VmStats of host " + host.getId() + " to Graphite host " + externalStatsHost + ":" + externalStatsPort);
-
-                                    try {
-                                        GraphiteClient g = new GraphiteClient(externalStatsHost, externalStatsPort);
-                                        g.sendMetrics(metrics);
-                                    } catch (GraphiteException e) {
-                                        s_logger.debug("Failed sending VmStats to Graphite host " + externalStatsHost + ":" + externalStatsPort + ": " + e.getMessage());
-                                    }
-
-                                    metrics.clear();
+                                if (externalStatsType == ExternalStatsProtocol.GRAPHITE) {
+                                    sendVmMetricsToGraphiteHost(metrics, host);
+                                } else if (externalStatsType == ExternalStatsProtocol.INFLUXDB) {
+                                    sendMetricsToInfluxdb(metrics);
                                 }
                             }
-                        }
 
+                            metrics.clear();
+                        }
                     } catch (Exception e) {
                         s_logger.debug("Failed to get VM stats for host with ID: " + host.getId());
                         continue;
@@ -587,6 +612,11 @@ protected void runInContext() {
                 s_logger.error("Error trying to retrieve VM stats", t);
             }
         }
+
+        @Override
+        protected Point creteInfluxDbPoint(Object metricsObject) {
+            return createInfluxDbPointForVmMetrics(metricsObject);
+        }
     }
 
     public VmStats getVmStats(long id) {
@@ -646,7 +676,7 @@ protected void runInContext() {
             //Check for ownership
             //msHost in UP state with min id should run the job
             ManagementServerHostVO msHost = _msHostDao.findOneInUpState(new Filter(ManagementServerHostVO.class, "id", true, 0L, 1L));
-            if(msHost == null || (msHost.getMsid() != mgmtSrvrId)){
+            if (msHost == null || (msHost.getMsid() != mgmtSrvrId)) {
                 s_logger.debug("Skipping collect vm disk stats from hosts");
                 return;
             }
@@ -658,11 +688,7 @@ protected void runInContext() {
                     public void doInTransactionWithoutResult(TransactionStatus status) {
                         s_logger.debug("VmDiskStatsTask is running...");
 
-                        SearchCriteria<HostVO> sc = _hostDao.createSearchCriteria();
-                        sc.addAnd("status", SearchCriteria.Op.EQ, Status.Up.toString());
-                        sc.addAnd("resourceState", SearchCriteria.Op.NIN, ResourceState.Maintenance, ResourceState.PrepareForMaintenance,
-                                ResourceState.ErrorInMaintenance);
-                        sc.addAnd("type", SearchCriteria.Op.EQ, Host.Type.Routing.toString());
+                        SearchCriteria<HostVO> sc = createSearchCriteriaForHostTypeRoutingStateUpAndNotInMaintenance();
                         sc.addAnd("hypervisorType", SearchCriteria.Op.EQ, HypervisorType.KVM); // support KVM only util 2013.06.25
                         List<HostVO> hosts = _hostDao.search(sc, null);
 
@@ -689,67 +715,64 @@ public void doInTransactionWithoutResult(TransactionStatus status) {
                                     SearchCriteria<VolumeVO> sc_volume = _volsDao.createSearchCriteria();
                                     sc_volume.addAnd("path", SearchCriteria.Op.EQ, vmDiskStat.getPath());
                                     List<VolumeVO> volumes = _volsDao.search(sc_volume, null);
-                                    if ((volumes == null) || (volumes.size() == 0))
+
+                                    if (CollectionUtils.isEmpty(volumes))
                                         break;
+
                                     VolumeVO volume = volumes.get(0);
-                                    VmDiskStatisticsVO previousVmDiskStats =
-                                            _vmDiskStatsDao.findBy(userVm.getAccountId(), userVm.getDataCenterId(), vmId, volume.getId());
+                                    VmDiskStatisticsVO previousVmDiskStats = _vmDiskStatsDao.findBy(userVm.getAccountId(), userVm.getDataCenterId(), vmId, volume.getId());
                                     VmDiskStatisticsVO vmDiskStat_lock = _vmDiskStatsDao.lock(userVm.getAccountId(), userVm.getDataCenterId(), vmId, volume.getId());
 
-                                    if ((vmDiskStat.getBytesRead() == 0) && (vmDiskStat.getBytesWrite() == 0) && (vmDiskStat.getIORead() == 0) &&
-                                            (vmDiskStat.getIOWrite() == 0)) {
+                                    if (areAllDiskStatsZero(vmDiskStat)) {
                                         s_logger.debug("IO/bytes read and write are all 0. Not updating vm_disk_statistics");
                                         continue;
                                     }
 
                                     if (vmDiskStat_lock == null) {
-                                        s_logger.warn("unable to find vm disk stats from host for account: " + userVm.getAccountId() + " with vmId: " + userVm.getId() +
-                                                " and volumeId:" + volume.getId());
+                                        s_logger.warn("unable to find vm disk stats from host for account: " + userVm.getAccountId() + " with vmId: " + userVm.getId()
+                                                + " and volumeId:" + volume.getId());
                                         continue;
                                     }
 
-                                    if (previousVmDiskStats != null &&
-                                            ((previousVmDiskStats.getCurrentBytesRead() != vmDiskStat_lock.getCurrentBytesRead()) ||
-                                                    (previousVmDiskStats.getCurrentBytesWrite() != vmDiskStat_lock.getCurrentBytesWrite()) ||
-                                                    (previousVmDiskStats.getCurrentIORead() != vmDiskStat_lock.getCurrentIORead()) || (previousVmDiskStats.getCurrentIOWrite() != vmDiskStat_lock.getCurrentIOWrite()))) {
-                                        s_logger.debug("vm disk stats changed from the time GetVmDiskStatsCommand was sent. " + "Ignoring current answer. Host: " +
-                                                host.getName() + " . VM: " + vmDiskStat.getVmName() + " Read(Bytes): " + vmDiskStat.getBytesRead() + " write(Bytes): " +
-                                                vmDiskStat.getBytesWrite() + " Read(IO): " + vmDiskStat.getIORead() + " write(IO): " + vmDiskStat.getIOWrite());
+                                    if (isCurrentVmDiskStatsDifferentFromPrevious(previousVmDiskStats, vmDiskStat_lock)) {
+                                        s_logger.debug("vm disk stats changed from the time GetVmDiskStatsCommand was sent. " + "Ignoring current answer. Host: " + host.getName()
+                                                + " . VM: " + vmDiskStat.getVmName() + " Read(Bytes): " + vmDiskStat.getBytesRead() + " write(Bytes): " + vmDiskStat.getBytesWrite()
+                                                + " Read(IO): " + vmDiskStat.getIORead() + " write(IO): " + vmDiskStat.getIOWrite());
                                         continue;
                                     }
 
                                     if (vmDiskStat_lock.getCurrentBytesRead() > vmDiskStat.getBytesRead()) {
                                         if (s_logger.isDebugEnabled()) {
-                                            s_logger.debug("Read # of bytes that's less than the last one.  " +
-                                                    "Assuming something went wrong and persisting it. Host: " + host.getName() + " . VM: " + vmDiskStat.getVmName() +
-                                                    " Reported: " + vmDiskStat.getBytesRead() + " Stored: " + vmDiskStat_lock.getCurrentBytesRead());
+                                            s_logger.debug("Read # of bytes that's less than the last one.  " + "Assuming something went wrong and persisting it. Host: "
+                                                    + host.getName() + " . VM: " + vmDiskStat.getVmName() + " Reported: " + vmDiskStat.getBytesRead() + " Stored: "
+                                                    + vmDiskStat_lock.getCurrentBytesRead());
                                         }
                                         vmDiskStat_lock.setNetBytesRead(vmDiskStat_lock.getNetBytesRead() + vmDiskStat_lock.getCurrentBytesRead());
                                     }
                                     vmDiskStat_lock.setCurrentBytesRead(vmDiskStat.getBytesRead());
                                     if (vmDiskStat_lock.getCurrentBytesWrite() > vmDiskStat.getBytesWrite()) {
                                         if (s_logger.isDebugEnabled()) {
-                                            s_logger.debug("Write # of bytes that's less than the last one.  " +
-                                                    "Assuming something went wrong and persisting it. Host: " + host.getName() + " . VM: " + vmDiskStat.getVmName() +
-                                                    " Reported: " + vmDiskStat.getBytesWrite() + " Stored: " + vmDiskStat_lock.getCurrentBytesWrite());
+                                            s_logger.debug("Write # of bytes that's less than the last one.  " + "Assuming something went wrong and persisting it. Host: "
+                                                    + host.getName() + " . VM: " + vmDiskStat.getVmName() + " Reported: " + vmDiskStat.getBytesWrite() + " Stored: "
+                                                    + vmDiskStat_lock.getCurrentBytesWrite());
                                         }
                                         vmDiskStat_lock.setNetBytesWrite(vmDiskStat_lock.getNetBytesWrite() + vmDiskStat_lock.getCurrentBytesWrite());
                                     }
                                     vmDiskStat_lock.setCurrentBytesWrite(vmDiskStat.getBytesWrite());
                                     if (vmDiskStat_lock.getCurrentIORead() > vmDiskStat.getIORead()) {
                                         if (s_logger.isDebugEnabled()) {
-                                            s_logger.debug("Read # of IO that's less than the last one.  " + "Assuming something went wrong and persisting it. Host: " +
-                                                    host.getName() + " . VM: " + vmDiskStat.getVmName() + " Reported: " + vmDiskStat.getIORead() + " Stored: " +
-                                                    vmDiskStat_lock.getCurrentIORead());
+                                            s_logger.debug("Read # of IO that's less than the last one.  " + "Assuming something went wrong and persisting it. Host: "
+                                                    + host.getName() + " . VM: " + vmDiskStat.getVmName() + " Reported: " + vmDiskStat.getIORead() + " Stored: "
+                                                    + vmDiskStat_lock.getCurrentIORead());
                                         }
                                         vmDiskStat_lock.setNetIORead(vmDiskStat_lock.getNetIORead() + vmDiskStat_lock.getCurrentIORead());
                                     }
                                     vmDiskStat_lock.setCurrentIORead(vmDiskStat.getIORead());
                                     if (vmDiskStat_lock.getCurrentIOWrite() > vmDiskStat.getIOWrite()) {
                                         if (s_logger.isDebugEnabled()) {
-                                            s_logger.debug("Write # of IO that's less than the last one.  " + "Assuming something went wrong and persisting it. Host: " +
-                                                    host.getName() + " . VM: " + vmDiskStat.getVmName() + " Reported: " + vmDiskStat.getIOWrite() + " Stored: " +
-                                                    vmDiskStat_lock.getCurrentIOWrite());
+                                            s_logger.debug("Write # of IO that's less than the last one.  " + "Assuming something went wrong and persisting it. Host: "
+                                                    + host.getName() + " . VM: " + vmDiskStat.getVmName() + " Reported: " + vmDiskStat.getIOWrite() + " Stored: "
+                                                    + vmDiskStat_lock.getCurrentIOWrite());
                                         }
                                         vmDiskStat_lock.setNetIOWrite(vmDiskStat_lock.getNetIOWrite() + vmDiskStat_lock.getCurrentIOWrite());
                                     }
@@ -781,7 +804,7 @@ protected void runInContext() {
             //Check for ownership
             //msHost in UP state with min id should run the job
             ManagementServerHostVO msHost = _msHostDao.findOneInUpState(new Filter(ManagementServerHostVO.class, "id", true, 0L, 1L));
-            if(msHost == null || (msHost.getMsid() != mgmtSrvrId)){
+            if (msHost == null || (msHost.getMsid() != mgmtSrvrId)) {
                 s_logger.debug("Skipping collect vm network stats from hosts");
                 return;
             }
@@ -792,14 +815,10 @@ protected void runInContext() {
                     public void doInTransactionWithoutResult(TransactionStatus status) {
                         s_logger.debug("VmNetworkStatsTask is running...");
 
-                        SearchCriteria<HostVO> sc = _hostDao.createSearchCriteria();
-                        sc.addAnd("status", SearchCriteria.Op.EQ, Status.Up.toString());
-                        sc.addAnd("resourceState", SearchCriteria.Op.NIN, ResourceState.Maintenance, ResourceState.PrepareForMaintenance, ResourceState.ErrorInMaintenance);
-                        sc.addAnd("type", SearchCriteria.Op.EQ, Host.Type.Routing.toString());
+                        SearchCriteria<HostVO> sc = createSearchCriteriaForHostTypeRoutingStateUpAndNotInMaintenance();
                         List<HostVO> hosts = _hostDao.search(sc, null);
 
-                        for (HostVO host : hosts)
-                        {
+                        for (HostVO host : hosts) {
                             List<UserVmVO> vms = _userVmDao.listRunningByHostId(host.getId());
                             List<Long> vmIds = new ArrayList<Long>();
 
@@ -813,8 +832,7 @@ public void doInTransactionWithoutResult(TransactionStatus status) {
                                 continue;
 
                             Set<Long> vmIdSet = vmNetworkStatsById.keySet();
-                            for(Long vmId : vmIdSet)
-                            {
+                            for (Long vmId : vmIdSet) {
                                 List<VmNetworkStatsEntry> vmNetworkStats = vmNetworkStatsById.get(vmId);
                                 if (vmNetworkStats == null)
                                     continue;
@@ -823,20 +841,24 @@ public void doInTransactionWithoutResult(TransactionStatus status) {
                                     s_logger.debug("Cannot find uservm with id: " + vmId + " , continue");
                                     continue;
                                 }
-                                s_logger.debug("Now we are updating the user_statistics table for VM: " + userVm.getInstanceName() + " after collecting vm network statistics from host: " + host.getName());
-                                for (VmNetworkStatsEntry vmNetworkStat:vmNetworkStats) {
+                                s_logger.debug("Now we are updating the user_statistics table for VM: " + userVm.getInstanceName()
+                                        + " after collecting vm network statistics from host: " + host.getName());
+                                for (VmNetworkStatsEntry vmNetworkStat : vmNetworkStats) {
                                     SearchCriteria<NicVO> sc_nic = _nicDao.createSearchCriteria();
                                     sc_nic.addAnd("macAddress", SearchCriteria.Op.EQ, vmNetworkStat.getMacAddress());
                                     NicVO nic = _nicDao.search(sc_nic, null).get(0);
                                     List<VlanVO> vlan = _vlanDao.listVlansByNetworkId(nic.getNetworkId());
                                     if (vlan == null || vlan.size() == 0 || vlan.get(0).getVlanType() != VlanType.DirectAttached)
                                         continue; // only get network statistics for DirectAttached network (shared networks in Basic zone and Advanced zone with/without SG)
-                                    UserStatisticsVO previousvmNetworkStats = _userStatsDao.findBy(userVm.getAccountId(), userVm.getDataCenterId(), nic.getNetworkId(), nic.getIPv4Address(), vmId, "UserVm");
+                                    UserStatisticsVO previousvmNetworkStats = _userStatsDao.findBy(userVm.getAccountId(), userVm.getDataCenterId(), nic.getNetworkId(),
+                                            nic.getIPv4Address(), vmId, "UserVm");
                                     if (previousvmNetworkStats == null) {
-                                        previousvmNetworkStats = new UserStatisticsVO(userVm.getAccountId(), userVm.getDataCenterId(),nic.getIPv4Address(), vmId, "UserVm", nic.getNetworkId());
+                                        previousvmNetworkStats = new UserStatisticsVO(userVm.getAccountId(), userVm.getDataCenterId(), nic.getIPv4Address(), vmId, "UserVm",
+                                                nic.getNetworkId());
                                         _userStatsDao.persist(previousvmNetworkStats);
                                     }
-                                    UserStatisticsVO vmNetworkStat_lock = _userStatsDao.lock(userVm.getAccountId(), userVm.getDataCenterId(), nic.getNetworkId(), nic.getIPv4Address(), vmId, "UserVm");
+                                    UserStatisticsVO vmNetworkStat_lock = _userStatsDao.lock(userVm.getAccountId(), userVm.getDataCenterId(), nic.getNetworkId(),
+                                            nic.getIPv4Address(), vmId, "UserVm");
 
                                     if ((vmNetworkStat.getBytesSent() == 0) && (vmNetworkStat.getBytesReceived() == 0)) {
                                         s_logger.debug("bytes sent and received are all 0. Not updating user_statistics");
@@ -844,24 +866,24 @@ public void doInTransactionWithoutResult(TransactionStatus status) {
                                     }
 
                                     if (vmNetworkStat_lock == null) {
-                                        s_logger.warn("unable to find vm network stats from host for account: " + userVm.getAccountId() + " with vmId: " + userVm.getId()+ " and nicId:" + nic.getId());
+                                        s_logger.warn("unable to find vm network stats from host for account: " + userVm.getAccountId() + " with vmId: " + userVm.getId()
+                                                + " and nicId:" + nic.getId());
                                         continue;
                                     }
 
-                                    if (previousvmNetworkStats != null
-                                            && ((previousvmNetworkStats.getCurrentBytesSent() != vmNetworkStat_lock.getCurrentBytesSent())
+                                    if (previousvmNetworkStats != null && ((previousvmNetworkStats.getCurrentBytesSent() != vmNetworkStat_lock.getCurrentBytesSent())
                                             || (previousvmNetworkStats.getCurrentBytesReceived() != vmNetworkStat_lock.getCurrentBytesReceived()))) {
-                                        s_logger.debug("vm network stats changed from the time GetNmNetworkStatsCommand was sent. " +
-                                                "Ignoring current answer. Host: " + host.getName()  + " . VM: " + vmNetworkStat.getVmName() +
-                                                " Sent(Bytes): " + vmNetworkStat.getBytesSent() + " Received(Bytes): " + vmNetworkStat.getBytesReceived());
+                                        s_logger.debug("vm network stats changed from the time GetNmNetworkStatsCommand was sent. " + "Ignoring current answer. Host: "
+                                                + host.getName() + " . VM: " + vmNetworkStat.getVmName() + " Sent(Bytes): " + vmNetworkStat.getBytesSent() + " Received(Bytes): "
+                                                + vmNetworkStat.getBytesReceived());
                                         continue;
                                     }
 
                                     if (vmNetworkStat_lock.getCurrentBytesSent() > vmNetworkStat.getBytesSent()) {
                                         if (s_logger.isDebugEnabled()) {
-                                            s_logger.debug("Sent # of bytes that's less than the last one.  " +
-                                                    "Assuming something went wrong and persisting it. Host: " + host.getName() + " . VM: " + vmNetworkStat.getVmName() +
-                                                    " Reported: " + vmNetworkStat.getBytesSent() + " Stored: " + vmNetworkStat_lock.getCurrentBytesSent());
+                                            s_logger.debug("Sent # of bytes that's less than the last one.  " + "Assuming something went wrong and persisting it. Host: "
+                                                    + host.getName() + " . VM: " + vmNetworkStat.getVmName() + " Reported: " + vmNetworkStat.getBytesSent() + " Stored: "
+                                                    + vmNetworkStat_lock.getCurrentBytesSent());
                                         }
                                         vmNetworkStat_lock.setNetBytesSent(vmNetworkStat_lock.getNetBytesSent() + vmNetworkStat_lock.getCurrentBytesSent());
                                     }
@@ -869,15 +891,15 @@ public void doInTransactionWithoutResult(TransactionStatus status) {
 
                                     if (vmNetworkStat_lock.getCurrentBytesReceived() > vmNetworkStat.getBytesReceived()) {
                                         if (s_logger.isDebugEnabled()) {
-                                            s_logger.debug("Received # of bytes that's less than the last one.  " +
-                                                    "Assuming something went wrong and persisting it. Host: " + host.getName() + " . VM: " + vmNetworkStat.getVmName() +
-                                                    " Reported: " + vmNetworkStat.getBytesReceived() + " Stored: " + vmNetworkStat_lock.getCurrentBytesReceived());
+                                            s_logger.debug("Received # of bytes that's less than the last one.  " + "Assuming something went wrong and persisting it. Host: "
+                                                    + host.getName() + " . VM: " + vmNetworkStat.getVmName() + " Reported: " + vmNetworkStat.getBytesReceived() + " Stored: "
+                                                    + vmNetworkStat_lock.getCurrentBytesReceived());
                                         }
                                         vmNetworkStat_lock.setNetBytesReceived(vmNetworkStat_lock.getNetBytesReceived() + vmNetworkStat_lock.getCurrentBytesReceived());
                                     }
                                     vmNetworkStat_lock.setCurrentBytesReceived(vmNetworkStat.getBytesReceived());
 
-                                    if (! _dailyOrHourly) {
+                                    if (!_dailyOrHourly) {
                                         //update agg bytes
                                         vmNetworkStat_lock.setAggBytesReceived(vmNetworkStat_lock.getNetBytesReceived() + vmNetworkStat_lock.getCurrentBytesReceived());
                                         vmNetworkStat_lock.setAggBytesSent(vmNetworkStat_lock.getNetBytesSent() + vmNetworkStat_lock.getCurrentBytesSent());
@@ -895,7 +917,6 @@ public void doInTransactionWithoutResult(TransactionStatus status) {
         }
     }
 
-
     class VolumeStatsTask extends ManagedContextRunnable {
         @Override
         protected void runInContext() {
@@ -905,18 +926,15 @@ protected void runInContext() {
                 for (StoragePoolVO pool : pools) {
                     List<VolumeVO> volumes = _volsDao.findByPoolId(pool.getId(), null);
                     List<String> volumeLocators = new ArrayList<String>();
-                    for (VolumeVO volume: volumes){
+                    for (VolumeVO volume : volumes) {
                         if (volume.getFormat() == ImageFormat.QCOW2) {
                             volumeLocators.add(volume.getUuid());
-                        }
-                        else if (volume.getFormat() == ImageFormat.VHD){
+                        } else if (volume.getFormat() == ImageFormat.VHD) {
                             volumeLocators.add(volume.getPath());
-                        }
-                        else if (volume.getFormat() == ImageFormat.OVA){
+                        } else if (volume.getFormat() == ImageFormat.OVA) {
                             volumeLocators.add(volume.getChainInfo());
-                        }
-                        else {
-                            s_logger.warn("Volume stats not implemented for this format type " + volume.getFormat() );
+                        } else {
+                            s_logger.warn("Volume stats not implemented for this format type " + volume.getFormat());
                             break;
                         }
                     }
@@ -924,8 +942,9 @@ else if (volume.getFormat() == ImageFormat.OVA){
                         Map<String, VolumeStatsEntry> volumeStatsByUuid;
                         if (pool.getScope() == ScopeType.ZONE) {
                             volumeStatsByUuid = new HashMap<>();
-                            for (final Cluster cluster: _clusterDao.listByZoneId(pool.getDataCenterId())) {
-                                final Map<String, VolumeStatsEntry> volumeStatsForCluster = _userVmMgr.getVolumeStatistics(cluster.getId(), pool.getUuid(), pool.getPoolType(), volumeLocators, StatsTimeout.value());
+                            for (final Cluster cluster : _clusterDao.listByZoneId(pool.getDataCenterId())) {
+                                final Map<String, VolumeStatsEntry> volumeStatsForCluster = _userVmMgr.getVolumeStatistics(cluster.getId(), pool.getUuid(), pool.getPoolType(),
+                                        volumeLocators, StatsTimeout.value());
                                 if (volumeStatsForCluster != null) {
                                     volumeStatsByUuid.putAll(volumeStatsForCluster);
                                 }
@@ -933,7 +952,7 @@ else if (volume.getFormat() == ImageFormat.OVA){
                         } else {
                             volumeStatsByUuid = _userVmMgr.getVolumeStatistics(pool.getClusterId(), pool.getUuid(), pool.getPoolType(), volumeLocators, StatsTimeout.value());
                         }
-                        if (volumeStatsByUuid != null){
+                        if (volumeStatsByUuid != null) {
                             for (final Map.Entry<String, VolumeStatsEntry> entry : volumeStatsByUuid.entrySet()) {
                                 if (entry == null || entry.getKey() == null || entry.getValue() == null) {
                                     continue;
@@ -985,8 +1004,7 @@ protected void runInContext() {
                     Answer answer = ssAhost.sendMessage(command);
                     if (answer != null && answer.getResult()) {
                         storageStats.put(storeId, (StorageStats)answer);
-                        s_logger.trace("HostId: " + storeId + " Used: " + ((StorageStats)answer).getByteUsed() + " Total Available: " +
-                                ((StorageStats)answer).getCapacityBytes());
+                        s_logger.trace("HostId: " + storeId + " Used: " + ((StorageStats)answer).getByteUsed() + " Total Available: " + ((StorageStats)answer).getCapacityBytes());
                     }
                 }
                 _storageStats = storageStats;
@@ -1047,8 +1065,7 @@ protected void runInContext() {
                         //check interval
                         long now = (new Date()).getTime();
                         if (asGroup.getLastInterval() != null)
-                            if ((now - asGroup.getLastInterval().getTime()) < asGroup
-                                    .getInterval()) {
+                            if ((now - asGroup.getLastInterval().getTime()) < asGroup.getInterval()) {
                                 continue;
                             }
 
@@ -1213,7 +1230,6 @@ private String getAutoscaleAction(HashMap<Long, Double> avgCounter, long groupId
                                 long thresholdValue = conditionVO.getThreshold();
                                 Double thresholdPercent = (double)thresholdValue / 100;
                                 CounterVO counterVO = _asCounterDao.findById(conditionVO.getCounterid());
-//Double sum = avgCounter.get(conditionVO.getCounterid());
                                 long counter_count = 1;
                                 do {
                                     String counter_param = params.get("counter" + String.valueOf(counter_count));
@@ -1260,8 +1276,7 @@ private String getAutoscaleAction(HashMap<Long, Double> avgCounter, long groupId
             return lstResult;
         }
 
-        public List<Pair<String, Integer>> getPairofCounternameAndDuration(
-                long groupId) {
+        public List<Pair<String, Integer>> getPairofCounternameAndDuration(long groupId) {
             AutoScaleVmGroupVO groupVo = _asGroupDao.findById(groupId);
             if (groupVo == null)
                 return null;
@@ -1307,14 +1322,231 @@ public String getCounternamebyCondition(long conditionId) {
         }
     }
 
+    /**
+     * This class allows to writing metrics in InfluxDB for the table that matches the Collector extending it.
+     * Thus, VmStatsCollector and HostCollector can use same method to write on different measures (host_stats or vm_stats table).
+     */
+    abstract class AbstractStatsCollector extends ManagedContextRunnable {
+        /**
+         * Sends metrics to influxdb host. This method supports both VM and Host metrics
+         */
+        protected void sendMetricsToInfluxdb(Map<Object, Object> metrics) {
+            InfluxDB influxDbConnection = createInfluxDbConnection();
+
+            Pong response = influxDbConnection.ping();
+            if (response.getVersion().equalsIgnoreCase("unknown")) {
+                throw new CloudRuntimeException(String.format("Cannot ping influxdb host %s:%s.", externalStatsHost, externalStatsPort));
+            }
+
+            Collection<Object> metricsObjects = metrics.values();
+            List<Point> points = new ArrayList<>();
+
+            s_logger.debug(String.format("Sending stats to %s host %s:%s", externalStatsType, externalStatsHost, externalStatsPort));
+
+            for (Object metricsObject : metricsObjects) {
+                Point vmPoint = creteInfluxDbPoint(metricsObject);
+                points.add(vmPoint);
+            }
+            writeBatches(influxDbConnection, databaseName, points);
+        }
+
+        /**
+         * Creates a InfluxDB point for the given stats collector (VmStatsCollector, or HostCollector).
+         */
+        protected abstract Point creteInfluxDbPoint(Object metricsObject);
+    }
+
     public boolean imageStoreHasEnoughCapacity(DataStore imageStore) {
         StorageStats imageStoreStats = _storageStats.get(imageStore.getId());
-        if (imageStoreStats != null && (imageStoreStats.getByteUsed()/(imageStoreStats.getCapacityBytes()*1.0)) <= _imageStoreCapacityThreshold) {
+        if (imageStoreStats != null && (imageStoreStats.getByteUsed() / (imageStoreStats.getCapacityBytes() * 1.0)) <= _imageStoreCapacityThreshold) {
             return true;
         }
         return false;
     }
 
+    /**
+     * Sends VMs metrics to the configured graphite host.
+     */
+    protected void sendVmMetricsToGraphiteHost(Map<Object, Object> metrics, HostVO host) {
+        s_logger.debug(String.format("Sending VmStats of host %s to %s host %s:%s", host.getId(), externalStatsType, externalStatsHost, externalStatsPort));
+        try {
+            GraphiteClient g = new GraphiteClient(externalStatsHost, externalStatsPort);
+            g.sendMetrics(metrics);
+        } catch (GraphiteException e) {
+            s_logger.debug("Failed sending VmStats to Graphite host " + externalStatsHost + ":" + externalStatsPort + ": " + e.getMessage());
+        }
+    }
+
+    /**
+     * Prepares metrics for Graphite.
+     * @note this method must only be executed in case the configured stats collector is a Graphite host;
+     * otherwise, it will compromise the map of metrics used by another type of collector (e.g. InfluxDB).
+     */
+    private void prepareVmMetricsForGraphite(Map<Object, Object> metrics, VmStatsEntry statsForCurrentIteration) {
+        VMInstanceVO vmVO = _vmInstance.findById(statsForCurrentIteration.getVmId());
+        String vmName = vmVO.getUuid();
+
+        metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".cpu.num", statsForCurrentIteration.getNumCPUs());
+        metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".cpu.utilization", statsForCurrentIteration.getCPUUtilization());
+        metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".network.read_kbs", statsForCurrentIteration.getNetworkReadKBs());
+        metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".network.write_kbs", statsForCurrentIteration.getNetworkWriteKBs());
+        metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".disk.write_kbs", statsForCurrentIteration.getDiskWriteKBs());
+        metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".disk.read_kbs", statsForCurrentIteration.getDiskReadKBs());
+        metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".disk.write_iops", statsForCurrentIteration.getDiskWriteIOs());
+        metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".disk.read_iops", statsForCurrentIteration.getDiskReadIOs());
+        metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".memory.total_kbs", statsForCurrentIteration.getMemoryKBs());
+        metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".memory.internalfree_kbs", statsForCurrentIteration.getIntFreeMemoryKBs());
+        metrics.put(externalStatsPrefix + "cloudstack.stats.instances." + vmName + ".memory.target_kbs", statsForCurrentIteration.getTargetMemoryKBs());
+    }
+
+    /**
+     * Stores virtual machine stats in memory (map of {@link VmStatsEntry}).
+     */
+    private void storeVirtualMachineStatsInMemory(VmStatsEntry statsForCurrentIteration) {
+        VmStatsEntry statsInMemory = (VmStatsEntry)_VmStats.get(statsForCurrentIteration.getVmId());
+
+        if (statsInMemory == null) {
+            //no stats exist for this vm, directly persist
+            _VmStats.put(statsForCurrentIteration.getVmId(), statsForCurrentIteration);
+        } else {
+            //update each field
+            statsInMemory.setCPUUtilization(statsForCurrentIteration.getCPUUtilization());
+            statsInMemory.setNumCPUs(statsForCurrentIteration.getNumCPUs());
+            statsInMemory.setNetworkReadKBs(statsInMemory.getNetworkReadKBs() + statsForCurrentIteration.getNetworkReadKBs());
+            statsInMemory.setNetworkWriteKBs(statsInMemory.getNetworkWriteKBs() + statsForCurrentIteration.getNetworkWriteKBs());
+            statsInMemory.setDiskWriteKBs(statsInMemory.getDiskWriteKBs() + statsForCurrentIteration.getDiskWriteKBs());
+            statsInMemory.setDiskReadIOs(statsInMemory.getDiskReadIOs() + statsForCurrentIteration.getDiskReadIOs());
+            statsInMemory.setDiskWriteIOs(statsInMemory.getDiskWriteIOs() + statsForCurrentIteration.getDiskWriteIOs());
+            statsInMemory.setDiskReadKBs(statsInMemory.getDiskReadKBs() + statsForCurrentIteration.getDiskReadKBs());
+            statsInMemory.setMemoryKBs(statsForCurrentIteration.getMemoryKBs());
+            statsInMemory.setIntFreeMemoryKBs(statsForCurrentIteration.getIntFreeMemoryKBs());
+            statsInMemory.setTargetMemoryKBs(statsForCurrentIteration.getTargetMemoryKBs());
+
+            _VmStats.put(statsForCurrentIteration.getVmId(), statsInMemory);
+        }
+    }
+
+    /**
+     * Sends host metrics to a configured InfluxDB host. The metrics respects the following specification.</br>
+     * <b>Tags:</b>vm_id, uuid, instance_name, data_center_id, host_id</br>
+     * <b>Fields:</b>memory_total_kb, memory_internal_free_kbs, memory_target_kbs, cpu_utilization, cpus, network_write_kb, disk_read_iops, disk_read_kbs, disk_write_iops, disk_write_kbs
+     */
+    protected Point createInfluxDbPointForHostMetrics(Object metricsObject) {
+        HostStatsEntry hostStatsEntry = (HostStatsEntry)metricsObject;
+
+        Map<String, String> tagsToAdd = new HashMap<>();
+        tagsToAdd.put(UUID_TAG, hostStatsEntry.getHostVo().getUuid());
+
+        Map<String, Object> fieldsToAdd = new HashMap<>();
+        fieldsToAdd.put(TOTAL_MEMORY_KBS_FIELD, hostStatsEntry.getTotalMemoryKBs());
+        fieldsToAdd.put(FREE_MEMORY_KBS_FIELD, hostStatsEntry.getFreeMemoryKBs());
+        fieldsToAdd.put(CPU_UTILIZATION_FIELD, hostStatsEntry.getCpuUtilization());
+        fieldsToAdd.put(CPUS_FIELD, hostStatsEntry.getHostVo().getCpus());
+        fieldsToAdd.put(CPU_SOCKETS_FIELD, hostStatsEntry.getHostVo().getCpuSockets());
+        fieldsToAdd.put(NETWORK_READ_KBS_FIELD, hostStatsEntry.getNetworkReadKBs());
+        fieldsToAdd.put(NETWORK_WRITE_KBS_FIELD, hostStatsEntry.getNetworkWriteKBs());
+
+        return Point.measurement(INFLUXDB_HOST_MEASUREMENT).tag(tagsToAdd).time(System.currentTimeMillis(), TimeUnit.MILLISECONDS).fields(fieldsToAdd).build();
+    }
+
+    /**
+     * Sends VMs metrics to a configured InfluxDB host. The metrics respects the following specification.</br>
+     * <b>Tags:</b>vm_id, uuid, instance_name, data_center_id, host_id</br>
+     * <b>Fields:</b>memory_total_kb, memory_internal_free_kbs, memory_target_kbs, cpu_utilization, cpus, network_write_kb, disk_read_iops, disk_read_kbs, disk_write_iops, disk_write_kbs
+     */
+    protected Point createInfluxDbPointForVmMetrics(Object metricsObject) {
+        VmStatsEntry vmStatsEntry = (VmStatsEntry)metricsObject;
+        UserVmVO userVmVO = vmStatsEntry.getUserVmVO();
+
+        Map<String, String> tagsToAdd = new HashMap<>();
+        tagsToAdd.put(UUID_TAG, userVmVO.getUuid());
+
+        Map<String, Object> fieldsToAdd = new HashMap<>();
+        fieldsToAdd.put(TOTAL_MEMORY_KBS_FIELD, vmStatsEntry.getMemoryKBs());
+        fieldsToAdd.put(FREE_MEMORY_KBS_FIELD, vmStatsEntry.getIntFreeMemoryKBs());
+        fieldsToAdd.put(MEMORY_TARGET_KBS_FIELD, vmStatsEntry.getTargetMemoryKBs());
+        fieldsToAdd.put(CPU_UTILIZATION_FIELD, vmStatsEntry.getCPUUtilization());
+        fieldsToAdd.put(CPUS_FIELD, vmStatsEntry.getNumCPUs());
+        fieldsToAdd.put(NETWORK_READ_KBS_FIELD, vmStatsEntry.getNetworkReadKBs());
+        fieldsToAdd.put(NETWORK_WRITE_KBS_FIELD, vmStatsEntry.getNetworkWriteKBs());
+        fieldsToAdd.put(DISK_READ_IOPS_FIELD, vmStatsEntry.getDiskReadIOs());
+        fieldsToAdd.put(DISK_READ_KBS_FIELD, vmStatsEntry.getDiskReadKBs());
+        fieldsToAdd.put(DISK_WRITE_IOPS_FIELD, vmStatsEntry.getDiskWriteIOs());
+        fieldsToAdd.put(DISK_WRITE_KBS_FIELD, vmStatsEntry.getDiskWriteKBs());
+
+        return Point.measurement(INFLUXDB_VM_MEASUREMENT).tag(tagsToAdd).time(System.currentTimeMillis(), TimeUnit.MILLISECONDS).fields(fieldsToAdd).build();
+    }
+
+    /**
+     * Creates connection to InfluxDB. If the database does not exist, it throws a CloudRuntimeException. </br>
+     * @note the user can configure the database name on parameter 'stats.output.influxdb.database.name'; such database must be yet created and configured by the user.
+     * The Default name for the database is 'cloudstack_stats'.
+     */
+    protected InfluxDB createInfluxDbConnection() {
+        String influxDbQueryUrl = String.format("http://%s:%s/", externalStatsHost, externalStatsPort);
+        InfluxDB influxDbConnection = InfluxDBFactory.connect(influxDbQueryUrl);
+
+        if (!influxDbConnection.databaseExists(databaseName)) {
+            throw new CloudRuntimeException(String.format("Database with name %s does not exist in influxdb host %s:%s", databaseName, externalStatsHost, externalStatsPort));
+        }
+
+        return influxDbConnection;
+    }
+
+    /**
+     * Writes batches of InfluxDB database points into a given database.
+     */
+    protected void writeBatches(InfluxDB influxDbConnection, String dbName, List<Point> points) {
+        BatchPoints batchPoints = BatchPoints.database(dbName).build();
+
+        for (Point point : points) {
+            batchPoints.point(point);
+        }
+
+        influxDbConnection.write(batchPoints);
+    }
+
+    /**
+     * Returns true if at least one of the current disk stats is different from the previous.</br>
+     * The considered disk stats are the following: bytes read, bytes write, IO read,  and IO write.
+     */
+    protected boolean isCurrentVmDiskStatsDifferentFromPrevious(VmDiskStatisticsVO previousVmDiskStats, VmDiskStatisticsVO currentVmDiskStats) {
+        if (previousVmDiskStats != null) {
+            boolean bytesReadDifferentFromPrevious = previousVmDiskStats.getCurrentBytesRead() != currentVmDiskStats.getCurrentBytesRead();
+            boolean bytesWriteDifferentFromPrevious = previousVmDiskStats.getCurrentBytesWrite() != currentVmDiskStats.getCurrentBytesWrite();
+            boolean ioReadDifferentFromPrevious = previousVmDiskStats.getCurrentIORead() != currentVmDiskStats.getCurrentIORead();
+            boolean ioWriteDifferentFromPrevious = previousVmDiskStats.getCurrentIOWrite() != currentVmDiskStats.getCurrentIOWrite();
+            return bytesReadDifferentFromPrevious || bytesWriteDifferentFromPrevious || ioReadDifferentFromPrevious || ioWriteDifferentFromPrevious;
+        }
+        if (currentVmDiskStats == null) {
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Returns true if all the VmDiskStatsEntry are Zeros (Bytes read, Bytes write, IO read, and IO write must be all equals to zero)
+     */
+    protected boolean areAllDiskStatsZero(VmDiskStatsEntry vmDiskStat) {
+        return (vmDiskStat.getBytesRead() == 0) && (vmDiskStat.getBytesWrite() == 0) && (vmDiskStat.getIORead() == 0) && (vmDiskStat.getIOWrite() == 0);
+    }
+
+    /**
+     * Creates a HostVO SearchCriteria where:
+     * <ul>
+     *  <li>"status" is Up;</li>
+     *  <li>"resourceState" is not in Maintenance, PrepareForMaintenance, or ErrorInMaintenance; and</li>
+     *  <li>"type" is Routing.</li>
+     * </ul>
+     */
+    private SearchCriteria<HostVO> createSearchCriteriaForHostTypeRoutingStateUpAndNotInMaintenance() {
+        SearchCriteria<HostVO> sc = _hostDao.createSearchCriteria();
+        sc.addAnd("status", SearchCriteria.Op.EQ, Status.Up.toString());
+        sc.addAnd("resourceState", SearchCriteria.Op.NIN, ResourceState.Maintenance, ResourceState.PrepareForMaintenance, ResourceState.ErrorInMaintenance);
+        sc.addAnd("type", SearchCriteria.Op.EQ, Host.Type.Routing.toString());
+        return sc;
+    }
+
     public StorageStats getStorageStats(long id) {
         return _storageStats.get(id);
     }
@@ -1334,6 +1566,6 @@ public String getConfigComponentName() {
 
     @Override
     public ConfigKey<?>[] getConfigKeys() {
-        return new ConfigKey<?>[] { vmDiskStatsInterval, vmDiskStatsIntervalMin, vmNetworkStatsInterval, vmNetworkStatsIntervalMin, StatsTimeout };
+        return new ConfigKey<?>[] {vmDiskStatsInterval, vmDiskStatsIntervalMin, vmNetworkStatsInterval, vmNetworkStatsIntervalMin, StatsTimeout, statsOutputUri};
     }
 }
diff --git a/server/src/test/java/com/cloud/server/StatsCollectorTest.java b/server/src/test/java/com/cloud/server/StatsCollectorTest.java
new file mode 100644
index 00000000000..040d08de1bf
--- /dev/null
+++ b/server/src/test/java/com/cloud/server/StatsCollectorTest.java
@@ -0,0 +1,227 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package com.cloud.server;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.BatchPoints;
+import org.influxdb.dto.BatchPoints.Builder;
+import org.influxdb.dto.Point;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+
+import com.cloud.agent.api.VmDiskStatsEntry;
+import com.cloud.server.StatsCollector.ExternalStatsProtocol;
+import com.cloud.user.VmDiskStatisticsVO;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.tngtech.java.junit.dataprovider.DataProvider;
+import com.tngtech.java.junit.dataprovider.DataProviderRunner;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(DataProviderRunner.class)
+@PrepareForTest({InfluxDBFactory.class, BatchPoints.class})
+public class StatsCollectorTest {
+    private StatsCollector statsCollector = Mockito.spy(new StatsCollector());
+
+    private static final int GRAPHITE_DEFAULT_PORT = 2003;
+    private static final int INFLUXDB_DEFAULT_PORT = 8086;
+    private static final String HOST_ADDRESS = "192.168.16.10";
+    private static final String URL = String.format("http://%s:%s/", HOST_ADDRESS, INFLUXDB_DEFAULT_PORT);
+
+    private static final String DEFAULT_DATABASE_NAME = "cloudstack";
+
+    @Test
+    public void createInfluxDbConnectionTest() {
+        configureAndTestCreateInfluxDbConnection(true);
+    }
+
+    @Test(expected = CloudRuntimeException.class)
+    public void createInfluxDbConnectionTestExpectException() {
+        configureAndTestCreateInfluxDbConnection(false);
+    }
+
+    private void configureAndTestCreateInfluxDbConnection(boolean databaseExists) {
+        statsCollector.externalStatsHost = HOST_ADDRESS;
+        statsCollector.externalStatsPort = INFLUXDB_DEFAULT_PORT;
+        InfluxDB influxDbConnection = Mockito.mock(InfluxDB.class);
+        Mockito.when(influxDbConnection.databaseExists(DEFAULT_DATABASE_NAME)).thenReturn(databaseExists);
+        PowerMockito.mockStatic(InfluxDBFactory.class);
+        PowerMockito.when(InfluxDBFactory.connect(URL)).thenReturn(influxDbConnection);
+
+        InfluxDB returnedConnection = statsCollector.createInfluxDbConnection();
+
+        Assert.assertEquals(influxDbConnection, returnedConnection);
+    }
+
+    @Test
+    public void writeBatchesTest() {
+        InfluxDB influxDbConnection = Mockito.mock(InfluxDB.class);
+        Mockito.doNothing().when(influxDbConnection).write(Mockito.any(Point.class));
+        Builder builder = Mockito.mock(Builder.class);
+        BatchPoints batchPoints = Mockito.mock(BatchPoints.class);
+        PowerMockito.mockStatic(BatchPoints.class);
+        PowerMockito.when(BatchPoints.database(DEFAULT_DATABASE_NAME)).thenReturn(builder);
+        Mockito.when(builder.build()).thenReturn(batchPoints);
+        Map<String, String> tagsToAdd = new HashMap<>();
+        tagsToAdd.put("hostId", "1");
+        Map<String, Object> fieldsToAdd = new HashMap<>();
+        fieldsToAdd.put("total_memory_kbs", 10000000);
+        Point point = Point.measurement("measure").tag(tagsToAdd).time(System.currentTimeMillis(), TimeUnit.MILLISECONDS).fields(fieldsToAdd).build();
+        List<Point> points = new ArrayList<>();
+        points.add(point);
+        Mockito.when(batchPoints.point(point)).thenReturn(batchPoints);
+
+        statsCollector.writeBatches(influxDbConnection, DEFAULT_DATABASE_NAME, points);
+
+        Mockito.verify(influxDbConnection).write(batchPoints);
+    }
+
+    @Test
+    public void configureExternalStatsPortTestGraphitePort() throws URISyntaxException {
+        URI uri = new URI(HOST_ADDRESS);
+        statsCollector.externalStatsType = ExternalStatsProtocol.GRAPHITE;
+        int port = statsCollector.retrieveExternalStatsPortFromUri(uri);
+        Assert.assertEquals(GRAPHITE_DEFAULT_PORT, port);
+    }
+
+    @Test
+    public void configureExternalStatsPortTestInfluxdbPort() throws URISyntaxException {
+        URI uri = new URI(HOST_ADDRESS);
+        statsCollector.externalStatsType = ExternalStatsProtocol.INFLUXDB;
+        int port = statsCollector.retrieveExternalStatsPortFromUri(uri);
+        Assert.assertEquals(INFLUXDB_DEFAULT_PORT, port);
+    }
+
+    @Test(expected = URISyntaxException.class)
+    public void configureExternalStatsPortTestExpectException() throws URISyntaxException {
+        statsCollector.externalStatsType = ExternalStatsProtocol.NONE;
+        URI uri = new URI(HOST_ADDRESS);
+        statsCollector.retrieveExternalStatsPortFromUri(uri);
+    }
+
+    @Test
+    public void configureExternalStatsPortTestInfluxDbCustomizedPort() throws URISyntaxException {
+        statsCollector.externalStatsType = ExternalStatsProtocol.INFLUXDB;
+        URI uri = new URI("test://" + HOST_ADDRESS + ":1234");
+        int port = statsCollector.retrieveExternalStatsPortFromUri(uri);
+        Assert.assertEquals(1234, port);
+    }
+
+    @Test
+    public void configureDatabaseNameTestDefaultDbName() throws URISyntaxException {
+        URI uri = new URI(URL);
+        String dbName = statsCollector.configureDatabaseName(uri);
+        Assert.assertEquals(DEFAULT_DATABASE_NAME, dbName);
+    }
+
+    @Test
+    public void configureDatabaseNameTestCustomDbName() throws URISyntaxException {
+        String configuredDbName = "dbName";
+        URI uri = new URI(URL + configuredDbName);
+        String dbName = statsCollector.configureDatabaseName(uri);
+        Assert.assertEquals(configuredDbName, dbName);
+    }
+
+    @Test
+    public void isCurrentVmDiskStatsDifferentFromPreviousTestNull() {
+        VmDiskStatisticsVO currentVmDiskStatisticsVO = new VmDiskStatisticsVO(1l, 1l, 1l, 1l);
+        boolean result = statsCollector.isCurrentVmDiskStatsDifferentFromPrevious(null, currentVmDiskStatisticsVO);
+        Assert.assertTrue(result);
+    }
+
+    @Test
+    public void isCurrentVmDiskStatsDifferentFromPreviousTestBothNull() {
+        boolean result = statsCollector.isCurrentVmDiskStatsDifferentFromPrevious(null, null);
+        Assert.assertFalse(result);
+    }
+
+    @Test
+    public void isCurrentVmDiskStatsDifferentFromPreviousTestDifferentIoWrite() {
+        configureAndTestisCurrentVmDiskStatsDifferentFromPrevious(123l, 123l, 123l, 12l, true);
+    }
+
+    @Test
+    public void isCurrentVmDiskStatsDifferentFromPreviousTestDifferentIoRead() {
+        configureAndTestisCurrentVmDiskStatsDifferentFromPrevious(123l, 123l, 12l, 123l, true);
+    }
+
+    @Test
+    public void isCurrentVmDiskStatsDifferentFromPreviousTestDifferentBytesRead() {
+        configureAndTestisCurrentVmDiskStatsDifferentFromPrevious(12l, 123l, 123l, 123l, true);
+    }
+
+    @Test
+    public void isCurrentVmDiskStatsDifferentFromPreviousTestDifferentBytesWrite() {
+        configureAndTestisCurrentVmDiskStatsDifferentFromPrevious(123l, 12l, 123l, 123l, true);
+    }
+
+    @Test
+    public void isCurrentVmDiskStatsDifferentFromPreviousTestAllEqual() {
+        configureAndTestisCurrentVmDiskStatsDifferentFromPrevious(123l, 123l, 123l, 123l, false);
+    }
+
+    private void configureAndTestisCurrentVmDiskStatsDifferentFromPrevious(long bytesRead, long bytesWrite, long ioRead, long ioWrite, boolean expectedResult) {
+        VmDiskStatisticsVO previousVmDiskStatisticsVO = new VmDiskStatisticsVO(1l, 1l, 1l, 1l);
+        previousVmDiskStatisticsVO.setCurrentBytesRead(123l);
+        previousVmDiskStatisticsVO.setCurrentBytesWrite(123l);
+        previousVmDiskStatisticsVO.setCurrentIORead(123l);
+        previousVmDiskStatisticsVO.setCurrentIOWrite(123l);
+
+        VmDiskStatisticsVO currentVmDiskStatisticsVO = new VmDiskStatisticsVO(1l, 1l, 1l, 1l);
+        currentVmDiskStatisticsVO.setCurrentBytesRead(bytesRead);
+        currentVmDiskStatisticsVO.setCurrentBytesWrite(bytesWrite);
+        currentVmDiskStatisticsVO.setCurrentIORead(ioRead);
+        currentVmDiskStatisticsVO.setCurrentIOWrite(ioWrite);
+
+        boolean result = statsCollector.isCurrentVmDiskStatsDifferentFromPrevious(previousVmDiskStatisticsVO, currentVmDiskStatisticsVO);
+        Assert.assertEquals(expectedResult, result);
+    }
+
+    @Test
+    @DataProvider({
+        "0,0,0,0,true", "1,0,0,0,false", "0,1,0,0,false", "0,0,1,0,false",
+        "0,0,0,1,false", "1,0,0,1,false", "1,0,1,0,false", "1,1,0,0,false",
+        "0,1,1,0,false", "0,1,0,1,false", "0,0,1,1,false", "0,1,1,1,false",
+        "1,1,0,1,false", "1,0,1,1,false", "1,1,1,0,false", "1,1,1,1,false",
+    })
+    public void configureAndTestCheckIfDiskStatsAreZero(long bytesRead, long bytesWrite, long ioRead, long ioWrite, boolean expected) {
+        VmDiskStatsEntry vmDiskStatsEntry = new VmDiskStatsEntry();
+        vmDiskStatsEntry.setBytesRead(bytesRead);
+        vmDiskStatsEntry.setBytesWrite(bytesWrite);
+        vmDiskStatsEntry.setIORead(ioRead);
+        vmDiskStatsEntry.setIOWrite(ioWrite);
+
+        boolean result = statsCollector.areAllDiskStatsZero(vmDiskStatsEntry);
+        Assert.assertEquals(expected, result);
+    }
+}
diff --git a/utils/src/main/java/org/apache/cloudstack/utils/graphite/GraphiteClient.java b/utils/src/main/java/org/apache/cloudstack/utils/graphite/GraphiteClient.java
index 4143f099820..9c9d48a73ca 100644
--- a/utils/src/main/java/org/apache/cloudstack/utils/graphite/GraphiteClient.java
+++ b/utils/src/main/java/org/apache/cloudstack/utils/graphite/GraphiteClient.java
@@ -67,7 +67,7 @@ protected long getCurrentSystemTime() {
      *
      * @param metrics the metrics as key-value-pairs
      */
-    public void sendMetrics(Map<String, Integer> metrics) {
+    public void sendMetrics(Map<Object, Object> metrics) {
         sendMetrics(metrics, getCurrentSystemTime());
     }
 
@@ -77,12 +77,12 @@ public void sendMetrics(Map<String, Integer> metrics) {
      * @param metrics the metrics as key-value-pairs
      * @param timeStamp the timestamp
      */
-    public void sendMetrics(Map<String, Integer> metrics, long timeStamp) {
+    public void sendMetrics(Map<Object, Object> metrics, long timeStamp) {
         try (DatagramSocket sock = new DatagramSocket()){
             java.security.Security.setProperty("networkaddress.cache.ttl", "0");
             InetAddress addr = InetAddress.getByName(this.graphiteHost);
 
-            for (Map.Entry<String, Integer> metric: metrics.entrySet()) {
+            for (Map.Entry<Object, Object> metric : metrics.entrySet()) {
                 byte[] message = new String(metric.getKey() + " " + metric.getValue() + " " + timeStamp + "\n").getBytes();
                 DatagramPacket packet = new DatagramPacket(message, message.length, addr, graphitePort);
                 sock.send(packet);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services