You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by nv...@apache.org on 2022/04/11 13:42:28 UTC

[cloudstack] branch main updated: Persistence of VM stats (#5984)

This is an automated email from the ASF dual-hosted git repository.

nvazquez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudstack.git


The following commit(s) were added to refs/heads/main by this push:
     new 16f2896940c Persistence of VM stats (#5984)
16f2896940c is described below

commit 16f2896940cb6ae5557a51b6b439bd71917e1781
Author: José Flauzino <jo...@gmail.com>
AuthorDate: Mon Apr 11 10:42:21 2022 -0300

    Persistence of VM stats (#5984)
    
    * Add persistence of VM stats
    
    * Fix API 'since' attribute
    
    * Add license
    
    * Address GutoVeronezi's reviews
    
    * Fix the order of VM stats in the API response
    
    * Fix msid in VM stats data
    
    * Fix disk stats and add minor improvements
    
    * Add log message
    
    * Build string using ReflectionToStringBuilderUtils
    
    * Rerun checks
    
    Co-authored-by: joseflauzino <jo...@scclouds.com.br>
---
 .../org/apache/cloudstack/api/ApiArgValidator.java |  16 +-
 .../org/apache/cloudstack/api/ApiConstants.java    |   1 +
 .../cloudstack/api/command/user/vm/ListVMsCmd.java |   9 +
 .../cloudstack/api/response/StatsResponse.java     | 147 ++++++++++
 .../cloudstack/api/response/StatsResponseTest.java | 135 +++++++++
 .../java/com/cloud/agent/api/VmStatsEntry.java     | 159 ++---------
 .../{VmStatsEntry.java => VmStatsEntryBase.java}   |  66 +++--
 .../src/main/java/com/cloud/vm/VmStatsVO.java      |  87 ++++++
 .../src/main/java/com/cloud/vm/dao/VmStatsDao.java |  82 ++++++
 .../main/java/com/cloud/vm/dao/VmStatsDaoImpl.java | 122 ++++++++
 .../spring-engine-schema-core-daos-context.xml     |   1 +
 .../resources/META-INF/db/schema-41610to41700.sql  |  13 +
 .../com/cloud/agent/manager/MockVmManagerImpl.java |   2 +-
 .../hypervisor/vmware/resource/VmwareResource.java |   8 +-
 .../xenserver/resource/CitrixResourceBase.java     |   2 +-
 .../apache/cloudstack/api/ListVMsMetricsCmd.java   |  14 +
 .../cloudstack/api/ListVMsUsageHistoryCmd.java     | 100 +++++++
 .../apache/cloudstack/metrics/MetricsService.java  |   5 +
 .../cloudstack/metrics/MetricsServiceImpl.java     | 201 ++++++++++++-
 .../response/VmMetricsStatsResponse.java           |  62 ++++
 .../cloudstack/metrics/MetricsServiceImplTest.java | 314 +++++++++++++++++++++
 server/src/main/java/com/cloud/api/ApiDBUtils.java |  10 +-
 .../com/cloud/api/dispatch/ParamProcessWorker.java |  65 +++--
 .../java/com/cloud/api/query/QueryManagerImpl.java |   2 +-
 .../com/cloud/api/query/ViewResponseHelper.java    |  10 +-
 .../com/cloud/api/query/dao/UserVmJoinDao.java     |   2 +-
 .../com/cloud/api/query/dao/UserVmJoinDaoImpl.java |   4 +-
 .../main/java/com/cloud/server/StatsCollector.java | 216 +++++++++-----
 .../main/java/com/cloud/vm/UserVmManagerImpl.java  |   7 +-
 .../java/com/cloud/server/StatsCollectorTest.java  | 153 ++++++++--
 .../AccountManagerImplVolumeDeleteEventTest.java   |   6 +-
 ui/src/config/section/compute.js                   |   7 +
 32 files changed, 1706 insertions(+), 322 deletions(-)

diff --git a/api/src/main/java/org/apache/cloudstack/api/ApiArgValidator.java b/api/src/main/java/org/apache/cloudstack/api/ApiArgValidator.java
index 971bb82e37a..859db8a0476 100644
--- a/api/src/main/java/org/apache/cloudstack/api/ApiArgValidator.java
+++ b/api/src/main/java/org/apache/cloudstack/api/ApiArgValidator.java
@@ -18,6 +18,18 @@
 package org.apache.cloudstack.api;
 
 public enum ApiArgValidator {
-    NotNullOrEmpty, // does StringUtils.isEmpty check
-    PositiveNumber, // does != null and > 0 check
+    /**
+     * Validates if the parameter is null or empty with the method {@link Strings#isNullOrEmpty(String)}.
+     */
+    NotNullOrEmpty,
+
+    /**
+     * Validates if the parameter is different from null (parameter != null) and greater than zero (parameter > 0).
+     */
+    PositiveNumber,
+
+    /**
+     * Validates if the parameter is an UUID with the method {@link UuidUtils#validateUUID(String)}.
+     */
+    UuidString,
 }
diff --git a/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java b/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java
index 95903bc5d07..871f99b48f5 100644
--- a/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java
+++ b/api/src/main/java/org/apache/cloudstack/api/ApiConstants.java
@@ -21,6 +21,7 @@ public class ApiConstants {
     public static final String ACCOUNTS = "accounts";
     public static final String ACCOUNT_TYPE = "accounttype";
     public static final String ACCOUNT_ID = "accountid";
+    public static final String ACCUMULATE = "accumulate";
     public static final String ACTIVITY = "activity";
     public static final String ADAPTER_TYPE = "adaptertype";
     public static final String ADDRESS = "address";
diff --git a/api/src/main/java/org/apache/cloudstack/api/command/user/vm/ListVMsCmd.java b/api/src/main/java/org/apache/cloudstack/api/command/user/vm/ListVMsCmd.java
index ab0cc8e19cf..11a81df83d6 100644
--- a/api/src/main/java/org/apache/cloudstack/api/command/user/vm/ListVMsCmd.java
+++ b/api/src/main/java/org/apache/cloudstack/api/command/user/vm/ListVMsCmd.java
@@ -137,6 +137,11 @@ public class ListVMsCmd extends BaseListTaggedResourcesCmd implements UserCmd {
             description = "flag to display the resource icon for VMs", since = "4.16.0.0")
     private Boolean showIcon;
 
+    @Parameter(name = ApiConstants.ACCUMULATE, type = CommandType.BOOLEAN,
+            description = "Accumulates the VM metrics data instead of returning only the most recent data collected. The default behavior is set by the global configuration vm.stats.increment.metrics.",
+            since = "4.17.0")
+    private Boolean accumulate;
+
     /////////////////////////////////////////////////////
     /////////////////// Accessors ///////////////////////
     /////////////////////////////////////////////////////
@@ -245,6 +250,10 @@ public class ListVMsCmd extends BaseListTaggedResourcesCmd implements UserCmd {
         return showIcon != null ? showIcon : false;
     }
 
+    public Boolean getAccumulate() {
+        return accumulate;
+    }
+
     /////////////////////////////////////////////////////
     /////////////// API Implementation///////////////////
     /////////////////////////////////////////////////////
diff --git a/api/src/main/java/org/apache/cloudstack/api/response/StatsResponse.java b/api/src/main/java/org/apache/cloudstack/api/response/StatsResponse.java
new file mode 100644
index 00000000000..475d471bdfb
--- /dev/null
+++ b/api/src/main/java/org/apache/cloudstack/api/response/StatsResponse.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.api.response;
+
+import java.util.Date;
+
+import org.apache.cloudstack.api.ApiConstants;
+import org.apache.cloudstack.api.BaseResponse;
+
+import com.cloud.serializer.Param;
+import com.google.gson.annotations.SerializedName;
+
+public class StatsResponse extends BaseResponse {
+
+    @SerializedName("timestamp")
+    @Param(description = "the time when the VM stats were collected. The format is \"yyyy-MM-dd hh:mm:ss\"")
+    private Date timestamp;
+
+    @SerializedName("cpuused")
+    @Param(description = "the amount (percentage) of the VM's CPU currently used")
+    private String cpuUsed;
+
+    @SerializedName(ApiConstants.DISK_IO_READ)
+    @Param(description = "the VM's disk read (IO)")
+    protected Long diskIORead;
+
+    @SerializedName(ApiConstants.DISK_IO_WRITE)
+    @Param(description = "the VM's disk write (IO)")
+    protected Long diskIOWrite;
+
+    @SerializedName(ApiConstants.DISK_IO_PSTOTAL)
+    @Param(description = "the total disk iops")
+    protected Long diskIopsTotal = 0L;
+
+    @SerializedName(ApiConstants.DISK_KBS_READ)
+    @Param(description = "the VM's disk read (bytes)")
+    private Long diskKbsRead;
+
+    @SerializedName(ApiConstants.DISK_KBS_WRITE)
+    @Param(description = "the VM's disk write (bytes)")
+    private Long diskKbsWrite;
+
+    @SerializedName("memoryintfreekbs")
+    @Param(description = "the internal memory free of the VM or zero if it cannot be calculated")
+    private Long memoryIntFreeKBs;
+
+    @SerializedName("memorykbs")
+    @Param(description = "the memory used by the VM in Kbps")
+    private Long memoryKBs;
+
+    @SerializedName("memorytargetkbs")
+    @Param(description = "the target memory in VM in Kbps")
+    private Long memoryTargetKBs;
+
+    @SerializedName("networkkbsread")
+    @Param(description = "the incoming network traffic on the VM")
+    protected Long networkKbsRead;
+
+    @SerializedName("networkkbswrite")
+    @Param(description = "the outgoing network traffic on the host")
+    protected Long networkKbsWrite;
+
+    @SerializedName("networkread")
+    @Param(description = "the network read in MiB")
+    protected String networkRead;
+
+    @SerializedName("networkwrite")
+    @Param(description = "the network write in MiB")
+    protected String networkWrite;
+
+    public void setTimestamp(Date timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public void setCpuUsed(String cpuUsed) {
+        this.cpuUsed = cpuUsed;
+    }
+
+    public void setDiskIORead(Long diskIORead) {
+        this.diskIORead = diskIORead;
+        accumulateDiskIopsTotal(diskIORead);
+    }
+
+    public void setDiskIOWrite(Long diskIOWrite) {
+        this.diskIOWrite = diskIOWrite;
+        accumulateDiskIopsTotal(diskIOWrite);
+    }
+
+    public void setDiskKbsRead(Long diskKbsRead) {
+        this.diskKbsRead = diskKbsRead;
+    }
+
+    public void setDiskKbsWrite(Long diskKbsWrite) {
+        this.diskKbsWrite = diskKbsWrite;
+    }
+
+    public void setMemoryIntFreeKBs(Long memoryIntFreeKBs) {
+        this.memoryIntFreeKBs = memoryIntFreeKBs;
+    }
+
+    public void setMemoryKBs(Long memoryKBs) {
+        this.memoryKBs = memoryKBs;
+    }
+
+    public void setMemoryTargetKBs(Long memoryTargetKBs) {
+        this.memoryTargetKBs = memoryTargetKBs;
+    }
+
+    public void setNetworkKbsRead(Long networkKbsRead) {
+        this.networkKbsRead = networkKbsRead;
+        if (networkKbsRead != null) {
+            this.networkRead = String.format("%.2f MB", networkKbsRead / 1024.0);
+        }
+    }
+
+    public void setNetworkKbsWrite(Long networkKbsWrite) {
+        this.networkKbsWrite = networkKbsWrite;
+        if (networkKbsWrite != null) {
+            this.networkWrite = String.format("%.2f MB", networkKbsWrite / 1024.0);
+        }
+    }
+
+    /**
+     * Accumulates disk IOPS (Input/Output Operations Per Second)
+     * in {@code diskIopsTotal} attribute.
+     * @param diskIo the IOPS value to increment in {@code diskIopsTotal}.
+     */
+    protected void accumulateDiskIopsTotal(Long diskIo) {
+        if (diskIo != null) {
+            this.diskIopsTotal += diskIo;
+        }
+    }
+}
\ No newline at end of file
diff --git a/api/src/test/java/org/apache/cloudstack/api/response/StatsResponseTest.java b/api/src/test/java/org/apache/cloudstack/api/response/StatsResponseTest.java
new file mode 100644
index 00000000000..b6ba8533da5
--- /dev/null
+++ b/api/src/test/java/org/apache/cloudstack/api/response/StatsResponseTest.java
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.api.response;
+
+import java.text.DecimalFormat;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class StatsResponseTest {
+
+    @Spy
+    StatsResponse statsResponseMock;
+
+    final char decimalSeparator = ((DecimalFormat) DecimalFormat.getInstance()).getDecimalFormatSymbols().getDecimalSeparator();
+
+    @Test
+    public void setDiskIOReadTestWithAnyInput() {
+        statsResponseMock.setDiskIORead(1L);
+
+        Mockito.verify(statsResponseMock).accumulateDiskIopsTotal(Mockito.anyLong());
+    }
+
+    @Test
+    public void setDiskIOWriteTestWithAnyInput() {
+        statsResponseMock.setDiskIOWrite(1L);
+
+        Mockito.verify(statsResponseMock).accumulateDiskIopsTotal(Mockito.anyLong());
+    }
+
+    @Test
+    public void accumulateDiskIopsTotalTestWithNullInput() {
+        Long expected = 0L;
+
+        statsResponseMock.accumulateDiskIopsTotal(null);
+
+        Assert.assertEquals(expected, statsResponseMock.diskIopsTotal);
+    }
+
+    @Test
+    public void accumulateDiskIopsTotalTestWithZeroAsInput() {
+        Long expected = 0L;
+
+        statsResponseMock.accumulateDiskIopsTotal(0L);
+
+        Assert.assertEquals(expected, statsResponseMock.diskIopsTotal);
+    }
+
+    @Test
+    public void accumulateDiskIopsTotalTestWithInputGreatherThanZero() {
+        Long expected = 1L;
+
+        statsResponseMock.accumulateDiskIopsTotal(1L);
+
+        Assert.assertEquals(expected, statsResponseMock.diskIopsTotal);
+    }
+
+    @Test
+    public void setDiskIOWriteTestWithInputNotNullAndNullDiskIopsTotal() {
+        Long expected = 1L;
+
+        statsResponseMock.setDiskIOWrite(expected);
+
+        Assert.assertEquals(expected, statsResponseMock.diskIOWrite);
+        Assert.assertEquals(expected, statsResponseMock.diskIopsTotal);
+    }
+
+    @Test
+    public void setDiskIOWriteTestWithInputNotNullAndDiskIopsTotalNotNull() {
+        statsResponseMock.diskIopsTotal = 1L;
+        Long expectedDiskIOWrite = 1L, expectedDiskIopsTotal = 2L;
+
+        statsResponseMock.setDiskIOWrite(1L);
+
+        Assert.assertEquals(expectedDiskIOWrite, statsResponseMock.diskIOWrite);
+        Assert.assertEquals(expectedDiskIopsTotal, statsResponseMock.diskIopsTotal);
+    }
+
+    @Test
+    public void setNetworkKbsReadTestWithNullInput() {
+        statsResponseMock.setNetworkKbsRead(null);
+
+        Assert.assertEquals(null, statsResponseMock.networkKbsRead);
+        Assert.assertEquals(null, statsResponseMock.networkRead);
+    }
+
+    @Test
+    public void setNetworkKbsReadTestWithInputNotNull() {
+        Long expectedNetworkKbsRead = Long.valueOf("100");
+        String expectedNetworkRead = String.format("0%s10 MB", decimalSeparator); // the actual result is 0.097 but the value is rounded to 0.10
+
+        statsResponseMock.setNetworkKbsRead(expectedNetworkKbsRead);
+
+        Assert.assertEquals(expectedNetworkKbsRead, statsResponseMock.networkKbsRead);
+        Assert.assertEquals(expectedNetworkRead, statsResponseMock.networkRead);
+    }
+
+    @Test
+    public void setNetworkKbsWriteTestWithNullInput() {
+        statsResponseMock.setNetworkKbsWrite(null);
+
+        Assert.assertEquals(null, statsResponseMock.networkKbsWrite);
+        Assert.assertEquals(null, statsResponseMock.networkWrite);
+    }
+
+    @Test
+    public void setNetworkKbsWriteTestWithInputNotNull() {
+        Long expectedNetworkKbsWrite = Long.valueOf("100");
+        String expectedNetworkWrite = String.format("0%s10 MB", decimalSeparator); // the actual result is 0.097 but the value is rounded to 0.10
+
+        statsResponseMock.setNetworkKbsWrite(expectedNetworkKbsWrite);
+
+        Assert.assertEquals(expectedNetworkKbsWrite, statsResponseMock.networkKbsWrite);
+        Assert.assertEquals(expectedNetworkWrite, statsResponseMock.networkWrite);
+    }
+}
\ No newline at end of file
diff --git a/core/src/main/java/com/cloud/agent/api/VmStatsEntry.java b/core/src/main/java/com/cloud/agent/api/VmStatsEntry.java
index 9f8280898ee..e09a3c05c87 100644
--- a/core/src/main/java/com/cloud/agent/api/VmStatsEntry.java
+++ b/core/src/main/java/com/cloud/agent/api/VmStatsEntry.java
@@ -20,151 +20,36 @@
 package com.cloud.agent.api;
 
 import com.cloud.vm.UserVmVO;
-import com.cloud.vm.VmStats;
 
-public class VmStatsEntry implements VmStats {
+public class VmStatsEntry extends VmStatsEntryBase {
 
-    private long vmId;
     private UserVmVO userVmVO;
-    private double cpuUtilization;
-    private double networkReadKBs;
-    private double networkWriteKBs;
-    private double diskReadIOs;
-    private double diskWriteIOs;
-    private double diskReadKBs;
-    private double diskWriteKBs;
-    private double memoryKBs;
-    private double intfreememoryKBs;
-    private double targetmemoryKBs;
-    private int numCPUs;
-    private String entityType;
 
     public VmStatsEntry() {
-    }
-
-    public VmStatsEntry(double memoryKBs,double intfreememoryKBs,double targetmemoryKBs, double cpuUtilization, double networkReadKBs, double networkWriteKBs, int numCPUs, String entityType) {
-        this.memoryKBs = memoryKBs;
-        this.intfreememoryKBs = intfreememoryKBs;
-        this.targetmemoryKBs = targetmemoryKBs;
-        this.cpuUtilization = cpuUtilization;
-        this.networkReadKBs = networkReadKBs;
-        this.networkWriteKBs = networkWriteKBs;
-        this.numCPUs = numCPUs;
-        this.entityType = entityType;
-    }
-
-    public long getVmId() {
-        return vmId;
-    }
-
-    public void setVmId(long vmId) {
-        this.vmId = vmId;
-    }
-
-    @Override
-    public double getCPUUtilization() {
-        return cpuUtilization;
-    }
-
-    public void setCPUUtilization(double cpuUtilization) {
-        this.cpuUtilization = cpuUtilization;
-    }
-
-    @Override
-    public double getNetworkReadKBs() {
-        return networkReadKBs;
-    }
-
-    public void setNetworkReadKBs(double networkReadKBs) {
-        this.networkReadKBs = networkReadKBs;
-    }
-
-    @Override
-    public double getNetworkWriteKBs() {
-        return networkWriteKBs;
-    }
-
-    public void setNetworkWriteKBs(double networkWriteKBs) {
-        this.networkWriteKBs = networkWriteKBs;
-    }
-
-    @Override
-    public double getDiskReadIOs() {
-        return diskReadIOs;
-    }
-
-    public void setDiskReadIOs(double diskReadIOs) {
-        this.diskReadIOs = diskReadIOs;
-    }
-
-    @Override
-    public double getDiskWriteIOs() {
-        return diskWriteIOs;
-    }
-
-    public void setDiskWriteIOs(double diskWriteIOs) {
-        this.diskWriteIOs = diskWriteIOs;
-    }
-
-    @Override
-    public double getDiskReadKBs() {
-        return diskReadKBs;
-    }
-
-    public void setDiskReadKBs(double diskReadKBs) {
-        this.diskReadKBs = diskReadKBs;
-    }
-
-    @Override
-    public double getDiskWriteKBs() {
-        return diskWriteKBs;
-    }
-
-    public void setDiskWriteKBs(double diskWriteKBs) {
-        this.diskWriteKBs = diskWriteKBs;
-    }
-
-    @Override
-    public double getMemoryKBs() {
-        return memoryKBs;
-    }
-
-    public void setMemoryKBs(double memoryKBs) {
-        this.memoryKBs = memoryKBs;
-    }
-
-    @Override
-    public double getIntFreeMemoryKBs() {
-        return intfreememoryKBs;
-    }
-
-    public void setIntFreeMemoryKBs(double intfreememoryKBs) {
-        this.intfreememoryKBs = intfreememoryKBs;
-    }
-
-    @Override
-    public double getTargetMemoryKBs() {
-        return targetmemoryKBs;
-    }
-
-    public void setTargetMemoryKBs(double targetmemoryKBs) {
-        this.targetmemoryKBs = targetmemoryKBs;
-    }
-
-    public int getNumCPUs() {
-        return numCPUs;
-    }
-
-    public void setNumCPUs(int numCPUs) {
-        this.numCPUs = numCPUs;
-    }
 
-    public String getEntityType() {
-        return this.entityType;
     }
 
-    public void setEntityType(String entityType) {
-        this.entityType = entityType;
+    /**
+     * Creates an instance of {@code VmStatsEntry} with all the stats attributes filled in.
+     *
+     * @param vmId the VM ID.
+     * @param memoryKBs the memory total (in KBs).
+     * @param intFreeMemoryKBs the internal free memory (in KBs).
+     * @param targetMemoryKBs the target memory (in KBs).
+     * @param cpuUtilization the CPU utilization.
+     * @param networkReadKBs the network read (in KBs).
+     * @param networkWriteKBs the network write (in KBs).
+     * @param numCPUs the number of CPUs.
+     * @param diskReadKBs the disk read (in KBs).
+     * @param diskWriteKBs the disk write (in KBs).
+     * @param diskReadIOs the disk read I/O.
+     * @param diskWriteIOs the disk write I/O.
+     * @param entityType the entity type.
+     */
+    public VmStatsEntry(long vmId, double memoryKBs, double intFreeMemoryKBs, double targetMemoryKBs, double cpuUtilization, double networkReadKBs, double networkWriteKBs, int numCPUs,
+            double diskReadKBs, double diskWriteKBs, double diskReadIOs, double diskWriteIOs, String entityType) {
+        super(vmId, memoryKBs, intFreeMemoryKBs, targetMemoryKBs, cpuUtilization, networkReadKBs, networkWriteKBs, numCPUs, diskReadKBs, diskWriteKBs, diskReadIOs, diskWriteIOs,
+                entityType);
     }
 
     public UserVmVO getUserVmVO() {
diff --git a/core/src/main/java/com/cloud/agent/api/VmStatsEntry.java b/core/src/main/java/com/cloud/agent/api/VmStatsEntryBase.java
similarity index 67%
copy from core/src/main/java/com/cloud/agent/api/VmStatsEntry.java
copy to core/src/main/java/com/cloud/agent/api/VmStatsEntryBase.java
index 9f8280898ee..b9f671b88f5 100644
--- a/core/src/main/java/com/cloud/agent/api/VmStatsEntry.java
+++ b/core/src/main/java/com/cloud/agent/api/VmStatsEntryBase.java
@@ -19,13 +19,11 @@
 
 package com.cloud.agent.api;
 
-import com.cloud.vm.UserVmVO;
 import com.cloud.vm.VmStats;
 
-public class VmStatsEntry implements VmStats {
+public class VmStatsEntryBase implements VmStats {
 
     private long vmId;
-    private UserVmVO userVmVO;
     private double cpuUtilization;
     private double networkReadKBs;
     private double networkWriteKBs;
@@ -34,25 +32,49 @@ public class VmStatsEntry implements VmStats {
     private double diskReadKBs;
     private double diskWriteKBs;
     private double memoryKBs;
-    private double intfreememoryKBs;
-    private double targetmemoryKBs;
+    private double intFreeMemoryKBs;
+    private double targetMemoryKBs;
     private int numCPUs;
     private String entityType;
 
-    public VmStatsEntry() {
-    }
-
-    public VmStatsEntry(double memoryKBs,double intfreememoryKBs,double targetmemoryKBs, double cpuUtilization, double networkReadKBs, double networkWriteKBs, int numCPUs, String entityType) {
+    public VmStatsEntryBase() {
+
+    }
+
+    /**
+     * Creates an instance of {@code VmStatsEntryBase} with all the stats attributes filled in.
+     *
+     * @param memoryKBs the memory total (in KBs).
+     * @param intFreeMemoryKBs the internal free memory (in KBs).
+     * @param targetMemoryKBs the target memory (in KBs).
+     * @param cpuUtilization the CPU utilization.
+     * @param networkReadKBs the network read (in KBs).
+     * @param networkWriteKBs the network write (in KBs).
+     * @param numCPUs the number of CPUs.
+     * @param diskReadKBs the disk read (in KBs).
+     * @param diskWriteKBs the disk write (in KBs).
+     * @param diskReadIOs the disk read I/O.
+     * @param diskWriteIOs the disk write I/O.
+     * @param entityType the entity type.
+     */
+    public VmStatsEntryBase(long vmId, double memoryKBs, double intFreeMemoryKBs, double targetMemoryKBs, double cpuUtilization, double networkReadKBs, double networkWriteKBs, int numCPUs,
+            double diskReadKBs, double diskWriteKBs, double diskReadIOs, double diskWriteIOs, String entityType) {
+        this.vmId = vmId;
         this.memoryKBs = memoryKBs;
-        this.intfreememoryKBs = intfreememoryKBs;
-        this.targetmemoryKBs = targetmemoryKBs;
+        this.intFreeMemoryKBs = intFreeMemoryKBs;
+        this.targetMemoryKBs = targetMemoryKBs;
         this.cpuUtilization = cpuUtilization;
         this.networkReadKBs = networkReadKBs;
         this.networkWriteKBs = networkWriteKBs;
         this.numCPUs = numCPUs;
+        this.diskReadKBs = diskReadKBs;
+        this.diskWriteKBs = diskWriteKBs;
+        this.diskReadIOs = diskReadIOs;
+        this.diskWriteIOs = diskWriteIOs;
         this.entityType = entityType;
     }
 
+
     public long getVmId() {
         return vmId;
     }
@@ -135,20 +157,20 @@ public class VmStatsEntry implements VmStats {
 
     @Override
     public double getIntFreeMemoryKBs() {
-        return intfreememoryKBs;
+        return intFreeMemoryKBs;
     }
 
-    public void setIntFreeMemoryKBs(double intfreememoryKBs) {
-        this.intfreememoryKBs = intfreememoryKBs;
+    public void setIntFreeMemoryKBs(double intFreeMemoryKBs) {
+        this.intFreeMemoryKBs = intFreeMemoryKBs;
     }
 
     @Override
     public double getTargetMemoryKBs() {
-        return targetmemoryKBs;
+        return targetMemoryKBs;
     }
 
-    public void setTargetMemoryKBs(double targetmemoryKBs) {
-        this.targetmemoryKBs = targetmemoryKBs;
+    public void setTargetMemoryKBs(double targetMemoryKBs) {
+        this.targetMemoryKBs = targetMemoryKBs;
     }
 
     public int getNumCPUs() {
@@ -167,12 +189,4 @@ public class VmStatsEntry implements VmStats {
         this.entityType = entityType;
     }
 
-    public UserVmVO getUserVmVO() {
-        return userVmVO;
-    }
-
-    public void setUserVmVO(UserVmVO userVmVO) {
-        this.userVmVO = userVmVO;
-    }
-
-}
+}
\ No newline at end of file
diff --git a/engine/schema/src/main/java/com/cloud/vm/VmStatsVO.java b/engine/schema/src/main/java/com/cloud/vm/VmStatsVO.java
new file mode 100644
index 00000000000..debdf2d6403
--- /dev/null
+++ b/engine/schema/src/main/java/com/cloud/vm/VmStatsVO.java
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.vm;
+
+import java.util.Date;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Table;
+import javax.persistence.Temporal;
+import javax.persistence.TemporalType;
+
+import org.apache.cloudstack.utils.reflectiontostringbuilderutils.ReflectionToStringBuilderUtils;
+
+@Entity
+@Table(name = "vm_stats")
+public class VmStatsVO {
+
+    @Id
+    @Column(name = "id", updatable = false, nullable = false)
+    protected long id;
+
+    @Column(name = "vm_id", updatable = false, nullable = false)
+    protected Long vmId;
+
+    @Column(name = "mgmt_server_id", updatable = false, nullable = false)
+    protected Long mgmtServerId;
+
+    @Column(name= "timestamp", updatable = false)
+    @Temporal(value = TemporalType.TIMESTAMP)
+    protected Date timestamp;
+
+    @Column(name = "vm_stats_data", updatable = false, nullable = false, length = 65535)
+    protected String vmStatsData;
+
+    public VmStatsVO(Long vmId, Long mgmtServerId, Date timestamp, String vmStatsData) {
+        this.vmId = vmId;
+        this.mgmtServerId = mgmtServerId;
+        this.timestamp = timestamp;
+        this.vmStatsData = vmStatsData;
+    }
+
+    public VmStatsVO() {
+
+    }
+
+    public long getId() {
+        return id;
+    }
+
+    public Long getVmId() {
+        return vmId;
+    }
+
+    public Long getMgmtServerId() {
+        return mgmtServerId;
+    }
+
+    public Date getTimestamp() {
+        return timestamp;
+    }
+
+    public String getVmStatsData() {
+        return vmStatsData;
+    }
+
+    @Override
+    public String toString() {
+        return ReflectionToStringBuilderUtils.reflectOnlySelectedFields(this, "vmId", "mgmtServerId", "timestamp", "vmStatsData");
+    }
+
+}
\ No newline at end of file
diff --git a/engine/schema/src/main/java/com/cloud/vm/dao/VmStatsDao.java b/engine/schema/src/main/java/com/cloud/vm/dao/VmStatsDao.java
new file mode 100644
index 00000000000..839d5f1922c
--- /dev/null
+++ b/engine/schema/src/main/java/com/cloud/vm/dao/VmStatsDao.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.vm.dao;
+
+import java.util.Date;
+import java.util.List;
+
+import com.cloud.utils.db.GenericDao;
+import com.cloud.vm.VmStatsVO;
+
+/**
+ * Data Access Object for vm_stats table.
+ */
+public interface VmStatsDao extends GenericDao<VmStatsVO, Long> {
+
+    /**
+     * Finds VM stats by VM ID.
+     * @param vmId the VM ID.
+     * @return list of stats for the specified VM.
+     */
+    List<VmStatsVO> findByVmId(long vmId);
+
+    /**
+     * Finds VM stats by VM ID. The result is sorted by timestamp in descending order.
+     * @param vmId the VM ID.
+     * @return ordered list of stats for the specified VM.
+     */
+    List<VmStatsVO> findByVmIdOrderByTimestampDesc(long vmId);
+
+    /**
+     * Finds stats by VM ID and timestamp >= a given time.
+     * @param vmId the specific VM.
+     * @param time the specific time.
+     * @return list of stats for the specified VM, with timestamp >= the specified time.
+     */
+    List<VmStatsVO> findByVmIdAndTimestampGreaterThanEqual(long vmId, Date time);
+
+    /**
+     * Finds stats by VM ID and timestamp <= a given time.
+     * @param vmId the specific VM.
+     * @param time the specific time.
+     * @return list of stats for the specified VM, with timestamp <= the specified time.
+     */
+    List<VmStatsVO> findByVmIdAndTimestampLessThanEqual(long vmId, Date time);
+
+    /**
+     * Finds stats by VM ID and timestamp between a given time range.
+     * @param vmId the specific VM.
+     * @param startTime the start time.
+     * @param endTime the start time.
+     * @return list of stats for the specified VM, between the specified start and end times.
+     */
+    List<VmStatsVO> findByVmIdAndTimestampBetween(long vmId, Date startTime, Date endTime);
+
+    /**
+     * Removes (expunges) all stats of the specified VM.
+     * @param vmId the VM ID to remove stats.
+     */
+    void removeAllByVmId(long vmId);
+
+    /**
+     * Removes (expunges) all VM stats with {@code timestamp} less than
+     * a given Date.
+     * @param limit the maximum date to keep stored. Records that exceed this limit will be removed.
+     */
+    void removeAllByTimestampLessThan(Date limit);
+
+}
\ No newline at end of file
diff --git a/engine/schema/src/main/java/com/cloud/vm/dao/VmStatsDaoImpl.java b/engine/schema/src/main/java/com/cloud/vm/dao/VmStatsDaoImpl.java
new file mode 100644
index 00000000000..f22687db127
--- /dev/null
+++ b/engine/schema/src/main/java/com/cloud/vm/dao/VmStatsDaoImpl.java
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.vm.dao;
+
+import java.util.Date;
+import java.util.List;
+
+import javax.annotation.PostConstruct;
+
+import org.springframework.stereotype.Component;
+
+import com.cloud.utils.db.Filter;
+import com.cloud.utils.db.GenericDaoBase;
+import com.cloud.utils.db.SearchBuilder;
+import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.SearchCriteria.Op;
+import com.cloud.vm.VmStatsVO;
+
+@Component
+public class VmStatsDaoImpl extends GenericDaoBase<VmStatsVO, Long> implements VmStatsDao {
+
+    protected SearchBuilder<VmStatsVO> vmIdSearch;
+    protected SearchBuilder<VmStatsVO> vmIdTimestampGreaterThanEqualSearch;
+    protected SearchBuilder<VmStatsVO> vmIdTimestampLessThanEqualSearch;
+    protected SearchBuilder<VmStatsVO> vmIdTimestampBetweenSearch;
+    protected SearchBuilder<VmStatsVO> timestampSearch;
+
+    @PostConstruct
+    protected void init() {
+        vmIdSearch = createSearchBuilder();
+        vmIdSearch.and("vmId", vmIdSearch.entity().getVmId(), Op.EQ);
+        vmIdSearch.done();
+
+        vmIdTimestampGreaterThanEqualSearch = createSearchBuilder();
+        vmIdTimestampGreaterThanEqualSearch.and("vmId", vmIdTimestampGreaterThanEqualSearch.entity().getVmId(), Op.EQ);
+        vmIdTimestampGreaterThanEqualSearch.and("timestamp", vmIdTimestampGreaterThanEqualSearch.entity().getTimestamp(), Op.GTEQ);
+        vmIdTimestampGreaterThanEqualSearch.done();
+
+        vmIdTimestampLessThanEqualSearch = createSearchBuilder();
+        vmIdTimestampLessThanEqualSearch.and("vmId", vmIdTimestampLessThanEqualSearch.entity().getVmId(), Op.EQ);
+        vmIdTimestampLessThanEqualSearch.and("timestamp", vmIdTimestampLessThanEqualSearch.entity().getTimestamp(), Op.LTEQ);
+        vmIdTimestampLessThanEqualSearch.done();
+
+        vmIdTimestampBetweenSearch = createSearchBuilder();
+        vmIdTimestampBetweenSearch.and("vmId", vmIdTimestampBetweenSearch.entity().getVmId(), Op.EQ);
+        vmIdTimestampBetweenSearch.and("timestamp", vmIdTimestampBetweenSearch.entity().getTimestamp(), Op.BETWEEN);
+        vmIdTimestampBetweenSearch.done();
+
+        timestampSearch = createSearchBuilder();
+        timestampSearch.and("timestamp", timestampSearch.entity().getTimestamp(), Op.LT);
+        timestampSearch.done();
+
+    }
+
+    @Override
+    public List<VmStatsVO> findByVmId(long vmId) {
+        SearchCriteria<VmStatsVO> sc = vmIdSearch.create();
+        sc.setParameters("vmId", vmId);
+        return listBy(sc);
+    }
+
+    @Override
+    public List<VmStatsVO> findByVmIdOrderByTimestampDesc(long vmId) {
+        SearchCriteria<VmStatsVO> sc = vmIdSearch.create();
+        sc.setParameters("vmId", vmId);
+        Filter orderByFilter = new Filter(VmStatsVO.class, "timestamp", false, null, null);
+        return search(sc, orderByFilter, null, false);
+    }
+
+    @Override
+    public List<VmStatsVO> findByVmIdAndTimestampGreaterThanEqual(long vmId, Date time) {
+        SearchCriteria<VmStatsVO> sc = vmIdTimestampGreaterThanEqualSearch.create();
+        sc.setParameters("vmId", vmId);
+        sc.setParameters("timestamp", time);
+        return listBy(sc);
+    }
+
+    @Override
+    public List<VmStatsVO> findByVmIdAndTimestampLessThanEqual(long vmId, Date time) {
+        SearchCriteria<VmStatsVO> sc = vmIdTimestampLessThanEqualSearch.create();
+        sc.setParameters("vmId", vmId);
+        sc.setParameters("timestamp", time);
+        return listBy(sc);
+    }
+
+    @Override
+    public List<VmStatsVO> findByVmIdAndTimestampBetween(long vmId, Date startTime, Date endTime) {
+        SearchCriteria<VmStatsVO> sc = vmIdTimestampBetweenSearch.create();
+        sc.setParameters("vmId", vmId);
+        sc.setParameters("timestamp", startTime, endTime);
+        return listBy(sc);
+    }
+
+    @Override
+    public void removeAllByVmId(long vmId) {
+        SearchCriteria<VmStatsVO> sc = vmIdSearch.create();
+        sc.setParameters("vmId", vmId);
+        expunge(sc);
+    }
+
+    @Override
+    public void removeAllByTimestampLessThan(Date limit) {
+        SearchCriteria<VmStatsVO> sc = timestampSearch.create();
+        sc.setParameters("timestamp", limit);
+        expunge(sc);
+    }
+
+}
\ No newline at end of file
diff --git a/engine/schema/src/main/resources/META-INF/cloudstack/core/spring-engine-schema-core-daos-context.xml b/engine/schema/src/main/resources/META-INF/cloudstack/core/spring-engine-schema-core-daos-context.xml
index 508b01c2b57..437b507cd60 100644
--- a/engine/schema/src/main/resources/META-INF/cloudstack/core/spring-engine-schema-core-daos-context.xml
+++ b/engine/schema/src/main/resources/META-INF/cloudstack/core/spring-engine-schema-core-daos-context.xml
@@ -239,6 +239,7 @@
   <bean id="vMRootDiskTagDaoImpl" class="org.apache.cloudstack.engine.cloud.entity.api.db.dao.VMRootDiskTagDaoImpl" />
   <bean id="vMSnapshotDaoImpl" class="com.cloud.vm.snapshot.dao.VMSnapshotDaoImpl" />
   <bean id="vMSnapshotDetailsDaoImpl" class="com.cloud.vm.snapshot.dao.VMSnapshotDetailsDaoImpl" />
+  <bean id="vmStatsDaoImpl" class="com.cloud.vm.dao.VmStatsDaoImpl" />
   <bean id="vMTemplateDetailsDaoImpl" class="com.cloud.storage.dao.VMTemplateDetailsDaoImpl" />
   <bean id="vMTemplatePoolDaoImpl" class="com.cloud.storage.dao.VMTemplatePoolDaoImpl" />
   <bean id="vMTemplateZoneDaoImpl" class="com.cloud.storage.dao.VMTemplateZoneDaoImpl" />
diff --git a/engine/schema/src/main/resources/META-INF/db/schema-41610to41700.sql b/engine/schema/src/main/resources/META-INF/db/schema-41610to41700.sql
index a28f1cc7b0d..15a589da447 100644
--- a/engine/schema/src/main/resources/META-INF/db/schema-41610to41700.sql
+++ b/engine/schema/src/main/resources/META-INF/db/schema-41610to41700.sql
@@ -655,3 +655,16 @@ INSERT INTO `cloud`.`user_vm_details`(`vm_id`, `name`, `value`)
 ALTER TABLE `cloud`.`kubernetes_cluster` ADD COLUMN `security_group_id` bigint unsigned DEFAULT NULL,
 ADD CONSTRAINT `fk_kubernetes_cluster__security_group_id` FOREIGN KEY `fk_kubernetes_cluster__security_group_id`(`security_group_id`) REFERENCES `security_group`(`id`) ON DELETE CASCADE;
 
+-- PR#5984 Create table to persist VM stats.
+DROP TABLE IF EXISTS `cloud`.`vm_stats`;
+CREATE TABLE `cloud`.`vm_stats` (
+  `id` bigint unsigned NOT NULL auto_increment COMMENT 'id',
+  `vm_id` bigint unsigned NOT NULL,
+  `mgmt_server_id` bigint unsigned NOT NULL,
+  `timestamp` datetime NOT NULL,
+  `vm_stats_data` text NOT NULL,
+  PRIMARY KEY (`id`)
+  ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+-- PR#5984 Update name for global configuration vm.stats.increment.metrics
+Update configuration set name='vm.stats.increment.metrics' where name='vm.stats.increment.metrics.in.memory';
diff --git a/plugins/hypervisors/simulator/src/main/java/com/cloud/agent/manager/MockVmManagerImpl.java b/plugins/hypervisors/simulator/src/main/java/com/cloud/agent/manager/MockVmManagerImpl.java
index 365dfb097d2..67f3e95e872 100644
--- a/plugins/hypervisors/simulator/src/main/java/com/cloud/agent/manager/MockVmManagerImpl.java
+++ b/plugins/hypervisors/simulator/src/main/java/com/cloud/agent/manager/MockVmManagerImpl.java
@@ -319,7 +319,7 @@ public class MockVmManagerImpl extends ManagerBase implements MockVmManager {
         final HashMap<String, VmStatsEntry> vmStatsNameMap = new HashMap<String, VmStatsEntry>();
         final List<String> vmNames = cmd.getVmNames();
         for (final String vmName : vmNames) {
-            final VmStatsEntry entry = new VmStatsEntry(0, 0, 0, 0, 0, 0, 0, "vm");
+            final VmStatsEntry entry = new VmStatsEntry(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, "vm");
             entry.setNetworkReadKBs(32768); // default values 256 KBps
             entry.setNetworkWriteKBs(16384);
             entry.setCPUUtilization(10);
diff --git a/plugins/hypervisors/vmware/src/main/java/com/cloud/hypervisor/vmware/resource/VmwareResource.java b/plugins/hypervisors/vmware/src/main/java/com/cloud/hypervisor/vmware/resource/VmwareResource.java
index 19e2a3eacba..7cca51a6d00 100644
--- a/plugins/hypervisors/vmware/src/main/java/com/cloud/hypervisor/vmware/resource/VmwareResource.java
+++ b/plugins/hypervisors/vmware/src/main/java/com/cloud/hypervisor/vmware/resource/VmwareResource.java
@@ -6424,12 +6424,8 @@ public class VmwareResource implements StoragePoolResource, ServerResource, Vmwa
                         }
                     }
 
-                    final VmStatsEntry vmStats = new VmStatsEntry(NumberUtils.toDouble(memkb) * 1024, NumberUtils.toDouble(guestMemusage) * 1024, NumberUtils.toDouble(memlimit) * 1024,
-                            maxCpuUsage, networkReadKBs, networkWriteKBs, NumberUtils.toInt(numberCPUs), "vm");
-                    vmStats.setDiskReadIOs(diskReadIops);
-                    vmStats.setDiskWriteIOs(diskWriteIops);
-                    vmStats.setDiskReadKBs(diskReadKbs);
-                    vmStats.setDiskWriteKBs(diskWriteKbs);
+                    final VmStatsEntry vmStats = new VmStatsEntry(0, NumberUtils.toDouble(memkb) * 1024, NumberUtils.toDouble(guestMemusage) * 1024, NumberUtils.toDouble(memlimit) * 1024,
+                            maxCpuUsage, networkReadKBs, networkWriteKBs, NumberUtils.toInt(numberCPUs), diskReadKbs, diskWriteKbs, diskReadIops, diskWriteIops, "vm");
                     vmResponseMap.put(name, vmStats);
 
                 }
diff --git a/plugins/hypervisors/xenserver/src/main/java/com/cloud/hypervisor/xenserver/resource/CitrixResourceBase.java b/plugins/hypervisors/xenserver/src/main/java/com/cloud/hypervisor/xenserver/resource/CitrixResourceBase.java
index 1651e4049a6..a95c07bdd19 100644
--- a/plugins/hypervisors/xenserver/src/main/java/com/cloud/hypervisor/xenserver/resource/CitrixResourceBase.java
+++ b/plugins/hypervisors/xenserver/src/main/java/com/cloud/hypervisor/xenserver/resource/CitrixResourceBase.java
@@ -3446,7 +3446,7 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
         final HashMap<String, VmStatsEntry> vmResponseMap = new HashMap<String, VmStatsEntry>();
 
         for (final String vmUUID : vmUUIDs) {
-            vmResponseMap.put(vmUUID, new VmStatsEntry(0, 0, 0, 0, 0, 0, 0, "vm"));
+            vmResponseMap.put(vmUUID, new VmStatsEntry(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, "vm"));
         }
 
         final Object[] rrdData = getRRDData(conn, 2); // call rrddata with 2 for
diff --git a/plugins/metrics/src/main/java/org/apache/cloudstack/api/ListVMsMetricsCmd.java b/plugins/metrics/src/main/java/org/apache/cloudstack/api/ListVMsMetricsCmd.java
index 947c2f99ba9..89d596e1f05 100644
--- a/plugins/metrics/src/main/java/org/apache/cloudstack/api/ListVMsMetricsCmd.java
+++ b/plugins/metrics/src/main/java/org/apache/cloudstack/api/ListVMsMetricsCmd.java
@@ -27,9 +27,23 @@ import org.apache.cloudstack.response.VmMetricsResponse;
 import javax.inject.Inject;
 import java.util.List;
 
+/**
+ * API supported for backward compatibility. Use the {@link ListVMsUsageHistoryCmd} API instead. <br>
+ * The reasons for this are: <br>
+ * <ul>
+ *     <li>While API {@link ListVMsMetricsCmd} allows ACS users to get only the most recent stats data
+ *     from VMs or their cumulative data, the {@link ListVMsUsageHistoryCmd} API allows getting historical
+ *     data by filtering by specific VMs and periods.</li>
+ *     <li>{@link ListVMsMetricsCmd} just extends the {@link ListVMsCmd} API, so it inherits all of
+ *     its parameters, even if some of them are not suitable/useful for the API purpose.</li>
+ *     <li>{@link ListVMsMetricsCmd} returns all VM information just like the {@link ListVMsCmd} API,
+ *     although most of it is not suitable/useful for the API purpose.</li>
+ * </ul>
+ */
 @APICommand(name = ListVMsMetricsCmd.APINAME, description = "Lists VM metrics", responseObject = VmMetricsResponse.class,
         requestHasSensitiveInfo = false, responseHasSensitiveInfo = false,  responseView = ResponseObject.ResponseView.Full,
         since = "4.9.3", authorized = {RoleType.Admin,  RoleType.ResourceAdmin, RoleType.DomainAdmin, RoleType.User})
+@Deprecated(since = "4.17.0")
 public class ListVMsMetricsCmd extends ListVMsCmd {
     public static final String APINAME = "listVirtualMachinesMetrics";
 
diff --git a/plugins/metrics/src/main/java/org/apache/cloudstack/api/ListVMsUsageHistoryCmd.java b/plugins/metrics/src/main/java/org/apache/cloudstack/api/ListVMsUsageHistoryCmd.java
new file mode 100644
index 00000000000..9c6df625231
--- /dev/null
+++ b/plugins/metrics/src/main/java/org/apache/cloudstack/api/ListVMsUsageHistoryCmd.java
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.api;
+
+import java.util.Date;
+import java.util.List;
+
+import javax.inject.Inject;
+
+import org.apache.cloudstack.acl.RoleType;
+import org.apache.cloudstack.api.response.ListResponse;
+import org.apache.cloudstack.api.response.UserVmResponse;
+import org.apache.cloudstack.metrics.MetricsService;
+import org.apache.cloudstack.response.VmMetricsStatsResponse;
+
+@APICommand(name = ListVMsUsageHistoryCmd.APINAME, description = "Lists VM stats", responseObject = VmMetricsStatsResponse.class,
+        requestHasSensitiveInfo = false, responseHasSensitiveInfo = false, since = "4.17",
+        authorized = {RoleType.Admin,  RoleType.ResourceAdmin, RoleType.DomainAdmin, RoleType.User})
+public class ListVMsUsageHistoryCmd extends BaseListCmd {
+    public static final String APINAME = "listVirtualMachinesUsageHistory";
+
+    @Inject
+    private MetricsService metricsService;
+
+    /////////////////////////////////////////////////////
+    //////////////// API parameters /////////////////////
+    /////////////////////////////////////////////////////
+
+    @Parameter(name = ApiConstants.ID, type = CommandType.UUID, entityType = UserVmResponse.class, description = "the ID of the virtual machine.")
+    private Long id;
+
+    @Parameter(name=ApiConstants.IDS, type=CommandType.LIST, collectionType=CommandType.UUID, entityType=UserVmResponse.class, description="the IDs of the virtual machines, mutually exclusive with id.")
+    private List<Long> ids;
+
+    @Parameter(name = ApiConstants.NAME, type = CommandType.STRING, description = "name of the virtual machine (a substring match is made against the parameter value returning the data for all matching VMs).")
+    private String name;
+
+    @Parameter(name = ApiConstants.START_DATE, type = CommandType.DATE, description = "start date to filter VM stats."
+            + "Use format \"yyyy-MM-dd hh:mm:ss\")")
+    private Date startDate;
+
+    @Parameter(name = ApiConstants.END_DATE, type = CommandType.DATE, description = "end date to filter VM stats."
+            + "Use format \"yyyy-MM-dd hh:mm:ss\")")
+    private Date endDate;
+
+    /////////////////////////////////////////////////////
+    /////////////////// Accessors ///////////////////////
+    /////////////////////////////////////////////////////
+
+    public Long getId() {
+        return id;
+    }
+
+    public List<Long> getIds() {
+        return ids;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public Date getStartDate() {
+        return startDate;
+    }
+
+    public Date getEndDate() {
+        return endDate;
+    }
+
+    /////////////////////////////////////////////////////
+    /////////////// API Implementation///////////////////
+    /////////////////////////////////////////////////////
+
+    @Override
+    public String getCommandName() {
+        return APINAME.toLowerCase() + BaseCmd.RESPONSE_SUFFIX;
+    }
+
+    @Override
+    public void execute() {
+        ListResponse<VmMetricsStatsResponse> response = metricsService.searchForVmMetricsStats(this);
+        response.setResponseName(getCommandName());
+        setResponseObject(response);
+    }
+}
\ No newline at end of file
diff --git a/plugins/metrics/src/main/java/org/apache/cloudstack/metrics/MetricsService.java b/plugins/metrics/src/main/java/org/apache/cloudstack/metrics/MetricsService.java
index a1e0289ff17..26c52f74b14 100644
--- a/plugins/metrics/src/main/java/org/apache/cloudstack/metrics/MetricsService.java
+++ b/plugins/metrics/src/main/java/org/apache/cloudstack/metrics/MetricsService.java
@@ -19,8 +19,11 @@ package org.apache.cloudstack.metrics;
 
 import com.cloud.utils.Pair;
 import com.cloud.utils.component.PluggableService;
+
+import org.apache.cloudstack.api.ListVMsUsageHistoryCmd;
 import org.apache.cloudstack.api.response.ClusterResponse;
 import org.apache.cloudstack.api.response.HostResponse;
+import org.apache.cloudstack.api.response.ListResponse;
 import org.apache.cloudstack.api.response.StoragePoolResponse;
 import org.apache.cloudstack.api.response.UserVmResponse;
 import org.apache.cloudstack.api.response.VolumeResponse;
@@ -30,6 +33,7 @@ import org.apache.cloudstack.response.HostMetricsResponse;
 import org.apache.cloudstack.response.InfrastructureResponse;
 import org.apache.cloudstack.response.StoragePoolMetricsResponse;
 import org.apache.cloudstack.response.VmMetricsResponse;
+import org.apache.cloudstack.response.VmMetricsStatsResponse;
 import org.apache.cloudstack.response.VolumeMetricsResponse;
 import org.apache.cloudstack.response.ZoneMetricsResponse;
 
@@ -38,6 +42,7 @@ import java.util.List;
 public interface MetricsService extends PluggableService {
     InfrastructureResponse listInfrastructure();
 
+    ListResponse<VmMetricsStatsResponse> searchForVmMetricsStats(ListVMsUsageHistoryCmd cmd);
     List<VolumeMetricsResponse> listVolumeMetrics(List<VolumeResponse> volumeResponses);
     List<VmMetricsResponse> listVmMetrics(List<UserVmResponse> vmResponses);
     List<StoragePoolMetricsResponse> listStoragePoolMetrics(List<StoragePoolResponse> poolResponses);
diff --git a/plugins/metrics/src/main/java/org/apache/cloudstack/metrics/MetricsServiceImpl.java b/plugins/metrics/src/main/java/org/apache/cloudstack/metrics/MetricsServiceImpl.java
index cb16501ed8d..df6dc09cf3b 100644
--- a/plugins/metrics/src/main/java/org/apache/cloudstack/metrics/MetricsServiceImpl.java
+++ b/plugins/metrics/src/main/java/org/apache/cloudstack/metrics/MetricsServiceImpl.java
@@ -18,8 +18,12 @@
 package org.apache.cloudstack.metrics;
 
 import java.lang.reflect.InvocationTargetException;
+import java.text.DecimalFormat;
 import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import javax.inject.Inject;
 
@@ -29,11 +33,14 @@ import org.apache.cloudstack.api.ListHostsMetricsCmd;
 import org.apache.cloudstack.api.ListInfrastructureCmd;
 import org.apache.cloudstack.api.ListStoragePoolsMetricsCmd;
 import org.apache.cloudstack.api.ListVMsMetricsCmd;
+import org.apache.cloudstack.api.ListVMsUsageHistoryCmd;
 import org.apache.cloudstack.api.ListVolumesMetricsCmd;
 import org.apache.cloudstack.api.ListZonesMetricsCmd;
 import org.apache.cloudstack.api.ServerApiException;
 import org.apache.cloudstack.api.response.ClusterResponse;
 import org.apache.cloudstack.api.response.HostResponse;
+import org.apache.cloudstack.api.response.ListResponse;
+import org.apache.cloudstack.api.response.StatsResponse;
 import org.apache.cloudstack.api.response.StoragePoolResponse;
 import org.apache.cloudstack.api.response.UserVmResponse;
 import org.apache.cloudstack.api.response.VolumeResponse;
@@ -44,15 +51,20 @@ import org.apache.cloudstack.response.HostMetricsResponse;
 import org.apache.cloudstack.response.InfrastructureResponse;
 import org.apache.cloudstack.response.StoragePoolMetricsResponse;
 import org.apache.cloudstack.response.VmMetricsResponse;
+import org.apache.cloudstack.response.VmMetricsStatsResponse;
 import org.apache.cloudstack.response.VolumeMetricsResponse;
 import org.apache.cloudstack.response.ZoneMetricsResponse;
 import org.apache.cloudstack.storage.datastore.db.ImageStoreDao;
 import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
 import org.apache.commons.beanutils.BeanUtils;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 
+import com.cloud.agent.api.VmStatsEntryBase;
 import com.cloud.alert.AlertManager;
 import com.cloud.alert.dao.AlertDao;
 import com.cloud.api.ApiDBUtils;
+import com.cloud.api.query.MutualExclusiveIdsManagerBase;
 import com.cloud.api.query.dao.HostJoinDao;
 import com.cloud.api.query.vo.HostJoinVO;
 import com.cloud.capacity.Capacity;
@@ -65,6 +77,7 @@ import com.cloud.dc.dao.ClusterDao;
 import com.cloud.dc.dao.DataCenterDao;
 import com.cloud.dc.dao.HostPodDao;
 import com.cloud.deploy.DeploymentClusterPlanner;
+import com.cloud.exception.InvalidParameterValueException;
 import com.cloud.host.Host;
 import com.cloud.host.HostStats;
 import com.cloud.host.Status;
@@ -76,13 +89,20 @@ import com.cloud.org.Managed;
 import com.cloud.user.Account;
 import com.cloud.user.AccountManager;
 import com.cloud.utils.Pair;
-import com.cloud.utils.component.ComponentLifecycleBase;
+import com.cloud.utils.db.Filter;
+import com.cloud.utils.db.SearchBuilder;
+import com.cloud.utils.db.SearchCriteria;
+import com.cloud.vm.UserVmVO;
 import com.cloud.vm.VMInstanceVO;
 import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.VmStatsVO;
 import com.cloud.vm.dao.DomainRouterDao;
+import com.cloud.vm.dao.UserVmDao;
 import com.cloud.vm.dao.VMInstanceDao;
+import com.cloud.vm.dao.VmStatsDao;
+import com.google.gson.Gson;
 
-public class MetricsServiceImpl extends ComponentLifecycleBase implements MetricsService {
+public class MetricsServiceImpl extends MutualExclusiveIdsManagerBase implements MetricsService {
 
     @Inject
     private DataCenterDao dataCenterDao;
@@ -110,18 +130,17 @@ public class MetricsServiceImpl extends ComponentLifecycleBase implements Metric
     private ManagementServerHostDao managementServerHostDao;
     @Inject
     private AlertDao alertDao;
+    @Inject
+    protected UserVmDao userVmDao;
+    @Inject
+    protected VmStatsDao vmStatsDao;
+
+    private static Gson gson = new Gson();
 
     protected MetricsServiceImpl() {
         super();
     }
 
-    private Double findRatioValue(final String value) {
-        if (value != null) {
-            return Double.valueOf(value);
-        }
-        return 1.0;
-    }
-
     private void updateHostMetrics(final Metrics metrics, final HostJoinVO host) {
         metrics.incrTotalHosts();
         metrics.addCpuAllocated(host.getCpuReservedCapacity() + host.getCpuUsedCapacity());
@@ -134,6 +153,169 @@ public class MetricsServiceImpl extends ComponentLifecycleBase implements Metric
             metrics.setMaximumMemoryUsage((long) hostStats.getUsedMemory());
         }
     }
+    /**
+     * Searches for VM stats based on the {@code ListVMsUsageHistoryCmd} parameters.
+     *
+     * @param cmd the {@link ListVMsUsageHistoryCmd} specifying what should be searched.
+     * @return the list of VM metrics stats found.
+     */
+    @Override
+    public ListResponse<VmMetricsStatsResponse> searchForVmMetricsStats(ListVMsUsageHistoryCmd cmd) {
+        Pair<List<UserVmVO>, Integer> userVmList = searchForUserVmsInternal(cmd);
+        Map<Long,List<VmStatsVO>> vmStatsList = searchForVmMetricsStatsInternal(cmd, userVmList.first());
+        return createVmMetricsStatsResponse(userVmList, vmStatsList);
+    }
+
+    /**
+     * Searches VMs based on {@code ListVMsUsageHistoryCmd} parameters.
+     *
+     * @param cmd the {@link ListVMsUsageHistoryCmd} specifying the parameters.
+     * @return the list of VMs.
+     */
+    protected Pair<List<UserVmVO>, Integer> searchForUserVmsInternal(ListVMsUsageHistoryCmd cmd) {
+        Filter searchFilter = new Filter(UserVmVO.class, "id", true, cmd.getStartIndex(), cmd.getPageSizeVal());
+        List<Long> ids = getIdsListFromCmd(cmd.getId(), cmd.getIds());
+        String name = cmd.getName();
+        String keyword = cmd.getKeyword();
+
+        SearchBuilder<UserVmVO> sb =  userVmDao.createSearchBuilder();
+        sb.and("idIN", sb.entity().getId(), SearchCriteria.Op.IN);
+        sb.and("displayName", sb.entity().getDisplayName(), SearchCriteria.Op.LIKE);
+        sb.and("state", sb.entity().getState(), SearchCriteria.Op.EQ);
+
+        SearchCriteria<UserVmVO> sc = sb.create();
+        if (CollectionUtils.isNotEmpty(ids)) {
+            sc.setParameters("idIN", ids.toArray());
+        }
+        if (StringUtils.isNotBlank(name)) {
+            sc.setParameters("displayName", "%" + name + "%");
+        }
+        if (StringUtils.isNotBlank(keyword)) {
+            SearchCriteria<UserVmVO> ssc = userVmDao.createSearchCriteria();
+            ssc.addOr("displayName", SearchCriteria.Op.LIKE, "%" + keyword + "%");
+            ssc.addOr("state", SearchCriteria.Op.EQ, keyword);
+            sc.addAnd("displayName", SearchCriteria.Op.SC, ssc);
+        }
+
+        return userVmDao.searchAndCount(sc, searchFilter);
+    }
+
+    /**
+     * Searches stats for a list of VMs, based on date filtering parameters.
+     *
+     * @param cmd the {@link ListVMsUsageHistoryCmd} specifying the filtering parameters.
+     * @param userVmList the list of VMs for which stats should be searched.
+     * @return the key-value map in which keys are VM IDs and values are lists of VM stats.
+     */
+    protected Map<Long,List<VmStatsVO>> searchForVmMetricsStatsInternal(ListVMsUsageHistoryCmd cmd, List<UserVmVO> userVmList) {
+        Map<Long,List<VmStatsVO>> vmStatsVOList = new HashMap<Long,List<VmStatsVO>>();
+        Date startDate = cmd.getStartDate();
+        Date endDate = cmd.getEndDate();
+
+        validateDateParams(startDate, endDate);
+
+        for (UserVmVO userVmVO : userVmList) {
+            Long vmId = userVmVO.getId();
+            vmStatsVOList.put(vmId, findVmStatsAccordingToDateParams(vmId, startDate, endDate));
+        }
+
+        return vmStatsVOList;
+    }
+
+    /**
+     * Checks if {@code startDate} is after {@code endDate} (when both are not null)
+     * and throws an {@link InvalidParameterValueException} if so.
+     *
+     * @param startDate the start date to be validated.
+     * @param endDate the end date to be validated.
+     */
+    protected void validateDateParams(Date startDate, Date endDate) {
+        if ((startDate != null && endDate != null) && (startDate.after(endDate))){
+            throw new InvalidParameterValueException("startDate cannot be after endDate.");
+        }
+    }
+
+    /**
+     * Finds stats for a specific VM based on date parameters.
+     *
+     * @param vmId the specific VM.
+     * @param startDate the start date to filtering.
+     * @param endDate the end date to filtering.
+     * @return the list of stats for the specified VM.
+     */
+    protected List<VmStatsVO> findVmStatsAccordingToDateParams(Long vmId, Date startDate, Date endDate){
+        if (startDate != null && endDate != null) {
+            return vmStatsDao.findByVmIdAndTimestampBetween(vmId, startDate, endDate);
+        }
+        if (startDate != null) {
+            return vmStatsDao.findByVmIdAndTimestampGreaterThanEqual(vmId, startDate);
+        }
+        if (endDate != null) {
+            return vmStatsDao.findByVmIdAndTimestampLessThanEqual(vmId, endDate);
+        }
+        return vmStatsDao.findByVmId(vmId);
+    }
+
+    /**
+     * Creates a {@code ListResponse<VmMetricsStatsResponse>}. For each VM, this joins essential VM info
+     * with its respective list of stats.
+     *
+     * @param userVmList the list of VMs.
+     * @param vmStatsList the respective list of stats.
+     * @return the list of responses that was created.
+     */
+    protected ListResponse<VmMetricsStatsResponse> createVmMetricsStatsResponse(Pair<List<UserVmVO>, Integer> userVmList,
+            Map<Long,List<VmStatsVO>> vmStatsList) {
+        List<VmMetricsStatsResponse> responses = new ArrayList<VmMetricsStatsResponse>();
+        for (UserVmVO userVmVO : userVmList.first()) {
+            VmMetricsStatsResponse vmMetricsStatsResponse = new VmMetricsStatsResponse();
+            vmMetricsStatsResponse.setObjectName("virtualmachine");
+            vmMetricsStatsResponse.setId(userVmVO.getUuid());
+            vmMetricsStatsResponse.setName(userVmVO.getName());
+            vmMetricsStatsResponse.setDisplayName(userVmVO.getDisplayName());
+
+            vmMetricsStatsResponse.setStats(createStatsResponse(vmStatsList.get(userVmVO.getId())));
+            responses.add(vmMetricsStatsResponse);
+        }
+
+        ListResponse<VmMetricsStatsResponse> response = new ListResponse<VmMetricsStatsResponse>();
+        response.setResponses(responses);
+        return response;
+    }
+
+    /**
+     * Creates a {@code Set<StatsResponse>} from a given {@code List<VmStatsVO>}.
+     *
+     * @param vmStatsList the list of VM stats.
+     * @return the set of responses that was created.
+     */
+    protected List<StatsResponse> createStatsResponse(List<VmStatsVO> vmStatsList) {
+        List<StatsResponse> statsResponseList = new ArrayList<StatsResponse>();
+        DecimalFormat decimalFormat = new DecimalFormat("#.##");
+        for (VmStatsVO vmStats : vmStatsList) {
+            StatsResponse response = new StatsResponse();
+            response.setTimestamp(vmStats.getTimestamp());
+
+            VmStatsEntryBase statsEntry = gson.fromJson(vmStats.getVmStatsData(), VmStatsEntryBase.class);
+            response.setCpuUsed(decimalFormat.format(statsEntry.getCPUUtilization()) + "%");
+            response.setNetworkKbsRead((long)statsEntry.getNetworkReadKBs());
+            response.setNetworkKbsWrite((long)statsEntry.getNetworkWriteKBs());
+            response.setDiskKbsRead((long)statsEntry.getDiskReadKBs());
+            response.setDiskKbsWrite((long)statsEntry.getDiskWriteKBs());
+            response.setDiskIORead((long)statsEntry.getDiskReadIOs());
+            response.setDiskIOWrite((long)statsEntry.getDiskWriteIOs());
+            long totalMemory = (long)statsEntry.getMemoryKBs();
+            long freeMemory = (long)statsEntry.getIntFreeMemoryKBs();
+            long correctedFreeMemory = freeMemory >= totalMemory ? 0 : freeMemory;
+            response.setMemoryKBs(totalMemory);
+            response.setMemoryIntFreeKBs(correctedFreeMemory);
+            response.setMemoryTargetKBs((long)statsEntry.getTargetMemoryKBs());
+
+            statsResponseList.add(response);
+        }
+        return statsResponseList;
+    }
+
 
     @Override
     public InfrastructureResponse listInfrastructure() {
@@ -480,6 +662,7 @@ public class MetricsServiceImpl extends ComponentLifecycleBase implements Metric
         cmdList.add(ListHostsMetricsCmd.class);
         cmdList.add(ListClustersMetricsCmd.class);
         cmdList.add(ListZonesMetricsCmd.class);
+        cmdList.add(ListVMsUsageHistoryCmd.class);
         return cmdList;
     }
 
diff --git a/plugins/metrics/src/main/java/org/apache/cloudstack/response/VmMetricsStatsResponse.java b/plugins/metrics/src/main/java/org/apache/cloudstack/response/VmMetricsStatsResponse.java
new file mode 100644
index 00000000000..0505bbd65d3
--- /dev/null
+++ b/plugins/metrics/src/main/java/org/apache/cloudstack/response/VmMetricsStatsResponse.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.response;
+
+import java.util.List;
+
+import org.apache.cloudstack.api.ApiConstants;
+import org.apache.cloudstack.api.BaseResponse;
+import org.apache.cloudstack.api.response.StatsResponse;
+
+import com.cloud.serializer.Param;
+import com.google.gson.annotations.SerializedName;
+
+public class VmMetricsStatsResponse extends BaseResponse {
+    @SerializedName(ApiConstants.ID)
+    @Param(description = "the ID of the virtual machine")
+    private String id;
+
+    @SerializedName(ApiConstants.NAME)
+    @Param(description = "the name of the virtual machine")
+    private String name;
+
+    @SerializedName("displayname")
+    @Param(description = "user generated name. The name of the virtual machine is returned if no displayname exists.")
+    private String displayName;
+
+    @SerializedName("stats")
+    @Param(description = "the list of VM stats")
+    private List<StatsResponse> stats;
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public void setDisplayName(String displayName) {
+        this.displayName = displayName;
+    }
+
+    public void setStats(List<StatsResponse> stats) {
+        this.stats = stats;
+    }
+
+}
\ No newline at end of file
diff --git a/plugins/metrics/src/test/java/org/apache/cloudstack/metrics/MetricsServiceImplTest.java b/plugins/metrics/src/test/java/org/apache/cloudstack/metrics/MetricsServiceImplTest.java
new file mode 100644
index 00000000000..59ebaf7525b
--- /dev/null
+++ b/plugins/metrics/src/test/java/org/apache/cloudstack/metrics/MetricsServiceImplTest.java
@@ -0,0 +1,314 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.metrics;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cloudstack.api.ListVMsUsageHistoryCmd;
+import org.apache.cloudstack.api.response.ListResponse;
+import org.apache.cloudstack.response.VmMetricsStatsResponse;
+import org.apache.commons.lang3.time.DateUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import com.cloud.exception.InvalidParameterValueException;
+import com.cloud.utils.Pair;
+import com.cloud.utils.db.SearchBuilder;
+import com.cloud.utils.db.SearchCriteria;
+import com.cloud.vm.UserVmVO;
+import com.cloud.vm.VmStatsVO;
+import com.cloud.vm.dao.UserVmDao;
+import com.cloud.vm.dao.VmStatsDao;
+
+@RunWith(MockitoJUnitRunner.class)
+public class MetricsServiceImplTest {
+
+    @Spy
+    @InjectMocks
+    MetricsServiceImpl spy;
+
+    @Mock
+    ListVMsUsageHistoryCmd listVMsUsageHistoryCmdMock;
+
+    @Mock
+    UserVmVO userVmVOMock;
+
+    @Mock
+    SearchBuilder<UserVmVO> sbMock;
+
+    @Mock
+    SearchCriteria<UserVmVO> scMock;
+
+    @Mock
+    UserVmDao userVmDaoMock;
+
+    @Mock
+    VmStatsDao vmStatsDaoMock;
+
+    @Captor
+    ArgumentCaptor<String> stringCaptor1, stringCaptor2;
+
+    @Captor
+    ArgumentCaptor<Object[]> objectArrayCaptor;
+
+    @Captor
+    ArgumentCaptor<SearchCriteria.Op> opCaptor;
+    long fakeVmId1 = 1L, fakeVmId2 = 2L;
+
+    Pair<List<UserVmVO>, Integer> expectedVmListAndCounter;
+
+    @Mock
+    Pair<List<UserVmVO>, Integer> expectedVmListAndCounterMock;
+
+    @Mock
+    Map<Long,List<VmStatsVO>> vmStatsMapMock;
+
+    @Mock
+    VmStatsVO vmStatsVOMock;
+
+    private void prepareSearchCriteriaWhenUseSetParameters() {
+        Mockito.doNothing().when(scMock).setParameters(Mockito.anyString(), Mockito.any());
+    }
+
+    private void preparesearchForUserVmsInternalTest() {
+        expectedVmListAndCounter = new Pair<List<UserVmVO>, Integer>(Arrays.asList(userVmVOMock), 1);
+
+        Mockito.doReturn(1L).when(listVMsUsageHistoryCmdMock).getStartIndex();
+        Mockito.doReturn(2L).when(listVMsUsageHistoryCmdMock).getPageSizeVal();
+
+        Mockito.doReturn(sbMock).when(userVmDaoMock).createSearchBuilder();
+        Mockito.doReturn(fakeVmId1).when(userVmVOMock).getId();
+        Mockito.doReturn(userVmVOMock).when(sbMock).entity();
+        Mockito.doReturn(scMock).when(sbMock).create();
+
+        Mockito.doReturn(new Pair<List<UserVmVO>, Integer>(Arrays.asList(userVmVOMock), 1))
+        .when(userVmDaoMock).searchAndCount(Mockito.any(), Mockito.any());
+    }
+
+    @Test
+    public void searchForUserVmsInternalTestWithOnlyOneId() {
+        preparesearchForUserVmsInternalTest();
+        prepareSearchCriteriaWhenUseSetParameters();
+        Mockito.doReturn(fakeVmId1).when(listVMsUsageHistoryCmdMock).getId();
+        Mockito.doReturn(null).when(listVMsUsageHistoryCmdMock).getIds();
+        Mockito.doReturn(null).when(listVMsUsageHistoryCmdMock).getName();
+        Mockito.doReturn(null).when(listVMsUsageHistoryCmdMock).getKeyword();
+
+        Pair<List<UserVmVO>, Integer> result = spy.searchForUserVmsInternal(listVMsUsageHistoryCmdMock);
+
+        Mockito.verify(scMock).setParameters(stringCaptor1.capture(), objectArrayCaptor.capture());
+        Assert.assertEquals("idIN", stringCaptor1.getValue());
+        Assert.assertEquals(Arrays.asList(fakeVmId1), objectArrayCaptor.getAllValues());
+        Assert.assertEquals(expectedVmListAndCounter, result);
+    }
+
+    @Test
+    public void searchForUserVmsInternalTestWithAListOfIds() {
+        List<Long> expected =  Arrays.asList(fakeVmId1, fakeVmId2);
+        preparesearchForUserVmsInternalTest();
+        prepareSearchCriteriaWhenUseSetParameters();
+        Mockito.doReturn(null).when(listVMsUsageHistoryCmdMock).getId();
+        Mockito.doReturn(expected).when(listVMsUsageHistoryCmdMock).getIds();
+        Mockito.doReturn(null).when(listVMsUsageHistoryCmdMock).getName();
+        Mockito.doReturn(null).when(listVMsUsageHistoryCmdMock).getKeyword();
+
+        Pair<List<UserVmVO>, Integer> result = spy.searchForUserVmsInternal(listVMsUsageHistoryCmdMock);
+
+        Mockito.verify(scMock).setParameters(stringCaptor1.capture(), objectArrayCaptor.capture());
+        Assert.assertEquals("idIN", stringCaptor1.getValue());
+        Assert.assertEquals(expected, objectArrayCaptor.getAllValues());
+        Assert.assertEquals(expectedVmListAndCounter, result);
+    }
+
+    @Test
+    public void searchForUserVmsInternalTestWithName() {
+        preparesearchForUserVmsInternalTest();
+        prepareSearchCriteriaWhenUseSetParameters();
+        Mockito.doReturn(null).when(listVMsUsageHistoryCmdMock).getId();
+        Mockito.doReturn(null).when(listVMsUsageHistoryCmdMock).getIds();
+        Mockito.doReturn("fakeName").when(listVMsUsageHistoryCmdMock).getName();
+        Mockito.doReturn(null).when(listVMsUsageHistoryCmdMock).getKeyword();
+
+        Pair<List<UserVmVO>, Integer> result = spy.searchForUserVmsInternal(listVMsUsageHistoryCmdMock);
+
+        Mockito.verify(scMock).setParameters(stringCaptor1.capture(), objectArrayCaptor.capture());
+        Assert.assertEquals("displayName", stringCaptor1.getValue());
+        Assert.assertEquals("%fakeName%", objectArrayCaptor.getValue());
+        Assert.assertEquals(expectedVmListAndCounter, result);
+    }
+
+    @Test
+    public void searchForUserVmsInternalTestWithKeyword() {
+        preparesearchForUserVmsInternalTest();
+        prepareSearchCriteriaWhenUseSetParameters();
+        Mockito.doReturn(scMock).when(userVmDaoMock).createSearchCriteria();
+        Mockito.doReturn(null).when(listVMsUsageHistoryCmdMock).getId();
+        Mockito.doReturn(null).when(listVMsUsageHistoryCmdMock).getIds();
+        Mockito.doReturn(null).when(listVMsUsageHistoryCmdMock).getName();
+        Mockito.doReturn("fakeKeyword").when(listVMsUsageHistoryCmdMock).getKeyword();
+
+        Pair<List<UserVmVO>, Integer> result = spy.searchForUserVmsInternal(listVMsUsageHistoryCmdMock);
+
+        Mockito.verify(scMock, Mockito.times(2)).addOr(stringCaptor1.capture(), opCaptor.capture(), objectArrayCaptor.capture());
+        List<String> conditions = stringCaptor1.getAllValues();
+        List<Object[]> params = objectArrayCaptor.getAllValues();
+        Assert.assertEquals("displayName", conditions.get(0));
+        Assert.assertEquals("state", conditions.get(1));
+        Assert.assertEquals("%fakeKeyword%", params.get(0));
+        Assert.assertEquals("fakeKeyword", params.get(1));
+        Assert.assertEquals(expectedVmListAndCounter, result);
+    }
+
+    @Test
+    public void searchForVmMetricsStatsInternalTestWithAPopulatedListOfVms() {
+        Mockito.doNothing().when(spy).validateDateParams(Mockito.any(), Mockito.any());
+        Mockito.doReturn(new ArrayList<VmStatsVO>()).when(spy).findVmStatsAccordingToDateParams(
+                Mockito.anyLong(), Mockito.any(), Mockito.any());
+        Mockito.doReturn(fakeVmId1).when(userVmVOMock).getId();
+        Map<Long,List<VmStatsVO>> expected = new HashMap<Long,List<VmStatsVO>>();
+        expected.put(fakeVmId1, new ArrayList<VmStatsVO>());
+
+        Map<Long,List<VmStatsVO>> result = spy.searchForVmMetricsStatsInternal(
+                listVMsUsageHistoryCmdMock, Arrays.asList(userVmVOMock));
+
+        Mockito.verify(userVmVOMock).getId();
+        Mockito.verify(spy).findVmStatsAccordingToDateParams(
+                Mockito.anyLong(), Mockito.any(), Mockito.any());
+        Assert.assertEquals(expected, result);
+    }
+
+    @Test
+    public void searchForVmMetricsStatsInternalTestWithAnEmptyListOfVms() {
+        Mockito.doNothing().when(spy).validateDateParams(Mockito.any(), Mockito.any());
+        Map<Long,List<VmStatsVO>> expected = new HashMap<Long,List<VmStatsVO>>();
+
+        Map<Long,List<VmStatsVO>> result = spy.searchForVmMetricsStatsInternal(
+                listVMsUsageHistoryCmdMock, new ArrayList<UserVmVO>());
+
+        Mockito.verify(userVmVOMock, Mockito.never()).getId();
+        Mockito.verify(spy, Mockito.never()).findVmStatsAccordingToDateParams(
+                Mockito.anyLong(), Mockito.any(), Mockito.any());
+        Assert.assertEquals(expected, result);
+    }
+
+    @Test(expected = InvalidParameterValueException.class)
+    public void validateDateParamsTestWithEndDateBeforeStartDate() {
+        Date startDate = new Date();
+        Date endDate = DateUtils.addSeconds(startDate, -1);
+
+        spy.validateDateParams(startDate, endDate);
+    }
+
+    @Test
+    public void findVmStatsAccordingToDateParamsTestWithStartDateAndEndDate() {
+        Date startDate = new Date();
+        Date endDate = DateUtils.addSeconds(startDate, 1);
+        Mockito.doReturn(new ArrayList<VmStatsVO>()).when(vmStatsDaoMock).findByVmIdAndTimestampBetween(
+                Mockito.anyLong(), Mockito.any(), Mockito.any());
+
+        spy.findVmStatsAccordingToDateParams(fakeVmId1, startDate, endDate);
+
+        Mockito.verify(vmStatsDaoMock).findByVmIdAndTimestampBetween(
+              Mockito.anyLong(), Mockito.any(), Mockito.any());
+    }
+
+    @Test
+    public void findVmStatsAccordingToDateParamsTestWithOnlyStartDate() {
+        Date startDate = new Date();
+        Date endDate = null;
+        Mockito.doReturn(new ArrayList<VmStatsVO>()).when(vmStatsDaoMock).findByVmIdAndTimestampGreaterThanEqual(
+                Mockito.anyLong(), Mockito.any());
+
+        spy.findVmStatsAccordingToDateParams(fakeVmId1, startDate, endDate);
+
+        Mockito.verify(vmStatsDaoMock).findByVmIdAndTimestampGreaterThanEqual(
+              Mockito.anyLong(), Mockito.any());
+    }
+
+    @Test
+    public void findVmStatsAccordingToDateParamsTestWithOnlyEndDate() {
+        Date startDate = null;
+        Date endDate = new Date();
+        Mockito.doReturn(new ArrayList<VmStatsVO>()).when(vmStatsDaoMock).findByVmIdAndTimestampLessThanEqual(
+                Mockito.anyLong(), Mockito.any());
+
+        spy.findVmStatsAccordingToDateParams(fakeVmId1, startDate, endDate);
+
+        Mockito.verify(vmStatsDaoMock).findByVmIdAndTimestampLessThanEqual(
+              Mockito.anyLong(), Mockito.any());
+    }
+
+    @Test
+    public void findVmStatsAccordingToDateParamsTestWithNoDate() {
+        Mockito.doReturn(new ArrayList<VmStatsVO>()).when(vmStatsDaoMock).findByVmId(Mockito.anyLong());
+
+        spy.findVmStatsAccordingToDateParams(fakeVmId1, null, null);
+
+        Mockito.verify(vmStatsDaoMock).findByVmId(Mockito.anyLong());
+    }
+
+    @Test
+    public void createVmMetricsStatsResponseTestWithValidInput() {
+        Mockito.doReturn("").when(userVmVOMock).getUuid();
+        Mockito.doReturn("").when(userVmVOMock).getName();
+        Mockito.doReturn("").when(userVmVOMock).getDisplayName();
+        Mockito.doReturn(fakeVmId1).when(userVmVOMock).getId();
+        Mockito.doReturn(Arrays.asList(userVmVOMock)).when(expectedVmListAndCounterMock).first();
+        Mockito.doReturn(null).when(vmStatsMapMock).get(Mockito.any());
+        Mockito.doReturn(null).when(spy).createStatsResponse(Mockito.any());
+
+        ListResponse<VmMetricsStatsResponse> result = spy.createVmMetricsStatsResponse(
+                expectedVmListAndCounterMock, vmStatsMapMock);
+
+        Assert.assertEquals(Integer.valueOf(1), result.getCount());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void createVmMetricsStatsResponseTestWithNoUserVmList() {
+        spy.createVmMetricsStatsResponse(null, vmStatsMapMock);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void createVmMetricsStatsResponseTestWithNoVmStatsList() {
+        spy.createVmMetricsStatsResponse(expectedVmListAndCounterMock, null);
+    }
+
+    @Test
+    public void createStatsResponseTestWithValidStatsData() {
+        final String fakeVmStatsData = "{\"vmId\":1,\"cpuUtilization\":15.615905273486222,\"networkReadKBs\":0.1,"
+                + "\"networkWriteKBs\":0.2,\"diskReadIOs\":0.1,\"diskWriteIOs\":0.2,\"diskReadKBs\":1.1,"
+                + "\"diskWriteKBs\":2.1,\"memoryKBs\":262144.0,\"intfreememoryKBs\":262144.0,\"targetmemoryKBs\":262144.0,"
+                + "\"numCPUs\":1,\"entityType\":\"vm\"}";
+        Mockito.doReturn(new Date()).when(vmStatsVOMock).getTimestamp();
+        Mockito.doReturn(fakeVmStatsData).when(vmStatsVOMock).getVmStatsData();
+
+        spy.createStatsResponse(Arrays.asList(vmStatsVOMock));
+    }
+}
diff --git a/server/src/main/java/com/cloud/api/ApiDBUtils.java b/server/src/main/java/com/cloud/api/ApiDBUtils.java
index 4e74f0d9738..9a29eb46c42 100644
--- a/server/src/main/java/com/cloud/api/ApiDBUtils.java
+++ b/server/src/main/java/com/cloud/api/ApiDBUtils.java
@@ -994,8 +994,8 @@ public class ApiDBUtils {
         return s_statsCollector.getStoragePoolStats(id);
     }
 
-    public static VmStats getVmStatistics(long hostId) {
-        return s_statsCollector.getVmStats(hostId);
+    public static VmStats getVmStatistics(long vmId, Boolean accumulate) {
+        return s_statsCollector.getVmStats(vmId, accumulate);
     }
 
     public static VolumeStats getVolumeStatistics(String volumeUuid) {
@@ -1805,7 +1805,11 @@ public class ApiDBUtils {
     }
 
     public static UserVmResponse newUserVmResponse(ResponseView view, String objectName, UserVmJoinVO userVm, EnumSet<VMDetails> details, Account caller) {
-        return s_userVmJoinDao.newUserVmResponse(view, objectName, userVm, details, caller);
+        return s_userVmJoinDao.newUserVmResponse(view, objectName, userVm, details, null, caller);
+    }
+
+    public static UserVmResponse newUserVmResponse(ResponseView view, String objectName, UserVmJoinVO userVm, EnumSet<VMDetails> details, Boolean accumulateStats, Account caller) {
+        return s_userVmJoinDao.newUserVmResponse(view, objectName, userVm, details, accumulateStats, caller);
     }
 
     public static UserVmResponse fillVmDetails(ResponseView view, UserVmResponse vmData, UserVmJoinVO vm) {
diff --git a/server/src/main/java/com/cloud/api/dispatch/ParamProcessWorker.java b/server/src/main/java/com/cloud/api/dispatch/ParamProcessWorker.java
index a8506342b72..e2db9f29b20 100644
--- a/server/src/main/java/com/cloud/api/dispatch/ParamProcessWorker.java
+++ b/server/src/main/java/com/cloud/api/dispatch/ParamProcessWorker.java
@@ -46,12 +46,6 @@ import org.apache.cloudstack.api.EntityReference;
 import org.apache.cloudstack.api.InternalIdentity;
 import org.apache.cloudstack.api.Parameter;
 import org.apache.cloudstack.api.ServerApiException;
-import org.apache.cloudstack.api.command.admin.resource.ArchiveAlertsCmd;
-import org.apache.cloudstack.api.command.admin.resource.DeleteAlertsCmd;
-import org.apache.cloudstack.api.command.admin.usage.ListUsageRecordsCmd;
-import org.apache.cloudstack.api.command.user.event.ArchiveEventsCmd;
-import org.apache.cloudstack.api.command.user.event.DeleteEventsCmd;
-import org.apache.cloudstack.api.command.user.event.ListEventsCmd;
 import org.apache.cloudstack.context.CallContext;
 import org.apache.log4j.Logger;
 
@@ -59,6 +53,7 @@ import com.cloud.exception.InvalidParameterValueException;
 import com.cloud.user.Account;
 import com.cloud.user.AccountManager;
 import com.cloud.utils.DateUtil;
+import com.cloud.utils.UuidUtils;
 import com.cloud.utils.db.EntityManager;
 import com.cloud.utils.exception.CloudRuntimeException;
 import org.apache.commons.lang3.StringUtils;
@@ -93,7 +88,7 @@ public class ParamProcessWorker implements DispatchWorker {
 
     private void validateNonEmptyString(final Object param, final String argName) {
         if (param == null || StringUtils.isEmpty(param.toString())) {
-            throw new InvalidParameterValueException(String.format("Empty or null value provided for API arg: %s", argName));
+            throwInvalidParameterValueException(argName);
         }
     }
 
@@ -105,10 +100,22 @@ public class ParamProcessWorker implements DispatchWorker {
             value = Long.valueOf(param.toString());
         }
         if (value == null || value < 1L) {
-            throw new InvalidParameterValueException(String.format("Invalid value provided for API arg: %s", argName));
+            throwInvalidParameterValueException(argName);
         }
     }
 
+    private void validateUuidString(final Object param, final String argName) {
+        String value = String.valueOf(param);
+
+        if (!UuidUtils.validateUUID(value)) {
+            throwInvalidParameterValueException(argName);
+        }
+    }
+
+    protected void throwInvalidParameterValueException(String argName) {
+        throw new InvalidParameterValueException(String.format("Invalid value provided for API arg: %s", argName));
+    }
+
     private void validateField(final Object paramObj, final Parameter annotation) throws ServerApiException {
         if (annotation == null) {
             return;
@@ -136,6 +143,13 @@ public class ParamProcessWorker implements DispatchWorker {
                             break;
                     }
                     break;
+                case UuidString:
+                    switch (annotation.type()) {
+                        case STRING:
+                            validateUuidString(paramObj, argName);
+                            break;
+                    }
+                    break;
             }
         }
     }
@@ -312,31 +326,22 @@ public class ParamProcessWorker implements DispatchWorker {
             case DATE:
                 // This piece of code is for maintaining backward compatibility
                 // and support both the date formats(Bug 9724)
-                if (cmdObj instanceof ListEventsCmd || cmdObj instanceof DeleteEventsCmd || cmdObj instanceof ArchiveEventsCmd ||
-                        cmdObj instanceof ArchiveAlertsCmd || cmdObj instanceof DeleteAlertsCmd || cmdObj instanceof ListUsageRecordsCmd) {
-                    final boolean isObjInNewDateFormat = isObjInNewDateFormat(paramObj.toString());
-                    if (isObjInNewDateFormat) {
-                        final DateFormat newFormat = newInputFormat;
-                        synchronized (newFormat) {
-                            field.set(cmdObj, newFormat.parse(paramObj.toString()));
-                        }
-                    } else {
-                        final DateFormat format = inputFormat;
-                        synchronized (format) {
-                            Date date = format.parse(paramObj.toString());
-                            if (field.getName().equals("startDate")) {
-                                date = messageDate(date, 0, 0, 0);
-                            } else if (field.getName().equals("endDate")) {
-                                date = messageDate(date, 23, 59, 59);
-                            }
-                            field.set(cmdObj, date);
-                        }
+                final boolean isObjInNewDateFormat = isObjInNewDateFormat(paramObj.toString());
+                if (isObjInNewDateFormat) {
+                    final DateFormat newFormat = newInputFormat;
+                    synchronized (newFormat) {
+                        field.set(cmdObj, newFormat.parse(paramObj.toString()));
                     }
                 } else {
                     final DateFormat format = inputFormat;
                     synchronized (format) {
-                        format.setLenient(false);
-                        field.set(cmdObj, format.parse(paramObj.toString()));
+                        Date date = format.parse(paramObj.toString());
+                        if (field.getName().equals("startDate")) {
+                            date = messageDate(date, 0, 0, 0);
+                        } else if (field.getName().equals("endDate")) {
+                            date = messageDate(date, 23, 59, 59);
+                        }
+                        field.set(cmdObj, date);
                     }
                 }
                 break;
@@ -453,7 +458,7 @@ public class ParamProcessWorker implements DispatchWorker {
         // If annotation's empty, the cmd existed before 3.x try conversion to long
         final boolean isPre3x = annotation.since().isEmpty();
         // Match against Java's UUID regex to check if input is uuid string
-        final boolean isUuid = uuid.matches("^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$");
+        final boolean isUuid = UuidUtils.validateUUID(uuid);
         // Enforce that it's uuid for newly added apis from version 3.x
         if (!isPre3x && !isUuid)
             return null;
diff --git a/server/src/main/java/com/cloud/api/query/QueryManagerImpl.java b/server/src/main/java/com/cloud/api/query/QueryManagerImpl.java
index b9ea6f0b457..3f481d84fc2 100644
--- a/server/src/main/java/com/cloud/api/query/QueryManagerImpl.java
+++ b/server/src/main/java/com/cloud/api/query/QueryManagerImpl.java
@@ -928,7 +928,7 @@ public class QueryManagerImpl extends MutualExclusiveIdsManagerBase implements Q
         if (_accountMgr.isRootAdmin(caller.getId())) {
             respView = ResponseView.Full;
         }
-        List<UserVmResponse> vmResponses = ViewResponseHelper.createUserVmResponse(respView, "virtualmachine", cmd.getDetails(), result.first().toArray(new UserVmJoinVO[result.first().size()]));
+        List<UserVmResponse> vmResponses = ViewResponseHelper.createUserVmResponse(respView, "virtualmachine", cmd.getDetails(), cmd.getAccumulate(), result.first().toArray(new UserVmJoinVO[result.first().size()]));
 
         response.setResponses(vmResponses, result.second());
         return response;
diff --git a/server/src/main/java/com/cloud/api/query/ViewResponseHelper.java b/server/src/main/java/com/cloud/api/query/ViewResponseHelper.java
index ed6f9514ab9..cfa45097455 100644
--- a/server/src/main/java/com/cloud/api/query/ViewResponseHelper.java
+++ b/server/src/main/java/com/cloud/api/query/ViewResponseHelper.java
@@ -134,14 +134,16 @@ public class ViewResponseHelper {
         return respList;
     }
 
-
     public static List<UserVmResponse> createUserVmResponse(ResponseView view, String objectName, UserVmJoinVO... userVms) {
-        return createUserVmResponse(view, objectName, EnumSet.of(VMDetails.all), userVms);
+        return createUserVmResponse(view, objectName, EnumSet.of(VMDetails.all), null, userVms);
     }
 
     public static List<UserVmResponse> createUserVmResponse(ResponseView view, String objectName, EnumSet<VMDetails> details, UserVmJoinVO... userVms) {
-        Account caller = CallContext.current().getCallingAccount();
+        return createUserVmResponse(view, objectName, details, null, userVms);
+    }
 
+    public static List<UserVmResponse> createUserVmResponse(ResponseView view, String objectName, EnumSet<VMDetails> details, Boolean accumulateStats, UserVmJoinVO... userVms) {
+        Account caller = CallContext.current().getCallingAccount();
         Hashtable<Long, UserVmResponse> vmDataList = new Hashtable<Long, UserVmResponse>();
         // Initialise the vmdatalist with the input data
 
@@ -149,7 +151,7 @@ public class ViewResponseHelper {
             UserVmResponse userVmData = vmDataList.get(userVm.getId());
             if (userVmData == null) {
                 // first time encountering this vm
-                userVmData = ApiDBUtils.newUserVmResponse(view, objectName, userVm, details, caller);
+                userVmData = ApiDBUtils.newUserVmResponse(view, objectName, userVm, details, accumulateStats, caller);
             } else{
                 // update nics, securitygroups, tags, affinitygroups for 1 to many mapping fields
                 userVmData = ApiDBUtils.fillVmDetails(view, userVmData, userVm);
diff --git a/server/src/main/java/com/cloud/api/query/dao/UserVmJoinDao.java b/server/src/main/java/com/cloud/api/query/dao/UserVmJoinDao.java
index 87af584024b..952a20e8b0d 100644
--- a/server/src/main/java/com/cloud/api/query/dao/UserVmJoinDao.java
+++ b/server/src/main/java/com/cloud/api/query/dao/UserVmJoinDao.java
@@ -30,7 +30,7 @@ import com.cloud.utils.db.GenericDao;
 
 public interface UserVmJoinDao extends GenericDao<UserVmJoinVO, Long> {
 
-    UserVmResponse newUserVmResponse(ResponseView view, String objectName, UserVmJoinVO userVm, EnumSet<VMDetails> details, Account caller);
+    UserVmResponse newUserVmResponse(ResponseView view, String objectName, UserVmJoinVO userVm, EnumSet<VMDetails> details, Boolean accumulateStats, Account caller);
 
     UserVmResponse setUserVmResponse(ResponseView view, UserVmResponse userVmData, UserVmJoinVO uvo);
 
diff --git a/server/src/main/java/com/cloud/api/query/dao/UserVmJoinDaoImpl.java b/server/src/main/java/com/cloud/api/query/dao/UserVmJoinDaoImpl.java
index 8b2fd255959..0e99f9a9cff 100644
--- a/server/src/main/java/com/cloud/api/query/dao/UserVmJoinDaoImpl.java
+++ b/server/src/main/java/com/cloud/api/query/dao/UserVmJoinDaoImpl.java
@@ -118,7 +118,7 @@ public class UserVmJoinDaoImpl extends GenericDaoBaseWithTagInformation<UserVmJo
     }
 
     @Override
-    public UserVmResponse newUserVmResponse(ResponseView view, String objectName, UserVmJoinVO userVm, EnumSet<VMDetails> details, Account caller) {
+    public UserVmResponse newUserVmResponse(ResponseView view, String objectName, UserVmJoinVO userVm, EnumSet<VMDetails> details, Boolean accumulateStats, Account caller) {
         UserVmResponse userVmResponse = new UserVmResponse();
 
         if (userVm.getHypervisorType() != null) {
@@ -228,7 +228,7 @@ public class UserVmJoinDaoImpl extends GenericDaoBaseWithTagInformation<UserVmJo
 
         if (details.contains(VMDetails.all) || details.contains(VMDetails.stats)) {
             // stats calculation
-            VmStats vmStats = ApiDBUtils.getVmStatistics(userVm.getId());
+            VmStats vmStats = ApiDBUtils.getVmStatistics(userVm.getId(), accumulateStats);
             if (vmStats != null) {
                 userVmResponse.setCpuUsed(new DecimalFormat("#.##").format(vmStats.getCPUUtilization()) + "%");
                 userVmResponse.setNetworkKbsRead((long)vmStats.getNetworkReadKBs());
diff --git a/server/src/main/java/com/cloud/server/StatsCollector.java b/server/src/main/java/com/cloud/server/StatsCollector.java
index 9b193c04b4a..ecea1318f17 100644
--- a/server/src/main/java/com/cloud/server/StatsCollector.java
+++ b/server/src/main/java/com/cloud/server/StatsCollector.java
@@ -47,10 +47,12 @@ import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
 import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
 import org.apache.cloudstack.utils.graphite.GraphiteClient;
 import org.apache.cloudstack.utils.graphite.GraphiteException;
+import org.apache.cloudstack.utils.identity.ManagementServerNode;
 import org.apache.cloudstack.utils.usage.UsageUtils;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.DateUtils;
 import org.apache.commons.lang3.BooleanUtils;
 import org.apache.log4j.Logger;
 import org.influxdb.BatchOptions;
@@ -70,6 +72,7 @@ import com.cloud.agent.api.VgpuTypesInfo;
 import com.cloud.agent.api.VmDiskStatsEntry;
 import com.cloud.agent.api.VmNetworkStatsEntry;
 import com.cloud.agent.api.VmStatsEntry;
+import com.cloud.agent.api.VmStatsEntryBase;
 import com.cloud.agent.api.VolumeStatsEntry;
 import com.cloud.capacity.CapacityManager;
 import com.cloud.cluster.ManagementServerHostVO;
@@ -142,9 +145,12 @@ import com.cloud.vm.UserVmVO;
 import com.cloud.vm.VMInstanceVO;
 import com.cloud.vm.VirtualMachine;
 import com.cloud.vm.VmStats;
+import com.cloud.vm.VmStatsVO;
 import com.cloud.vm.dao.NicDao;
 import com.cloud.vm.dao.UserVmDao;
 import com.cloud.vm.dao.VMInstanceDao;
+import com.cloud.vm.dao.VmStatsDao;
+import com.google.gson.Gson;
 
 import static com.cloud.utils.NumbersUtil.toHumanReadableSize;
 import org.apache.commons.io.FileUtils;
@@ -222,12 +228,16 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
     private static final ConfigKey<String> statsOutputUri = new ConfigKey<String>("Advanced", String.class, "stats.output.uri", "",
             "URI to send StatsCollector statistics to. The collector is defined on the URI scheme. Example: graphite://graphite-hostaddress:port or influxdb://influxdb-hostaddress/dbname. Note that the port is optional, if not added the default port for the respective collector (graphite or influxdb) will be used. Additionally, the database name '/dbname' is  also optional; default db name is 'cloudstack'. You must create and configure the database if using influxdb.",
             true);
-    private static final ConfigKey<Boolean> VM_STATS_INCREMENT_METRICS_IN_MEMORY = new ConfigKey<Boolean>("Advanced", Boolean.class, "vm.stats.increment.metrics.in.memory", "true",
-            "When set to 'true', VM metrics(NetworkReadKBs, NetworkWriteKBs, DiskWriteKBs, DiskReadKBs, DiskReadIOs and DiskWriteIOs) that are collected from the hypervisor are summed and stored in memory. "
+    protected static ConfigKey<Boolean> vmStatsIncrementMetrics = new ConfigKey<Boolean>("Advanced", Boolean.class, "vm.stats.increment.metrics", "true",
+            "When set to 'true', VM metrics(NetworkReadKBs, NetworkWriteKBs, DiskWriteKBs, DiskReadKBs, DiskReadIOs and DiskWriteIOs) that are collected from the hypervisor are summed before being returned."
             + "On the other hand, when set to 'false', the VM metrics API will just display the latest metrics collected.", true);
+    protected static ConfigKey<Integer> vmStatsMaxRetentionTime = new ConfigKey<Integer>("Advanced", Integer.class, "vm.stats.max.retention.time", "1",
+            "The maximum time (in minutes) for keeping VM stats records in the database. The VM stats cleanup process will be disabled if this is set to 0 or less than 0.", true);
 
     private static StatsCollector s_instance = null;
 
+    private static Gson gson = new Gson();
+
     private ScheduledExecutorService _executor = null;
     @Inject
     private AgentManager _agentMgr;
@@ -240,6 +250,8 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
     @Inject
     protected UserVmDao _userVmDao;
     @Inject
+    protected VmStatsDao vmStatsDao;
+    @Inject
     private VolumeDao _volsDao;
     @Inject
     private PrimaryDataStoreDao _storagePoolDao;
@@ -289,6 +301,8 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
     private HostGpuGroupsDao _hostGpuGroupsDao;
     @Inject
     private ImageStoreDetailsUtil imageStoreDetailsUtil;
+    @Inject
+    private ManagementServerHostDao managementServerHostDao;
 
     private ConcurrentHashMap<Long, HostStats> _hostStats = new ConcurrentHashMap<Long, HostStats>();
     protected ConcurrentHashMap<Long, VmStats> _VmStats = new ConcurrentHashMap<Long, VmStats>();
@@ -296,8 +310,10 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
     private ConcurrentHashMap<Long, StorageStats> _storageStats = new ConcurrentHashMap<Long, StorageStats>();
     private ConcurrentHashMap<Long, StorageStats> _storagePoolStats = new ConcurrentHashMap<Long, StorageStats>();
 
+    private static final long DEFAULT_INITIAL_DELAY = 15000L;
+
     private long hostStatsInterval = -1L;
-    private long hostAndVmStatsInterval = -1L;
+    private long vmStatsInterval = -1L;
     private long storageStatsInterval = -1L;
     private long volumeStatsInterval = -1L;
     private long autoScaleStatsInterval = -1L;
@@ -315,8 +331,8 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
     private final long mgmtSrvrId = MacAddress.getMacAddress().toLong();
     private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5;    // 5 seconds
     private boolean _dailyOrHourly = false;
-
-    //private final GlobalLock m_capacityCheckLock = GlobalLock.getInternLock("capacity.check");
+    protected long managementServerNodeId = ManagementServerNode.getManagementServerId();
+    protected long msId = managementServerNodeId;
 
     public static StatsCollector getInstance() {
         return s_instance;
@@ -341,7 +357,7 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
         _executor = Executors.newScheduledThreadPool(6, new NamedThreadFactory("StatsCollector"));
 
         hostStatsInterval = NumbersUtil.parseLong(configs.get("host.stats.interval"), ONE_MINUTE_IN_MILLISCONDS);
-        hostAndVmStatsInterval = NumbersUtil.parseLong(configs.get("vm.stats.interval"), ONE_MINUTE_IN_MILLISCONDS);
+        vmStatsInterval = NumbersUtil.parseLong(configs.get("vm.stats.interval"), ONE_MINUTE_IN_MILLISCONDS);
         storageStatsInterval = NumbersUtil.parseLong(configs.get("storage.stats.interval"), ONE_MINUTE_IN_MILLISCONDS);
         volumeStatsInterval = NumbersUtil.parseLong(configs.get("volume.stats.interval"), ONE_MINUTE_IN_MILLISCONDS);
         autoScaleStatsInterval = NumbersUtil.parseLong(configs.get("autoscale.stats.interval"), ONE_MINUTE_IN_MILLISCONDS);
@@ -383,19 +399,23 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
         }
 
         if (hostStatsInterval > 0) {
-            _executor.scheduleWithFixedDelay(new HostCollector(), 15000L, hostStatsInterval, TimeUnit.MILLISECONDS);
+            _executor.scheduleWithFixedDelay(new HostCollector(), DEFAULT_INITIAL_DELAY, hostStatsInterval, TimeUnit.MILLISECONDS);
         }
 
-        if (hostAndVmStatsInterval > 0) {
-            _executor.scheduleWithFixedDelay(new VmStatsCollector(), 15000L, hostAndVmStatsInterval, TimeUnit.MILLISECONDS);
+        if (vmStatsInterval > 0) {
+            _executor.scheduleWithFixedDelay(new VmStatsCollector(), DEFAULT_INITIAL_DELAY, vmStatsInterval, TimeUnit.MILLISECONDS);
+        } else {
+            s_logger.info("Skipping collect VM stats. The global parameter vm.stats.interval is set to 0 or less than 0.");
         }
 
+        _executor.scheduleWithFixedDelay(new VmStatsCleaner(), DEFAULT_INITIAL_DELAY, 60000L, TimeUnit.MILLISECONDS);
+
         if (storageStatsInterval > 0) {
-            _executor.scheduleWithFixedDelay(new StorageCollector(), 15000L, storageStatsInterval, TimeUnit.MILLISECONDS);
+            _executor.scheduleWithFixedDelay(new StorageCollector(), DEFAULT_INITIAL_DELAY, storageStatsInterval, TimeUnit.MILLISECONDS);
         }
 
         if (autoScaleStatsInterval > 0) {
-            _executor.scheduleWithFixedDelay(new AutoScaleMonitor(), 15000L, autoScaleStatsInterval, TimeUnit.MILLISECONDS);
+            _executor.scheduleWithFixedDelay(new AutoScaleMonitor(), DEFAULT_INITIAL_DELAY, autoScaleStatsInterval, TimeUnit.MILLISECONDS);
         }
 
         if (vmDiskStatsInterval.value() > 0) {
@@ -423,7 +443,7 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
         }
 
         if (volumeStatsInterval > 0) {
-            _executor.scheduleAtFixedRate(new VolumeStatsTask(), 15000L, volumeStatsInterval, TimeUnit.MILLISECONDS);
+            _executor.scheduleAtFixedRate(new VolumeStatsTask(), DEFAULT_INITIAL_DELAY, volumeStatsInterval, TimeUnit.MILLISECONDS);
         }
 
         //Schedule disk stats update task
@@ -467,6 +487,14 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
 
         long period = _usageAggregationRange * ONE_MINUTE_IN_MILLISCONDS;
         _diskStatsUpdateExecutor.scheduleAtFixedRate(new VmDiskStatsUpdaterTask(), (endDate - System.currentTimeMillis()), period, TimeUnit.MILLISECONDS);
+
+        ManagementServerHostVO mgmtServerVo = managementServerHostDao.findByMsid(managementServerNodeId);
+        if (mgmtServerVo != null) {
+            msId = mgmtServerVo.getId();
+        } else {
+            s_logger.warn(String.format("Cannot find management server with msid [%s]. "
+                    + "Therefore, VM stats will be recorded with the management server MAC address converted as a long in the mgmt_server_id column.", managementServerNodeId));
+        }
     }
 
     /**
@@ -568,7 +596,7 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
         @Override
         protected void runInContext() {
             try {
-                s_logger.trace("VmStatsCollector is running...");
+                s_logger.debug("VmStatsCollector is running...");
 
                 SearchCriteria<HostVO> sc = createSearchCriteriaForHostTypeRoutingStateUpAndNotInMaintenance();
                 List<HostVO> hosts = _hostDao.search(sc, null);
@@ -577,6 +605,8 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
 
                 for (HostVO host : hosts) {
                     List<UserVmVO> vms = _userVmDao.listRunningByHostId(host.getId());
+                    Date timestamp = new Date();
+
                     List<Long> vmIds = new ArrayList<Long>();
 
                     for (UserVmVO vm : vms) {
@@ -594,7 +624,7 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
                                 UserVmVO userVmVo = _userVmDao.findById(vmId);
                                 statsForCurrentIteration.setUserVmVO(userVmVo);
 
-                                storeVirtualMachineStatsInMemory(statsForCurrentIteration);
+                                persistVirtualMachineStats(statsForCurrentIteration, timestamp);
 
                                 if (externalStatsType == ExternalStatsProtocol.GRAPHITE) {
                                     prepareVmMetricsForGraphite(metrics, statsForCurrentIteration);
@@ -619,8 +649,6 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
                     }
                 }
 
-                cleanUpVirtualMachineStats();
-
             } catch (Throwable t) {
                 s_logger.error("Error trying to retrieve VM stats", t);
             }
@@ -632,8 +660,90 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
         }
     }
 
-    public VmStats getVmStats(long id) {
-        return _VmStats.get(id);
+    /**
+     * <p>Previously, the VM stats cleanup process was triggered during the data collection process.
+     * So, when data collection was disabled, the cleaning process was also disabled.</p>
+     *
+     * <p>With the introduction of persistence of VM stats, as well as the provision of historical data,
+     * we created this class to allow that both the collection process and the data cleaning process
+     * can be enabled/disabled independently.</p>
+     */
+    class VmStatsCleaner extends ManagedContextRunnable{
+        protected void runInContext() {
+            cleanUpVirtualMachineStats();
+        }
+    }
+
+    /**
+     * Gets the latest or the accumulation of the stats collected from a given VM.
+     *
+     * @param vmId the specific VM.
+     * @param accumulate whether or not the stats data should be accumulated.
+     * @return the latest or the accumulation of the stats for the specified VM.
+     */
+    public VmStats getVmStats(long vmId, Boolean accumulate) {
+        List<VmStatsVO> vmStatsVOList = vmStatsDao.findByVmIdOrderByTimestampDesc(vmId);
+
+        if (CollectionUtils.isEmpty(vmStatsVOList)) {
+            return null;
+        }
+
+        if (accumulate != null) {
+            return getLatestOrAccumulatedVmMetricsStats(vmStatsVOList, accumulate.booleanValue());
+        }
+        return getLatestOrAccumulatedVmMetricsStats(vmStatsVOList, BooleanUtils.toBoolean(vmStatsIncrementMetrics.value()));
+    }
+
+    /**
+     * Gets the latest or the accumulation of a list of VM stats.<br>
+     * It extracts the stats data from the VmStatsVO.
+     *
+     * @param vmStatsVOList the list of VM stats.
+     * @param accumulate {@code true} if the data should be accumulated, {@code false} otherwise.
+     * @return the {@link VmStatsEntry} containing the latest or the accumulated stats.
+     */
+    protected VmStatsEntry getLatestOrAccumulatedVmMetricsStats (List<VmStatsVO> vmStatsVOList, boolean accumulate) {
+        if (accumulate) {
+            return accumulateVmMetricsStats(vmStatsVOList);
+        }
+        return gson.fromJson(vmStatsVOList.get(0).getVmStatsData(), VmStatsEntry.class);
+    }
+
+    /**
+     * Accumulates (I/O) stats for a given VM.
+     *
+     * @param vmStatsVOList the list of stats for a given VM.
+     * @return the {@link VmStatsEntry} containing the accumulated (I/O) stats.
+     */
+    protected VmStatsEntry accumulateVmMetricsStats(List<VmStatsVO> vmStatsVOList) {
+        VmStatsEntry latestVmStats = gson.fromJson(vmStatsVOList.remove(0).getVmStatsData(), VmStatsEntry.class);
+
+        VmStatsEntry vmStatsEntry = new VmStatsEntry();
+        vmStatsEntry.setEntityType(latestVmStats.getEntityType());
+        vmStatsEntry.setVmId(latestVmStats.getVmId());
+        vmStatsEntry.setCPUUtilization(latestVmStats.getCPUUtilization());
+        vmStatsEntry.setNumCPUs(latestVmStats.getNumCPUs());
+        vmStatsEntry.setMemoryKBs(latestVmStats.getMemoryKBs());
+        vmStatsEntry.setIntFreeMemoryKBs(latestVmStats.getIntFreeMemoryKBs());
+        vmStatsEntry.setTargetMemoryKBs(latestVmStats.getTargetMemoryKBs());
+        vmStatsEntry.setNetworkReadKBs(latestVmStats.getNetworkReadKBs());
+        vmStatsEntry.setNetworkWriteKBs(latestVmStats.getNetworkWriteKBs());
+        vmStatsEntry.setDiskWriteKBs(latestVmStats.getDiskWriteKBs());
+        vmStatsEntry.setDiskReadIOs(latestVmStats.getDiskReadIOs());
+        vmStatsEntry.setDiskWriteIOs(latestVmStats.getDiskWriteIOs());
+        vmStatsEntry.setDiskReadKBs(latestVmStats.getDiskReadKBs());
+
+        for (VmStatsVO vmStatsVO : vmStatsVOList) {
+            VmStatsEntry currentVmStatsEntry = gson.fromJson(vmStatsVO.getVmStatsData(), VmStatsEntry.class);
+
+            vmStatsEntry.setNetworkReadKBs(vmStatsEntry.getNetworkReadKBs() + currentVmStatsEntry.getNetworkReadKBs());
+            vmStatsEntry.setNetworkWriteKBs(vmStatsEntry.getNetworkWriteKBs() + currentVmStatsEntry.getNetworkWriteKBs());
+            vmStatsEntry.setDiskReadKBs(vmStatsEntry.getDiskReadKBs() + currentVmStatsEntry.getDiskReadKBs());
+            vmStatsEntry.setDiskWriteKBs(vmStatsEntry.getDiskWriteKBs() + currentVmStatsEntry.getDiskWriteKBs());
+            vmStatsEntry.setDiskReadIOs(vmStatsEntry.getDiskReadIOs() + currentVmStatsEntry.getDiskReadIOs());
+            vmStatsEntry.setDiskWriteIOs(vmStatsEntry.getDiskWriteIOs() + currentVmStatsEntry.getDiskWriteIOs());
+        }
+        return vmStatsEntry;
     }
 
     class VmDiskStatsUpdaterTask extends ManagedContextRunnable {
@@ -1460,57 +1570,36 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
     }
 
     /**
-     * Stores virtual machine stats in memory (map of {@link VmStatsEntry}).
+     * Persists VM stats in the database.
+     * @param statsForCurrentIteration the metrics stats data to persist.
+     * @param timestamp the time that will be stamped.
      */
-    private void storeVirtualMachineStatsInMemory(VmStatsEntry statsForCurrentIteration) {
-        VmStatsEntry statsInMemory = (VmStatsEntry)_VmStats.get(statsForCurrentIteration.getVmId());
-
-        boolean vmStatsIncrementMetrics = BooleanUtils.toBoolean(VM_STATS_INCREMENT_METRICS_IN_MEMORY.value());
-        if (statsInMemory == null || !vmStatsIncrementMetrics) {
-            _VmStats.put(statsForCurrentIteration.getVmId(), statsForCurrentIteration);
-        } else {
-            s_logger.debug(String.format("Increment saved values of NetworkReadKBs, NetworkWriteKBs, DiskWriteKBs, DiskReadKBs, DiskReadIOs, DiskWriteIOs, with current metrics for VM with ID [%s]. "
-                    + "To change this process, check value of 'vm.stats.increment.metrics.in.memory' configuration.", statsForCurrentIteration.getVmId()));
-            statsInMemory.setCPUUtilization(statsForCurrentIteration.getCPUUtilization());
-            statsInMemory.setNumCPUs(statsForCurrentIteration.getNumCPUs());
-            statsInMemory.setNetworkReadKBs(statsInMemory.getNetworkReadKBs() + statsForCurrentIteration.getNetworkReadKBs());
-            statsInMemory.setNetworkWriteKBs(statsInMemory.getNetworkWriteKBs() + statsForCurrentIteration.getNetworkWriteKBs());
-            statsInMemory.setDiskWriteKBs(statsInMemory.getDiskWriteKBs() + statsForCurrentIteration.getDiskWriteKBs());
-            statsInMemory.setDiskReadIOs(statsInMemory.getDiskReadIOs() + statsForCurrentIteration.getDiskReadIOs());
-            statsInMemory.setDiskWriteIOs(statsInMemory.getDiskWriteIOs() + statsForCurrentIteration.getDiskWriteIOs());
-            statsInMemory.setDiskReadKBs(statsInMemory.getDiskReadKBs() + statsForCurrentIteration.getDiskReadKBs());
-            statsInMemory.setMemoryKBs(statsForCurrentIteration.getMemoryKBs());
-            statsInMemory.setIntFreeMemoryKBs(statsForCurrentIteration.getIntFreeMemoryKBs());
-            statsInMemory.setTargetMemoryKBs(statsForCurrentIteration.getTargetMemoryKBs());
-
-            _VmStats.put(statsForCurrentIteration.getVmId(), statsInMemory);
-        }
+    protected void persistVirtualMachineStats(VmStatsEntry statsForCurrentIteration, Date timestamp) {
+        VmStatsEntryBase vmStats = new VmStatsEntryBase(statsForCurrentIteration.getVmId(), statsForCurrentIteration.getMemoryKBs(), statsForCurrentIteration.getIntFreeMemoryKBs(),
+                statsForCurrentIteration.getTargetMemoryKBs(), statsForCurrentIteration.getCPUUtilization(), statsForCurrentIteration.getNetworkReadKBs(),
+                statsForCurrentIteration.getNetworkWriteKBs(), statsForCurrentIteration.getNumCPUs(), statsForCurrentIteration.getDiskReadKBs(),
+                statsForCurrentIteration.getDiskWriteKBs(), statsForCurrentIteration.getDiskReadIOs(), statsForCurrentIteration.getDiskWriteIOs(),
+                statsForCurrentIteration.getEntityType());
+        VmStatsVO vmStatsVO = new VmStatsVO(statsForCurrentIteration.getVmId(), msId, timestamp, gson.toJson(vmStats));
+        s_logger.trace(String.format("Recording VM stats: [%s].", vmStatsVO.toString()));
+        vmStatsDao.persist(vmStatsVO);
     }
 
     /**
-     * Removes stats for a given virtual machine.
-     * @param vmId ID of the virtual machine to remove stats.
-     */
-    public void removeVirtualMachineStats(Long vmId) {
-        s_logger.debug(String.format("Removing stats from VM with ID: %s .",vmId));
-        _VmStats.remove(vmId);
-    }
-
-    /**
-     * Removes stats of virtual machines that are not running from memory.
+     * Removes the oldest VM stats records according to the global
+     * parameter {@code vm.stats.max.retention.time}.
      */
     protected void cleanUpVirtualMachineStats() {
-        List<Long> allRunningVmIds = new ArrayList<Long>();
-        for (UserVmVO vm : _userVmDao.listAllRunning()) {
-            allRunningVmIds.add(vm.getId());
-        }
-
-        List<Long> vmIdsToRemoveStats = new ArrayList<Long>(_VmStats.keySet());
-        vmIdsToRemoveStats.removeAll(allRunningVmIds);
-
-        for (Long vmId : vmIdsToRemoveStats) {
-            removeVirtualMachineStats(vmId);
+        Integer maxRetentionTime = vmStatsMaxRetentionTime.value();
+        if (maxRetentionTime <= 0) {
+            s_logger.debug(String.format("Skipping VM stats cleanup. The [%s] parameter [%s] is set to 0 or less than 0.",
+                    vmStatsMaxRetentionTime.scope(), vmStatsMaxRetentionTime.toString()));
+            return;
         }
+        s_logger.trace("Removing older VM stats records.");
+        Date now = new Date();
+        Date limit = DateUtils.addMinutes(now, -maxRetentionTime);
+        vmStatsDao.removeAllByTimestampLessThan(limit);
     }
 
     /**
@@ -1657,7 +1746,8 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
 
     @Override
     public ConfigKey<?>[] getConfigKeys() {
-        return new ConfigKey<?>[] {vmDiskStatsInterval, vmDiskStatsIntervalMin, vmNetworkStatsInterval, vmNetworkStatsIntervalMin, StatsTimeout, statsOutputUri, VM_STATS_INCREMENT_METRICS_IN_MEMORY};
+        return new ConfigKey<?>[] {vmDiskStatsInterval, vmDiskStatsIntervalMin, vmNetworkStatsInterval, vmNetworkStatsIntervalMin, StatsTimeout, statsOutputUri,
+            vmStatsIncrementMetrics, vmStatsMaxRetentionTime};
     }
 
     public double getImageStoreCapacityThreshold() {
diff --git a/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java b/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java
index 2fac76eaefb..e8733e618aa 100644
--- a/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java
+++ b/server/src/main/java/com/cloud/vm/UserVmManagerImpl.java
@@ -356,6 +356,7 @@ import com.cloud.vm.dao.NicExtraDhcpOptionDao;
 import com.cloud.vm.dao.UserVmDao;
 import com.cloud.vm.dao.UserVmDetailsDao;
 import com.cloud.vm.dao.VMInstanceDao;
+import com.cloud.vm.dao.VmStatsDao;
 import com.cloud.vm.snapshot.VMSnapshotManager;
 import com.cloud.vm.snapshot.VMSnapshotVO;
 import com.cloud.vm.snapshot.dao.VMSnapshotDao;
@@ -549,6 +550,8 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Vir
     @Inject
     private AnnotationDao annotationDao;
     @Inject
+    private VmStatsDao vmStatsDao;
+    @Inject
     protected CommandSetupHelper commandSetupHelper;
     @Autowired
     @Qualifier("networkHelper")
@@ -5061,8 +5064,6 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Vir
             throw new InvalidParameterValueException("unable to find a virtual machine with id " + vmId);
         }
 
-        statsCollector.removeVirtualMachineStats(vmId);
-
         _userDao.findById(userId);
         boolean status = false;
         try {
@@ -5375,7 +5376,7 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Vir
             return vm;
         }
 
-        statsCollector.removeVirtualMachineStats(vmId);
+        vmStatsDao.removeAllByVmId(vmId);
 
         boolean status;
         State vmState = vm.getState();
diff --git a/server/src/test/java/com/cloud/server/StatsCollectorTest.java b/server/src/test/java/com/cloud/server/StatsCollectorTest.java
index 4ee2099da6b..f32b05e4ddd 100644
--- a/server/src/test/java/com/cloud/server/StatsCollectorTest.java
+++ b/server/src/test/java/com/cloud/server/StatsCollectorTest.java
@@ -22,12 +22,13 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.cloudstack.framework.config.ConfigKey;
 import org.influxdb.InfluxDB;
 import org.influxdb.InfluxDBFactory;
 import org.influxdb.dto.BatchPoints;
@@ -36,6 +37,8 @@ import org.influxdb.dto.Point;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
@@ -45,12 +48,13 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.modules.junit4.PowerMockRunnerDelegate;
 
 import com.cloud.agent.api.VmDiskStatsEntry;
+import com.cloud.agent.api.VmStatsEntry;
 import com.cloud.server.StatsCollector.ExternalStatsProtocol;
 import com.cloud.user.VmDiskStatisticsVO;
 import com.cloud.utils.exception.CloudRuntimeException;
-import com.cloud.vm.UserVmVO;
 import com.cloud.vm.VmStats;
-import com.cloud.vm.dao.UserVmDao;
+import com.cloud.vm.VmStatsVO;
+import com.cloud.vm.dao.VmStatsDao;
 import com.tngtech.java.junit.dataprovider.DataProvider;
 import com.tngtech.java.junit.dataprovider.DataProviderRunner;
 
@@ -70,16 +74,25 @@ public class StatsCollectorTest {
     private static final String DEFAULT_DATABASE_NAME = "cloudstack";
 
     @Mock
-    ConcurrentHashMap<Long, VmStats> vmStatsMock;
+    VmStatsDao vmStatsDaoMock;
 
     @Mock
-    VmStats singleVmStatsMock;
+    VmStatsEntry statsForCurrentIterationMock;
+
+    @Captor
+    ArgumentCaptor<VmStatsVO> vmStatsVOCaptor;
+
+    @Captor
+    ArgumentCaptor<Boolean> booleanCaptor;
+
+    @Mock
+    Boolean accumulateMock;
 
     @Mock
-    UserVmDao userVmDaoMock;
+    VmStatsVO vmStatsVoMock1, vmStatsVoMock2;
 
     @Mock
-    UserVmVO userVmVOMock;
+    VmStatsEntry vmStatsEntryMock;
 
     @Test
     public void createInfluxDbConnectionTest() {
@@ -240,47 +253,127 @@ public class StatsCollectorTest {
         Assert.assertEquals(expected, result);
     }
 
+    private void setVmStatsIncrementMetrics(String value) {
+        StatsCollector.vmStatsIncrementMetrics = new ConfigKey<Boolean>("Advanced", Boolean.class, "vm.stats.increment.metrics", value,
+                "When set to 'true', VM metrics(NetworkReadKBs, NetworkWriteKBs, DiskWriteKBs, DiskReadKBs, DiskReadIOs and DiskWriteIOs) that are collected from the hypervisor are summed before being returned. "
+                        + "On the other hand, when set to 'false', the VM metrics API will just display the latest metrics collected.", true);
+    }
+
+    private void setVmStatsMaxRetentionTimeValue(String value) {
+        StatsCollector.vmStatsMaxRetentionTime = new ConfigKey<Integer>("Advanced", Integer.class, "vm.stats.max.retention.time", value,
+                "The maximum time (in minutes) for keeping VM stats records in the database. The VM stats cleanup process will be disabled if this is set to 0 or less than 0.", true);
+    }
+
     @Test
-    public void removeVirtualMachineStatsTestRemoveOneVmStats() {
-        Mockito.doReturn(new Object()).when(vmStatsMock).remove(Mockito.anyLong());
+    public void cleanUpVirtualMachineStatsTestIsDisabled() {
+        setVmStatsMaxRetentionTimeValue("0");
 
-        statsCollector.removeVirtualMachineStats(1l);
+        statsCollector.cleanUpVirtualMachineStats();
 
-        Mockito.verify(vmStatsMock, Mockito.times(1)).remove(Mockito.anyLong());
+        Mockito.verify(vmStatsDaoMock, Mockito.never()).removeAllByTimestampLessThan(Mockito.any());
     }
 
     @Test
-    public void cleanUpVirtualMachineStatsTestDoNothing() {
-        Mockito.doReturn(new ArrayList<>()).when(userVmDaoMock).listAllRunning();
-        Mockito.doReturn(new ConcurrentHashMap<Long, VmStats>(new HashMap<>()).keySet())
-        .when(vmStatsMock).keySet();
+    public void cleanUpVirtualMachineStatsTestIsEnabled() {
+        setVmStatsMaxRetentionTimeValue("1");
 
         statsCollector.cleanUpVirtualMachineStats();
 
-        Mockito.verify(statsCollector, Mockito.never()).removeVirtualMachineStats(Mockito.anyLong());
+        Mockito.verify(vmStatsDaoMock).removeAllByTimestampLessThan(Mockito.any());
     }
 
     @Test
-    public void cleanUpVirtualMachineStatsTestRemoveOneVmStats() {
-        Mockito.doReturn(new ArrayList<>()).when(userVmDaoMock).listAllRunning();
-        Mockito.doReturn(1l).when(userVmVOMock).getId();
-        Mockito.doReturn(new ConcurrentHashMap<Long, VmStats>(Map.of(1l, singleVmStatsMock)).keySet())
-        .when(vmStatsMock).keySet();
+    public void persistVirtualMachineStatsTestPersistsSuccessfully() {
+        statsCollector.msId = 1L;
+        Date timestamp = new Date();
+        VmStatsEntry statsForCurrentIteration = new VmStatsEntry(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, "vm");
+        Mockito.doReturn(new VmStatsVO()).when(vmStatsDaoMock).persist(Mockito.any());
+        String expectedVmStatsStr = "{\"vmId\":2,\"cpuUtilization\":6.0,\"networkReadKBs\":7.0,\"networkWriteKBs\":8.0,\"diskReadIOs\":12.0,\"diskWriteIOs\":13.0,\"diskReadKBs\":10.0"
+                + ",\"diskWriteKBs\":11.0,\"memoryKBs\":3.0,\"intFreeMemoryKBs\":4.0,\"targetMemoryKBs\":5.0,\"numCPUs\":9,\"entityType\":\"vm\"}";
+
+        statsCollector.persistVirtualMachineStats(statsForCurrentIteration, timestamp);
+
+        Mockito.verify(vmStatsDaoMock).persist(vmStatsVOCaptor.capture());
+        VmStatsVO actual = vmStatsVOCaptor.getAllValues().get(0);
+        Assert.assertEquals(Long.valueOf(2L), actual.getVmId());
+        Assert.assertEquals(Long.valueOf(1L), actual.getMgmtServerId());
+        Assert.assertEquals(expectedVmStatsStr, actual.getVmStatsData());
+        Assert.assertEquals(timestamp, actual.getTimestamp());
+    }
 
-        statsCollector.cleanUpVirtualMachineStats();
+    @Test
+    public void getVmStatsTestWithAccumulateNotNull() {
+        Mockito.doReturn(Arrays.asList(vmStatsVoMock1)).when(vmStatsDaoMock).findByVmIdOrderByTimestampDesc(Mockito.anyLong());
+        Mockito.doReturn(true).when(accumulateMock).booleanValue();
+        Mockito.doReturn(vmStatsEntryMock).when(statsCollector).getLatestOrAccumulatedVmMetricsStats(Mockito.anyList(), Mockito.anyBoolean());
+
+        VmStats result = statsCollector.getVmStats(1L, accumulateMock);
 
-        Mockito.verify(vmStatsMock, Mockito.times(1)).remove(Mockito.anyLong());
+        Mockito.verify(statsCollector).getLatestOrAccumulatedVmMetricsStats(Mockito.anyList(), booleanCaptor.capture());
+        boolean actualArg = booleanCaptor.getValue().booleanValue();
+        Assert.assertEquals(false, actualArg);
+        Assert.assertEquals(vmStatsEntryMock, result);
     }
 
     @Test
-    public void cleanUpVirtualMachineStatsTestRemoveOnlyOneVmStats() {
-        Mockito.doReturn(1l).when(userVmVOMock).getId();
-        Mockito.doReturn(Arrays.asList(userVmVOMock)).when(userVmDaoMock).listAllRunning();
-        Mockito.doReturn(new ConcurrentHashMap<Long, VmStats>(Map.of(1l, singleVmStatsMock, 2l, singleVmStatsMock)).keySet())
-        .when(vmStatsMock).keySet();
+    public void getVmStatsTestWithNullAccumulate() {
+        setVmStatsIncrementMetrics("true");
+        Mockito.doReturn(Arrays.asList(vmStatsVoMock1)).when(vmStatsDaoMock).findByVmIdOrderByTimestampDesc(Mockito.anyLong());
+        Mockito.doReturn(vmStatsEntryMock).when(statsCollector).getLatestOrAccumulatedVmMetricsStats(Mockito.anyList(), Mockito.anyBoolean());
 
-        statsCollector.cleanUpVirtualMachineStats();
+        VmStats result = statsCollector.getVmStats(1L, null);
 
-        Mockito.verify(vmStatsMock, Mockito.times(1)).remove(Mockito.anyLong());
+        Mockito.verify(statsCollector).getLatestOrAccumulatedVmMetricsStats(Mockito.anyList(), booleanCaptor.capture());
+        boolean actualArg = booleanCaptor.getValue().booleanValue();
+        Assert.assertEquals(true, actualArg);
+        Assert.assertEquals(vmStatsEntryMock, result);
     }
+
+    @Test
+    public void getLatestOrAccumulatedVmMetricsStatsTestAccumulate() {
+        Mockito.doReturn(null).when(statsCollector).accumulateVmMetricsStats(Mockito.anyList());
+
+        statsCollector.getLatestOrAccumulatedVmMetricsStats(Arrays.asList(vmStatsVoMock1), true);
+
+        Mockito.verify(statsCollector).accumulateVmMetricsStats(Mockito.anyList());
+    }
+
+    @Test
+    public void getLatestOrAccumulatedVmMetricsStatsTestLatest() {
+        statsCollector.getLatestOrAccumulatedVmMetricsStats(Arrays.asList(vmStatsVoMock1), false);
+
+        Mockito.verify(statsCollector, Mockito.never()).accumulateVmMetricsStats(Mockito.anyList());
+    }
+
+    @Test
+    public void accumulateVmMetricsStatsTest() {
+        String fakeStatsData1 = "{\"vmId\":1,\"cpuUtilization\":1.0,\"networkReadKBs\":1.0,"
+                + "\"networkWriteKBs\":1.1,\"diskReadIOs\":3.0,\"diskWriteIOs\":3.1,\"diskReadKBs\":2.0,"
+                + "\"diskWriteKBs\":2.1,\"memoryKBs\":1.0,\"intFreeMemoryKBs\":1.0,"
+                + "\"targetMemoryKBs\":1.0,\"numCPUs\":1,\"entityType\":\"vm\"}";
+        String fakeStatsData2 = "{\"vmId\":1,\"cpuUtilization\":10.0,\"networkReadKBs\":1.0,"
+                + "\"networkWriteKBs\":1.1,\"diskReadIOs\":3.0,\"diskWriteIOs\":3.1,\"diskReadKBs\":2.0,"
+                + "\"diskWriteKBs\":2.1,\"memoryKBs\":1.0,\"intFreeMemoryKBs\":1.0,"
+                + "\"targetMemoryKBs\":1.0,\"numCPUs\":1,\"entityType\":\"vm\"}";
+        Mockito.doReturn(fakeStatsData1).when(vmStatsVoMock1).getVmStatsData();
+        Mockito.doReturn(fakeStatsData2).when(vmStatsVoMock2).getVmStatsData();
+
+        VmStatsEntry result = statsCollector.accumulateVmMetricsStats(new ArrayList<VmStatsVO>(
+                Arrays.asList(vmStatsVoMock1, vmStatsVoMock2)));
+
+        Assert.assertEquals("vm", result.getEntityType());
+        Assert.assertEquals(1, result.getVmId());
+        Assert.assertEquals(1.0, result.getCPUUtilization(), 0);
+        Assert.assertEquals(1, result.getNumCPUs());
+        Assert.assertEquals(1.0, result.getMemoryKBs(), 0);
+        Assert.assertEquals(1.0, result.getIntFreeMemoryKBs(), 0);
+        Assert.assertEquals(1.0, result.getTargetMemoryKBs(), 0);
+        Assert.assertEquals(2.0, result.getNetworkReadKBs(), 0);
+        Assert.assertEquals(2.2, result.getNetworkWriteKBs(), 0);
+        Assert.assertEquals(4.0, result.getDiskReadKBs(), 0);
+        Assert.assertEquals(4.2, result.getDiskWriteKBs(), 0);
+        Assert.assertEquals(6.0, result.getDiskReadIOs(), 0);
+        Assert.assertEquals(6.2, result.getDiskWriteIOs(), 0);
+    }
+
 }
diff --git a/server/src/test/java/com/cloud/user/AccountManagerImplVolumeDeleteEventTest.java b/server/src/test/java/com/cloud/user/AccountManagerImplVolumeDeleteEventTest.java
index 37c48ad8bd7..5b4c1546395 100644
--- a/server/src/test/java/com/cloud/user/AccountManagerImplVolumeDeleteEventTest.java
+++ b/server/src/test/java/com/cloud/user/AccountManagerImplVolumeDeleteEventTest.java
@@ -54,7 +54,6 @@ import com.cloud.event.UsageEventVO;
 import com.cloud.exception.AgentUnavailableException;
 import com.cloud.exception.CloudException;
 import com.cloud.exception.ConcurrentOperationException;
-import com.cloud.server.StatsCollector;
 import com.cloud.service.ServiceOfferingVO;
 import com.cloud.storage.Volume.Type;
 import com.cloud.storage.VolumeVO;
@@ -62,6 +61,7 @@ import com.cloud.vm.UserVmManager;
 import com.cloud.vm.UserVmManagerImpl;
 import com.cloud.vm.UserVmVO;
 import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.dao.VmStatsDao;
 
 @RunWith(MockitoJUnitRunner.class)
 public class AccountManagerImplVolumeDeleteEventTest extends AccountManagetImplTestBase {
@@ -77,7 +77,7 @@ public class AccountManagerImplVolumeDeleteEventTest extends AccountManagetImplT
     private UserVmManager userVmManager;
 
     @Mock
-    private StatsCollector statsCollectorMock;
+    private VmStatsDao vmStatsDaoMock;
 
     Map<String, Object> oldFields = new HashMap<>();
     UserVmVO vm = mock(UserVmVO.class);
@@ -204,7 +204,7 @@ public class AccountManagerImplVolumeDeleteEventTest extends AccountManagetImplT
     // volume.
     public void runningVMRootVolumeUsageEvent()
             throws SecurityException, IllegalArgumentException, ReflectiveOperationException, AgentUnavailableException, ConcurrentOperationException, CloudException {
-        Mockito.doNothing().when(statsCollectorMock).removeVirtualMachineStats(Mockito.anyLong());
+        Mockito.doNothing().when(vmStatsDaoMock).removeAllByVmId(Mockito.anyLong());
         Mockito.lenient().when(_vmMgr.destroyVm(nullable(Long.class), nullable(Boolean.class))).thenReturn(vm);
         List<UsageEventVO> emittedEvents = deleteUserAccountRootVolumeUsageEvents(false);
         UsageEventVO event = emittedEvents.get(0);
diff --git a/ui/src/config/section/compute.js b/ui/src/config/section/compute.js
index 350c00d9492..a5c21459cb5 100644
--- a/ui/src/config/section/compute.js
+++ b/ui/src/config/section/compute.js
@@ -31,6 +31,13 @@ export default {
       docHelp: 'adminguide/virtual_machines.html',
       permission: ['listVirtualMachinesMetrics'],
       resourceType: 'UserVm',
+      params: () => {
+        var params = {}
+        if (store.getters.metrics) {
+          params = { state: 'running' }
+        }
+        return params
+      },
       filters: () => {
         const filters = ['running', 'stopped']
         if (!(store.getters.project && store.getters.project.id)) {