You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by we...@apache.org on 2013/06/01 08:17:43 UTC

[2/4] CLOUDSTACK-1192: Add Disk I/O Statistics

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/b9b0168d/server/src/com/cloud/server/StatsCollector.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/server/StatsCollector.java b/server/src/com/cloud/server/StatsCollector.java
index 05be0e2..39b7439 100755
--- a/server/src/com/cloud/server/StatsCollector.java
+++ b/server/src/com/cloud/server/StatsCollector.java
@@ -17,11 +17,14 @@
 package com.cloud.server;
 
 import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -45,8 +48,11 @@ import com.cloud.agent.api.Answer;
 import com.cloud.agent.api.GetFileStatsCommand;
 import com.cloud.agent.api.GetStorageStatsCommand;
 import com.cloud.agent.api.HostStatsEntry;
+import com.cloud.agent.api.VmDiskStatsEntry;
 import com.cloud.agent.api.VmStatsEntry;
 import com.cloud.agent.manager.Commands;
+import com.cloud.cluster.ManagementServerHostVO;
+import com.cloud.cluster.dao.ManagementServerHostDao;
 import com.cloud.exception.AgentUnavailableException;
 import com.cloud.exception.StorageUnavailableException;
 import com.cloud.host.Host;
@@ -59,19 +65,31 @@ import com.cloud.resource.ResourceState;
 import com.cloud.storage.StorageManager;
 import com.cloud.storage.StoragePoolHostVO;
 import com.cloud.storage.StorageStats;
+import com.cloud.storage.Volume;
 import com.cloud.storage.VolumeStats;
 import com.cloud.storage.VolumeVO;
 import com.cloud.storage.dao.StoragePoolHostDao;
 import com.cloud.storage.dao.VolumeDao;
 import com.cloud.storage.secondary.SecondaryStorageVmManager;
+import com.cloud.user.UserStatisticsVO;
+import com.cloud.user.UserStatsLogVO;
+import com.cloud.user.VmDiskStatisticsVO;
+import com.cloud.user.dao.VmDiskStatisticsDao;
 import com.cloud.utils.NumbersUtil;
 import com.cloud.utils.component.ComponentMethodInterceptable;
 import com.cloud.utils.component.ManagerBase;
 import com.cloud.utils.concurrency.NamedThreadFactory;
+import com.cloud.utils.db.Filter;
+import com.cloud.utils.db.GlobalLock;
 import com.cloud.utils.db.SearchCriteria;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.net.MacAddress;
+import com.cloud.vm.NicVO;
 import com.cloud.vm.UserVmManager;
 import com.cloud.vm.UserVmVO;
 import com.cloud.vm.VmStats;
