You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@cloudstack.apache.org by ah...@apache.org on 2013/11/14 13:11:29 UTC

[3/3] git commit: updated refs/heads/4.2-workplace to 1148b6f

Much better version of the VM state detection based on XenServer events


Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/1148b6fe
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/1148b6fe
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/1148b6fe

Branch: refs/heads/4.2-workplace
Commit: 1148b6fed478516e5430af92d4331dd3bd9597ed
Parents: b323630
Author: Alex Huang <al...@citrix.com>
Authored: Thu Nov 14 03:44:26 2013 -0800
Committer: Alex Huang <al...@citrix.com>
Committed: Thu Nov 14 04:11:26 2013 -0800

----------------------------------------------------------------------
 .../xen/resource/CitrixResourceBase.java        | 306 +++++++++----------
 1 file changed, 148 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1148b6fe/plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/CitrixResourceBase.java
----------------------------------------------------------------------
diff --git a/plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/CitrixResourceBase.java b/plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/CitrixResourceBase.java
index c2a3e7b..90ec6a4 100644
--- a/plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/CitrixResourceBase.java
+++ b/plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/CitrixResourceBase.java
@@ -82,10 +82,8 @@ import com.xensource.xenapi.Types;
 import com.xensource.xenapi.Types.BadAsyncResult;
 import com.xensource.xenapi.Types.BadServerResponse;
 import com.xensource.xenapi.Types.ConsoleProtocol;
-import com.xensource.xenapi.Types.EventsLost;
 import com.xensource.xenapi.Types.IpConfigurationMode;
 import com.xensource.xenapi.Types.OperationNotAllowed;
-import com.xensource.xenapi.Types.SessionNotRegistered;
 import com.xensource.xenapi.Types.SrFull;
 import com.xensource.xenapi.Types.VbdType;
 import com.xensource.xenapi.Types.VmBadPowerState;
