You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@cloudstack.apache.org by ke...@apache.org on 2013/06/20 02:55:12 UTC

git commit: updated refs/heads/vmsync to 0bfc817

Updated Branches:
  refs/heads/vmsync e2edae171 -> 0bfc817bc


Handle transitional states across management server restart


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/0bfc817b
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/0bfc817b
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/0bfc817b

Branch: refs/heads/vmsync
Commit: 0bfc817bc6b8c2887e9ff83f22cd75263cffbb54
Parents: e2edae1
Author: Kelven Yang <ke...@gmail.com>
Authored: Wed Jun 19 17:47:14 2013 -0700
Committer: Kelven Yang <ke...@gmail.com>
Committed: Wed Jun 19 17:47:14 2013 -0700

----------------------------------------------------------------------
 api/src/com/cloud/vm/VirtualMachine.java        |   6 +-
 .../com/cloud/vm/VirtualMachineManagerImpl.java | 201 ++++++++++---------
 .../cloudstack/vm/jobs/VmWorkJobDaoImpl.java    |  12 +-
 .../framework/jobs/dao/AsyncJobDaoImpl.java     |   2 +-
 4 files changed, 123 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0bfc817b/api/src/com/cloud/vm/VirtualMachine.java
----------------------------------------------------------------------
diff --git a/api/src/com/cloud/vm/VirtualMachine.java b/api/src/com/cloud/vm/VirtualMachine.java
index 3ac9aed..fe0ea76 100755
--- a/api/src/com/cloud/vm/VirtualMachine.java
+++ b/api/src/com/cloud/vm/VirtualMachine.java
@@ -108,8 +108,12 @@ public interface VirtualMachine extends RunningOn, ControlledEntity, Identity, I
             s_fsm.addTransition(State.Stopping, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
             s_fsm.addTransition(State.Stopped, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
             s_fsm.addTransition(State.Running, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
-
             s_fsm.addTransition(State.Migrating, VirtualMachine.Event.FollowAgentPowerOnReport, State.Running);
+
+            s_fsm.addTransition(State.Starting, VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped);
+            s_fsm.addTransition(State.Stopping, VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped);
+            s_fsm.addTransition(State.Running, VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped);
+            s_fsm.addTransition(State.Migrating, VirtualMachine.Event.FollowAgentPowerOffReport, State.Stopped);
         }
         
         public static boolean isVmStarted(State oldState, Event e, State newState) {

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0bfc817b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
index 8989235..141fcb4 100755
--- a/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/vm/VirtualMachineManagerImpl.java
@@ -457,12 +457,12 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
         _vmDao.remove(vm.getId());
     }
     
-    
-
     @Override
     public boolean start() {
         _executor.scheduleAtFixedRate(new TransitionTask(), _pingInterval.value(), _pingInterval.value(), TimeUnit.SECONDS);
         _executor.scheduleAtFixedRate(new CleanupTask(), _pingInterval.value()*2, _pingInterval.value()*2, TimeUnit.SECONDS);
+
+        // cancel jobs left-over from last run
         cancelWorkItems(_nodeId);
         
         return true;
@@ -1801,42 +1801,42 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
 
     protected void cancelWorkItems(long nodeId) {
         /*
-                GlobalLock scanLock = GlobalLock.getInternLock("vmmgr.cancel.workitem");
+            GlobalLock scanLock = GlobalLock.getInternLock("vmmgr.cancel.workitem");
 
-                try {
-                    if (scanLock.lock(3)) {
-                        try {
-                            List<VmWorkJobVO> works = _workDao.listWorkInProgressFor(nodeId);
-                            for (VmWorkJobVO work : works) {
-                                s_logger.info("Handling unfinished work item: " + work);
-                                try {
-                                    VMInstanceVO vm = _vmDao.findById(work.getInstanceId());
-                                    if (vm != null) {
-                                        if (work.getType() == State.Starting) {
-                                            _haMgr.scheduleRestart(vm, true);
-                                            work.setManagementServerId(_nodeId);
-                                            _workDao.update(work.getId(), work);
-                                        } else if (work.getType() == State.Stopping) {
-                                            _haMgr.scheduleStop(vm, vm.getHostId(), WorkType.CheckStop);
-                                            work.setManagementServerId(_nodeId);
-                                            _workDao.update(work.getId(), work);
-                                        } else if (work.getType() == State.Migrating) {
-                                            _haMgr.scheduleMigration(vm);
-                                            work.setStep(Step.Done);
-                                            _workDao.update(work.getId(), work);
-                                        }
+            try {
+                if (scanLock.lock(3)) {
+                    try {
+                        List<VmWorkJobVO> works = _workDao.listWorkInProgressFor(nodeId);
+                        for (VmWorkJobVO work : works) {
+                            s_logger.info("Handling unfinished work item: " + work);
+                            try {
+                                VMInstanceVO vm = _vmDao.findById(work.getInstanceId());
+                                if (vm != null) {
+                                    if (work.getType() == State.Starting) {
+                                        _haMgr.scheduleRestart(vm, true);
+                                        work.setManagementServerId(_nodeId);
+                                        _workDao.update(work.getId(), work);
+                                    } else if (work.getType() == State.Stopping) {
+                                        _haMgr.scheduleStop(vm, vm.getHostId(), WorkType.CheckStop);
+                                        work.setManagementServerId(_nodeId);
+                                        _workDao.update(work.getId(), work);
+                                    } else if (work.getType() == State.Migrating) {
+                                        _haMgr.scheduleMigration(vm);
+                                        work.setStep(Step.Done);
+                                        _workDao.update(work.getId(), work);
                                     }
-                                } catch (Exception e) {
-                                    s_logger.error("Error while handling " + work, e);
                                 }
+                            } catch (Exception e) {
+                                s_logger.error("Error while handling " + work, e);
                             }
-                        } finally {
-                            scanLock.unlock();
                         }
+                    } finally {
+                        scanLock.unlock();
                     }
-                } finally {
-                    scanLock.releaseRef();
                 }
+            } finally {
+                scanLock.releaseRef();
+            }
         */
     }
 
@@ -3475,7 +3475,7 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     		}
       		_alertMgr.sendAlert(AlertManager.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
         			VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + " -> Running) from out-of-context transition. VM network environment may need to be reset");
-          		break;
+          	break;
     		
     	case Destroyed :
     	case Expunging :
@@ -3501,30 +3501,29 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     
     private void handlePowerOffReportWithNoPendingJobsOnVM(VMInstanceVO vm) {
 
-    	// TODO :
     	// 	1) handle left-over transitional VM states
     	//	2) handle out of sync stationary states, schedule force-stop to release resources
     	//
     	switch(vm.getState()) {
     	case Starting :
-    		break;
-    		
-    	case Running :
-    		break;
-    		
     	case Stopping :
-    		break;
-    		
     	case Stopped :
-    		break;
+    	case Migrating :
+    		try {
+    			stateTransitTo(vm, VirtualMachine.Event.FollowAgentPowerOffReport, vm.getPowerHostId());
+    		} catch(NoTransitionException e) {
+    			s_logger.warn("Unexpected VM state transition exception, race-condition?", e);
+    		}
+      		_alertMgr.sendAlert(AlertManager.ALERT_TYPE_SYNC, vm.getDataCenterId(), vm.getPodIdToDeployIn(),
+        			VM_SYNC_ALERT_SUBJECT, "VM " + vm.getHostName() + "(" + vm.getInstanceName() + ") state is sync-ed (" + vm.getState() + " -> Stopped) from out-of-context transition.");
+      		// TODO: we need to forcely release all resource allocation
+          	break;
     		
+    	case Running :
     	case Destroyed :
     	case Expunging :
     		break;
     		
-    	case Migrating :
-    		break;
-    		
     	case Error :
     	default :
     		break;
@@ -3582,79 +3581,99 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac
     
     
     // VMs that in transitional state without recent power state report
-    @DB
     private List<Long> listStalledVMInTransitionStateOnUpHost(long hostId, Date cutTime) {
     	String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " +
                      "AND h.id = ? AND i.power_state_update_time < ? AND i.host_id = h.id " +
     			     "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " +
-    			     "AND i.id NOT IN (SELECT vm_instance_id FROM vm_work_job)";
+    			     "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)";
     	
     	List<Long> l = new ArrayList<Long>();
-        Transaction txn = Transaction.currentTxn();;
-        PreparedStatement pstmt = null;
-        try {
-            pstmt = txn.prepareAutoCloseStatement(sql);
-            
-            pstmt.setLong(1, hostId);
- 	        pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
-            ResultSet rs = pstmt.executeQuery();
-            while(rs.next()) {
-            	l.add(rs.getLong(1));
-            }
-        } catch (SQLException e) {
-        } catch (Throwable e) {
-        }
+    	Transaction txn = null;
+    	try {
+    		txn = Transaction.open(Transaction.CLOUD_DB);
+    	
+	        PreparedStatement pstmt = null;
+	        try {
+	            pstmt = txn.prepareAutoCloseStatement(sql);
+	            
+	            pstmt.setLong(1, hostId);
+	 	        pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
+	 	        pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal());
+	            ResultSet rs = pstmt.executeQuery();
+	            while(rs.next()) {
+	            	l.add(rs.getLong(1));
+	            }
+	        } catch (SQLException e) {
+	        } catch (Throwable e) {
+	        }
+        
+    	} finally {
+    		if(txn != null)
+    			txn.close();
+    	}
         return l;
     }
     
     // VMs that in transitional state and recently have power state update
-    @DB
     private List<Long> listVMInTransitionStateWithRecentReportOnUpHost(long hostId, Date cutTime) {
     	String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status = 'UP' " +
                      "AND h.id = ? AND i.power_state_update_time > ? AND i.host_id = h.id " +
     			     "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " +
-    			     "AND i.id NOT IN (SELECT vm_instance_id FROM vm_work_job)";
+    			     "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)";
     	
     	List<Long> l = new ArrayList<Long>();
-        Transaction txn = Transaction.currentTxn();;
-        PreparedStatement pstmt = null;
-        try {
-            pstmt = txn.prepareAutoCloseStatement(sql);
-            
-            pstmt.setLong(1, hostId);
- 	        pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
-            ResultSet rs = pstmt.executeQuery();
-            while(rs.next()) {
-            	l.add(rs.getLong(1));
-            }
-        } catch (SQLException e) {
-        } catch (Throwable e) {
-        }
-        return l;
+    	Transaction txn = null;
+    	try {
+    		txn = Transaction.open(Transaction.CLOUD_DB);
+	        PreparedStatement pstmt = null;
+	        try {
+	            pstmt = txn.prepareAutoCloseStatement(sql);
+	            
+	            pstmt.setLong(1, hostId);
+	 	        pstmt.setString(2, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
+	 	        pstmt.setInt(3, JobInfo.Status.IN_PROGRESS.ordinal());
+	            ResultSet rs = pstmt.executeQuery();
+	            while(rs.next()) {
+	            	l.add(rs.getLong(1));
+	            }
+	        } catch (SQLException e) {
+	        } catch (Throwable e) {
+	        }
+	        return l;
+    	} finally {
+    		if(txn != null)
+    			txn.close();
+    	}
     }
     
-    @DB
     private List<Long> listStalledVMInTransitionStateOnDisconnectedHosts(Date cutTime) {
     	String sql = "SELECT i.* FROM vm_instance as i, host as h WHERE h.status != 'UP' " +
                  "AND i.power_state_update_time < ? AND i.host_id = h.id " +
 			     "AND (i.state ='Starting' OR i.state='Stopping' OR i.state='Migrating') " +
-			     "AND i.id NOT IN (SELECT vm_instance_id FROM vm_work_job)";
+			     "AND i.id NOT IN (SELECT w.vm_instance_id FROM vm_work_job AS w JOIN async_job AS j ON w.id = j.id WHERE j.job_status = ?)";
 	
     	List<Long> l = new ArrayList<Long>();
-    	Transaction txn = Transaction.currentTxn();;
-    	PreparedStatement pstmt = null;
+    	Transaction txn = null;
     	try {
-	       pstmt = txn.prepareAutoCloseStatement(sql);
-	       
-	       pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
-	       ResultSet rs = pstmt.executeQuery();
-	       while(rs.next()) {
-	       	l.add(rs.getLong(1));
-	       }
-    	} catch (SQLException e) {
-    	} catch (Throwable e) {
+    		txn = Transaction.open(Transaction.CLOUD_DB);
+	    	PreparedStatement pstmt = null;
+	    	try {
+		       pstmt = txn.prepareAutoCloseStatement(sql);
+		       
+		       pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), cutTime));
+		       pstmt.setInt(2, JobInfo.Status.IN_PROGRESS.ordinal());
+		       ResultSet rs = pstmt.executeQuery();
+		       while(rs.next()) {
+		       	l.add(rs.getLong(1));
+		       }
+	    	} catch (SQLException e) {
+	    	} catch (Throwable e) {
+	    	}
+	    	return l;
+    	} finally {
+    		if(txn != null)
+    			txn.close();
     	}
-    	return l;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0bfc817b/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java
----------------------------------------------------------------------
diff --git a/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java b/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java
index 6361a23..f353357 100644
--- a/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java
+++ b/engine/schema/src/org/apache/cloudstack/vm/jobs/VmWorkJobDaoImpl.java
@@ -44,12 +44,14 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
 	@PostConstruct
 	public void init() {
 		PendingWorkJobSearch = createSearchBuilder();
+		PendingWorkJobSearch.and("jobStatus", PendingWorkJobSearch.entity().getStatus(), Op.EQ);
 		PendingWorkJobSearch.and("vmType", PendingWorkJobSearch.entity().getVmType(), Op.EQ);
 		PendingWorkJobSearch.and("vmInstanceId", PendingWorkJobSearch.entity().getVmInstanceId(), Op.EQ);
 		PendingWorkJobSearch.and("step", PendingWorkJobSearch.entity().getStep(), Op.NEQ);
 		PendingWorkJobSearch.done();
 
 		PendingWorkJobByCommandSearch = createSearchBuilder();
+		PendingWorkJobByCommandSearch.and("jobStatus", PendingWorkJobByCommandSearch.entity().getStatus(), Op.EQ);
 		PendingWorkJobByCommandSearch.and("vmType", PendingWorkJobByCommandSearch.entity().getVmType(), Op.EQ);
 		PendingWorkJobByCommandSearch.and("vmInstanceId", PendingWorkJobByCommandSearch.entity().getVmInstanceId(), Op.EQ);
 		PendingWorkJobByCommandSearch.and("step", PendingWorkJobByCommandSearch.entity().getStep(), Op.NEQ);
@@ -58,7 +60,7 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
 		
 		ExpungeWorkJobSearch = createSearchBuilder();
 		ExpungeWorkJobSearch.and("lastUpdated", ExpungeWorkJobSearch.entity().getLastUpdated(), Op.LT);
-		ExpungeWorkJobSearch.and("status", ExpungeWorkJobSearch.entity().getStatus(), Op.NEQ);
+		ExpungeWorkJobSearch.and("jobStatus", ExpungeWorkJobSearch.entity().getStatus(), Op.NEQ);
 		ExpungeWorkJobSearch.done();
 	}
 	
@@ -66,9 +68,9 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
     public VmWorkJobVO findPendingWorkJob(VirtualMachine.Type type, long instanceId) {
 		
 		SearchCriteria<VmWorkJobVO> sc = PendingWorkJobSearch.create();
+		sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
 		sc.setParameters("vmType", type);
 		sc.setParameters("vmInstanceId", instanceId);
-		sc.setParameters("step", Step.Done);
 		
 		Filter filter = new Filter(VmWorkJobVO.class, "created", true, null, null);
 		List<VmWorkJobVO> result = this.listBy(sc, filter);
@@ -82,9 +84,9 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
     public List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long instanceId) {
 		
 		SearchCriteria<VmWorkJobVO> sc = PendingWorkJobSearch.create();
+		sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
 		sc.setParameters("vmType", type);
 		sc.setParameters("vmInstanceId", instanceId);
-		sc.setParameters("step", Step.Done);
 		
 		Filter filter = new Filter(VmWorkJobVO.class, "created", true, null, null);
 		return this.listBy(sc, filter);
@@ -94,9 +96,9 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
     public List<VmWorkJobVO> listPendingWorkJobs(VirtualMachine.Type type, long instanceId, String jobCmd) {
 		
 		SearchCriteria<VmWorkJobVO> sc = PendingWorkJobByCommandSearch.create();
+		sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
 		sc.setParameters("vmType", type);
 		sc.setParameters("vmInstanceId", instanceId);
-		sc.setParameters("step", Step.Done);
 		sc.setParameters("cmd", jobCmd);
 		
 		Filter filter = new Filter(VmWorkJobVO.class, "created", true, null, null);
@@ -115,7 +117,7 @@ public class VmWorkJobDaoImpl extends GenericDaoBase<VmWorkJobVO, Long> implemen
     public void expungeCompletedWorkJobs(Date cutDate) {
 		SearchCriteria<VmWorkJobVO> sc = ExpungeWorkJobSearch.create();
 		sc.setParameters("lastUpdated",cutDate);
-        sc.setParameters("status", JobInfo.Status.IN_PROGRESS);
+        sc.setParameters("jobStatus", JobInfo.Status.IN_PROGRESS);
 		
 		expunge(sc);
 	}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/0bfc817b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
----------------------------------------------------------------------
diff --git a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
index 9b1eda6..fb3845c 100644
--- a/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
+++ b/framework/jobs/src/org/apache/cloudstack/framework/jobs/dao/AsyncJobDaoImpl.java
@@ -180,7 +180,7 @@ public class AsyncJobDaoImpl extends GenericDaoBase<AsyncJobVO, Long> implements
 	public void resetJobProcess(long msid, int jobResultCode, String jobResultMessage) {
         String sql = "UPDATE async_job SET job_status=" + JobInfo.Status.FAILED.ordinal() + ", job_result_code=" + jobResultCode
                 + ", job_result='" + jobResultMessage + "' where job_status=" + JobInfo.Status.IN_PROGRESS.ordinal()
-                + " AND (job_complete_msid=? OR (job_complete_msid IS NULL AND job_init_msid=?))";
+                + " AND (job_executing_msid=? OR (job_executing_msid IS NULL AND job_init_msid=?))";
 		
         Transaction txn = Transaction.currentTxn();
         PreparedStatement pstmt = null;