+import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.VirtualMachine.State;
 import com.cloud.vm.dao.UserVmDao;
 
 /**
@@ -96,6 +114,8 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
 	@Inject private SecondaryStorageVmManager _ssvmMgr;
 	@Inject private ResourceManager _resourceMgr;
     @Inject private ConfigurationDao _configDao;
+    @Inject private VmDiskStatisticsDao _vmDiskStatsDao;
+    @Inject private ManagementServerHostDao _msHostDao;
 
 	private ConcurrentHashMap<Long, HostStats> _hostStats = new ConcurrentHashMap<Long, HostStats>();
 	private final ConcurrentHashMap<Long, VmStats> _VmStats = new ConcurrentHashMap<Long, VmStats>();
@@ -107,6 +127,15 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
 	long hostAndVmStatsInterval = -1L;
 	long storageStatsInterval = -1L;
 	long volumeStatsInterval = -1L;
+	int vmDiskStatsInterval = 0;
+	
+	private ScheduledExecutorService _diskStatsUpdateExecutor;
+    private int _usageAggregationRange = 1440;
+    private String _usageTimeZone = "GMT";
+    private final long mgmtSrvrId = MacAddress.getMacAddress().toLong();
+    private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 5;    // 5 seconds
+    private static final int USAGE_AGGREGATION_RANGE_MIN = 10; // 10 minutes, same to com.cloud.usage.UsageManagerImpl.USAGE_AGGREGATION_RANGE_MIN
+    private boolean _dailyOrHourly = false;
 
 	//private final GlobalLock m_capacityCheckLock = GlobalLock.getInternLock("capacity.check");
 
@@ -136,6 +165,7 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
 		 hostAndVmStatsInterval = NumbersUtil.parseLong(configs.get("vm.stats.interval"), 60000L);
 		 storageStatsInterval = NumbersUtil.parseLong(configs.get("storage.stats.interval"), 60000L);
 		 volumeStatsInterval = NumbersUtil.parseLong(configs.get("volume.stats.interval"), -1L);
+		 vmDiskStatsInterval = NumbersUtil.parseInt(configs.get("vm.disk.stats.interval"), 0);
 
 		 if (hostStatsInterval > 0) {
 		     _executor.scheduleWithFixedDelay(new HostCollector(), 15000L, hostStatsInterval, TimeUnit.MILLISECONDS);
@@ -148,6 +178,12 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
 		 if (storageStatsInterval > 0) {
 		     _executor.scheduleWithFixedDelay(new StorageCollector(), 15000L, storageStatsInterval, TimeUnit.MILLISECONDS);
 		 }
+		 
+		 if (vmDiskStatsInterval > 0) {
+                     if (vmDiskStatsInterval < 300)
+                         vmDiskStatsInterval = 300;
+             _executor.scheduleAtFixedRate(new VmDiskStatsTask(), vmDiskStatsInterval, vmDiskStatsInterval, TimeUnit.SECONDS);
+         }
 		
 		// -1 means we don't even start this thread to pick up any data.
 		if (volumeStatsInterval > 0) {
@@ -155,6 +191,49 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
 		} else {
 			s_logger.info("Disabling volume stats collector");
 		}
+		
+        //Schedule disk stats update task
+        _diskStatsUpdateExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DiskStatsUpdater"));
+        String aggregationRange = configs.get("usage.stats.job.aggregation.range");
+        _usageAggregationRange  = NumbersUtil.parseInt(aggregationRange, 1440);
+        _usageTimeZone = configs.get("usage.aggregation.timezone");
+        if(_usageTimeZone == null){
+            _usageTimeZone = "GMT";
+        }
+        TimeZone usageTimezone = TimeZone.getTimeZone(_usageTimeZone);
+        Calendar cal = Calendar.getInstance(usageTimezone);
+        cal.setTime(new Date());
+        long endDate = 0;
+        int HOURLY_TIME = 60;
+        final int DAILY_TIME = 60 * 24;
+        if (_usageAggregationRange == DAILY_TIME) {
+            cal.set(Calendar.HOUR_OF_DAY, 0);
+            cal.set(Calendar.MINUTE, 0);
+            cal.set(Calendar.SECOND, 0);
+            cal.set(Calendar.MILLISECOND, 0);
+            cal.roll(Calendar.DAY_OF_YEAR, true);
+            cal.add(Calendar.MILLISECOND, -1);
+            endDate = cal.getTime().getTime();
+            _dailyOrHourly = true;
+        } else if (_usageAggregationRange == HOURLY_TIME) {
+            cal.set(Calendar.MINUTE, 0);
+            cal.set(Calendar.SECOND, 0);
+            cal.set(Calendar.MILLISECOND, 0);
+            cal.roll(Calendar.HOUR_OF_DAY, true);
+            cal.add(Calendar.MILLISECOND, -1);
+            endDate = cal.getTime().getTime();
+            _dailyOrHourly = true;
+        } else {
+            endDate = cal.getTime().getTime();
+            _dailyOrHourly = false;
+        }
+        if (_usageAggregationRange < USAGE_AGGREGATION_RANGE_MIN) {
+            s_logger.warn("Usage stats job aggregation range is to small, using the minimum value of " + USAGE_AGGREGATION_RANGE_MIN);
+            _usageAggregationRange = USAGE_AGGREGATION_RANGE_MIN;
+        }
+        _diskStatsUpdateExecutor.scheduleAtFixedRate(new VmDiskStatsUpdaterTask(), (endDate - System.currentTimeMillis()),
+                (_usageAggregationRange * 60 * 1000), TimeUnit.MILLISECONDS);
+        
 	}
 
 	class HostCollector implements Runnable {
@@ -249,6 +328,10 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
 										statsInMemory.setNumCPUs(statsForCurrentIteration.getNumCPUs());
 										statsInMemory.setNetworkReadKBs(statsInMemory.getNetworkReadKBs() + statsForCurrentIteration.getNetworkReadKBs());
 										statsInMemory.setNetworkWriteKBs(statsInMemory.getNetworkWriteKBs() + statsForCurrentIteration.getNetworkWriteKBs());
+										statsInMemory.setDiskWriteKBs(statsInMemory.getDiskWriteKBs() + statsForCurrentIteration.getDiskWriteKBs());
+                                                                                statsInMemory.setDiskReadIOs(statsInMemory.getDiskReadIOs() + statsForCurrentIteration.getDiskReadIOs());
+                                                                                statsInMemory.setDiskWriteIOs(statsInMemory.getDiskWriteIOs() + statsForCurrentIteration.getDiskWriteIOs());
+                                                                                statsInMemory.setDiskReadKBs(statsInMemory.getDiskReadKBs() + statsForCurrentIteration.getDiskReadKBs());
 										
 										_VmStats.put(vmId, statsInMemory);
 									}
@@ -270,6 +353,175 @@ public class StatsCollector extends ManagerBase implements ComponentMethodInterc
 	public VmStats getVmStats(long id) {
 		return _VmStats.get(id);
 	}
+	
+    class VmDiskStatsUpdaterTask implements Runnable {
+        @Override
+        public void run() {
+            GlobalLock scanLock = GlobalLock.getInternLock("vm.disk.stats");
+            try {
+                if(scanLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) {
+                    //Check for ownership
+                    //msHost in UP state with min id should run the job
+                    ManagementServerHostVO msHost = _msHostDao.findOneInUpState(new Filter(ManagementServerHostVO.class, "id", true, 0L, 1L));
+                    if(msHost == null || (msHost.getMsid() != mgmtSrvrId)){
+                        s_logger.debug("Skipping aggregate disk stats update");
+                        scanLock.unlock();
+                        return;
+                    }
+                    Transaction txn = Transaction.open(Transaction.CLOUD_DB);
+                    try {
+                        txn.start();
+                        //get all stats with delta > 0
+                        List<VmDiskStatisticsVO> updatedVmNetStats = _vmDiskStatsDao.listUpdatedStats();
+                        for(VmDiskStatisticsVO stat : updatedVmNetStats){
+                            if (_dailyOrHourly) {
+                                //update agg bytes                    
+                                stat.setAggBytesRead(stat.getCurrentBytesRead() + stat.getNetBytesRead());
+                                stat.setAggBytesWrite(stat.getCurrentBytesWrite() + stat.getNetBytesWrite());
+                                stat.setAggIORead(stat.getCurrentIORead() + stat.getNetIORead());
+                                stat.setAggIOWrite(stat.getCurrentIOWrite() + stat.getNetIOWrite());
+                                _vmDiskStatsDao.update(stat.getId(), stat);
+                            }
+                        }
+                        s_logger.debug("Successfully updated aggregate vm disk stats");
+                        txn.commit();
+                    } catch (Exception e){
+                        txn.rollback();
+                        s_logger.debug("Failed to update aggregate disk stats", e);
+                    } finally {
+                        scanLock.unlock();
+                        txn.close();
+                    }
+                }
+            } catch (Exception e){
+                s_logger.debug("Exception while trying to acquire disk stats lock", e);
+            }  finally {
+                scanLock.releaseRef();
+            }
+        }
+    }
+    
+    class VmDiskStatsTask implements Runnable {
+        @Override
+        public void run() {
+            // collect the vm disk statistics(total) from hypervisor. added by weizhou, 2013.03.
+            Transaction txn = Transaction.open(Transaction.CLOUD_DB);
+            try {
+                txn.start();
+                SearchCriteria<HostVO> sc = _hostDao.createSearchCriteria();
+                sc.addAnd("status", SearchCriteria.Op.EQ, Status.Up.toString());
+                sc.addAnd("resourceState", SearchCriteria.Op.NIN, ResourceState.Maintenance, ResourceState.PrepareForMaintenance, ResourceState.ErrorInMaintenance);
+                sc.addAnd("type", SearchCriteria.Op.EQ, Host.Type.Routing.toString());
+                List<HostVO> hosts = _hostDao.search(sc, null);
+                
+                for (HostVO host : hosts) {
+                    List<UserVmVO> vms = _userVmDao.listRunningByHostId(host.getId());
+                    List<Long> vmIds = new ArrayList<Long>();
+                    
+                    for (UserVmVO vm : vms) {
+                        if (vm.getType() == VirtualMachine.Type.User) // user vm
+                            vmIds.add(vm.getId());
+                    }
+                    
+                    HashMap<Long, List<VmDiskStatsEntry>> vmDiskStatsById = _userVmMgr.getVmDiskStatistics(host.getId(), host.getName(), vmIds);
+                    if (vmDiskStatsById == null)
+                        continue;
+                
+                    Set<Long> vmIdSet = vmDiskStatsById.keySet();
+                    for(Long vmId : vmIdSet)
+                    {
+                        List<VmDiskStatsEntry> vmDiskStats = vmDiskStatsById.get(vmId);
+                        if (vmDiskStats == null)
+                                continue;
+                        UserVmVO userVm = _userVmDao.findById(vmId);
+                        for (VmDiskStatsEntry vmDiskStat:vmDiskStats) {
+                            SearchCriteria<VolumeVO> sc_volume = _volsDao.createSearchCriteria();
+                            sc_volume.addAnd("path", SearchCriteria.Op.EQ, vmDiskStat.getPath());
+                            VolumeVO volume = _volsDao.search(sc_volume, null).get(0);
+                            VmDiskStatisticsVO previousVmDiskStats = _vmDiskStatsDao.findBy(userVm.getAccountId(), userVm.getDataCenterId(), vmId, volume.getId());
+                            VmDiskStatisticsVO vmDiskStat_lock = _vmDiskStatsDao.lock(userVm.getAccountId(), userVm.getDataCenterId(), vmId, volume.getId());
+                            
+                            if ((vmDiskStat.getBytesRead() == 0) && (vmDiskStat.getBytesWrite() == 0)
+                                    && (vmDiskStat.getIORead() == 0) && (vmDiskStat.getIOWrite() == 0)) {
+                                s_logger.debug("IO/bytes read and write are all 0. Not updating vm_disk_statistics");
+                                continue;
+                            }
+                            
+                            if (vmDiskStat_lock == null) {
+                                s_logger.warn("unable to find vm disk stats from host for account: " + userVm.getAccountId() + " with vmId: " + userVm.getId()+ " and volumeId:" + volume.getId());
+                                continue;
+                            }
+
+                            if (previousVmDiskStats != null
+                                    && ((previousVmDiskStats.getCurrentBytesRead() != vmDiskStat_lock.getCurrentBytesRead())
+                                    || (previousVmDiskStats.getCurrentBytesWrite() != vmDiskStat_lock.getCurrentBytesWrite())
+                                    || (previousVmDiskStats.getCurrentIORead() != vmDiskStat_lock.getCurrentIORead())
+                                    || (previousVmDiskStats.getCurrentIOWrite() != vmDiskStat_lock.getCurrentIOWrite()))) {
+                                s_logger.debug("vm disk stats changed from the time GetVmDiskStatsCommand was sent. " +
+                                        "Ignoring current answer. Host: " + host.getName()  + " . VM: " + vmDiskStat.getVmName() + 
+                                        " Read(Bytes): " + vmDiskStat.getBytesRead() + " write(Bytes): " + vmDiskStat.getBytesWrite() +
+                                        " Read(IO): " + vmDiskStat.getIORead() + " write(IO): " + vmDiskStat.getIOWrite());
+                                continue;
+                            }
+
+                            if (vmDiskStat_lock.getCurrentBytesRead() > vmDiskStat.getBytesRead()) {
+                                if (s_logger.isDebugEnabled()) {
+                                    s_logger.debug("Read # of bytes that's less than the last one.  " +
+                                            "Assuming something went wrong and persisting it. Host: " + host.getName() + " . VM: " + vmDiskStat.getVmName() +
+                                            " Reported: " + vmDiskStat.getBytesRead() + " Stored: " + vmDiskStat_lock.getCurrentBytesRead());
+                                }
+                                vmDiskStat_lock.setNetBytesRead(vmDiskStat_lock.getNetBytesRead() + vmDiskStat_lock.getCurrentBytesRead());
+                            }
+                            vmDiskStat_lock.setCurrentBytesRead(vmDiskStat.getBytesRead());
+                            if (vmDiskStat_lock.getCurrentBytesWrite() > vmDiskStat.getBytesWrite()) {
+                                if (s_logger.isDebugEnabled()) {
+                                    s_logger.debug("Write # of bytes that's less than the last one.  " +
+                                            "Assuming something went wrong and persisting it. Host: " + host.getName() + " . VM: " + vmDiskStat.getVmName() +
+                                            " Reported: " + vmDiskStat.getBytesWrite() + " Stored: " + vmDiskStat_lock.getCurrentBytesWrite());
+                                }
+                                vmDiskStat_lock.setNetBytesWrite(vmDiskStat_lock.getNetBytesWrite() + vmDiskStat_lock.getCurrentBytesWrite());
+                            }
+                            vmDiskStat_lock.setCurrentBytesWrite(vmDiskStat.getBytesWrite());
+                            if (vmDiskStat_lock.getCurrentIORead() > vmDiskStat.getIORead()) {
+                                if (s_logger.isDebugEnabled()) {
+                                    s_logger.debug("Read # of IO that's less than the last one.  " +
+                                            "Assuming something went wrong and persisting it. Host: " + host.getName() + " . VM: " + vmDiskStat.getVmName() +
+                                            " Reported: " + vmDiskStat.getIORead() + " Stored: " + vmDiskStat_lock.getCurrentIORead());
+                                }
+                                vmDiskStat_lock.setNetIORead(vmDiskStat_lock.getNetIORead() + vmDiskStat_lock.getCurrentIORead());
+                            }
+                            vmDiskStat_lock.setCurrentIORead(vmDiskStat.getIORead());
+                            if (vmDiskStat_lock.getCurrentIOWrite() > vmDiskStat.getIOWrite()) {
+                                if (s_logger.isDebugEnabled()) {
+                                    s_logger.debug("Write # of IO that's less than the last one.  " +
+                                            "Assuming something went wrong and persisting it. Host: " + host.getName() + " . VM: " + vmDiskStat.getVmName() +
+                                            " Reported: " + vmDiskStat.getIOWrite() + " Stored: " + vmDiskStat_lock.getCurrentIOWrite());
+                                }
+                                vmDiskStat_lock.setNetIOWrite(vmDiskStat_lock.getNetIOWrite() + vmDiskStat_lock.getCurrentIOWrite());
+                            }
+                            vmDiskStat_lock.setCurrentIOWrite(vmDiskStat.getIOWrite());
+                            
+                            if (! _dailyOrHourly) {
+                                //update agg bytes 
+                                vmDiskStat_lock.setAggBytesWrite(vmDiskStat_lock.getNetBytesWrite() + vmDiskStat_lock.getCurrentBytesWrite());
+                                vmDiskStat_lock.setAggBytesRead(vmDiskStat_lock.getNetBytesRead() + vmDiskStat_lock.getCurrentBytesRead());
+                                vmDiskStat_lock.setAggIOWrite(vmDiskStat_lock.getNetIOWrite() + vmDiskStat_lock.getCurrentIOWrite());
+                                vmDiskStat_lock.setAggIORead(vmDiskStat_lock.getNetIORead() + vmDiskStat_lock.getCurrentIORead());
+                            }
+
+                            _vmDiskStatsDao.update(vmDiskStat_lock.getId(), vmDiskStat_lock);
+                        }
+                    }
+                }
+                txn.commit();
+            } catch (Exception e) {
+                s_logger.warn("Error while collecting vm disk stats from hosts", e);
+            } finally {
+                txn.close();
+            }
+            
+        }
+    }
 
 	class StorageCollector implements Runnable {
 		@Override

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/b9b0168d/server/src/com/cloud/storage/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/storage/VolumeManagerImpl.java b/server/src/com/cloud/storage/VolumeManagerImpl.java
index d064f3b..43f3681 100644
--- a/server/src/com/cloud/storage/VolumeManagerImpl.java
+++ b/server/src/com/cloud/storage/VolumeManagerImpl.java
@@ -128,9 +128,11 @@ import com.cloud.template.TemplateManager;
 import com.cloud.user.Account;
 import com.cloud.user.AccountManager;
 import com.cloud.user.ResourceLimitService;
+import com.cloud.user.VmDiskStatisticsVO;
 import com.cloud.user.UserContext;
 import com.cloud.user.dao.AccountDao;
 import com.cloud.user.dao.UserDao;
+import com.cloud.user.dao.VmDiskStatisticsDao;
 import com.cloud.uservm.UserVm;
 import com.cloud.utils.EnumUtils;
 import com.cloud.utils.NumbersUtil;
@@ -280,6 +282,8 @@ public class VolumeManagerImpl extends ManagerBase implements VolumeManager {
     @Inject
     protected ResourceTagDao _resourceTagDao;
     @Inject
+    protected VmDiskStatisticsDao _vmDiskStatsDao;
+    @Inject
     protected VMSnapshotDao _vmSnapshotDao;
     @Inject
     protected List<StoragePoolAllocator> _storagePoolAllocators;
@@ -1558,6 +1562,13 @@ public class VolumeManagerImpl extends ManagerBase implements VolumeManager {
             } else {
                 _volsDao.attachVolume(volume.getId(), vm.getId(), deviceId);
             }
+            // insert record for disk I/O statistics
+            VmDiskStatisticsVO diskstats = _vmDiskStatsDao.findBy(vm.getAccountId(), vm.getDataCenterId(),vm.getId(), volume.getId());
+            if (diskstats == null) {
+               diskstats = new VmDiskStatisticsVO(vm.getAccountId(), vm.getDataCenterId(),vm.getId(), volume.getId());
+               _vmDiskStatsDao.persist(diskstats);
+            }
+
             return _volsDao.findById(volume.getId());
         } else {
             if (answer != null) {
@@ -1895,6 +1906,9 @@ public class VolumeManagerImpl extends ManagerBase implements VolumeManager {
                     .getPoolId());
             cmd.setPoolUuid(volumePool.getUuid());
 
+            // Collect vm disk statistics from host before stopping Vm
+            _userVmMgr.collectVmDiskStatistics(vm);
+
             try {
                 answer = _agentMgr.send(vm.getHostId(), cmd);
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/b9b0168d/server/src/com/cloud/vm/UserVmManager.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/vm/UserVmManager.java b/server/src/com/cloud/vm/UserVmManager.java
index 4dcfb73..348017a 100755
--- a/server/src/com/cloud/vm/UserVmManager.java
+++ b/server/src/com/cloud/vm/UserVmManager.java
@@ -20,6 +20,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.cloud.agent.api.VmDiskStatsEntry;
 import com.cloud.agent.api.VmStatsEntry;
 import com.cloud.api.query.vo.UserVmJoinVO;
 import com.cloud.exception.*;
@@ -65,6 +66,8 @@ public interface UserVmManager extends VirtualMachineGuru<UserVmVO>, UserVmServi
      */
     HashMap<Long, VmStatsEntry> getVirtualMachineStatistics(long hostId, String hostName, List<Long> vmIds);
     
