You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@cloudstack.apache.org by ah...@apache.org on 2013/11/14 13:11:29 UTC
[3/3] git commit: updated refs/heads/4.2-workplace to 1148b6f
Much better version of the VM state detection based on XenServer events
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/1148b6fe
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/1148b6fe
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/1148b6fe
Branch: refs/heads/4.2-workplace
Commit: 1148b6fed478516e5430af92d4331dd3bd9597ed
Parents: b323630
Author: Alex Huang <al...@citrix.com>
Authored: Thu Nov 14 03:44:26 2013 -0800
Committer: Alex Huang <al...@citrix.com>
Committed: Thu Nov 14 04:11:26 2013 -0800
----------------------------------------------------------------------
.../xen/resource/CitrixResourceBase.java | 306 +++++++++----------
1 file changed, 148 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/1148b6fe/plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/CitrixResourceBase.java
----------------------------------------------------------------------
diff --git a/plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/CitrixResourceBase.java b/plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/CitrixResourceBase.java
index c2a3e7b..90ec6a4 100644
--- a/plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/CitrixResourceBase.java
+++ b/plugins/hypervisors/xen/src/com/cloud/hypervisor/xen/resource/CitrixResourceBase.java
@@ -82,10 +82,8 @@ import com.xensource.xenapi.Types;
import com.xensource.xenapi.Types.BadAsyncResult;
import com.xensource.xenapi.Types.BadServerResponse;
import com.xensource.xenapi.Types.ConsoleProtocol;
-import com.xensource.xenapi.Types.EventsLost;
import com.xensource.xenapi.Types.IpConfigurationMode;
import com.xensource.xenapi.Types.OperationNotAllowed;
-import com.xensource.xenapi.Types.SessionNotRegistered;
import com.xensource.xenapi.Types.SrFull;
import com.xensource.xenapi.Types.VbdType;
import com.xensource.xenapi.Types.VmBadPowerState;
@@ -4967,8 +4965,10 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
if (_host.uuid.equals(hostr.uuid)) {
HashMap<String, Ternary<String, VirtualMachine.State, String>> allStates=fullClusterSync(conn);
cmd.setClusterVMStateChanges(allStates);
- _listener = new VmEventListener();
+ _listener = new VmEventListener(true);
_listener.start();
+ } else {
+ _listener = new VmEventListener(false);
}
} catch (Throwable e) {
s_logger.warn("Check for master failed, failing the FULL Cluster sync command");
@@ -8061,45 +8061,14 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
}
protected Answer execute(final ClusterSyncCommand cmd) {
- Connection conn = getConnection();
- //check if this is master
- Pool pool;
- try {
- pool = Pool.getByUuid(conn, _host.pool);
- Pool.Record poolr = pool.getRecord(conn);
-
- Host.Record hostr = poolr.master.getRecord(conn);
- if (!_host.uuid.equals(hostr.uuid)) {
- return new Answer(cmd);
- }
- } catch (Throwable e) {
- s_logger.warn("Check for master failed, failing the Cluster sync command");
- return new Answer(cmd);
+ if (!_listener.isListening()) {
+ return new Answer(cmd);
}
- HashMap<String, Ternary<String, VirtualMachine.State, String>> newStates = deltaClusterSync(conn);
- return new ClusterSyncAnswer(cmd.getClusterId(), newStates);
- }
-
- protected boolean isMaster() {
- Connection conn = getConnection();
- //check if this is master
- Pool pool;
- try {
- pool = Pool.getByUuid(conn, _host.pool);
- Pool.Record poolr = pool.getRecord(conn);
- Host.Record hostr = poolr.master.getRecord(conn);
- return _host.uuid.equals(hostr.uuid);
- } catch (BadServerResponse e) {
- throw new CloudRuntimeException("Unable to determine if this is the master: " + _host.ip, e);
- } catch (XenAPIException e) {
- throw new CloudRuntimeException("Unable to determine if this is the master: " + _host.ip, e);
- } catch (XmlRpcException e) {
- throw new CloudRuntimeException("Unable to determine if this is the master: " + _host.ip, e);
- }
+ HashMap<String, Ternary<String, VirtualMachine.State, String>> newStates = _listener.getChanges();
+ return new ClusterSyncAnswer(cmd.getClusterId(), newStates);
}
-
protected HashMap<String, Ternary<String, VirtualMachine.State, String>> fullClusterSync(Connection conn) {
synchronized (_cluster.intern()) {
s_vms.clear(_cluster);
@@ -8612,163 +8581,184 @@ public abstract class CitrixResourceBase implements ServerResource, HypervisorRe
protected class VmEventListener extends Thread {
boolean _stop = false;
- HashMap<String, Ternary<String, VirtualMachine.State, String>> changes = new HashMap<String, Ternary<String, VirtualMachine.State, String>>();
+ HashMap<String, Ternary<String, VirtualMachine.State, String>> _changes = new HashMap<String, Ternary<String, VirtualMachine.State, String>>();
+ boolean _isMaster;
+ Set<String> _classes;
+ String _token = "";
+
+ public VmEventListener(boolean isMaster) {
+ _isMaster = isMaster;
+ _classes = new HashSet<String>();
+ _classes.add("VM");
+ }
@Override
public void run() {
setName("XS-Listener-" + _host.ip);
- String token = "";
- Set<String> classes = new HashSet<String>();
- classes.add("VM");
while (!_stop) {
- Connection conn = getConnection();
- Map results;
try {
- results = Event.properFrom(conn, classes, token, new Double(30));
- } catch (BadServerResponse e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- continue;
- } catch (SessionNotRegistered e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- continue;
- } catch (EventsLost e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- continue;
- } catch (XenAPIException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- continue;
- } catch (XmlRpcException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- continue;
- }
-
- token = (String)results.get("token");
- Set<Event.Record> events = (Set<Event.Record>)results.get("events");
- for (Event.Record event : events) {
+ Connection conn = getConnection();
+ Map<?, ?> results;
try {
- if (!(event.snapshot instanceof VM.Record)) {
- s_logger.debug("The snapshot is not a VM: " + event);
- continue;
- }
- VM.Record vm = (VM.Record)event.snapshot;
- if (!VirtualMachineName.isValidCloudStackVmName(vm.nameLabel, _instance)) {
- s_logger.debug("Skipping over VMs that does not form to the CloudStack naming convention: " + vm.nameLabel);
- continue;
- }
- recordChanges(conn, vm, vm.residentOn.getUuid(conn));
- } catch (BadServerResponse e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (XenAPIException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (XmlRpcException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ results = Event.properFrom(conn, _classes, _token, new Double(30));
+ } catch (Exception e) {
+ s_logger.error("Retrying the waiting on VM events due to: ", e);
+ continue;
}
+ _token = (String)results.get("token");
+ @SuppressWarnings("unchecked")
+ Set<Event.Record> events = (Set<Event.Record>)results.get("events");
+ for (Event.Record event : events) {
+ try {
+ if (!(event.snapshot instanceof VM.Record)) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("The snapshot is not a VM: " + event);
+ }
+ continue;
+ }
+ VM.Record vm = (VM.Record)event.snapshot;
+
+ String hostUuid = null;
+ if (vm.residentOn != null && !vm.residentOn.toWireString().contains("OpaqueRef:NULL")) {
+ hostUuid = vm.residentOn.getUuid(conn);
+ }
+ recordChanges(conn, vm, hostUuid);
+ } catch (Exception e) {
+ s_logger.error("Skipping over " + event, e);
+ }
+ }
+ } catch (Throwable th) {
+ s_logger.error("Exception caught in eventlistener thread: ", th);
}
}
}
protected void recordChanges(Connection conn, VM.Record rec, String hostUuid) {
- synchronized (_cluster.intern()) {
- String xstoolsversion = null;
- VirtualMachine.State newState = convertToState(rec.powerState);
- String vm = rec.nameLabel;
- Ternary<String, VirtualMachine.State, String> oldState = s_vms.get(_cluster, vm);
+ String vm = rec.nameLabel;
+ if (!VirtualMachineName.isValidCloudStackVmName(vm, _instance)) {
+ s_logger.debug("Skipping over VMs that does not conform to CloudStack naming convention: " + vm);
+ return;
+ }
- if (oldState == null) {
- // If we can't find this VM in the previous states then it must have been stopped.
- oldState = new Ternary<String, VirtualMachine.State, String>(null, VirtualMachine.State.Stopped, null);
- s_vms.put(_cluster, hostUuid, vm, newState, xstoolsversion);
- changes.put(vm, new Ternary<String, VirtualMachine.State, String>(hostUuid, newState, null));
- }
-
- // check if xstoolsversion changed
- if (xstoolsversion != null) {
- if (xstoolsversion != oldState.third() && newState != VirtualMachine.State.Stopped && newState != VirtualMachine.State.Stopping) {
- s_logger.warn("Detecting a change in xstoolsversion for " + vm);
- changes.put(vm, new Ternary<String, VirtualMachine.State, String>(hostUuid, newState, xstoolsversion));
+ VirtualMachine.State currentState = convertToState(rec.powerState);
+ if (vm.startsWith("migrating")) {
+ s_logger.warn("Skipping " + vm + " because it is migrating.");
+ return;
+ }
- if (s_logger.isDebugEnabled()) {
- s_logger.debug("11. The VM " + vm + " is in " + newState + " state");
- }
- s_vms.put(_cluster, hostUuid, vm, newState, xstoolsversion);
- return;
- }
+ if (currentState == VirtualMachine.State.Stopped) {
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace("Double check the power state to make sure we got the correct state for " + vm);
}
-
- //check if host is changed
- if (hostUuid != null) {
- if (!hostUuid.equals(oldState.first()) && newState != VirtualMachine.State.Stopped && newState != VirtualMachine.State.Stopping) {
- s_logger.warn("Detecting a change in host for " + vm);
- changes.put(vm, new Ternary<String, VirtualMachine.State, String>(hostUuid, newState, null));
+ currentState = getRealPowerState(conn, vm);
+ }
- s_logger.debug("11. The VM " + vm + " is in " + newState + " state");
- s_vms.put(_cluster, hostUuid, vm, newState, xstoolsversion);
- return;
- }
- }
+ boolean updateMap = false;
+ boolean reportChange = false;
- if (newState == VirtualMachine.State.Stopped && oldState.second() != VirtualMachine.State.Stopping && oldState.second() != VirtualMachine.State.Stopped) {
- newState = getRealPowerState(conn, vm);
+ // NOTE: For now we only record change when the VM is stopped. We don't find out any VMs starting for now.
+ synchronized (_cluster.intern()) {
+ Ternary<String, VirtualMachine.State, String> oldState = s_vms.get(_cluster, vm);
+ if (oldState == null) {
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace("Unable to find " + vm + " from previous map. Assuming it was in Stopped state.");
+ }
+ oldState = new Ternary<String, VirtualMachine.State, String>(null, VirtualMachine.State.Stopped, null);
}
-
+
if (s_logger.isTraceEnabled()) {
- s_logger.trace("VM " + vm + ": xen has state " + newState + " and we have state " + (oldState != null ? oldState.toString() : "null"));
+ s_logger.trace(vm + ": current state=" + currentState + ", previous state=" + oldState);
}
- if (vm.startsWith("migrating")) {
- s_logger.warn("Migrating from xen detected. Skipping");
- return;
- }
-
if (oldState.second() == VirtualMachine.State.Starting) {
- if (newState == VirtualMachine.State.Running) {
- s_logger.debug("12. The VM " + vm + " is in " + VirtualMachine.State.Running + " state");
- s_vms.put(_cluster, hostUuid, vm, newState, xstoolsversion);
- } else if (newState == VirtualMachine.State.Stopped) {
- s_logger.warn("Ignoring vm " + vm + " because of a lag in starting the vm.");
+ if (currentState == VirtualMachine.State.Running) {
+ updateMap = true;
+ reportChange = false;
+ } else if (currentState == VirtualMachine.State.Stopped) {
+ updateMap = false;
+ reportChange = false;
}
} else if (oldState.second() == VirtualMachine.State.Migrating) {
- if (newState == VirtualMachine.State.Running) {
- s_logger.debug("Detected that an migrating VM is now running: " + vm);
- s_vms.put(_cluster, hostUuid, vm, newState, xstoolsversion);
- }
+ updateMap = true;
+ reportChange = false;
} else if (oldState.second() == VirtualMachine.State.Stopping) {
- if (newState == VirtualMachine.State.Stopped) {
- s_logger.debug("13. The VM " + vm + " is in " + VirtualMachine.State.Stopped + " state");
- s_vms.put(_cluster, hostUuid, vm, newState, xstoolsversion);
- } else if (newState == VirtualMachine.State.Running) {
- s_logger.warn("Ignoring vm " + vm + " because of a lag in stopping the vm. ");
+ if (currentState == VirtualMachine.State.Stopped) {
+ updateMap = true;
+ reportChange = false;
+ } else if (currentState == VirtualMachine.State.Running) {
+ updateMap = false;
+ reportChange = false;
}
- } else if (oldState.second() != newState) {
- s_logger.debug("14. The VM " + vm + " is in " + newState + " state was " + oldState.second());
- s_vms.put(_cluster, hostUuid, vm, newState, xstoolsversion);
- if (newState == VirtualMachine.State.Stopped) {
- /*
- * if (s_vmsKilled.remove(vm)) { s_logger.debug("VM " + vm + " has been killed for storage. ");
- * newState = VirtualMachine.State.Error; }
- */
+ } else if (oldState.second() != currentState) {
+ updateMap = true;
+ reportChange = true;
+ } else if (hostUuid != null && !hostUuid.equals(oldState.first())) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Detecting " + vm + " moved from " + oldState.first() + " to " + hostUuid);
}
- changes.put(vm, new Ternary<String, VirtualMachine.State, String>(hostUuid, newState, null));
+ reportChange = true;
+ updateMap = true;
}
+
+ if (updateMap) {
+ s_vms.put(_cluster, hostUuid, vm, currentState);
+ if (s_logger.isTraceEnabled()) {
+ s_logger.trace("Updated " + vm + " to [" + hostUuid + ", " + currentState);
+ }
+ }
+ if (reportChange) {
+ Ternary<String, VirtualMachine.State, String> change = _changes.get(vm);
+ if (hostUuid == null) {
+ // This is really strange code. It looks like the sync
+ // code wants this to be set, which is extremely weird
+ // for VMs that are dead. Why would I want to set the
+ // hostUuid if the VM is stopped.
+ hostUuid = oldState.first();
+ if (hostUuid == null) {
+ hostUuid = _host.uuid;
+ }
+ }
+ if (change == null) {
+ change = new Ternary<String, VirtualMachine.State, String>(hostUuid, currentState, null);
+ } else {
+ change.first(hostUuid);
+ change.second(currentState);
+ }
+ _changes.put(vm, change);
+ }
+ }
+ }
+
+ @Override
+ public void start() {
+ if (_isMaster) {
+ // Throw away the initial set of events because they're history
+ Connection conn = getConnection();
+ Map<?, ?> results;
+ try {
+ results = Event.properFrom(conn, _classes, _token, new Double(30));
+ } catch (Exception e) {
+ s_logger.error("Retrying the waiting on VM events due to: ", e);
+ throw new CloudRuntimeException("Unable to start a listener thread to listen to VM events", e);
+ }
+ _token = (String)results.get("token");
+ s_logger.debug("Starting the event listener thread for " + _host.uuid);
+ super.start();
}
}
+ public boolean isListening() {
+ return _isMaster;
+ }
+
public HashMap<String, Ternary<String, VirtualMachine.State, String>> getChanges() {
synchronized(_cluster.intern()) {
- if (changes.size() == 0) {
+ if (_changes.size() == 0) {
return null;
}
- HashMap<String, Ternary<String, VirtualMachine.State, String>> diff = changes;
- changes = new HashMap<String, Ternary<String, VirtualMachine.State, String>>();
+ HashMap<String, Ternary<String, VirtualMachine.State, String>> diff = _changes;
+ _changes = new HashMap<String, Ternary<String, VirtualMachine.State, String>>();
return diff;
}
}