You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by da...@apache.org on 2023/01/30 08:48:10 UTC

[cloudstack] branch main updated: ui,server,api: resource metrics improvements (#6803)

This is an automated email from the ASF dual-hosted git repository.

dahn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudstack.git


The following commit(s) were added to refs/heads/main by this push:
     new 028ca74fb6c ui,server,api: resource metrics improvements (#6803)
028ca74fb6c is described below

commit 028ca74fb6c4e426e3dd5221fbb0281d3da28f14
Author: Abhishek Kumar <ab...@gmail.com>
AuthorDate: Mon Jan 30 14:18:03 2023 +0530

    ui,server,api: resource metrics improvements (#6803)
    
    Signed-off-by: Abhishek Kumar <ab...@gmail.com>
    Co-authored-by: Rohit Yadav <ro...@shapeblue.com>
---
 .../org/apache/cloudstack/api/ApiConstants.java    |   5 +
 .../command/user/config/ListCapabilitiesCmd.java   |   5 +
 .../api/response/CapabilitiesResponse.java         |  32 ++
 .../java/com/cloud/agent/api/VmDiskStatsEntry.java |  37 ++
 .../java/com/cloud/agent/api/VmStatsEntry.java     |  12 +-
 .../java/com/cloud/vm/VirtualMachineManager.java   |  22 +
 .../com/cloud/vm/VirtualMachineManagerImpl.java    | 109 ++++-
 .../main/java/com/cloud/storage/VolumeStatsVO.java |  86 ++++
 .../java/com/cloud/storage/dao/VolumeStatsDao.java |  82 ++++
 .../com/cloud/storage/dao/VolumeStatsDaoImpl.java  | 124 +++++
 .../spring-engine-schema-core-daos-context.xml     |   1 +
 .../resources/META-INF/db/schema-41720to41800.sql  |  11 +
 .../kvm/resource/LibvirtComputingResource.java     |  38 +-
 .../api/BaseResourceUsageHistoryCmd.java           |  54 +++
 ...yCmd.java => ListSystemVMsUsageHistoryCmd.java} |  39 +-
 .../cloudstack/api/ListVMsUsageHistoryCmd.java     |  25 +-
 ...oryCmd.java => ListVolumesUsageHistoryCmd.java} |  42 +-
 .../apache/cloudstack/metrics/MetricsService.java  |  11 +-
 .../cloudstack/metrics/MetricsServiceImpl.java     | 235 ++++++++-
 .../response/VolumeMetricsStatsResponse.java       |  54 +++
 .../cloudstack/metrics/MetricsServiceImplTest.java |  20 +-
 .../com/cloud/network/as/AutoScaleManagerImpl.java |  38 +-
 .../com/cloud/server/ManagementServerImpl.java     | 401 +++++++--------
 .../main/java/com/cloud/server/StatsCollector.java | 269 ++++++----
 .../src/main/java/com/cloud/vm/UserVmManager.java  |  20 +-
 .../main/java/com/cloud/vm/UserVmManagerImpl.java  | 108 -----
 .../cloud/network/as/AutoScaleManagerImplTest.java | 139 +++---
 .../java/com/cloud/server/StatsCollectorTest.java  |  74 ++-
 test/integration/smoke/test_metrics_api.py         | 178 ++++++-
 ui/public/locales/en.json                          |  24 +-
 ui/src/components/view/StatsTab.vue                | 540 ++++++++++-----------
 ui/src/components/view/chart/LineChart.vue         |  55 ---
 ui/src/components/view/stats/ResourceStatsInfo.vue |  10 +-
 .../view/stats/ResourceStatsLineChart.vue          | 239 +++++++++
 ui/src/config/section/infra/routers.js             |   5 +
 ui/src/config/section/infra/systemVms.js           |   6 +
 ui/src/config/section/storage.js                   |   6 +
 ui/src/core/lazy_lib/components_use.js             |   4 +-
 ui/src/style/components/view/StatsTab.scss         |   5 +-
 ui/src/views/compute/InstanceTab.vue               |   2 +-
 40 files changed, 2166 insertions(+), 1001 deletions(-)

diff --git a/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java b/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java
index 99e19ab4288..653b38e6ea3 100644
--- a/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java
+++ b/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java
@@ -213,6 +213,10 @@ public class ApiConstants {
     public static final String ID = "id";
     public static final String IDS = "ids";
     public static final String INDEX = "index";
+    public static final String INSTANCES_DISKS_STATS_RETENTION_ENABLED = "instancesdisksstatsretentionenabled";
+    public static final String INSTANCES_DISKS_STATS_RETENTION_TIME = "instancesdisksstatsretentiontime";
+    public static final String INSTANCES_STATS_RETENTION_TIME = "instancesstatsretentiontime";
+    public static final String INSTANCES_STATS_USER_ONLY = "instancesstatsuseronly";
     public static final String PREFIX = "prefix";
     public static final String PREVIOUS_ACL_RULE_ID = "previousaclruleid";
     public static final String NEXT_ACL_RULE_ID = "nextaclruleid";
@@ -392,6 +396,7 @@ public class ApiConstants {
     public static final String START_IPV6 = "startipv6";
     public static final String START_PORT = "startport";
     public static final String STATE = "state";
+    public static final String STATS = "stats";
     public static final String STATUS = "status";
     public static final String STORAGE_TYPE = "storagetype";
     public static final String STORAGE_POLICY = "storagepolicy";
diff --git a/api/src/main/java/org/apache/cloudstack/api/command/user/config/ListCapabilitiesCmd.java b/api/src/main/java/org/apache/cloudstack/api/command/user/config/ListCapabilitiesCmd.java
index fa65101778c..4a2711e2092 100644
--- a/api/src/main/java/org/apache/cloudstack/api/command/user/config/ListCapabilitiesCmd.java
+++ b/api/src/main/java/org/apache/cloudstack/api/command/user/config/ListCapabilitiesCmd.java
@@ -19,6 +19,7 @@ package org.apache.cloudstack.api.command.user.config;
 import java.util.Map;
 
 import org.apache.cloudstack.api.APICommand;
+import org.apache.cloudstack.api.ApiConstants;
 import org.apache.cloudstack.api.BaseCmd;
 import org.apache.cloudstack.api.response.CapabilitiesResponse;
 import org.apache.cloudstack.config.ApiServiceConfiguration;
@@ -65,6 +66,10 @@ public class ListCapabilitiesCmd extends BaseCmd {
             response.setApiLimitMax((Integer)capabilities.get("apiLimitMax"));
         }
         response.setDefaultUiPageSize((Long)capabilities.get(ApiServiceConfiguration.DefaultUIPageSize.key()));
+        response.setInstancesStatsRetentionTime((Integer) capabilities.get(ApiConstants.INSTANCES_STATS_RETENTION_TIME));
+        response.setInstancesStatsUserOnly((Boolean) capabilities.get(ApiConstants.INSTANCES_STATS_USER_ONLY));
+        response.setInstancesDisksStatsRetentionEnabled((Boolean) capabilities.get(ApiConstants.INSTANCES_DISKS_STATS_RETENTION_ENABLED));
+        response.setInstancesDisksStatsRetentionTime((Integer) capabilities.get(ApiConstants.INSTANCES_DISKS_STATS_RETENTION_TIME));
         response.setObjectName("capability");
         response.setResponseName(getCommandName());
         this.setResponseObject(response);
diff --git a/api/src/main/java/org/apache/cloudstack/api/response/CapabilitiesResponse.java b/api/src/main/java/org/apache/cloudstack/api/response/CapabilitiesResponse.java
index e860baebe6d..d2f7c6b00d0 100644
--- a/api/src/main/java/org/apache/cloudstack/api/response/CapabilitiesResponse.java
+++ b/api/src/main/java/org/apache/cloudstack/api/response/CapabilitiesResponse.java
@@ -104,6 +104,22 @@ public class CapabilitiesResponse extends BaseResponse {
     @Param(description = "default page size in the UI for various views, value set in the configurations", since = "4.15.2")
     private Long defaultUiPageSize;
 
+    @SerializedName(ApiConstants.INSTANCES_STATS_RETENTION_TIME)
+    @Param(description = "the retention time for Instances stats", since = "4.18.0")
+    private Integer instancesStatsRetentionTime;
+
+    @SerializedName(ApiConstants.INSTANCES_STATS_USER_ONLY)
+    @Param(description = "true if stats are collected only for user instances, false if system instance stats are also collected", since = "4.18.0")
+    private Boolean instancesStatsUserOnly;
+
+    @SerializedName(ApiConstants.INSTANCES_DISKS_STATS_RETENTION_ENABLED)
+    @Param(description = "true if stats are retained for instance disks otherwise false", since = "4.18.0")
+    private Boolean instancesDisksStatsRetentionEnabled;
+
+    @SerializedName(ApiConstants.INSTANCES_DISKS_STATS_RETENTION_TIME)
+    @Param(description = "the retention time for Instances disks stats", since = "4.18.0")
+    private Integer instancesDisksStatsRetentionTime;
+
     public void setSecurityGroupsEnabled(boolean securityGroupsEnabled) {
         this.securityGroupsEnabled = securityGroupsEnabled;
     }
@@ -183,4 +199,20 @@ public class CapabilitiesResponse extends BaseResponse {
     public void setDefaultUiPageSize(Long defaultUiPageSize) {
         this.defaultUiPageSize = defaultUiPageSize;
     }
+
+    public void setInstancesStatsRetentionTime(Integer instancesStatsRetentionTime) {
+        this.instancesStatsRetentionTime = instancesStatsRetentionTime;
+    }
+
+    public void setInstancesStatsUserOnly(Boolean instancesStatsUserOnly) {
+        this.instancesStatsUserOnly = instancesStatsUserOnly;
+    }
+
+    public void setInstancesDisksStatsRetentionEnabled(Boolean instancesDisksStatsRetentionEnabled) {
+        this.instancesDisksStatsRetentionEnabled = instancesDisksStatsRetentionEnabled;
+    }
+
+    public void setInstancesDisksStatsRetentionTime(Integer instancesDisksStatsRetentionTime) {
+        this.instancesDisksStatsRetentionTime = instancesDisksStatsRetentionTime;
+    }
 }
diff --git a/core/src/main/java/com/cloud/agent/api/VmDiskStatsEntry.java b/core/src/main/java/com/cloud/agent/api/VmDiskStatsEntry.java
index ead1e80b414..9fbb46f37fe 100644
--- a/core/src/main/java/com/cloud/agent/api/VmDiskStatsEntry.java
+++ b/core/src/main/java/com/cloud/agent/api/VmDiskStatsEntry.java
@@ -30,6 +30,11 @@ public class VmDiskStatsEntry implements VmDiskStats {
     long bytesWrite = 0;
     long bytesRead = 0;
 
+    long deltaIoRead = 0;
+    long deltaIoWrite = 0;
+    long deltaBytesWrite = 0;
+    long deltaBytesRead = 0;
+
     public VmDiskStatsEntry() {
     }
 
@@ -94,4 +99,36 @@ public class VmDiskStatsEntry implements VmDiskStats {
         return ioWrite;
     }
 
+    public long getDeltaIoRead() {
+        return deltaIoRead;
+    }
+
+    public void setDeltaIoRead(long deltaIoRead) {
+        this.deltaIoRead = deltaIoRead;
+    }
+
+    public long getDeltaIoWrite() {
+        return deltaIoWrite;
+    }
+
+    public void setDeltaIoWrite(long deltaIoWrite) {
+        this.deltaIoWrite = deltaIoWrite;
+    }
+
+    public long getDeltaBytesWrite() {
+        return deltaBytesWrite;
+    }
+
+    public void setDeltaBytesWrite(long deltaBytesWrite) {
+        this.deltaBytesWrite = deltaBytesWrite;
+    }
+
+    public long getDeltaBytesRead() {
+        return deltaBytesRead;
+    }
+
+    public void setDeltaBytesRead(long deltaBytesRead) {
+        this.deltaBytesRead = deltaBytesRead;
+    }
+
 }
diff --git a/core/src/main/java/com/cloud/agent/api/VmStatsEntry.java b/core/src/main/java/com/cloud/agent/api/VmStatsEntry.java
index 9c792b88254..54721a8a10a 100644
--- a/core/src/main/java/com/cloud/agent/api/VmStatsEntry.java
+++ b/core/src/main/java/com/cloud/agent/api/VmStatsEntry.java
@@ -19,11 +19,9 @@
 
 package com.cloud.agent.api;
 
-import com.cloud.vm.UserVmVO;
-
 public class VmStatsEntry extends VmStatsEntryBase {
 
-    private UserVmVO userVmVO;
+    private String vmUuid;
 
     public VmStatsEntry() {
 
@@ -52,12 +50,12 @@ public class VmStatsEntry extends VmStatsEntryBase {
                 entityType);
     }
 
-    public UserVmVO getUserVmVO() {
-        return userVmVO;
+    public String getVmUuid() {
+        return vmUuid;
     }
 
-    public void setUserVmVO(UserVmVO userVmVO) {
-        this.userVmVO = userVmVO;
+    public void setVmUuid(String vmUuid) {
+        this.vmUuid = vmUuid;
     }
 
 }
diff --git a/engine/api/src/main/java/com/cloud/vm/VirtualMachineManager.java b/engine/api/src/main/java/com/cloud/vm/VirtualMachineManager.java
index 35c36d35cd9..4ffb30b493a 100644
--- a/engine/api/src/main/java/com/cloud/vm/VirtualMachineManager.java
+++ b/engine/api/src/main/java/com/cloud/vm/VirtualMachineManager.java
@@ -17,6 +17,7 @@
 package com.cloud.vm;
 
 import java.net.URI;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -263,4 +264,25 @@ public interface VirtualMachineManager extends Manager {
 
     Pair<Long, Long> findClusterAndHostIdForVm(long vmId);
 
+    /**
+     * Obtains statistics for a list of VMs; CPU and network utilization
+     * @param hostId ID of the host
+     * @param hostName name of the host
+     * @param vmIds list of VM IDs
+     * @return map of VM ID and stats entry for the VM
+     */
+    HashMap<Long, ? extends VmStats> getVirtualMachineStatistics(long hostId, String hostName, List<Long> vmIds);
+    /**
+     * Obtains statistics for the VMs in the given map; CPU and network utilization
+     * @param hostId ID of the host
+     * @param hostName name of the host
+     * @param vmMap map of VM IDs and the corresponding VirtualMachine object
+     * @return map of VM ID and stats entry for the VM
+     */
+    HashMap<Long, ? extends VmStats> getVirtualMachineStatistics(long hostId, String hostName, Map<Long, ? extends VirtualMachine> vmMap);
+
+    HashMap<Long, List<? extends VmDiskStats>> getVmDiskStatistics(long hostId, String hostName, Map<Long, ? extends VirtualMachine> vmMap);
+
+    HashMap<Long, List<? extends VmNetworkStats>> getVmNetworkStatistics(long hostId, String hostName, Map<Long, ? extends VirtualMachine> vmMap);
+
 }
diff --git a/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java
index 7d441c70b75..d2cdc3da579 100755
--- a/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java
+++ b/engine/orchestration/src/main/java/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -17,6 +17,8 @@
 
 package com.cloud.vm;
 
+import static com.cloud.configuration.ConfigurationManagerImpl.MIGRATE_VM_ACROSS_CLUSTERS;
+
 import java.net.URI;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -45,7 +47,6 @@ import javax.inject.Inject;
 import javax.naming.ConfigurationException;
 import javax.persistence.EntityExistsException;
 
-import com.cloud.storage.VolumeApiServiceImpl;
 import org.apache.cloudstack.affinity.dao.AffinityGroupVMMapDao;
 import org.apache.cloudstack.annotation.AnnotationService;
 import org.apache.cloudstack.annotation.dao.AnnotationDao;
@@ -98,6 +99,12 @@ import com.cloud.agent.api.CheckVirtualMachineCommand;
 import com.cloud.agent.api.ClusterVMMetaDataSyncAnswer;
 import com.cloud.agent.api.ClusterVMMetaDataSyncCommand;
 import com.cloud.agent.api.Command;
+import com.cloud.agent.api.GetVmDiskStatsAnswer;
+import com.cloud.agent.api.GetVmDiskStatsCommand;
+import com.cloud.agent.api.GetVmNetworkStatsAnswer;
+import com.cloud.agent.api.GetVmNetworkStatsCommand;
+import com.cloud.agent.api.GetVmStatsAnswer;
+import com.cloud.agent.api.GetVmStatsCommand;
 import com.cloud.agent.api.MigrateCommand;
 import com.cloud.agent.api.MigrateVmToPoolAnswer;
 import com.cloud.agent.api.ModifyTargetsCommand;
@@ -122,6 +129,9 @@ import com.cloud.agent.api.StopCommand;
 import com.cloud.agent.api.UnPlugNicAnswer;
 import com.cloud.agent.api.UnPlugNicCommand;
 import com.cloud.agent.api.UnregisterVMCommand;
+import com.cloud.agent.api.VmDiskStatsEntry;
+import com.cloud.agent.api.VmNetworkStatsEntry;
+import com.cloud.agent.api.VmStatsEntry;
 import com.cloud.agent.api.routing.NetworkElementCommand;
 import com.cloud.agent.api.to.DiskTO;
 import com.cloud.agent.api.to.DpdkTO;
@@ -208,6 +218,7 @@ import com.cloud.storage.VMTemplateVO;
 import com.cloud.storage.Volume;
 import com.cloud.storage.Volume.Type;
 import com.cloud.storage.VolumeApiService;
+import com.cloud.storage.VolumeApiServiceImpl;
 import com.cloud.storage.VolumeVO;
 import com.cloud.storage.dao.DiskOfferingDao;
 import com.cloud.storage.dao.GuestOSCategoryDao;
@@ -254,8 +265,6 @@ import com.cloud.vm.snapshot.VMSnapshotManager;
 import com.cloud.vm.snapshot.VMSnapshotVO;
 import com.cloud.vm.snapshot.dao.VMSnapshotDao;
 
-import static com.cloud.configuration.ConfigurationManagerImpl.MIGRATE_VM_ACROSS_CLUSTERS;
-
 public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMachineManager, VmWorkJobHandler, Listener, Configurable {
     private static final Logger s_logger = Logger.getLogger(VirtualMachineManagerImpl.class);
 
@@ -5825,4 +5834,98 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         assert vm != null;
         return vm;
     }
+
+    @Override
+    public HashMap<Long, ? extends VmStats> getVirtualMachineStatistics(long hostId, String hostName, List<Long> vmIds) {
+        HashMap<Long, VmStatsEntry> vmStatsById = new HashMap<>();
+        if (CollectionUtils.isEmpty(vmIds)) {
+            return vmStatsById;
+        }
+        Map<Long, VMInstanceVO> vmMap = new HashMap<>();
+        for (Long vmId : vmIds) {
+            vmMap.put(vmId, _vmDao.findById(vmId));
+        }
+        return getVirtualMachineStatistics(hostId, hostName, vmMap);
+    }
+
+    @Override
+    public HashMap<Long, ? extends VmStats> getVirtualMachineStatistics(long hostId, String hostName, Map<Long, ? extends VirtualMachine> vmMap) {
+        HashMap<Long, VmStatsEntry> vmStatsById = new HashMap<>();
+        if (MapUtils.isEmpty(vmMap)) {
+            return vmStatsById;
+        }
+        Map<String, Long> vmNames = new HashMap<>();
+        for (Map.Entry<Long, ? extends VirtualMachine> vmEntry : vmMap.entrySet()) {
+            vmNames.put(vmEntry.getValue().getInstanceName(), vmEntry.getKey());
+        }
+        Answer answer = _agentMgr.easySend(hostId, new GetVmStatsCommand(new ArrayList<>(vmNames.keySet()), _hostDao.findById(hostId).getGuid(), hostName));
+        if (answer == null || !answer.getResult()) {
+            s_logger.warn("Unable to obtain VM statistics.");
+            return vmStatsById;
+        } else {
+            HashMap<String, VmStatsEntry> vmStatsByName = ((GetVmStatsAnswer)answer).getVmStatsMap();
+            if (vmStatsByName == null) {
+                s_logger.warn("Unable to obtain VM statistics.");
+                return vmStatsById;
+            }
+            for (Map.Entry<String, VmStatsEntry> entry : vmStatsByName.entrySet()) {
+                vmStatsById.put(vmNames.get(entry.getKey()), entry.getValue());
+            }
+        }
+        return vmStatsById;
+    }
+
+    @Override
+    public HashMap<Long, List<? extends VmDiskStats>> getVmDiskStatistics(long hostId, String hostName, Map<Long, ? extends VirtualMachine> vmMap) {
+        HashMap<Long, List<? extends  VmDiskStats>> vmDiskStatsById = new HashMap<>();
+        if (MapUtils.isEmpty(vmMap)) {
+            return vmDiskStatsById;
+        }
+        Map<String, Long> vmNames = new HashMap<>();
+        for (Map.Entry<Long, ? extends VirtualMachine> vmEntry : vmMap.entrySet()) {
+            vmNames.put(vmEntry.getValue().getInstanceName(), vmEntry.getKey());
+        }
+        Answer answer = _agentMgr.easySend(hostId, new GetVmDiskStatsCommand(new ArrayList<>(vmNames.keySet()), _hostDao.findById(hostId).getGuid(), hostName));
+        if (answer == null || !answer.getResult()) {
+            s_logger.warn("Unable to obtain VM disk statistics.");
+            return vmDiskStatsById;
+        } else {
+            HashMap<String, List<VmDiskStatsEntry>> vmDiskStatsByName = ((GetVmDiskStatsAnswer)answer).getVmDiskStatsMap();
+            if (vmDiskStatsByName == null) {
+                s_logger.warn("Unable to obtain VM disk statistics.");
+                return vmDiskStatsById;
+            }
+            for (Map.Entry<String, List<VmDiskStatsEntry>> entry: vmDiskStatsByName.entrySet()) {
+                vmDiskStatsById.put(vmNames.get(entry.getKey()), entry.getValue());
+            }
+        }
+        return vmDiskStatsById;
+    }
+
+    @Override
+    public HashMap<Long, List<? extends VmNetworkStats>> getVmNetworkStatistics(long hostId, String hostName, Map<Long, ? extends VirtualMachine> vmMap) {
+        HashMap<Long, List<? extends VmNetworkStats>> vmNetworkStatsById = new HashMap<>();
+        if (MapUtils.isEmpty(vmMap)) {
+            return vmNetworkStatsById;
+        }
+        Map<String, Long> vmNames = new HashMap<>();
+        for (Map.Entry<Long, ? extends VirtualMachine> vmEntry : vmMap.entrySet()) {
+            vmNames.put(vmEntry.getValue().getInstanceName(), vmEntry.getKey());
+        }
+        Answer answer = _agentMgr.easySend(hostId, new GetVmNetworkStatsCommand(new ArrayList<>(vmNames.keySet()), _hostDao.findById(hostId).getGuid(), hostName));
+        if (answer == null || !answer.getResult()) {
+            s_logger.warn("Unable to obtain VM network statistics.");
+            return vmNetworkStatsById;
+        } else {
+            HashMap<String, List<VmNetworkStatsEntry>> vmNetworkStatsByName = ((GetVmNetworkStatsAnswer)answer).getVmNetworkStatsMap();
+            if (vmNetworkStatsByName == null) {
+                s_logger.warn("Unable to obtain VM network statistics.");
+                return vmNetworkStatsById;
+            }
+            for (Map.Entry<String, List<VmNetworkStatsEntry>> entry: vmNetworkStatsByName.entrySet()) {
+                vmNetworkStatsById.put(vmNames.get(entry.getKey()), entry.getValue());
+            }
+        }
+        return vmNetworkStatsById;
+    }
 }
diff --git a/engine/schema/src/main/java/com/cloud/storage/VolumeStatsVO.java b/engine/schema/src/main/java/com/cloud/storage/VolumeStatsVO.java
new file mode 100644
index 00000000000..2d181742123
--- /dev/null
+++ b/engine/schema/src/main/java/com/cloud/storage/VolumeStatsVO.java
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.storage;
+
+import java.util.Date;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Table;
+import javax.persistence.Temporal;
+import javax.persistence.TemporalType;
+
+import org.apache.cloudstack.utils.reflectiontostringbuilderutils.ReflectionToStringBuilderUtils;
+
+@Entity
+@Table(name = "volume_stats")
+public class VolumeStatsVO {
+
+    @Id
+    @Column(name = "id", updatable = false, nullable = false)
+    protected long id;
+
+    @Column(name = "volume_id", updatable = false, nullable = false)
+    protected Long volumeId;
+
+    @Column(name = "mgmt_server_id", updatable = false, nullable = false)
+    protected Long mgmtServerId;
+
+    @Column(name= "timestamp", updatable = false)
+    @Temporal(value = TemporalType.TIMESTAMP)
+    protected Date timestamp;
+
+    @Column(name = "volume_stats_data", updatable = false, nullable = false, length = 65535)
+    protected String volumeStatsData;
+
+    public VolumeStatsVO(Long volumeId, Long mgmtServerId, Date timestamp, String volumeStatsData) {
+        this.volumeId = volumeId;
+        this.mgmtServerId = mgmtServerId;
+        this.timestamp = timestamp;
+        this.volumeStatsData = volumeStatsData;
+    }
+
+    public VolumeStatsVO() {
+
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    public Long getVolumeId() {
+        return volumeId;
+    }
+
+    public Long getMgmtServerId() {
+        return mgmtServerId;
+    }
+
+    public Date getTimestamp() {
+        return timestamp;
+    }
+
+    public String getVolumeStatsData() {
+        return volumeStatsData;
+    }
+
+    @Override
+    public String toString() { // renders only the selected fields of this entity
+        return ReflectionToStringBuilderUtils.reflectOnlySelectedFields(this, "volumeId", "mgmtServerId", "timestamp", "volumeStatsData");
+    }
+}
diff --git a/engine/schema/src/main/java/com/cloud/storage/dao/VolumeStatsDao.java b/engine/schema/src/main/java/com/cloud/storage/dao/VolumeStatsDao.java
new file mode 100644
index 00000000000..7eb6c025446
--- /dev/null
+++ b/engine/schema/src/main/java/com/cloud/storage/dao/VolumeStatsDao.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.storage.dao;
+
+import java.util.Date;
+import java.util.List;
+
+import com.cloud.utils.db.GenericDao;
+import com.cloud.storage.VolumeStatsVO;
+
+/**
+ * Data Access Object for volume_stats table.
+ */
+public interface VolumeStatsDao extends GenericDao<VolumeStatsVO, Long> {
+
+    /**
+     * Finds Volume stats by Volume ID.
+     * @param volumeId the Volume ID.
+     * @return list of stats for the specified Volume.
+     */
+    List<VolumeStatsVO> findByVolumeId(long volumeId);
+
+    /**
+     * Finds Volume stats by Volume ID. The result is sorted by timestamp in descending order.
+     * @param volumeId the Volume ID.
+     * @return ordered list of stats for the specified Volume.
+     */
+    List<VolumeStatsVO> findByVolumeIdOrderByTimestampDesc(long volumeId);
+
+    /**
+     * Finds stats by Volume ID and timestamp >= a given time.
+     * @param volumeId the specific Volume.
+     * @param time the specific time.
+     * @return list of stats for the specified Volume, with timestamp >= the specified time.
+     */
+    List<VolumeStatsVO> findByVolumeIdAndTimestampGreaterThanEqual(long volumeId, Date time);
+
+    /**
+     * Finds stats by Volume ID and timestamp <= a given time.
+     * @param volumeId the specific Volume.
+     * @param time the specific time.
+     * @return list of stats for the specified Volume, with timestamp <= the specified time.
+     */
+    List<VolumeStatsVO> findByVolumeIdAndTimestampLessThanEqual(long volumeId, Date time);
+
+    /**
+     * Finds stats by Volume ID and timestamp between a given time range.
+     * @param volumeId the specific Volume.
+     * @param startTime the start time.
+     * @param endTime the end time.
+     * @return list of stats for the specified Volume, between the specified start and end times.
+     */
+    List<VolumeStatsVO> findByVolumeIdAndTimestampBetween(long volumeId, Date startTime, Date endTime);
+
+    /**
+     * Removes (expunges) all stats of the specified Volume.
+     * @param volumeId the Volume ID to remove stats.
+     */
+    void removeAllByVolumeId(long volumeId);
+
+    /**
+     * Removes (expunges) all Volume stats with {@code timestamp} less than
+     * a given Date.
+     * @param limit the oldest timestamp to keep stored. Records with an earlier timestamp will be removed.
+     */
+    void removeAllByTimestampLessThan(Date limit);
+
+}
\ No newline at end of file
diff --git a/engine/schema/src/main/java/com/cloud/storage/dao/VolumeStatsDaoImpl.java b/engine/schema/src/main/java/com/cloud/storage/dao/VolumeStatsDaoImpl.java
new file mode 100644
index 00000000000..5d0d3c8921c
--- /dev/null
+++ b/engine/schema/src/main/java/com/cloud/storage/dao/VolumeStatsDaoImpl.java
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.storage.dao;
+
+import java.util.Date;
+import java.util.List;
+
+import javax.annotation.PostConstruct;
+
+import org.springframework.stereotype.Component;
+
+import com.cloud.utils.db.Filter;
+import com.cloud.utils.db.GenericDaoBase;
+import com.cloud.utils.db.SearchBuilder;
+import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.SearchCriteria.Op;
+import com.cloud.storage.VolumeStatsVO;
+
+@Component
+public class VolumeStatsDaoImpl extends GenericDaoBase<VolumeStatsVO, Long> implements VolumeStatsDao {
+
+    protected SearchBuilder<VolumeStatsVO> volumeIdSearch;
+    protected SearchBuilder<VolumeStatsVO> volumeIdTimestampGreaterThanEqualSearch;
+    protected SearchBuilder<VolumeStatsVO> volumeIdTimestampLessThanEqualSearch;
+    protected SearchBuilder<VolumeStatsVO> volumeIdTimestampBetweenSearch;
+    protected SearchBuilder<VolumeStatsVO> timestampSearch;
+
+    private final static String VOLUME_ID = "volumeId";
+    private final static String TIMESTAMP = "timestamp";
+
+    @PostConstruct
+    protected void init() {
+        volumeIdSearch = createSearchBuilder();
+        volumeIdSearch.and(VOLUME_ID, volumeIdSearch.entity().getVolumeId(), Op.EQ);
+        volumeIdSearch.done();
+
+        volumeIdTimestampGreaterThanEqualSearch = createSearchBuilder();
+        volumeIdTimestampGreaterThanEqualSearch.and(VOLUME_ID, volumeIdTimestampGreaterThanEqualSearch.entity().getVolumeId(), Op.EQ);
+        volumeIdTimestampGreaterThanEqualSearch.and(TIMESTAMP, volumeIdTimestampGreaterThanEqualSearch.entity().getTimestamp(), Op.GTEQ);
+        volumeIdTimestampGreaterThanEqualSearch.done();
+
+        volumeIdTimestampLessThanEqualSearch = createSearchBuilder();
+        volumeIdTimestampLessThanEqualSearch.and(VOLUME_ID, volumeIdTimestampLessThanEqualSearch.entity().getVolumeId(), Op.EQ);
+        volumeIdTimestampLessThanEqualSearch.and(TIMESTAMP, volumeIdTimestampLessThanEqualSearch.entity().getTimestamp(), Op.LTEQ);
+        volumeIdTimestampLessThanEqualSearch.done();
+
+        volumeIdTimestampBetweenSearch = createSearchBuilder();
+        volumeIdTimestampBetweenSearch.and(VOLUME_ID, volumeIdTimestampBetweenSearch.entity().getVolumeId(), Op.EQ);
+        volumeIdTimestampBetweenSearch.and(TIMESTAMP, volumeIdTimestampBetweenSearch.entity().getTimestamp(), Op.BETWEEN);
+        volumeIdTimestampBetweenSearch.done();
+
+        timestampSearch = createSearchBuilder();
+        timestampSearch.and(TIMESTAMP, timestampSearch.entity().getTimestamp(), Op.LT);
+        timestampSearch.done();
+
+    }
+
+    @Override
+    public List<VolumeStatsVO> findByVolumeId(long volumeId) {
+        SearchCriteria<VolumeStatsVO> sc = volumeIdSearch.create();
+        sc.setParameters(VOLUME_ID, volumeId);
+        return listBy(sc);
+    }
+
+    @Override
+    public List<VolumeStatsVO> findByVolumeIdOrderByTimestampDesc(long volumeId) {
+        SearchCriteria<VolumeStatsVO> sc = volumeIdSearch.create();
+        sc.setParameters(VOLUME_ID, volumeId);
+        Filter orderByFilter = new Filter(VolumeStatsVO.class, TIMESTAMP, false, null, null);
+        return search(sc, orderByFilter, null, false);
+    }
+
+    @Override
+    public List<VolumeStatsVO> findByVolumeIdAndTimestampGreaterThanEqual(long volumeId, Date time) {
+        SearchCriteria<VolumeStatsVO> sc = volumeIdTimestampGreaterThanEqualSearch.create();
+        sc.setParameters(VOLUME_ID, volumeId);
+        sc.setParameters(TIMESTAMP, time);
+        return listBy(sc);
+    }
+
+    @Override
+    public List<VolumeStatsVO> findByVolumeIdAndTimestampLessThanEqual(long volumeId, Date time) {
+        SearchCriteria<VolumeStatsVO> sc = volumeIdTimestampLessThanEqualSearch.create();
+        sc.setParameters(VOLUME_ID, volumeId);
+        sc.setParameters(TIMESTAMP, time);
+        return listBy(sc);
+    }
+
+    @Override
+    public List<VolumeStatsVO> findByVolumeIdAndTimestampBetween(long volumeId, Date startTime, Date endTime) {
+        SearchCriteria<VolumeStatsVO> sc = volumeIdTimestampBetweenSearch.create();
+        sc.setParameters(VOLUME_ID, volumeId);
+        sc.setParameters(TIMESTAMP, startTime, endTime);
+        return listBy(sc);
+    }
+
+    @Override
+    public void removeAllByVolumeId(long volumeId) {
+        SearchCriteria<VolumeStatsVO> sc = volumeIdSearch.create();
+        sc.setParameters(VOLUME_ID, volumeId);
+        expunge(sc);
+    }
+
+    @Override
+    public void removeAllByTimestampLessThan(Date limit) {
+        SearchCriteria<VolumeStatsVO> sc = timestampSearch.create();
+        sc.setParameters(TIMESTAMP, limit);
+        expunge(sc);
+    }
+}
diff --git a/engine/schema/src/main/resources/META-INF/cloudstack/core/spring-engine-schema-core-daos-context.xml b/engine/schema/src/main/resources/META-INF/cloudstack/core/spring-engine-schema-core-daos-context.xml
index cec86d2d7be..ebda8235fa4 100644
--- a/engine/schema/src/main/resources/META-INF/cloudstack/core/spring-engine-schema-core-daos-context.xml
+++ b/engine/schema/src/main/resources/META-INF/cloudstack/core/spring-engine-schema-core-daos-context.xml
@@ -226,6 +226,7 @@
   <bean id="volumeDetailsDaoImpl" class="com.cloud.storage.dao.VolumeDetailsDaoImpl" />
   <bean id="volumeJoinDaoImpl" class="com.cloud.api.query.dao.VolumeJoinDaoImpl" />
   <bean id="volumeReservationDaoImpl" class="org.apache.cloudstack.engine.cloud.entity.api.db.dao.VolumeReservationDaoImpl" />
+  <bean id="volumeStatsDaoImpl" class="com.cloud.storage.dao.VolumeStatsDaoImpl" />
   <bean id="vpcDaoImpl" class="com.cloud.network.vpc.dao.VpcDaoImpl" />
   <bean id="vpcGatewayDaoImpl" class="com.cloud.network.vpc.dao.VpcGatewayDaoImpl" />
   <bean id="vpcOfferingDaoImpl" class="com.cloud.network.vpc.dao.VpcOfferingDaoImpl" />
diff --git a/engine/schema/src/main/resources/META-INF/db/schema-41720to41800.sql b/engine/schema/src/main/resources/META-INF/db/schema-41720to41800.sql
index 34f138c7540..cc400143fb2 100644
--- a/engine/schema/src/main/resources/META-INF/db/schema-41720to41800.sql
+++ b/engine/schema/src/main/resources/META-INF/db/schema-41720to41800.sql
@@ -912,6 +912,17 @@ SET     description = "Use SSL method used to encrypt copy traffic between zones
 generating links for external access."
 WHERE   name = 'secstorage.encrypt.copy';
 
+-- Create table to persist volume stats; any existing `volume_stats` table is dropped first.
+DROP TABLE IF EXISTS `cloud`.`volume_stats`;
+CREATE TABLE `cloud`.`volume_stats` (
+    `id` bigint unsigned NOT NULL auto_increment COMMENT 'id',
+    `volume_id` bigint unsigned NOT NULL,
+    `mgmt_server_id` bigint unsigned NOT NULL,
+    `timestamp` datetime NOT NULL,
+    `volume_stats_data` text NOT NULL,
+    PRIMARY KEY(`id`)
+  ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
 -- allow isolated networks without services to be used as is.
 UPDATE `cloud`.`networks` ntwk
   SET ntwk.state = 'Implemented'
diff --git a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/LibvirtComputingResource.java b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/LibvirtComputingResource.java
index 11f19142833..178f67d3da2 100644
--- a/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/LibvirtComputingResource.java
+++ b/plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/LibvirtComputingResource.java
@@ -16,6 +16,8 @@
 // under the License.
 package com.cloud.hypervisor.kvm.resource;
 
+import static com.cloud.host.Host.HOST_VOLUME_ENCRYPTION;
+
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -31,6 +33,7 @@ import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -53,7 +56,6 @@ import org.apache.cloudstack.storage.to.TemplateObjectTO;
 import org.apache.cloudstack.storage.to.VolumeObjectTO;
 import org.apache.cloudstack.utils.bytescale.ByteScaleUtils;
 import org.apache.cloudstack.utils.cryptsetup.CryptSetup;
-
 import org.apache.cloudstack.utils.hypervisor.HypervisorUtils;
 import org.apache.cloudstack.utils.linux.CPUStat;
 import org.apache.cloudstack.utils.linux.KVMHostInfo;
@@ -86,8 +88,8 @@ import org.libvirt.MemoryStatistic;
 import org.libvirt.Network;
 import org.libvirt.SchedParameter;
 import org.libvirt.SchedUlongParameter;
-import org.libvirt.VcpuInfo;
 import org.libvirt.Secret;
+import org.libvirt.VcpuInfo;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
@@ -188,18 +190,17 @@ import com.cloud.utils.NumbersUtil;
 import com.cloud.utils.Pair;
 import com.cloud.utils.PropertiesUtil;
 import com.cloud.utils.Ternary;
+import com.cloud.utils.UuidUtils;
 import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.utils.net.NetUtils;
 import com.cloud.utils.script.OutputInterpreter;
 import com.cloud.utils.script.OutputInterpreter.AllLinesParser;
 import com.cloud.utils.script.Script;
 import com.cloud.utils.ssh.SshHelper;
-import com.cloud.utils.UuidUtils;
 import com.cloud.vm.VirtualMachine;
 import com.cloud.vm.VirtualMachine.PowerState;
 import com.cloud.vm.VmDetailConstants;
-
-import static com.cloud.host.Host.HOST_VOLUME_ENCRYPTION;
+import com.google.gson.Gson;
 
 /**
  * LibvirtComputingResource execute requests on the computing/routing host using
@@ -417,6 +418,8 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
     private final Map <String, String> _pifs = new HashMap<String, String>();
     private final Map<String, VmStats> _vmStats = new ConcurrentHashMap<String, VmStats>();
 
+    private final Map<String, DomainBlockStats> vmDiskStats = new ConcurrentHashMap<>();
+
     protected static final HashMap<DomainState, PowerState> s_powerStatesTable;
     static {
         s_powerStatesTable = new HashMap<DomainState, PowerState>();
@@ -448,6 +451,8 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
 
     protected LibvirtDomainXMLParser parser = new LibvirtDomainXMLParser();
 
+    private static Gson gson = new Gson();
+
     /**
      * Virsh command to set the memory balloon stats period.<br><br>
      * 1st parameter: the VM ID or name;<br>
@@ -4035,7 +4040,7 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
         try {
             dm = getDomain(conn, vmName);
 
-            final List<VmDiskStatsEntry> stats = new ArrayList<VmDiskStatsEntry>();
+            final List<VmDiskStatsEntry> stats = new ArrayList<>();
 
             final List<DiskDef> disks = getDisks(conn, vmName);
 
@@ -4047,7 +4052,27 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
                 String diskPath = getDiskPathFromDiskDef(disk);
                 if (diskPath != null) {
                     final VmDiskStatsEntry stat = new VmDiskStatsEntry(vmName, diskPath, blockStats.wr_req, blockStats.rd_req, blockStats.wr_bytes, blockStats.rd_bytes);
+                    final DomainBlockStats oldStats = vmDiskStats.get(String.format("%s-%s", vmName, diskPath));
+                    if (oldStats != null) {
+                        final long deltaiord = blockStats.rd_req - oldStats.rd_req;
+                        if (deltaiord > 0) {
+                            stat.setDeltaIoRead(deltaiord);
+                        }
+                        final long deltaiowr = blockStats.wr_req - oldStats.wr_req;
+                        if (deltaiowr > 0) {
+                            stat.setDeltaIoWrite(deltaiowr);
+                        }
+                        final long deltabytesrd = blockStats.rd_bytes - oldStats.rd_bytes;
+                        if (deltabytesrd > 0) {
+                            stat.setDeltaBytesRead(deltabytesrd);
+                        }
+                        final long deltabyteswr = blockStats.wr_bytes - oldStats.wr_bytes;
+                        if (deltabyteswr > 0) {
+                            stat.setDeltaBytesWrite(deltabyteswr);
+                        }
+                    }
                     stats.add(stat);
+                    vmDiskStats.put(String.format("%s-%s", vmName, diskPath), blockStats);
                 }
             }
 
@@ -4155,6 +4180,11 @@ public class LibvirtComputingResource extends ServerResourceBase implements Serv
                     continue;
                 }
                 final DomainBlockStats blockStats = dm.blockStats(disk.getDiskLabel());
+                // Dumping the raw block counters of every disk is diagnostic output: log it at debug level
+                // and skip the JSON serialization altogether unless debug logging is enabled.
+                if (s_logger.isDebugEnabled()) {
+                    s_logger.debug(String.format("STATS_LOG getVm****Stat @ %s: Disk: %s---------------%s", new Date(), disk.getDiskLabel(), gson.toJson(blockStats)));
+                }
                 io_rd += blockStats.rd_req;
                 io_wr += blockStats.wr_req;
                 bytes_rd += blockStats.rd_bytes;
diff --git a/plugins/metrics/src/main/java/org/apache/cloudstack/api/BaseResourceUsageHistoryCmd.java b/plugins/metrics/src/main/java/org/apache/cloudstack/api/BaseResourceUsageHistoryCmd.java
new file mode 100644
index 00000000000..815852d11c7
--- /dev/null
+++ b/plugins/metrics/src/main/java/org/apache/cloudstack/api/BaseResourceUsageHistoryCmd.java
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.api;
+
+import java.util.Date;
+
+import javax.inject.Inject;
+
+import org.apache.cloudstack.metrics.MetricsService;
+
+/**
+ * Base class for the usage history list commands. It holds the injected {@link MetricsService}
+ * and the start/end date filter parameters so that subclasses do not have to redeclare them.
+ */
+public abstract class BaseResourceUsageHistoryCmd extends BaseListCmd {
+
+    @Inject
+    protected MetricsService metricsService;
+
+    // ///////////////////////////////////////////////////
+    // /// BaseResourceUsageHistoryCmd API parameters ////
+    // ///////////////////////////////////////////////////
+
+    @Parameter(name = ApiConstants.START_DATE, type = CommandType.DATE, description = "start date to filter stats. "
+            + "Use format \"yyyy-MM-dd HH:mm:ss\"")
+    private Date startDate;
+
+    @Parameter(name = ApiConstants.END_DATE, type = CommandType.DATE, description = "end date to filter stats. "
+            + "Use format \"yyyy-MM-dd HH:mm:ss\"")
+    private Date endDate;
+
+    // ///////////////////////////////////////////////////
+    // ///////////////// Accessors ///////////////////////
+    // ///////////////////////////////////////////////////
+
+    /** @return the start date filter of this command; presumably {@code null} when the parameter was omitted. */
+    public Date getStartDate() {
+        return startDate;
+    }
+
+    /** @return the end date filter of this command; presumably {@code null} when the parameter was omitted. */
+    public Date getEndDate() {
+        return endDate;
+    }
+}
diff --git a/plugins/metrics/src/main/java/org/apache/cloudstack/api/ListVMsUsageHistoryCmd.java b/plugins/metrics/src/main/java/org/apache/cloudstack/api/ListSystemVMsUsageHistoryCmd.java
similarity index 67%
copy from plugins/metrics/src/main/java/org/apache/cloudstack/api/ListVMsUsageHistoryCmd.java
copy to plugins/metrics/src/main/java/org/apache/cloudstack/api/ListSystemVMsUsageHistoryCmd.java
index 2b2fa310d2e..e2d3af24aef 100644
--- a/plugins/metrics/src/main/java/org/apache/cloudstack/api/ListVMsUsageHistoryCmd.java
+++ b/plugins/metrics/src/main/java/org/apache/cloudstack/api/ListSystemVMsUsageHistoryCmd.java
@@ -17,46 +17,31 @@
 
 package org.apache.cloudstack.api;
 
-import java.util.Date;
 import java.util.List;
 
-import javax.inject.Inject;
-
 import org.apache.cloudstack.acl.RoleType;
 import org.apache.cloudstack.api.response.ListResponse;
 import org.apache.cloudstack.api.response.UserVmResponse;
-import org.apache.cloudstack.metrics.MetricsService;
 import org.apache.cloudstack.response.VmMetricsStatsResponse;
 
-@APICommand(name = "listVirtualMachinesUsageHistory", description = "Lists VM stats", responseObject = VmMetricsStatsResponse.class,
-        requestHasSensitiveInfo = false, responseHasSensitiveInfo = false, since = "4.17",
-        authorized = {RoleType.Admin,  RoleType.ResourceAdmin, RoleType.DomainAdmin, RoleType.User})
-public class ListVMsUsageHistoryCmd extends BaseListCmd {
-
-    @Inject
-    private MetricsService metricsService;
+@APICommand(name = "listSystemVmsUsageHistory", description = "Lists System VM stats", responseObject = VmMetricsStatsResponse.class,
+        requestHasSensitiveInfo = false, responseHasSensitiveInfo = false, since = "4.18.0",
+        authorized = {RoleType.Admin,  RoleType.ResourceAdmin, RoleType.DomainAdmin})
+public class ListSystemVMsUsageHistoryCmd extends BaseResourceUsageHistoryCmd {
 
     /////////////////////////////////////////////////////
     //////////////// API parameters /////////////////////
     /////////////////////////////////////////////////////
 
-    @Parameter(name = ApiConstants.ID, type = CommandType.UUID, entityType = UserVmResponse.class, description = "the ID of the virtual machine.")
+    @Parameter(name = ApiConstants.ID, type = CommandType.UUID, entityType = UserVmResponse.class, description = "the ID of the system VM.")
     private Long id;
 
-    @Parameter(name=ApiConstants.IDS, type=CommandType.LIST, collectionType=CommandType.UUID, entityType=UserVmResponse.class, description="the IDs of the virtual machines, mutually exclusive with id.")
+    @Parameter(name=ApiConstants.IDS, type=CommandType.LIST, collectionType=CommandType.UUID, entityType=UserVmResponse.class, description="the IDs of the system VMs, mutually exclusive with id.")
     private List<Long> ids;
 
-    @Parameter(name = ApiConstants.NAME, type = CommandType.STRING, description = "name of the virtual machine (a substring match is made against the parameter value returning the data for all matching VMs).")
+    @Parameter(name = ApiConstants.NAME, type = CommandType.STRING, description = "name of the system VMs (a substring match is made against the parameter value returning the data for all matching VMs).")
     private String name;
 
-    @Parameter(name = ApiConstants.START_DATE, type = CommandType.DATE, description = "start date to filter VM stats."
-            + "Use format \"yyyy-MM-dd hh:mm:ss\")")
-    private Date startDate;
-
-    @Parameter(name = ApiConstants.END_DATE, type = CommandType.DATE, description = "end date to filter VM stats."
-            + "Use format \"yyyy-MM-dd hh:mm:ss\")")
-    private Date endDate;
-
     /////////////////////////////////////////////////////
     /////////////////// Accessors ///////////////////////
     /////////////////////////////////////////////////////
@@ -73,21 +58,13 @@ public class ListVMsUsageHistoryCmd extends BaseListCmd {
         return name;
     }
 
-    public Date getStartDate() {
-        return startDate;
-    }
-
-    public Date getEndDate() {
-        return endDate;
-    }
-
     /////////////////////////////////////////////////////
     /////////////// API Implementation///////////////////
     /////////////////////////////////////////////////////
 
     @Override
     public void execute() {
-        ListResponse<VmMetricsStatsResponse> response = metricsService.searchForVmMetricsStats(this);
+        ListResponse<VmMetricsStatsResponse> response = metricsService.searchForSystemVmMetricsStats(this);
         response.setResponseName(getCommandName());
         setResponseObject(response);
     }
diff --git a/plugins/metrics/src/main/java/org/apache/cloudstack/api/ListVMsUsageHistoryCmd.java b/plugins/metrics/src/main/java/org/apache/cloudstack/api/ListVMsUsageHistoryCmd.java
index 2b2fa310d2e..860c130fe92 100644
--- a/plugins/metrics/src/main/java/org/apache/cloudstack/api/ListVMsUsageHistoryCmd.java
+++ b/plugins/metrics/src/main/java/org/apache/cloudstack/api/ListVMsUsageHistoryCmd.java
@@ -17,24 +17,17 @@
 
 package org.apache.cloudstack.api;
 
-import java.util.Date;
 import java.util.List;
 
-import javax.inject.Inject;
-
 import org.apache.cloudstack.acl.RoleType;
 import org.apache.cloudstack.api.response.ListResponse;
 import org.apache.cloudstack.api.response.UserVmResponse;
-import org.apache.cloudstack.metrics.MetricsService;
 import org.apache.cloudstack.response.VmMetricsStatsResponse;
 
 @APICommand(name = "listVirtualMachinesUsageHistory", description = "Lists VM stats", responseObject = VmMetricsStatsResponse.class,
         requestHasSensitiveInfo = false, responseHasSensitiveInfo = false, since = "4.17",
         authorized = {RoleType.Admin,  RoleType.ResourceAdmin, RoleType.DomainAdmin, RoleType.User})
-public class ListVMsUsageHistoryCmd extends BaseListCmd {
-
-    @Inject
-    private MetricsService metricsService;
+public class ListVMsUsageHistoryCmd extends BaseResourceUsageHistoryCmd {
 
     /////////////////////////////////////////////////////
     //////////////// API parameters /////////////////////
@@ -49,14 +42,6 @@ public class ListVMsUsageHistoryCmd extends BaseListCmd {
     @Parameter(name = ApiConstants.NAME, type = CommandType.STRING, description = "name of the virtual machine (a substring match is made against the parameter value returning the data for all matching VMs).")
     private String name;
 
-    @Parameter(name = ApiConstants.START_DATE, type = CommandType.DATE, description = "start date to filter VM stats."
-            + "Use format \"yyyy-MM-dd hh:mm:ss\")")
-    private Date startDate;
-
-    @Parameter(name = ApiConstants.END_DATE, type = CommandType.DATE, description = "end date to filter VM stats."
-            + "Use format \"yyyy-MM-dd hh:mm:ss\")")
-    private Date endDate;
-
     /////////////////////////////////////////////////////
     /////////////////// Accessors ///////////////////////
     /////////////////////////////////////////////////////
@@ -73,14 +58,6 @@ public class ListVMsUsageHistoryCmd extends BaseListCmd {
         return name;
     }
 
-    public Date getStartDate() {
-        return startDate;
-    }
-
-    public Date getEndDate() {
-        return endDate;
-    }
-
     /////////////////////////////////////////////////////
     /////////////// API Implementation///////////////////
     /////////////////////////////////////////////////////
diff --git a/plugins/metrics/src/main/java/org/apache/cloudstack/api/ListVMsUsageHistoryCmd.java b/plugins/metrics/src/main/java/org/apache/cloudstack/api/ListVolumesUsageHistoryCmd.java
similarity index 60%
copy from plugins/metrics/src/main/java/org/apache/cloudstack/api/ListVMsUsageHistoryCmd.java
copy to plugins/metrics/src/main/java/org/apache/cloudstack/api/ListVolumesUsageHistoryCmd.java
index 2b2fa310d2e..4e9191a16f8 100644
--- a/plugins/metrics/src/main/java/org/apache/cloudstack/api/ListVMsUsageHistoryCmd.java
+++ b/plugins/metrics/src/main/java/org/apache/cloudstack/api/ListVolumesUsageHistoryCmd.java
@@ -17,46 +17,31 @@
 
 package org.apache.cloudstack.api;
 
-import java.util.Date;
 import java.util.List;
 
-import javax.inject.Inject;
-
 import org.apache.cloudstack.acl.RoleType;
 import org.apache.cloudstack.api.response.ListResponse;
-import org.apache.cloudstack.api.response.UserVmResponse;
-import org.apache.cloudstack.metrics.MetricsService;
-import org.apache.cloudstack.response.VmMetricsStatsResponse;
+import org.apache.cloudstack.api.response.VolumeResponse;
+import org.apache.cloudstack.response.VolumeMetricsStatsResponse;
 
-@APICommand(name = "listVirtualMachinesUsageHistory", description = "Lists VM stats", responseObject = VmMetricsStatsResponse.class,
-        requestHasSensitiveInfo = false, responseHasSensitiveInfo = false, since = "4.17",
+@APICommand(name = "listVolumesUsageHistory", description = "Lists volume stats", responseObject = VolumeMetricsStatsResponse.class,
+        requestHasSensitiveInfo = false, responseHasSensitiveInfo = false, since = "4.18.0",
         authorized = {RoleType.Admin,  RoleType.ResourceAdmin, RoleType.DomainAdmin, RoleType.User})
-public class ListVMsUsageHistoryCmd extends BaseListCmd {
-
-    @Inject
-    private MetricsService metricsService;
+public class ListVolumesUsageHistoryCmd extends BaseResourceUsageHistoryCmd {
 
     /////////////////////////////////////////////////////
     //////////////// API parameters /////////////////////
     /////////////////////////////////////////////////////
 
-    @Parameter(name = ApiConstants.ID, type = CommandType.UUID, entityType = UserVmResponse.class, description = "the ID of the virtual machine.")
+    @Parameter(name = ApiConstants.ID, type = CommandType.UUID, entityType = VolumeResponse.class, description = "the ID of the volume.")
     private Long id;
 
-    @Parameter(name=ApiConstants.IDS, type=CommandType.LIST, collectionType=CommandType.UUID, entityType=UserVmResponse.class, description="the IDs of the virtual machines, mutually exclusive with id.")
+    @Parameter(name=ApiConstants.IDS, type=CommandType.LIST, collectionType=CommandType.UUID, entityType=VolumeResponse.class, description="the IDs of the volumes, mutually exclusive with id.")
     private List<Long> ids;
 
-    @Parameter(name = ApiConstants.NAME, type = CommandType.STRING, description = "name of the virtual machine (a substring match is made against the parameter value returning the data for all matching VMs).")
+    @Parameter(name = ApiConstants.NAME, type = CommandType.STRING, description = "name of the volume (a substring match is made against the parameter value returning the data for all matching Volumes).")
     private String name;
 
-    @Parameter(name = ApiConstants.START_DATE, type = CommandType.DATE, description = "start date to filter VM stats."
-            + "Use format \"yyyy-MM-dd hh:mm:ss\")")
-    private Date startDate;
-
-    @Parameter(name = ApiConstants.END_DATE, type = CommandType.DATE, description = "end date to filter VM stats."
-            + "Use format \"yyyy-MM-dd hh:mm:ss\")")
-    private Date endDate;
-
     /////////////////////////////////////////////////////
     /////////////////// Accessors ///////////////////////
     /////////////////////////////////////////////////////
@@ -73,21 +58,13 @@ public class ListVMsUsageHistoryCmd extends BaseListCmd {
         return name;
     }
 
-    public Date getStartDate() {
-        return startDate;
-    }
-
-    public Date getEndDate() {
-        return endDate;
-    }
-
     /////////////////////////////////////////////////////
     /////////////// API Implementation///////////////////
     /////////////////////////////////////////////////////
 
     @Override
     public void execute() {
-        ListResponse<VmMetricsStatsResponse> response = metricsService.searchForVmMetricsStats(this);
+        ListResponse<VolumeMetricsStatsResponse> response = metricsService.searchForVolumeMetricsStats(this);
         response.setResponseName(getCommandName());
         setResponseObject(response);
     }
diff --git a/plugins/metrics/src/main/java/org/apache/cloudstack/metrics/MetricsService.java b/plugins/metrics/src/main/java/org/apache/cloudstack/metrics/MetricsService.java
index ee67fe86ba7..48033dd7538 100644
--- a/plugins/metrics/src/main/java/org/apache/cloudstack/metrics/MetricsService.java
+++ b/plugins/metrics/src/main/java/org/apache/cloudstack/metrics/MetricsService.java
@@ -17,10 +17,11 @@
 
 package org.apache.cloudstack.metrics;
 
-import com.cloud.utils.Pair;
-import com.cloud.utils.component.PluggableService;
+import java.util.List;
 
+import org.apache.cloudstack.api.ListSystemVMsUsageHistoryCmd;
 import org.apache.cloudstack.api.ListVMsUsageHistoryCmd;
+import org.apache.cloudstack.api.ListVolumesUsageHistoryCmd;
 import org.apache.cloudstack.api.response.ClusterResponse;
 import org.apache.cloudstack.api.response.HostResponse;
 import org.apache.cloudstack.api.response.ListResponse;
@@ -39,14 +40,18 @@ import org.apache.cloudstack.response.UsageServerMetricsResponse;
 import org.apache.cloudstack.response.VmMetricsResponse;
 import org.apache.cloudstack.response.VmMetricsStatsResponse;
 import org.apache.cloudstack.response.VolumeMetricsResponse;
+import org.apache.cloudstack.response.VolumeMetricsStatsResponse;
 import org.apache.cloudstack.response.ZoneMetricsResponse;
 
-import java.util.List;
+import com.cloud.utils.Pair;
+import com.cloud.utils.component.PluggableService;
 
 public interface MetricsService extends PluggableService {
     InfrastructureResponse listInfrastructure();
 
     ListResponse<VmMetricsStatsResponse> searchForVmMetricsStats(ListVMsUsageHistoryCmd cmd);
+    ListResponse<VmMetricsStatsResponse> searchForSystemVmMetricsStats(ListSystemVMsUsageHistoryCmd cmd);
+    ListResponse<VolumeMetricsStatsResponse> searchForVolumeMetricsStats(ListVolumesUsageHistoryCmd cmd);
     List<VolumeMetricsResponse> listVolumeMetrics(List<VolumeResponse> volumeResponses);
     List<VmMetricsResponse> listVmMetrics(List<UserVmResponse> vmResponses);
     List<StoragePoolMetricsResponse> listStoragePoolMetrics(List<StoragePoolResponse> poolResponses);
diff --git a/plugins/metrics/src/main/java/org/apache/cloudstack/metrics/MetricsServiceImpl.java b/plugins/metrics/src/main/java/org/apache/cloudstack/metrics/MetricsServiceImpl.java
index 079a2d4fc85..7d1f74ed924 100644
--- a/plugins/metrics/src/main/java/org/apache/cloudstack/metrics/MetricsServiceImpl.java
+++ b/plugins/metrics/src/main/java/org/apache/cloudstack/metrics/MetricsServiceImpl.java
@@ -37,11 +37,13 @@ import org.apache.cloudstack.api.ListHostsMetricsCmd;
 import org.apache.cloudstack.api.ListInfrastructureCmd;
 import org.apache.cloudstack.api.ListMgmtsMetricsCmd;
 import org.apache.cloudstack.api.ListStoragePoolsMetricsCmd;
+import org.apache.cloudstack.api.ListSystemVMsUsageHistoryCmd;
 import org.apache.cloudstack.api.ListUsageServerMetricsCmd;
 import org.apache.cloudstack.api.ListVMsMetricsCmd;
 import org.apache.cloudstack.api.ListVMsMetricsCmdByAdmin;
 import org.apache.cloudstack.api.ListVMsUsageHistoryCmd;
 import org.apache.cloudstack.api.ListVolumesMetricsCmd;
+import org.apache.cloudstack.api.ListVolumesUsageHistoryCmd;
 import org.apache.cloudstack.api.ListZonesMetricsCmd;
 import org.apache.cloudstack.api.ServerApiException;
 import org.apache.cloudstack.api.response.ClusterResponse;
@@ -66,9 +68,11 @@ import org.apache.cloudstack.response.UsageServerMetricsResponse;
 import org.apache.cloudstack.response.VmMetricsResponse;
 import org.apache.cloudstack.response.VmMetricsStatsResponse;
 import org.apache.cloudstack.response.VolumeMetricsResponse;
+import org.apache.cloudstack.response.VolumeMetricsStatsResponse;
 import org.apache.cloudstack.response.ZoneMetricsResponse;
 import org.apache.cloudstack.storage.datastore.db.ImageStoreDao;
 import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
+import org.apache.cloudstack.utils.bytescale.ByteScaleUtils;
 import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -76,6 +80,7 @@ import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.log4j.Logger;
 
+import com.cloud.agent.api.VmDiskStatsEntry;
 import com.cloud.agent.api.VmStatsEntryBase;
 import com.cloud.alert.AlertManager;
 import com.cloud.alert.dao.AlertDao;
@@ -105,6 +110,10 @@ import com.cloud.org.Managed;
 import com.cloud.server.DbStatsCollection;
 import com.cloud.server.ManagementServerHostStats;
 import com.cloud.server.StatsCollector;
+import com.cloud.storage.VolumeStatsVO;
+import com.cloud.storage.VolumeVO;
+import com.cloud.storage.dao.VolumeDao;
+import com.cloud.storage.dao.VolumeStatsDao;
 import com.cloud.usage.UsageJobVO;
 import com.cloud.usage.dao.UsageJobDao;
 import com.cloud.user.Account;
@@ -162,6 +171,10 @@ public class MetricsServiceImpl extends MutualExclusiveIdsManagerBase implements
     protected VmStatsDao vmStatsDao;
     @Inject
     private UsageJobDao usageJobDao;
+    @Inject
+    private VolumeDao volumeDao;
+    @Inject
+    private VolumeStatsDao volumeStatsDao;
 
     private static Gson gson = new Gson();
 
@@ -197,8 +210,34 @@ public class MetricsServiceImpl extends MutualExclusiveIdsManagerBase implements
     @Override
     public ListResponse<VmMetricsStatsResponse> searchForVmMetricsStats(ListVMsUsageHistoryCmd cmd) {
         Pair<List<UserVmVO>, Integer> userVmList = searchForUserVmsInternal(cmd);
-        Map<Long,List<VmStatsVO>> vmStatsList = searchForVmMetricsStatsInternal(cmd, userVmList.first());
-        return createVmMetricsStatsResponse(userVmList, vmStatsList);
+        Map<Long,List<VmStatsVO>> vmStatsList = searchForVmMetricsStatsInternal(cmd.getStartDate(), cmd.getEndDate(), userVmList.first());
+        return createVmMetricsStatsResponse(userVmList.first(), vmStatsList);
+    }
+
+    /**
+     * Searches for System VM stats based on the {@code ListSystemVMsUsageHistoryCmd} parameters.
+     *
+     * @param cmd the {@link ListSystemVMsUsageHistoryCmd} specifying what should be searched.
+     * @return the list of VM metrics stats found.
+     */
+    @Override
+    public ListResponse<VmMetricsStatsResponse> searchForSystemVmMetricsStats(ListSystemVMsUsageHistoryCmd cmd) {
+        Pair<List<VMInstanceVO>, Integer> vmList = searchForSystemVmsInternal(cmd);
+        Map<Long,List<VmStatsVO>> vmStatsList = searchForVmMetricsStatsInternal(cmd.getStartDate(), cmd.getEndDate(), vmList.first());
+        return createVmMetricsStatsResponse(vmList.first(), vmStatsList);
+    }
+
+    /**
+     * Searches for Volume stats based on the {@code ListVolumesUsageHistoryCmd} parameters.
+     *
+     * @param cmd the {@link ListVolumesUsageHistoryCmd} specifying what should be searched.
+     * @return the list of volume metrics stats found.
+     */
+    @Override
+    public ListResponse<VolumeMetricsStatsResponse> searchForVolumeMetricsStats(ListVolumesUsageHistoryCmd cmd) {
+        Pair<List<VolumeVO>, Integer> volumeList = searchForVolumesInternal(cmd);
+        Map<Long,List<VolumeStatsVO>> volumeStatsList = searchForVolumeMetricsStatsInternal(cmd, volumeList.first());
+        return createVolumeMetricsStatsResponse(volumeList, volumeStatsList);
     }
 
     /**
@@ -235,23 +274,109 @@ public class MetricsServiceImpl extends MutualExclusiveIdsManagerBase implements
         return userVmDao.searchAndCount(sc, searchFilter);
     }
 
+    /**
+     * Searches System VMs based on {@code ListSystemVMsUsageHistoryCmd} parameters.
+     *
+     * @param cmd the {@link ListSystemVMsUsageHistoryCmd} specifying the parameters.
+     * @return the list of VMs.
+     */
+    protected Pair<List<VMInstanceVO>, Integer> searchForSystemVmsInternal(ListSystemVMsUsageHistoryCmd cmd) {
+        Filter searchFilter = new Filter(VMInstanceVO.class, "id", true, cmd.getStartIndex(), cmd.getPageSizeVal());
+        List<Long> ids = getIdsListFromCmd(cmd.getId(), cmd.getIds());
+        String keyword = cmd.getKeyword();
+
+        SearchBuilder<VMInstanceVO> sb =  vmInstanceDao.createSearchBuilder();
+        sb.and("idIN", sb.entity().getId(), SearchCriteria.Op.IN);
+        sb.and("name", sb.entity().getName(), SearchCriteria.Op.LIKE);
+        sb.and("state", sb.entity().getState(), SearchCriteria.Op.EQ);
+        sb.and("type", sb.entity().getType(), SearchCriteria.Op.NEQ);
+
+        SearchCriteria<VMInstanceVO> sc = sb.create();
+        sc.setParameters("type", VirtualMachine.Type.User.toString());
+        if (CollectionUtils.isNotEmpty(ids)) {
+            sc.setParameters("idIN", ids.toArray());
+        }
+        if (StringUtils.isNotBlank(keyword)) {
+            SearchCriteria<VMInstanceVO> ssc = vmInstanceDao.createSearchCriteria();
+            ssc.addOr("name", SearchCriteria.Op.LIKE, "%" + keyword + "%");
+            ssc.addOr("state", SearchCriteria.Op.EQ, keyword);
+            sc.addAnd("name", SearchCriteria.Op.SC, ssc);
+        }
+
+        return vmInstanceDao.searchAndCount(sc, searchFilter);
+    }
+
     /**
      * Searches stats for a list of VMs, based on date filtering parameters.
      *
-     * @param cmd the {@link ListVMsUsageHistoryCmd} specifying the filtering parameters.
-     * @param userVmList the list of VMs for which stats should be searched.
+     * @param startDate the start date for which stats should be searched.
+     * @param endDate the end date for which stats should be searched.
+     * @param vmList the list of VMs for which stats should be searched.
      * @return the key-value map in which keys are VM IDs and values are lists of VM stats.
      */
-    protected Map<Long,List<VmStatsVO>> searchForVmMetricsStatsInternal(ListVMsUsageHistoryCmd cmd, List<UserVmVO> userVmList) {
-        Map<Long,List<VmStatsVO>> vmStatsVOList = new HashMap<Long,List<VmStatsVO>>();
+    protected Map<Long,List<VmStatsVO>> searchForVmMetricsStatsInternal(Date startDate, Date endDate, List<? extends VMInstanceVO> vmList) {
+        Map<Long,List<VmStatsVO>> vmStatsVOList = new HashMap<>();
+        validateDateParams(startDate, endDate);
+
+        for (VMInstanceVO vmInstanceVO : vmList) {
+            Long vmId = vmInstanceVO.getId();
+            vmStatsVOList.put(vmId, findVmStatsAccordingToDateParams(vmId, startDate, endDate));
+        }
+
+        return vmStatsVOList;
+    }
+
+    /**
+     * Searches Volumes based on {@code ListVolumesUsageHistoryCmd} parameters.
+     *
+     * @param cmd the {@link ListVolumesUsageHistoryCmd} specifying the parameters.
+     * @return the list of Volumes.
+     */
+    protected Pair<List<VolumeVO>, Integer> searchForVolumesInternal(ListVolumesUsageHistoryCmd cmd) {
+        Filter searchFilter = new Filter(VolumeVO.class, "id", true, cmd.getStartIndex(), cmd.getPageSizeVal());
+        List<Long> ids = getIdsListFromCmd(cmd.getId(), cmd.getIds());
+        String name = cmd.getName();
+        String keyword = cmd.getKeyword();
+
+        SearchBuilder<VolumeVO> sb =  volumeDao.createSearchBuilder();
+        sb.and("idIN", sb.entity().getId(), SearchCriteria.Op.IN);
+        sb.and("name", sb.entity().getName(), SearchCriteria.Op.LIKE);
+        sb.and("state", sb.entity().getState(), SearchCriteria.Op.EQ);
+
+        SearchCriteria<VolumeVO> sc = sb.create();
+        if (CollectionUtils.isNotEmpty(ids)) {
+            sc.setParameters("idIN", ids.toArray());
+        }
+        if (StringUtils.isNotBlank(name)) {
+            sc.setParameters("name", "%" + name + "%");
+        }
+        if (StringUtils.isNotBlank(keyword)) {
+            SearchCriteria<VolumeVO> ssc = volumeDao.createSearchCriteria();
+            ssc.addOr("name", SearchCriteria.Op.LIKE, "%" + keyword + "%");
+            ssc.addOr("state", SearchCriteria.Op.EQ, keyword);
+            sc.addAnd("name", SearchCriteria.Op.SC, ssc);
+        }
+
+        return volumeDao.searchAndCount(sc, searchFilter);
+    }
+
+    /**
+     * Searches stats for a list of Volumes, based on date filtering parameters.
+     *
+     * @param cmd the {@link ListVolumesUsageHistoryCmd} specifying the filtering parameters.
+     * @param volumeList the list of Volumes for which stats should be searched.
+     * @return the key-value map in which keys are Volume IDs and values are lists of Volume stats.
+     */
+    protected Map<Long,List<VolumeStatsVO>> searchForVolumeMetricsStatsInternal(ListVolumesUsageHistoryCmd cmd, List<VolumeVO> volumeList) {
+        Map<Long,List<VolumeStatsVO>> vmStatsVOList = new HashMap<>();
         Date startDate = cmd.getStartDate();
         Date endDate = cmd.getEndDate();
 
         validateDateParams(startDate, endDate);
 
-        for (UserVmVO userVmVO : userVmList) {
-            Long vmId = userVmVO.getId();
-            vmStatsVOList.put(vmId, findVmStatsAccordingToDateParams(vmId, startDate, endDate));
+        for (VolumeVO volumeVO : volumeList) {
+            Long volumeId = volumeVO.getId();
+            vmStatsVOList.put(volumeId, findVolumeStatsAccordingToDateParams(volumeId, startDate, endDate));
         }
 
         return vmStatsVOList;
@@ -295,21 +420,22 @@ public class MetricsServiceImpl extends MutualExclusiveIdsManagerBase implements
      * Creates a {@code ListResponse<VmMetricsStatsResponse>}. For each VM, this joins essential VM info
      * with its respective list of stats.
      *
-     * @param userVmList the list of VMs.
+     * @param vmList the list of VMs.
      * @param vmStatsList the respective list of stats.
      * @return the list of responses that was created.
      */
-    protected ListResponse<VmMetricsStatsResponse> createVmMetricsStatsResponse(Pair<List<UserVmVO>, Integer> userVmList,
+    protected ListResponse<VmMetricsStatsResponse> createVmMetricsStatsResponse(List<? extends VMInstanceVO> vmList,
             Map<Long,List<VmStatsVO>> vmStatsList) {
-        List<VmMetricsStatsResponse> responses = new ArrayList<VmMetricsStatsResponse>();
-        for (UserVmVO userVmVO : userVmList.first()) {
+        List<VmMetricsStatsResponse> responses = new ArrayList<>();
+        for (VMInstanceVO vmVO : vmList) {
             VmMetricsStatsResponse vmMetricsStatsResponse = new VmMetricsStatsResponse();
             vmMetricsStatsResponse.setObjectName("virtualmachine");
-            vmMetricsStatsResponse.setId(userVmVO.getUuid());
-            vmMetricsStatsResponse.setName(userVmVO.getName());
-            vmMetricsStatsResponse.setDisplayName(userVmVO.getDisplayName());
-
-            vmMetricsStatsResponse.setStats(createStatsResponse(vmStatsList.get(userVmVO.getId())));
+            vmMetricsStatsResponse.setId(vmVO.getUuid());
+            vmMetricsStatsResponse.setName(vmVO.getName());
+            if (vmVO instanceof UserVmVO) {
+                vmMetricsStatsResponse.setDisplayName(((UserVmVO) vmVO).getDisplayName());
+            }
+            vmMetricsStatsResponse.setStats(createStatsResponse(vmStatsList.get(vmVO.getId())));
             responses.add(vmMetricsStatsResponse);
         }
 
@@ -318,6 +444,7 @@ public class MetricsServiceImpl extends MutualExclusiveIdsManagerBase implements
         return response;
     }
 
+
     /**
      * Creates a {@code Set<StatsResponse>} from a given {@code List<VmStatsVO>}.
      *
@@ -351,6 +478,75 @@ public class MetricsServiceImpl extends MutualExclusiveIdsManagerBase implements
         return statsResponseList;
     }
 
+    /**
+     * Finds stats for a specific Volume based on date parameters.
+     *
+     * @param volumeId the ID of the specific Volume.
+     * @param startDate the start date for filtering.
+     * @param endDate the end date for filtering.
+     * @return the list of stats for the specified Volume.
+     */
+    protected List<VolumeStatsVO> findVolumeStatsAccordingToDateParams(Long volumeId, Date startDate, Date endDate){
+        if (startDate != null && endDate != null) {
+            return volumeStatsDao.findByVolumeIdAndTimestampBetween(volumeId, startDate, endDate);
+        }
+        if (startDate != null) {
+            return volumeStatsDao.findByVolumeIdAndTimestampGreaterThanEqual(volumeId, startDate);
+        }
+        if (endDate != null) {
+            return volumeStatsDao.findByVolumeIdAndTimestampLessThanEqual(volumeId, endDate);
+        }
+        return volumeStatsDao.findByVolumeId(volumeId);
+    }
+
+    /**
+     * Creates a {@code ListResponse<VolumeMetricsStatsResponse>}. For each Volume, this joins essential Volume info
+     * with its respective list of stats.
+     *
+     * @param volumeList the list of Volumes.
+     * @param volumeStatsList the map from Volume ID to its respective list of stats.
+     * @return the list of responses that was created.
+     */
+    protected ListResponse<VolumeMetricsStatsResponse> createVolumeMetricsStatsResponse(Pair<List<VolumeVO>, Integer> volumeList,
+                                                                                        Map<Long,List<VolumeStatsVO>> volumeStatsList) {
+        List<VolumeMetricsStatsResponse> responses = new ArrayList<>();
+        for (VolumeVO volumeVO : volumeList.first()) {
+            VolumeMetricsStatsResponse volumeMetricsStatsResponse = new VolumeMetricsStatsResponse();
+            volumeMetricsStatsResponse.setObjectName("volume");
+            volumeMetricsStatsResponse.setId(volumeVO.getUuid());
+            volumeMetricsStatsResponse.setName(volumeVO.getName());
+
+            volumeMetricsStatsResponse.setStats(createVolumeStatsResponse(volumeStatsList.get(volumeVO.getId())));
+            responses.add(volumeMetricsStatsResponse);
+        }
+
+        ListResponse<VolumeMetricsStatsResponse> response = new ListResponse<>();
+        response.setResponses(responses);
+        return response;
+    }
+
+
+    /**
+     * Creates a {@code List<StatsResponse>} from a given {@code List<VolumeStatsVO>}.
+     *
+     * @param volumeStatsList the list of Volume stats.
+     * @return the list of responses that was created.
+     */
+    protected List<StatsResponse> createVolumeStatsResponse(List<VolumeStatsVO> volumeStatsList) {
+        List<StatsResponse> statsResponseList = new ArrayList<>();
+        for (VolumeStatsVO volumeStats : volumeStatsList) {
+            StatsResponse response = new StatsResponse();
+            response.setTimestamp(volumeStats.getTimestamp());
+            VmDiskStatsEntry statsEntry = gson.fromJson(volumeStats.getVolumeStatsData(), VmDiskStatsEntry.class);
+            response.setDiskKbsRead(ByteScaleUtils.bytesToKibibytes(statsEntry.getBytesRead()));
+            response.setDiskKbsWrite(ByteScaleUtils.bytesToKibibytes(statsEntry.getBytesWrite()));
+            response.setDiskIORead(statsEntry.getIORead());
+            response.setDiskIOWrite(statsEntry.getIOWrite());
+            statsResponseList.add(response);
+        }
+        return statsResponseList;
+    }
+
 
     @Override
     public InfrastructureResponse listInfrastructure() {
@@ -879,7 +1075,8 @@ public class MetricsServiceImpl extends MutualExclusiveIdsManagerBase implements
         cmdList.add(ListVolumesMetricsCmd.class);
         cmdList.add(ListZonesMetricsCmd.class);
         cmdList.add(ListVMsUsageHistoryCmd.class);
-
+        cmdList.add(ListSystemVMsUsageHistoryCmd.class);
+        cmdList.add(ListVolumesUsageHistoryCmd.class);
         // separate Admin commands
         cmdList.add(ListVMsMetricsCmdByAdmin.class);
         return cmdList;
diff --git a/plugins/metrics/src/main/java/org/apache/cloudstack/response/VolumeMetricsStatsResponse.java b/plugins/metrics/src/main/java/org/apache/cloudstack/response/VolumeMetricsStatsResponse.java
new file mode 100644
index 00000000000..598f78a53d2
--- /dev/null
+++ b/plugins/metrics/src/main/java/org/apache/cloudstack/response/VolumeMetricsStatsResponse.java
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.response;
+
+import java.util.List;
+
+import org.apache.cloudstack.api.ApiConstants;
+import org.apache.cloudstack.api.BaseResponse;
+import org.apache.cloudstack.api.response.StatsResponse;
+
+import com.cloud.serializer.Param;
+import com.google.gson.annotations.SerializedName;
+
+public class VolumeMetricsStatsResponse extends BaseResponse {
+    @SerializedName(ApiConstants.ID)
+    @Param(description = "the ID of the volume")
+    private String id;
+
+    @SerializedName(ApiConstants.NAME)
+    @Param(description = "the name of the volume")
+    private String name;
+
+    @SerializedName("stats")
+    @Param(description = "the list of VM stats")
+    private List<StatsResponse> stats;
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public void setStats(List<StatsResponse> stats) {
+        this.stats = stats;
+    }
+
+}
diff --git a/plugins/metrics/src/test/java/org/apache/cloudstack/metrics/MetricsServiceImplTest.java b/plugins/metrics/src/test/java/org/apache/cloudstack/metrics/MetricsServiceImplTest.java
index ddc4d8b30bf..ec4add4141d 100644
--- a/plugins/metrics/src/test/java/org/apache/cloudstack/metrics/MetricsServiceImplTest.java
+++ b/plugins/metrics/src/test/java/org/apache/cloudstack/metrics/MetricsServiceImplTest.java
@@ -44,6 +44,7 @@ import com.cloud.utils.Pair;
 import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
 import com.cloud.vm.UserVmVO;
+import com.cloud.vm.VMInstanceVO;
 import com.cloud.vm.VmStatsVO;
 import com.cloud.vm.dao.UserVmDao;
 import com.cloud.vm.dao.VmStatsDao;
@@ -84,10 +85,10 @@ public class MetricsServiceImplTest {
     ArgumentCaptor<SearchCriteria.Op> opCaptor;
     long fakeVmId1 = 1L, fakeVmId2 = 2L;
 
-    Pair<List<UserVmVO>, Integer> expectedVmListAndCounter;
+    Pair<List<? extends VMInstanceVO>, Integer> expectedVmListAndCounter;
 
     @Mock
-    Pair<List<UserVmVO>, Integer> expectedVmListAndCounterMock;
+    Pair<List<? extends VMInstanceVO>, Integer> expectedVmListAndCounterMock;
 
     @Mock
     Map<Long,List<VmStatsVO>> vmStatsMapMock;
@@ -100,7 +101,7 @@ public class MetricsServiceImplTest {
     }
 
     private void preparesearchForUserVmsInternalTest() {
-        expectedVmListAndCounter = new Pair<List<UserVmVO>, Integer>(Arrays.asList(userVmVOMock), 1);
+        expectedVmListAndCounter = new Pair<>(Arrays.asList(userVmVOMock), 1);
 
         Mockito.doReturn(1L).when(listVMsUsageHistoryCmdMock).getStartIndex();
         Mockito.doReturn(2L).when(listVMsUsageHistoryCmdMock).getPageSizeVal();
@@ -196,9 +197,11 @@ public class MetricsServiceImplTest {
         Mockito.doReturn(fakeVmId1).when(userVmVOMock).getId();
         Map<Long,List<VmStatsVO>> expected = new HashMap<Long,List<VmStatsVO>>();
         expected.put(fakeVmId1, new ArrayList<VmStatsVO>());
+        Date startDate = Mockito.mock(Date.class);
+        Date endDate = Mockito.mock(Date.class);
 
         Map<Long,List<VmStatsVO>> result = spy.searchForVmMetricsStatsInternal(
-                listVMsUsageHistoryCmdMock, Arrays.asList(userVmVOMock));
+                startDate, endDate, Arrays.asList(userVmVOMock));
 
         Mockito.verify(userVmVOMock).getId();
         Mockito.verify(spy).findVmStatsAccordingToDateParams(
@@ -210,9 +213,10 @@ public class MetricsServiceImplTest {
     public void searchForVmMetricsStatsInternalTestWithAnEmptyListOfVms() {
         Mockito.doNothing().when(spy).validateDateParams(Mockito.any(), Mockito.any());
         Map<Long,List<VmStatsVO>> expected = new HashMap<Long,List<VmStatsVO>>();
-
+        Date startDate = Mockito.mock(Date.class);
+        Date endDate = Mockito.mock(Date.class);
         Map<Long,List<VmStatsVO>> result = spy.searchForVmMetricsStatsInternal(
-                listVMsUsageHistoryCmdMock, new ArrayList<UserVmVO>());
+                startDate, endDate, new ArrayList<UserVmVO>());
 
         Mockito.verify(userVmVOMock, Mockito.never()).getId();
         Mockito.verify(spy, Mockito.never()).findVmStatsAccordingToDateParams(
@@ -287,7 +291,7 @@ public class MetricsServiceImplTest {
         Mockito.doReturn(null).when(spy).createStatsResponse(Mockito.any());
 
         ListResponse<VmMetricsStatsResponse> result = spy.createVmMetricsStatsResponse(
-                expectedVmListAndCounterMock, vmStatsMapMock);
+                expectedVmListAndCounterMock.first(), vmStatsMapMock);
 
         Assert.assertEquals(Integer.valueOf(1), result.getCount());
     }
@@ -299,7 +303,7 @@ public class MetricsServiceImplTest {
 
     @Test(expected = NullPointerException.class)
     public void createVmMetricsStatsResponseTestWithNoVmStatsList() {
-        spy.createVmMetricsStatsResponse(expectedVmListAndCounterMock, null);
+        spy.createVmMetricsStatsResponse(expectedVmListAndCounterMock.first(), null);
     }
 
     @Test
diff --git a/server/src/main/java/com/cloud/network/as/AutoScaleManagerImpl.java b/server/src/main/java/com/cloud/network/as/AutoScaleManagerImpl.java
index bd522b23891..29cc4cead0b 100644
--- a/server/src/main/java/com/cloud/network/as/AutoScaleManagerImpl.java
+++ b/server/src/main/java/com/cloud/network/as/AutoScaleManagerImpl.java
@@ -37,15 +37,6 @@ import java.util.concurrent.TimeUnit;
 
 import javax.inject.Inject;
 
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Logger;
-
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
-
 import org.apache.cloudstack.acl.ControlledEntity;
 import org.apache.cloudstack.affinity.AffinityGroupVO;
 import org.apache.cloudstack.affinity.dao.AffinityGroupDao;
@@ -79,6 +70,11 @@ import org.apache.cloudstack.framework.config.ConfigKey;
 import org.apache.cloudstack.framework.config.Configurable;
 import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
 import org.apache.cloudstack.managed.context.ManagedContextRunnable;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Logger;
 
 import com.cloud.agent.AgentManager;
 import com.cloud.agent.api.PerformanceMonitorAnswer;
@@ -86,9 +82,9 @@ import com.cloud.agent.api.PerformanceMonitorCommand;
 import com.cloud.agent.api.VmStatsEntry;
 import com.cloud.agent.api.routing.GetAutoScaleMetricsAnswer;
 import com.cloud.agent.api.routing.GetAutoScaleMetricsCommand;
-import com.cloud.agent.api.to.LoadBalancerTO.AutoScaleVmProfileTO;
-import com.cloud.agent.api.to.LoadBalancerTO.AutoScaleVmGroupTO;
 import com.cloud.agent.api.to.LoadBalancerTO.AutoScalePolicyTO;
+import com.cloud.agent.api.to.LoadBalancerTO.AutoScaleVmGroupTO;
+import com.cloud.agent.api.to.LoadBalancerTO.AutoScaleVmProfileTO;
 import com.cloud.agent.api.to.LoadBalancerTO.ConditionTO;
 import com.cloud.agent.api.to.LoadBalancerTO.CounterTO;
 import com.cloud.api.ApiDBUtils;
@@ -173,9 +169,9 @@ import com.cloud.utils.db.GenericSearchBuilder;
 import com.cloud.utils.db.JoinBuilder;
 import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
-import com.cloud.utils.db.TransactionCallback;
 import com.cloud.utils.db.SearchCriteria.Op;
 import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionCallback;
 import com.cloud.utils.db.TransactionStatus;
 import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.utils.net.NetUtils;
@@ -185,10 +181,14 @@ import com.cloud.vm.UserVmService;
 import com.cloud.vm.UserVmVO;
 import com.cloud.vm.VMInstanceVO;
 import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.VirtualMachineManager;
 import com.cloud.vm.VmDetailConstants;
+import com.cloud.vm.VmStats;
 import com.cloud.vm.dao.DomainRouterDao;
 import com.cloud.vm.dao.UserVmDao;
 import com.cloud.vm.dao.VMInstanceDao;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 
 public class AutoScaleManagerImpl extends ManagerBase implements AutoScaleManager, AutoScaleService, Configurable {
     private static final Logger s_logger = Logger.getLogger(AutoScaleManagerImpl.class);
@@ -273,6 +273,8 @@ public class AutoScaleManagerImpl extends ManagerBase implements AutoScaleManage
     private AffinityGroupDao affinityGroupDao;
     @Inject
     private NetworkOfferingDao networkOfferingDao;
+    @Inject
+    private VirtualMachineManager virtualMachineManager;
 
     private static final String PARAM_ROOT_DISK_SIZE = "rootdisksize";
     private static final String PARAM_DISK_OFFERING_ID = "diskofferingid";
@@ -2653,21 +2655,21 @@ public class AutoScaleManagerImpl extends ManagerBase implements AutoScaleManage
             List<Long> vmIds = hostAndVmIds.getValue();
 
             if (!DEFAULT_HOST_ID.equals(hostId)) {
-                Map<Long, VmStatsEntry> vmStatsById = getVmStatsByIdFromHost(hostId, vmIds);
+                Map<Long, ? extends VmStats> vmStatsById = getVmStatsByIdFromHost(hostId, vmIds);
                 processVmStatsByIdFromHost(groupTO, vmIds, vmStatsById, policyCountersMap);
             }
         }
     }
 
-    protected Map<Long, VmStatsEntry> getVmStatsByIdFromHost(Long hostId, List<Long> vmIds) {
-        Map<Long, VmStatsEntry> vmStatsById = new HashMap<>();
+    protected Map<Long, ? extends VmStats> getVmStatsByIdFromHost(Long hostId, List<Long> vmIds) {
+        Map<Long, ? extends VmStats> vmStatsById = new HashMap<>();
         HostVO host = hostDao.findById(hostId);
         if (host == null) {
             s_logger.debug("Failed to get VM stats from non-existing host : " + hostId);
             return vmStatsById;
         }
         try {
-            vmStatsById = userVmMgr.getVirtualMachineStatistics(host.getId(), host.getName(), vmIds);
+            vmStatsById = virtualMachineManager.getVirtualMachineStatistics(host.getId(), host.getName(), vmIds);
             if (MapUtils.isEmpty(vmStatsById)) {
                 s_logger.warn("Got empty result for virtual machine statistics from host: " + host);
             }
@@ -2677,10 +2679,10 @@ public class AutoScaleManagerImpl extends ManagerBase implements AutoScaleManage
         return vmStatsById;
     }
 
-    protected void processVmStatsByIdFromHost(AutoScaleVmGroupTO groupTO, List<Long> vmIds, Map<Long, VmStatsEntry> vmStatsById, Map<Long, List<CounterTO>> policyCountersMap) {
+    protected void processVmStatsByIdFromHost(AutoScaleVmGroupTO groupTO, List<Long> vmIds, Map<Long, ? extends VmStats> vmStatsById, Map<Long, List<CounterTO>> policyCountersMap) {
         Date timestamp = new Date();
         for (Long vmId : vmIds) {
-            VmStatsEntry vmStats = vmStatsById == null ? null : vmStatsById.get(vmId);
+            VmStatsEntry vmStats = vmStatsById == null ? null : (VmStatsEntry)vmStatsById.get(vmId);
             for (Map.Entry<Long, List<CounterTO>> policyCounters : policyCountersMap.entrySet()) {
                 Long policyId = policyCounters.getKey();
                 List<CounterTO> counters = policyCounters.getValue();
diff --git a/server/src/main/java/com/cloud/server/ManagementServerImpl.java b/server/src/main/java/com/cloud/server/ManagementServerImpl.java
index 9922c275ed7..d2375ef8d1c 100644
--- a/server/src/main/java/com/cloud/server/ManagementServerImpl.java
+++ b/server/src/main/java/com/cloud/server/ManagementServerImpl.java
@@ -16,6 +16,9 @@
 // under the License.
 package com.cloud.server;
 
+import static com.cloud.configuration.ConfigurationManagerImpl.VM_USERDATA_MAX_LENGTH;
+import static com.cloud.vm.UserVmManager.MAX_USER_DATA_LENGTH_BYTES;
+
 import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Field;
 import java.net.URLDecoder;
@@ -45,202 +48,6 @@ import javax.crypto.spec.SecretKeySpec;
 import javax.inject.Inject;
 import javax.naming.ConfigurationException;
 
-import com.cloud.agent.AgentManager;
-import com.cloud.agent.api.Answer;
-import com.cloud.agent.api.Command;
-import com.cloud.agent.api.GetVncPortAnswer;
-import com.cloud.agent.api.GetVncPortCommand;
-import com.cloud.agent.api.PatchSystemVmAnswer;
-import com.cloud.agent.api.PatchSystemVmCommand;
-import com.cloud.agent.api.proxy.AllowConsoleAccessCommand;
-import com.cloud.agent.api.routing.NetworkElementCommand;
-import com.cloud.agent.manager.Commands;
-import com.cloud.agent.manager.allocator.HostAllocator;
-import com.cloud.alert.Alert;
-import com.cloud.alert.AlertManager;
-import com.cloud.alert.AlertVO;
-import com.cloud.alert.dao.AlertDao;
-import com.cloud.api.ApiDBUtils;
-import com.cloud.api.query.dao.StoragePoolJoinDao;
-import com.cloud.api.query.vo.StoragePoolJoinVO;
-import com.cloud.capacity.Capacity;
-import com.cloud.capacity.CapacityVO;
-import com.cloud.capacity.dao.CapacityDao;
-import com.cloud.capacity.dao.CapacityDaoImpl.SummedCapacity;
-import com.cloud.cluster.ClusterManager;
-import com.cloud.configuration.Config;
-import com.cloud.configuration.ConfigurationManagerImpl;
-import com.cloud.consoleproxy.ConsoleProxyManagementState;
-import com.cloud.consoleproxy.ConsoleProxyManager;
-import com.cloud.dc.AccountVlanMapVO;
-import com.cloud.dc.ClusterVO;
-import com.cloud.dc.DataCenterVO;
-import com.cloud.dc.DomainVlanMapVO;
-import com.cloud.dc.HostPodVO;
-import com.cloud.dc.Pod;
-import com.cloud.dc.PodVlanMapVO;
-import com.cloud.dc.Vlan;
-import com.cloud.dc.Vlan.VlanType;
-import com.cloud.dc.VlanVO;
-import com.cloud.dc.dao.AccountVlanMapDao;
-import com.cloud.dc.dao.ClusterDao;
-import com.cloud.dc.dao.DataCenterDao;
-import com.cloud.dc.dao.DomainVlanMapDao;
-import com.cloud.dc.dao.HostPodDao;
-import com.cloud.dc.dao.PodVlanMapDao;
-import com.cloud.dc.dao.VlanDao;
-import com.cloud.deploy.DataCenterDeployment;
-import com.cloud.deploy.DeploymentPlanner;
-import com.cloud.deploy.DeploymentPlanner.ExcludeList;
-import com.cloud.deploy.DeploymentPlanningManager;
-import com.cloud.domain.DomainVO;
-import com.cloud.domain.dao.DomainDao;
-import com.cloud.event.ActionEvent;
-import com.cloud.event.ActionEventUtils;
-import com.cloud.event.EventTypes;
-import com.cloud.event.EventVO;
-import com.cloud.event.dao.EventDao;
-import com.cloud.exception.AgentUnavailableException;
-import com.cloud.exception.ConcurrentOperationException;
-import com.cloud.exception.InsufficientAddressCapacityException;
-import com.cloud.exception.InvalidParameterValueException;
-import com.cloud.exception.ManagementServerException;
-import com.cloud.exception.OperationTimedoutException;
-import com.cloud.exception.PermissionDeniedException;
-import com.cloud.exception.ResourceUnavailableException;
-import com.cloud.exception.VirtualMachineMigrationException;
-import com.cloud.gpu.GPU;
-import com.cloud.ha.HighAvailabilityManager;
-import com.cloud.host.DetailVO;
-import com.cloud.host.Host;
-import com.cloud.host.Host.Type;
-import com.cloud.host.HostTagVO;
-import com.cloud.host.HostVO;
-import com.cloud.host.dao.HostDao;
-import com.cloud.host.dao.HostDetailsDao;
-import com.cloud.host.dao.HostTagsDao;
-import com.cloud.hypervisor.Hypervisor;
-import com.cloud.hypervisor.Hypervisor.HypervisorType;
-import com.cloud.hypervisor.HypervisorCapabilities;
-import com.cloud.hypervisor.HypervisorCapabilitiesVO;
-import com.cloud.hypervisor.dao.HypervisorCapabilitiesDao;
-import com.cloud.hypervisor.kvm.dpdk.DpdkHelper;
-import com.cloud.info.ConsoleProxyInfo;
-import com.cloud.network.IpAddress;
-import com.cloud.network.IpAddressManager;
-import com.cloud.network.IpAddressManagerImpl;
-import com.cloud.network.Network;
-import com.cloud.network.NetworkModel;
-import com.cloud.network.Networks;
-import com.cloud.network.dao.IPAddressDao;
-import com.cloud.network.dao.IPAddressVO;
-import com.cloud.network.dao.LoadBalancerDao;
-import com.cloud.network.dao.LoadBalancerVO;
-import com.cloud.network.dao.NetworkAccountDao;
-import com.cloud.network.dao.NetworkAccountVO;
-import com.cloud.network.dao.NetworkDao;
-import com.cloud.network.dao.NetworkDomainDao;
-import com.cloud.network.dao.NetworkDomainVO;
-import com.cloud.network.dao.NetworkVO;
-import com.cloud.network.vpc.dao.VpcDao;
-import com.cloud.org.Cluster;
-import com.cloud.org.Grouping.AllocationState;
-import com.cloud.projects.Project;
-import com.cloud.projects.Project.ListProjectResourcesCriteria;
-import com.cloud.projects.ProjectManager;
-import com.cloud.resource.ResourceManager;
-import com.cloud.server.ResourceTag.ResourceObjectType;
-import com.cloud.server.auth.UserAuthenticator;
-import com.cloud.service.ServiceOfferingVO;
-import com.cloud.service.dao.ServiceOfferingDao;
-import com.cloud.service.dao.ServiceOfferingDetailsDao;
-import com.cloud.storage.DiskOfferingVO;
-import com.cloud.storage.GuestOS;
-import com.cloud.storage.GuestOSCategoryVO;
-import com.cloud.storage.GuestOSHypervisor;
-import com.cloud.storage.GuestOSHypervisorVO;
-import com.cloud.storage.GuestOSVO;
-import com.cloud.storage.GuestOsCategory;
-import com.cloud.storage.ScopeType;
-import com.cloud.storage.Storage;
-import com.cloud.storage.StorageManager;
-import com.cloud.storage.StoragePool;
-import com.cloud.storage.StoragePoolStatus;
-import com.cloud.storage.VMTemplateVO;
-import com.cloud.storage.Volume;
-import com.cloud.storage.VolumeApiServiceImpl;
-import com.cloud.storage.VolumeVO;
-import com.cloud.storage.dao.DiskOfferingDao;
-import com.cloud.storage.dao.GuestOSCategoryDao;
-import com.cloud.storage.dao.GuestOSDao;
-import com.cloud.storage.dao.GuestOSHypervisorDao;
-import com.cloud.storage.dao.VMTemplateDao;
-import com.cloud.storage.dao.VolumeDao;
-import com.cloud.storage.secondary.SecondaryStorageVmManager;
-import com.cloud.tags.ResourceTagVO;
-import com.cloud.tags.dao.ResourceTagDao;
-import com.cloud.template.TemplateManager;
-import com.cloud.user.Account;
-import com.cloud.user.AccountManager;
-import com.cloud.user.AccountService;
-import com.cloud.user.SSHKeyPair;
-import com.cloud.user.SSHKeyPairVO;
-import com.cloud.user.User;
-import com.cloud.user.UserData;
-import com.cloud.user.UserDataVO;
-import com.cloud.user.UserVO;
-import com.cloud.user.dao.AccountDao;
-import com.cloud.user.dao.SSHKeyPairDao;
-import com.cloud.user.dao.UserDao;
-import com.cloud.user.dao.UserDataDao;
-import com.cloud.utils.NumbersUtil;
-import com.cloud.utils.Pair;
-import com.cloud.utils.PasswordGenerator;
-import com.cloud.utils.Ternary;
-import com.cloud.utils.component.ComponentLifecycle;
-import com.cloud.utils.component.ManagerBase;
-import com.cloud.utils.concurrency.NamedThreadFactory;
-import com.cloud.utils.crypt.DBEncryptionUtil;
-import com.cloud.utils.db.DB;
-import com.cloud.utils.db.Filter;
-import com.cloud.utils.db.GlobalLock;
-import com.cloud.utils.db.JoinBuilder;
-import com.cloud.utils.db.JoinBuilder.JoinType;
-import com.cloud.utils.db.SearchBuilder;
-import com.cloud.utils.db.SearchCriteria;
-import com.cloud.utils.db.Transaction;
-import com.cloud.utils.db.TransactionCallbackNoReturn;
-import com.cloud.utils.db.TransactionStatus;
-import com.cloud.utils.db.UUIDManager;
-import com.cloud.utils.exception.CloudRuntimeException;
-import com.cloud.utils.fsm.StateMachine2;
-import com.cloud.utils.net.MacAddress;
-import com.cloud.utils.net.NetUtils;
-import com.cloud.utils.ssh.SSHKeysHelper;
-import com.cloud.vm.ConsoleProxyVO;
-import com.cloud.vm.DiskProfile;
-import com.cloud.vm.DomainRouterVO;
-import com.cloud.vm.InstanceGroupVO;
-import com.cloud.vm.NicVO;
-import com.cloud.vm.SecondaryStorageVmVO;
-import com.cloud.vm.UserVmDetailVO;
-import com.cloud.vm.UserVmManager;
-import com.cloud.vm.UserVmVO;
-import com.cloud.vm.VMInstanceVO;
-import com.cloud.vm.VirtualMachine;
-import com.cloud.vm.VirtualMachine.State;
-import com.cloud.vm.VirtualMachineManager;
-import com.cloud.vm.VirtualMachineProfile;
-import com.cloud.vm.VirtualMachineProfileImpl;
-import com.cloud.vm.dao.ConsoleProxyDao;
-import com.cloud.vm.dao.DomainRouterDao;
-import com.cloud.vm.dao.InstanceGroupDao;
-import com.cloud.vm.dao.NicDao;
-import com.cloud.vm.dao.SecondaryStorageVmDao;
-import com.cloud.vm.dao.UserVmDao;
-import com.cloud.vm.dao.UserVmDetailsDao;
-import com.cloud.vm.dao.VMInstanceDao;
-
 import org.apache.cloudstack.acl.ControlledEntity;
 import org.apache.cloudstack.acl.SecurityChecker;
 import org.apache.cloudstack.affinity.AffinityGroupProcessor;
@@ -802,9 +609,201 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
 
-
-import static com.cloud.configuration.ConfigurationManagerImpl.VM_USERDATA_MAX_LENGTH;
-import static com.cloud.vm.UserVmManager.MAX_USER_DATA_LENGTH_BYTES;
+import com.cloud.agent.AgentManager;
+import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.Command;
+import com.cloud.agent.api.GetVncPortAnswer;
+import com.cloud.agent.api.GetVncPortCommand;
+import com.cloud.agent.api.PatchSystemVmAnswer;
+import com.cloud.agent.api.PatchSystemVmCommand;
+import com.cloud.agent.api.proxy.AllowConsoleAccessCommand;
+import com.cloud.agent.api.routing.NetworkElementCommand;
+import com.cloud.agent.manager.Commands;
+import com.cloud.agent.manager.allocator.HostAllocator;
+import com.cloud.alert.Alert;
+import com.cloud.alert.AlertManager;
+import com.cloud.alert.AlertVO;
+import com.cloud.alert.dao.AlertDao;
+import com.cloud.api.ApiDBUtils;
+import com.cloud.api.query.dao.StoragePoolJoinDao;
+import com.cloud.api.query.vo.StoragePoolJoinVO;
+import com.cloud.capacity.Capacity;
+import com.cloud.capacity.CapacityVO;
+import com.cloud.capacity.dao.CapacityDao;
+import com.cloud.capacity.dao.CapacityDaoImpl.SummedCapacity;
+import com.cloud.cluster.ClusterManager;
+import com.cloud.configuration.Config;
+import com.cloud.configuration.ConfigurationManagerImpl;
+import com.cloud.consoleproxy.ConsoleProxyManagementState;
+import com.cloud.consoleproxy.ConsoleProxyManager;
+import com.cloud.dc.AccountVlanMapVO;
+import com.cloud.dc.ClusterVO;
+import com.cloud.dc.DataCenterVO;
+import com.cloud.dc.DomainVlanMapVO;
+import com.cloud.dc.HostPodVO;
+import com.cloud.dc.Pod;
+import com.cloud.dc.PodVlanMapVO;
+import com.cloud.dc.Vlan;
+import com.cloud.dc.Vlan.VlanType;
+import com.cloud.dc.VlanVO;
+import com.cloud.dc.dao.AccountVlanMapDao;
+import com.cloud.dc.dao.ClusterDao;
+import com.cloud.dc.dao.DataCenterDao;
+import com.cloud.dc.dao.DomainVlanMapDao;
+import com.cloud.dc.dao.HostPodDao;
+import com.cloud.dc.dao.PodVlanMapDao;
+import com.cloud.dc.dao.VlanDao;
+import com.cloud.deploy.DataCenterDeployment;
+import com.cloud.deploy.DeploymentPlanner;
+import com.cloud.deploy.DeploymentPlanner.ExcludeList;
+import com.cloud.deploy.DeploymentPlanningManager;
+import com.cloud.domain.DomainVO;
+import com.cloud.domain.dao.DomainDao;
+import com.cloud.event.ActionEvent;
+import com.cloud.event.ActionEventUtils;
+import com.cloud.event.EventTypes;
+import com.cloud.event.EventVO;
+import com.cloud.event.dao.EventDao;
+import com.cloud.exception.AgentUnavailableException;
+import com.cloud.exception.ConcurrentOperationException;
+import com.cloud.exception.InsufficientAddressCapacityException;
+import com.cloud.exception.InvalidParameterValueException;
+import com.cloud.exception.ManagementServerException;
+import com.cloud.exception.OperationTimedoutException;
+import com.cloud.exception.PermissionDeniedException;
+import com.cloud.exception.ResourceUnavailableException;
+import com.cloud.exception.VirtualMachineMigrationException;
+import com.cloud.gpu.GPU;
+import com.cloud.ha.HighAvailabilityManager;
+import com.cloud.host.DetailVO;
+import com.cloud.host.Host;
+import com.cloud.host.Host.Type;
+import com.cloud.host.HostTagVO;
+import com.cloud.host.HostVO;
+import com.cloud.host.dao.HostDao;
+import com.cloud.host.dao.HostDetailsDao;
+import com.cloud.host.dao.HostTagsDao;
+import com.cloud.hypervisor.Hypervisor;
+import com.cloud.hypervisor.Hypervisor.HypervisorType;
+import com.cloud.hypervisor.HypervisorCapabilities;
+import com.cloud.hypervisor.HypervisorCapabilitiesVO;
+import com.cloud.hypervisor.dao.HypervisorCapabilitiesDao;
+import com.cloud.hypervisor.kvm.dpdk.DpdkHelper;
+import com.cloud.info.ConsoleProxyInfo;
+import com.cloud.network.IpAddress;
+import com.cloud.network.IpAddressManager;
+import com.cloud.network.IpAddressManagerImpl;
+import com.cloud.network.Network;
+import com.cloud.network.NetworkModel;
+import com.cloud.network.Networks;
+import com.cloud.network.dao.IPAddressDao;
+import com.cloud.network.dao.IPAddressVO;
+import com.cloud.network.dao.LoadBalancerDao;
+import com.cloud.network.dao.LoadBalancerVO;
+import com.cloud.network.dao.NetworkAccountDao;
+import com.cloud.network.dao.NetworkAccountVO;
+import com.cloud.network.dao.NetworkDao;
+import com.cloud.network.dao.NetworkDomainDao;
+import com.cloud.network.dao.NetworkDomainVO;
+import com.cloud.network.dao.NetworkVO;
+import com.cloud.network.vpc.dao.VpcDao;
+import com.cloud.org.Cluster;
+import com.cloud.org.Grouping.AllocationState;
+import com.cloud.projects.Project;
+import com.cloud.projects.Project.ListProjectResourcesCriteria;
+import com.cloud.projects.ProjectManager;
+import com.cloud.resource.ResourceManager;
+import com.cloud.server.ResourceTag.ResourceObjectType;
+import com.cloud.server.auth.UserAuthenticator;
+import com.cloud.service.ServiceOfferingVO;
+import com.cloud.service.dao.ServiceOfferingDao;
+import com.cloud.service.dao.ServiceOfferingDetailsDao;
+import com.cloud.storage.DiskOfferingVO;
+import com.cloud.storage.GuestOS;
+import com.cloud.storage.GuestOSCategoryVO;
+import com.cloud.storage.GuestOSHypervisor;
+import com.cloud.storage.GuestOSHypervisorVO;
+import com.cloud.storage.GuestOSVO;
+import com.cloud.storage.GuestOsCategory;
+import com.cloud.storage.ScopeType;
+import com.cloud.storage.Storage;
+import com.cloud.storage.StorageManager;
+import com.cloud.storage.StoragePool;
+import com.cloud.storage.StoragePoolStatus;
+import com.cloud.storage.VMTemplateVO;
+import com.cloud.storage.Volume;
+import com.cloud.storage.VolumeApiServiceImpl;
+import com.cloud.storage.VolumeVO;
+import com.cloud.storage.dao.DiskOfferingDao;
+import com.cloud.storage.dao.GuestOSCategoryDao;
+import com.cloud.storage.dao.GuestOSDao;
+import com.cloud.storage.dao.GuestOSHypervisorDao;
+import com.cloud.storage.dao.VMTemplateDao;
+import com.cloud.storage.dao.VolumeDao;
+import com.cloud.storage.secondary.SecondaryStorageVmManager;
+import com.cloud.tags.ResourceTagVO;
+import com.cloud.tags.dao.ResourceTagDao;
+import com.cloud.template.TemplateManager;
+import com.cloud.user.Account;
+import com.cloud.user.AccountManager;
+import com.cloud.user.AccountService;
+import com.cloud.user.SSHKeyPair;
+import com.cloud.user.SSHKeyPairVO;
+import com.cloud.user.User;
+import com.cloud.user.UserData;
+import com.cloud.user.UserDataVO;
+import com.cloud.user.UserVO;
+import com.cloud.user.dao.AccountDao;
+import com.cloud.user.dao.SSHKeyPairDao;
+import com.cloud.user.dao.UserDao;
+import com.cloud.user.dao.UserDataDao;
+import com.cloud.utils.NumbersUtil;
+import com.cloud.utils.Pair;
+import com.cloud.utils.PasswordGenerator;
+import com.cloud.utils.Ternary;
+import com.cloud.utils.component.ComponentLifecycle;
+import com.cloud.utils.component.ManagerBase;
+import com.cloud.utils.concurrency.NamedThreadFactory;
+import com.cloud.utils.crypt.DBEncryptionUtil;
+import com.cloud.utils.db.DB;
+import com.cloud.utils.db.Filter;
+import com.cloud.utils.db.GlobalLock;
+import com.cloud.utils.db.JoinBuilder;
+import com.cloud.utils.db.JoinBuilder.JoinType;
+import com.cloud.utils.db.SearchBuilder;
+import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionCallbackNoReturn;
+import com.cloud.utils.db.TransactionStatus;
+import com.cloud.utils.db.UUIDManager;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.utils.fsm.StateMachine2;
+import com.cloud.utils.net.MacAddress;
+import com.cloud.utils.net.NetUtils;
+import com.cloud.utils.ssh.SSHKeysHelper;
+import com.cloud.vm.ConsoleProxyVO;
+import com.cloud.vm.DiskProfile;
+import com.cloud.vm.DomainRouterVO;
+import com.cloud.vm.InstanceGroupVO;
+import com.cloud.vm.NicVO;
+import com.cloud.vm.SecondaryStorageVmVO;
+import com.cloud.vm.UserVmDetailVO;
+import com.cloud.vm.UserVmManager;
+import com.cloud.vm.UserVmVO;
+import com.cloud.vm.VMInstanceVO;
+import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.VirtualMachine.State;
+import com.cloud.vm.VirtualMachineManager;
+import com.cloud.vm.VirtualMachineProfile;
+import com.cloud.vm.VirtualMachineProfileImpl;
+import com.cloud.vm.dao.ConsoleProxyDao;
+import com.cloud.vm.dao.DomainRouterDao;
+import com.cloud.vm.dao.InstanceGroupDao;
+import com.cloud.vm.dao.NicDao;
+import com.cloud.vm.dao.SecondaryStorageVmDao;
+import com.cloud.vm.dao.UserVmDao;
+import com.cloud.vm.dao.UserVmDetailsDao;
+import com.cloud.vm.dao.VMInstanceDao;
 
 public class ManagementServerImpl extends ManagerBase implements ManagementServer, Configurable {
     public static final Logger s_logger = Logger.getLogger(ManagementServerImpl.class.getName());
@@ -4188,6 +4187,10 @@ public class ManagementServerImpl extends ManagerBase implements ManagementServe
         capabilities.put("kubernetesServiceEnabled", kubernetesServiceEnabled);
         capabilities.put("kubernetesClusterExperimentalFeaturesEnabled", kubernetesClusterExperimentalFeaturesEnabled);
         capabilities.put(ApiServiceConfiguration.DefaultUIPageSize.key(), ApiServiceConfiguration.DefaultUIPageSize.value());
+        capabilities.put(ApiConstants.INSTANCES_STATS_RETENTION_TIME, StatsCollector.vmStatsMaxRetentionTime.value());
+        capabilities.put(ApiConstants.INSTANCES_STATS_USER_ONLY, StatsCollector.vmStatsCollectUserVMOnly.value());
+        capabilities.put(ApiConstants.INSTANCES_DISKS_STATS_RETENTION_ENABLED, StatsCollector.vmDiskStatsRetentionEnabled.value());
+        capabilities.put(ApiConstants.INSTANCES_DISKS_STATS_RETENTION_TIME, StatsCollector.vmDiskStatsMaxRetentionTime.value());
         if (apiLimitEnabled) {
             capabilities.put("apiLimitInterval", apiLimitInterval);
             capabilities.put("apiLimitMax", apiLimitMax);
diff --git a/server/src/main/java/com/cloud/server/StatsCollector.java b/server/src/main/java/com/cloud/server/StatsCollector.java
index cd3a0cc95be..5197ccc3a3c 100644
--- a/server/src/main/java/com/cloud/server/StatsCollector.java
+++ b/server/src/main/java/com/cloud/server/StatsCollector.java
@@ -16,16 +16,17 @@
 // under the License.
 package com.cloud.server;
 
-import javax.inject.Inject;
+import static com.cloud.utils.NumbersUtil.toHumanReadableSize;
+
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
-
 import java.lang.management.RuntimeMXBean;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collection;
 import java.util.Date;
@@ -41,7 +42,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import com.cloud.utils.db.DbUtil;
+import javax.inject.Inject;
+
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
 import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreProvider;
@@ -62,9 +64,9 @@ import org.apache.cloudstack.utils.usage.UsageUtils;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.BooleanUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.DateUtils;
-import org.apache.commons.lang3.BooleanUtils;
 import org.apache.log4j.Logger;
 import org.influxdb.BatchOptions;
 import org.influxdb.InfluxDB;
@@ -105,6 +107,7 @@ import com.cloud.host.HostStats;
 import com.cloud.host.HostVO;
 import com.cloud.host.Status;
 import com.cloud.host.dao.HostDao;
+import com.cloud.hypervisor.Hypervisor;
 import com.cloud.hypervisor.Hypervisor.HypervisorType;
 import com.cloud.network.as.AutoScaleManager;
 import com.cloud.org.Cluster;
@@ -118,8 +121,10 @@ import com.cloud.storage.Storage.ImageFormat;
 import com.cloud.storage.StorageManager;
 import com.cloud.storage.StorageStats;
 import com.cloud.storage.VolumeStats;
+import com.cloud.storage.VolumeStatsVO;
 import com.cloud.storage.VolumeVO;
 import com.cloud.storage.dao.VolumeDao;
+import com.cloud.storage.dao.VolumeStatsDao;
 import com.cloud.user.UserStatisticsVO;
 import com.cloud.user.VmDiskStatisticsVO;
 import com.cloud.user.dao.UserStatisticsDao;
@@ -130,6 +135,7 @@ import com.cloud.utils.component.ComponentMethodInterceptable;
 import com.cloud.utils.component.ManagerBase;
 import com.cloud.utils.concurrency.NamedThreadFactory;
 import com.cloud.utils.db.DbProperties;
+import com.cloud.utils.db.DbUtil;
 import com.cloud.utils.db.Filter;
 import com.cloud.utils.db.GlobalLock;
 import com.cloud.utils.db.SearchCriteria;
@@ -144,13 +150,15 @@ import com.cloud.vm.UserVmManager;
 import com.cloud.vm.UserVmVO;
 import com.cloud.vm.VMInstanceVO;
 import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.VirtualMachineManager;
+import com.cloud.vm.VmDiskStats;
+import com.cloud.vm.VmNetworkStats;
 import com.cloud.vm.VmStats;
 import com.cloud.vm.VmStatsVO;
 import com.cloud.vm.dao.NicDao;
 import com.cloud.vm.dao.UserVmDao;
 import com.cloud.vm.dao.VMInstanceDao;
 import com.cloud.vm.dao.VmStatsDao;
-
 import com.codahale.metrics.JvmAttributeGaugeSet;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricRegistry;
@@ -160,12 +168,12 @@ import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
 import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
 import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
 import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
 import com.google.gson.JsonParseException;
 import com.google.gson.reflect.TypeToken;
 import com.sun.management.OperatingSystemMXBean;
 
-import static com.cloud.utils.NumbersUtil.toHumanReadableSize;
-
 /**
  * Provides real time stats for various agent resources up to x seconds
  *
@@ -267,15 +275,24 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
     private static final ConfigKey<String> statsOutputUri = new ConfigKey<>("Advanced", String.class, "stats.output.uri", "",
             "URI to send StatsCollector statistics to. The collector is defined on the URI scheme. Example: graphite://graphite-hostaddress:port or influxdb://influxdb-hostaddress/dbname. Note that the port is optional, if not added the default port for the respective collector (graphite or influxdb) will be used. Additionally, the database name '/dbname' is  also optional; default db name is 'cloudstack'. You must create and configure the database if using influxdb.",
             true);
-    protected static ConfigKey<Boolean> vmStatsIncrementMetrics = new ConfigKey<Boolean>("Advanced", Boolean.class, "vm.stats.increment.metrics", "true",
+    protected static ConfigKey<Boolean> vmStatsIncrementMetrics = new ConfigKey<>("Advanced", Boolean.class, "vm.stats.increment.metrics", "true",
             "When set to 'true', VM metrics(NetworkReadKBs, NetworkWriteKBs, DiskWriteKBs, DiskReadKBs, DiskReadIOs and DiskWriteIOs) that are collected from the hypervisor are summed before being returned."
-            + "On the other hand, when set to 'false', the VM metrics API will just display the latest metrics collected.", true);
+                    + "On the other hand, when set to 'false', the VM metrics API will just display the latest metrics collected.", true);
     private static final ConfigKey<Boolean> VM_STATS_INCREMENT_METRICS_IN_MEMORY = new ConfigKey<>("Advanced", Boolean.class, "vm.stats.increment.metrics.in.memory", "true",
             "When set to 'true', VM metrics(NetworkReadKBs, NetworkWriteKBs, DiskWriteKBs, DiskReadKBs, DiskReadIOs and DiskWriteIOs) that are collected from the hypervisor are summed and stored in memory. "
             + "On the other hand, when set to 'false', the VM metrics API will just display the latest metrics collected.", true);
-    protected static ConfigKey<Integer> vmStatsMaxRetentionTime = new ConfigKey<Integer>("Advanced", Integer.class, "vm.stats.max.retention.time", "1",
+    protected static ConfigKey<Integer> vmStatsMaxRetentionTime = new ConfigKey<>("Advanced", Integer.class, "vm.stats.max.retention.time", "720",
             "The maximum time (in minutes) for keeping VM stats records in the database. The VM stats cleanup process will be disabled if this is set to 0 or less than 0.", true);
 
+    protected static ConfigKey<Boolean> vmStatsCollectUserVMOnly = new ConfigKey<>("Advanced", Boolean.class, "vm.stats.user.vm.only", "false",
+            "When set to 'false' stats for system VMs will be collected otherwise stats collection will be done only for user VMs", true);
+
+    protected static ConfigKey<Boolean> vmDiskStatsRetentionEnabled = new ConfigKey<>("Advanced", Boolean.class, "vm.disk.stats.retention.enabled", "false",
+            "When set to 'true' stats for VM disks will be stored in the database otherwise disk stats will not be stored", true);
+
+    protected static ConfigKey<Integer> vmDiskStatsMaxRetentionTime = new ConfigKey<>("Advanced", Integer.class, "vm.disk.stats.max.retention.time", "720",
+            "The maximum time (in minutes) for keeping VM disks stats records in the database. The VM disks stats cleanup process will be disabled if this is set to 0 or less than 0.", true);
+
     private static StatsCollector s_instance = null;
 
     private static Gson gson = new Gson();
@@ -296,6 +313,8 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
     @Inject
     private VolumeDao _volsDao;
     @Inject
+    protected VolumeStatsDao volumeStatsDao;
+    @Inject
     private PrimaryDataStoreDao _storagePoolDao;
     @Inject
     private StorageManager _storageManager;
@@ -330,6 +349,8 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
     private ClusterManager clusterManager;
     @Inject
     private ManagementServerStatusDao managementServerStatusDao;
+    @Inject
+    VirtualMachineManager virtualMachineManager;
 
     private final ConcurrentHashMap<String, ManagementServerHostStats> managementServerHostStats = new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String, Object> dbStats = new ConcurrentHashMap<>();
@@ -467,6 +488,8 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
 
         _executor.scheduleWithFixedDelay(new VmStatsCleaner(), DEFAULT_INITIAL_DELAY, 60000L, TimeUnit.MILLISECONDS);
 
+        _executor.scheduleWithFixedDelay(new VolumeStatsCleaner(), DEFAULT_INITIAL_DELAY, 60000L, TimeUnit.MILLISECONDS);
+
         scheduleCollection(MANAGEMENT_SERVER_STATUS_COLLECTION_INTERVAL, new ManagementServerCollector(), 1L);
         scheduleCollection(DATABASE_SERVER_STATUS_COLLECTION_INTERVAL, new DbCollector(), 0L);
 
@@ -604,6 +627,19 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
                 externalStatsPrefix, externalStatsHost, externalStatsPort));
     }
 
+    /** Returns the VMs running on {@code host}, keyed by id; non-user VMs are dropped when vm.stats.user.vm.only is true. */
+    protected Map<Long, VMInstanceVO> getVmMapForStatsForHost(Host host) {
+        final List<VMInstanceVO> runningVms = _vmInstance.listByHostAndState(host.getId(), VirtualMachine.State.Running);
+        final boolean userVmsOnly = Boolean.TRUE.equals(vmStatsCollectUserVMOnly.value());
+        final Map<Long, VMInstanceVO> vmsById = new HashMap<>();
+        for (VMInstanceVO instance : runningVms) {
+            if (!userVmsOnly || VirtualMachine.Type.User.equals(instance.getType())) {
+                vmsById.put(instance.getId(), instance);
+            }
+        }
+        return vmsById;
+    }
+
     class HostCollector extends AbstractStatsCollector {
         @Override
         protected void runInContext() {
@@ -1156,25 +1192,18 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
                 Map<Object, Object> metrics = new HashMap<>();
 
                 for (HostVO host : hosts) {
-                    List<UserVmVO> vms = _userVmDao.listRunningByHostId(host.getId());
                     Date timestamp = new Date();
-
-                    List<Long> vmIds = new ArrayList<Long>();
-
-                    for (UserVmVO vm : vms) {
-                        vmIds.add(vm.getId());
-                    }
-
+                    Map<Long, VMInstanceVO> vmMap = getVmMapForStatsForHost(host);
                     try {
-                        Map<Long, VmStatsEntry> vmStatsById = _userVmMgr.getVirtualMachineStatistics(host.getId(), host.getName(), vmIds);
+                        Map<Long, ? extends VmStats> vmStatsById = virtualMachineManager.getVirtualMachineStatistics(host.getId(), host.getName(), vmMap);
 
                         if (vmStatsById != null) {
                             Set<Long> vmIdSet = vmStatsById.keySet();
                             for (Long vmId : vmIdSet) {
-                                VmStatsEntry statsForCurrentIteration = vmStatsById.get(vmId);
+                                VmStatsEntry statsForCurrentIteration = (VmStatsEntry)vmStatsById.get(vmId);
                                 statsForCurrentIteration.setVmId(vmId);
-                                UserVmVO userVmVo = _userVmDao.findById(vmId);
-                                statsForCurrentIteration.setUserVmVO(userVmVo);
+                                VMInstanceVO vm = vmMap.get(vmId);
+                                statsForCurrentIteration.setVmUuid(vm.getUuid());
 
                                 persistVirtualMachineStats(statsForCurrentIteration, timestamp);
 
@@ -1226,6 +1255,12 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
         }
     }
 
+    /** Runs cleanUpVolumeStats(); StatsCollector schedules it on _executor with a fixed 60000 ms delay. */
+    class VolumeStatsCleaner extends ManagedContextRunnable {
+        @Override
+        protected void runInContext() { cleanUpVolumeStats(); }
+    }
+
     /**
      * Gets the latest or the accumulation of the stats collected from a given VM.
      *
@@ -1345,12 +1380,20 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
         }
     }
 
+    /** Debug-logs a stat the host reported below the stored value; both values are humanized when toHumanReadable is set. */
+    private void logLessLatestStatDiscrepancy(String prefix, String hostName, String vmName, long reported, long stored, boolean toHumanReadable) {
+        if (!LOGGER.isDebugEnabled()) { return; }
+        LOGGER.debug(String.format("%s that's less than the last one.  Assuming something went wrong and persisting it. Host: %s . VM: %s Reported: %s Stored: %s", prefix, hostName, vmName,
+                toHumanReadable ? toHumanReadableSize(reported) : reported, toHumanReadable ? toHumanReadableSize(stored) : stored));
+    }
+
     class VmDiskStatsTask extends ManagedContextRunnable {
         @Override
         protected void runInContext() {
             //Check for ownership
             //msHost in UP state with min id should run the job
             ManagementServerHostVO msHost = managementServerHostDao.findOneInUpState(new Filter(ManagementServerHostVO.class, "id", true, 0L, 1L));
+            boolean persistVolumeStats = vmDiskStatsRetentionEnabled.value();
             if (msHost == null || (msHost.getMsid() != mgmtSrvrId)) {
                 LOGGER.debug("Skipping collect vm disk stats from hosts");
                 return;
@@ -1363,94 +1406,77 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
             List<HostVO> hosts = _hostDao.search(sc, null);
 
             for (HostVO host : hosts) {
+                Date timestamp = new Date();
                 try {
                     Transaction.execute(new TransactionCallbackNoReturn() {
                         @Override
                         public void doInTransactionWithoutResult(TransactionStatus status) {
-                            List<UserVmVO> vms = _userVmDao.listRunningByHostId(host.getId());
-                            List<Long> vmIds = new ArrayList<Long>();
-
-                            for (UserVmVO  vm : vms) {
-                                if (vm.getType() == VirtualMachine.Type.User) // user vm
-                                    vmIds.add(vm.getId());
-                            }
-
-                            HashMap<Long, List<VmDiskStatsEntry>> vmDiskStatsById = _userVmMgr.getVmDiskStatistics(host.getId(), host.getName(), vmIds);
+                            Map<Long, VMInstanceVO> vmMap = getVmMapForStatsForHost(host);
+                            HashMap<Long, List<? extends VmDiskStats>> vmDiskStatsById = virtualMachineManager.getVmDiskStatistics(host.getId(), host.getName(), vmMap);
                             if (vmDiskStatsById == null)
                                 return;
 
                             Set<Long> vmIdSet = vmDiskStatsById.keySet();
                             for (Long vmId : vmIdSet) {
-                                List<VmDiskStatsEntry> vmDiskStats = vmDiskStatsById.get(vmId);
+                                List<? extends VmDiskStats> vmDiskStats = vmDiskStatsById.get(vmId);
                                 if (vmDiskStats == null)
                                     continue;
-                                UserVmVO userVm = _userVmDao.findById(vmId);
-                                for (VmDiskStatsEntry vmDiskStat : vmDiskStats) {
+                                VMInstanceVO vm = vmMap.get(vmId);
+                                for (VmDiskStats vmDiskStat : vmDiskStats) {
+                                    VmDiskStatsEntry vmDiskStatEntry = (VmDiskStatsEntry)vmDiskStat;
                                     SearchCriteria<VolumeVO> sc_volume = _volsDao.createSearchCriteria();
-                                    sc_volume.addAnd("path", SearchCriteria.Op.EQ, vmDiskStat.getPath());
+                                    sc_volume.addAnd("path", SearchCriteria.Op.EQ, vmDiskStatEntry.getPath());
                                     List<VolumeVO> volumes = _volsDao.search(sc_volume, null);
 
                                     if (CollectionUtils.isEmpty(volumes))
                                         break;
 
                                     VolumeVO volume = volumes.get(0);
-                                    VmDiskStatisticsVO previousVmDiskStats = _vmDiskStatsDao.findBy(userVm.getAccountId(), userVm.getDataCenterId(), vmId, volume.getId());
-                                    VmDiskStatisticsVO vmDiskStat_lock = _vmDiskStatsDao.lock(userVm.getAccountId(), userVm.getDataCenterId(), vmId, volume.getId());
+                                    VmDiskStatisticsVO previousVmDiskStats = _vmDiskStatsDao.findBy(vm.getAccountId(), vm.getDataCenterId(), vmId, volume.getId());
+                                    VmDiskStatisticsVO vmDiskStat_lock = _vmDiskStatsDao.lock(vm.getAccountId(), vm.getDataCenterId(), vmId, volume.getId());
 
-                                    if (areAllDiskStatsZero(vmDiskStat)) {
+                                    if (persistVolumeStats) {
+                                        persistVolumeStats(volume.getId(), vmDiskStatEntry, vm.getHypervisorType(), timestamp);
+                                    }
+
+                                    if (areAllDiskStatsZero(vmDiskStatEntry)) {
                                         LOGGER.debug("IO/bytes read and write are all 0. Not updating vm_disk_statistics");
                                         continue;
                                     }
 
                                     if (vmDiskStat_lock == null) {
-                                        LOGGER.warn("unable to find vm disk stats from host for account: " + userVm.getAccountId() + " with vmId: " + userVm.getId()
+                                        LOGGER.warn("unable to find vm disk stats from host for account: " + vm.getAccountId() + " with vmId: " + vm.getId()
                                                 + " and volumeId:" + volume.getId());
                                         continue;
                                     }
 
                                     if (isCurrentVmDiskStatsDifferentFromPrevious(previousVmDiskStats, vmDiskStat_lock)) {
                                         LOGGER.debug("vm disk stats changed from the time GetVmDiskStatsCommand was sent. " + "Ignoring current answer. Host: " + host.getName()
-                                                + " . VM: " + vmDiskStat.getVmName() + " Read(Bytes): " + toHumanReadableSize(vmDiskStat.getBytesRead()) + " write(Bytes): " + toHumanReadableSize(vmDiskStat.getBytesWrite())
-                                                + " Read(IO): " + toHumanReadableSize(vmDiskStat.getIORead()) + " write(IO): " + toHumanReadableSize(vmDiskStat.getIOWrite()));
+                                                + " . VM: " + vmDiskStatEntry.getVmName() + " Read(Bytes): " + toHumanReadableSize(vmDiskStatEntry.getBytesRead()) + " write(Bytes): " + toHumanReadableSize(vmDiskStatEntry.getBytesWrite())
+                                                + " Read(IO): " + toHumanReadableSize(vmDiskStatEntry.getIORead()) + " write(IO): " + toHumanReadableSize(vmDiskStatEntry.getIOWrite()));
                                         continue;
                                     }
 
-                                    if (vmDiskStat_lock.getCurrentBytesRead() > vmDiskStat.getBytesRead()) {
-                                        if (LOGGER.isDebugEnabled()) {
-                                            LOGGER.debug("Read # of bytes that's less than the last one.  " + "Assuming something went wrong and persisting it. Host: "
-                                                    + host.getName() + " . VM: " + vmDiskStat.getVmName() + " Reported: " + toHumanReadableSize(vmDiskStat.getBytesRead()) + " Stored: "
-                                                    + vmDiskStat_lock.getCurrentBytesRead());
-                                        }
+                                    if (vmDiskStat_lock.getCurrentBytesRead() > vmDiskStatEntry.getBytesRead()) {
+                                        logLessLatestStatDiscrepancy("Read # of bytes", host.getName(), vmDiskStatEntry.getVmName(), vmDiskStatEntry.getBytesRead(), vmDiskStat_lock.getCurrentBytesRead(), true);
                                         vmDiskStat_lock.setNetBytesRead(vmDiskStat_lock.getNetBytesRead() + vmDiskStat_lock.getCurrentBytesRead());
                                     }
-                                    vmDiskStat_lock.setCurrentBytesRead(vmDiskStat.getBytesRead());
-                                    if (vmDiskStat_lock.getCurrentBytesWrite() > vmDiskStat.getBytesWrite()) {
-                                        if (LOGGER.isDebugEnabled()) {
-                                            LOGGER.debug("Write # of bytes that's less than the last one.  " + "Assuming something went wrong and persisting it. Host: "
-                                                    + host.getName() + " . VM: " + vmDiskStat.getVmName() + " Reported: " + toHumanReadableSize(vmDiskStat.getBytesWrite()) + " Stored: "
-                                                    + toHumanReadableSize(vmDiskStat_lock.getCurrentBytesWrite()));
-                                        }
+                                    vmDiskStat_lock.setCurrentBytesRead(vmDiskStatEntry.getBytesRead());
+                                    if (vmDiskStat_lock.getCurrentBytesWrite() > vmDiskStatEntry.getBytesWrite()) {
+                                        logLessLatestStatDiscrepancy("Write # of bytes", host.getName(), vmDiskStatEntry.getVmName(), vmDiskStatEntry.getBytesWrite(), vmDiskStat_lock.getCurrentBytesWrite(), true);
                                         vmDiskStat_lock.setNetBytesWrite(vmDiskStat_lock.getNetBytesWrite() + vmDiskStat_lock.getCurrentBytesWrite());
                                     }
-                                    vmDiskStat_lock.setCurrentBytesWrite(vmDiskStat.getBytesWrite());
-                                    if (vmDiskStat_lock.getCurrentIORead() > vmDiskStat.getIORead()) {
-                                        if (LOGGER.isDebugEnabled()) {
-                                            LOGGER.debug("Read # of IO that's less than the last one.  " + "Assuming something went wrong and persisting it. Host: "
-                                                    + host.getName() + " . VM: " + vmDiskStat.getVmName() + " Reported: " + vmDiskStat.getIORead() + " Stored: "
-                                                    + vmDiskStat_lock.getCurrentIORead());
-                                        }
+                                    vmDiskStat_lock.setCurrentBytesWrite(vmDiskStatEntry.getBytesWrite());
+                                    if (vmDiskStat_lock.getCurrentIORead() > vmDiskStatEntry.getIORead()) {
+                                        logLessLatestStatDiscrepancy("Read # of IO", host.getName(), vmDiskStatEntry.getVmName(), vmDiskStatEntry.getIORead(), vmDiskStat_lock.getCurrentIORead(), false);
                                         vmDiskStat_lock.setNetIORead(vmDiskStat_lock.getNetIORead() + vmDiskStat_lock.getCurrentIORead());
                                     }
-                                    vmDiskStat_lock.setCurrentIORead(vmDiskStat.getIORead());
-                                    if (vmDiskStat_lock.getCurrentIOWrite() > vmDiskStat.getIOWrite()) {
-                                        if (LOGGER.isDebugEnabled()) {
-                                            LOGGER.debug("Write # of IO that's less than the last one.  " + "Assuming something went wrong and persisting it. Host: "
-                                                    + host.getName() + " . VM: " + vmDiskStat.getVmName() + " Reported: " + vmDiskStat.getIOWrite() + " Stored: "
-                                                    + vmDiskStat_lock.getCurrentIOWrite());
-                                        }
+                                    vmDiskStat_lock.setCurrentIORead(vmDiskStatEntry.getIORead());
+                                    if (vmDiskStat_lock.getCurrentIOWrite() > vmDiskStatEntry.getIOWrite()) {
+                                        logLessLatestStatDiscrepancy("Write # of IO", host.getName(), vmDiskStatEntry.getVmName(), vmDiskStatEntry.getIOWrite(), vmDiskStat_lock.getCurrentIOWrite(), false);
                                         vmDiskStat_lock.setNetIOWrite(vmDiskStat_lock.getNetIOWrite() + vmDiskStat_lock.getCurrentIOWrite());
                                     }
-                                    vmDiskStat_lock.setCurrentIOWrite(vmDiskStat.getIOWrite());
+                                    vmDiskStat_lock.setCurrentIOWrite(vmDiskStatEntry.getIOWrite());
 
                                     if (!_dailyOrHourly) {
                                         //update agg bytes
@@ -1493,22 +1519,15 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
                     Transaction.execute(new TransactionCallbackNoReturn() {
                         @Override
                         public void doInTransactionWithoutResult(TransactionStatus status) {
-                            List<UserVmVO> vms = _userVmDao.listRunningByHostId(host.getId());
-                            List<Long> vmIds = new ArrayList<Long>();
-
-                            for (UserVmVO vm : vms) {
-                                if (vm.getType() == VirtualMachine.Type.User) // user vm
-                                    vmIds.add(vm.getId());
-                            }
-
-                            HashMap<Long, List<VmNetworkStatsEntry>> vmNetworkStatsById = _userVmMgr.getVmNetworkStatistics(host.getId(), host.getName(), vmIds);
+                            Map<Long, VMInstanceVO> vmMap = getVmMapForStatsForHost(host);
+                            HashMap<Long, List<? extends VmNetworkStats>> vmNetworkStatsById = virtualMachineManager.getVmNetworkStatistics(host.getId(), host.getName(), vmMap);
                             if (vmNetworkStatsById == null)
                                 return;
 
                             Set<Long> vmIdSet = vmNetworkStatsById.keySet();
                             for (Long vmId : vmIdSet) {
-                                List<VmNetworkStatsEntry> vmNetworkStats = vmNetworkStatsById.get(vmId);
-                                if (vmNetworkStats == null)
+                                List<? extends VmNetworkStats> vmNetworkStats = vmNetworkStatsById.get(vmId);
+                                if (CollectionUtils.isEmpty(vmNetworkStats))
                                     continue;
                                 UserVmVO userVm = _userVmDao.findById(vmId);
                                 if (userVm == null) {
@@ -1517,9 +1536,10 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
                                 }
                                 LOGGER.debug("Now we are updating the user_statistics table for VM: " + userVm.getInstanceName()
                                         + " after collecting vm network statistics from host: " + host.getName());
-                                for (VmNetworkStatsEntry vmNetworkStat : vmNetworkStats) {
+                                for (VmNetworkStats vmNetworkStat : vmNetworkStats) {
+                                    VmNetworkStatsEntry vmNetworkStatEntry = (VmNetworkStatsEntry)vmNetworkStat;
                                     SearchCriteria<NicVO> sc_nic = _nicDao.createSearchCriteria();
-                                    sc_nic.addAnd("macAddress", SearchCriteria.Op.EQ, vmNetworkStat.getMacAddress());
+                                    sc_nic.addAnd("macAddress", SearchCriteria.Op.EQ, vmNetworkStatEntry.getMacAddress());
                                     NicVO nic = _nicDao.search(sc_nic, null).get(0);
                                     List<VlanVO> vlan = _vlanDao.listVlansByNetworkId(nic.getNetworkId());
                                     if (vlan == null || vlan.size() == 0 || vlan.get(0).getVlanType() != VlanType.DirectAttached)
@@ -1534,7 +1554,7 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
                                     UserStatisticsVO vmNetworkStat_lock = _userStatsDao.lock(userVm.getAccountId(), userVm.getDataCenterId(), nic.getNetworkId(),
                                             nic.getIPv4Address(), vmId, "UserVm");
 
-                                    if ((vmNetworkStat.getBytesSent() == 0) && (vmNetworkStat.getBytesReceived() == 0)) {
+                                    if ((vmNetworkStatEntry.getBytesSent() == 0) && (vmNetworkStatEntry.getBytesReceived() == 0)) {
                                         LOGGER.debug("bytes sent and received are all 0. Not updating user_statistics");
                                         continue;
                                     }
@@ -1548,30 +1568,22 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
                                     if (previousvmNetworkStats != null && ((previousvmNetworkStats.getCurrentBytesSent() != vmNetworkStat_lock.getCurrentBytesSent())
                                             || (previousvmNetworkStats.getCurrentBytesReceived() != vmNetworkStat_lock.getCurrentBytesReceived()))) {
                                         LOGGER.debug("vm network stats changed from the time GetNmNetworkStatsCommand was sent. " + "Ignoring current answer. Host: "
-                                                + host.getName() + " . VM: " + vmNetworkStat.getVmName() + " Sent(Bytes): " + vmNetworkStat.getBytesSent() + " Received(Bytes): "
-                                                + vmNetworkStat.getBytesReceived());
+                                                + host.getName() + " . VM: " + vmNetworkStatEntry.getVmName() + " Sent(Bytes): " + vmNetworkStatEntry.getBytesSent() + " Received(Bytes): "
+                                                + vmNetworkStatEntry.getBytesReceived());
                                         continue;
                                     }
 
-                                    if (vmNetworkStat_lock.getCurrentBytesSent() > vmNetworkStat.getBytesSent()) {
-                                        if (LOGGER.isDebugEnabled()) {
-                                            LOGGER.debug("Sent # of bytes that's less than the last one.  " + "Assuming something went wrong and persisting it. Host: "
-                                                    + host.getName() + " . VM: " + vmNetworkStat.getVmName() + " Reported: " + toHumanReadableSize(vmNetworkStat.getBytesSent()) + " Stored: "
-                                                    + toHumanReadableSize(vmNetworkStat_lock.getCurrentBytesSent()));
-                                        }
+                                    if (vmNetworkStat_lock.getCurrentBytesSent() > vmNetworkStatEntry.getBytesSent()) {
+                                        logLessLatestStatDiscrepancy("Sent # of bytes", host.getName(), vmNetworkStatEntry.getVmName(), vmNetworkStatEntry.getBytesSent(), vmNetworkStat_lock.getCurrentBytesSent(), true);
                                         vmNetworkStat_lock.setNetBytesSent(vmNetworkStat_lock.getNetBytesSent() + vmNetworkStat_lock.getCurrentBytesSent());
                                     }
-                                    vmNetworkStat_lock.setCurrentBytesSent(vmNetworkStat.getBytesSent());
-
-                                    if (vmNetworkStat_lock.getCurrentBytesReceived() > vmNetworkStat.getBytesReceived()) {
-                                        if (LOGGER.isDebugEnabled()) {
-                                            LOGGER.debug("Received # of bytes that's less than the last one.  " + "Assuming something went wrong and persisting it. Host: "
-                                                    + host.getName() + " . VM: " + vmNetworkStat.getVmName() + " Reported: " + toHumanReadableSize(vmNetworkStat.getBytesReceived()) + " Stored: "
-                                                    + toHumanReadableSize(vmNetworkStat_lock.getCurrentBytesReceived()));
-                                        }
+                                    vmNetworkStat_lock.setCurrentBytesSent(vmNetworkStatEntry.getBytesSent());
+
+                                    if (vmNetworkStat_lock.getCurrentBytesReceived() > vmNetworkStatEntry.getBytesReceived()) {
+                                        logLessLatestStatDiscrepancy("Received # of bytes", host.getName(), vmNetworkStatEntry.getVmName(), vmNetworkStatEntry.getBytesReceived(), vmNetworkStat_lock.getCurrentBytesReceived(), true);
                                         vmNetworkStat_lock.setNetBytesReceived(vmNetworkStat_lock.getNetBytesReceived() + vmNetworkStat_lock.getCurrentBytesReceived());
                                     }
-                                    vmNetworkStat_lock.setCurrentBytesReceived(vmNetworkStat.getBytesReceived());
+                                    vmNetworkStat_lock.setCurrentBytesReceived(vmNetworkStatEntry.getBytesReceived());
 
                                     if (!_dailyOrHourly) {
                                         //update agg bytes
@@ -1882,6 +1894,39 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
         vmStatsDao.persist(vmStatsVO);
     }
 
+    private String getVmDiskStatsEntryAsString(VmDiskStatsEntry statsForCurrentIteration, Hypervisor.HypervisorType hypervisorType) { // Serializes a disk stats entry to the JSON string that is stored with a volume stats record.
+        VmDiskStatsEntry entry; // the object that will actually be serialized
+        if (Hypervisor.HypervisorType.KVM.equals(hypervisorType)) { // constant-first equals, so a null hypervisorType simply takes the else branch
+            entry = new VmDiskStatsEntry(statsForCurrentIteration.getVmName(), // KVM only: serialize a fresh entry built from the delta getters
+                    statsForCurrentIteration.getPath(),
+                    statsForCurrentIteration.getDeltaIoWrite(), // NOTE(review): presumably KVM reports cumulative counters, hence the deltas - confirm
+                    statsForCurrentIteration.getDeltaIoRead(),
+                    statsForCurrentIteration.getDeltaBytesWrite(),
+                    statsForCurrentIteration.getDeltaBytesRead());
+        } else {
+            entry = statsForCurrentIteration; // every other hypervisor: serialize the reported entry unchanged
+        }
+        JsonElement element = gson.toJsonTree(entry);
+        JsonObject obj = element.getAsJsonObject();
+        for (String key : Arrays.asList("deltaIoRead", "deltaIoWrite", "deltaBytesWrite", "deltaBytesRead")) { // the delta* members are dropped from the output in both branches
+            obj.remove(key);
+        }
+        return obj.toString(); // JSON text of the remaining members
+    }
+
+    /**
+     * Persists the disk stats of the volume identified by {@code volumeId} in the database, tagged with this collector's {@code msId}.
+     * @param statsForCurrentIteration the metrics stats data to persist; serialized by {@code getVmDiskStatsEntryAsString} according to {@code hypervisorType}.
+     * @param timestamp the time that will be stamped.
+     */
+    protected void persistVolumeStats(long volumeId, VmDiskStatsEntry statsForCurrentIteration, Hypervisor.HypervisorType hypervisorType, Date timestamp) {
+        VolumeStatsVO volumeStatsVO = new VolumeStatsVO(volumeId, msId, timestamp, getVmDiskStatsEntryAsString(statsForCurrentIteration, hypervisorType)); // stats are stored as the serialized string, not as individual columns
+        if (LOGGER.isDebugEnabled()) { // guard avoids building the formatted message when debug logging is off
+            LOGGER.debug(String.format("Recording volume stats: [%s].", volumeStatsVO));
+        }
+        volumeStatsDao.persist(volumeStatsVO);
+    }
+
     /**
      * Removes the oldest VM stats records according to the global
      * parameter {@code vm.stats.max.retention.time}.
@@ -1899,6 +1944,25 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
         vmStatsDao.removeAllByTimestampLessThan(limit);
     }
 
+    /**
+     * Removes Volume stats records older than the retention time (applied in minutes) given by the global
+     * parameter {@code vm.disk.stats.max.retention.time}; does nothing when that value is 0 or negative.
+     */
+    protected void cleanUpVolumeStats() {
+        Integer maxRetentionTime = vmDiskStatsMaxRetentionTime.value();
+        if (maxRetentionTime <= 0) { // a non-positive retention time means no cleanup is performed
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug(String.format("Skipping Volume stats cleanup. The [%s] parameter [%s] is set to 0 or less than 0.",
+                        vmDiskStatsMaxRetentionTime.scope(), vmDiskStatsMaxRetentionTime.toString()));
+            }
+            return;
+        }
+        LOGGER.trace("Removing older Volume stats records.");
+        Date now = new Date();
+        Date limit = DateUtils.addMinutes(now, -maxRetentionTime); // cutoff = now minus the retention time, in minutes
+        volumeStatsDao.removeAllByTimestampLessThan(limit); // deletes every record stamped before the cutoff
+    }
+
     /**
      * Sends host metrics to a configured InfluxDB host. The metrics respects the following specification.</br>
      * <b>Tags:</b>vm_id, uuid, instance_name, data_center_id, host_id</br>
@@ -1929,10 +1993,9 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
      */
     protected Point createInfluxDbPointForVmMetrics(Object metricsObject) {
         VmStatsEntry vmStatsEntry = (VmStatsEntry)metricsObject;
-        UserVmVO userVmVO = vmStatsEntry.getUserVmVO();
 
         Map<String, String> tagsToAdd = new HashMap<>();
-        tagsToAdd.put(UUID_TAG, userVmVO.getUuid());
+        tagsToAdd.put(UUID_TAG, vmStatsEntry.getVmUuid());
 
         Map<String, Object> fieldsToAdd = new HashMap<>();
         fieldsToAdd.put(TOTAL_MEMORY_KBS_FIELD, vmStatsEntry.getMemoryKBs());
@@ -2053,7 +2116,7 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
     @Override
     public ConfigKey<?>[] getConfigKeys() {
         return new ConfigKey<?>[] {vmDiskStatsInterval, vmDiskStatsIntervalMin, vmNetworkStatsInterval, vmNetworkStatsIntervalMin, StatsTimeout, statsOutputUri,
-            vmStatsIncrementMetrics, vmStatsMaxRetentionTime,
+            vmStatsIncrementMetrics, vmStatsMaxRetentionTime, vmStatsCollectUserVMOnly, vmDiskStatsRetentionEnabled, vmDiskStatsMaxRetentionTime,
                 VM_STATS_INCREMENT_METRICS_IN_MEMORY,
                 MANAGEMENT_SERVER_STATUS_COLLECTION_INTERVAL,
                 DATABASE_SERVER_STATUS_COLLECTION_INTERVAL,
diff --git a/server/src/main/java/com/cloud/vm/UserVmManager.java b/server/src/main/java/com/cloud/vm/UserVmManager.java
index 5029615b241..39f1e5d2d28 100644
--- a/server/src/main/java/com/cloud/vm/UserVmManager.java
+++ b/server/src/main/java/com/cloud/vm/UserVmManager.java
@@ -20,14 +20,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import com.cloud.offering.ServiceOffering;
-import com.cloud.template.VirtualMachineTemplate;
 import org.apache.cloudstack.api.BaseCmd.HTTPMethod;
 import org.apache.cloudstack.framework.config.ConfigKey;
 
-import com.cloud.agent.api.VmDiskStatsEntry;
-import com.cloud.agent.api.VmNetworkStatsEntry;
-import com.cloud.agent.api.VmStatsEntry;
 import com.cloud.agent.api.VolumeStatsEntry;
 import com.cloud.exception.ConcurrentOperationException;
 import com.cloud.exception.InsufficientCapacityException;
@@ -35,8 +30,10 @@ import com.cloud.exception.ManagementServerException;
 import com.cloud.exception.ResourceAllocationException;
 import com.cloud.exception.ResourceUnavailableException;
 import com.cloud.exception.VirtualMachineMigrationException;
+import com.cloud.offering.ServiceOffering;
 import com.cloud.service.ServiceOfferingVO;
 import com.cloud.storage.Storage.StoragePoolType;
+import com.cloud.template.VirtualMachineTemplate;
 import com.cloud.uservm.UserVm;
 import com.cloud.utils.Pair;
 
@@ -85,17 +82,6 @@ public interface UserVmManager extends UserVmService {
      */
     boolean stopVirtualMachine(long userId, long vmId);
 
-    /**
-     * Obtains statistics for a list of host or VMs; CPU and network utilization
-     * @param host ID
-     * @param host name
-     * @param list of VM IDs or host id
-     * @return GetVmStatsAnswer
-     */
-    HashMap<Long, VmStatsEntry> getVirtualMachineStatistics(long hostId, String hostName, List<Long> vmIds);
-
-    HashMap<Long, List<VmDiskStatsEntry>> getVmDiskStatistics(long hostId, String hostName, List<Long> vmIds);
-
     HashMap<String, VolumeStatsEntry> getVolumeStatistics(long clusterId, String poolUuid, StoragePoolType poolType, int timeout);
 
     boolean deleteVmGroup(long groupId);
@@ -136,8 +122,6 @@ public interface UserVmManager extends UserVmService {
 
     void persistDeviceBusInfo(UserVmVO paramUserVmVO, String paramString);
 
-    HashMap<Long, List<VmNetworkStatsEntry>> getVmNetworkStatistics(long hostId, String hostName, List<Long> vmIds);
-
     boolean checkIfDynamicScalingCanBeEnabled(VirtualMachine vm, ServiceOffering offering, VirtualMachineTemplate template, Long zoneId);
 
     Boolean getDestroyRootVolumeOnVmDestruction(Long domainId);
diff --git a/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java b/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java
index 8815ea65eb0..ea72a38cbb9 100644
--- a/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java
+++ b/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java
@@ -148,8 +148,6 @@ import com.cloud.agent.api.GetVmDiskStatsCommand;
 import com.cloud.agent.api.GetVmIpAddressCommand;
 import com.cloud.agent.api.GetVmNetworkStatsAnswer;
 import com.cloud.agent.api.GetVmNetworkStatsCommand;
-import com.cloud.agent.api.GetVmStatsAnswer;
-import com.cloud.agent.api.GetVmStatsCommand;
 import com.cloud.agent.api.GetVolumeStatsAnswer;
 import com.cloud.agent.api.GetVolumeStatsCommand;
 import com.cloud.agent.api.ModifyTargetsCommand;
@@ -159,7 +157,6 @@ import com.cloud.agent.api.RestoreVMSnapshotCommand;
 import com.cloud.agent.api.StartAnswer;
 import com.cloud.agent.api.VmDiskStatsEntry;
 import com.cloud.agent.api.VmNetworkStatsEntry;
-import com.cloud.agent.api.VmStatsEntry;
 import com.cloud.agent.api.VolumeStatsEntry;
 import com.cloud.agent.api.to.DiskTO;
 import com.cloud.agent.api.to.NicTO;
@@ -1867,41 +1864,6 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Vir
         }
     }
 
-    @Override
-    public HashMap<Long, List<VmDiskStatsEntry>> getVmDiskStatistics(long hostId, String hostName, List<Long> vmIds) throws CloudRuntimeException {
-        HashMap<Long, List<VmDiskStatsEntry>> vmDiskStatsById = new HashMap<Long, List<VmDiskStatsEntry>>();
-
-        if (vmIds.isEmpty()) {
-            return vmDiskStatsById;
-        }
-
-        List<String> vmNames = new ArrayList<String>();
-
-        for (Long vmId : vmIds) {
-            UserVmVO vm = _vmDao.findById(vmId);
-            vmNames.add(vm.getInstanceName());
-        }
-
-        Answer answer = _agentMgr.easySend(hostId, new GetVmDiskStatsCommand(vmNames, _hostDao.findById(hostId).getGuid(), hostName));
-        if (answer == null || !answer.getResult()) {
-            s_logger.warn("Unable to obtain VM disk statistics.");
-            return null;
-        } else {
-            HashMap<String, List<VmDiskStatsEntry>> vmDiskStatsByName = ((GetVmDiskStatsAnswer)answer).getVmDiskStatsMap();
-
-            if (vmDiskStatsByName == null) {
-                s_logger.warn("Unable to obtain VM disk statistics.");
-                return null;
-            }
-
-            for (Map.Entry<String, List<VmDiskStatsEntry>> entry: vmDiskStatsByName.entrySet()) {
-                vmDiskStatsById.put(vmIds.get(vmNames.indexOf(entry.getKey())), entry.getValue());
-            }
-        }
-
-        return vmDiskStatsById;
-    }
-
     @Override
     public boolean upgradeVirtualMachine(Long vmId, Long newServiceOfferingId, Map<String, String> customParameters) throws ResourceUnavailableException,
     ConcurrentOperationException, ManagementServerException, VirtualMachineMigrationException {
@@ -2171,41 +2133,6 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Vir
         }
     }
 
-    @Override
-    public HashMap<Long, VmStatsEntry> getVirtualMachineStatistics(long hostId, String hostName, List<Long> vmIds) throws CloudRuntimeException {
-        HashMap<Long, VmStatsEntry> vmStatsById = new HashMap<Long, VmStatsEntry>();
-
-        if (vmIds.isEmpty()) {
-            return vmStatsById;
-        }
-
-        List<String> vmNames = new ArrayList<String>();
-
-        for (Long vmId : vmIds) {
-            UserVmVO vm = _vmDao.findById(vmId);
-            vmNames.add(vm.getInstanceName());
-        }
-
-        Answer answer = _agentMgr.easySend(hostId, new GetVmStatsCommand(vmNames, _hostDao.findById(hostId).getGuid(), hostName));
-        if (answer == null || !answer.getResult()) {
-            s_logger.warn("Unable to obtain VM statistics.");
-            return null;
-        } else {
-            HashMap<String, VmStatsEntry> vmStatsByName = ((GetVmStatsAnswer)answer).getVmStatsMap();
-
-            if (vmStatsByName == null) {
-                s_logger.warn("Unable to obtain VM statistics.");
-                return null;
-            }
-
-            for (Map.Entry<String, VmStatsEntry> entry : vmStatsByName.entrySet()) {
-                vmStatsById.put(vmIds.get(vmNames.indexOf(entry.getKey())), entry.getValue());
-            }
-        }
-
-        return vmStatsById;
-    }
-
     @Override
     public HashMap<String, VolumeStatsEntry> getVolumeStatistics(long clusterId, String poolUuid, StoragePoolType poolType,  int timeout) {
         List<HostVO> neighbors = _resourceMgr.listHostsInClusterByStatus(clusterId, Status.Up);
@@ -4718,41 +4645,6 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Vir
         }
     }
 
-    @Override
-    public HashMap<Long, List<VmNetworkStatsEntry>> getVmNetworkStatistics(long hostId, String hostName, List<Long> vmIds) {
-        HashMap<Long, List<VmNetworkStatsEntry>> vmNetworkStatsById = new HashMap<Long, List<VmNetworkStatsEntry>>();
-
-        if (vmIds.isEmpty()) {
-            return vmNetworkStatsById;
-        }
-
-        List<String> vmNames = new ArrayList<String>();
-
-        for (Long vmId : vmIds) {
-            UserVmVO vm = _vmDao.findById(vmId);
-            vmNames.add(vm.getInstanceName());
-        }
-
-        Answer answer = _agentMgr.easySend(hostId, new GetVmNetworkStatsCommand(vmNames, _hostDao.findById(hostId).getGuid(), hostName));
-        if (answer == null || !answer.getResult()) {
-            s_logger.warn("Unable to obtain VM network statistics.");
-            return null;
-        } else {
-            HashMap<String, List<VmNetworkStatsEntry>> vmNetworkStatsByName = ((GetVmNetworkStatsAnswer)answer).getVmNetworkStatsMap();
-
-            if (vmNetworkStatsByName == null) {
-                s_logger.warn("Unable to obtain VM network statistics.");
-                return null;
-            }
-
-            for (String vmName : vmNetworkStatsByName.keySet()) {
-                vmNetworkStatsById.put(vmIds.get(vmNames.indexOf(vmName)), vmNetworkStatsByName.get(vmName));
-            }
-        }
-
-        return vmNetworkStatsById;
-    }
-
     @Override
     public void collectVmNetworkStatistics (final UserVm userVm) {
         if (!userVm.getHypervisorType().equals(HypervisorType.KVM)) {
diff --git a/server/src/test/java/com/cloud/network/as/AutoScaleManagerImplTest.java b/server/src/test/java/com/cloud/network/as/AutoScaleManagerImplTest.java
index 870f76d2c36..e60ce86fc3f 100644
--- a/server/src/test/java/com/cloud/network/as/AutoScaleManagerImplTest.java
+++ b/server/src/test/java/com/cloud/network/as/AutoScaleManagerImplTest.java
@@ -16,15 +16,74 @@
 // under the License.
 package com.cloud.network.as;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.matches;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.cloudstack.affinity.AffinityGroupVO;
+import org.apache.cloudstack.affinity.dao.AffinityGroupDao;
+import org.apache.cloudstack.annotation.AnnotationService;
+import org.apache.cloudstack.annotation.dao.AnnotationDao;
+import org.apache.cloudstack.api.ApiConstants;
+import org.apache.cloudstack.api.command.admin.autoscale.CreateCounterCmd;
+import org.apache.cloudstack.api.command.user.autoscale.CreateAutoScalePolicyCmd;
+import org.apache.cloudstack.api.command.user.autoscale.CreateAutoScaleVmGroupCmd;
+import org.apache.cloudstack.api.command.user.autoscale.CreateAutoScaleVmProfileCmd;
+import org.apache.cloudstack.api.command.user.autoscale.CreateConditionCmd;
+import org.apache.cloudstack.api.command.user.autoscale.ListCountersCmd;
+import org.apache.cloudstack.api.command.user.autoscale.UpdateAutoScaleVmGroupCmd;
+import org.apache.cloudstack.api.command.user.autoscale.UpdateAutoScaleVmProfileCmd;
+import org.apache.cloudstack.api.command.user.autoscale.UpdateConditionCmd;
+import org.apache.cloudstack.api.command.user.vm.DeployVMCmd;
+import org.apache.cloudstack.config.ApiServiceConfiguration;
+import org.apache.cloudstack.context.CallContext;
+import org.apache.cloudstack.framework.config.ConfigKey;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.springframework.test.util.ReflectionTestUtils;
+
 import com.cloud.agent.AgentManager;
 import com.cloud.agent.api.PerformanceMonitorAnswer;
 import com.cloud.agent.api.PerformanceMonitorCommand;
 import com.cloud.agent.api.VmStatsEntry;
 import com.cloud.agent.api.routing.GetAutoScaleMetricsAnswer;
 import com.cloud.agent.api.routing.GetAutoScaleMetricsCommand;
-import com.cloud.agent.api.to.LoadBalancerTO.AutoScaleVmProfileTO;
-import com.cloud.agent.api.to.LoadBalancerTO.AutoScaleVmGroupTO;
 import com.cloud.agent.api.to.LoadBalancerTO.AutoScalePolicyTO;
+import com.cloud.agent.api.to.LoadBalancerTO.AutoScaleVmGroupTO;
+import com.cloud.agent.api.to.LoadBalancerTO.AutoScaleVmProfileTO;
 import com.cloud.agent.api.to.LoadBalancerTO.ConditionTO;
 import com.cloud.agent.api.to.LoadBalancerTO.CounterTO;
 import com.cloud.api.dispatch.DispatchChain;
@@ -99,67 +158,12 @@ import com.cloud.vm.UserVmManager;
 import com.cloud.vm.UserVmService;
 import com.cloud.vm.UserVmVO;
 import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.VirtualMachineManager;
 import com.cloud.vm.VirtualMachineProfile;
+import com.cloud.vm.VmStats;
 import com.cloud.vm.dao.DomainRouterDao;
 import com.cloud.vm.dao.UserVmDao;
 import com.cloud.vm.dao.VMInstanceDao;
-import org.apache.cloudstack.affinity.AffinityGroupVO;
-import org.apache.cloudstack.affinity.dao.AffinityGroupDao;
-import org.apache.cloudstack.annotation.AnnotationService;
-import org.apache.cloudstack.annotation.dao.AnnotationDao;
-import org.apache.cloudstack.api.ApiConstants;
-import org.apache.cloudstack.api.command.admin.autoscale.CreateCounterCmd;
-import org.apache.cloudstack.api.command.user.autoscale.CreateAutoScalePolicyCmd;
-import org.apache.cloudstack.api.command.user.autoscale.CreateAutoScaleVmGroupCmd;
-import org.apache.cloudstack.api.command.user.autoscale.CreateAutoScaleVmProfileCmd;
-import org.apache.cloudstack.api.command.user.autoscale.CreateConditionCmd;
-import org.apache.cloudstack.api.command.user.autoscale.ListCountersCmd;
-import org.apache.cloudstack.api.command.user.autoscale.UpdateAutoScaleVmGroupCmd;
-import org.apache.cloudstack.api.command.user.autoscale.UpdateAutoScaleVmProfileCmd;
-import org.apache.cloudstack.api.command.user.autoscale.UpdateConditionCmd;
-import org.apache.cloudstack.api.command.user.vm.DeployVMCmd;
-import org.apache.cloudstack.config.ApiServiceConfiguration;
-import org.apache.cloudstack.context.CallContext;
-import org.apache.cloudstack.framework.config.ConfigKey;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.Spy;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.springframework.test.util.ReflectionTestUtils;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.matches;
-import static org.mockito.ArgumentMatchers.nullable;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
 @PowerMockIgnore("javax.management.*")
@@ -251,6 +255,9 @@ public class AutoScaleManagerImplTest {
     @Mock
     VMInstanceDao vmInstanceDao;
 
+    @Mock
+    VirtualMachineManager virtualMachineManager;
+
     AccountVO account;
     UserVO user;
 
@@ -2204,14 +2211,14 @@ public class AutoScaleManagerImplTest {
     @Test
     public void getVmStatsByIdFromHost() {
         List<Long> vmIds = Mockito.mock(ArrayList.class);
-        HashMap<Long, VmStatsEntry> vmStatsById = Mockito.mock(HashMap.class);
-        when(userVmMgr.getVirtualMachineStatistics(anyLong(), anyString(), any())).thenReturn(vmStatsById);
+        Map<Long, VmStatsEntry> vmStatsById = Mockito.mock(HashMap.class);
+        Mockito.doReturn(vmStatsById).when(virtualMachineManager).getVirtualMachineStatistics(anyLong(), anyString(), anyList());
 
-        Map<Long, VmStatsEntry> result = autoScaleManagerImplSpy.getVmStatsByIdFromHost(-1L, vmIds);
+        Map<Long, ? extends VmStats> result = autoScaleManagerImplSpy.getVmStatsByIdFromHost(-1L, vmIds);
 
         Assert.assertEquals(0, result.size());
 
-        Mockito.verify(userVmMgr, never()).getVirtualMachineStatistics(anyLong(), anyString(), any());
+        Mockito.verify(virtualMachineManager, never()).getVirtualMachineStatistics(anyLong(), anyString(), anyList());
     }
 
     @Test
@@ -2223,13 +2230,13 @@ public class AutoScaleManagerImplTest {
         when(hostDao.findById(hostId)).thenReturn(hostMock);
         when(hostMock.getId()).thenReturn(hostId);
         when(hostMock.getName()).thenReturn(hostName);
-        when(userVmMgr.getVirtualMachineStatistics(anyLong(), anyString(), any())).thenReturn(vmStatsById);
+        Mockito.doReturn(vmStatsById).when(virtualMachineManager).getVirtualMachineStatistics(anyLong(), anyString(), anyList());
 
-        Map<Long, VmStatsEntry> result = autoScaleManagerImplSpy.getVmStatsByIdFromHost(hostId, vmIds);
+        Map<Long, ? extends VmStats> result = autoScaleManagerImplSpy.getVmStatsByIdFromHost(hostId, vmIds);
 
         Assert.assertEquals(vmStatsById, result);
 
-        Mockito.verify(userVmMgr).getVirtualMachineStatistics(anyLong(), anyString(), any());
+        Mockito.verify(virtualMachineManager).getVirtualMachineStatistics(anyLong(), anyString(), anyList());
     }
 
     @Test
diff --git a/server/src/test/java/com/cloud/server/StatsCollectorTest.java b/server/src/test/java/com/cloud/server/StatsCollectorTest.java
index 206b6b81f7a..e010c8d1c2a 100644
--- a/server/src/test/java/com/cloud/server/StatsCollectorTest.java
+++ b/server/src/test/java/com/cloud/server/StatsCollectorTest.java
@@ -18,9 +18,10 @@
 //
 package com.cloud.server;
 
+import static org.mockito.Mockito.when;
+
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.concurrent.TimeUnit;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
@@ -28,8 +29,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.cloudstack.framework.config.ConfigKey;
+import org.apache.commons.collections.CollectionUtils;
 import org.influxdb.InfluxDB;
 import org.influxdb.InfluxDBFactory;
 import org.influxdb.dto.BatchPoints;
@@ -43,6 +46,7 @@ import org.mockito.Captor;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -50,17 +54,19 @@ import org.powermock.modules.junit4.PowerMockRunnerDelegate;
 
 import com.cloud.agent.api.VmDiskStatsEntry;
 import com.cloud.agent.api.VmStatsEntry;
+import com.cloud.hypervisor.Hypervisor;
 import com.cloud.server.StatsCollector.ExternalStatsProtocol;
+import com.cloud.storage.VolumeStatsVO;
+import com.cloud.storage.dao.VolumeStatsDao;
 import com.cloud.user.VmDiskStatisticsVO;
 import com.cloud.utils.exception.CloudRuntimeException;
 import com.cloud.vm.VmStats;
 import com.cloud.vm.VmStatsVO;
 import com.cloud.vm.dao.VmStatsDao;
+import com.google.gson.Gson;
 import com.tngtech.java.junit.dataprovider.DataProvider;
 import com.tngtech.java.junit.dataprovider.DataProviderRunner;
 
-import static org.mockito.Mockito.when;
-
 @RunWith(PowerMockRunner.class)
 @PowerMockRunnerDelegate(DataProviderRunner.class)
 @PrepareForTest({InfluxDBFactory.class, BatchPoints.class})
@@ -97,6 +103,11 @@ public class StatsCollectorTest {
     @Mock
     VmStatsEntry vmStatsEntryMock;
 
+    @Mock
+    VolumeStatsDao volumeStatsDao;
+
+    private static Gson gson = new Gson();
+
     @Test
     public void createInfluxDbConnectionTest() {
         configureAndTestCreateInfluxDbConnection(true);
@@ -421,4 +432,61 @@ public class StatsCollectorTest {
 
         Assert.assertFalse(statsCollector.isDbLocal());
     }
+
+    private void performPersistVolumeStatsTest(Hypervisor.HypervisorType hypervisorType) {
+        Date timestamp = new Date();
+        String vmName= "vm";
+        String path = "path";
+        long ioReadDiff = 100;
+        long ioWriteDiff = 200;
+        long readDiff = 1024;
+        long writeDiff = 0;
+        Long volumeId = 1L;
+        VmDiskStatsEntry statsForCurrentIteration = null;
+        if (Hypervisor.HypervisorType.KVM.equals(hypervisorType)) {
+            statsForCurrentIteration = new VmDiskStatsEntry(vmName, path,
+                    2000 + ioWriteDiff,
+                    1000 + ioReadDiff,
+                    20480 + writeDiff,
+                    10240 + readDiff);
+            statsForCurrentIteration.setDeltaIoRead(ioReadDiff);
+            statsForCurrentIteration.setDeltaIoWrite(ioWriteDiff);
+            statsForCurrentIteration.setDeltaBytesRead(readDiff);
+            statsForCurrentIteration.setDeltaBytesWrite(writeDiff);
+        } else {
+            statsForCurrentIteration = new VmDiskStatsEntry(vmName, path,
+                    ioWriteDiff,
+                    ioReadDiff,
+                    writeDiff,
+                    readDiff);
+        }
+        List<VolumeStatsVO> persistedStats = new ArrayList<>();
+        Mockito.when(volumeStatsDao.persist(Mockito.any(VolumeStatsVO.class))).thenAnswer((Answer<VolumeStatsVO>) invocation -> {
+            VolumeStatsVO statsVO = (VolumeStatsVO)invocation.getArguments()[0];
+            persistedStats.add(statsVO);
+            return statsVO;
+        });
+        statsCollector.persistVolumeStats(volumeId, statsForCurrentIteration, hypervisorType, timestamp);
+        Assert.assertTrue(CollectionUtils.isNotEmpty(persistedStats));
+        Assert.assertNotNull(persistedStats.get(0));
+        VolumeStatsVO stat = persistedStats.get(0);
+        Assert.assertEquals(volumeId, stat.getVolumeId());
+        VmDiskStatsEntry entry = gson.fromJson(stat.getVolumeStatsData(), VmDiskStatsEntry.class);
+        Assert.assertEquals(vmName, entry.getVmName());
+        Assert.assertEquals(path, entry.getPath());
+        Assert.assertEquals(ioReadDiff, entry.getIORead());
+        Assert.assertEquals(ioWriteDiff, entry.getIOWrite());
+        Assert.assertEquals(readDiff, entry.getBytesRead());
+        Assert.assertEquals(writeDiff, entry.getBytesWrite());
+    }
+
+    @Test
+    public void testPersistVolumeStatsKVM() {
+        performPersistVolumeStatsTest(Hypervisor.HypervisorType.KVM);
+    }
+
+    @Test
+    public void testPersistVolumeStatsVmware() {
+        performPersistVolumeStatsTest(Hypervisor.HypervisorType.VMware);
+    }
 }
diff --git a/test/integration/smoke/test_metrics_api.py b/test/integration/smoke/test_metrics_api.py
index 3ff602c1a71..d5ad559fad0 100644
--- a/test/integration/smoke/test_metrics_api.py
+++ b/test/integration/smoke/test_metrics_api.py
@@ -23,6 +23,7 @@ from marvin.lib.base import *
 from marvin.lib.common import *
 from marvin.lib.utils import (random_gen)
 from nose.plugins.attrib import attr
+from marvin.lib.decoratorGenerators import skipTestIf
 
 import time
 
@@ -42,6 +43,7 @@ class TestMetrics(cloudstackTestCase):
                                zoneid=cls.zone.id,
                                type='Routing')[0]
         cls.cluster = cls.apiclient.listClusters(listClusters.listClustersCmd())[0]
+        cls.mgtSvrDetails = cls.config.__dict__["mgtSvr"][0].__dict__
         cls._cleanup = []
         cls.disk_offering = DiskOffering.create(
             cls.apiclient,
@@ -52,12 +54,12 @@ class TestMetrics(cloudstackTestCase):
             cls.apiclient,
             cls.services["service_offering"]
         )
+        cls._cleanup.append(cls.service_offering)
         cls.template = get_test_template(
             cls.apiclient,
             cls.zone.id,
             cls.hypervisor
         )
-        cls._cleanup.append(cls.service_offering)
         cls.domain = get_domain(cls.apiclient)
         cls.account = Account.create(
             cls.apiclient,
@@ -66,11 +68,80 @@ class TestMetrics(cloudstackTestCase):
             domainid=cls.domain.id
         )
         cls._cleanup.append(cls.account)
+        cls.hypervisorNotSupported = True
+        if cls.hypervisor.lower() != 'simulator':
+            cls.hypervisorNotSupported = False
+            cls.vm_stats_interval_cfg = Configurations.list(cls.apiclient, name='vm.stats.interval')[0].value
+            cls.vm_stats_max_retention_time_cfg = Configurations.list(cls.apiclient, name='vm.stats.max.retention.time')[0].value
+            cls.vm_stats_user_vm_only_cfg = Configurations.list(cls.apiclient, name='vm.stats.user.vm.only')[0].value
+            cls.vm_disk_stats_interval_cfg = Configurations.list(cls.apiclient, name='vm.disk.stats.interval')[0].value
+            cls.vm_disk_stats_interval_min_cfg = Configurations.list(cls.apiclient, name='vm.disk.stats.interval.min')[0].value
+            cls.vm_disk_stats_max_retention_time_cfg = Configurations.list(cls.apiclient, name='vm.disk.stats.max.retention.time')[0].value
+            cls.vm_disk_stats_retention_enabled_cfg = Configurations.list(cls.apiclient, name='vm.disk.stats.retention.enabled')[0].value
+            Configurations.update(cls.apiclient, 'vm.stats.interval', value='60000')
+            Configurations.update(cls.apiclient, 'vm.stats.max.retention.time', value='7200')
+            Configurations.update(cls.apiclient, 'vm.stats.user.vm.only', value='false')
+            Configurations.update(cls.apiclient, 'vm.disk.stats.interval', value='60')
+            Configurations.update(cls.apiclient, 'vm.disk.stats.interval.min', value='60')
+            Configurations.update(cls.apiclient, 'vm.disk.stats.max.retention.time', value='7200')
+            Configurations.update(cls.apiclient, 'vm.disk.stats.retention.enabled', value='true')
+            cls.restartServer()
 
     @classmethod
     def tearDownClass(cls):
+        if cls.hypervisor.lower() != 'simulator':
+            cls.updateConfiguration('vm.stats.interval', cls.vm_stats_interval_cfg)
+            cls.updateConfiguration('vm.stats.max.retention.time', cls.vm_stats_max_retention_time_cfg)
+            cls.updateConfiguration('vm.stats.user.vm.only', cls.vm_stats_user_vm_only_cfg)
+            cls.updateConfiguration('vm.disk.stats.interval', cls.vm_disk_stats_interval_cfg)
+            cls.updateConfiguration('vm.disk.stats.interval.min', cls.vm_disk_stats_interval_min_cfg)
+            cls.updateConfiguration('vm.disk.stats.max.retention.time', cls.vm_disk_stats_max_retention_time_cfg)
+            cls.updateConfiguration('vm.disk.stats.retention.enabled', cls.vm_disk_stats_retention_enabled_cfg)
+            cls.restartServer()
         super(TestMetrics, cls).tearDownClass()
 
+    @classmethod
+    def restartServer(cls):
+        """Restart management server"""
+
+        cls.debug("Restarting management server")
+        sshClient = SshClient(
+                    cls.mgtSvrDetails["mgtSvrIp"],
+            22,
+            cls.mgtSvrDetails["user"],
+            cls.mgtSvrDetails["passwd"]
+        )
+        command = "service cloudstack-management stop"
+        sshClient.execute(command)
+
+        command = "service cloudstack-management start"
+        sshClient.execute(command)
+
+        # Wait up to 5 minutes for the management server to come up; continue as soon as it is up
+        timeout = time.time() + 300
+        while time.time() < timeout:
+            if cls.isManagementUp() is True:
+                # allow hosts to be ready for deployment
+                time.sleep(30)
+                return
+            time.sleep(5)
+        cls.setup_failed = True
+        cls.debug("Management server did not come up, failing")
+        return
+
+    @classmethod
+    def isManagementUp(cls):
+        try:
+            cls.apiclient.listInfrastructure(listInfrastructure.listInfrastructureCmd())
+            return True
+        except Exception:
+            return False
+
+    @classmethod
+    def updateConfiguration(cls, config, value):
+        if value is not None:
+            Configurations.update(cls.apiclient, config, value=value)
+
     def setUp(self):
         self.userapiclient = self.testClient.getUserApiClient(
             UserName=self.account.name,
@@ -389,6 +460,111 @@ class TestMetrics(cloudstackTestCase):
 
         return
 
+    @attr(tags = ["advanced", "advancedns", "smoke", "basic"], required_hardware="true")
+    @skipTestIf("hypervisorNotSupported")
+    def test_list_vms_metrics_history(self):
+        #deploy VM
+        self.small_virtual_machine = VirtualMachine.create(
+                                        self.apiclient,
+                                        self.services["virtual_machine"],
+                                        serviceofferingid=self.service_offering.id,
+                                        templateid=self.template.id,
+                                        zoneid=self.zone.id
+                                        )
+        self.cleanup.append(self.small_virtual_machine)
+
+        # Wait for 2 minutes
+        time.sleep(120)
+
+        cmd = listVirtualMachinesUsageHistory.listVirtualMachinesUsageHistoryCmd()
+        cmd.id = self.small_virtual_machine.id
+
+        result = self.apiclient.listVirtualMachinesUsageHistory(cmd)[0]
+
+        self.assertEqual(result.id, self.small_virtual_machine.id)
+        self.assertTrue(hasattr(result, 'stats'))
+        self.assertTrue(type(result.stats) == list and len(result.stats) > 0)
+        self.validate_vm_stats(result.stats[0])
+
+        return
+
+    @attr(tags = ["advanced", "advancedns", "smoke", "basic"], required_hardware="true")
+    @skipTestIf("hypervisorNotSupported")
+    def test_list_system_vms_metrics_history(self):
+        cmd = listSystemVmsUsageHistory.listSystemVmsUsageHistoryCmd()
+        now = datetime.datetime.now() - datetime.timedelta(minutes=15)
+        start_time = now.strftime("%Y-%m-%d %H:%M:%S")
+        cmd.startdate = start_time
+
+        result = self.apiclient.listSystemVmsUsageHistory(cmd)[0]
+
+        self.assertTrue(hasattr(result, 'stats'))
+        self.assertTrue(type(result.stats) == list and len(result.stats) > 0)
+        self.validate_vm_stats(result.stats[0])
+
+        return
+
+    @attr(tags = ["advanced", "advancedns", "smoke", "basic"], required_hardware="true")
+    @skipTestIf("hypervisorNotSupported")
+    def test_list_volumes_metrics_history(self):
+        #deploy VM
+        self.small_virtual_machine = VirtualMachine.create(
+                                        self.apiclient,
+                                        self.services["virtual_machine"],
+                                        serviceofferingid=self.service_offering.id,
+                                        templateid=self.template.id,
+                                        zoneid=self.zone.id
+                                        )
+        self.cleanup.append(self.small_virtual_machine)
+
+        currentHost = Host.list(self.apiclient, id=self.small_virtual_machine.hostid)[0]
+        if currentHost.hypervisor.lower() == "xenserver" and currentHost.hypervisorversion == "7.1.0":
+            # Skip test as volume metrics don't seem to work on this hypervisor version
+            self.skipTest("Skipping test because volume metrics doesn't work on hypervisor\
+                            %s, %s" % (currentHost.hypervisor, currentHost.hypervisorversion))
+
+        # Wait for 2 minutes
+        time.sleep(120)
+
+        volume = Volume.list(
+            self.apiclient,
+            virtualmachineid=self.small_virtual_machine.id)[0]
+
+        cmd = listVolumesUsageHistory.listVolumesUsageHistoryCmd()
+        cmd.id = volume.id
+
+        result = self.apiclient.listVolumesUsageHistory(cmd)[0]
+        self.assertEqual(result.id, volume.id)
+        self.assertTrue(hasattr(result, 'stats'))
+        self.assertTrue(type(result.stats) == list and len(result.stats) > 0)
+        stats = result.stats[0]
+        self.assertTrue(hasattr(stats, 'diskioread'))
+        self.assertTrue(hasattr(stats, 'diskiowrite'))
+        self.assertTrue(hasattr(stats, 'diskiopstotal'))
+        self.assertTrue(hasattr(stats, 'diskkbsread'))
+        self.assertTrue(hasattr(stats, 'diskkbswrite'))
+        self.assertTrue(hasattr(stats, 'timestamp'))
+        self.assertTrue(self.valid_date(stats.timestamp))
+
+        return
+
+    def validate_vm_stats(self, stats):
+        self.assertTrue(hasattr(stats, 'cpuused'))
+        self.assertTrue(hasattr(stats, 'diskiopstotal'))
+        self.assertTrue(hasattr(stats, 'diskioread'))
+        self.assertTrue(hasattr(stats, 'diskiowrite'))
+        self.assertTrue(hasattr(stats, 'diskkbsread'))
+        self.assertTrue(hasattr(stats, 'diskkbswrite'))
+        self.assertTrue(hasattr(stats, 'memoryintfreekbs'))
+        self.assertTrue(hasattr(stats, 'memorykbs'))
+        self.assertTrue(hasattr(stats, 'memorytargetkbs'))
+        self.assertTrue(hasattr(stats, 'networkkbsread'))
+        self.assertTrue(hasattr(stats, 'networkkbswrite'))
+        self.assertTrue(hasattr(stats, 'networkread'))
+        self.assertTrue(hasattr(stats, 'networkwrite'))
+        self.assertTrue(hasattr(stats, 'timestamp'))
+        self.assertTrue(self.valid_date(stats.timestamp))
+
     def valid_date(cls, date_text):
         try:
             datetime.datetime.strptime(date_text, '%Y-%m-%dT%H:%M:%S%z')
diff --git a/ui/public/locales/en.json b/ui/public/locales/en.json
index 1154af55557..a4d0d0b6197 100644
--- a/ui/public/locales/en.json
+++ b/ui/public/locales/en.json
@@ -416,6 +416,8 @@
 "label.community": "Community",
 "label.complete": "Complete",
 "label.compute": "Compute",
+"label.computeonly.offering": "Compute only disk offering",
+"label.computeonly.offering.tooltip": "Option to specify root disk related information in the compute offering or to directly link a disk offering to the compute offering",
 "label.compute.offerings": "Compute offerings",
 "label.conditions": "Conditions",
 "label.configuration": "Configuration",
@@ -616,10 +618,12 @@
 "label.diskoffering": "Disk offering",
 "label.diskofferingdisplaytext": "Disk offering",
 "label.diskofferingid": "Disk offering",
+"label.diskofferingstrictness": "Disk offering strictness",
 "label.disksize": "Disk size (in GB)",
 "label.disksizeallocated": "Disk allocated",
 "label.disksizeallocatedgb": "Allocated",
 "label.disksizefree": "Disk free",
+"label.disksizestrictness": "Disk size strictness",
 "label.disksizetotal": "Disk total",
 "label.disksizetotalgb": "Total",
 "label.disksizeunallocatedgb": "Unallocated",
@@ -654,16 +658,14 @@
 "label.dpd": "Dead peer detection",
 "label.driver": "Driver",
 "label.duration": "Duration (in sec)",
+"label.duration.custom": "Custom",
+"label.duration.1hour": "1 hour",
+"label.duration.6hours": "6 hours",
+"label.duration.12hours": "12 hours",
+"label.duration.24hours": "24 hours",
+"label.duration.7days": "7 days",
 "label.dynamicscalingenabled": "Dynamic scaling enabled",
 "label.dynamicscalingenabled.tooltip": "VM can dynamically scale only when dynamic scaling is enabled on template, service offering and global setting.",
-"label.iothreadsenabled" : "IOThreads",
-"label.iothreadsenabled.tooltip" : "Enable iothreads allocation for KVM hypervisor",
-"label.iodriverpolicy" : "IO driver policy",
-"label.iodriverpolicy.tooltip" : "IO driver policy could be native, io_uring or threads. Choosing the IO policy for a VM will override the storage pool option 'kvm.storage.pool.io.policy' if set (only if iothreads is enabled)",
-"label.diskofferingstrictness": "Disk offering strictness",
-"label.disksizestrictness": "Disk size strictness",
-"label.computeonly.offering": "Compute only disk offering",
-"label.computeonly.offering.tooltip": "Option to specify root disk related information in the compute offering or to directly link a disk offering to the compute offering",
 "label.edit": "Edit",
 "label.edit.acl.list": "Edit ACL list",
 "label.edit.acl.rule": "Edit ACL rule",
@@ -892,6 +894,11 @@
 "label.invalid.number": "Invalid number",
 "label.invitations": "Invitations",
 "label.invite": "Invite",
+"label.iodriverpolicy" : "IO driver policy",
+"label.iodriverpolicy.tooltip" : "IO driver policy could be native, io_uring or threads. Choosing the IO policy for a VM will override the storage pool option 'kvm.storage.pool.io.policy' if set (only if iothreads is enabled)",
+"label.iops": "IOPS",
+"label.iothreadsenabled" : "IOThreads",
+"label.iothreadsenabled.tooltip" : "Enable iothreads allocation for KVM hypervisor",
 "label.ip": "IP address",
 "label.ip6firewall": "IPv6 firewall",
 "label.ip6routes": "IPv6 routes",
@@ -1681,7 +1688,6 @@
 "label.startquota": "Quota value",
 "label.state": "State",
 "label.static.routes": "Static routes",
-"label.statistics": "Statistics",
 "label.status": "Status",
 "label.step.1": "Step 1",
 "label.step.2": "Step 2",
diff --git a/ui/src/components/view/StatsTab.vue b/ui/src/components/view/StatsTab.vue
index 45026c0ff16..27595223899 100644
--- a/ui/src/components/view/StatsTab.vue
+++ b/ui/src/components/view/StatsTab.vue
@@ -34,32 +34,58 @@
       :footer="null">
       <resource-stats-info :resourceType="resourceTypeToShowInfo" :key="resourceTypeToShowInfo"/>
     </a-modal>
-    <a-row class="chart-row">
-      <a-col>
-        <span class="ant-tag">
+    <div class="chart-row">
+      <a-space direction="vertical">
+        <div>
+          <a-radio-group
+            v-model:value="durationSelectorValue"
+            buttonStyle="solid"
+            @change="handleDurationChange">
+            <a-radio-button value="">
+              {{ $t('label.duration.1hour') }}
+            </a-radio-button>
+            <a-radio-button value="6hours" v-if="statsRetentionTime >= 60">
+              {{ $t('label.duration.6hours') }}
+            </a-radio-button>
+            <a-radio-button value="12hours" v-if="statsRetentionTime >= 6 * 60">
+              {{ $t('label.duration.12hours') }}
+            </a-radio-button>
+            <a-radio-button value="day" v-if="statsRetentionTime >= 12 * 60">
+              {{ $t('label.duration.24hours') }}
+            </a-radio-button>
+            <a-radio-button value="week" v-if="statsRetentionTime >= 24 * 60">
+              {{ $t('label.duration.7days') }}
+            </a-radio-button>
+            <a-radio-button value="custom">
+              {{ $t('label.duration.custom') }}
+            </a-radio-button>
+          </a-radio-group>
+          <InfoCircleOutlined class="info-icon" :title="$t('label.see.more.info.shown.charts')" @click="onClickShowResourceInfoModal('CHART')"/>
+        </div>
+        <div class="ant-tag" v-if="durationSelectorValue==='custom'">
           <a-button @click="openFilter()">
             <FilterOutlined/>
           </a-button>
           <span v-html="formatedPeriod"></span>
-        </span>
-        <InfoCircleOutlined class="info-icon" :title="$t('label.see.more.info.shown.charts')" @click="onClickShowResourceInfoModal('CHART')"/>
-      </a-col>
-    </a-row>
+        </div>
+      </a-space>
+    </div>
     <div v-if="loaded">
       <div v-if="chartLabels.length > 0">
-        <a-row class="chart-row">
+        <a-row class="chart-row" v-if="resourceIsVirtualMachine">
           <a-col>
             <strong>CPU</strong>
             <InfoCircleOutlined class="info-icon" :title="$t('label.see.more.info.cpu.usage')" @click="onClickShowResourceInfoModal('CPU')"/>
-            <line-chart
-              :chartData="prepareData(resourceUsageHistory.cpu)"
-              :chartOptions="getChartOptions(calculateMaxYAxisAndStepSize(resourceUsageHistory.cpu, 100, 10), '%')"
-              :width="1024"
-              :height="250"
+            <resource-stats-line-chart
+              :chartLabels="chartLabels"
+              :chartData="resourceUsageHistory.cpu"
+              :yAxisInitialMax="100"
+              :yAxisIncrementValue="10"
+              :yAxisMeasurementUnit="'%'"
             />
           </a-col>
         </a-row>
-        <a-row class="chart-row">
+        <a-row class="chart-row" v-if="resourceIsVirtualMachine">
           <a-col>
             <strong>{{ $t('label.memory') }}</strong>
             <InfoCircleOutlined class="info-icon" :title="$t('label.see.more.info.memory.usage')" @click="onClickShowResourceInfoModal('MEM')"/>
@@ -78,51 +104,107 @@
                 {{ unit }}
               </a-select-option>
             </a-select>
-            <line-chart
+            <resource-stats-line-chart
               v-if="selectedMemoryChartType === 0 && selectedMemoryUsageType === 0 && selectedMemoryUnitOfMeasurement === 'MB'"
-              :chartData="prepareData(resourceUsageHistory.memory.rawData.used.inMB)"
-              :chartOptions="getChartOptions(calculateMaxYAxisAndStepSize(resourceUsageHistory.memory.rawData.used.inMB, 10, 100), ' MB')"
-              :width="1024"
-              :height="250"
+              :chartLabels="chartLabels"
+              :chartData="resourceUsageHistory.memory.rawData.used.inMB"
+              :yAxisInitialMax="10"
+              :yAxisIncrementValue="100"
+              :yAxisMeasurementUnit="' MB'"
             />
-            <line-chart
+            <resource-stats-line-chart
               v-if="selectedMemoryChartType === 0 && selectedMemoryUsageType === 0 && selectedMemoryUnitOfMeasurement === 'GB'"
-              :chartData="prepareData(resourceUsageHistory.memory.rawData.used.inGB)"
-              :chartOptions="getChartOptions(calculateMaxYAxisAndStepSize(resourceUsageHistory.memory.rawData.used.inGB, 1, 1), ' GB')"
-              :width="1024"
-              :height="250"
+              :chartLabels="chartLabels"
+              :chartData="resourceUsageHistory.memory.rawData.used.inGB"
+              :yAxisInitialMax="1"
+              :yAxisIncrementValue="1"
+              :yAxisMeasurementUnit="' GB'"
             />
-            <line-chart
+            <resource-stats-line-chart
               v-if="selectedMemoryChartType === 0 && selectedMemoryUsageType === 1 && selectedMemoryUnitOfMeasurement === 'MB'"
-              :chartData="prepareData(resourceUsageHistory.memory.rawData.free.inMB)"
-              :chartOptions="getChartOptions(calculateMaxYAxisAndStepSize(resourceUsageHistory.memory.rawData.free.inMB, 10, 100), ' MB')"
-              :width="1024"
-              :height="250"
+              :chartLabels="chartLabels"
+              :chartData="resourceUsageHistory.memory.rawData.free.inMB"
+              :yAxisInitialMax="10"
+              :yAxisIncrementValue="100"
+              :yAxisMeasurementUnit="' MB'"
             />
-            <line-chart
+            <resource-stats-line-chart
               v-if="selectedMemoryChartType === 0 && selectedMemoryUsageType === 1 && selectedMemoryUnitOfMeasurement === 'GB'"
-              :chartData="prepareData(resourceUsageHistory.memory.rawData.free.inGB)"
-              :chartOptions="getChartOptions(calculateMaxYAxisAndStepSize(resourceUsageHistory.memory.rawData.free.inGB, 1, 1), ' GB')"
-              :width="1024"
-              :height="250"
+              :chartLabels="chartLabels"
+              :chartData="resourceUsageHistory.memory.rawData.free.inGB"
+              :yAxisInitialMax="1"
+              :yAxisIncrementValue="1"
+              :yAxisMeasurementUnit="' GB'"
             />
-            <line-chart
+            <resource-stats-line-chart
               v-if="selectedMemoryChartType === 1 && selectedMemoryUsageType === 0"
-              :chartData="prepareData(resourceUsageHistory.memory.percentage.used)"
-              :chartOptions="getChartOptions(calculateMaxYAxisAndStepSize(resourceUsageHistory.memory.percentage.used, 100, 10), '%')"
-              :width="1024"
-              :height="250"
+              :chartLabels="chartLabels"
+              :chartData="resourceUsageHistory.memory.percentage.used"
+              :yAxisInitialMax="100"
+              :yAxisIncrementValue="10"
+              :yAxisMeasurementUnit="'%'"
             />
-            <line-chart
+            <resource-stats-line-chart
               v-if="selectedMemoryChartType === 1 && selectedMemoryUsageType === 1"
-              :chartData="prepareData(resourceUsageHistory.memory.percentage.free)"
-              :chartOptions="getChartOptions(calculateMaxYAxisAndStepSize(resourceUsageHistory.memory.percentage.free, 100, 10), '%')"
-              :width="1024"
-              :height="250"
+              :chartLabels="chartLabels"
+              :chartData="resourceUsageHistory.memory.percentage.free"
+              :yAxisInitialMax="100"
+              :yAxisIncrementValue="10"
+              :yAxisMeasurementUnit="'%'"
+            />
+          </a-col>
+        </a-row>
+        <a-row class="chart-row" v-if="diskStatsAvailable">
+          <a-col>
+            <strong>{{ $t('label.disk') }}</strong>
+            <InfoCircleOutlined class="info-icon" :title="$t('label.see.more.info.disk.usage')" @click="onClickShowResourceInfoModal('DISK')"/>
+            <div class="chart-row-inner">
+              {{ $t('label.iops') }}
+            </div>
+            <resource-stats-line-chart
+              v-if="selectedDiskChartType === 0"
+              :chartLabels="chartLabels"
+              :chartData="resourceUsageHistory.disk.iops"
+              :yAxisInitialMax="100"
+              :yAxisIncrementValue="100"
+              :yAxisMeasurementUnit="' IOPS'"
+            />
+            <div class="chart-row-inner">
+              {{ $t('label.read.and.write') }}
+              <a-select
+                v-model:value="selectedDiskUnitOfMeasurement">
+                <a-select-option v-for="unit in diskUnitsOfMeasurement" :key="unit">
+                  {{ unit }}
+                </a-select-option>
+              </a-select>
+            </div>
+            <resource-stats-line-chart
+              v-if="selectedDiskUnitOfMeasurement === 'KiB'"
+              :chartLabels="chartLabels"
+              :chartData="resourceUsageHistory.disk.readAndWrite.inKiB"
+              :yAxisInitialMax="100"
+              :yAxisIncrementValue="100"
+              :yAxisMeasurementUnit="' KiB'"
+            />
+            <resource-stats-line-chart
+              v-if="selectedDiskUnitOfMeasurement === 'MiB'"
+              :chartLabels="chartLabels"
+              :chartData="resourceUsageHistory.disk.readAndWrite.inMiB"
+              :yAxisInitialMax="10"
+              :yAxisIncrementValue="10"
+              :yAxisMeasurementUnit="' MiB'"
+            />
+            <resource-stats-line-chart
+              v-if="selectedDiskUnitOfMeasurement === 'GiB'"
+              :chartLabels="chartLabels"
+              :chartData="resourceUsageHistory.disk.readAndWrite.inGiB"
+              :yAxisInitialMax="1"
+              :yAxisIncrementValue="1"
+              :yAxisMeasurementUnit="' GiB'"
             />
           </a-col>
         </a-row>
-        <a-row class="chart-row">
+        <a-row class="chart-row" v-if="resourceIsVirtualMachine">
           <a-col>
             <strong>{{ $t('label.network') }}</strong>
             <InfoCircleOutlined class="info-icon" :title="$t('label.see.more.info.network.usage')" @click="onClickShowResourceInfoModal('NET')"/>
@@ -131,72 +213,29 @@
                 {{ unit }}
               </a-select-option>
             </a-select>
-            <line-chart
+            <resource-stats-line-chart
               v-if="selectedNetworkUnitOfMeasurement === 'KiB'"
-              :chartData="prepareData(resourceUsageHistory.network.inKiB)"
-              :chartOptions="getChartOptions(calculateMaxYAxisAndStepSize(resourceUsageHistory.network.inKiB, 100, 100), ' KiB')"
-              :width="1024"
-              :height="250"
+              :chartLabels="chartLabels"
+              :chartData="resourceUsageHistory.network.inKiB"
+              :yAxisInitialMax="100"
+              :yAxisIncrementValue="100"
+              :yAxisMeasurementUnit="' KiB'"
             />
-            <line-chart
+            <resource-stats-line-chart
               v-if="selectedNetworkUnitOfMeasurement === 'MiB'"
-              :chartData="prepareData(resourceUsageHistory.network.inMiB)"
-              :chartOptions="getChartOptions(calculateMaxYAxisAndStepSize(resourceUsageHistory.network.inMiB, 100, 100), ' MiB')"
-              :width="1024"
-              :height="250"
+              :chartLabels="chartLabels"
+              :chartData="resourceUsageHistory.network.inMiB"
+              :yAxisInitialMax="100"
+              :yAxisIncrementValue="100"
+              :yAxisMeasurementUnit="' MiB'"
             />
-            <line-chart
+            <resource-stats-line-chart
               v-if="selectedNetworkUnitOfMeasurement === 'GiB'"
-              :chartData="prepareData(resourceUsageHistory.network.inGiB)"
-              :chartOptions="getChartOptions(calculateMaxYAxisAndStepSize(resourceUsageHistory.network.inGiB, 1, 1), ' GiB')"
-              :width="1024"
-              :height="250"
-            />
-          </a-col>
-        </a-row>
-        <a-row class="chart-row">
-          <a-col>
-            <strong>{{ $t('label.disk') }}</strong>
-            <InfoCircleOutlined class="info-icon" :title="$t('label.see.more.info.disk.usage')" @click="onClickShowResourceInfoModal('DISK')"/>
-            <a-select class="chart-type-select" v-model:value="selectedDiskChartType">
-              <a-select-option v-for="(type, typeIndex) in diskChartTypes" :key="typeIndex">
-                {{ type }}
-              </a-select-option>
-            </a-select>
-            <a-select
-              v-if="selectedDiskChartType === 1"
-              v-model:value="selectedDiskUnitOfMeasurement">
-              <a-select-option v-for="unit in diskUnitsOfMeasurement" :key="unit">
-                {{ unit }}
-              </a-select-option>
-            </a-select>
-            <line-chart
-              v-if="selectedDiskChartType === 0"
-              :chartData="prepareData(resourceUsageHistory.disk.iops)"
-              :chartOptions="getChartOptions(calculateMaxYAxisAndStepSize(resourceUsageHistory.disk.iops, 100, 100), ' IOPS')"
-              :width="1024"
-              :height="250"
-            />
-            <line-chart
-              v-if="selectedDiskChartType === 1 && selectedDiskUnitOfMeasurement === 'KiB'"
-              :chartData="prepareData(resourceUsageHistory.disk.readAndWrite.inKiB)"
-              :chartOptions="getChartOptions(calculateMaxYAxisAndStepSize(resourceUsageHistory.disk.readAndWrite.inKiB, 100, 100), ' KiB')"
-              :width="1024"
-              :height="250"
-            />
-            <line-chart
-              v-if="selectedDiskChartType === 1 && selectedDiskUnitOfMeasurement === 'MiB'"
-              :chartData="prepareData(resourceUsageHistory.disk.readAndWrite.inMiB)"
-              :chartOptions="getChartOptions(calculateMaxYAxisAndStepSize(resourceUsageHistory.disk.readAndWrite.inMiB, 10, 10), ' MiB')"
-              :width="1024"
-              :height="250"
-            />
-            <line-chart
-              v-if="selectedDiskChartType === 1 && selectedDiskUnitOfMeasurement === 'GiB'"
-              :chartData="prepareData(resourceUsageHistory.disk.readAndWrite.inGiB)"
-              :chartOptions="getChartOptions(calculateMaxYAxisAndStepSize(resourceUsageHistory.disk.readAndWrite.inGiB, 1, 1), ' GiB')"
-              :width="1024"
-              :height="250"
+              :chartLabels="chartLabels"
+              :chartData="resourceUsageHistory.network.inGiB"
+              :yAxisInitialMax="1"
+              :yAxisIncrementValue="1"
+              :yAxisMeasurementUnit="' GiB'"
             />
           </a-col>
         </a-row>
@@ -214,22 +253,27 @@ import moment from 'moment'
 import 'chartjs-adapter-moment'
 import FilterStats from './stats/FilterStats'
 import ResourceStatsInfo from './stats/ResourceStatsInfo'
-import LineChart from './chart/LineChart'
+import ResourceStatsLineChart from './stats/ResourceStatsLineChart'
 
 export default {
   props: {
     resource: {
       type: Object,
       required: true
+    },
+    resourceType: {
+      type: String,
+      default: 'VirtualMachine'
     }
   },
   components: {
     FilterStats,
     ResourceStatsInfo,
-    LineChart
+    ResourceStatsLineChart
   },
   data () {
     return {
+      durationSelectorValue: '',
       resourceTypeToShowInfo: null,
       showResourceInfoModal: false,
       resourceInfoModalTitle: null,
@@ -289,6 +333,37 @@ export default {
   mounted () {
     this.fetchData()
   },
+  computed: {
+    statsRetentionTime () {
+      if (this.resourceType === 'Volume') {
+        return this.$store.getters.features.instancesdisksstatsretentiontime
+      }
+      return this.$store.getters.features.instancesstatsretentiontime
+    },
+    resourceStatsApi () {
+      switch (this.resourceType) {
+        case 'SystemVm':
+        case 'DomainRouter':
+          return 'listSystemVmsUsageHistory'
+        case 'Volume':
+          return 'listVolumesUsageHistory'
+      }
+      return 'listVirtualMachinesUsageHistory'
+    },
+    resourceStatsApiResponseObject () {
+      switch (this.resourceType) {
+        case 'Volume':
+          return this.resourceType.toLowerCase()
+      }
+      return 'virtualmachine'
+    },
+    resourceIsVirtualMachine () {
+      return ['VirtualMachine', 'SystemVm', 'DomainRouter'].includes(this.resourceType)
+    },
+    diskStatsAvailable () {
+      return ['VirtualMachine', 'SystemVm', 'DomainRouter', 'Volume'].includes(this.resourceType)
+    }
+  },
   watch: {
     resource: function (newItem) {
       if (!newItem || !newItem.id) {
@@ -322,6 +397,27 @@ export default {
       this.resourceTypeToShowInfo = resource
       this.showResourceInfoModal = true
     },
+    handleDurationChange () {
+      var now = this.getEndDate()
+      var start = new Date(now)
+      switch (this.durationSelectorValue) {
+        case '6hours':
+          start.setHours(start.getHours() - 6)
+          break
+        case '12hours':
+          start.setHours(start.getHours() - 12)
+          break
+        case 'day':
+          start.setDate(start.getDate() - 1)
+          break
+        case 'week':
+          start.setDate(start.getDate() - 7)
+          break
+        default:
+          start.setHours(start.getHours() - 1)
+      }
+      this.handleSubmit({ startDate: start, endDate: now })
+    },
     handleSubmit (values) {
       if (values.startDate) {
         this.startDate = new Date(values.startDate)
@@ -358,7 +454,7 @@ export default {
       if (this.endDate) {
         params.endDate = moment(this.endDate).format()
       }
-      api('listVirtualMachinesUsageHistory', params).then(response => {
+      api(this.resourceStatsApi, params).then(response => {
         this.handleStatsResponse(response)
       }).catch(error => {
         this.$notifyError(error)
@@ -385,7 +481,7 @@ export default {
     },
     handleStatsResponse (responseData) {
       this.resetData()
-      const vm = responseData.listvirtualmachinesusagehistoryresponse.virtualmachine
+      const vm = responseData[this.resourceStatsApi.toLowerCase() + 'response'][this.resourceStatsApiResponseObject]
 
       const chartPointRadius = this.getChartPointRadius(vm[0].stats.length)
 
@@ -424,61 +520,69 @@ export default {
         const currentLabel = ts.split('T')[0] + ' ' + ts.split('T')[1].split('-')[0]
         this.chartLabels.push(currentLabel)
 
-        cpuLine.data.push({ timestamp: currentLabel, stat: element.cpuused.split('%')[0] })
+        if (this.resourceIsVirtualMachine) {
+          cpuLine.data.push({ timestamp: currentLabel, stat: element.cpuused.split('%')[0] })
 
-        element.memoryusedkbs = element.memorykbs - element.memoryintfreekbs
-        memFreeLinePercent.data.push({ timestamp: currentLabel, stat: this.calculateMemoryPercentage(false, element.memorykbs, element.memoryintfreekbs) })
-        memUsedLinePercent.data.push({ timestamp: currentLabel, stat: this.calculateMemoryPercentage(true, element.memorykbs, element.memoryintfreekbs) })
-        memAllocatedLineInMB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.memorykbs, 1) })
-        memFreeLineInMB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.memoryintfreekbs, 1) })
-        memUsedLineInMB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.memoryusedkbs, 1) })
-        memAllocatedLineInGB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.memorykbs, 2) })
-        memFreeLineInGB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.memoryintfreekbs, 2) })
-        memUsedLineInGB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.memoryusedkbs, 2) })
+          element.memoryusedkbs = element.memorykbs - element.memoryintfreekbs
+          memFreeLinePercent.data.push({ timestamp: currentLabel, stat: this.calculateMemoryPercentage(false, element.memorykbs, element.memoryintfreekbs) })
+          memUsedLinePercent.data.push({ timestamp: currentLabel, stat: this.calculateMemoryPercentage(true, element.memorykbs, element.memoryintfreekbs) })
+          memAllocatedLineInMB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.memorykbs, 1) })
+          memFreeLineInMB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.memoryintfreekbs, 1) })
+          memUsedLineInMB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.memoryusedkbs, 1) })
+          memAllocatedLineInGB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.memorykbs, 2) })
+          memFreeLineInGB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.memoryintfreekbs, 2) })
+          memUsedLineInGB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.memoryusedkbs, 2) })
 
-        netDownloadLineInKiB.data.push({ timestamp: currentLabel, stat: element.networkkbsread })
-        netUploadLineInKiB.data.push({ timestamp: currentLabel, stat: element.networkkbswrite })
-        netDownloadLineInMiB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.networkkbsread, 1) })
-        netUploadLineInMiB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.networkkbswrite, 1) })
-        netDownloadLineInGiB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.networkkbsread, 2) })
-        netUploadLineInGiB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.networkkbswrite, 2) })
+          netDownloadLineInKiB.data.push({ timestamp: currentLabel, stat: element.networkkbsread })
+          netUploadLineInKiB.data.push({ timestamp: currentLabel, stat: element.networkkbswrite })
+          netDownloadLineInMiB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.networkkbsread, 1) })
+          netUploadLineInMiB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.networkkbswrite, 1) })
+          netDownloadLineInGiB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.networkkbsread, 2) })
+          netUploadLineInGiB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.networkkbswrite, 2) })
+        }
 
-        diskReadLineInKiB.data.push({ timestamp: currentLabel, stat: element.diskkbsread })
-        diskWriteLineInKiB.data.push({ timestamp: currentLabel, stat: element.diskkbswrite })
-        diskReadLineInMiB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.diskkbsread, 1) })
-        diskWriteLineInMiB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.diskkbswrite, 1) })
-        diskReadLineInGiB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.diskkbsread, 2) })
-        diskWriteLineInGiB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.diskkbswrite, 2) })
-        diskIopsLine.data.push({ timestamp: currentLabel, stat: element.diskiopstotal })
+        if (this.diskStatsAvailable) {
+          diskReadLineInKiB.data.push({ timestamp: currentLabel, stat: element.diskkbsread })
+          diskWriteLineInKiB.data.push({ timestamp: currentLabel, stat: element.diskkbswrite })
+          diskReadLineInMiB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.diskkbsread, 1) })
+          diskWriteLineInMiB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.diskkbswrite, 1) })
+          diskReadLineInGiB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.diskkbsread, 2) })
+          diskWriteLineInGiB.data.push({ timestamp: currentLabel, stat: this.convertByteBasedUnitOfMeasure(element.diskkbswrite, 2) })
+          diskIopsLine.data.push({ timestamp: currentLabel, stat: element.diskiopstotal })
+        }
       }
 
-      this.resourceUsageHistory.cpu.push(cpuLine)
+      if (this.resourceIsVirtualMachine) {
+        this.resourceUsageHistory.cpu.push(cpuLine)
 
-      this.resourceUsageHistory.memory.percentage.free.push(memFreeLinePercent)
-      this.resourceUsageHistory.memory.percentage.used.push(memUsedLinePercent)
-      this.resourceUsageHistory.memory.rawData.free.inMB.push(memFreeLineInMB)
-      this.resourceUsageHistory.memory.rawData.free.inMB.push(memAllocatedLineInMB)
-      this.resourceUsageHistory.memory.rawData.used.inMB.push(memUsedLineInMB)
-      this.resourceUsageHistory.memory.rawData.used.inMB.push(memAllocatedLineInMB)
-      this.resourceUsageHistory.memory.rawData.free.inGB.push(memFreeLineInGB)
-      this.resourceUsageHistory.memory.rawData.free.inGB.push(memAllocatedLineInGB)
-      this.resourceUsageHistory.memory.rawData.used.inGB.push(memUsedLineInGB)
-      this.resourceUsageHistory.memory.rawData.used.inGB.push(memAllocatedLineInGB)
+        this.resourceUsageHistory.memory.percentage.free.push(memFreeLinePercent)
+        this.resourceUsageHistory.memory.percentage.used.push(memUsedLinePercent)
+        this.resourceUsageHistory.memory.rawData.free.inMB.push(memFreeLineInMB)
+        this.resourceUsageHistory.memory.rawData.free.inMB.push(memAllocatedLineInMB)
+        this.resourceUsageHistory.memory.rawData.used.inMB.push(memUsedLineInMB)
+        this.resourceUsageHistory.memory.rawData.used.inMB.push(memAllocatedLineInMB)
+        this.resourceUsageHistory.memory.rawData.free.inGB.push(memFreeLineInGB)
+        this.resourceUsageHistory.memory.rawData.free.inGB.push(memAllocatedLineInGB)
+        this.resourceUsageHistory.memory.rawData.used.inGB.push(memUsedLineInGB)
+        this.resourceUsageHistory.memory.rawData.used.inGB.push(memAllocatedLineInGB)
 
-      this.resourceUsageHistory.network.inKiB.push(netDownloadLineInKiB)
-      this.resourceUsageHistory.network.inKiB.push(netUploadLineInKiB)
-      this.resourceUsageHistory.network.inMiB.push(netDownloadLineInMiB)
-      this.resourceUsageHistory.network.inMiB.push(netUploadLineInMiB)
-      this.resourceUsageHistory.network.inGiB.push(netDownloadLineInGiB)
-      this.resourceUsageHistory.network.inGiB.push(netUploadLineInGiB)
+        this.resourceUsageHistory.network.inKiB.push(netDownloadLineInKiB)
+        this.resourceUsageHistory.network.inKiB.push(netUploadLineInKiB)
+        this.resourceUsageHistory.network.inMiB.push(netDownloadLineInMiB)
+        this.resourceUsageHistory.network.inMiB.push(netUploadLineInMiB)
+        this.resourceUsageHistory.network.inGiB.push(netDownloadLineInGiB)
+        this.resourceUsageHistory.network.inGiB.push(netUploadLineInGiB)
+      }
 
-      this.resourceUsageHistory.disk.readAndWrite.inKiB.push(diskReadLineInKiB)
-      this.resourceUsageHistory.disk.readAndWrite.inKiB.push(diskWriteLineInKiB)
-      this.resourceUsageHistory.disk.readAndWrite.inMiB.push(diskReadLineInMiB)
-      this.resourceUsageHistory.disk.readAndWrite.inMiB.push(diskWriteLineInMiB)
-      this.resourceUsageHistory.disk.readAndWrite.inGiB.push(diskReadLineInGiB)
-      this.resourceUsageHistory.disk.readAndWrite.inGiB.push(diskWriteLineInGiB)
-      this.resourceUsageHistory.disk.iops.push(diskIopsLine)
+      if (this.diskStatsAvailable) {
+        this.resourceUsageHistory.disk.readAndWrite.inKiB.push(diskReadLineInKiB)
+        this.resourceUsageHistory.disk.readAndWrite.inKiB.push(diskWriteLineInKiB)
+        this.resourceUsageHistory.disk.readAndWrite.inMiB.push(diskReadLineInMiB)
+        this.resourceUsageHistory.disk.readAndWrite.inMiB.push(diskWriteLineInMiB)
+        this.resourceUsageHistory.disk.readAndWrite.inGiB.push(diskReadLineInGiB)
+        this.resourceUsageHistory.disk.readAndWrite.inGiB.push(diskWriteLineInGiB)
+        this.resourceUsageHistory.disk.iops.push(diskIopsLine)
+      }
 
       this.loaded = true
     },
@@ -551,128 +655,6 @@ export default {
         return parseFloat(100.0 * (memoryTotalInKB - memoryFreeInKB) / memoryTotalInKB).toFixed(2)
       }
       return parseFloat(100.0 * memoryFreeInKB / memoryTotalInKB).toFixed(2)
-    },
-    /**
-     * Calculates the maximum Y axis and the step size based on the chart data.
-     * @param chartLines the chart lines with their respective data.
-     * @param initialMaxValue the initial maximum value to the Y axis.
-     * @param incrementValue the increment value.
-     * @returns an object containing the maximum Y axis and the step size for the chart.
-     */
-    calculateMaxYAxisAndStepSize (chartLines, initialMaxYAxis, incrementValue) {
-      const numberOfLabelsOnYaxis = 4
-      var highestValue = 0
-      var maxYAxis = initialMaxYAxis
-      for (const line of chartLines) {
-        for (const d of line.data) {
-          const currentValue = parseFloat(d.stat)
-          if (currentValue > highestValue) {
-            highestValue = currentValue
-            while (highestValue > maxYAxis) {
-              maxYAxis += incrementValue
-            }
-          }
-        }
-      }
-      return { maxYAxes: maxYAxis, stepSize: maxYAxis / numberOfLabelsOnYaxis }
-    },
-    /**
-     * Returns the chart options.
-     * @param yAxesStepSize the step size for the Y axes.
-     * @param yAxesUnitOfMeasurement the unit of measurement label used on the Y axes.
-     * @returns the chart options.
-     */
-    getChartOptions (yAxesOptions, yAxesUnitOfMeasurement) {
-      var chartOptions = {
-        responsive: true,
-        maintainAspectRatio: false,
-        scales: {
-          yAxis: {
-            min: 0,
-            max: yAxesOptions.maxYAxes,
-            reverse: false,
-            ticks: {
-              stepSize: yAxesOptions.stepSize,
-              callback: function (label) {
-                return label + yAxesUnitOfMeasurement
-              }
-            }
-          },
-          xAxis: {
-            type: 'time',
-            autoSkip: false,
-            time: {
-              parser: 'YYYY-MM-DD HH:mm:ss',
-              unit: 'second',
-              displayFormats: {
-                second: 'HH:mm:ss'
-              }
-            }
-          }
-        }
-      }
-      const dateTimes = this.convertStringArrayToDateArray(JSON.parse(JSON.stringify(this.chartLabels)))
-      const averageDifference = this.averageDifferenceBetweenTimes(dateTimes)
-      chartOptions.scales.xAxis.time.stepSize = this.calculateStepSize(this.chartLabels.length, averageDifference)
-      return chartOptions
-    },
-    convertStringArrayToDateArray (stringArray) {
-      const dateArray = []
-      for (const element of stringArray) {
-        dateArray.push(new Date(element.replace(' ', 'T')))
-      }
-      return dateArray
-    },
-    averageDifferenceBetweenTimes (timeList) {
-      const oneSecond = 1000 // 1 second represented as milliseconds
-      const differences = []
-      var previus = timeList.splice(0, 1)[0]
-      for (const time of timeList) {
-        differences.push((time - previus) / oneSecond) // push the difference in seconds
-        previus = time
-      }
-      if (differences.length === 0) {
-        return 1
-      }
-      const averageDifference = Math.trunc(differences.reduce((a, b) => a + b, 0) / differences.length)
-      return averageDifference
-    },
-    calculateStepSize (numberOfDataPoints, differenceBetweenTimes) {
-      const idealNumberOfLabels = 8
-      const result = numberOfDataPoints / idealNumberOfLabels
-      if (result > 1) {
-        return result * differenceBetweenTimes
-      }
-      return differenceBetweenTimes
-    },
-    prepareData (chartData) {
-      const datasetList = []
-      for (const element of chartData) {
-        datasetList.push(
-          {
-            backgroundColor: element.backgroundColor,
-            borderColor: element.borderColor,
-            borderWidth: 3,
-            label: element.label,
-            data: element.data.map(d => d.stat),
-            hidden: this.hideLine(element.data.map(d => d.stat)),
-            pointRadius: element.pointRadius,
-            fill: 'origin'
-          }
-        )
-      }
-      return {
-        labels: this.chartLabels,
-        datasets: datasetList
-      }
-    },
-    hideLine (data) {
-      for (const d of data) {
-        if (d < 0) {
-          return true
-        }
-      }
-      return false
     }
   }
 }