+    HashMap<Long, List<VmDiskStatsEntry>> getVmDiskStatistics(long hostId, String hostName, List<Long> vmIds);
+    
     boolean deleteVmGroup(long groupId);
 
     boolean addInstanceToGroup(long userVmId, String group);
@@ -95,4 +98,6 @@ public interface UserVmManager extends VirtualMachineGuru<UserVmVO>, UserVmServi
     boolean upgradeVirtualMachine(Long id, Long serviceOfferingId) throws ResourceUnavailableException, ConcurrentOperationException, ManagementServerException, VirtualMachineMigrationException;
 
     boolean setupVmForPvlan(boolean add, Long hostId, NicProfile nic);
+
+    void collectVmDiskStatistics (UserVmVO userVm);
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/b9b0168d/server/src/com/cloud/vm/UserVmManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/vm/UserVmManagerImpl.java b/server/src/com/cloud/vm/UserVmManagerImpl.java
index 86bdb14..8cf05aa 100755
--- a/server/src/com/cloud/vm/UserVmManagerImpl.java
+++ b/server/src/com/cloud/vm/UserVmManagerImpl.java
@@ -66,6 +66,8 @@ import org.apache.log4j.Logger;
 import com.cloud.agent.AgentManager;
 import com.cloud.agent.AgentManager.OnError;
 import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.GetVmDiskStatsAnswer;
+import com.cloud.agent.api.GetVmDiskStatsCommand;
 import com.cloud.agent.api.GetVmStatsAnswer;
 import com.cloud.agent.api.GetVmStatsCommand;
 import com.cloud.agent.api.PlugNicAnswer;
@@ -75,6 +77,7 @@ import com.cloud.agent.api.StartAnswer;
 import com.cloud.agent.api.StopAnswer;
 import com.cloud.agent.api.UnPlugNicAnswer;
 import com.cloud.agent.api.UnPlugNicCommand;
+import com.cloud.agent.api.VmDiskStatsEntry;
 import com.cloud.agent.api.VmStatsEntry;
 import com.cloud.agent.api.to.NicTO;
 import com.cloud.agent.api.to.VirtualMachineTO;
@@ -213,9 +216,11 @@ import com.cloud.user.SSHKeyPairVO;
 import com.cloud.user.User;
 import com.cloud.user.UserContext;
 import com.cloud.user.UserVO;
+import com.cloud.user.VmDiskStatisticsVO;
 import com.cloud.user.dao.AccountDao;
 import com.cloud.user.dao.SSHKeyPairDao;
 import com.cloud.user.dao.UserDao;
+import com.cloud.user.dao.VmDiskStatisticsDao;
 import com.cloud.uservm.UserVm;
 import com.cloud.utils.Journal;
 import com.cloud.utils.NumbersUtil;
@@ -239,6 +244,7 @@ import com.cloud.vm.VirtualMachine.State;
 import com.cloud.vm.dao.InstanceGroupDao;
 import com.cloud.vm.dao.InstanceGroupVMMapDao;
 import com.cloud.vm.dao.NicDao;
+import com.cloud.vm.dao.SecondaryStorageVmDao;
 import com.cloud.vm.dao.UserVmCloneSettingDao;
 import com.cloud.vm.dao.UserVmDao;
 import com.cloud.vm.dao.UserVmDetailsDao;
@@ -394,6 +400,12 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Use
     protected GuestOSCategoryDao _guestOSCategoryDao;
     @Inject
     UsageEventDao _usageEventDao;
+
+    @Inject
+    SecondaryStorageVmDao _secondaryDao;
+    @Inject
+    VmDiskStatisticsDao _vmDiskStatsDao;
+
     @Inject
     protected VMSnapshotDao _vmSnapshotDao;
     @Inject
@@ -411,6 +423,7 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Use
     protected ScheduledExecutorService _executor = null;
     protected int _expungeInterval;
     protected int _expungeDelay;
+    protected boolean _dailyOrHourly = false;
 
     protected String _name;
     protected String _instance;
@@ -1098,6 +1111,41 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Use
         }
 
     }
