You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ah...@apache.org on 2013/06/14 01:34:41 UTC

[8/8] git commit: updated refs/heads/vmsync to aff0220

Moved files into cloud-engine


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/aff0220d
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/aff0220d
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/aff0220d

Branch: refs/heads/vmsync
Commit: aff0220d453f2414546487a2a5b6e5a17ec8bb34
Parents: 3ef77bc
Author: Alex Huang <al...@gmail.com>
Authored: Thu Jun 13 16:34:41 2013 -0700
Committer: Alex Huang <al...@gmail.com>
Committed: Thu Jun 13 16:34:41 2013 -0700

----------------------------------------------------------------------
 .../cloud/cluster/ClusterManagerListener.java   |  26 ++
 .../src/com/cloud/cluster/ClusterManager.java   |  67 +++
 .../com/cloud/cluster/ClusterServicePdu.java    | 112 +++++
 .../vm/ClusteredVirtualMachineManagerImpl.java  |  63 +++
 .../com/cloud/vm/VirtualMachineManagerImpl.java |  39 +-
 .../src/com/cloud/vm/VmWorkJobDispatcher.java   |  26 +-
 .../cloud/vm/VirtualMachineManagerImplTest.java | 420 ++++++++++++++++++
 .../test/com/cloud/vm/VmWorkTest.java           | 179 ++++++++
 .../cloud/vm/VmWorkTestApiJobDispatcher.java    |  84 ++++
 .../manager/allocator/impl/RandomAllocator.java |   2 +-
 .../manager/ClusteredAgentManagerImpl.java      |  10 +-
 .../allocator/impl/FirstFitAllocator.java       |   2 +-
 .../allocator/impl/TestingAllocator.java        |   2 +-
 .../com/cloud/async/AsyncJobManagerImpl.java    |   8 +-
 .../cloud/cluster/ClusterFenceManagerImpl.java  |   4 +-
 .../src/com/cloud/cluster/ClusterManager.java   |  67 ---
 .../cloud/cluster/ClusterManagerListener.java   |  25 --
 .../com/cloud/cluster/ClusterServicePdu.java    | 112 -----
 .../com/cloud/cluster/LockMasterListener.java   |   6 +-
 .../cloud/ha/HighAvailabilityManagerImpl.java   |   8 +-
 .../com/cloud/storage/StorageManagerImpl.java   |  15 +-
 .../vm/ClusteredVirtualMachineManagerImpl.java  |  63 ---
 .../com/cloud/async/TestAsyncJobManager.java    |  36 +-
 .../cloud/vm/VirtualMachineManagerImplTest.java | 433 -------------------
 server/test/com/cloud/vm/VmWorkTest.java        | 180 --------
 .../cloud/vm/VmWorkTestApiJobDispatcher.java    |  85 ----
 .../cloud/vpc/MockConfigurationManagerImpl.java |  18 -
 27 files changed, 1027 insertions(+), 1065 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aff0220d/api/src/com/cloud/cluster/ClusterManagerListener.java
----------------------------------------------------------------------
diff --git a/api/src/com/cloud/cluster/ClusterManagerListener.java b/api/src/com/cloud/cluster/ClusterManagerListener.java
new file mode 100644
index 0000000..1231434
--- /dev/null
+++ b/api/src/com/cloud/cluster/ClusterManagerListener.java
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.cluster;
+
+import java.util.List;
+
+public interface ClusterManagerListener {
+    void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId);
+
+    void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList, long selfNodeId);
+	void onManagementNodeIsolated();
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aff0220d/engine/components-api/src/com/cloud/cluster/ClusterManager.java
----------------------------------------------------------------------
diff --git a/engine/components-api/src/com/cloud/cluster/ClusterManager.java b/engine/components-api/src/com/cloud/cluster/ClusterManager.java
new file mode 100755
index 0000000..017ba31
--- /dev/null
+++ b/engine/components-api/src/com/cloud/cluster/ClusterManager.java
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.cluster;
+
+import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.Command;
+import com.cloud.exception.AgentUnavailableException;
+import com.cloud.exception.OperationTimedoutException;
+import com.cloud.host.Status.Event;
+import com.cloud.resource.ResourceState;
+import com.cloud.utils.component.Manager;
+
+public interface ClusterManager extends Manager {
+	public static final int DEFAULT_HEARTBEAT_INTERVAL = 1500;
+	public static final int DEFAULT_HEARTBEAT_THRESHOLD = 150000;
+	public static final String ALERT_SUBJECT = "cluster-alert";
+	
+	public void OnReceiveClusterServicePdu(ClusterServicePdu pdu);
+    public void executeAsync(String strPeer, long agentId, Command [] cmds, boolean stopOnError);
+    public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError);
+
+    public Answer[] sendToAgent(Long hostId, Command []  cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException;
+    public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException;
+    public Boolean propagateAgentEvent(long agentId, Event event) throws AgentUnavailableException;
+    public Boolean propagateResourceEvent(long agentId, ResourceState.Event event) throws AgentUnavailableException;
+    public boolean executeResourceUserRequest(long hostId, ResourceState.Event event) throws AgentUnavailableException;
+	
+	public int getHeartbeatThreshold();
+	
+	public long getManagementNodeId();		// msid of current management server node
+    public boolean isManagementNodeAlive(long msid);
+    public boolean pingManagementNode(long msid);
+	public long getCurrentRunId();
+    
+	public String getSelfPeerName();
+	public String getSelfNodeIP();
+    public String getPeerName(long agentHostId);
+	
+	public void registerListener(ClusterManagerListener listener);
+	public void unregisterListener(ClusterManagerListener listener);
+    public ManagementServerHostVO getPeer(String peerName);
+    
+    /**
+     * Broadcast the command to all of the  management server nodes.
+     * @param agentId agent id this broadcast is regarding
+     * @param cmds commands to broadcast
+     */
+    public void broadcast(long agentId, Command[] cmds);
+    
+    boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException;
+    
+    boolean isAgentRebalanceEnabled();
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aff0220d/engine/components-api/src/com/cloud/cluster/ClusterServicePdu.java
----------------------------------------------------------------------
diff --git a/engine/components-api/src/com/cloud/cluster/ClusterServicePdu.java b/engine/components-api/src/com/cloud/cluster/ClusterServicePdu.java
new file mode 100644
index 0000000..81ff5d8
--- /dev/null
+++ b/engine/components-api/src/com/cloud/cluster/ClusterServicePdu.java
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.cluster;
+
+public class ClusterServicePdu {
+	public final static int PDU_TYPE_MESSAGE = 0;
+	public final static int PDU_TYPE_REQUEST = 1;
+	public final static int PDU_TYPE_RESPONSE = 2;
+	
+    private long sequenceId;
+    private long ackSequenceId;
+    
+    private String sourcePeer;
+    private String destPeer;
+    
+    private long agentId;
+    private boolean stopOnError;
+    private String jsonPackage;
+    
+    private int pduType = PDU_TYPE_MESSAGE;
+    
+    private static long s_nextPduSequenceId = 1;
+    
+    public ClusterServicePdu() {
+        sequenceId = getNextPduSequenceId();
+        ackSequenceId = 0;
+        agentId = 0;
+        stopOnError = false;
+    }
+    
+    public synchronized long getNextPduSequenceId() {
+        return s_nextPduSequenceId++;
+    }
+
+    public long getSequenceId() {
+        return sequenceId;
+    }
+
+    public void setSequenceId(long sequenceId) {
+        this.sequenceId = sequenceId;
+    }
+
+    public long getAckSequenceId() {
+        return ackSequenceId;
+    }
+
+    public void setAckSequenceId(long ackSequenceId) {
+        this.ackSequenceId = ackSequenceId;
+    }
+
+    public String getSourcePeer() {
+        return sourcePeer;
+    }
+
+    public void setSourcePeer(String sourcePeer) {
+        this.sourcePeer = sourcePeer;
+    }
+
+    public String getDestPeer() {
+        return destPeer;
+    }
+
+    public void setDestPeer(String destPeer) {
+        this.destPeer = destPeer;
+    }
+
+    public long getAgentId() {
+        return agentId;
+    }
+
+    public void setAgentId(long agentId) {
+        this.agentId = agentId;
+    }
+
+    public boolean isStopOnError() {
+        return stopOnError;
+    }
+
+    public void setStopOnError(boolean stopOnError) {
+        this.stopOnError = stopOnError;
+    }
+
+    public String getJsonPackage() {
+        return jsonPackage;
+    }
+
+    public void setJsonPackage(String jsonPackage) {
+        this.jsonPackage = jsonPackage;
+    }
+    
+    public int getPduType() {
+    	return pduType;
+    }
+    
+    public void setPduType(int pduType) {
+    	this.pduType = pduType;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aff0220d/engine/orchestration/src/com/cloud/vm/ClusteredVirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/ClusteredVirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/ClusteredVirtualMachineManagerImpl.java
new file mode 100644
index 0000000..fba788b
--- /dev/null
+++ b/engine/orchestration/src/com/cloud/vm/ClusteredVirtualMachineManagerImpl.java
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.vm;
+
+import java.util.List;
+import java.util.Map;
+
+import javax.ejb.Local;
+import javax.inject.Inject;
+import javax.naming.ConfigurationException;
+
+import com.cloud.cluster.ClusterManager;
+import com.cloud.cluster.ClusterManagerListener;
+import com.cloud.cluster.ManagementServerHost;
+
+@Local(value=VirtualMachineManager.class)
+public class ClusteredVirtualMachineManagerImpl extends VirtualMachineManagerImpl implements ClusterManagerListener {
+    @Inject
+    ClusterManager _clusterMgr;
+
+    protected ClusteredVirtualMachineManagerImpl() {
+    }
+    
+    @Override
+    public void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
+        
+    }
+    
+    @Override
+    public void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
+        for (ManagementServerHost node : nodeList) {
+            cancelWorkItems(node.getMsid());
+        }
+    }
+
+    @Override
+	public void onManagementNodeIsolated() {
+	}
+    
+    @Override
+    public boolean configure(String name, Map<String, Object> xmlParams) throws ConfigurationException {
+        super.configure(name, xmlParams);
+        
+        _clusterMgr.registerListener(this);
+        
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aff0220d/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
index 3492e6a..a7e34f9 100755
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -33,10 +33,6 @@ import javax.inject.Inject;
 import javax.naming.ConfigurationException;
 
 import org.apache.log4j.Logger;
-import org.apache.log4j.lf5.viewer.configure.ConfigurationManager;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
 
 import org.apache.cloudstack.affinity.dao.AffinityGroupVMMapDao;
 import org.apache.cloudstack.config.ConfigRepo;
@@ -84,9 +80,7 @@ import com.cloud.agent.api.to.VirtualMachineTO;
 import com.cloud.agent.manager.Commands;
 import com.cloud.agent.manager.allocator.HostAllocator;
 import com.cloud.alert.AlertManager;
-import com.cloud.api.StringMapTypeAdapter;
 import com.cloud.async.AsyncJobExecutionContext;
-import com.cloud.configuration.dao.ConfigurationDao;
 import com.cloud.dao.EntityManager;
 import com.cloud.dc.ClusterDetailsDao;
 import com.cloud.dc.ClusterDetailsVO;
@@ -200,15 +194,13 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     @Inject
     protected VolumeDao _volsDao;
     @Inject
-    protected ConfigurationManager _configMgr;
-    @Inject
     protected HighAvailabilityManager _haMgr;
     @Inject
     protected HypervisorGuruManager _hvGuruMgr;
     @Inject
     protected StoragePoolHostDao _poolHostDao;
     @Inject
-    protected RulesManager rulesMgr;
+    protected RulesManager _rulesMgr;
     @Inject
     protected AffinityGroupVMMapDao _affinityGroupVMMapDao;
 
@@ -239,11 +231,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     protected UserVmDetailsDao _uservmDetailsDao;
     
     @Inject
-    protected VMInstanceDao _instanceDao;
-
-    @Inject
-    protected ConfigurationDao _configDao;
-    @Inject
     VolumeManager _volumeMgr;
     
     @Inject protected MessageBus _messageBus;
@@ -267,7 +254,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     protected ConfigValue<Integer> _operationTimeout;
     protected ConfigValue<Boolean> _forceStop;
     protected long _nodeId;
-    protected Gson _gson;
 
     SearchBuilder<VolumeVO> RootVolumeSearch;
 
@@ -448,8 +434,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 
     @Override
     public boolean configure(String name, Map<String, Object> xmlParams) throws ConfigurationException {
-        Map<String, String> params = _configDao.getConfiguration(xmlParams);
-
         _retry = _configRepo.get(Configs.StartRetry);
 
         _cancelWait = _configRepo.get(Configs.VmOpCancelInterval);
@@ -478,11 +462,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         
         _messageBus.subscribe(Topics.VM_POWER_STATE, MessageDispatcher.getDispatcher(this));
 
-        GsonBuilder gBuilder = new GsonBuilder();
-        gBuilder.setVersion(1.3);
-        gBuilder.registerTypeAdapter(Map.class, new StringMapTypeAdapter());
-        _gson = gBuilder.create();
-      
         return true;
     }
 
@@ -668,14 +647,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         return false;
     }
 
-    public String serialize(VmWork work) {
-        return _gson.toJson(work);
-    }
-
-    public <T extends VmWork> T deserialize(Class<T> clazz, String work) {
-        return _gson.fromJson(work, clazz);
-    }
-
     @Override
     @DB
     public void advanceStart(String vmUuid, Map<VirtualMachineProfile.Param, Object> params, DeploymentPlan planToDeploy)
@@ -718,7 +689,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         		workInfo.setVmId(vm.getId());
         		workInfo.setPlan(planToDeploy);
         		workInfo.setParams(params);
-                workJob.setCmdInfo(serialize(workInfo));
+                workJob.setCmdInfo(VmWorkJobDispatcher.serialize(workInfo));
         		
                 _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId());
         	}