diff --git a/ui/src/components/view/chart/LineChart.vue b/ui/src/components/view/chart/LineChart.vue
deleted file mode 100644
index 1b9206917d7..00000000000
--- a/ui/src/components/view/chart/LineChart.vue
+++ /dev/null
@@ -1,55 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-<template>
-  <Line
-    :chart-options="chartOptions"
-    :chart-data="chartData"
-    :width="width"
-    :height="height"
-  />
-</template>
-
-<script>
-import { Line } from 'vue-chartjs'
-import { Chart as ChartJS, Title, Tooltip, Legend, LineElement, CategoryScale, TimeScale, LinearScale, PointElement, Filler } from 'chart.js'
-
-ChartJS.register(Title, Tooltip, Legend, LineElement, CategoryScale, TimeScale, LinearScale, PointElement, Filler)
-
-export default {
-  name: 'LineChart',
-  components: { Line },
-  props: {
-    chartData: {
-      type: Object,
-      required: true
-    },
-    chartOptions: {
-      type: Object,
-      default: () => {}
-    },
-    width: {
-      type: Number,
-      default: 650
-    },
-    height: {
-      type: Number,
-      default: 250
-    }
-  }
-}
-</script>
diff --git a/ui/src/components/view/stats/ResourceStatsInfo.vue b/ui/src/components/view/stats/ResourceStatsInfo.vue
index be5f3a16bac..9db3384bc5f 100644
--- a/ui/src/components/view/stats/ResourceStatsInfo.vue
+++ b/ui/src/components/view/stats/ResourceStatsInfo.vue
@@ -20,12 +20,7 @@
     <div v-if="messages.length > 1">
       <ul>
         <li v-for="(msg, index) in messages" :key="index">