+    
+    @Override
+    public HashMap<Long, List<VmDiskStatsEntry>> getVmDiskStatistics(long hostId, String hostName, List<Long> vmIds) throws CloudRuntimeException {
+        HashMap<Long, List<VmDiskStatsEntry>> vmDiskStatsById = new HashMap<Long, List<VmDiskStatsEntry>>();
+
+        if (vmIds.isEmpty()) {
+            return vmDiskStatsById;
+        }
+
+        List<String> vmNames = new ArrayList<String>();
+
+        for (Long vmId : vmIds) {
+            UserVmVO vm = _vmDao.findById(vmId);
+            vmNames.add(vm.getInstanceName());
+        }
+
+        Answer answer = _agentMgr.easySend(hostId, new GetVmDiskStatsCommand(vmNames, _hostDao.findById(hostId).getGuid(), hostName));
+        if (answer == null || !answer.getResult()) {
+            s_logger.warn("Unable to obtain VM disk statistics.");
+            return null;
+        } else {
+            HashMap<String, List<VmDiskStatsEntry>> vmDiskStatsByName = ((GetVmDiskStatsAnswer)answer).getVmDiskStatsMap();
+
+            if (vmDiskStatsByName == null) {
+                s_logger.warn("Unable to obtain VM disk statistics.");
+                return null;
+            }
+
+            for (String vmName : vmDiskStatsByName.keySet()) {
+                vmDiskStatsById.put(vmIds.get(vmNames.indexOf(vmName)), vmDiskStatsByName.get(vmName));
+            }
+        }
+
+        return vmDiskStatsById;
+    }
 
     @Override
     public boolean upgradeVirtualMachine(Long vmId, Long newServiceOfferingId) throws ResourceUnavailableException, ConcurrentOperationException, ManagementServerException, VirtualMachineMigrationException{
@@ -1397,6 +1445,18 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Use
 
         _executor = Executors.newScheduledThreadPool(wrks, new NamedThreadFactory("UserVm-Scavenger"));
 
+        String aggregationRange = configs.get("usage.stats.job.aggregation.range");
+        int _usageAggregationRange  = NumbersUtil.parseInt(aggregationRange, 1440);
+        int HOURLY_TIME = 60;
+        final int DAILY_TIME = 60 * 24;
+        if (_usageAggregationRange == DAILY_TIME) {
+            _dailyOrHourly = true;
+        } else if (_usageAggregationRange == HOURLY_TIME) {
+            _dailyOrHourly = true;
+        } else {
+            _dailyOrHourly = false;
+        }
+
         _itMgr.registerGuru(VirtualMachine.Type.User, this);
 
         VirtualMachine.State.getStateMachine().registerListener(
@@ -2929,6 +2989,17 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Use
                 userVm.setPrivateMacAddress(nic.getMacAddress());
             }
         }
+
+        List<VolumeVO> volumes = _volsDao.findByInstance(userVm.getId());
+        VmDiskStatisticsVO diskstats = null;
+        for (VolumeVO volume : volumes) {
+               diskstats = _vmDiskStatsDao.findBy(userVm.getAccountId(), userVm.getDataCenterId(),userVm.getId(), volume.getId());
+            if (diskstats == null) {
+               diskstats = new VmDiskStatisticsVO(userVm.getAccountId(), userVm.getDataCenterId(),userVm.getId(), volume.getId());
+               _vmDiskStatsDao.persist(diskstats);
+            }
+        }
+
         return true;
     }
 
@@ -3308,6 +3379,9 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Use
         boolean status;
         State vmState = vm.getState();
 
+        // Collect vm disk statistics from host before stopping Vm
+         collectVmDiskStatistics(vm);
+
         try {
             VirtualMachineEntity vmEntity = _orchSrvc.getVirtualMachine(vm.getUuid());
             status = vmEntity.destroy(new Long(userId).toString());
@@ -3344,6 +3418,122 @@ public class UserVmManagerImpl extends ManagerBase implements UserVmManager, Use
             ex.addProxyObject(vm.getUuid(), "vmId");
             throw ex;
         }