@@ -1151,7 +1122,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         		workInfo.setUserId(user.getId());
         		workInfo.setVmId(vm.getId());
         		workInfo.setForceStop(forced);
-                workJob.setCmdInfo(serialize(workInfo));
+                workJob.setCmdInfo(VmWorkJobDispatcher.serialize(workInfo));
         		
                 _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, vm.getId());
         	}
@@ -3044,7 +3015,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         }
 
         // if specified nic is associated with PF/LB/Static NAT
-        if(rulesMgr.listAssociatedRulesForGuestNic(nic).size() > 0){
+        if(_rulesMgr.listAssociatedRulesForGuestNic(nic).size() > 0){
             throw new CloudRuntimeException("Failed to remove nic from " + vm + " in " + network
                     + ", nic has associated Port forwarding or Load balancer or Static NAT rules.");
         }
@@ -3437,7 +3408,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     		VirtualMachine.Type.Instance, vmId);
     	if(pendingWorkJobs.size() == 0) {
     		// there is no pending operation job
-    		VMInstanceVO vm = _instanceDao.findById(vmId);
+            VMInstanceVO vm = _vmDao.findById(vmId);
     		if(vm != null) {
     			switch(vm.getPowerState()) {
     			case PowerOn :

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aff0220d/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
index e0be62f..8c7fd9c 100644
--- a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
+++ b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
@@ -16,16 +16,22 @@
 // under the License.
 package com.cloud.vm;
 
+import java.util.Map;
+
 import javax.inject.Inject;
 
 import org.apache.log4j.Logger;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
 import org.apache.cloudstack.context.CallContext;
 import org.apache.cloudstack.framework.jobs.AsyncJob;
 import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
 import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
 import org.apache.cloudstack.framework.jobs.AsyncJobManager;
 
+import com.cloud.api.StringMapTypeAdapter;
 import com.cloud.dao.EntityManager;
 import com.cloud.user.dao.AccountDao;
 import com.cloud.utils.component.AdapterBase;
@@ -34,6 +40,22 @@ import com.cloud.vm.dao.VMInstanceDao;
 public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatcher {
     private static final Logger s_logger = Logger.getLogger(VmWorkJobDispatcher.class);
 
+    protected static Gson s_gson;
+    static {
+        GsonBuilder gBuilder = new GsonBuilder();
+        gBuilder.setVersion(1.3);
+        gBuilder.registerTypeAdapter(Map.class, new StringMapTypeAdapter());
+        s_gson = gBuilder.create();
+    }
+
+    public static String serialize(VmWork work) {
+        return s_gson.toJson(work);
+    }
+
+    public static <T extends VmWork> T deserialize(Class<T> clazz, String work) {
+        return s_gson.fromJson(work, clazz);
+    }
+
     public static final String VM_WORK_QUEUE = "VmWorkJobQueue";
     public static final String VM_WORK_JOB_DISPATCHER = "VmWorkJobDispatcher";
     public static final String VM_WORK_JOB_WAKEUP_DISPATCHER = "VmWorkJobWakeupDispatcher";
@@ -56,9 +78,9 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch
         	assert(cmd != null);
         	
         	if (cmd.equals(Start)) {
-                work = _vmMgr.deserialize(VmWorkStart.class, job.getCmdInfo());
+                work = deserialize(VmWorkStart.class, job.getCmdInfo());
             } else {
-                work = _vmMgr.deserialize(VmWorkStop.class, job.getCmdInfo());
+                work = deserialize(VmWorkStop.class, job.getCmdInfo());
             }
         	assert(work != null);
         	

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aff0220d/engine/orchestration/test/com/cloud/vm/VirtualMachineManagerImplTest.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/test/com/cloud/vm/VirtualMachineManagerImplTest.java b/engine/orchestration/test/com/cloud/vm/VirtualMachineManagerImplTest.java
new file mode 100644
index 0000000..3035ba5
--- /dev/null
+++ b/engine/orchestration/test/com/cloud/vm/VirtualMachineManagerImplTest.java
@@ -0,0 +1,420 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloud.vm;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.Spy;
+
+import org.apache.cloudstack.api.command.user.vm.RestoreVMCmd;
+import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
+
+import com.cloud.agent.AgentManager;
+import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.CheckVirtualMachineAnswer;
+import com.cloud.agent.api.CheckVirtualMachineCommand;
+import com.cloud.agent.api.MigrateWithStorageAnswer;
+import com.cloud.agent.api.MigrateWithStorageCommand;
+import com.cloud.agent.api.MigrateWithStorageCompleteAnswer;
+import com.cloud.agent.api.MigrateWithStorageCompleteCommand;
+import com.cloud.agent.api.MigrateWithStorageReceiveAnswer;
+import com.cloud.agent.api.MigrateWithStorageReceiveCommand;
+import com.cloud.agent.api.MigrateWithStorageSendAnswer;
+import com.cloud.agent.api.MigrateWithStorageSendCommand;
+import com.cloud.agent.api.PrepareForMigrationAnswer;
+import com.cloud.agent.api.PrepareForMigrationCommand;
+import com.cloud.agent.api.ScaleVmAnswer;
+import com.cloud.agent.api.ScaleVmCommand;
+import com.cloud.configuration.dao.ConfigurationDao;
+import com.cloud.dao.EntityManager;
+import com.cloud.dc.dao.ClusterDao;
+import com.cloud.dc.dao.DataCenterDao;
+import com.cloud.dc.dao.HostPodDao;
+import com.cloud.deploy.DeployDestination;
+import com.cloud.exception.ConcurrentOperationException;
+import com.cloud.exception.ManagementServerException;
+import com.cloud.exception.OperationTimedoutException;
+import com.cloud.exception.ResourceUnavailableException;
+import com.cloud.exception.VirtualMachineMigrationException;
+import com.cloud.host.HostVO;
+import com.cloud.host.dao.HostDao;
+import com.cloud.hypervisor.Hypervisor.HypervisorType;
+import com.cloud.hypervisor.HypervisorGuru;
+import com.cloud.hypervisor.HypervisorGuruManager;
+import com.cloud.network.NetworkManager;
+import com.cloud.offering.ServiceOffering;
+import com.cloud.service.ServiceOfferingVO;
+import com.cloud.storage.DiskOfferingVO;
+import com.cloud.storage.StoragePool;
+import com.cloud.storage.StoragePoolHostVO;
+import com.cloud.storage.VMTemplateVO;
+import com.cloud.storage.Volume;
+import com.cloud.storage.VolumeManager;
+import com.cloud.storage.VolumeVO;
+import com.cloud.storage.dao.DiskOfferingDao;
+import com.cloud.storage.dao.StoragePoolHostDao;
+import com.cloud.storage.dao.VMTemplateDao;
+import com.cloud.storage.dao.VolumeDao;
+import com.cloud.user.Account;
+import com.cloud.user.AccountVO;
+import com.cloud.user.UserVO;
+import com.cloud.user.dao.AccountDao;
+import com.cloud.user.dao.UserDao;
+import com.cloud.utils.Pair;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.vm.VirtualMachine.Event;
+import com.cloud.vm.VirtualMachine.PowerState;
+import com.cloud.vm.VirtualMachine.State;
+import com.cloud.vm.dao.UserVmDao;
+import com.cloud.vm.dao.VMInstanceDao;
+import com.cloud.vm.snapshot.VMSnapshotManager;
+
+public class VirtualMachineManagerImplTest {
+
+        @Spy VirtualMachineManagerImpl _vmMgr = new VirtualMachineManagerImpl();
+        @Mock
+        VolumeManager _storageMgr;
+        @Mock
+        Account _account;
+        @Mock
+        AgentManager _agentMgr;
+        @Mock
+        AccountDao _accountDao;
+        @Mock
+        ConfigurationDao _configDao;
+        @Mock
+        HostDao _hostDao;
+        @Mock
+        UserDao _userDao;
+        @Mock
+        UserVmDao _vmDao;
+        @Mock
+        ItWorkDao _workDao;
+        @Mock
+        VMInstanceDao _vmInstanceDao;
+        @Mock
+        VMTemplateDao _templateDao;
+        @Mock
+        VolumeDao _volsDao;
+        @Mock
+        RestoreVMCmd _restoreVMCmd;
+        @Mock
+        AccountVO _accountMock;
+        @Mock
+        UserVO _userMock;
+        @Mock
+        UserVmVO _vmMock;
+        @Mock
+        VMInstanceVO _vmInstance;
+        @Mock
+        HostVO _host;
+        @Mock
+        VMTemplateVO _templateMock;
+        @Mock
+        VolumeVO _volumeMock;
+        @Mock
+        List<VolumeVO> _rootVols;
+        @Mock
+        ItWorkVO _work;
+
+    @Mock
+    EntityManager _entityMgr;
+
+        @Mock ClusterDao _clusterDao;
+        @Mock HostPodDao _podDao;
+        @Mock DataCenterDao _dcDao;
+        @Mock DiskOfferingDao _diskOfferingDao;
+        @Mock PrimaryDataStoreDao _storagePoolDao;
+        @Mock StoragePoolHostDao _poolHostDao;
+        @Mock NetworkManager _networkMgr;
+        @Mock HypervisorGuruManager _hvGuruMgr;
+        @Mock VMSnapshotManager _vmSnapshotMgr;
+
+        // Mock objects for vm migration with storage test.
+        @Mock DiskOfferingVO _diskOfferingMock;
+        @Mock StoragePoolVO _srcStoragePoolMock;
+        @Mock StoragePoolVO _destStoragePoolMock;
+        @Mock HostVO _srcHostMock;
+        @Mock HostVO _destHostMock;
+    @Mock
+    Map<Volume, StoragePool> _volumeToPoolMock;
+
+        @Before
+        public void setup(){
+            MockitoAnnotations.initMocks(this);
+
+        _vmMgr._entityMgr = _entityMgr;
+            _vmMgr._volsDao = _volsDao;
+            _vmMgr._volumeMgr = _storageMgr;
+            _vmMgr._hostDao = _hostDao;
+            _vmMgr._nodeId = 1L;
+/*
+            _vmMgr._workDao = _workDao;
+*/
+            _vmMgr._agentMgr = _agentMgr;
+            _vmMgr._poolHostDao= _poolHostDao;
+            _vmMgr._networkMgr = _networkMgr;
+            _vmMgr._hvGuruMgr = _hvGuruMgr;
+            _vmMgr._vmSnapshotMgr = _vmSnapshotMgr;
+            _vmMgr._vmDao = _vmInstanceDao;
+
+            when(_vmMock.getId()).thenReturn(314l);
+            when(_vmInstance.getId()).thenReturn(1L);
+            when(_vmInstance.getServiceOfferingId()).thenReturn(2L);
+            when(_vmInstance.getInstanceName()).thenReturn("myVm");
+            when(_vmInstance.getHostId()).thenReturn(2L);
+            when(_vmInstance.getType()).thenReturn(VirtualMachine.Type.User);
+            when(_host.getId()).thenReturn(1L);
+            when(_hostDao.findById(anyLong())).thenReturn(null);
+        when(_entityMgr.findById(ServiceOffering.class, anyLong())).thenReturn(getSvcoffering(512));
+            when(_workDao.persist(_work)).thenReturn(_work);
+            when(_workDao.update("1", _work)).thenReturn(true);
+            when(_work.getId()).thenReturn("1");
+            doNothing().when(_work).setStep(ItWorkVO.Step.Done);
+            //doNothing().when(_volsDao).detachVolume(anyLong());
+            //when(_work.setStep(ItWorkVO.Step.Done)).thenReturn("1");
+
+        }
+
+
+    @Test(expected=CloudRuntimeException.class)
+    public void testScaleVM1()  throws Exception {
+
+
+        DeployDestination dest = new DeployDestination(null, null, null, _host);
+        long l = 1L;
+
+        when(_vmInstanceDao.findById(anyLong())).thenReturn(_vmInstance);
+        _vmMgr.migrateForScale(_vmInstance.getUuid(), l, dest, l);
+
+    }
+
+    @Test (expected=CloudRuntimeException.class)
+    public void testScaleVM2()  throws Exception {
+
+        DeployDestination dest = new DeployDestination(null, null, null, _host);
+        long l = 1L;
+
+        when(_vmInstanceDao.findById(anyLong())).thenReturn(_vmInstance);
+        ServiceOfferingVO newServiceOffering = getSvcoffering(512);
+        ScaleVmCommand reconfigureCmd = new ScaleVmCommand("myVmName", newServiceOffering.getCpu(),
+                newServiceOffering.getSpeed(), newServiceOffering.getRamSize(), newServiceOffering.getRamSize(), newServiceOffering.getLimitCpuUse());
+        Answer answer = new ScaleVmAnswer(reconfigureCmd, true, "details");
+        when(_agentMgr.send(2l, reconfigureCmd)).thenReturn(null);
+        _vmMgr.reConfigureVm(_vmInstance, getSvcoffering(256), false);
+
+    }
+
+    @Test (expected=CloudRuntimeException.class)
+    public void testScaleVM3()  throws Exception {
+
+        /*VirtualMachineProfile<VMInstanceVO> profile = new VirtualMachineProfileImpl<VMInstanceVO>(vm);
+
+        Long srcHostId = vm.getHostId();
+        Long oldSvcOfferingId = vm.getServiceOfferingId();
+        if (srcHostId == null) {
+            throw new CloudRuntimeException("Unable to scale the vm because it doesn't have a host id");
+        }*/
+
+        when(_vmInstance.getHostId()).thenReturn(null);
+        when(_vmInstanceDao.findById(anyLong())).thenReturn(_vmInstance);
+        _vmMgr.findHostAndMigrate(_vmInstance.getUuid(), 2l);
+
+    }
+
+
+    private ServiceOfferingVO getSvcoffering(int ramSize){
+
+        long id  = 4L;
+        String name = "name";
+        String displayText = "displayText";
+        int cpu = 1;
+        //int ramSize = 256;
+        int speed = 128;
+
+        boolean ha = false;
+        boolean useLocalStorage = false;
+
+        ServiceOfferingVO serviceOffering = new ServiceOfferingVO(name, cpu, ramSize, speed, null, null, ha, displayText, useLocalStorage, false, null, false, null, false);
+        return serviceOffering;
+    }
+
+    private void initializeMockConfigForMigratingVmWithVolumes() throws OperationTimedoutException,
+        ResourceUnavailableException {
+
+        // Mock the source and destination hosts.
+        when(_srcHostMock.getId()).thenReturn(5L);
+        when(_destHostMock.getId()).thenReturn(6L);
+        when(_hostDao.findById(5L)).thenReturn(_srcHostMock);
+        when(_hostDao.findById(6L)).thenReturn(_destHostMock);
+
+        // Mock the vm being migrated.
+        when(_vmMock.getId()).thenReturn(1L);
+        when(_vmMock.getHypervisorType()).thenReturn(HypervisorType.XenServer);
+        when(_vmMock.getState()).thenReturn(State.Running).thenReturn(State.Running).thenReturn(State.Migrating)
+            .thenReturn(State.Migrating);
+        when(_vmMock.getHostId()).thenReturn(5L);
+        when(_vmInstance.getId()).thenReturn(1L);
+        when(_vmInstance.getServiceOfferingId()).thenReturn(2L);
+        when(_vmInstance.getInstanceName()).thenReturn("myVm");
+        when(_vmInstance.getHostId()).thenReturn(5L);
+        when(_vmInstance.getType()).thenReturn(VirtualMachine.Type.User);
+        when(_vmInstance.getState()).thenReturn(State.Running).thenReturn(State.Running).thenReturn(State.Migrating)
+            .thenReturn(State.Migrating);
+
+        // Mock the work item.
+        when(_workDao.persist(any(ItWorkVO.class))).thenReturn(_work);
+        when(_workDao.update("1", _work)).thenReturn(true);
+        when(_work.getId()).thenReturn("1");
+        doNothing().when(_work).setStep(ItWorkVO.Step.Done);
+
+        // Mock the vm guru and the user vm object that gets returned.
+        _vmMgr._vmGurus = new HashMap<VirtualMachine.Type, VirtualMachineGuru>();
+
+        // Mock the iteration over all the volumes of an instance.
+        Iterator<VolumeVO> volumeIterator = mock(Iterator.class);
+        when(_volsDao.findUsableVolumesForInstance(anyLong())).thenReturn(_rootVols);
+        when(_rootVols.iterator()).thenReturn(volumeIterator);
+        when(volumeIterator.hasNext()).thenReturn(true, false);
+        when(volumeIterator.next()).thenReturn(_volumeMock);
+
+        // Mock the disk offering and pool objects for a volume.
+        when(_volumeMock.getDiskOfferingId()).thenReturn(5L);
+        when(_volumeMock.getPoolId()).thenReturn(200L);
+        when(_diskOfferingDao.findById(anyLong())).thenReturn(_diskOfferingMock);
+        when(_storagePoolDao.findById(anyLong())).thenReturn(_srcStoragePoolMock);
+
+        // Mock the volume to pool mapping.
+        when(_volumeToPoolMock.get(_volumeMock)).thenReturn(_destStoragePoolMock);
+        when(_destStoragePoolMock.getId()).thenReturn(201L);
+        when(_srcStoragePoolMock.getId()).thenReturn(200L);
+        when(_destStoragePoolMock.isLocal()).thenReturn(false);
+        when(_diskOfferingMock.getUseLocalStorage()).thenReturn(false);
+        when(_poolHostDao.findByPoolHost(anyLong(), anyLong())).thenReturn(mock(StoragePoolHostVO.class));
+
+        // Mock hypervisor guru.
+        HypervisorGuru guruMock = mock(HypervisorGuru.class);
+        when(_hvGuruMgr.getGuru(HypervisorType.XenServer)).thenReturn(guruMock);
+
+        when(_srcHostMock.getClusterId()).thenReturn(3L);
+        when(_destHostMock.getClusterId()).thenReturn(3L);
+
+        // Mock the commands and answers to the agent.
+        PrepareForMigrationAnswer prepAnswerMock = mock(PrepareForMigrationAnswer.class);
+        when(prepAnswerMock.getResult()).thenReturn(true);
+        when(_agentMgr.send(anyLong(), isA(PrepareForMigrationCommand.class))).thenReturn(prepAnswerMock);
+
+        MigrateWithStorageAnswer migAnswerMock = mock(MigrateWithStorageAnswer.class);
+        when(migAnswerMock.getResult()).thenReturn(true);
+        when(_agentMgr.send(anyLong(), isA(MigrateWithStorageCommand.class))).thenReturn(migAnswerMock);
+
+        MigrateWithStorageReceiveAnswer migRecAnswerMock = mock(MigrateWithStorageReceiveAnswer.class);
+        when(migRecAnswerMock.getResult()).thenReturn(true);
+        when(_agentMgr.send(anyLong(), isA(MigrateWithStorageReceiveCommand.class))).thenReturn(migRecAnswerMock);
+
+        MigrateWithStorageSendAnswer migSendAnswerMock = mock(MigrateWithStorageSendAnswer.class);
+        when(migSendAnswerMock.getResult()).thenReturn(true);
+        when(_agentMgr.send(anyLong(), isA(MigrateWithStorageSendCommand.class))).thenReturn(migSendAnswerMock);
+
+        MigrateWithStorageCompleteAnswer migCompleteAnswerMock = mock(MigrateWithStorageCompleteAnswer.class);
+        when(migCompleteAnswerMock.getResult()).thenReturn(true);
+        when(_agentMgr.send(anyLong(), isA(MigrateWithStorageCompleteCommand.class))).thenReturn(migCompleteAnswerMock);
+
+        CheckVirtualMachineAnswer checkVmAnswerMock = mock(CheckVirtualMachineAnswer.class);
+        when(checkVmAnswerMock.getResult()).thenReturn(true);
+        when(checkVmAnswerMock.getState()).thenReturn(PowerState.PowerOn);
+        when(_agentMgr.send(anyLong(), isA(CheckVirtualMachineCommand.class))).thenReturn(checkVmAnswerMock);
+
+        // Mock the state transitions of vm.
+        Pair<Long, Long> opaqueMock = new Pair<Long, Long> (_vmMock.getHostId(), _destHostMock.getId());
+        when(_vmSnapshotMgr.hasActiveVMSnapshotTasks(anyLong())).thenReturn(false);
+        when(_vmInstanceDao.updateState(State.Running, Event.MigrationRequested, State.Migrating, _vmMock, opaqueMock))
+            .thenReturn(true);
+        when(_vmInstanceDao.updateState(State.Migrating, Event.OperationSucceeded, State.Running, _vmMock, opaqueMock))
+            .thenReturn(true);
+    }
+
+    // Check migration of a vm with its volumes within a cluster.
+    @Test
+    public void testMigrateWithVolumeWithinCluster() throws ResourceUnavailableException, ConcurrentOperationException,
+        ManagementServerException, VirtualMachineMigrationException, OperationTimedoutException {
+
+        initializeMockConfigForMigratingVmWithVolumes();
+        when(_srcHostMock.getClusterId()).thenReturn(3L);
+        when(_destHostMock.getClusterId()).thenReturn(3L);
+
+        _vmMgr.migrateWithStorage(_vmInstance.getUuid(), _srcHostMock.getId(), _destHostMock.getId(), _volumeToPoolMock);
+    }
+
+    // Check migration of a vm with its volumes across a cluster.
+    @Test
+    public void testMigrateWithVolumeAcrossCluster() throws ResourceUnavailableException, ConcurrentOperationException,
+        ManagementServerException, VirtualMachineMigrationException, OperationTimedoutException {
+
+        initializeMockConfigForMigratingVmWithVolumes();
+        when(_srcHostMock.getClusterId()).thenReturn(3L);
+        when(_destHostMock.getClusterId()).thenReturn(4L);
+
+        _vmMgr.migrateWithStorage(_vmInstance.getUuid(), _srcHostMock.getId(), _destHostMock.getId(), _volumeToPoolMock);
+    }
+
+    // Check migration of a vm fails when src and destination pool are not of same type; that is, one is shared and
+    // other is local.
+    @Test(expected=CloudRuntimeException.class)
+    public void testMigrateWithVolumeFail1() throws ResourceUnavailableException, ConcurrentOperationException,
+        ManagementServerException, VirtualMachineMigrationException, OperationTimedoutException {
+
+        initializeMockConfigForMigratingVmWithVolumes();
+        when(_srcHostMock.getClusterId()).thenReturn(3L);
+        when(_destHostMock.getClusterId()).thenReturn(3L);
+
+        when(_destStoragePoolMock.isLocal()).thenReturn(true);
+        when(_diskOfferingMock.getUseLocalStorage()).thenReturn(false);
+
+        _vmMgr.migrateWithStorage(_vmInstance.getUuid(), _srcHostMock.getId(), _destHostMock.getId(), _volumeToPoolMock);
+    }
+
+    // Check migration of a vm fails when vm is not in Running state.
+    @Test(expected=ConcurrentOperationException.class)
+    public void testMigrateWithVolumeFail2() throws ResourceUnavailableException, ConcurrentOperationException,
+        ManagementServerException, VirtualMachineMigrationException, OperationTimedoutException {
+
+        initializeMockConfigForMigratingVmWithVolumes();
+        when(_srcHostMock.getClusterId()).thenReturn(3L);
+        when(_destHostMock.getClusterId()).thenReturn(3L);
+
+        when(_vmMock.getState()).thenReturn(State.Stopped);
+
+        _vmMgr.migrateWithStorage(_vmInstance.getUuid(), _srcHostMock.getId(), _destHostMock.getId(), _volumeToPoolMock);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aff0220d/engine/orchestration/test/com/cloud/vm/VmWorkTest.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/test/com/cloud/vm/VmWorkTest.java b/engine/orchestration/test/com/cloud/vm/VmWorkTest.java
new file mode 100644
index 0000000..09fa10f
--- /dev/null
+++ b/engine/orchestration/test/com/cloud/vm/VmWorkTest.java
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.vm;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.inject.Inject;
+
+import junit.framework.TestCase;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import com.google.gson.Gson;
+
+import org.apache.cloudstack.framework.jobs.AsyncJobManager;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper;
+import org.apache.cloudstack.vm.jobs.VmWorkJobDao;
+import org.apache.cloudstack.vm.jobs.VmWorkJobVO;
+import org.apache.cloudstack.vm.jobs.VmWorkJobVO.Step;
+
+import com.cloud.cluster.ClusterManager;
+import com.cloud.deploy.DataCenterDeployment;
+import com.cloud.deploy.DeploymentPlan;
+import com.cloud.deploy.DeploymentPlanner.ExcludeList;
+import com.cloud.exception.InsufficientCapacityException;
+import com.cloud.exception.InsufficientStorageCapacityException;
+import com.cloud.utils.LogUtils;
+import com.cloud.utils.Predicate;
+import com.cloud.utils.component.ComponentContext;
+import com.cloud.utils.db.Transaction;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(locations="classpath:/VmWorkTestContext.xml")
+public class VmWorkTest extends TestCase {
+	@Inject AsyncJobManager _jobMgr;
+	@Inject VirtualMachineManager _vmMgr;
+    @Inject ClusterManager _clusterMgr;
+    @Inject VmWorkJobDao _vmworkJobDao;
+	
+	Gson _gson = new Gson();
+	
+	@Before
+	public void setup() {
+		LogUtils.initLog4j("log4j-vmops.xml");
+		
+    	ComponentContext.initComponentsLifeCycle();
+       	_vmMgr = Mockito.spy(_vmMgr);
+       	Mockito.when(_clusterMgr.getManagementNodeId()).thenReturn(1L);
+    	
+    	Transaction.open("dummy");
+    	
+		// drop constraint check in order to do single table test
+		Statement stat = null;
+		try {
+			stat = Transaction.currentTxn().getConnection().createStatement();
+			stat.execute("SET foreign_key_checks = 0;");
+		} catch (SQLException e) {
+		} finally {
+			if(stat != null) {
+				try {
+					stat.close();
+				} catch (SQLException e) {
+				}
+			}
+		}
+ 	}
+	
+    @Override
+    @After
+    public void tearDown() {
+    	Transaction.currentTxn().close();
+    }
+	
+	@Test
+	public void testDeployPlanSerialization() {
+		DeploymentPlan plan = new DataCenterDeployment(1L);
+		ExcludeList excludeList = new ExcludeList();
+		
+		excludeList.addCluster(1);
+		plan.setAvoids(excludeList);
+		
+		String json = _gson.toJson(plan);
+		DeploymentPlan planClone = _gson.fromJson(json, DataCenterDeployment.class);
+		Assert.assertTrue(planClone.getDataCenterId() == plan.getDataCenterId());
+	}
+	
+	@Test
+	public void testVmWorkStart() {
+		VmWorkStart work = new VmWorkStart();
+		Map<VirtualMachineProfile.Param, Object> params = new HashMap<VirtualMachineProfile.Param, Object>();
+		params.put(VirtualMachineProfile.Param.HaTag, "HA");
+		params.put(VirtualMachineProfile.Param.ControlNic, new Long(100));
+		work.setParams(params);
+		
+		VmWorkStart workClone = _gson.fromJson(_gson.toJson(work), VmWorkStart.class);
+		Assert.assertTrue(work.getParams().size() == workClone.getParams().size());
+		Assert.assertTrue(work.getParams().get(VirtualMachineProfile.Param.HaTag).equals(workClone.getParams().get(VirtualMachineProfile.Param.HaTag)));
+	}
+	
+	public void testVmWorkDispatcher() {
+        VmWorkJobVO workJob = new VmWorkJobVO(UUID.randomUUID().toString());
+		workJob.setDispatcher("VmWorkJobDispatcher");
+		workJob.setCmd("doVmWorkStart");
+		workJob.setAccountId(1L);
+		workJob.setUserId(2L);
+		workJob.setStep(Step.Starting);
+		workJob.setVmType(VirtualMachine.Type.ConsoleProxy);
+		workJob.setVmInstanceId(1L);
+		
+		VmWorkStart workInfo = new VmWorkStart();
+        workJob.setCmdInfo(VmWorkJobDispatcher.serialize(workInfo));
+		
+		_jobMgr.submitAsyncJob(workJob, "VM", 1);
+		
+		_jobMgr.waitAndCheck(new String[] {"Done"}, 120000, 120000, new Predicate() {
+
+			@Override
+			public boolean checkCondition() {
+				return true;
+			}
+		});
+	}
+	
+	@Test
+	public void testVmWorkWakeup() {
+		AsyncJobVO mainJob = new AsyncJobVO();
+		
+		mainJob.setDispatcher("TestApiJobDispatcher");
+		mainJob.setAccountId(1L);
+		mainJob.setUserId(1L);
+		mainJob.setCmd("Dummy");
+		mainJob.setCmdInfo("Dummy");
+		
+		_jobMgr.submitAsyncJob(mainJob);
+		
+		try {
+			Thread.sleep(120000);
+		} catch (InterruptedException e) {
+		}
+	}
+	
+	@Test
+	public void testExceptionSerialization() {
+		InsufficientCapacityException exception = new InsufficientStorageCapacityException("foo", VmWorkJobVO.class, 1L);
+		
+		String encodedString = JobSerializerHelper.toObjectSerializedString(exception);
+		System.out.println(encodedString);
+
+		exception = (InsufficientCapacityException)JobSerializerHelper.fromObjectSerializedString(encodedString);
+		Assert.assertTrue(exception.getScope() == VmWorkJobVO.class);
+		Assert.assertTrue(exception.getMessage().equals("foo"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aff0220d/engine/orchestration/test/com/cloud/vm/VmWorkTestApiJobDispatcher.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/test/com/cloud/vm/VmWorkTestApiJobDispatcher.java b/engine/orchestration/test/com/cloud/vm/VmWorkTestApiJobDispatcher.java
new file mode 100644
index 0000000..926fcbb
--- /dev/null
+++ b/engine/orchestration/test/com/cloud/vm/VmWorkTestApiJobDispatcher.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package com.cloud.vm;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import javax.inject.Inject;
+
+import org.apache.cloudstack.framework.jobs.AsyncJob;
+import org.apache.cloudstack.framework.jobs.AsyncJobDispatcher;
+import org.apache.cloudstack.framework.jobs.AsyncJobManager;
+import org.apache.cloudstack.vm.jobs.VmWorkJobVO;
+
+import com.cloud.async.AsyncJobExecutionContext;
+import com.cloud.utils.component.AdapterBase;
+import com.cloud.utils.db.Transaction;
+
+public class VmWorkTestApiJobDispatcher extends AdapterBase implements AsyncJobDispatcher {
+
+	@Inject AsyncJobManager _jobMgr;
+	
+	@Override
+    public void runJob(AsyncJob job) {
+		
+		// drop constraint check in order to do single table test
+		Statement stat = null;
+		try {
+			stat = Transaction.currentTxn().getConnection().createStatement();
+			stat.execute("SET foreign_key_checks = 0;");
+		} catch (SQLException e) {
+		} finally {
+			if(stat != null) {
+				try {
+					stat.close();
+				} catch (SQLException e) {
+				}
+			}
+		}
+		
+        VmWorkJobVO workJob = new VmWorkJobVO(job.getRelated());
+    	
+		workJob.setDispatcher("TestWorkJobDispatcher");
+        workJob.setCmd(VmWorkJobDispatcher.Start);
+		
+		workJob.setAccountId(1L);
+		workJob.setUserId(1L);
+		workJob.setStep(VmWorkJobVO.Step.Starting);
+		workJob.setVmType(VirtualMachine.Type.ConsoleProxy);
+		workJob.setVmInstanceId(1L);
+
+		// save work context info (there are some duplications)
+		VmWorkStart workInfo = new VmWorkStart();
+		workInfo.setAccountId(1L);
+		workInfo.setUserId(1L);
+		workInfo.setVmId(1L);
+		workInfo.setPlan(null);
+		workInfo.setParams(null);
+        workJob.setCmdInfo(VmWorkJobDispatcher.serialize(workInfo));
+		
+        _jobMgr.submitAsyncJob(workJob, VmWorkJobDispatcher.VM_WORK_QUEUE, 1L);
+		
+		_jobMgr.joinJob(job.getId(), workJob.getId(), "processVmStartWakeup",
+                VmWorkJobDispatcher.VM_WORK_JOB_WAKEUP_DISPATCHER,
+				new String[] {},
+				3000, 120000);
+		AsyncJobExecutionContext.getCurrentExecutionContext().resetSyncSource();
+	}
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aff0220d/plugins/host-allocators/random/src/com/cloud/agent/manager/allocator/impl/RandomAllocator.java
----------------------------------------------------------------------
diff --git a/plugins/host-allocators/random/src/com/cloud/agent/manager/allocator/impl/RandomAllocator.java b/plugins/host-allocators/random/src/com/cloud/agent/manager/allocator/impl/RandomAllocator.java
index c7a5821..5ab4837 100755
--- a/plugins/host-allocators/random/src/com/cloud/agent/manager/allocator/impl/RandomAllocator.java
+++ b/plugins/host-allocators/random/src/com/cloud/agent/manager/allocator/impl/RandomAllocator.java
@@ -52,7 +52,7 @@ public class RandomAllocator extends AdapterBase implements HostAllocator {
 
     @Override
     public List<Host> allocateTo(VirtualMachineProfile vmProfile, DeploymentPlan plan, Type type,
-            ExcludeList avoid, List<Host> hosts, int returnUpTo, boolean considerReservedCapacity) {
+            ExcludeList avoid, List<? extends Host> hosts, int returnUpTo, boolean considerReservedCapacity) {
         long dcId = plan.getDataCenterId();
         Long podId = plan.getPodId();
         Long clusterId = plan.getClusterId();

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aff0220d/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
index 3ce60b7..913ce17 100755
--- a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
+++ b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
@@ -44,16 +44,14 @@ import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
 import org.apache.log4j.Logger;
-import org.springframework.context.annotation.Primary;
-import org.springframework.stereotype.Component;
 
 import com.cloud.agent.AgentManager;
 import com.cloud.agent.api.Answer;
 import com.cloud.agent.api.CancelCommand;
 import com.cloud.agent.api.ChangeAgentCommand;
 import com.cloud.agent.api.Command;
-import com.cloud.agent.api.TransferAgentCommand;
 import com.cloud.agent.api.ScheduleHostScanTaskCommand;
+import com.cloud.agent.api.TransferAgentCommand;
 import com.cloud.agent.transport.Request;
 import com.cloud.agent.transport.Request.Version;
 import com.cloud.agent.transport.Response;
@@ -679,12 +677,12 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
     }
 
     @Override
-    public void onManagementNodeJoined(List<ManagementServerHostVO> nodeList, long selfNodeId) {
+    public void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
     }
 
     @Override
-    public void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId) {
-        for (ManagementServerHostVO vo : nodeList) {
+    public void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
+        for (ManagementServerHost vo : nodeList) {
             s_logger.info("Marking hosts as disconnected on Management server" + vo.getMsid());
             long lastPing = (System.currentTimeMillis() >> 10) - _pingTimeout;
             _hostDao.markHostsAsDisconnected(vo.getMsid(), lastPing);

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aff0220d/server/src/com/cloud/agent/manager/allocator/impl/FirstFitAllocator.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/agent/manager/allocator/impl/FirstFitAllocator.java b/server/src/com/cloud/agent/manager/allocator/impl/FirstFitAllocator.java
index bb06fd6..778ec84 100755
--- a/server/src/com/cloud/agent/manager/allocator/impl/FirstFitAllocator.java
+++ b/server/src/com/cloud/agent/manager/allocator/impl/FirstFitAllocator.java
@@ -180,7 +180,7 @@ public class FirstFitAllocator extends AdapterBase implements HostAllocator {
 
     @Override
     public List<Host> allocateTo(VirtualMachineProfile vmProfile, DeploymentPlan plan,
-            Type type, ExcludeList avoid, List<Host> hosts, int returnUpTo, boolean considerReservedCapacity) {
+            Type type, ExcludeList avoid, List<? extends Host> hosts, int returnUpTo, boolean considerReservedCapacity) {
         long dcId = plan.getDataCenterId();
         Long podId = plan.getPodId();
         Long clusterId = plan.getClusterId();

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aff0220d/server/src/com/cloud/agent/manager/allocator/impl/TestingAllocator.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/agent/manager/allocator/impl/TestingAllocator.java b/server/src/com/cloud/agent/manager/allocator/impl/TestingAllocator.java
index 042b26e..5cf027b 100755
--- a/server/src/com/cloud/agent/manager/allocator/impl/TestingAllocator.java
+++ b/server/src/com/cloud/agent/manager/allocator/impl/TestingAllocator.java
@@ -52,7 +52,7 @@ public class TestingAllocator extends AdapterBase implements HostAllocator {
 
     @Override
     public List<Host> allocateTo(VirtualMachineProfile vmProfile, DeploymentPlan plan, Type type,
-            ExcludeList avoid, List<Host> hosts, int returnUpTo, boolean considerReservedCapacity) {
+            ExcludeList avoid, List<? extends Host> hosts, int returnUpTo, boolean considerReservedCapacity) {
         return allocateTo(vmProfile, plan, type, avoid, returnUpTo, considerReservedCapacity);
     }
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aff0220d/server/src/com/cloud/async/AsyncJobManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/async/AsyncJobManagerImpl.java b/server/src/com/cloud/async/AsyncJobManagerImpl.java
index fd6775e..084585f 100644
--- a/server/src/com/cloud/async/AsyncJobManagerImpl.java
+++ b/server/src/com/cloud/async/AsyncJobManagerImpl.java
@@ -62,7 +62,7 @@ import org.apache.cloudstack.framework.messagebus.PublishScope;
 import com.cloud.api.ApiSerializerHelper;
 import com.cloud.cluster.ClusterManager;
 import com.cloud.cluster.ClusterManagerListener;
-import com.cloud.cluster.ManagementServerHostVO;
+import com.cloud.cluster.ManagementServerHost;
 import com.cloud.configuration.Config;
 import com.cloud.configuration.dao.ConfigurationDao;
 import com.cloud.exception.InvalidParameterValueException;
@@ -874,12 +874,12 @@ public class AsyncJobManagerImpl extends ManagerBase implements AsyncJobManager,
     }
 
     @Override
-    public void onManagementNodeJoined(List<ManagementServerHostVO> nodeList, long selfNodeId) {
+    public void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
     }
 
     @Override
-    public void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId) {
-        for(ManagementServerHostVO msHost : nodeList) {
+    public void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
+        for (ManagementServerHost msHost : nodeList) {
             Transaction txn = Transaction.open(Transaction.CLOUD_DB);
             try {
                 txn.start();

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aff0220d/server/src/com/cloud/cluster/ClusterFenceManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/cluster/ClusterFenceManagerImpl.java b/server/src/com/cloud/cluster/ClusterFenceManagerImpl.java
index 7e4922e..5125a07 100644
--- a/server/src/com/cloud/cluster/ClusterFenceManagerImpl.java
+++ b/server/src/com/cloud/cluster/ClusterFenceManagerImpl.java
@@ -43,11 +43,11 @@ public class ClusterFenceManagerImpl extends ManagerBase implements ClusterFence
 	}
 
 	@Override
-	public void onManagementNodeJoined(List<ManagementServerHostVO> nodeList, long selfNodeId) {
+    public void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
 	}
 
 	@Override
-	public void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId) {
+    public void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aff0220d/server/src/com/cloud/cluster/ClusterManager.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/cluster/ClusterManager.java b/server/src/com/cloud/cluster/ClusterManager.java
deleted file mode 100755
index 017ba31..0000000
--- a/server/src/com/cloud/cluster/ClusterManager.java
+++ /dev/null
@@ -1,67 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.cluster;
-
-import com.cloud.agent.api.Answer;
-import com.cloud.agent.api.Command;
-import com.cloud.exception.AgentUnavailableException;
-import com.cloud.exception.OperationTimedoutException;
-import com.cloud.host.Status.Event;
-import com.cloud.resource.ResourceState;
-import com.cloud.utils.component.Manager;
-
-public interface ClusterManager extends Manager {
-	public static final int DEFAULT_HEARTBEAT_INTERVAL = 1500;
-	public static final int DEFAULT_HEARTBEAT_THRESHOLD = 150000;
-	public static final String ALERT_SUBJECT = "cluster-alert";
-	
-	public void OnReceiveClusterServicePdu(ClusterServicePdu pdu);
-    public void executeAsync(String strPeer, long agentId, Command [] cmds, boolean stopOnError);
-    public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError);
-
-    public Answer[] sendToAgent(Long hostId, Command []  cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException;
-    public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException;
-    public Boolean propagateAgentEvent(long agentId, Event event) throws AgentUnavailableException;
-    public Boolean propagateResourceEvent(long agentId, ResourceState.Event event) throws AgentUnavailableException;
-    public boolean executeResourceUserRequest(long hostId, ResourceState.Event event) throws AgentUnavailableException;
-	
-	public int getHeartbeatThreshold();
-	
-	public long getManagementNodeId();		// msid of current management server node
-    public boolean isManagementNodeAlive(long msid);
-    public boolean pingManagementNode(long msid);
-	public long getCurrentRunId();
-    
-	public String getSelfPeerName();
-	public String getSelfNodeIP();
-    public String getPeerName(long agentHostId);
-	
-	public void registerListener(ClusterManagerListener listener);
-	public void unregisterListener(ClusterManagerListener listener);
-    public ManagementServerHostVO getPeer(String peerName);
-    
-    /**
-     * Broadcast the command to all of the  management server nodes.
-     * @param agentId agent id this broadcast is regarding
-     * @param cmds commands to broadcast
-     */
-    public void broadcast(long agentId, Command[] cmds);
-    
-    boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException;
-    
-    boolean isAgentRebalanceEnabled();
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aff0220d/server/src/com/cloud/cluster/ClusterManagerListener.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/cluster/ClusterManagerListener.java b/server/src/com/cloud/cluster/ClusterManagerListener.java
deleted file mode 100644
index bcb1736..0000000
--- a/server/src/com/cloud/cluster/ClusterManagerListener.java
+++ /dev/null
@@ -1,25 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.cluster;
-
-import java.util.List;
-
-public interface ClusterManagerListener {
-	void onManagementNodeJoined(List<ManagementServerHostVO> nodeList, long selfNodeId);
-	void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId);
-	void onManagementNodeIsolated();
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aff0220d/server/src/com/cloud/cluster/ClusterServicePdu.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/cluster/ClusterServicePdu.java b/server/src/com/cloud/cluster/ClusterServicePdu.java
deleted file mode 100644
index 81ff5d8..0000000
--- a/server/src/com/cloud/cluster/ClusterServicePdu.java
+++ /dev/null
@@ -1,112 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.cluster;
-
-public class ClusterServicePdu {
-	public final static int PDU_TYPE_MESSAGE = 0;
-	public final static int PDU_TYPE_REQUEST = 1;
-	public final static int PDU_TYPE_RESPONSE = 2;
-	
-    private long sequenceId;
-    private long ackSequenceId;
-    
-    private String sourcePeer;
-    private String destPeer;
-    
-    private long agentId;
-    private boolean stopOnError;
-    private String jsonPackage;
-    
-    private int pduType = PDU_TYPE_MESSAGE;
-    
-    private static long s_nextPduSequenceId = 1;
-    
-    public ClusterServicePdu() {
-        sequenceId = getNextPduSequenceId();
-        ackSequenceId = 0;
-        agentId = 0;
-        stopOnError = false;
-    }
-    
-    public synchronized long getNextPduSequenceId() {
-        return s_nextPduSequenceId++;
-    }
-
-    public long getSequenceId() {
-        return sequenceId;
-    }
-
-    public void setSequenceId(long sequenceId) {
-        this.sequenceId = sequenceId;
-    }
-
-    public long getAckSequenceId() {
-        return ackSequenceId;
-    }
-
-    public void setAckSequenceId(long ackSequenceId) {
-        this.ackSequenceId = ackSequenceId;
-    }
-
-    public String getSourcePeer() {
-        return sourcePeer;
-    }
-
-    public void setSourcePeer(String sourcePeer) {
-        this.sourcePeer = sourcePeer;
-    }
-
-    public String getDestPeer() {
-        return destPeer;
-    }
-
-    public void setDestPeer(String destPeer) {
-        this.destPeer = destPeer;
-    }
-
-    public long getAgentId() {
-        return agentId;
-    }
-
-    public void setAgentId(long agentId) {
-        this.agentId = agentId;
-    }
-
-    public boolean isStopOnError() {
-        return stopOnError;
-    }
-
-    public void setStopOnError(boolean stopOnError) {
-        this.stopOnError = stopOnError;
-    }
-
-    public String getJsonPackage() {
-        return jsonPackage;
-    }
-
-    public void setJsonPackage(String jsonPackage) {
-        this.jsonPackage = jsonPackage;
-    }
-    
-    public int getPduType() {
-    	return pduType;
-    }
-    
-    public void setPduType(int pduType) {
-    	this.pduType = pduType;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aff0220d/server/src/com/cloud/cluster/LockMasterListener.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/cluster/LockMasterListener.java b/server/src/com/cloud/cluster/LockMasterListener.java
index cc10e2c..c295a65 100644
--- a/server/src/com/cloud/cluster/LockMasterListener.java
+++ b/server/src/com/cloud/cluster/LockMasterListener.java
@@ -32,12 +32,12 @@ public class LockMasterListener implements ClusterManagerListener {
     }
 
     @Override
-    public void onManagementNodeJoined(List<ManagementServerHostVO> nodeList, long selfNodeId) {
+    public void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
     }
 
     @Override
-    public void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId) {
-        for (ManagementServerHostVO node : nodeList) {
+    public void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
+        for (ManagementServerHost node : nodeList) {
             _lockMaster.cleanupForServer(node.getMsid());
         }
     }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aff0220d/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java b/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java
index 9e8b905..2385b16 100755
--- a/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java
+++ b/server/src/com/cloud/ha/HighAvailabilityManagerImpl.java
@@ -37,7 +37,7 @@ import org.apache.cloudstack.context.CallContext;
 import com.cloud.agent.AgentManager;
 import com.cloud.alert.AlertManager;
 import com.cloud.cluster.ClusterManagerListener;
-import com.cloud.cluster.ManagementServerHostVO;
+import com.cloud.cluster.ManagementServerHost;
 import com.cloud.configuration.Config;
 import com.cloud.configuration.dao.ConfigurationDao;
 import com.cloud.dc.ClusterDetailsDao;
@@ -868,12 +868,12 @@ public class HighAvailabilityManagerImpl extends ManagerBase implements HighAvai
     }
 
     @Override
-    public void onManagementNodeJoined(List<ManagementServerHostVO> nodeList, long selfNodeId) {
+    public void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
     }
 
     @Override
-    public void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId) {
-        for (ManagementServerHostVO node : nodeList) {
+    public void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
+        for (ManagementServerHost node : nodeList) {
             _haDao.releaseWorkItems(node.getMsid());
         }
     }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aff0220d/server/src/com/cloud/storage/StorageManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/storage/StorageManagerImpl.java b/server/src/com/cloud/storage/StorageManagerImpl.java
index 9614166..2e2b6ff 100755
--- a/server/src/com/cloud/storage/StorageManagerImpl.java
+++ b/server/src/com/cloud/storage/StorageManagerImpl.java
@@ -86,7 +86,7 @@ import com.cloud.capacity.CapacityState;
 import com.cloud.capacity.CapacityVO;
 import com.cloud.capacity.dao.CapacityDao;
 import com.cloud.cluster.ClusterManagerListener;
-import com.cloud.cluster.ManagementServerHostVO;
+import com.cloud.cluster.ManagementServerHost;
 import com.cloud.configuration.Config;
 import com.cloud.configuration.ConfigurationManager;
 import com.cloud.configuration.dao.ConfigurationDao;
@@ -152,6 +152,7 @@ import com.cloud.user.dao.AccountDao;
 import com.cloud.user.dao.UserDao;
 import com.cloud.utils.NumbersUtil;
 import com.cloud.utils.Pair;
+import com.cloud.utils.StringUtils;
 import com.cloud.utils.UriUtils;
 import com.cloud.utils.component.ComponentContext;
 import com.cloud.utils.component.ManagerBase;
@@ -645,8 +646,7 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
     
     @Override
     public String getStoragePoolTags(long poolId) {
-        return _configMgr.listToCsvTags(_storagePoolDao
-                .searchForStoragePoolDetails(poolId, "true"));
+        return StringUtils.listToCsvTags(_storagePoolDao.searchForStoragePoolDetails(poolId, "true"));
     }
 
     @Override
@@ -1496,16 +1496,16 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
     }
 
     @Override
-    public void onManagementNodeJoined(List<ManagementServerHostVO> nodeList,
+    public void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList,
             long selfNodeId) {
         // TODO Auto-generated method stub
 
     }
 
     @Override
-    public void onManagementNodeLeft(List<ManagementServerHostVO> nodeList,
+    public void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList,
             long selfNodeId) {
-        for (ManagementServerHostVO vo : nodeList) {
+        for (ManagementServerHost vo : nodeList) {
             if (vo.getMsid() == _serverId) {
                 s_logger.info("Cleaning up storage maintenance jobs associated with Management server"
                         + vo.getMsid());
@@ -1513,8 +1513,7 @@ public class StorageManagerImpl extends ManagerBase implements StorageManager, C
                         .searchForPoolIdsForPendingWorkJobs(vo.getMsid());
                 if (poolIds.size() > 0) {
                     for (Long poolId : poolIds) {
-                        StoragePoolVO pool = _storagePoolDao
-                                .findById(poolId);
+                        StoragePoolVO pool = _storagePoolDao.findById(poolId);
                         // check if pool is in an inconsistent state
                         if (pool != null
                                 && (pool.getStatus().equals(

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aff0220d/server/src/com/cloud/vm/ClusteredVirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/vm/ClusteredVirtualMachineManagerImpl.java b/server/src/com/cloud/vm/ClusteredVirtualMachineManagerImpl.java
deleted file mode 100644
index 8f84055..0000000
--- a/server/src/com/cloud/vm/ClusteredVirtualMachineManagerImpl.java
+++ /dev/null
@@ -1,63 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.vm;
-
-import java.util.List;
-import java.util.Map;
-
-import javax.ejb.Local;
-import javax.inject.Inject;
-import javax.naming.ConfigurationException;
-
-import com.cloud.cluster.ClusterManager;
-import com.cloud.cluster.ClusterManagerListener;
-import com.cloud.cluster.ManagementServerHostVO;
-
-@Local(value=VirtualMachineManager.class)
-public class ClusteredVirtualMachineManagerImpl extends VirtualMachineManagerImpl implements ClusterManagerListener {
-    @Inject
-    ClusterManager _clusterMgr;
-
-    protected ClusteredVirtualMachineManagerImpl() {
-    }
-    
-    @Override
-    public void onManagementNodeJoined(List<ManagementServerHostVO> nodeList, long selfNodeId) {
-        
-    }
-    
-    @Override
-    public void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId) {
-        for (ManagementServerHostVO node : nodeList) {
-            cancelWorkItems(node.getMsid());
-        }
-    }
-
-    @Override
-	public void onManagementNodeIsolated() {
-	}
-    
-    @Override
-    public boolean configure(String name, Map<String, Object> xmlParams) throws ConfigurationException {
-        super.configure(name, xmlParams);
-        
-        _clusterMgr.registerListener(this);
-        
-        return true;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/aff0220d/server/test/com/cloud/async/TestAsyncJobManager.java
----------------------------------------------------------------------
diff --git a/server/test/com/cloud/async/TestAsyncJobManager.java b/server/test/com/cloud/async/TestAsyncJobManager.java
index 74c22f8..320c68f 100644
--- a/server/test/com/cloud/async/TestAsyncJobManager.java
+++ b/server/test/com/cloud/async/TestAsyncJobManager.java
@@ -22,8 +22,18 @@ import java.util.Date;
 import java.util.List;
 
 import javax.inject.Inject;
+
 import junit.framework.TestCase;
 
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
 import org.apache.cloudstack.framework.jobs.AsyncJob;
 import org.apache.cloudstack.framework.jobs.AsyncJobConstants;
 import org.apache.cloudstack.framework.jobs.AsyncJobManager;
@@ -38,15 +48,6 @@ import org.apache.cloudstack.framework.jobs.impl.SyncQueueItemVO;
 import org.apache.cloudstack.framework.jobs.impl.SyncQueueVO;
 import org.apache.cloudstack.framework.messagebus.MessageBus;
 import org.apache.cloudstack.framework.messagebus.PublishScope;
-import org.apache.cloudstack.messagebus.TopicConstants;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mockito;
-import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
 import com.cloud.cluster.ClusterManager;
 import com.cloud.user.AccountManager;
@@ -70,7 +71,8 @@ public class TestAsyncJobManager extends TestCase {
     @Inject SyncQueueDao syncQueueDao;
     @Inject SyncQueueItemDao syncQueueItemDao;
     
-    @Before                                                  
+    @Override
+    @Before
     public void setUp() {
     	ComponentContext.initComponentsLifeCycle();
     	Mockito.when(clusterMgr.getManagementNodeId()).thenReturn(1L);
@@ -96,11 +98,12 @@ public class TestAsyncJobManager extends TestCase {
 				}
 			}
 		}
-    }                                                        
+    }
                                                              
-    @After                                                   
-    public void tearDown() {                                 
-    	Transaction.currentTxn().close();                    
+    @Override
+    @After
+    public void tearDown() {
+    	Transaction.currentTxn().close();
     }
     
     @Test
@@ -220,9 +223,10 @@ public class TestAsyncJobManager extends TestCase {
 		jobMonitor.registerActiveTask(1, 1);
 		
     	asyncMgr.waitAndCheck(new String[] {"VM"}, 5000L, 10000L, new Predicate() {
-    		public boolean checkCondition() {
+    		@Override
+            public boolean checkCondition() {
     			System.out.println("Check condition to exit");
-    			messageBus.publish(null, TopicConstants.JOB_HEARTBEAT, PublishScope.LOCAL, 1L);
+                messageBus.publish(null, AsyncJob.Topics.JOB_HEARTBEAT, PublishScope.LOCAL, 1L);
     			return false;
     		}
     	});