@@ -4967,8 +4965,10 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
             if (_host.uuid.equals(hostr.uuid)) {
                 HashMap<String, Ternary<String, VirtualMachine.State, String>> allStates=fullClusterSync(conn);
                 cmd.setClusterVMStateChanges(allStates);
-                _listener = new VmEventListener();
+                _listener = new VmEventListener(true);
                 _listener.start();
+            } else {
+                _listener = new VmEventListener(false);
             }
         } catch (Throwable e) {
             s_logger.warn("Check for master failed, failing the FULL Cluster sync command");
@@ -8061,45 +8061,14 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
     }
 
     protected Answer execute(final ClusterSyncCommand cmd) {
-        Connection conn = getConnection();
-        //check if this is master
-        Pool pool;
-        try {
-            pool = Pool.getByUuid(conn, _host.pool);
-            Pool.Record poolr = pool.getRecord(conn);
-
-            Host.Record hostr = poolr.master.getRecord(conn);
-            if (!_host.uuid.equals(hostr.uuid)) {
-                return new Answer(cmd);
-            }
-        } catch (Throwable e) {
-            s_logger.warn("Check for master failed, failing the Cluster sync command");
-            return  new Answer(cmd);
+        if (!_listener.isListening()) {
+            return new Answer(cmd);
         }
-        HashMap<String, Ternary<String, VirtualMachine.State, String>> newStates = deltaClusterSync(conn);
-        return new ClusterSyncAnswer(cmd.getClusterId(), newStates);
-    }
-
-    protected boolean isMaster() {
-        Connection conn = getConnection();
-        //check if this is master
-        Pool pool;
-        try {
-            pool = Pool.getByUuid(conn, _host.pool);
-            Pool.Record poolr = pool.getRecord(conn);
 
-            Host.Record hostr = poolr.master.getRecord(conn);
-            return _host.uuid.equals(hostr.uuid);
-        } catch (BadServerResponse e) {
-            throw new CloudRuntimeException("Unable to determine if this is the master: " + _host.ip, e);
-        } catch (XenAPIException e) {
-            throw new CloudRuntimeException("Unable to determine if this is the master: " + _host.ip, e);
-        } catch (XmlRpcException e) {
-            throw new CloudRuntimeException("Unable to determine if this is the master: " + _host.ip, e);
-        }
+        HashMap<String, Ternary<String, VirtualMachine.State, String>> newStates = _listener.getChanges();
+        return new ClusterSyncAnswer(cmd.getClusterId(), newStates);
     }
 
-
     protected HashMap<String, Ternary<String, VirtualMachine.State, String>> fullClusterSync(Connection conn) {
         synchronized (_cluster.intern()) {
             s_vms.clear(_cluster);
@@ -8612,163 +8581,184 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
 
     protected class VmEventListener extends Thread {
         boolean _stop = false;
-        HashMap<String, Ternary<String, VirtualMachine.State, String>> changes = new HashMap<String, Ternary<String, VirtualMachine.State, String>>();
+        HashMap<String, Ternary<String, VirtualMachine.State, String>> _changes = new HashMap<String, Ternary<String, VirtualMachine.State, String>>();
+        boolean _isMaster;
+        Set<String> _classes;
+        String _token = "";
+
+        public VmEventListener(boolean isMaster) {
+            _isMaster = isMaster;
+            _classes = new HashSet<String>();
+            _classes.add("VM");
+        }
         
         @Override
         public void run() {
             setName("XS-Listener-" + _host.ip);
-            String token = "";
-            Set<String> classes = new HashSet<String>();
-            classes.add("VM");
             while (!_stop) {
-                Connection conn = getConnection();
-                Map results;
                 try {
-                    results = Event.properFrom(conn, classes, token, new Double(30));
-                } catch (BadServerResponse e) {
-                    // TODO Auto-generated catch block
-                    e.printStackTrace();
-                    continue;
-                } catch (SessionNotRegistered e) {
-                    // TODO Auto-generated catch block
-                    e.printStackTrace();
-                    continue;
-                } catch (EventsLost e) {
-                    // TODO Auto-generated catch block
-                    e.printStackTrace();
-                    continue;
-                } catch (XenAPIException e) {
-                    // TODO Auto-generated catch block
-                    e.printStackTrace();
-                    continue;
-                } catch (XmlRpcException e) {
-                    // TODO Auto-generated catch block
-                    e.printStackTrace();
-                    continue;
-                }
-
-                token = (String)results.get("token");
-                Set<Event.Record> events = (Set<Event.Record>)results.get("events");
-                for (Event.Record event : events) {
+                    Connection conn = getConnection();
+                    Map<?, ?> results;
                     try {
-                        if (!(event.snapshot instanceof VM.Record)) {
-                            s_logger.debug("The snapshot is not a VM: " + event);
-                            continue;
-                        }
-                        VM.Record vm = (VM.Record)event.snapshot;
-                        if (!VirtualMachineName.isValidCloudStackVmName(vm.nameLabel, _instance)) {
-                            s_logger.debug("Skipping over VMs that does not form to the CloudStack naming convention: " + vm.nameLabel);
-                            continue;
-                        }
-                        recordChanges(conn, vm, vm.residentOn.getUuid(conn));
-                    } catch (BadServerResponse e) {
-                        // TODO Auto-generated catch block
-                        e.printStackTrace();
-                    } catch (XenAPIException e) {
-                        // TODO Auto-generated catch block
-                        e.printStackTrace();
-                    } catch (XmlRpcException e) {
-                        // TODO Auto-generated catch block
-                        e.printStackTrace();
+                        results = Event.properFrom(conn, _classes, _token, new Double(30));
+                    } catch (Exception e) {
+                        s_logger.error("Retrying the waiting on VM events due to: ", e);
+                        continue;
                     }
 
+                    _token = (String)results.get("token");
+                    @SuppressWarnings("unchecked")
+                    Set<Event.Record> events = (Set<Event.Record>)results.get("events");
+                    for (Event.Record event : events) {
+                        try {
+                            if (!(event.snapshot instanceof VM.Record)) {
+                                if (s_logger.isDebugEnabled()) {
+                                    s_logger.debug("The snapshot is not a VM: " + event);
+                                }
+                                continue;
+                            }
+                            VM.Record vm = (VM.Record)event.snapshot;
+
+                            String hostUuid = null;
+                            if (vm.residentOn != null && !vm.residentOn.toWireString().contains("OpaqueRef:NULL")) {
+                                hostUuid = vm.residentOn.getUuid(conn);
+                            }
+                            recordChanges(conn, vm, hostUuid);
+                        } catch (Exception e) {
+                            s_logger.error("Skipping over " + event, e);
+                        }
+                    }
+                } catch (Throwable th) {
+                    s_logger.error("Exception caught in eventlistener thread: ", th);
                 }
             }
         }
 
         protected void recordChanges(Connection conn, VM.Record rec, String hostUuid) {
-            synchronized (_cluster.intern()) {
-                String xstoolsversion = null;
-                VirtualMachine.State newState = convertToState(rec.powerState);
-                String vm = rec.nameLabel;
-                Ternary<String, VirtualMachine.State, String> oldState = s_vms.get(_cluster, vm);
+            String vm = rec.nameLabel;
+            if (!VirtualMachineName.isValidCloudStackVmName(vm, _instance)) {
+                s_logger.debug("Skipping over VMs that does not conform to CloudStack naming convention: " + vm);
+                return;
+            }
 
-                if (oldState == null) {
-                    // If we can't find this VM in the previous states then it must have been stopped.
-                    oldState = new Ternary<String, VirtualMachine.State, String>(null, VirtualMachine.State.Stopped, null);
-                    s_vms.put(_cluster, hostUuid, vm, newState, xstoolsversion);
-                    changes.put(vm, new Ternary<String, VirtualMachine.State, String>(hostUuid, newState, null));
-                }
-                
-                // check if xstoolsversion changed
-                if (xstoolsversion != null) {
-                    if (xstoolsversion != oldState.third() && newState != VirtualMachine.State.Stopped && newState != VirtualMachine.State.Stopping) {
-                        s_logger.warn("Detecting a change in xstoolsversion for " + vm);
-                        changes.put(vm, new Ternary<String, VirtualMachine.State, String>(hostUuid, newState, xstoolsversion));
+            VirtualMachine.State currentState = convertToState(rec.powerState);
+            if (vm.startsWith("migrating")) {
+                s_logger.warn("Skipping " + vm + " because it is migrating.");
+                return;
+            }
 
-                        if (s_logger.isDebugEnabled()) {
-                            s_logger.debug("11. The VM " + vm + " is in " + newState + " state");
-                        }
-                        s_vms.put(_cluster, hostUuid, vm, newState, xstoolsversion);
-                        return;
-                    }
+            if (currentState == VirtualMachine.State.Stopped) {
+                if (s_logger.isTraceEnabled()) {
+                    s_logger.trace("Double check the power state to make sure we got the correct state for " + vm);
                 }
-                
-                //check if host is changed
-                if (hostUuid != null) {
-                    if (!hostUuid.equals(oldState.first()) && newState != VirtualMachine.State.Stopped && newState != VirtualMachine.State.Stopping) {
-                        s_logger.warn("Detecting a change in host for " + vm);
-                        changes.put(vm, new Ternary<String, VirtualMachine.State, String>(hostUuid, newState, null));
+                currentState = getRealPowerState(conn, vm);
+            }
 
-                        s_logger.debug("11. The VM " + vm + " is in " + newState + " state");
-                        s_vms.put(_cluster, hostUuid, vm, newState, xstoolsversion);
-                        return;
-                    }
-                }
+            boolean updateMap = false;
+            boolean reportChange = false;
 
-                if (newState == VirtualMachine.State.Stopped && oldState.second() != VirtualMachine.State.Stopping && oldState.second() != VirtualMachine.State.Stopped) {
-                    newState = getRealPowerState(conn, vm);
+            // NOTE: For now we only record change when the VM is stopped.  We don't find out any VMs starting for now.
+            synchronized (_cluster.intern()) {
+                Ternary<String, VirtualMachine.State, String> oldState = s_vms.get(_cluster, vm);
+                if (oldState == null) {
+                    if (s_logger.isTraceEnabled()) {
+                        s_logger.trace("Unable to find " + vm + " from previous map.  Assuming it was in Stopped state.");
+                    }
+                    oldState = new Ternary<String, VirtualMachine.State, String>(null, VirtualMachine.State.Stopped, null);
                 }
-
+                
                 if (s_logger.isTraceEnabled()) {
-                    s_logger.trace("VM " + vm + ": xen has state " + newState + " and we have state " + (oldState != null ? oldState.toString() : "null"));
+                    s_logger.trace(vm + ": current state=" + currentState + ", previous state=" + oldState);
                 }
 
-                if (vm.startsWith("migrating")) {
-                    s_logger.warn("Migrating from xen detected.  Skipping");
-                    return;
-                }
-                
                 if (oldState.second() == VirtualMachine.State.Starting) {
-                    if (newState == VirtualMachine.State.Running) {
-                        s_logger.debug("12. The VM " + vm + " is in " + VirtualMachine.State.Running + " state");
-                        s_vms.put(_cluster, hostUuid, vm, newState, xstoolsversion);
-                    } else if (newState == VirtualMachine.State.Stopped) {
-                        s_logger.warn("Ignoring vm " + vm + " because of a lag in starting the vm.");
+                    if (currentState == VirtualMachine.State.Running) {
+                        updateMap = true;
+                        reportChange = false;
+                    } else if (currentState == VirtualMachine.State.Stopped) {
+                        updateMap = false;
+                        reportChange = false;
                     }
                 } else if (oldState.second() == VirtualMachine.State.Migrating) {
-                    if (newState == VirtualMachine.State.Running) {
-                        s_logger.debug("Detected that an migrating VM is now running: " + vm);
-                        s_vms.put(_cluster, hostUuid, vm, newState, xstoolsversion);
-                    }
+                    updateMap = true;
+                    reportChange = false;
                 } else if (oldState.second() == VirtualMachine.State.Stopping) {
-                    if (newState == VirtualMachine.State.Stopped) {
-                        s_logger.debug("13. The VM " + vm + " is in " + VirtualMachine.State.Stopped + " state");
-                        s_vms.put(_cluster, hostUuid, vm, newState, xstoolsversion);
-                    } else if (newState == VirtualMachine.State.Running) {
-                        s_logger.warn("Ignoring vm " + vm + " because of a lag in stopping the vm. ");
+                    if (currentState == VirtualMachine.State.Stopped) {
+                        updateMap = true;
+                        reportChange = false;
+                    } else if (currentState == VirtualMachine.State.Running) {
+                        updateMap = false;
+                        reportChange = false;
                     }
-                } else if (oldState.second() != newState) {
-                    s_logger.debug("14. The VM " + vm + " is in " + newState + " state was " + oldState.second());
-                    s_vms.put(_cluster, hostUuid, vm, newState, xstoolsversion);
-                    if (newState == VirtualMachine.State.Stopped) {
-                        /*
-                         * if (s_vmsKilled.remove(vm)) { s_logger.debug("VM " + vm + " has been killed for storage. ");
-                         * newState = VirtualMachine.State.Error; }
-                         */
+                } else if (oldState.second() != currentState) {
+                    updateMap = true;
+                    reportChange = true;
+                } else if (hostUuid != null && !hostUuid.equals(oldState.first())) {
+                    if (s_logger.isDebugEnabled()) {
+                        s_logger.debug("Detecting " + vm + " moved from " + oldState.first() + " to " + hostUuid);
                     }
-                    changes.put(vm, new Ternary<String, VirtualMachine.State, String>(hostUuid, newState, null));
+                    reportChange = true;
+                    updateMap = true;
                 }
+
+                if (updateMap) {
+                    s_vms.put(_cluster, hostUuid, vm, currentState);
+                    if (s_logger.isTraceEnabled()) {
+                        s_logger.trace("Updated " + vm + " to [" + hostUuid + ", " + currentState);
+                    }
+                }
+                if (reportChange) {
+                    Ternary<String, VirtualMachine.State, String> change = _changes.get(vm);
+                    if (hostUuid == null) {
+                        // This is really strange code.  It looks like the sync
+                        // code wants this to be set, which is extremely weird
+                        // for VMs that are dead.  Why would I want to set the
+                        // hostUuid if the VM is stopped?
+                        hostUuid = oldState.first();
+                        if (hostUuid == null) {
+                            hostUuid = _host.uuid;
+                        }
+                    }
+                    if (change == null) {
+                        change = new Ternary<String, VirtualMachine.State, String>(hostUuid, currentState, null);
+                    } else {
+                        change.first(hostUuid);
+                        change.second(currentState);
+                    }
+                    _changes.put(vm, change);
+                }
+            }
+        }
+
+        @Override
+        public void start() {
+            if (_isMaster) {
+                // Throw away the initial set of events because they're history
+                Connection conn = getConnection();
+                Map<?, ?> results;
+                try {
+                    results = Event.properFrom(conn, _classes, _token, new Double(30));
+                } catch (Exception e) {
+                    s_logger.error("Retrying the waiting on VM events due to: ", e);
+                    throw new CloudRuntimeException("Unable to start a listener thread to listen to VM events", e);
+                }
+                _token = (String)results.get("token");
+                s_logger.debug("Starting the event listener thread for " + _host.uuid);
+                super.start();
             }
         }
 
+        public boolean isListening() {
+            return _isMaster;
+        }
+
         public HashMap<String, Ternary<String, VirtualMachine.State, String>> getChanges() {
             synchronized(_cluster.intern()) {
-                if (changes.size() == 0) {
+                if (_changes.size() == 0) {
                     return null;
                 }
-                HashMap<String, Ternary<String, VirtualMachine.State, String>> diff = changes;
-                changes = new HashMap<String, Ternary<String, VirtualMachine.State, String>>();
+                HashMap<String, Ternary<String, VirtualMachine.State, String>> diff = _changes;
+                _changes = new HashMap<String, Ternary<String, VirtualMachine.State, String>>();
                 return diff;
             }
         }