+
+    }
+
+    @Override
+    public void collectVmDiskStatistics (UserVmVO userVm) {
+    	// Collect vm disk statistics from host before stopping Vm
+    	long hostId = userVm.getHostId();
+    	List<String> vmNames = new ArrayList<String>();
+    	vmNames.add(userVm.getInstanceName());
+    	HostVO host = _hostDao.findById(hostId);
+    	
+    	GetVmDiskStatsAnswer diskStatsAnswer = null;
+    	try {
+    		diskStatsAnswer = (GetVmDiskStatsAnswer) _agentMgr.easySend(hostId, new GetVmDiskStatsCommand(vmNames, host.getGuid(), host.getName()));
+    	} catch (Exception e) {
+            s_logger.warn("Error while collecting disk stats for vm: " + userVm.getHostName() + " from host: " + host.getName(), e);
+            return;
+        }
+        if (diskStatsAnswer != null) {
+            if (!diskStatsAnswer.getResult()) {
+                s_logger.warn("Error while collecting disk stats vm: " + userVm.getHostName() + " from host: " + host.getName() + "; details: " + diskStatsAnswer.getDetails());
+                return;
+            }
+            Transaction txn = Transaction.open(Transaction.CLOUD_DB);
+            try {
+                txn.start();
+                HashMap<String, List<VmDiskStatsEntry>> vmDiskStatsByName = diskStatsAnswer.getVmDiskStatsMap();
+                List<VmDiskStatsEntry> vmDiskStats = vmDiskStatsByName.get(userVm.getInstanceName());
+                
+                if (vmDiskStats == null)
+		    return;
+	        	
+	        for (VmDiskStatsEntry vmDiskStat:vmDiskStats) {
+                    SearchCriteria<VolumeVO> sc_volume = _volsDao.createSearchCriteria();
+                    sc_volume.addAnd("path", SearchCriteria.Op.EQ, vmDiskStat.getPath());
+                    VolumeVO volume = _volsDao.search(sc_volume, null).get(0);
+	            VmDiskStatisticsVO previousVmDiskStats = _vmDiskStatsDao.findBy(userVm.getAccountId(), userVm.getDataCenterId(), userVm.getId(), volume.getId());
+	            VmDiskStatisticsVO vmDiskStat_lock = _vmDiskStatsDao.lock(userVm.getAccountId(), userVm.getDataCenterId(), userVm.getId(), volume.getId());
+	                
+	                if ((vmDiskStat.getIORead() == 0) && (vmDiskStat.getIOWrite() == 0) && (vmDiskStat.getBytesRead() == 0) && (vmDiskStat.getBytesWrite() == 0)) {
+	                    s_logger.debug("Read/Write of IO and Bytes are both 0. Not updating vm_disk_statistics");
+	                    continue;
+	                }
+	                
+	                if (vmDiskStat_lock == null) {
+	                    s_logger.warn("unable to find vm disk stats from host for account: " + userVm.getAccountId() + " with vmId: " + userVm.getId()+ " and volumeId:" + volume.getId());
+	                    continue;
+	                }
+	
+	                if (previousVmDiskStats != null
+	                        && ((previousVmDiskStats.getCurrentIORead() != vmDiskStat_lock.getCurrentIORead())
+	                        || ((previousVmDiskStats.getCurrentIOWrite() != vmDiskStat_lock.getCurrentIOWrite())
+	                        || (previousVmDiskStats.getCurrentBytesRead() != vmDiskStat_lock.getCurrentBytesRead())
+	    	                || (previousVmDiskStats.getCurrentBytesWrite() != vmDiskStat_lock.getCurrentBytesWrite())))) {
+	                    s_logger.debug("vm disk stats changed from the time GetVmDiskStatsCommand was sent. " +
+	                            "Ignoring current answer. Host: " + host.getName() + " . VM: " + vmDiskStat.getVmName() + 
+	                            " IO Read: " + vmDiskStat.getIORead() + " IO Write: " + vmDiskStat.getIOWrite() +
+	                            " Bytes Read: " + vmDiskStat.getBytesRead() + " Bytes Write: " + vmDiskStat.getBytesWrite());
+	                    continue;
+	                }
+	
+	                if (vmDiskStat_lock.getCurrentIORead() > vmDiskStat.getIORead()) {
+	                    if (s_logger.isDebugEnabled()) {
+	                        s_logger.debug("Read # of IO that's less than the last one.  " +
+	                                "Assuming something went wrong and persisting it. Host: " + host.getName() + " . VM: " + vmDiskStat.getVmName() +
+	                                " Reported: " + vmDiskStat.getIORead() + " Stored: " + vmDiskStat_lock.getCurrentIORead());
+	                    }
+	                    vmDiskStat_lock.setNetIORead(vmDiskStat_lock.getNetIORead() + vmDiskStat_lock.getCurrentIORead());
+	                }
+	                vmDiskStat_lock.setCurrentIORead(vmDiskStat.getIORead());
+	                if (vmDiskStat_lock.getCurrentIOWrite() > vmDiskStat.getIOWrite()) {
+	                    if (s_logger.isDebugEnabled()) {
+	                        s_logger.debug("Write # of IO that's less than the last one.  " +
+	                                "Assuming something went wrong and persisting it. Host: " + host.getName() + " . VM: " + vmDiskStat.getVmName() +
+	                                " Reported: " + vmDiskStat.getIOWrite() + " Stored: " + vmDiskStat_lock.getCurrentIOWrite());
+	                    }
+	                    vmDiskStat_lock.setNetIOWrite(vmDiskStat_lock.getNetIOWrite() + vmDiskStat_lock.getCurrentIOWrite());
+	                }
+	                vmDiskStat_lock.setCurrentIOWrite(vmDiskStat.getIOWrite());
+	                if (vmDiskStat_lock.getCurrentBytesRead() > vmDiskStat.getBytesRead()) {
+	                    if (s_logger.isDebugEnabled()) {
+	                        s_logger.debug("Read # of Bytes that's less than the last one.  " +
+	                                "Assuming something went wrong and persisting it. Host: " + host.getName() + " . VM: " + vmDiskStat.getVmName() +
+	                                " Reported: " + vmDiskStat.getBytesRead() + " Stored: " + vmDiskStat_lock.getCurrentBytesRead());
+	                    }
+	                    vmDiskStat_lock.setNetBytesRead(vmDiskStat_lock.getNetBytesRead() + vmDiskStat_lock.getCurrentBytesRead());
+	                }
+	                vmDiskStat_lock.setCurrentBytesRead(vmDiskStat.getBytesRead());
+	                if (vmDiskStat_lock.getCurrentBytesWrite() > vmDiskStat.getBytesWrite()) {
+	                    if (s_logger.isDebugEnabled()) {
+	                        s_logger.debug("Write # of Bytes that's less than the last one.  " +
+	                                "Assuming something went wrong and persisting it. Host: " + host.getName() + " . VM: " + vmDiskStat.getVmName() +
+	                                " Reported: " + vmDiskStat.getBytesWrite() + " Stored: " + vmDiskStat_lock.getCurrentBytesWrite());
+	                    }
+	                    vmDiskStat_lock.setNetBytesWrite(vmDiskStat_lock.getNetBytesWrite() + vmDiskStat_lock.getCurrentBytesWrite());
+	                }
+	                vmDiskStat_lock.setCurrentBytesWrite(vmDiskStat.getBytesWrite());
+	                
+	                if (! _dailyOrHourly) {
+	                    //update agg bytes 
+	                	vmDiskStat_lock.setAggIORead(vmDiskStat_lock.getNetIORead() + vmDiskStat_lock.getCurrentIORead());
+	                	vmDiskStat_lock.setAggIOWrite(vmDiskStat_lock.getNetIOWrite() + vmDiskStat_lock.getCurrentIOWrite());
+	                	vmDiskStat_lock.setAggBytesRead(vmDiskStat_lock.getNetBytesRead() + vmDiskStat_lock.getCurrentBytesRead());
+	                	vmDiskStat_lock.setAggBytesWrite(vmDiskStat_lock.getNetBytesWrite() + vmDiskStat_lock.getCurrentBytesWrite());
+	                }
+	
+	                _vmDiskStatsDao.update(vmDiskStat_lock.getId(), vmDiskStat_lock);
+	        	}
+	        	txn.commit();
+            } catch (Exception e) {
+                txn.rollback();
+                s_logger.warn("Unable to update vm disk statistics for vm: " + userVm.getId() + " from host: " + hostId, e);
+            } finally {
+                txn.close();
+            }
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/b9b0168d/server/test/async-job-component.xml
----------------------------------------------------------------------
diff --git a/server/test/async-job-component.xml b/server/test/async-job-component.xml
index 4698252..55f47cc 100644
--- a/server/test/async-job-component.xml
+++ b/server/test/async-job-component.xml
@@ -74,6 +74,7 @@ under the License.
             <param name="cache.time.to.live">300</param>
         </dao>
         <dao name="UserStats" class="com.cloud.user.dao.UserStatisticsDaoImpl"/>
+        <dao name="VmDiskStats" class="com.cloud.user.dao.VmDiskStatisticsDaoImpl"/>
         <dao name="Disk Template" class="com.cloud.storage.dao.DiskTemplateDaoImpl">
             <param name="cache.size">50</param>
             <param name="cache.time.to.live">-1</param>

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/b9b0168d/server/test/com/cloud/vm/MockUserVmManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/vm/MockUserVmManagerImpl.java b/server/test/com/cloud/vm/MockUserVmManagerImpl.java
index 448a5dd..40c49d4 100644
--- a/server/test/com/cloud/vm/MockUserVmManagerImpl.java
+++ b/server/test/com/cloud/vm/MockUserVmManagerImpl.java
@@ -47,6 +47,7 @@ import org.apache.cloudstack.api.command.user.vmgroup.DeleteVMGroupCmd;
 import org.springframework.stereotype.Component;
 
 import com.cloud.agent.api.StopAnswer;
+import com.cloud.agent.api.VmDiskStatsEntry;
 import com.cloud.agent.api.VmStatsEntry;
 import com.cloud.agent.api.to.NicTO;
 import com.cloud.agent.api.to.VirtualMachineTO;
@@ -169,6 +170,12 @@ public class MockUserVmManagerImpl extends ManagerBase implements UserVmManager,
     }
 
     @Override
+    public HashMap<Long, List<VmDiskStatsEntry>> getVmDiskStatistics(long hostId, String hostName, List<Long> vmIds) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
     public boolean deleteVmGroup(long groupId) {
         // TODO Auto-generated method stub
         return false;
@@ -461,4 +468,9 @@ public class MockUserVmManagerImpl extends ManagerBase implements UserVmManager,
 		// TODO Auto-generated method stub
 		return false;
 	}
+
+    @Override
+    public void collectVmDiskStatistics (UserVmVO userVm) {
+        // TODO Auto-generated method stub
+    }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/b9b0168d/setup/db/db/schema-410to420.sql
----------------------------------------------------------------------
diff --git a/setup/db/db/schema-410to420.sql b/setup/db/db/schema-410to420.sql
index 03e14a9..bcfbcc9 100644
--- a/setup/db/db/schema-410to420.sql
+++ b/setup/db/db/schema-410to420.sql
@@ -1771,6 +1771,82 @@ INSERT IGNORE INTO `cloud`.`configuration` VALUES ('Advanced', 'DEFAULT', 'manag
 
 UPDATE `cloud`.`snapshots` set swift_id=null where swift_id=0;
 
+DROP TABLE IF EXISTS `cloud`.`vm_disk_statistics`;
+CREATE TABLE `cloud`.`vm_disk_statistics` (
+  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+  `data_center_id` bigint(20) unsigned NOT NULL,
+  `account_id` bigint(20) unsigned NOT NULL,
+  `vm_id` bigint(20) unsigned NOT NULL,
+  `volume_id` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `net_io_read` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `net_io_write` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `current_io_read` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `current_io_write` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `agg_io_read` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `agg_io_write` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `net_bytes_read` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `net_bytes_write` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `current_bytes_read` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `current_bytes_write` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `agg_bytes_read` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `agg_bytes_write` bigint(20) unsigned NOT NULL DEFAULT '0',
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `id` (`id`),
+  UNIQUE KEY `account_id` (`account_id`,`data_center_id`,`vm_id`,`volume_id`),
+  KEY `i_vm_disk_statistics__account_id` (`account_id`),
+  KEY `i_vm_disk_statistics__account_id_data_center_id` (`account_id`,`data_center_id`),
+  CONSTRAINT `fk_vm_disk_statistics__account_id` FOREIGN KEY (`account_id`) REFERENCES `account` (`id`) ON DELETE CASCADE
+) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8;
+
+insert into `cloud`.`vm_disk_statistics`(data_center_id,account_id,vm_id,volume_id) 
+select volumes.data_center_id, volumes.account_id, vm_instance.id, volumes.id from volumes,vm_instance where vm_instance.vm_type="User" and vm_instance.state<>"Expunging" and volumes.instance_id=vm_instance.id order by vm_instance.id;
+
+DROP TABLE IF EXISTS `cloud_usage`.`vm_disk_statistics`;
+CREATE TABLE `cloud_usage`.`vm_disk_statistics` (
+  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+  `data_center_id` bigint(20) unsigned NOT NULL,
+  `account_id` bigint(20) unsigned NOT NULL,
+  `vm_id` bigint(20) unsigned NOT NULL,
+  `volume_id` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `net_io_read` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `net_io_write` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `current_io_read` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `current_io_write` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `agg_io_read` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `agg_io_write` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `net_bytes_read` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `net_bytes_write` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `current_bytes_read` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `current_bytes_write` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `agg_bytes_read` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `agg_bytes_write` bigint(20) unsigned NOT NULL DEFAULT '0',
+  PRIMARY KEY (`id`),
+  UNIQUE KEY `id` (`id`),
+  UNIQUE KEY `account_id` (`account_id`,`data_center_id`,`vm_id`,`volume_id`)
+) ENGINE=InnoDB CHARSET=utf8;
+
+insert into `cloud_usage`.`vm_disk_statistics` select * from `cloud`.`vm_disk_statistics`;
+
+DROP TABLE IF EXISTS `cloud_usage`.`usage_vm_disk`;
+CREATE TABLE `cloud_usage`.`usage_vm_disk` (
+  `account_id` bigint(20) unsigned NOT NULL,
+  `zone_id` bigint(20) unsigned NOT NULL,
+  `vm_id` bigint(20) unsigned NOT NULL,
+  `volume_id` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `io_read` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `io_write` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `agg_io_read` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `agg_io_write` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `bytes_read` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `bytes_write` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `agg_bytes_read` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `agg_bytes_write` bigint(20) unsigned NOT NULL DEFAULT '0',
+  `event_time_millis` bigint(20) unsigned NOT NULL DEFAULT '0',
+  PRIMARY KEY (`account_id`,`zone_id`,`vm_id`,`volume_id`,`event_time_millis`)
+) ENGINE=InnoDB CHARSET=utf8;
+
+INSERT IGNORE INTO `cloud`.`configuration` VALUES ('Advanced', 'DEFAULT', 'management-server', 'vm.disk.stats.interval', 0, 'Interval (in seconds) to report vm disk statistics.');
+
 
 -- Re-enable foreign key checking, at the end of the upgrade path
 SET foreign_key_checks = 1;

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/b9b0168d/ui/dictionary.jsp
----------------------------------------------------------------------
diff --git a/ui/dictionary.jsp b/ui/dictionary.jsp
index ded9ea0..d7f7dd5 100644
--- a/ui/dictionary.jsp
+++ b/ui/dictionary.jsp
@@ -469,11 +469,15 @@ dictionary = {
 'label.disable.vpn': '<fmt:message key="label.disable.vpn" />',
 'label.disabling.vpn.access': '<fmt:message key="label.disabling.vpn.access" />',
 'label.disk.allocated': '<fmt:message key="label.disk.allocated" />',
+'label.disk.read.bytes': '<fmt:message key="label.disk.read.bytes" />',
+'label.disk.read.io': '<fmt:message key="label.disk.read.io" />',
 'label.disk.offering': '<fmt:message key="label.disk.offering" />',
 'label.disk.size': '<fmt:message key="label.disk.size" />',
 'label.disk.size.gb': '<fmt:message key="label.disk.size.gb" />',
 'label.disk.total': '<fmt:message key="label.disk.total" />',
 'label.disk.volume': '<fmt:message key="label.disk.volume" />',
+'label.disk.write.bytes': '<fmt:message key="label.disk.write.bytes" />',
+'label.disk.write.io': '<fmt:message key="label.disk.write.io" />',
 'label.display.name': '<fmt:message key="label.display.name" />',
 'label.display.text': '<fmt:message key="label.display.text" />',
 'label.dns.1': '<fmt:message key="label.dns.1" />',

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/b9b0168d/ui/scripts/instances.js
----------------------------------------------------------------------
diff --git a/ui/scripts/instances.js b/ui/scripts/instances.js
index 31237a8..6a589ba 100644
--- a/ui/scripts/instances.js
+++ b/ui/scripts/instances.js
@@ -1648,7 +1648,11 @@
               totalCPU: { label: 'label.total.cpu' },
               cpuused: { label: 'label.cpu.utilized' },
               networkkbsread: { label: 'label.network.read' },
-              networkkbswrite: { label: 'label.network.write' }
+              networkkbswrite: { label: 'label.network.write' },
+              diskkbsread: { label: 'label.disk.read.bytes' },
+              diskkbswrite: { label: 'label.disk.write.bytes' },
+              diskioread: { label: 'label.disk.read.io' },
+              diskiowrite: { label: 'label.disk.write.io' }
             },
             dataProvider: function(args) {
               $.ajax({
@@ -1662,7 +1666,11 @@
                   totalCPU: jsonObj.cpunumber + " x " + cloudStack.converters.convertHz(jsonObj.cpuspeed),
                   cpuused: jsonObj.cpuused,
                   networkkbsread: (jsonObj.networkkbsread == null)? "N/A": cloudStack.converters.convertBytes(jsonObj.networkkbsread * 1024),
-                  networkkbswrite: (jsonObj.networkkbswrite == null)? "N/A": cloudStack.converters.convertBytes(jsonObj.networkkbswrite * 1024)
+                  networkkbswrite: (jsonObj.networkkbswrite == null)? "N/A": cloudStack.converters.convertBytes(jsonObj.networkkbswrite * 1024),
+                  diskkbsread: (jsonObj.diskkbsread == null)? "N/A": cloudStack.converters.convertBytes(jsonObj.diskkbsread * 1024),
+                  diskkbswrite: (jsonObj.diskkbswrite == null)? "N/A": cloudStack.converters.convertBytes(jsonObj.diskkbswrite * 1024),
+                  diskioread: (jsonObj.diskioread == null)? "N/A": cloudStack.converters.convertBytes(jsonObj.diskioread * 1024),
+                  diskiowrite: (jsonObj.diskiowrite == null)? "N/A": cloudStack.converters.convertBytes(jsonObj.diskiowrite * 1024)
                   }
                 });
               }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/b9b0168d/usage/src/com/cloud/usage/UsageManagerImpl.java
----------------------------------------------------------------------
diff --git a/usage/src/com/cloud/usage/UsageManagerImpl.java b/usage/src/com/cloud/usage/UsageManagerImpl.java
index 0c2ad6e..65f354c 100644
--- a/usage/src/com/cloud/usage/UsageManagerImpl.java
+++ b/usage/src/com/cloud/usage/UsageManagerImpl.java
@@ -54,6 +54,7 @@ import com.cloud.usage.dao.UsageSecurityGroupDao;
 import com.cloud.usage.dao.UsageStorageDao;
 import com.cloud.usage.dao.UsageVMInstanceDao;
 import com.cloud.usage.dao.UsageVPNUserDao;
+import com.cloud.usage.dao.UsageVmDiskDao;
 import com.cloud.usage.dao.UsageVolumeDao;
 import com.cloud.usage.parser.IPAddressUsageParser;
 import com.cloud.usage.parser.LoadBalancerUsageParser;
@@ -64,13 +65,15 @@ import com.cloud.usage.parser.SecurityGroupUsageParser;
 import com.cloud.usage.parser.StorageUsageParser;
 import com.cloud.usage.parser.VMInstanceUsageParser;
 import com.cloud.usage.parser.VPNUserUsageParser;
+import com.cloud.usage.parser.VmDiskUsageParser;
 import com.cloud.usage.parser.VolumeUsageParser;
 import com.cloud.user.Account;
 import com.cloud.user.AccountVO;
 import com.cloud.user.UserStatisticsVO;
+import com.cloud.user.VmDiskStatisticsVO;
 import com.cloud.user.dao.AccountDao;
 import com.cloud.user.dao.UserStatisticsDao;
-
+import com.cloud.user.dao.VmDiskStatisticsDao;
 
 import com.cloud.utils.component.ManagerBase;
 import com.cloud.utils.concurrency.NamedThreadFactory;
@@ -108,6 +111,8 @@ public class UsageManagerImpl extends ManagerBase implements UsageManager, Runna
     @Inject private UsageVPNUserDao m_usageVPNUserDao;
     @Inject private UsageSecurityGroupDao m_usageSecurityGroupDao;
     @Inject private UsageJobDao m_usageJobDao;
+    @Inject private VmDiskStatisticsDao m_vmDiskStatsDao;
+    @Inject private UsageVmDiskDao m_usageVmDiskDao;
     @Inject protected AlertManager _alertMgr;
     @Inject protected UsageEventDao _usageEventDao;
     @Inject ConfigurationDao _configDao;
@@ -121,6 +126,7 @@ public class UsageManagerImpl extends ManagerBase implements UsageManager, Runna
     TimeZone m_usageTimezone = TimeZone.getTimeZone("GMT");;
     private final GlobalLock m_heartbeatLock = GlobalLock.getInternLock("usage.job.heartbeat.check");
     private List<UsageNetworkVO> usageNetworks = new ArrayList<UsageNetworkVO>();
+    private List<UsageVmDiskVO> usageVmDisks = new ArrayList<UsageVmDiskVO>();
 
     private final ScheduledExecutorService m_executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Usage-Job"));
     private final ScheduledExecutorService m_heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Usage-HB"));
@@ -389,6 +395,8 @@ public class UsageManagerImpl extends ManagerBase implements UsageManager, Runna
             List<AccountVO> accounts = null;
             List<UserStatisticsVO> userStats = null;
             Map<String, UsageNetworkVO> networkStats = null;
+            List<VmDiskStatisticsVO> vmDiskStats = null;
+            Map<String, UsageVmDiskVO> vmDiskUsages = null;
             Transaction userTxn = Transaction.open(Transaction.CLOUD_DB);
             try {
                 Long limit = Long.valueOf(500);
@@ -479,6 +487,46 @@ public class UsageManagerImpl extends ManagerBase implements UsageManager, Runna
                     }
                     offset = new Long(offset.longValue() + limit.longValue());
                 } while ((userStats != null) && !userStats.isEmpty());