-          <div v-if="index === messages.length - 1">
-            {{ msg }}.
-          </div>
-          <div v-else>
-            {{ msg }};
-          </div>
+          {{ msg }}
         </li>
       </ul>
     </div>
@@ -88,6 +83,9 @@ export default {
     for (const element of this.info) {
       if (element.resourceType === this.resourceType) {
         this.messages = element.messageList
+        if (this.$route.fullPath.startsWith('/volume/')) {
+          this.messages = this.messages.filter(x => x !== this.$t('message.disk.usage.info.sum.of.disks'))
+        }
       }
     }
   }
diff --git a/ui/src/components/view/stats/ResourceStatsLineChart.vue b/ui/src/components/view/stats/ResourceStatsLineChart.vue
new file mode 100644
index 00000000000..fa15ea398a5
--- /dev/null
+++ b/ui/src/components/view/stats/ResourceStatsLineChart.vue
@@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+<template>
+  <Line
+    :chart-options="preparedOptions"
+    :chart-data="preparedData"
+    :width="chartWidth"
+    :height="chartHeight"
+  />
+</template>
+
+<script>
+import { Line } from 'vue-chartjs'
+import { Chart as ChartJS, Title, Tooltip, Legend, LineElement, CategoryScale, TimeScale, LinearScale, PointElement, Filler } from 'chart.js'
+
+ChartJS.register(Title, Tooltip, Legend, LineElement, CategoryScale, TimeScale, LinearScale, PointElement, Filler)
+
+export default {
+  name: 'ResourceStatsLineChart',
+  components: { Line },
+  props: {
+    chartData: {
+      type: Object,
+      required: true
+    },
+    chartLabels: {
+      type: Array,
+      required: true
+    },
+    yAxisMeasurementUnit: {
+      type: String,
+      required: true
+    },
+    yAxisInitialMax: {
+      type: Number,
+      default: 1
+    },
+    yAxisIncrementValue: {
+      type: Number,
+      default: 1
+    },
+    chartWidth: {
+      type: Number,
+      default: 1024
+    },
+    chartHeight: {
+      type: Number,
+      default: 250
+    }
+  },
+  computed: {
+    preparedData () {
+      if (this.chartData) {
+        return this.prepareData(this.chartData)
+      }
+      return {}
+    },
+    preparedOptions () {
+      if (this.chartData) {
+        return this.getChartOptions(this.calculateMaxYAxisAndStepSize(this.chartData, this.yAxisInitialMax, this.yAxisIncrementValue), this.yAxisMeasurementUnit)
+      }
+      return {}
+    }
+  },
+  methods: {
+    /**
+     * Converts a Byte-based value from one unit to another. For example: from Byte to KiB; from GiB to MiB; etc.
+     * To use it consider the following sequence: Byte -> KiB -> MiB -> GiB ...
+     * So, from Byte to MiB there are 2 steps, while from MiB to Byte there are -2 steps.
+     * @param value the value to be converted.
+     * @param step the number of steps between Byte-based units of measure.
+     * @returns the converted value.
+     */
+    convertByteBasedUnitOfMeasure (value, step) {
+      if (value === 0) {
+        return 0.00
+      }
+      if (step === 0) {
+        return value
+      }
+      if (step > 0) {
+        return parseFloat(value / (Math.pow(1024, step))).toFixed(2)
+      }
+      return parseFloat(value * (Math.pow(1024, Math.abs(step)))).toFixed(2)
+    },
+    calculateMaxYAxisAndStepSize (chartLines, initialMaxYAxis, incrementValue) {
+      const numberOfLabelsOnYaxis = 4
+      var highestValue = 0
+      var maxYAxis = initialMaxYAxis
+      for (const line of chartLines) {
+        for (const d of line.data) {
+          const currentValue = parseFloat(d.stat)
+          if (currentValue > highestValue) {
+            highestValue = currentValue
+            while (highestValue > maxYAxis) {
+              maxYAxis += incrementValue
+              if (maxYAxis % incrementValue !== 0) {
+                maxYAxis = Math.round(maxYAxis / incrementValue) * incrementValue
+              }
+            }
+          }
+        }
+      }
+      return { maxYAxes: maxYAxis, stepSize: maxYAxis / numberOfLabelsOnYaxis }
+    },
+    /**
+     * Returns the chart options.
+     * @param yAxesOptions the Y axis options: the maximum value (maxYAxes) and the tick step size (stepSize).
+     * @param yAxesUnitOfMeasurement the unit of measurement label used on the Y axes.
+     * @returns the chart options.
+     */
+    getChartOptions (yAxesOptions, yAxesUnitOfMeasurement) {
+      const dateTimes = this.convertStringArrayToDateArray(JSON.parse(JSON.stringify(this.chartLabels)))
+      const averageDifference = this.averageDifferenceBetweenTimes(dateTimes)
+      const xAxisStepSize = this.calculateStepSize(this.chartLabels.length, averageDifference)
+      const startDate = new Date(dateTimes[0])
+      const endDate = new Date(dateTimes[dateTimes.length - 1])
+      const differentDay = startDate.getDate() !== endDate.getDate()
+      const differentYear = startDate.getFullYear() !== endDate.getFullYear()
+      var displayFormat = 'HH:mm'
+      if (xAxisStepSize < 5 * 60) {
+        displayFormat += ':ss'
+      }
+      if (differentDay) {
+        displayFormat = 'MMM-DD ' + displayFormat
+      }
+      if (xAxisStepSize >= 24 * 60 * 60) {
+        displayFormat = 'MMM-DD'
+      }
+      if (differentYear) {
+        displayFormat = 'YYYY-' + displayFormat
+      }
+      var chartOptions = {
+        responsive: true,
+        maintainAspectRatio: false,
+        scales: {
+          yAxis: {
+            min: 0,
+            max: yAxesOptions.maxYAxes,
+            reverse: false,
+            ticks: {
+              stepSize: yAxesOptions.stepSize,
+              callback: function (label) {
+                return label + yAxesUnitOfMeasurement
+              }
+            }
+          },
+          xAxis: {
+            type: 'time',
+            autoSkip: false,
+            time: {
+              parser: 'YYYY-MM-DD HH:mm:ss',
+              unit: 'second',
+              displayFormats: {
+                second: displayFormat
+              }
+            }
+          }
+        }
+      }
+      chartOptions.scales.xAxis.time.stepSize = xAxisStepSize
+      return chartOptions
+    },
+    convertStringArrayToDateArray (stringArray) {
+      const dateArray = []
+      for (const element of stringArray) {
+        dateArray.push(new Date(element.replace(' ', 'T')))
+      }
+      return dateArray
+    },
+    averageDifferenceBetweenTimes (timeList) {
+      const oneSecond = 1000 // 1 second represented as milliseconds
+      const differences = []
+      var previous = timeList.splice(0, 1)[0]
+      for (const time of timeList) {
+        differences.push((time - previous) / oneSecond) // push the difference in seconds
+        previous = time
+      }
+      if (differences.length === 0) {
+        return 1
+      }
+      const averageDifference = Math.trunc(differences.reduce((a, b) => a + b, 0) / differences.length)
+      return averageDifference
+    },
+    calculateStepSize (numberOfDataPoints, differenceBetweenTimes) {
+      const idealNumberOfLabels = 8
+      const result = numberOfDataPoints / idealNumberOfLabels
+      if (result > 1) {
+        return result * differenceBetweenTimes
+      }
+      return differenceBetweenTimes
+    },
+    prepareData (chartData) {
+      const datasetList = []
+      for (const element of chartData) {
+        datasetList.push(
+          {
+            backgroundColor: element.backgroundColor,
+            borderColor: element.borderColor,
+            borderWidth: 3,
+            label: element.label,
+            data: element.data.map(d => d.stat),
+            hidden: this.hideLine(element.data.map(d => d.stat)),
+            pointRadius: element.pointRadius,
+            fill: 'origin'
+          }
+        )
+      }
+      return {
+        labels: this.chartLabels,
+        datasets: datasetList
+      }
+    },
+    hideLine (data) {
+      for (const d of data) {
+        if (d < 0) {
+          return true
+        }
+      }
+      return false
+    }
+  }
+}
+</script>
diff --git a/ui/src/config/section/infra/routers.js b/ui/src/config/section/infra/routers.js
index 498f8ac1176..6f24d6b7d82 100644
--- a/ui/src/config/section/infra/routers.js
+++ b/ui/src/config/section/infra/routers.js
@@ -36,6 +36,11 @@ export default {
   tabs: [{
     name: 'details',
     component: shallowRef(defineAsyncComponent(() => import('@/components/view/DetailsTab.vue')))
+  }, {
+    name: 'metrics',
+    resourceType: 'DomainRouter',
+    component: shallowRef(defineAsyncComponent(() => import('@/components/view/StatsTab.vue'))),
+    show: () => { return store.getters.features.instancesstatsuseronly === false }
   }, {
     name: 'nics',
     component: shallowRef(defineAsyncComponent(() => import('@/views/network/NicsTable.vue')))
diff --git a/ui/src/config/section/infra/systemVms.js b/ui/src/config/section/infra/systemVms.js
index 9774e2290c7..a649ef1c11a 100644
--- a/ui/src/config/section/infra/systemVms.js
+++ b/ui/src/config/section/infra/systemVms.js
@@ -32,6 +32,12 @@ export default {
       name: 'details',
       component: shallowRef(defineAsyncComponent(() => import('@/components/view/DetailsTab.vue')))
     },
+    {
+      name: 'metrics',
+      resourceType: 'SystemVm',
+      component: shallowRef(defineAsyncComponent(() => import('@/components/view/StatsTab.vue'))),
+      show: () => { return store.getters.features.instancesstatsuseronly === false }
+    },
     {
       name: 'volume',
       component: shallowRef(defineAsyncComponent(() => import('@/components/view/VolumesTab.vue')))
diff --git a/ui/src/config/section/storage.js b/ui/src/config/section/storage.js
index 917027c78b3..e8a5ecd8128 100644
--- a/ui/src/config/section/storage.js
+++ b/ui/src/config/section/storage.js
@@ -75,6 +75,12 @@ export default {
           name: 'details',
           component: shallowRef(defineAsyncComponent(() => import('@/components/view/DetailsTab.vue')))
         },
+        {
+          name: 'metrics',
+          resourceType: 'Volume',
+          component: shallowRef(defineAsyncComponent(() => import('@/components/view/StatsTab.vue'))),
+          show: (record) => { return store.getters.features.instancesdisksstatsretentionenabled }
+        },
         {
           name: 'events',
           resourceType: 'Volume',
diff --git a/ui/src/core/lazy_lib/components_use.js b/ui/src/core/lazy_lib/components_use.js
index b28f26b787f..10790d61bc0 100644
--- a/ui/src/core/lazy_lib/components_use.js
+++ b/ui/src/core/lazy_lib/components_use.js
@@ -62,7 +62,8 @@ import {
   Calendar,
   Slider,
   AutoComplete,
-  Collapse
+  Collapse,
+  Space
 } from 'ant-design-vue'
 import VueClipboard from 'vue3-clipboard'
 import VueCropper from 'vue-cropper'
@@ -125,5 +126,6 @@ export default {
     app.use(AutoComplete)
     app.use(Collapse)
     app.use(Descriptions)
+    app.use(Space)
   }
 }
diff --git a/ui/src/style/components/view/StatsTab.scss b/ui/src/style/components/view/StatsTab.scss
index 89e51ab0cb9..5ad02966bbd 100644
--- a/ui/src/style/components/view/StatsTab.scss
+++ b/ui/src/style/components/view/StatsTab.scss
@@ -25,7 +25,10 @@
   margin: 0 10px 0 5px;
 }
 .chart-row {
-  margin-bottom: 10%;
+  margin-bottom: 5%;
+}
+.chart-row-inner {
+  margin-top: 3%;
 }
 .chart-type-select {
   min-width: 130px;
diff --git a/ui/src/views/compute/InstanceTab.vue b/ui/src/views/compute/InstanceTab.vue
index b0dff1c2158..2b1572b1c16 100644
--- a/ui/src/views/compute/InstanceTab.vue
+++ b/ui/src/views/compute/InstanceTab.vue
@@ -25,7 +25,7 @@
       <a-tab-pane :tab="$t('label.details')" key="details">
         <DetailsTab :resource="dataResource" :loading="loading" />
       </a-tab-pane>
-      <a-tab-pane :tab="$t('label.statistics')" key="stats">
+      <a-tab-pane :tab="$t('label.metrics')" key="stats">
         <StatsTab :resource="resource"/>
       </a-tab-pane>
       <a-tab-pane :tab="$t('label.iso')" key="cdrom" v-if="vm.isoid">