You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ke...@apache.org on 2014/01/09 23:33:55 UTC

[07/13] git commit: updated refs/heads/master to 1e2e1ea

CLOUDSTACK-669: Finalize VM work dispatching mechanism to avoid big switch statement


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/05873822
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/05873822
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/05873822

Branch: refs/heads/master
Commit: 0587382265ea5adec3fa382f5c278459ca71c52a
Parents: a6f126d
Author: Kelven Yang <ke...@gmail.com>
Authored: Tue Dec 31 10:50:43 2013 -0800
Committer: Kelven Yang <ke...@gmail.com>
Committed: Thu Jan 9 14:29:12 2014 -0800

----------------------------------------------------------------------
 .../src/com/cloud/vm/VmWorkJobHandler.java      |   3 +-
 .../src/com/cloud/vm/VmWorkJobHandlerProxy.java | 121 ++++++++++++
 .../cloud/vm/snapshot/VMSnapshotManager.java    |   1 -
 ...spring-engine-orchestration-core-context.xml |   2 +-
 .../com/cloud/vm/VirtualMachineManagerImpl.java | 193 ++++++++++++-------
 .../src/com/cloud/vm/VmWorkJobDispatcher.java   |   7 +-
 .../com/cloud/storage/VolumeApiServiceImpl.java | 113 +++--------
 .../vm/snapshot/VMSnapshotManagerImpl.java      |  93 +++------
 8 files changed, 311 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/05873822/engine/components-api/src/com/cloud/vm/VmWorkJobHandler.java
----------------------------------------------------------------------
diff --git a/engine/components-api/src/com/cloud/vm/VmWorkJobHandler.java b/engine/components-api/src/com/cloud/vm/VmWorkJobHandler.java
index 6ab1bbc..d49a041 100644
--- a/engine/components-api/src/com/cloud/vm/VmWorkJobHandler.java
+++ b/engine/components-api/src/com/cloud/vm/VmWorkJobHandler.java
@@ -16,11 +16,10 @@
 // under the License.
 package com.cloud.vm;
 
-import org.apache.cloudstack.framework.jobs.AsyncJob;
 import org.apache.cloudstack.jobs.JobInfo;
 
 import com.cloud.utils.Pair;
 
 public interface VmWorkJobHandler {
-    Pair<JobInfo.Status, String> handleVmWorkJob(AsyncJob job, VmWork work) throws Exception;
+    Pair<JobInfo.Status, String> handleVmWorkJob(VmWork work) throws Exception; // returns (job status, result string) for the given work item
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/05873822/engine/components-api/src/com/cloud/vm/VmWorkJobHandlerProxy.java
----------------------------------------------------------------------
diff --git a/engine/components-api/src/com/cloud/vm/VmWorkJobHandlerProxy.java b/engine/components-api/src/com/cloud/vm/VmWorkJobHandlerProxy.java
new file mode 100644
index 0000000..6550281
--- /dev/null
+++ b/engine/components-api/src/com/cloud/vm/VmWorkJobHandlerProxy.java
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.cloud.vm;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+import com.google.gson.Gson;
+
+import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper;
+import org.apache.cloudstack.jobs.JobInfo;
+
+import com.cloud.serializer.GsonHelper;
+import com.cloud.utils.Pair;
+
+/**
+ * Routes a {@link VmWork} item to a handler method of a target object, selected by reflection
+ * on the work item's concrete class. A handler is any method declared on the target class (or a
+ * superclass) that takes exactly one {@link VmWork}-derived parameter and returns a {@link Pair}.
+ *
+ * It is meant to be embedded in the handler object (<code>new VmWorkJobHandlerProxy(this)</code>)
+ * rather than instantiated as a standalone Spring bean: run-time reflection conflicts with Spring proxy mode.
+ */
+public class VmWorkJobHandlerProxy implements VmWorkJobHandler {
+
+    private static final Logger s_logger = Logger.getLogger(VmWorkJobHandlerProxy.class);
+
+    private final Object _target;
+    private final Map<Class<?>, Method> _handlerMethodMap = new HashMap<Class<?>, Method>();
+    private final Gson _gsonLogger;
+
+    /** @param target object whose declared methods are scanned for VM work handlers */
+    public VmWorkJobHandlerProxy(Object target) {
+        _gsonLogger = GsonHelper.getGsonLogger();
+        buildLookupMap(target.getClass());
+        _target = target;
+    }
+
+    /** Indexes handler methods by parameter type, walking from hostClass up to (excluding) Object. */
+    private void buildLookupMap(Class<?> hostClass) {
+        Class<?> clz = hostClass;
+        while (clz != null && clz != Object.class) {
+            for (Method method : clz.getDeclaredMethods()) {
+                if (isVmWorkJobHandlerMethod(method)) {
+                    Class<?> paramType = method.getParameterTypes()[0];
+                    assert (_handlerMethodMap.get(paramType) == null);
+
+                    method.setAccessible(true);  // handlers may be declared private
+                    _handlerMethodMap.put(paramType, method);
+                }
+            }
+            clz = clz.getSuperclass();
+        }
+    }
+
+    /** A handler takes exactly one VmWork-derived argument and returns a Pair. */
+    private boolean isVmWorkJobHandlerMethod(Method method) {
+        Class<?>[] paramTypes = method.getParameterTypes();
+        return paramTypes.length == 1
+            && Pair.class.isAssignableFrom(method.getReturnType())
+            && VmWork.class.isAssignableFrom(paramTypes[0]);
+    }
+
+    /**
+     * Invokes the handler registered for the exact class of <code>work</code>.
+     *
+     * @param work VM work item to execute
+     * @return the handler's result, or a FAILED status carrying a serialized exception when no handler is registered
+     * @throws Exception the handler's own exception, unwrapped from the reflective InvocationTargetException
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public Pair<JobInfo.Status, String> handleVmWorkJob(VmWork work) throws Exception {
+        Method method = _handlerMethodMap.get(work.getClass());
+        if (method != null) {
+            if (s_logger.isDebugEnabled())
+                s_logger.debug("Execute VM work job: " + work.getClass().getName() + _gsonLogger.toJson(work));
+
+            Object obj;
+            try {
+                obj = method.invoke(_target, work);
+            } catch (java.lang.reflect.InvocationTargetException e) {
+                // Surface the handler's own exception; otherwise the caller only sees the reflective wrapper.
+                if (e.getCause() instanceof Exception)
+                    throw (Exception)e.getCause();
+                throw e;
+            }
+
+            if (s_logger.isDebugEnabled())
+                s_logger.debug("Done executing VM work job: " + work.getClass().getName() + _gsonLogger.toJson(work));
+
+            assert (obj instanceof Pair);
+            return (Pair<JobInfo.Status, String>)obj;
+        } else {
+            s_logger.error("Unable to find handler for VM work job: " + work.getClass().getName() + _gsonLogger.toJson(work));
+
+            RuntimeException e = new RuntimeException("Unsupported VM work job: " + work.getClass().getName() + _gsonLogger.toJson(work));
+            String exceptionJson = JobSerializerHelper.toSerializedString(e);
+
+            s_logger.error("Serialize exception object into json: " + exceptionJson);
+            return new Pair<JobInfo.Status, String>(JobInfo.Status.FAILED, exceptionJson);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/05873822/engine/components-api/src/com/cloud/vm/snapshot/VMSnapshotManager.java
----------------------------------------------------------------------
diff --git a/engine/components-api/src/com/cloud/vm/snapshot/VMSnapshotManager.java b/engine/components-api/src/com/cloud/vm/snapshot/VMSnapshotManager.java
index 7d233ca..e7e3372 100644
--- a/engine/components-api/src/com/cloud/vm/snapshot/VMSnapshotManager.java
+++ b/engine/components-api/src/com/cloud/vm/snapshot/VMSnapshotManager.java
@@ -42,5 +42,4 @@ public interface VMSnapshotManager extends VMSnapshotService, Manager {
     boolean syncVMSnapshot(VMInstanceVO vm, Long hostId);
 
     boolean hasActiveVMSnapshotTasks(Long vmId);
-
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/05873822/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml
----------------------------------------------------------------------
diff --git a/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml b/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml
index 7445102..fd5299c 100644
--- a/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml
+++ b/engine/orchestration/resources/META-INF/cloudstack/core/spring-engine-orchestration-core-context.xml
@@ -72,7 +72,7 @@
     <bean id="virtualMachineEntityImpl" class="org.apache.cloudstack.engine.cloud.entity.api.VirtualMachineEntityImpl" />
     
     <bean id="virtualMachinePowerStateSyncImpl" class="com.cloud.vm.VirtualMachinePowerStateSyncImpl" />
-      
+    
     <bean id= "vmWorkJobDispatcher" class="com.cloud.vm.VmWorkJobDispatcher">
         <property name="name">
             <util:constant static-field="com.cloud.vm.VmWorkConstants.VM_WORK_JOB_DISPATCHER"/>

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/05873822/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
index 430a4a4..71952dd 100755
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -316,6 +316,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     @Inject
     protected AsyncJobManager _jobMgr;
 
+    VmWorkJobHandlerProxy _jobHandlerProxy = new VmWorkJobHandlerProxy(this);
+
     Map<VirtualMachine.Type, VirtualMachineGuru> _vmGurus = new HashMap<VirtualMachine.Type, VirtualMachineGuru>();
     protected StateMachine2<State, VirtualMachine.Event, VirtualMachine> _stateMachine;
 
@@ -526,6 +528,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 
     @Override
     public boolean start() {
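+        // TODO: initial delay is hardcoded. NOTE(review): the 5000 is passed with TimeUnit.SECONDS, i.e. roughly 83 minutes rather than 5 seconds if _executor is a ScheduledExecutorService -- confirm intent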
+        _executor.scheduleAtFixedRate(new TransitionTask(), 5000, VmJobStateReportInterval.value(), TimeUnit.SECONDS);
         _executor.scheduleAtFixedRate(new CleanupTask(), VmOpCleanupInterval.value(), VmOpCleanupInterval.value(), TimeUnit.SECONDS);
         cancelWorkItems(_nodeId);
         return true;
@@ -2942,9 +2946,9 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
                 return;
             }
             try {
-                lock.addRef();
-                List<VMInstanceVO> instances =
-                    _vmDao.findVMInTransition(new Date(new Date().getTime() - (AgentManager.Wait.value() * 1000)), State.Starting, State.Stopping);
+                scanStalledVMInTransitionStateOnDisconnectedHosts();
+
+                List<VMInstanceVO> instances = _vmDao.findVMInTransition(new Date(new Date().getTime() - (AgentManager.Wait.value() * 1000)), State.Starting, State.Stopping);
                 for (VMInstanceVO instance : instances) {
                     State state = instance.getState();
                     if (state == State.Stopping) {
@@ -3974,7 +3978,6 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         }
     }
 
-
     // VMs that in transitional state without recent power state report
     private List<Long> listStalledVMInTransitionStateOnUpHost(long hostId, Date cutTime) {
         String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " +
@@ -4693,74 +4696,132 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         return new VmJobVirtualMachineOutcome((VmWorkJobVO)context.getContextParameter("workJob"), vm.getId());
     }
 
-    @Override
-    public Pair<JobInfo.Status, String> handleVmWorkJob(AsyncJob job, VmWork work) throws Exception {
+    // VM work handler for VmWorkStart; reached reflectively through _jobHandlerProxy.
+    private Pair<JobInfo.Status, String> orchestrateStart(VmWorkStart work) throws Exception {
+        VMInstanceVO vm = _entityMgr.findById(VMInstanceVO.class, work.getVmId());
+        if (vm == null) {
+            s_logger.info("Unable to find vm " + work.getVmId());
+            throw new IllegalStateException("Unable to find vm " + work.getVmId()); // an assert alone is skipped without -ea
+        }
+        orchestrateStart(vm.getUuid(), work.getParams(), work.getPlan(), null);
+        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
+    }
 
+    private Pair<JobInfo.Status, String> orchestrateStop(VmWorkStop work) throws Exception {
         VMInstanceVO vm = _entityMgr.findById(VMInstanceVO.class, work.getVmId());
         if (vm == null) {
             s_logger.info("Unable to find vm " + work.getVmId());
         }
         assert (vm != null);
-        if (work instanceof VmWorkStart) {
-            VmWorkStart workStart = (VmWorkStart)work;
-            orchestrateStart(vm.getUuid(), workStart.getParams(), workStart.getPlan(), null);
-            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
-        } else if (work instanceof VmWorkStop) {
-            VmWorkStop workStop = (VmWorkStop)work;
-            orchestrateStop(vm.getUuid(), workStop.isCleanup());
-            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
-        } else if (work instanceof VmWorkMigrate) {
-            VmWorkMigrate workMigrate = (VmWorkMigrate)work;
-            orchestrateMigrate(vm.getUuid(), workMigrate.getSrcHostId(), workMigrate.getDeployDestination());
-            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
-        } else if (work instanceof VmWorkMigrateWithStorage) {
-            VmWorkMigrateWithStorage workMigrateWithStorage = (VmWorkMigrateWithStorage)work;
-            orchestrateMigrateWithStorage(vm.getUuid(),
-                    workMigrateWithStorage.getSrcHostId(),
-                    workMigrateWithStorage.getDestHostId(),
-                    workMigrateWithStorage.getVolumeToPool());
-            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
-        } else if (work instanceof VmWorkMigrateForScale) {
-            VmWorkMigrateForScale workMigrateForScale = (VmWorkMigrateForScale)work;
-            orchestrateMigrateForScale(vm.getUuid(),
-                    workMigrateForScale.getSrcHostId(),
-                    workMigrateForScale.getDeployDestination(),
-                    workMigrateForScale.getNewServiceOfferringId());
-            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
-        } else if (work instanceof VmWorkReboot) {
-            VmWorkReboot workReboot = (VmWorkReboot)work;
-            orchestrateReboot(vm.getUuid(), workReboot.getParams());
-            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
-        } else if (work instanceof VmWorkAddVmToNetwork) {
-            VmWorkAddVmToNetwork workAddVmToNetwork = (VmWorkAddVmToNetwork)work;
-            NicProfile nic = orchestrateAddVmToNetwork(vm, workAddVmToNetwork.getNetwork(),
-                    workAddVmToNetwork.getRequestedNicProfile());
-            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, JobSerializerHelper.toObjectSerializedString(nic));
-        } else if (work instanceof VmWorkRemoveNicFromVm) {
-            VmWorkRemoveNicFromVm workRemoveNicFromVm = (VmWorkRemoveNicFromVm)work;
-            boolean result = orchestrateRemoveNicFromVm(vm, workRemoveNicFromVm.getNic());
-            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED,
-                    JobSerializerHelper.toObjectSerializedString(new Boolean(result)));
-        } else if (work instanceof VmWorkRemoveVmFromNetwork) {
-            VmWorkRemoveVmFromNetwork workRemoveVmFromNetwork = (VmWorkRemoveVmFromNetwork)work;
-            boolean result = orchestrateRemoveVmFromNetwork(vm,
-                    workRemoveVmFromNetwork.getNetwork(), workRemoveVmFromNetwork.getBroadcastUri());
-            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED,
-                    JobSerializerHelper.toObjectSerializedString(new Boolean(result)));
-        } else if (work instanceof VmWorkReconfigure) {
-            VmWorkReconfigure workReconfigure = (VmWorkReconfigure)work;
-            reConfigureVm(vm.getUuid(), workReconfigure.getNewServiceOffering(),
-                    workReconfigure.isSameHost());
-            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
-        } else if (work instanceof VmWorkStorageMigration) {
-            VmWorkStorageMigration workStorageMigration = (VmWorkStorageMigration)work;
-            orchestrateStorageMigration(vm.getUuid(), workStorageMigration.getDestStoragePool());
-            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
-        } else {
-            RuntimeException e = new RuntimeException("Unsupported VM work command: " + job.getCmd());
-            String exceptionJson = JobSerializerHelper.toSerializedString(e);
-            s_logger.error("Serialize exception object into json: " + exceptionJson);
-            return new Pair<JobInfo.Status, String>(JobInfo.Status.FAILED, exceptionJson);
+
+        orchestrateStop(vm.getUuid(), work.isCleanup());
+        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
+    }
+
+    // VM work handler for VmWorkMigrate; reached reflectively through _jobHandlerProxy.
+    private Pair<JobInfo.Status, String> orchestrateMigrate(VmWorkMigrate work) throws Exception {
+        VMInstanceVO vm = _entityMgr.findById(VMInstanceVO.class, work.getVmId());
+        if (vm == null) {
+            s_logger.info("Unable to find vm " + work.getVmId());
+            throw new IllegalStateException("Unable to find vm " + work.getVmId()); // an assert alone is skipped without -ea
+        }
+        orchestrateMigrate(vm.getUuid(), work.getSrcHostId(), work.getDeployDestination());
+        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
+    }
+
+    // VM work handler for VmWorkMigrateWithStorage; reached reflectively through _jobHandlerProxy.
+    private Pair<JobInfo.Status, String> orchestrateMigrateWithStorage(VmWorkMigrateWithStorage work) throws Exception {
+        VMInstanceVO vm = _entityMgr.findById(VMInstanceVO.class, work.getVmId());
+        if (vm == null) {
+            s_logger.info("Unable to find vm " + work.getVmId());
+            throw new IllegalStateException("Unable to find vm " + work.getVmId()); // an assert alone is skipped without -ea
+        }
+
+        orchestrateMigrateWithStorage(vm.getUuid(), work.getSrcHostId(), work.getDestHostId(),
+                work.getVolumeToPool());
+        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
+    }
+
+    // VM work handler for VmWorkMigrateForScale; reached reflectively through _jobHandlerProxy.
+    private Pair<JobInfo.Status, String> orchestrateMigrateForScale(VmWorkMigrateForScale work) throws Exception {
+        VMInstanceVO vm = _entityMgr.findById(VMInstanceVO.class, work.getVmId());
+        if (vm == null) {
+            s_logger.info("Unable to find vm " + work.getVmId());
+            throw new IllegalStateException("Unable to find vm " + work.getVmId()); // an assert alone is skipped without -ea
+        }
+
+        orchestrateMigrateForScale(vm.getUuid(), work.getSrcHostId(), work.getDeployDestination(),
+                work.getNewServiceOfferringId());
+        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
+    }
+
+    private Pair<JobInfo.Status, String> orchestrateReboot(VmWorkReboot work) throws Exception {
+        VMInstanceVO vm = _entityMgr.findById(VMInstanceVO.class, work.getVmId());
+        if (vm == null) {
+            s_logger.info("Unable to find vm " + work.getVmId());
+            throw new IllegalStateException("Unable to find vm " + work.getVmId()); // an assert alone is skipped without -ea
         }
+        orchestrateReboot(vm.getUuid(), work.getParams()); // VM work handler for VmWorkReboot: delegate to the uuid-based overload
+        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
+    }
+
+    // VM work handler for VmWorkAddVmToNetwork; the job result is the marshalled NicProfile.
+    private Pair<JobInfo.Status, String> orchestrateAddVmToNetwork(VmWorkAddVmToNetwork work) throws Exception {
+        VMInstanceVO vm = _entityMgr.findById(VMInstanceVO.class, work.getVmId());
+        if (vm == null) {
+            s_logger.info("Unable to find vm " + work.getVmId());
+            throw new IllegalStateException("Unable to find vm " + work.getVmId()); // an assert alone is skipped without -ea
+        }
+        NicProfile nic = orchestrateAddVmToNetwork(vm, work.getNetwork(), work.getRequestedNicProfile());
+        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, _jobMgr.marshallResultObject(nic));
+    }
+
+    // VM work handler for VmWorkRemoveNicFromVm; the job result is the marshalled boolean outcome.
+    private Pair<JobInfo.Status, String> orchestrateRemoveNicFromVm(VmWorkRemoveNicFromVm work) throws Exception {
+        VMInstanceVO vm = _entityMgr.findById(VMInstanceVO.class, work.getVmId());
+        if (vm == null) {
+            s_logger.info("Unable to find vm " + work.getVmId());
+            throw new IllegalStateException("Unable to find vm " + work.getVmId()); // an assert alone is skipped without -ea
+        }
+        boolean result = orchestrateRemoveNicFromVm(vm, work.getNic());
+        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, _jobMgr.marshallResultObject(Boolean.valueOf(result)));
+    }
+
+    // VM work handler for VmWorkRemoveVmFromNetwork; the job result is the marshalled boolean outcome.
+    private Pair<JobInfo.Status, String> orchestrateRemoveVmFromNetwork(VmWorkRemoveVmFromNetwork work) throws Exception {
+        VMInstanceVO vm = _entityMgr.findById(VMInstanceVO.class, work.getVmId());
+        if (vm == null) {
+            s_logger.info("Unable to find vm " + work.getVmId());
+            throw new IllegalStateException("Unable to find vm " + work.getVmId()); // an assert alone is skipped without -ea
+        }
+        boolean result = orchestrateRemoveVmFromNetwork(vm, work.getNetwork(), work.getBroadcastUri());
+        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED,
+                _jobMgr.marshallResultObject(Boolean.valueOf(result)));
+    }
+
+    // VM work handler for VmWorkReconfigure; reached reflectively through _jobHandlerProxy.
+    private Pair<JobInfo.Status, String> orchestrateReconfigure(VmWorkReconfigure work) throws Exception {
+        VMInstanceVO vm = _entityMgr.findById(VMInstanceVO.class, work.getVmId());
+        if (vm == null) {
+            s_logger.info("Unable to find vm " + work.getVmId());
+            throw new IllegalStateException("Unable to find vm " + work.getVmId()); // an assert alone is skipped without -ea
+        }
+        reConfigureVm(vm.getUuid(), work.getNewServiceOffering(), work.isSameHost());
+        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
+    }
+
+    private Pair<JobInfo.Status, String> orchestrateStorageMigration(VmWorkStorageMigration work) throws Exception {
+        VMInstanceVO vm = _entityMgr.findById(VMInstanceVO.class, work.getVmId());
+        if (vm == null) {
+            s_logger.info("Unable to find vm " + work.getVmId());
+            throw new IllegalStateException("Unable to find vm " + work.getVmId()); // an assert alone is skipped without -ea
+        }
+        orchestrateStorageMigration(vm.getUuid(), work.getDestStoragePool()); // VM work handler for VmWorkStorageMigration: delegate to the uuid-based overload
+        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
+    }
+
+    @Override
+    public Pair<JobInfo.Status, String> handleVmWorkJob(VmWork work) throws Exception {
+        return _jobHandlerProxy.handleVmWorkJob(work); // the proxy picks the handler method indexed for this work item's class
     }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/05873822/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
index dea64da..1af0dac 100644
--- a/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
+++ b/engine/orchestration/src/com/cloud/vm/VmWorkJobDispatcher.java
@@ -74,8 +74,8 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch
             }
 
             work = VmWorkSerializer.deserialize(workClz, job.getCmdInfo());
-            assert (work != null);
-            if (work == null) {
+            assert(work != null);
+            if(work == null) {
                 s_logger.error("Unable to deserialize VM work " + job.getCmd() + ", job info: " + job.getCmdInfo());
                 _asyncJobMgr.completeAsyncJob(job.getId(), JobInfo.Status.FAILED, 0, "Unable to deserialize VM work");
                 return;
@@ -97,8 +97,9 @@ public class VmWorkJobDispatcher extends AdapterBase implements AsyncJobDispatch
 
             CallContext.register(work.getUserId(), work.getAccountId(), job.getRelated());
 
-            Pair<JobInfo.Status, String> result = handler.handleVmWorkJob(job, work);
+            Pair<JobInfo.Status, String> result = handler.handleVmWorkJob(work);
             _asyncJobMgr.completeAsyncJob(job.getId(), result.first(), 0, result.second());
+
         } catch(Throwable e) {
             s_logger.error("Unable to complete " + job, e);
 

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/05873822/server/src/com/cloud/storage/VolumeApiServiceImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/storage/VolumeApiServiceImpl.java b/server/src/com/cloud/storage/VolumeApiServiceImpl.java
index 0da8ebd..e7d5164 100644
--- a/server/src/com/cloud/storage/VolumeApiServiceImpl.java
+++ b/server/src/com/cloud/storage/VolumeApiServiceImpl.java
@@ -61,7 +61,6 @@ import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
 import org.apache.cloudstack.framework.jobs.AsyncJobManager;
 import org.apache.cloudstack.framework.jobs.Outcome;
 import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
-import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper;
 import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl;
 import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
 import org.apache.cloudstack.jobs.JobInfo;
@@ -329,6 +328,8 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
     @Inject
     protected AsyncJobManager _jobMgr;
 
+    VmWorkJobHandlerProxy _jobHandlerProxy = new VmWorkJobHandlerProxy(this);
+
     // TODO
     static final ConfigKey<Boolean> VmJobEnabled = new ConfigKey<Boolean>("Advanced", Boolean.class, "vm.job.enabled", "false",
             "True to enable new VM sync model. false to use the old way", false);
@@ -2353,92 +2354,38 @@ public class VolumeApiServiceImpl extends ManagerBase implements VolumeApiServic
                 snapshotId);
     }
 
-    @Override
-    public Pair<JobInfo.Status, String> handleVmWorkJob(AsyncJob job, VmWork work) throws Exception {
-        VMInstanceVO vm = _entityMgr.findById(VMInstanceVO.class, work.getVmId());
-        if (vm == null) {
-            s_logger.info("Unable to find vm " + work.getVmId());
-        }
-        assert (vm != null);
-
-        if (work instanceof VmWorkAttachVolume) {
-
-            VmWorkAttachVolume attachWork = (VmWorkAttachVolume)work;
-
-            if (s_logger.isDebugEnabled())
-                s_logger.debug("Execute Attach-Volume within VM work job context. vmId: " + attachWork.getVmId() + ", volId: " + attachWork.getVolumeId() + ", deviceId: "
-                        + attachWork.getDeviceId());
-
-            orchestrateAttachVolumeToVM(attachWork.getVmId(), attachWork.getVolumeId(), attachWork.getDeviceId());
-
-            if (s_logger.isDebugEnabled())
-                s_logger.debug("Done executing Attach-Volume within VM work job context. vmId: " + attachWork.getVmId() + ", volId: " + attachWork.getVolumeId() + ", deviceId: "
-                        + attachWork.getDeviceId());
-
-            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
-        } else if (work instanceof VmWorkDetachVolume) {
-            VmWorkDetachVolume detachWork = (VmWorkDetachVolume)work;
-
-            if (s_logger.isDebugEnabled())
-                s_logger.debug("Execute Detach-Volume within VM work job context. vmId: " + detachWork.getVmId() + ", volId: " + detachWork.getVolumeId());
-
-            orchestrateDetachVolumeFromVM(detachWork.getVmId(), detachWork.getVolumeId());
-
-            if (s_logger.isDebugEnabled())
-                s_logger.debug("Done executing Detach-Volume within VM work job context. vmId: " + detachWork.getVmId() + ", volId: " + detachWork.getVolumeId());
-
-            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
-        } else if (work instanceof VmWorkResizeVolume) {
-            VmWorkResizeVolume resizeWork = (VmWorkResizeVolume)work;
-
-            if (s_logger.isDebugEnabled())
-                s_logger.debug("Execute Resize-Volume within VM work job context. vmId: " + resizeWork.getVmId()
-                        + ", volId: " + resizeWork.getVolumeId() + ", size " + resizeWork.getCurrentSize() + " -> " + resizeWork.getNewSize());
-
-            orchestrateResizeVolume(resizeWork.getVolumeId(), resizeWork.getCurrentSize(), resizeWork.getNewSize(),
-                    resizeWork.getNewServiceOfferingId(), resizeWork.isShrinkOk());
-
-            if (s_logger.isDebugEnabled())
-                s_logger.debug("Done executing Resize-Volume within VM work job context. vmId: " + resizeWork.getVmId()
-                        + ", volId: " + resizeWork.getVolumeId() + ", size " + resizeWork.getCurrentSize() + " -> " + resizeWork.getNewSize());
-
-            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
-
-        } else if (work instanceof VmWorkMigrateVolume) {
-            VmWorkMigrateVolume migrateWork = (VmWorkMigrateVolume)work;
-
-            if (s_logger.isDebugEnabled())
-                s_logger.debug("Execute Migrate-Volume within VM work job context. vmId: " + migrateWork.getVmId()
-                        + ", volId: " + migrateWork.getVolumeId() + ", destPoolId: " + migrateWork.getDestPoolId() + ", live: " + migrateWork.isLiveMigrate());
-
-            Volume newVol = orchestrateMigrateVolume(migrateWork.getVolumeId(), migrateWork.getDestPoolId(), migrateWork.isLiveMigrate());
-
-            if (s_logger.isDebugEnabled())
-                s_logger.debug("Done executing Migrate-Volume within VM work job context. vmId: " + migrateWork.getVmId()
-                        + ", volId: " + migrateWork.getVolumeId() + ", destPoolId: " + migrateWork.getDestPoolId() + ", live: " + migrateWork.isLiveMigrate());
+    private Pair<JobInfo.Status, String> orchestrateAttachVolumeToVM(VmWorkAttachVolume work) throws Exception {
+        orchestrateAttachVolumeToVM(work.getVmId(), work.getVolumeId(), work.getDeviceId()); // VM work handler for attach: unpack the work item and delegate to the id-based overload
+        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null); // no result payload
+    }
 
-            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, JobSerializerHelper.toObjectSerializedString(new Long(newVol.getId())));
-        } else if (work instanceof VmWorkTakeVolumeSnapshot) {
-            VmWorkTakeVolumeSnapshot snapshotWork = (VmWorkTakeVolumeSnapshot)work;
+    private Pair<JobInfo.Status, String> orchestrateDetachVolumeFromVM(VmWorkDetachVolume work) throws Exception {
+        orchestrateDetachVolumeFromVM(work.getVmId(), work.getVolumeId()); // VM work handler for detach; _jobHandlerProxy keys handlers by parameter class, so this must take VmWorkDetachVolume
+        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null); // no result payload
+    }
 
-            if (s_logger.isDebugEnabled())
-                s_logger.debug("Execute Take-Volume-Snapshot within VM work job context. vmId: " + snapshotWork.getVmId()
-                        + ", volId: " + snapshotWork.getVolumeId() + ", policyId: " + snapshotWork.getPolicyId() + ", quiesceVm: " + snapshotWork.isQuiesceVm());
+    private Pair<JobInfo.Status, String> orchestrateResizeVolume(VmWorkResizeVolume work) throws Exception {
+        orchestrateResizeVolume(work.getVolumeId(), work.getCurrentSize(), work.getNewSize(), // VM work handler for resize: unpack the work item and delegate
+                work.getNewServiceOfferingId(), work.isShrinkOk());
+        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null); // no result payload
+    }
 
-            Account account = _accountDao.findById(snapshotWork.getAccountId());
-            orchestrateTakeVolumeSnapshot(snapshotWork.getVolumeId(), snapshotWork.getPolicyId(), snapshotWork.getSnapshotId(),
-                    account, snapshotWork.isQuiesceVm());
+    // VM work handler for VmWorkMigrateVolume; the job result is the marshalled id of the volume returned by the migration.
+    private Pair<JobInfo.Status, String> orchestrateMigrateVolume(VmWorkMigrateVolume work) throws Exception {
+        Volume newVol = orchestrateMigrateVolume(work.getVolumeId(), work.getDestPoolId(), work.isLiveMigrate());
+        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, _jobMgr.marshallResultObject(Long.valueOf(newVol.getId())));
+    }
 
-            if (s_logger.isDebugEnabled())
-                s_logger.debug("Done executing Take-Volume-Snapshot within VM work job context. vmId: " + snapshotWork.getVmId()
-                        + ", volId: " + snapshotWork.getVolumeId() + ", policyId: " + snapshotWork.getPolicyId() + ", quiesceVm: " + snapshotWork.isQuiesceVm());
+    private Pair<JobInfo.Status, String> orchestrateTakeVolumeSnapshot(VmWorkTakeVolumeSnapshot work) throws Exception {
+        Account account = _accountDao.findById(work.getAccountId()); // VM work handler for volume snapshots: look up the account id carried by the work item
+        orchestrateTakeVolumeSnapshot(work.getVolumeId(), work.getPolicyId(), work.getSnapshotId(),
+                account, work.isQuiesceVm());
+        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED,
+                _jobMgr.marshallResultObject(work.getSnapshotId())); // result payload is the snapshot id taken from the work item
+    }
 
-            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, JobSerializerHelper.toObjectSerializedString(snapshotWork.getSnapshotId()));
-        } else {
-            RuntimeException e = new RuntimeException("Unsupported VM work command: " + job.getCmd());
-            String exceptionJson = JobSerializerHelper.toSerializedString(e);
-            s_logger.error("Serialize exception object into json: " + exceptionJson);
-            return new Pair<JobInfo.Status, String>(JobInfo.Status.FAILED, exceptionJson);
-        }
+    @Override
+    public Pair<JobInfo.Status, String> handleVmWorkJob(VmWork work) throws Exception {
+        return _jobHandlerProxy.handleVmWorkJob(work); // the proxy picks the handler method indexed for this work item's class
     }
 }

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/05873822/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java b/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
index 6a954de..f7c1120 100644
--- a/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
+++ b/server/src/com/cloud/vm/snapshot/VMSnapshotManagerImpl.java
@@ -42,7 +42,6 @@ import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
 import org.apache.cloudstack.framework.jobs.AsyncJobManager;
 import org.apache.cloudstack.framework.jobs.Outcome;
 import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
-import org.apache.cloudstack.framework.jobs.impl.JobSerializerHelper;
 import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl;
 import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
 import org.apache.cloudstack.jobs.JobInfo;
@@ -91,6 +90,7 @@ import com.cloud.vm.VirtualMachineProfile;
 import com.cloud.vm.VmWork;
 import com.cloud.vm.VmWorkConstants;
 import com.cloud.vm.VmWorkJobHandler;
+import com.cloud.vm.VmWorkJobHandlerProxy;
 import com.cloud.vm.VmWorkSerializer;
 import com.cloud.vm.dao.UserVmDao;
 import com.cloud.vm.dao.VMInstanceDao;
@@ -124,6 +124,8 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
     @Inject
     AsyncJobManager _jobMgr;
 
+    VmWorkJobHandlerProxy _jobHandlerProxy = new VmWorkJobHandlerProxy(this);
+
     int _vmSnapshotMax;
     int _wait;
 
@@ -478,7 +480,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
         }
     }
 
-    public boolean orchestrateDeleteVMSnapshot(Long vmSnapshotId) {
+    private boolean orchestrateDeleteVMSnapshot(Long vmSnapshotId) {
         Account caller = getCaller();
 
         VMSnapshotVO vmSnapshot = _vmSnapshotDao.findById(vmSnapshotId);
@@ -585,7 +587,7 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
         }
     }
 
-    public UserVm orchestrateRevertToVMSnapshot(Long vmSnapshotId) throws InsufficientCapacityException, ResourceUnavailableException, ConcurrentOperationException {
+    private UserVm orchestrateRevertToVMSnapshot(Long vmSnapshotId) throws InsufficientCapacityException, ResourceUnavailableException, ConcurrentOperationException {
 
         // check if VM snapshot exists in DB
         VMSnapshotVO vmSnapshotVo = _vmSnapshotDao.findById(vmSnapshotId);
@@ -984,72 +986,31 @@ public class VMSnapshotManagerImpl extends ManagerBase implements VMSnapshotMana
                 vmId);
     }
 
-    @Override
-    public Pair<JobInfo.Status, String> handleVmWorkJob(AsyncJob job, VmWork work) throws Exception {
-
-        if (work instanceof VmWorkCreateVMSnapshot) {
-            VmWorkCreateVMSnapshot createWork = (VmWorkCreateVMSnapshot)work;
-
-            if (s_logger.isDebugEnabled())
-                s_logger.debug("Execute Create-VM-Snapshot within VM work job context. vmId: " + createWork.getVmId()
-                        + ", VM snapshotId: " + createWork.getVmSnapshotId() + "quiesce: " + createWork.isQuiesceVm());
-
-            VMSnapshot vmSnapshot = orchestrateCreateVMSnapshot(createWork.getVmId(), createWork.getVmSnapshotId(), createWork.isQuiesceVm());
-
-            if (s_logger.isDebugEnabled())
-                s_logger.debug("Execute Create-VM-Snapshot within VM work job context. vmId: " + createWork.getVmId()
-                        + ", VM snapshotId: " + createWork.getVmSnapshotId() + "quiesce: " + createWork.isQuiesceVm());
-
-            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, JobSerializerHelper.toObjectSerializedString(new Long(vmSnapshot.getId())));
-        } else if (work instanceof VmWorkDeleteVMSnapshot) {
-            VmWorkDeleteVMSnapshot deleteWork = (VmWorkDeleteVMSnapshot)work;
-
-            if (s_logger.isDebugEnabled())
-                s_logger.debug("Execute Delete-VM-Snapshot within VM work job context. vmId: " + deleteWork.getVmId()
-                        + ", VM snapshotId: " + deleteWork.getVmSnapshotId());
-
-            boolean result = orchestrateDeleteVMSnapshot(deleteWork.getVmSnapshotId());
-
-            if (s_logger.isDebugEnabled())
-                s_logger.debug("Done executing Delete-VM-Snapshot within VM work job context. vmId: " + deleteWork.getVmId()
-                        + ", VM snapshotId: " + deleteWork.getVmSnapshotId());
-
-            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, JobSerializerHelper.toObjectSerializedString(new Boolean(result)));
-
-        } else if (work instanceof VmWorkRevertToVMSnapshot) {
-            VmWorkRevertToVMSnapshot revertWork = (VmWorkRevertToVMSnapshot)work;
-
-            if (s_logger.isDebugEnabled())
-                s_logger.debug("Execute Revert-VM-Snapshot within VM work job context. vmId: " + revertWork.getVmId()
-                        + ", VM snapshotId: " + revertWork.getVmSnapshotId());
-
-            orchestrateRevertToVMSnapshot(revertWork.getVmSnapshotId());
-
-            if (s_logger.isDebugEnabled())
-                s_logger.debug("Done executing Revert-VM-Snapshot within VM work job context. vmId: " + revertWork.getVmId()
-                        + ", VM snapshotId: " + revertWork.getVmSnapshotId());
-
-            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
-
-        } else if (work instanceof VmWorkDeleteAllVMSnapshots) {
-            VmWorkDeleteAllVMSnapshots deleteAllWork = (VmWorkDeleteAllVMSnapshots)work;
-
-            if (s_logger.isDebugEnabled())
-                s_logger.debug("Execute Delete-All-VM-Snapshot within VM work job context. vmId: " + deleteAllWork.getVmId());
-
-            boolean result = orchestrateDeleteAllVMSnapshots(deleteAllWork.getVmId(), deleteAllWork.getSnapshotType());
+    public Pair<JobInfo.Status, String> orchestrateCreateVMSnapshot(VmWorkCreateVMSnapshot work) throws Exception {
+        VMSnapshot snapshot = orchestrateCreateVMSnapshot(work.getVmId(), work.getVmSnapshotId(), work.isQuiesceVm());
+        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED,
+                _jobMgr.marshallResultObject(new Long(snapshot.getId())));
+    }
 
-            if (s_logger.isDebugEnabled())
-                s_logger.debug("Execute Delete-All-VM-Snapshot within VM work job context. vmId: " + deleteAllWork.getVmId());
+    public Pair<JobInfo.Status, String> orchestrateDeleteVMSnapshot(VmWorkDeleteVMSnapshot work) {
+        boolean result = orchestrateDeleteVMSnapshot(work.getVmSnapshotId());
+        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED,
+                _jobMgr.marshallResultObject(new Boolean(result)));
+    }
 
-            return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, JobSerializerHelper.toObjectSerializedString(new Boolean(result)));
+    public Pair<JobInfo.Status, String> orchestrateRevertToVMSnapshot(VmWorkRevertToVMSnapshot work) throws Exception {
+        orchestrateRevertToVMSnapshot(work.getVmSnapshotId());
+        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
+    }
 
-        } else {
+    public Pair<JobInfo.Status, String> orchestrateDeleteAllVMSnapshots(VmWorkDeleteAllVMSnapshots work) {
+        boolean result = orchestrateDeleteAllVMSnapshots(work.getVmId(), work.getSnapshotType());
+        return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED,
+                _jobMgr.marshallResultObject(new Boolean(result)));
+    }
 
-            RuntimeException e = new RuntimeException("Unsupported VM work command: " + job.getCmd());
-            String exceptionJson = JobSerializerHelper.toSerializedString(e);
-            s_logger.error("Serialize exception object into json: " + exceptionJson);
-            return new Pair<JobInfo.Status, String>(JobInfo.Status.FAILED, exceptionJson);
-        }
+    @Override
+    public Pair<JobInfo.Status, String> handleVmWorkJob(VmWork work) throws Exception {
+        return _jobHandlerProxy.handleVmWorkJob(work);
     }
 }