+
+                // reset offset
+                offset = Long.valueOf(0);
+
+                // get all the vm network stats to create usage_vm_network records for the vm network usage
+                Long lastVmDiskStatsId = m_usageDao.getLastVmDiskStatsId();
+                if (lastVmDiskStatsId == null) {
+                       lastVmDiskStatsId = Long.valueOf(0);
+                }
+                SearchCriteria<VmDiskStatisticsVO> sc4 = m_vmDiskStatsDao.createSearchCriteria();
+                sc4.addAnd("id", SearchCriteria.Op.LTEQ, lastVmDiskStatsId);
+                do {
+                    Filter filter = new Filter(VmDiskStatisticsVO.class, "id", true, offset, limit);
+
+                    vmDiskStats = m_vmDiskStatsDao.search(sc4, filter);
+
+                    if ((vmDiskStats != null) && !vmDiskStats.isEmpty()) {
+                        // now copy the accounts to cloud_usage db
+                        m_usageDao.updateVmDiskStats(vmDiskStats);
+                    }
+                    offset = new Long(offset.longValue() + limit.longValue());
+                } while ((vmDiskStats != null) && !vmDiskStats.isEmpty());
+
+                // reset offset
+                offset = Long.valueOf(0);
+
+                sc4 = m_vmDiskStatsDao.createSearchCriteria();
+                sc4.addAnd("id", SearchCriteria.Op.GT, lastVmDiskStatsId);
+                do {
+                    Filter filter = new Filter(VmDiskStatisticsVO.class, "id", true, offset, limit);
+
+                    vmDiskStats = m_vmDiskStatsDao.search(sc4, filter);
+
+                    if ((vmDiskStats != null) && !vmDiskStats.isEmpty()) {
+                        // now copy the accounts to cloud_usage db
+                        m_usageDao.saveVmDiskStats(vmDiskStats);
+                    }
+                    offset = new Long(offset.longValue() + limit.longValue());
+                } while ((vmDiskStats != null) && !vmDiskStats.isEmpty());
+
             } finally {
                 userTxn.close();
             }
@@ -565,6 +613,53 @@ public class UsageManagerImpl extends ManagerBase implements UsageManager, Runna
                     s_logger.debug("created network stats helper entries for " + numAcctsProcessed + " accts");
                 }
 
+                // get vm disk stats in order to compute vm disk usage
+                vmDiskUsages = m_usageVmDiskDao.getRecentVmDiskStats();
+
+                // Keep track of user stats for an account, across all of its public IPs
+                Map<String, VmDiskStatisticsVO> aggregatedDiskStats = new HashMap<String, VmDiskStatisticsVO>();
+                startIndex = 0;
+                do {
+                       vmDiskStats = m_vmDiskStatsDao.listActiveAndRecentlyDeleted(recentlyDeletedDate, startIndex, 500);
+
+                    if (vmDiskUsages != null) {
+                        for (VmDiskStatisticsVO vmDiskStat : vmDiskStats) {
+                            if(vmDiskStat.getVmId() != null){
+                                String hostKey = vmDiskStat.getDataCenterId() + "-" + vmDiskStat.getAccountId()+"-Vm-" + vmDiskStat.getVmId()+"-Disk-" + vmDiskStat.getVolumeId();
+                                VmDiskStatisticsVO hostAggregatedStat = aggregatedDiskStats.get(hostKey);
+                                if (hostAggregatedStat == null) {
+                                    hostAggregatedStat = new VmDiskStatisticsVO(vmDiskStat.getAccountId(), vmDiskStat.getDataCenterId(), vmDiskStat.getVmId(),vmDiskStat.getVolumeId());
+                                }
+
+                                hostAggregatedStat.setAggIORead(hostAggregatedStat.getAggIORead() + vmDiskStat.getAggIORead());
+                                hostAggregatedStat.setAggIOWrite(hostAggregatedStat.getAggIOWrite() + vmDiskStat.getAggIOWrite());
+                                hostAggregatedStat.setAggBytesRead(hostAggregatedStat.getAggBytesRead() + vmDiskStat.getAggBytesRead());
+                                hostAggregatedStat.setAggBytesWrite(hostAggregatedStat.getAggBytesWrite() + vmDiskStat.getAggBytesWrite());
+                                aggregatedDiskStats.put(hostKey, hostAggregatedStat);
+                            }
+                        }
+                    }
+                    startIndex += 500;
+                } while ((userStats != null) && !userStats.isEmpty());
+
+                // loop over the user stats, create delta entries in the usage_disk helper table
+                numAcctsProcessed = 0;
+                usageVmDisks.clear();
+                for (String key : aggregatedDiskStats.keySet()) {
+                       UsageVmDiskVO currentVmDiskStats = null;
+                    if (vmDiskStats != null) {
+                        currentVmDiskStats = vmDiskUsages.get(key);
+                    }
+
+                    createVmDiskHelperEntry(aggregatedDiskStats.get(key), currentVmDiskStats, endDateMillis);
+                    numAcctsProcessed++;
+                }
+                m_usageVmDiskDao.saveUsageVmDisks(usageVmDisks);
+
+                if (s_logger.isDebugEnabled()) {
+                    s_logger.debug("created vm disk stats helper entries for " + numAcctsProcessed + " accts");
+                }
+
                 // commit the helper records, then start a new transaction
                 usageTxn.commit();
                 usageTxn.start();
@@ -701,6 +796,13 @@ public class UsageManagerImpl extends ManagerBase implements UsageManager, Runna
             }
         }
 
+        parsed = VmDiskUsageParser.parse(account, currentStartDate, currentEndDate);
+        if (s_logger.isDebugEnabled()) {
+            if (!parsed) {
+                s_logger.debug("vm disk usage successfully parsed? " + parsed + " (for account: " + account.getAccountName() + ", id: " + account.getId() + ")");
+            }
+        }
+
         parsed = VolumeUsageParser.parse(account, currentStartDate, currentEndDate);
         if (s_logger.isDebugEnabled()) {
             if (!parsed) {
@@ -1006,6 +1108,59 @@ public class UsageManagerImpl extends ManagerBase implements UsageManager, Runna
         usageNetworks.add(usageNetworkVO);
     }
 
+    private void createVmDiskHelperEntry(VmDiskStatisticsVO vmDiskStat, UsageVmDiskVO usageVmDiskStat, long timestamp) {
+        long currentAccountedIORead = 0L;
+        long currentAccountedIOWrite = 0L;
+        long currentAccountedBytesRead = 0L;
+        long currentAccountedBytesWrite = 0L;
+        if (usageVmDiskStat != null) {
+            if (s_logger.isDebugEnabled()) {
+                s_logger.debug("getting current accounted bytes for... accountId: " + usageVmDiskStat.getAccountId() + " in zone: " + vmDiskStat.getDataCenterId() + "; aiw: " + vmDiskStat.getAggIOWrite() +
+                        "; air: " + usageVmDiskStat.getAggIORead() + "; abw: " + vmDiskStat.getAggBytesWrite() + "; abr: " + usageVmDiskStat.getAggBytesRead());
+            }
+            currentAccountedIORead = usageVmDiskStat.getAggIORead();
+            currentAccountedIOWrite = usageVmDiskStat.getAggIOWrite();
+            currentAccountedBytesRead = usageVmDiskStat.getAggBytesRead();
+            currentAccountedBytesWrite = usageVmDiskStat.getAggBytesWrite();
+        }
+        long ioRead = vmDiskStat.getAggIORead()  - currentAccountedIORead;
+        long ioWrite = vmDiskStat.getAggIOWrite() - currentAccountedIOWrite;
+        long bytesRead = vmDiskStat.getAggBytesRead()  - currentAccountedBytesRead;
+        long bytesWrite = vmDiskStat.getAggBytesWrite() - currentAccountedBytesWrite;
+
+        if (ioRead < 0) {
+            s_logger.warn("Calculated negative value for io read: " + ioRead + ", vm disk stats say: " + vmDiskStat.getAggIORead() + ", previous vm disk usage was: " + currentAccountedIORead);
+            ioRead = 0;
+        }
+        if (ioWrite < 0) {
+            s_logger.warn("Calculated negative value for io write: " + ioWrite + ", vm disk stats say: " + vmDiskStat.getAggIOWrite() + ", previous vm disk usage was: " + currentAccountedIOWrite);
+            ioWrite = 0;
+        }
+        if (bytesRead < 0) {
+            s_logger.warn("Calculated negative value for bytes read: " + bytesRead + ", vm disk stats say: " + vmDiskStat.getAggBytesRead() + ", previous vm disk usage was: " + currentAccountedBytesRead);
+            bytesRead = 0;
+        }
+        if (bytesWrite < 0) {
+            s_logger.warn("Calculated negative value for bytes write: " + bytesWrite + ", vm disk stats say: " + vmDiskStat.getAggBytesWrite() + ", previous vm disk usage was: " + currentAccountedBytesWrite);
+            bytesWrite = 0;
+        }
+
+        long vmId = 0;
+
+        if(vmDiskStat.getVmId() != null){
+               vmId = vmDiskStat.getVmId();
+        }
+
+        UsageVmDiskVO usageVmDiskVO = new UsageVmDiskVO(vmDiskStat.getAccountId(), vmDiskStat.getDataCenterId(), vmId, vmDiskStat.getVolumeId(), ioRead, ioWrite,
+                       vmDiskStat.getAggIORead(), vmDiskStat.getAggIOWrite(), bytesRead, bytesWrite, vmDiskStat.getAggBytesRead(), vmDiskStat.getAggBytesWrite(), timestamp);
+        if (s_logger.isDebugEnabled()) {
+            s_logger.debug("creating vmDiskHelperEntry... accountId: " + vmDiskStat.getAccountId() + " in zone: " + vmDiskStat.getDataCenterId() + "; aiw: " + vmDiskStat.getAggIOWrite() + "; air: " + vmDiskStat.getAggIORead() +
+                    "; curAIR: " + currentAccountedIORead + "; curAIW: " + currentAccountedIOWrite + "; uir: " + ioRead + "; uiw: " + ioWrite + "; abw: " + vmDiskStat.getAggBytesWrite() + "; abr: " + vmDiskStat.getAggBytesRead() +
+                    "; curABR: " + currentAccountedBytesRead + "; curABW: " + currentAccountedBytesWrite + "; ubr: " + bytesRead + "; ubw: " + bytesWrite);
+        }
+        usageVmDisks.add(usageVmDiskVO);
+    }
+
     private void createIPHelperEvent(UsageEventVO event) {
 
         String ipAddress = event.getResourceName();

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/b9b0168d/usage/src/com/cloud/usage/parser/VmDiskUsageParser.java
----------------------------------------------------------------------
diff --git a/usage/src/com/cloud/usage/parser/VmDiskUsageParser.java b/usage/src/com/cloud/usage/parser/VmDiskUsageParser.java
new file mode 100644
index 0000000..b8a5f98
--- /dev/null
+++ b/usage/src/com/cloud/usage/parser/VmDiskUsageParser.java
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.usage.parser;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+import javax.inject.Inject;
+
+import org.apache.log4j.Logger;
+import org.apache.cloudstack.usage.UsageTypes;
+
+import com.cloud.usage.UsageVO;
+import com.cloud.usage.UsageVmDiskVO;
+import com.cloud.usage.dao.UsageDao;
+import com.cloud.usage.dao.UsageVmDiskDao;
+import com.cloud.user.AccountVO;
+
+import com.cloud.utils.db.SearchCriteria;
+import org.springframework.stereotype.Component;
+
+@Component
+public class VmDiskUsageParser {
+public static final Logger s_logger = Logger.getLogger(VmDiskUsageParser.class.getName());
+
+    private static UsageDao m_usageDao;
+    private static UsageVmDiskDao m_usageVmDiskDao;
+
+    @Inject private UsageDao _usageDao;
+    @Inject private UsageVmDiskDao _usageVmDiskDao;
+
+    @PostConstruct
+    void init() {
+        m_usageDao = _usageDao;
+        m_usageVmDiskDao = _usageVmDiskDao;
+    }
+
+    public static boolean parse(AccountVO account, Date startDate, Date endDate) {
+        if (s_logger.isDebugEnabled()) {
+            s_logger.debug("Parsing all Vm Disk usage events for account: " + account.getId());
+        }
+
+        if ((endDate == null) || endDate.after(new Date())) {
+            endDate = new Date();
+        }
+
+        // - query usage_disk table for all entries for userId with
+        // event_date in the given range
+        SearchCriteria<UsageVmDiskVO> sc = m_usageVmDiskDao.createSearchCriteria();
+        sc.addAnd("accountId", SearchCriteria.Op.EQ, account.getId());
+        sc.addAnd("eventTimeMillis", SearchCriteria.Op.BETWEEN, startDate.getTime(), endDate.getTime());
+        List<UsageVmDiskVO> usageVmDiskVOs = m_usageVmDiskDao.search(sc, null);
+
+        Map<String, VmDiskInfo> vmDiskUsageByZone = new HashMap<String, VmDiskInfo>();
+
+        // Calculate the bytes since last parsing
+        for (UsageVmDiskVO usageVmDisk : usageVmDiskVOs) {
+            long zoneId = usageVmDisk.getZoneId();
+            String key = ""+zoneId;
+            if(usageVmDisk.getVmId() != 0){
+                key += "-Vm-" + usageVmDisk.getVmId()+"-Disk-" + usageVmDisk.getVolumeId();
+            }
+            VmDiskInfo vmDiskInfo = vmDiskUsageByZone.get(key);
+
+            long ioRead = usageVmDisk.getIORead();
+            long ioWrite = usageVmDisk.getIOWrite();
+            long bytesRead = usageVmDisk.getBytesRead();
+            long bytesWrite = usageVmDisk.getBytesWrite();
+            if (vmDiskInfo != null) {
+                ioRead += vmDiskInfo.getIORead();
+                ioWrite += vmDiskInfo.getIOWrite();
+                bytesRead += vmDiskInfo.getBytesRead();
+                bytesWrite += vmDiskInfo.getBytesWrite();
+            }
+
+            vmDiskUsageByZone.put(key, new VmDiskInfo(zoneId, usageVmDisk.getVmId(), usageVmDisk.getVolumeId(), ioRead, ioWrite, bytesRead, bytesWrite));
+        }
+
+        for (String key : vmDiskUsageByZone.keySet()) {
+            VmDiskInfo vmDiskInfo = vmDiskUsageByZone.get(key);
+            long ioRead = vmDiskInfo.getIORead();
+            long ioWrite = vmDiskInfo.getIOWrite();
+            long bytesRead = vmDiskInfo.getBytesRead();
+            long bytesWrite = vmDiskInfo.getBytesWrite();
+
+            if ((ioRead > 0L) || (ioWrite > 0L) || (bytesRead > 0L) || (bytesWrite > 0L)) {
+                if (s_logger.isDebugEnabled()) {
+                    s_logger.debug("Creating vm disk usage record, io read:" + ioRead + ", io write: " + ioWrite + "bytes read:" + bytesRead + ", bytes write: " + bytesWrite + "for account: "
+                            + account.getId() + " in availability zone " + vmDiskInfo.getZoneId() + ", start: " + startDate + ", end: " + endDate);
+                }
+
+                Long vmId = null;
+                
+                // Create the usage record for bytes read
+                String usageDesc = "disk bytes read";
+                if(vmDiskInfo.getVmId() != 0){
+                    vmId = vmDiskInfo.getVmId();
+                    usageDesc += " for Vm: "+vmDiskInfo.getVmId()+" and Volume: "+ vmDiskInfo.getVolumeId(); 
+                }
+                UsageVO usageRecord = new UsageVO(vmDiskInfo.getZoneId(), account.getId(), account.getDomainId(), usageDesc, ioRead + " io read",
+                        UsageTypes.VM_DISK_IO_READ, new Double(ioRead), vmId, "VirtualMachine", vmDiskInfo.getVolumeId(), startDate, endDate);
+                m_usageDao.persist(usageRecord);
+
+                // Create the usage record for bytes write
+                usageDesc = "disk bytes write";
+                if(vmDiskInfo.getVmId() != 0){
+                    usageDesc += " for Vm: "+vmDiskInfo.getVmId()+" and Volume: "+ vmDiskInfo.getVolumeId(); 
+                }
+                usageRecord = new UsageVO(vmDiskInfo.getZoneId(), account.getId(), account.getDomainId(), usageDesc, ioWrite + " io write",
+                        UsageTypes.VM_DISK_BYTES_WRITE, new Double(ioWrite), vmId, "VirtualMachine", vmDiskInfo.getVolumeId(), startDate, endDate);
+                m_usageDao.persist(usageRecord);
+                
+                // Create the usage record for bytes read
+                usageDesc = "disk bytes read";
+                if(vmDiskInfo.getVmId() != 0){
+                    vmId = vmDiskInfo.getVmId();
+                    usageDesc += " for Vm: "+vmDiskInfo.getVmId()+" and Volume: "+ vmDiskInfo.getVolumeId(); 
+                }
+                usageRecord = new UsageVO(vmDiskInfo.getZoneId(), account.getId(), account.getDomainId(), usageDesc, bytesRead + " bytes read",
+                        UsageTypes.VM_DISK_BYTES_READ, new Double(bytesRead), vmId, "VirtualMachine", vmDiskInfo.getVolumeId(), startDate, endDate);
+                m_usageDao.persist(usageRecord);
+
+                // Create the usage record for bytes write
+                usageDesc = "disk bytes write";
+                if(vmDiskInfo.getVmId() != 0){
+                    usageDesc += " for Vm: "+vmDiskInfo.getVmId()+" and Volume: "+ vmDiskInfo.getVolumeId(); 
+                }
+                usageRecord = new UsageVO(vmDiskInfo.getZoneId(), account.getId(), account.getDomainId(), usageDesc, bytesWrite + " bytes write",
+                        UsageTypes.VM_DISK_BYTES_WRITE, new Double(bytesWrite), vmId, "VirtualMachine", vmDiskInfo.getVolumeId(), startDate, endDate);
+                m_usageDao.persist(usageRecord);
+                
+            } else {
+                // Don't charge anything if there were zero bytes processed
+                if (s_logger.isDebugEnabled()) {
+                    s_logger.debug("No vm disk usage record (0 bytes used) generated for account: " + account.getId());
+                }
+            }
+        }
+
+        return true;
+    }
+    
+    private static class VmDiskInfo {
+        private long zoneId;
+        private long vmId;
+        private Long volumeId;
+        private long ioRead;
+        private long ioWrite;
+        private long bytesRead;
+        private long bytesWrite;
+
+        public VmDiskInfo(long zoneId, long vmId, Long volumeId, long ioRead, long ioWrite, long bytesRead, long bytesWrite) {
+            this.zoneId = zoneId;
+            this.vmId = vmId;
+            this.volumeId = volumeId;
+            this.ioRead = ioRead;
+            this.ioWrite = ioWrite;
+            this.bytesRead = bytesRead;
+            this.bytesWrite = bytesWrite;
+        }
+
+        public long getZoneId() {
+            return zoneId;
+        }
+
+        public long getVmId() {
+            return vmId;
+        }
+        
+        public Long getVolumeId() {
+            return volumeId;
+        }
+
+        public long getIORead() {
+            return ioRead;
+        }
+
+        public long getIOWrite() {
+            return ioWrite;
+        }
+        
+        public long getBytesRead() {
+            return bytesRead;
+        }
+
+        public long getBytesWrite() {
+            return bytesWrite;
+        }
+    
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/b9b0168d/usage/test/com/cloud/usage/UsageManagerTest.java
----------------------------------------------------------------------
diff --git a/usage/test/com/cloud/usage/UsageManagerTest.java b/usage/test/com/cloud/usage/UsageManagerTest.java
index eac3fcb..520ab26 100644
--- a/usage/test/com/cloud/usage/UsageManagerTest.java
+++ b/usage/test/com/cloud/usage/UsageManagerTest.java
@@ -46,6 +46,8 @@ public class UsageManagerTest extends TestCase {
     @Inject
     NetworkUsageParser netParser = null;
     @Inject
+    VmDiskUsageParser vmdiskParser = null;
+    @Inject
     PortForwardingUsageParser pfParser = null;
     @Inject
     SecurityGroupUsageParser sgParser = null;
@@ -87,6 +89,7 @@ public class UsageManagerTest extends TestCase {
         lbParser.parse(account, startDate, endDate);
         noParser.parse(account, startDate, endDate);
         netParser.parse(account, startDate, endDate);
+        vmdiskParser.parse(account, startDate, endDate);
         pfParser.parse(account, startDate, endDate);
         sgParser.parse(account, startDate, endDate);
         stParser.parse(account, startDate, endDate);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/b9b0168d/usage/test/com/cloud/usage/UsageManagerTestConfiguration.java
----------------------------------------------------------------------
diff --git a/usage/test/com/cloud/usage/UsageManagerTestConfiguration.java b/usage/test/com/cloud/usage/UsageManagerTestConfiguration.java
index 1d3ed7b..1a342b5 100644
--- a/usage/test/com/cloud/usage/UsageManagerTestConfiguration.java
+++ b/usage/test/com/cloud/usage/UsageManagerTestConfiguration.java
@@ -53,6 +53,7 @@ import java.io.IOException;
         UsagePortForwardingRuleDaoImpl.class,
         UsageNetworkOfferingDaoImpl.class,
         UsageVPNUserDaoImpl.class,
+        UsageVmDiskDaoImpl.class,
         UsageSecurityGroupDaoImpl.class,
         ConfigurationDaoImpl.class,
         UsageManagerImpl.class,
@@ -64,6 +65,7 @@ import java.io.IOException;
         PortForwardingUsageParser.class,
         SecurityGroupUsageParser.class,
         StorageUsageParser.class,
+        VmDiskUsageParser.class,
         VolumeUsageParser.class,
         VPNUserUsageParser.class,
         UserStatisticsDaoImpl.class},