Posted to tashi-commits@incubator.apache.org by st...@apache.org on 2012/02/08 04:44:11 UTC

svn commit: r1241773 [1/3] - in /incubator/tashi/branches/oldstable: ./ etc/ src/ src/tashi/ src/tashi/agents/ src/tashi/client/ src/tashi/clustermanager/ src/tashi/clustermanager/data/ src/tashi/nodemanager/ src/tashi/nodemanager/vmcontrol/ src/tashi/...

Author: stroucki
Date: Wed Feb  8 04:44:09 2012
New Revision: 1241773

URL: http://svn.apache.org/viewvc?rev=1241773&view=rev
Log:
merge oldstable from stablefix (201111)

Added:
    incubator/tashi/branches/oldstable/src/tashi/agents/primitive_zoni.py
      - copied unchanged from r1203835, incubator/tashi/trunk/src/tashi/agents/primitive_zoni.py
    incubator/tashi/branches/oldstable/src/zoni/data/reservation.py
      - copied unchanged from r1203835, incubator/tashi/trunk/src/zoni/data/reservation.py
    incubator/tashi/branches/oldstable/src/zoni/data/reservationmanagementinterface.py
      - copied unchanged from r1203835, incubator/tashi/trunk/src/zoni/data/reservationmanagementinterface.py
    incubator/tashi/branches/oldstable/src/zoni/hardware/f10s50switch.py
      - copied unchanged from r1203835, incubator/tashi/trunk/src/zoni/hardware/f10s50switch.py
    incubator/tashi/branches/oldstable/src/zoni/hardware/hpilo.py
      - copied unchanged from r1203835, incubator/tashi/trunk/src/zoni/hardware/hpilo.py
    incubator/tashi/branches/oldstable/src/zoni/hardware/systemmanagement.py
      - copied unchanged from r1203835, incubator/tashi/trunk/src/zoni/hardware/systemmanagement.py
    incubator/tashi/branches/oldstable/src/zoni/install/dnsdhcp/
      - copied from r1203835, incubator/tashi/trunk/src/zoni/install/dnsdhcp/
    incubator/tashi/branches/oldstable/src/zoni/install/dnsdhcp/zoniDnsDhcpSetup.py
      - copied unchanged from r1203835, incubator/tashi/trunk/src/zoni/install/dnsdhcp/zoniDnsDhcpSetup.py
    incubator/tashi/branches/oldstable/src/zoni/services/
      - copied from r1203835, incubator/tashi/trunk/src/zoni/services/
    incubator/tashi/branches/oldstable/src/zoni/services/__init__.py
      - copied unchanged from r1203835, incubator/tashi/trunk/src/zoni/services/__init__.py
    incubator/tashi/branches/oldstable/src/zoni/services/pcvciservice.py
      - copied unchanged from r1203835, incubator/tashi/trunk/src/zoni/services/pcvciservice.py
    incubator/tashi/branches/oldstable/src/zoni/services/rpycservices.py
      - copied, changed from r1203835, incubator/tashi/trunk/src/zoni/services/rpycservices.py
    incubator/tashi/branches/oldstable/src/zoni/services/zonimanager.py
      - copied unchanged from r1203835, incubator/tashi/trunk/src/zoni/services/zonimanager.py
Modified:
    incubator/tashi/branches/oldstable/   (props changed)
    incubator/tashi/branches/oldstable/Makefile   (contents, props changed)
    incubator/tashi/branches/oldstable/NOTICE
    incubator/tashi/branches/oldstable/etc/   (props changed)
    incubator/tashi/branches/oldstable/etc/NodeManager.cfg
    incubator/tashi/branches/oldstable/etc/TashiDefaults.cfg
    incubator/tashi/branches/oldstable/etc/ZoniDefaults.cfg   (contents, props changed)
    incubator/tashi/branches/oldstable/src/   (props changed)
    incubator/tashi/branches/oldstable/src/tashi/agents/dhcpdns.py
    incubator/tashi/branches/oldstable/src/tashi/agents/primitive.py
    incubator/tashi/branches/oldstable/src/tashi/client/tashi-client.py
    incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanager.py
    incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanagerservice.py
    incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/datainterface.py
    incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/getentoverride.py
    incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/ldapoverride.py
    incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/sql.py
    incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanager.py
    incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanagerservice.py
    incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/qemu.py
    incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py
    incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/xenpv.py
    incubator/tashi/branches/oldstable/src/tashi/rpycservices/rpycservices.py
    incubator/tashi/branches/oldstable/src/tashi/rpycservices/rpyctypes.py
    incubator/tashi/branches/oldstable/src/tashi/util.py
    incubator/tashi/branches/oldstable/src/tashi/version.py
    incubator/tashi/branches/oldstable/src/zoni/Makefile   (props changed)
    incubator/tashi/branches/oldstable/src/zoni/__init__.py   (props changed)
    incubator/tashi/branches/oldstable/src/zoni/agents/dhcpdns.py   (contents, props changed)
    incubator/tashi/branches/oldstable/src/zoni/bootstrap/bootstrapinterface.py   (props changed)
    incubator/tashi/branches/oldstable/src/zoni/bootstrap/pxe.py   (contents, props changed)
    incubator/tashi/branches/oldstable/src/zoni/client/zoni-cli.py   (contents, props changed)
    incubator/tashi/branches/oldstable/src/zoni/data/resourcequerysql.py   (contents, props changed)
    incubator/tashi/branches/oldstable/src/zoni/data/usermanagement.py   (contents, props changed)
    incubator/tashi/branches/oldstable/src/zoni/data/usermanagementinterface.py   (props changed)
    incubator/tashi/branches/oldstable/src/zoni/extra/util.py   (contents, props changed)
    incubator/tashi/branches/oldstable/src/zoni/hardware/delldrac.py   (contents, props changed)
    incubator/tashi/branches/oldstable/src/zoni/hardware/dellswitch.py   (contents, props changed)
    incubator/tashi/branches/oldstable/src/zoni/hardware/hpswitch.py   (props changed)
    incubator/tashi/branches/oldstable/src/zoni/hardware/hwswitchinterface.py   (contents, props changed)
    incubator/tashi/branches/oldstable/src/zoni/hardware/ipmi.py   (contents, props changed)
    incubator/tashi/branches/oldstable/src/zoni/hardware/raritanpdu.py   (contents, props changed)
    incubator/tashi/branches/oldstable/src/zoni/hardware/systemmanagementinterface.py   (contents, props changed)
    incubator/tashi/branches/oldstable/src/zoni/install/db/zoniDbSetup.py
    incubator/tashi/branches/oldstable/src/zoni/install/pxe/base-menu
    incubator/tashi/branches/oldstable/src/zoni/install/pxe/zoniPxeSetup.py
    incubator/tashi/branches/oldstable/src/zoni/install/www/zoniWebSetup.py
    incubator/tashi/branches/oldstable/src/zoni/system/registration/register/register_automate
    incubator/tashi/branches/oldstable/src/zoni/system/registration/register/register_node
    incubator/tashi/branches/oldstable/src/zoni/system/registration/www/include/zoni_functions.php
    incubator/tashi/branches/oldstable/src/zoni/system/registration/www/zoni-register.php
    incubator/tashi/branches/oldstable/src/zoni/version.py   (contents, props changed)

Propchange: incubator/tashi/branches/oldstable/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Wed Feb  8 04:44:09 2012
@@ -0,0 +1,4 @@
+/incubator/tashi/branches/cmu:1178106-1187632
+/incubator/tashi/branches/stablefix:1203848-1241771
+/incubator/tashi/branches/zoni-dev/trunk:1034098-1177646
+/incubator/tashi/trunk:1081943-1203835,1203845

Modified: incubator/tashi/branches/oldstable/Makefile
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/Makefile?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/Makefile (original)
+++ incubator/tashi/branches/oldstable/Makefile Wed Feb  8 04:44:09 2012
@@ -110,8 +110,12 @@ rmdoc:
 bin/zoni-cli.py:
 	@echo Symlinking in zoni-cli...
 	(cd bin; ln -s ../src/zoni/client/zoni-cli.py .)
+usr/local/bin/zoni:
+	@echo Creating /usr/local/bin/zoni
+	(echo '#!/bin/bash\nPYTHONPATH=$(shell pwd)/src $(shell pwd)/bin/zoni-cli.py $$*' > /usr/local/bin/zoni; chmod 755 /usr/local/bin/zoni)
 rmzoni-cli:
 	if test -e bin/zoni-cli.py; then echo Removing zoni-cli symlink...; rm bin/zoni-cli.py; fi
+	if test -e /usr/local/bin/zoni; then echo Removing zoni...; rm /usr/local/bin/zoni; fi
 
 ## for now only print warnings having to do with bad indentation. pylint doesn't make it easy to enable only 1,2 checks
 disabled_warnings=$(shell pylint --list-msgs|grep :W0| awk -F: '{ORS=","; if ($$2 != "W0311" && $$2 != "W0312"){ print $$2}}')

Propchange: incubator/tashi/branches/oldstable/Makefile
            ('svn:mergeinfo' removed)

Modified: incubator/tashi/branches/oldstable/NOTICE
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/NOTICE?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/NOTICE (original)
+++ incubator/tashi/branches/oldstable/NOTICE Wed Feb  8 04:44:09 2012
@@ -1,5 +1,5 @@
 Apache Tashi
-Copyright 2008 The Apache Software Foundation
+Copyright 2008-2011 The Apache Software Foundation
 
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).

Propchange: incubator/tashi/branches/oldstable/etc/
            ('svn:mergeinfo' removed)

Modified: incubator/tashi/branches/oldstable/etc/NodeManager.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/etc/NodeManager.cfg?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/etc/NodeManager.cfg (original)
+++ incubator/tashi/branches/oldstable/etc/NodeManager.cfg Wed Feb  8 04:44:09 2012
@@ -73,6 +73,7 @@ defaultBridgeFormat=br%s
 [NodeManagerService]
 convertExceptions = True
 port = 9883
+registerHost = False
 registerFrequency = 10.0
 infoFile = /var/tmp/nm.dat
 clusterManagerHost = localhost ; Clustermanager hostname

Modified: incubator/tashi/branches/oldstable/etc/TashiDefaults.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/etc/TashiDefaults.cfg?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/etc/TashiDefaults.cfg (original)
+++ incubator/tashi/branches/oldstable/etc/TashiDefaults.cfg Wed Feb  8 04:44:09 2012
@@ -45,7 +45,8 @@ publisher = tashi.messaging.GangliaPubli
 nodeManagerPort = 9883
 
 [ClusterManagerService]
-host = localhost ; Clustermanager hostname
+# Clustermanager hostname
+host = localhost 
 convertExceptions = True
 port = 9882
 expireHostTime = 30.0
@@ -97,9 +98,11 @@ publisher = tashi.messaging.GangliaPubli
 [NodeManagerService]
 convertExceptions = True
 port = 9883
+registerHost = False
 registerFrequency = 10.0
 infoFile = /var/tmp/nm.dat
-clusterManagerHost = localhost ;  Clustermanger hostname
+# Clustermanger hostname
+clusterManagerHost = localhost 
 clusterManagerPort = 9882
 statsInterval = 0.0
 ;bind = 0.0.0.0 ; not supported (Thrift is missing support to specify what to bind to!)
@@ -115,6 +118,8 @@ migrateTimeout = 300.0
 maxParallelMigrations = 10
 useMigrateArgument = False
 statsInterval = 0.0
+scratchDir = /tmp
+scratchVg = vgscratch
 
 [XenPV]
 vmNamePrefix = tashi
@@ -135,7 +140,8 @@ staticLayout = /location/of/layout/file
 
 # Client configuration
 [Client]
-clusterManagerHost = localhost ; Clustermanager hostname
+# Clustermanager hostname
+clusterManagerHost = localhost 
 clusterManagerPort = 9882
 clusterManagerTimeout = 5.0
 
@@ -157,17 +163,21 @@ defaultJobTime = 8640000000
 
 [DhcpDns]
 dnsEnabled = True
-dnsKeyFile = /location/of/private/key/for/dns
+dnsSecretKey = ABcdEf12GhIJKLmnOpQrsT==
+dnsKeyName = name_of_dns_key_hostname
 dnsServer = 1.2.3.4 53
 dnsDomain = tashi.example.com
 dnsExpire = 300
 dhcpEnabled = True
 dhcpServer = 1.2.3.4
+# Host key name
 dhcpKeyName = OMAPI
 dhcpSecretKey = ABcdEf12GhIJKLmnOpQrsT==
+#  ipRangeX - where X is the vlan number 
 ipRange1 = 172.16.128.2-172.16.255.254
 reverseDns = True
-clustermanagerhost = localhost ; Clustermanager hostname
+# Clustermanager hostname
+clustermanagerhost = localhost
 clustermanagerport = 9886
 
 [GangliaPublisher]

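The TashiDefaults.cfg hunk above replaces the single dnsKeyFile path with a
dnsKeyName/dnsSecretKey pair and moves the inline ';' comments onto their own
lines. A minimal sketch (Python 2, stdlib ConfigParser, as the rest of Tashi
uses) of reading the reworked [DhcpDns] options; only the option names come
from the diff, the surrounding script is illustrative:

    # Illustrative only: read the new TSIG-style DhcpDns options.
    from ConfigParser import ConfigParser

    config = ConfigParser()
    config.read("etc/TashiDefaults.cfg")

    # dnsKeyName/dnsSecretKey replace the old dnsKeyFile path.
    dnsKeyName = config.get("DhcpDns", "dnsKeyName")
    dnsSecretKey = config.get("DhcpDns", "dnsSecretKey")
    dnsServer = config.get("DhcpDns", "dnsServer")

    print "will update %s using key %s" % (dnsServer, dnsKeyName)
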
Modified: incubator/tashi/branches/oldstable/etc/ZoniDefaults.cfg
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/etc/ZoniDefaults.cfg?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/etc/ZoniDefaults.cfg (original)
+++ incubator/tashi/branches/oldstable/etc/ZoniDefaults.cfg Wed Feb  8 04:44:09 2012
@@ -26,6 +26,11 @@ INSTALL_BASE_DIR = /usr/local/tashi
 [logging]
 LOG_FILE = /var/tmp/zoni_logfile.txt
 
+[management]
+#  Specify data store
+INFO_STORE = sql
+USER_MANAGEMENT = ldap
+
 #  DB host
 [dbConnection]
 DB_HOST = xx_hostname_or_ip_
@@ -50,8 +55,9 @@ TFTP_UPDATE_FILE = /var/lib/tftpboot/pxe
 TFTP_BASE_FILE = /var/lib/tftpboot/pxelinux.cfg/base.zoni
 TFTP_BASE_MENU_FILE = /var/lib/tftpboot/pxelinux.cfg/base-menu
 PXE_SERVER_IP = IP_OF_PXE_SERVER_IN_DOMAIN_0
-#  Relative to TFTP_ROOT_DIR
-INITRD_ROOT = initrd
+#  Must be relative to TFTP_ROOT_DIR
+INITRD_ROOT = builds/initrd
+KERNEL_ROOT = builds/kernel
 
 [www]
 WWW_DOCUMENT_ROOT = /var/www
@@ -71,7 +77,8 @@ ZONI_IPMI_NETWORK = 10.10.16.0/20
 VLAN_MAX = 4095
 
 [hardware]
-HARDWARE_CONTROL = ["ipmi", "drac", "pdu"]
+#  Hardware control available for the cluster 
+HARDWARE_CONTROL = {"ipmi": {"class":"zoni.hardware.ipmi.Ipmi"}, "drac": {"class":"zoni.hardware.delldrac.dellDrac"}, "pdu":{"class": "zoni.hardware.raritanpdu.raritanDominionPx"}, "dellswitch" : {"class":"zoni.hardware.dellswitch.HwDellSwitch", "accessmode" : "ssh"}}
 HARDWARE_PDU = "raritan"
 HARDWARE_DRAC = "DELL DRAC"
 
@@ -79,7 +86,9 @@ HARDWARE_DRAC = "DELL DRAC"
 dnsEnabled = True
 reverseDns = True
 #  Key file must be in the same directory or this will get denied
-dnsKeyFile = xx_Kname.+157+36480.private_xx
+#dnsKeyFile = xx_Kname.+157+36480.private_xx
+dnsKeyName = xx_dnsKeyName__xx
+dnsSecretKey = xx_secretkey_xx
 dnsServer = xx_dns_server_ip_xx xx_port_xx
 dnsDomain = xx_fqdn_xx
 dnsExpire = 60

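With this change HARDWARE_CONTROL is no longer a flat list of names but a
dict mapping each device type to the class that drives it (plus options such
as "accessmode"). A hedged sketch of how such a mapping could be turned into
driver classes; the Zoni code that actually consumes this option is not part
of this diff, so everything beyond the option format is an assumption:

    # Assumed loader, for illustration only -- not the real Zoni code.
    import ast
    from ConfigParser import ConfigParser

    config = ConfigParser()
    config.read("etc/ZoniDefaults.cfg")

    # The option value is a Python dict literal; literal_eval parses it safely.
    hardwareControl = ast.literal_eval(config.get("hardware", "HARDWARE_CONTROL"))

    driverClasses = {}
    for name, spec in hardwareControl.items():
        moduleName, className = spec["class"].rsplit(".", 1)
        module = __import__(moduleName, fromlist=[className])
        driverClasses[name] = getattr(module, className)
    # e.g. driverClasses["ipmi"] would be zoni.hardware.ipmi.Ipmi
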
Propchange: incubator/tashi/branches/oldstable/etc/ZoniDefaults.cfg
            ('svn:mergeinfo' removed)

Propchange: incubator/tashi/branches/oldstable/src/
            ('svn:mergeinfo' removed)

Modified: incubator/tashi/branches/oldstable/src/tashi/agents/dhcpdns.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/agents/dhcpdns.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/agents/dhcpdns.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/agents/dhcpdns.py Wed Feb  8 04:44:09 2012
@@ -28,7 +28,8 @@ from tashi import boolean
 class DhcpDns(InstanceHook):
 	def __init__(self, config, client, post=False):
 		InstanceHook.__init__(self, config, client, post)
-		self.dnsKeyFile = self.config.get('DhcpDns', 'dnsKeyFile')
+		self.dnsKeyName = self.config.get('DhcpDns', 'dnsKeyName')
+		self.dnsSecretKey = self.config.get('DhcpDns', 'dnsSecretKey')
 		self.dnsServer = self.config.get('DhcpDns', 'dnsServer')
 		self.dnsDomain = self.config.get('DhcpDns', 'dnsDomain')
 		self.dnsExpire = int(self.config.get('DhcpDns', 'dnsExpire'))
@@ -153,14 +154,12 @@ class DhcpDns(InstanceHook):
 			self.removeDns(name)
 		except:
 			pass
-		if (self.dnsKeyFile != ""):
-			cmd = "nsupdate -k %s" % (self.dnsKeyFile)
-		else:
-			cmd = "nsupdate"
+		cmd = "nsupdate"
 		child = subprocess.Popen(args=cmd.split(), stdin=subprocess.PIPE, stdout=subprocess.PIPE)
 		try:
 			(stdin, stdout) = (child.stdin, child.stdout)
 			stdin.write("server %s\n" % (self.dnsServer))
+			stdin.write("key %s %s\n" % (self.dnsKeyName, self.dnsSecretKey))
 			stdin.write("update add %s.%s %d A %s\n" % (name, self.dnsDomain, self.dnsExpire, ip))
 			stdin.write("\n")
 			if (self.reverseDns):
@@ -181,14 +180,12 @@ class DhcpDns(InstanceHook):
 				(pid, status) = os.waitpid(child.pid, os.WNOHANG)
 	
 	def removeDns(self, name):
-		if (self.dnsKeyFile != ""):
-			cmd = "nsupdate -k %s" % (self.dnsKeyFile)
-		else:
-			cmd = "nsupdate"
+		cmd = "nsupdate"
 		child = subprocess.Popen(args=cmd.split(), stdin=subprocess.PIPE, stdout=subprocess.PIPE)
 		try:
 			(stdin, stdout) = (child.stdin, child.stdout)
 			stdin.write("server %s\n" % (self.dnsServer))
+			stdin.write("key %s %s\n" % (self.dnsKeyName, self.dnsSecretKey))
 			if (self.reverseDns):
 				ip = socket.gethostbyname(name)
 				ipSegments = map(int, ip.split("."))

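The dhcpdns.py change above drops the "nsupdate -k <keyfile>" invocation and
instead writes a "key <name> <secret>" directive to nsupdate's standard
input. A standalone sketch of that pattern (Python 2); the server, key and
record values below are placeholders taken from the example configuration,
not real credentials:

    # Placeholder values only -- mirrors how the DhcpDns hook now drives nsupdate.
    import subprocess

    dnsServer = "1.2.3.4"
    dnsKeyName = "name_of_dns_key_hostname"
    dnsSecretKey = "ABcdEf12GhIJKLmnOpQrsT=="

    child = subprocess.Popen(args=["nsupdate"], stdin=subprocess.PIPE)
    child.stdin.write("server %s\n" % (dnsServer))
    child.stdin.write("key %s %s\n" % (dnsKeyName, dnsSecretKey))
    child.stdin.write("update add vm42.tashi.example.com 300 A 172.16.128.2\n")
    child.stdin.write("send\n")
    child.stdin.close()
    child.wait()
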
Modified: incubator/tashi/branches/oldstable/src/tashi/agents/primitive.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/agents/primitive.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/agents/primitive.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/agents/primitive.py Wed Feb  8 04:44:09 2012
@@ -1,4 +1,4 @@
-#! /usr/bin/env python
+#!/usr/bin/python
 
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -23,6 +23,7 @@ import socket
 import sys
 import threading
 import time
+import random
 import logging.config
 
 from tashi.rpycservices.rpyctypes import *
@@ -30,9 +31,9 @@ from tashi.util import getConfig, create
 import tashi
 
 class Primitive(object):
-	def __init__(self, config, client):
+	def __init__(self, config, cmclient):
 		self.config = config
-		self.client = client
+		self.cm = cmclient
 		self.hooks = []
 		self.log = logging.getLogger(__file__)
 		self.scheduleDelay = float(self.config.get("Primitive", "scheduleDelay"))
@@ -44,108 +45,252 @@ class Primitive(object):
 			name = name.lower()
 			if (name.startswith("hook")):
 				try:
-					self.hooks.append(instantiateImplementation(value, config, client, False))
+					self.hooks.append(instantiateImplementation(value, config, cmclient, False))
 				except:
 					self.log.exception("Failed to load hook %s" % (value))
+	        self.hosts = {}
+		self.load = {}
+		self.instances = {}
+		self.muffle = {}
+		self.lastScheduledHost = 0
+		self.clearHints = {}
+					
+					
+	def __getState(self):
+		# Generate a list of hosts and
+		# current loading of VMs per host
+		hosts = {}
+		# load's keys are the host id, or None if not on a host. values are instance ids
+		load = {}
+		ctr = 0
+
+		for h in self.cm.getHosts():
+			#XXXstroucki get all hosts here?
+			#if (h.up == True and h.state == HostState.Normal):
+				hosts[ctr] = h
+				ctr = ctr + 1
+				load[h.id] = []
+			
+		load[None] = []
+		_instances = self.cm.getInstances()
+		instances = {}
+		for i in _instances:
+			instances[i.id] = i
+			
+		# XXXstroucki put held machines behind pending ones
+		heldInstances = []
+		for i in instances.itervalues():
+			if (i.hostId or i.state == InstanceState.Pending):
+				# Nonrunning VMs will have hostId of None
+				load[i.hostId] = load[i.hostId] + [i.id]
+			elif (i.hostId is None and i.state == InstanceState.Held):
+				heldInstances = heldInstances + [i.id]
+
+		load[None] = load[None] + heldInstances
+
+		self.hosts = hosts
+		self.load = load
+		self.instances = instances
+
+	def __checkCapacity(self, host, inst):
+		# ensure host can carry new load
+		memUsage = reduce(lambda x, y: x + self.instances[y].memory, self.load[host.id], inst.memory)
+		coreUsage = reduce(lambda x, y: x + self.instances[y].cores, self.load[host.id], inst.cores)
+		if (memUsage <= host.memory and coreUsage <= host.cores):
+			return True
+		
+		return False
+
+	def __clearHints(self, hint, name):
+		#  remove the clearHint if the host comes back to normal mode
+		if name in self.clearHints[hint]:
+			popit = self.clearHints[hint].index(name)
+			self.clearHints[hint].pop(popit)
 	
+	def __scheduleInstance(self, inst):
+
+		try:
+
+			minMax = None
+			minMaxHost = None
+			minMaxCtr = None
+
+			densePack = inst.hints.get("densePack", None)
+			if (densePack is None):
+				densePack = self.densePack
+			else:
+				densePack = boolean(densePack)
+			
+			#  Grab the targetHost config options if passed
+			targetHost = inst.hints.get("targetHost", None)
+			#  Check to see if we have already handled this hint
+			clearHints = self.clearHints
+			clearHints["targetHost"] = clearHints.get("targetHost", [])
+			#  If we handled the hint, don't look at it anymore
+			if targetHost in clearHints["targetHost"]:
+				targetHost = None
+
+			try:
+				allowElsewhere = boolean(inst.hints.get("allowElsewhere", "False"))
+			except Exception, e:
+				allowElsewhere = False
+			# has a host preference been expressed?
+			if (targetHost != None):
+				for h in self.hosts.values():
+					if (h.state == HostState.Normal):
+						self.__clearHints("targetHost", h.name)
+					# if this is not the host we are looking for, continue
+					if ((str(h.id) != targetHost and h.name != targetHost)):
+						continue
+					# we found the targetHost
+					#  If a host machine is reserved, only allow if userid is in reserved list
+					if ((len(h.reserved) > 0) and inst.userId not in h.reserved):
+						# Machine is reserved and not available for userId.
+						# XXXstroucki: Should we log something here for analysis?
+						break
+					if self.__checkCapacity(h, inst):
+						minMax = len(self.load[h.id])
+						minMaxHost = h
+
+		
+			# end targethost != none
+		
+		
+			# If we don't have a host yet, find one here
+			if ((targetHost == None or allowElsewhere) and minMaxHost == None):
+				# cycle list
+				#  Adding this to catch if this gets set to None.  Fix
+				if self.lastScheduledHost == None:
+					self.lastScheduledHost = 0
+				for ctr in range(self.lastScheduledHost, len(self.hosts)) + range(0, self.lastScheduledHost):
+					h = self.hosts[ctr]
+
+					# XXXstroucki if it's down, find another machine
+					if (h.up == False):
+						continue
+
+					#  If the host not in normal operating state, 
+					#  find another machine
+					if (h.state != HostState.Normal):
+						continue
+					else:
+						#  If the host is back to normal, get rid of the entry in clearHints
+						self.__clearHints("targetHost", h.name)
+		
+					# if it's reserved, see if we can use it
+					if ((len(h.reserved) > 0) and inst.userId not in h.reserved):
+						# reserved for somebody else, so find another machine
+						continue
+		
+					# implement dense packing policy:
+					# consider this host if
+					# minMax has not been modified  or
+					# the number of vms here is greater than minmax if we're dense packing or
+					# the number of vms here is less than minmax if we're not dense packing
+					if (minMax is None or (densePack and len(self.load[h.id]) > minMax) or (not densePack and len(self.load[h.id]) < minMax)):
+						if self.__checkCapacity(h, inst):
+							minMax = len(self.load[h.id])
+							minMaxHost = h
+							minMaxCtr = ctr
+
+					#  check that VM image isn't mounted persistent already
+					#  Should set a status code to alert user
+					#  Tried to update the state of the instance and set persistent=False but 
+					#  couldn't do it, should work until we find a better way to do this
+					if inst.disks[0].persistent == True:
+						count = 0
+						myDisk = inst.disks[0].uri
+						for i in self.cm.getInstances():
+							if myDisk == i.disks[0].uri and i.disks[0].persistent == True:
+								count += 1
+						if count > 1:
+								minMaxHost = None
+
+			if (minMaxHost):
+				# found a host
+				if (not inst.hints.get("__resume_source", None)):
+					# only run preCreate hooks if newly starting
+					for hook in self.hooks:
+						hook.preCreate(inst)
+				self.log.info("Scheduling instance %s (%d mem, %d cores, %d uid) on host %s" % (inst.name, inst.memory, inst.cores, inst.userId, minMaxHost.name))	
+				rv = "fail"
+				try:
+					rv = self.cm.activateVm(inst.id, minMaxHost)
+					if rv == "success":
+						self.lastScheduledHost = minMaxCtr
+						self.load[minMaxHost.id] = self.load[minMaxHost.id] + [inst.id]
+						# get rid of its possible entry in muffle if VM is scheduled to a host
+						if (inst.name in self.muffle):
+							self.muffle.pop(inst.name)
+					else:
+						self.log.warning("Instance %s failed to activate on host %s" % (inst.name, minMaxHost.name))
+				except TashiException, e :
+					#  If we try to activate the VM and get errno 10, host not in normal mode, add it to the list
+					#  check for other errors later
+					if e.errno == Errors.HostStateError:
+						self.clearHints["targetHost"] = self.clearHints.get("targetHost", [])
+						self.clearHints["targetHost"].append(targetHost)
+
+			else:
+				# did not find a host
+				if (inst.name not in self.muffle):
+					self.log.info("Failed to find a suitable place to schedule %s" % (inst.name))
+					self.muffle[inst.name] = True
+						
+		except Exception, e:
+			# XXXstroucki: how can we get here?
+			if (inst.name not in self.muffle):
+				self.log.exception("Failed to schedule or activate %s" % (inst.name))
+				self.muffle[inst.name] = True
+									
 	def start(self):
 		oldInstances = {}
-		muffle = {}
+
 		while True:
 			try:
-				# Generate a list of VMs/host
-				hosts = {}
-				load = {}
-				for h in self.client.getHosts():
-					hosts[h.id] = h
-					load[h.id] = []
-				load[None] = []
-				_instances = self.client.getInstances()
-				instances = {}
-				for i in _instances:
-					instances[i.id] = i
-				for i in instances.itervalues():
-					if (i.hostId or i.state == InstanceState.Pending):
-						load[i.hostId] = load[i.hostId] + [i.id]
-				# Check for VMs that have exited
+				self.__getState()
+				
+				# Check for VMs that have exited and call
+				# postDestroy hook
 				for i in oldInstances:
-					if (i not in instances and oldInstances[i].state != InstanceState.Pending):
+					# XXXstroucki what about paused and saved VMs?
+					# XXXstroucki: do we need to look at Held VMs here?
+					if (i not in self.instances and (oldInstances[i].state == InstanceState.Running or oldInstances[i].state == InstanceState.Destroying)):
+						self.log.info("VM exited: %s" % (oldInstances[i].name))
 						for hook in self.hooks:
 							hook.postDestroy(oldInstances[i])
-				# Schedule new VMs
-				oldInstances = instances
-				if (len(load.get(None, [])) > 0):
-					load[None].sort()
-					for i in load[None]:
-						inst = instances[i]
-						try:
-							minMax = None
-							minMaxHost = None
-							targetHost = inst.hints.get("targetHost", None)
-							try:
-								allowElsewhere = boolean(inst.hints.get("allowElsewhere", "False"))
-							except Exception, e:
-								allowElsewhere = False
-							if (targetHost != None):
-								for h in hosts.values():
-									if ((str(h.id) == targetHost or h.name == targetHost)):
-										#  make sure that host is up, in a normal state and is not reserved
-										if (h.up == True and h.state == HostState.Normal and len(h.reserved) == 0):
-											memUsage = reduce(lambda x, y: x + instances[y].memory, load[h.id], inst.memory)
-											coreUsage = reduce(lambda x, y: x + instances[y].cores, load[h.id], inst.cores)
-											if (memUsage <= h.memory and coreUsage <= h.cores):
-												minMax = len(load[h.id])
-												minMaxHost = h
-								
-										#  If a host machine is reserved, only allow if userid is in reserved list
-										if ((len(h.reserved) > 0) and inst.userId in h.reserved):
-											memUsage = reduce(lambda x, y: x + instances[y].memory, load[h.id], inst.memory)
-											coreUsage = reduce(lambda x, y: x + instances[y].cores, load[h.id], inst.cores)
-											if (memUsage <= h.memory and coreUsage <= h.cores):
-												minMax = len(load[h.id])
-												minMaxHost = h
-
-
-							if ((targetHost == None or allowElsewhere) and minMaxHost == None):
-								for h in hosts.values():
-									if (h.up == True and h.state == HostState.Normal and len(h.reserved) == 0):
-										if (minMax is None or (self.densePack and len(load[h.id]) > minMax) or (not self.densePack and len(load[h.id]) < minMax)):
-											memUsage = reduce(lambda x, y: x + instances[y].memory, load[h.id], inst.memory)
-											coreUsage = reduce(lambda x, y: x + instances[y].cores, load[h.id], inst.cores)
-											if (memUsage <= h.memory and coreUsage <= h.cores):
-												minMax = len(load[h.id])
-												minMaxHost = h
-							if (minMaxHost):
-								if (not inst.hints.get("__resume_source", None)):
-									for hook in self.hooks:
-										hook.preCreate(inst)
-								self.log.info("Scheduling instance %s (%d mem, %d cores, %d uid) on host %s" % (inst.name, inst.memory, inst.cores, inst.userId, minMaxHost.name))	
-								self.client.activateVm(i, minMaxHost)
-								load[minMaxHost.id] = load[minMaxHost.id] + [i]
-								muffle.clear()
-							else:
-								if (inst.name not in muffle):
-									self.log.info("Failed to find a suitable place to schedule %s" % (inst.name))
-									muffle[inst.name] = True
-						except Exception, e:
-							if (inst.name not in muffle):
-								self.log.exception("Failed to schedule or activate %s" % (inst.name))
-								muffle[inst.name] = True
-				time.sleep(self.scheduleDelay)
+
+				oldInstances = self.instances
+
+
+				if (len(self.load.get(None, [])) > 0):
+					# Schedule VMs if they are waiting
+
+					# sort by id number (FIFO?)
+					self.load[None].sort()
+					for i in self.load[None]:
+						inst = self.instances[i]
+						self.__scheduleInstance(inst)
+					# end for unassigned vms
+
+
 			except TashiException, e:
 				self.log.exception("Tashi exception")
-				time.sleep(self.scheduleDelay)
+
 			except Exception, e:
-				self.log.exception("General exception")
-				time.sleep(self.scheduleDelay)
+				self.log.warning("Scheduler iteration failed")
+
+
+			# wait to do the next iteration
+			time.sleep(self.scheduleDelay)
 
 def main():
 	(config, configFiles) = getConfig(["Agent"])
 	publisher = instantiateImplementation(config.get("Agent", "publisher"), config)
 	tashi.publisher = publisher
-	client = createClient(config)
+	cmclient = createClient(config)
 	logging.config.fileConfig(configFiles)
-	agent = Primitive(config, client)
+	agent = Primitive(config, cmclient)
 	agent.start()
 
 if __name__ == "__main__":

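The rewritten scheduler factors its capacity test into __checkCapacity(),
which folds the memory and cores of every instance already placed on a host
into running sums (seeded with the candidate's own requirements) and compares
them to the host's totals. A small self-contained illustration of that
accounting, with plain dicts standing in for the real Host/Instance types:

    # Toy version of the scheduler's capacity test (dicts instead of rpyc types).
    def checkCapacity(host, candidate, assignedInstances):
        # Seed with the candidate's requirements, then add what is already
        # placed, exactly as the reduce() calls in primitive.py do.
        memUsage = reduce(lambda x, i: x + i["memory"], assignedInstances,
                candidate["memory"])
        coreUsage = reduce(lambda x, i: x + i["cores"], assignedInstances,
                candidate["cores"])
        return memUsage <= host["memory"] and coreUsage <= host["cores"]

    host = {"memory": 16384, "cores": 8}
    running = [{"memory": 4096, "cores": 2}, {"memory": 2048, "cores": 1}]
    print checkCapacity(host, {"memory": 8192, "cores": 4}, running)   # True
    print checkCapacity(host, {"memory": 12288, "cores": 4}, running)  # False
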
Modified: incubator/tashi/branches/oldstable/src/tashi/client/tashi-client.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/client/tashi-client.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/client/tashi-client.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/client/tashi-client.py Wed Feb  8 04:44:09 2012
@@ -157,8 +157,26 @@ def getVmLayout():
 			hosts[i.hostId].usedCores += i.cores
 	return hosts.values()
 
+def getSlots(cores, memory):
+	hosts = getVmLayout()
+	count = 0
+
+	for h in hosts:
+		if h.up is False or h.state != HostState.Normal:
+			continue
+		countbycores = int((h.cores - h.usedCores) / cores)
+		countbymemory = int((h.memory - h.usedMemory) / memory)
+		count += min(countbycores, countbymemory)
+
+	print "%d" % (count),
+	print (lambda:"instances", lambda:"instance")[count == 1](),
+	print "with %d" % (cores),
+	print (lambda:"cores", lambda:"core")[cores == 1](),
+	print "and %d MB memory could be created." % (memory)
+	
 def createMany(instance, count):
-	l = len(str(count))
+	# will create instances from 0 to count-1
+	l = len(str(count - 1))
 	basename = instance.name
 	instances = []
 	for i in range(0, count):
@@ -190,6 +208,9 @@ def getMyInstances():
 
 # Used to define default views on functions and to provide extra functionality (getVmLayout)
 extraViews = {
+'getSlots': (getSlots, None),
+'getImages': (None, ['id', 'imageName', 'imageSize']), 
+'copyImage': (None, None), 
 'createMany': (createMany, ['id', 'hostId', 'name', 'user', 'state', 'disk', 'memory', 'cores']),
 'destroyMany': (destroyMany, None),
 'getVmLayout': (getVmLayout, ['id', 'name', 'state', 'instances', 'usedMemory', 'memory', 'usedCores', 'cores']),
@@ -209,6 +230,9 @@ argLists = {
 'migrateVm': [('instance', checkIid, lambda: requiredArg('instance'), True), ('targetHostId', int, lambda: requiredArg('targetHostId'), True)],
 'pauseVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
 'unpauseVm': [('instance', checkIid, lambda: requiredArg('instance'), True)],
+'getSlots': [('cores', int, lambda: 1, False), ('memory', int, lambda: 128, False)],
+'getImages': [],
+'copyImage': [('src', str, lambda: requiredArg('src'),True), ('dst', str, lambda: requiredArg('dst'), True)],
 'getHosts': [],
 'getUsers': [],
 'getNetworks': [],
@@ -219,7 +243,7 @@ argLists = {
 'unregisterHost': [('hostId', int, lambda: requiredArg('hostId'), True)],
 }
 
-# Used to convert the dictionary built from the arguments into an object that can be used by thrift
+# Used to convert the dictionary built from the arguments into an object that can be used by rpyc
 convertArgs = {
 'createVm': '[Instance(d={"userId":userId,"name":name,"cores":cores,"memory":memory,"disks":disks,"nics":nics,"hints":hints})]',
 'createMany': '[Instance(d={"userId":userId,"name":basename,"cores":cores,"memory":memory,"disks":disks,"nics":nics,"hints":hints}), count]',
@@ -233,6 +257,8 @@ convertArgs = {
 'unpauseVm': '[instance]',
 'vmmSpecificCall': '[instance, arg]',
 'unregisterHost' : '[hostId]',
+'getSlots' : '[cores, memory]',
+'copyImage' : '[src, dst]',
 }
 
 # Descriptions
@@ -247,14 +273,17 @@ description = {
 'migrateVm': 'Live-migrates a VM to a different host',
 'pauseVm': 'Pauses a running VM',
 'unpauseVm': 'Unpauses a paused VM',
+'getSlots': 'Get a count of how many VMs could be started in the cluster',
 'getHosts': 'Gets a list of hosts running Node Managers',
 'getUsers': 'Gets a list of users',
 'getNetworks': 'Gets a list of available networks for VMs to be placed on',
-'getInstances': 'Gets a list of all VMs in Tashi',
+'getInstances': 'Gets a list of all VMs in the cluster',
 'getMyInstances': 'Utility function that only lists VMs owned by the current user',
 'getVmLayout': 'Utility function that displays what VMs are placed on what hosts',
-'vmmSpecificCall': 'Direct access to VMM-specific functionality',
+'vmmSpecificCall': 'Direct access to VM manager specific functionality',
 'unregisterHost' : 'Unregisters host. Registration happens when starting node manager',
+'getImages' : 'Gets a list of available VM images',
+'copyImage' : 'Copies a VM image',
 }
 
 # Example use strings
@@ -266,15 +295,18 @@ examples = {
 'destroyMany': ['--basename foobar'],
 'suspendVm': ['--instance 12345', '--instance foobar'],
 'resumeVm': ['--instance 12345', '--instance foobar'],
-'migrateVm': ['--instanc 12345 --targetHostId 73', '--instance foobar --targetHostId 73'],
+'migrateVm': ['--instance 12345 --targetHostId 73', '--instance foobar --targetHostId 73'],
 'pauseVm': ['--instance 12345', '--instance foobar'],
-'unpauseVm': ['---instance 12345', '--instance foobar'],
+'unpauseVm': ['--instance 12345', '--instance foobar'],
+'getSlots': ['--cores 1 --memory 128'],
 'getHosts': [''],
 'getUsers': [''],
 'getNetworks': [''],
 'getInstances': [''],
 'getMyInstances': [''],
 'getVmLayout': [''],
+'getImages': [''],
+'copyImage': ['--src src.qcow2 --dst dst.qcow2'],
 'vmmSpecificCall': ['--instance 12345 --arg startVnc', '--instance foobar --arg stopVnc'],
 'unregisterHost' : ['--hostId 2'],
 }
@@ -359,7 +391,6 @@ def makeTable(list, keys=None):
 		stdout = os.popen("stty size")
 		r = stdout.read()
 		stdout.close()
-		(consoleHeight, consoleWidth) = map(lambda x: int(x.strip()), r.split())
 	except:
 		pass
 	for obj in list:
@@ -480,30 +511,57 @@ def main():
 		usage()
 	function = matchFunction(sys.argv[1])
 	(config, configFiles) = getConfig(["Client"])
-	possibleArgs = argLists[function]
+
+	# build a structure of possible arguments
+	possibleArgs = {}
+	argList = argLists[function]
+	for i in range(0, len(argList)):
+		possibleArgs[argList[i][0]]=argList[i]
+
 	args = sys.argv[2:]
-	for arg in args:
-		if (arg == "--help" or arg == "--examples"):
-			usage(function)
+
+	vals = {}
+
 	try:
-		vals = {}
+		# create client handle
 		client = createClient(config)
-		for parg in possibleArgs:
+
+		# set defaults
+		for parg in possibleArgs.values():
 			(parg, conv, default, required) = parg
-			val = None
-			for i in range(0, len(args)):
-				arg = args[i]
-				if (arg.startswith("--") and arg[2:] == parg):
-					val = conv(args[i+1])
-			if (val == None):
-				val = default()
-			vals[parg] = val
-		for arg in args:
+			if (required is False):
+				vals[parg] = default()
+
+		while (len(args) > 0):
+			arg = args.pop(0)
+
+			if (arg == "--help" or arg == "--examples"):
+				usage(function)
+				# this exits
+
 			if (arg.startswith("--hide-")):
 				show_hide.append((False, arg[7:]))
+				continue
+
 			if (arg.startswith("--show-")):
 				show_hide.append((True, arg[7:]))
+				continue
+
+			if (arg.startswith("--")):
+				if (arg[2:] in possibleArgs):
+					(parg, conv, default, required) = possibleArgs[arg[2:]]
+					val = conv(args.pop(0))
+					if (val == None):
+						val = default()
+
+					vals[parg] = val
+					continue
+
+			raise ValueError("Unknown argument %s" % (arg)) 
+
+		
 		f = getattr(client, function, None)
+
 		if (f is None):
 			f = extraViews[function][0]
 		if (function in convertArgs):
@@ -526,8 +584,8 @@ def main():
 		print "TashiException:"
 		print e.msg
 		exitCode = e.errno
-	except Exception, e:
-		print e
+# 	except Exception, e:
+# 		print e
 		# XXXstroucki: exception may be unrelated to usage of function
 		# so don't print usage on exception as if there were a problem
 		# with the arguments

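The new getSlots command walks the getVmLayout() host list and, for every
host that is up and in the Normal state, adds the smaller of the free-core
and free-memory quotients. The arithmetic in isolation, over toy host records
(the up/state filtering and message formatting are omitted here):

    # Toy data; the real command gets its host list from getVmLayout().
    def countSlots(hosts, cores, memory):
        count = 0
        for h in hosts:
            countByCores = int((h["cores"] - h["usedCores"]) / cores)
            countByMemory = int((h["memory"] - h["usedMemory"]) / memory)
            count += min(countByCores, countByMemory)
        return count

    hosts = [
        {"cores": 8, "usedCores": 3, "memory": 16384, "usedMemory": 6144},
        {"cores": 4, "usedCores": 4, "memory": 8192, "usedMemory": 8192},
    ]
    print countSlots(hosts, cores=1, memory=128)  # 5, all on the first host
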
Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanager.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanager.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanager.py Wed Feb  8 04:44:09 2012
@@ -30,7 +30,9 @@ import tashi
 
 from tashi.rpycservices import rpycservices
 from rpyc.utils.server import ThreadedServer
-from rpyc.utils.authenticators import VdbAuthenticator
+from rpyc.utils.authenticators import TlsliteVdbAuthenticator
+
+log = None
 
 def startClusterManager(config):
 	global service, data
@@ -47,11 +49,11 @@ def startClusterManager(config):
 				users[user.name] = user.passwd
 		users[config.get('AllowedUsers', 'nodeManagerUser')] = config.get('AllowedUsers', 'nodeManagerPassword')
 		users[config.get('AllowedUsers', 'agentUser')] = config.get('AllowedUsers', 'agentPassword')
-		authenticator = VdbAuthenticator.from_dict(users)
+		authenticator = TlsliteVdbAuthenticator.from_dict(users)
 		t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('ClusterManagerService', 'port')), auto_register=False, authenticator=authenticator)
 	else:
 		t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('ClusterManagerService', 'port')), auto_register=False)
-	t.logger.quiet = True
+	t.logger.setLevel(logging.ERROR)
 	t.service.service = service
 	t.service._type = 'ClusterManagerService'
 
@@ -64,6 +66,8 @@ def startClusterManager(config):
 
 @signalHandler(signal.SIGTERM)
 def handleSIGTERM(signalNumber, stackFrame):
+	global log
+
 	log.info('Exiting cluster manager after receiving a SIGINT signal')
 	sys.exit(0)
 	

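The clustermanager.py hunk swaps rpyc's VdbAuthenticator for
TlsliteVdbAuthenticator and quiets the server logger through the standard
logging API rather than the removed "quiet" flag. A condensed sketch of the
resulting server startup, built only from calls visible in the diff; the user
dictionary and port are placeholders, service wiring is omitted, and
t.start() stands in for the rest of startClusterManager() not shown here:

    # Condensed sketch -- placeholder credentials, not a complete cluster manager.
    import logging
    from rpyc.utils.server import ThreadedServer
    from rpyc.utils.authenticators import TlsliteVdbAuthenticator
    from tashi.rpycservices import rpycservices

    users = {"nodemanager": "nm-password", "agent": "agent-password"}
    authenticator = TlsliteVdbAuthenticator.from_dict(users)
    t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0',
            port=9882, auto_register=False, authenticator=authenticator)
    t.logger.setLevel(logging.ERROR)  # replaces the removed t.logger.quiet = True
    t.start()
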
Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanagerservice.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanagerservice.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/clustermanagerservice.py Wed Feb  8 04:44:09 2012
@@ -1,4 +1,4 @@
- # Licensed to the Apache Software Foundation (ASF) under one
+# Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
 # regarding copyright ownership.  The ASF licenses this file
@@ -42,21 +42,26 @@ class ClusterManagerService(object):
 		self.dfs = dfs
 		self.convertExceptions = boolean(config.get('ClusterManagerService', 'convertExceptions'))
 		self.log = logging.getLogger(__name__)
-		self.lastContacted = {}
-		self.decayedHosts = {}
-		self.decayedInstances = {}
+		self.hostLastContactTime = {}
+		#self.hostLastUpdateTime = {}
+		self.instanceLastContactTime = {}
 		self.expireHostTime = float(self.config.get('ClusterManagerService', 'expireHostTime'))
 		self.allowDecayed = float(self.config.get('ClusterManagerService', 'allowDecayed'))
 		self.allowMismatchedVersions = boolean(self.config.get('ClusterManagerService', 'allowMismatchedVersions'))
 		self.maxMemory = int(self.config.get('ClusterManagerService', 'maxMemory'))
 		self.maxCores = int(self.config.get('ClusterManagerService', 'maxCores'))
 		self.allowDuplicateNames = boolean(self.config.get('ClusterManagerService', 'allowDuplicateNames'))
-		now = time.time()
+		now = self.__now()
 		for instance in self.data.getInstances().itervalues():
 			instanceId = instance.id
 			instance = self.data.acquireInstance(instanceId)
 			instance.decayed = False
-			self.stateTransition(instance, None, InstanceState.Orphaned)
+
+			if instance.hostId is None:
+				self.stateTransition(instance, None, InstanceState.Pending)
+			else:
+				self.stateTransition(instance, None, InstanceState.Orphaned)
+
 			self.data.releaseInstance(instance)
 		for host in self.data.getHosts().itervalues():
 			hostId = host.id
@@ -64,101 +69,177 @@ class ClusterManagerService(object):
 			host.up = False
 			host.decayed = False
 			self.data.releaseHost(host)
-		self.decayLock = threading.Lock()
-		threading.Thread(target=self.monitorHosts).start()
+		threading.Thread(target=self.monitorCluster).start()
 
 	def stateTransition(self, instance, old, cur):
 		if (old and instance.state != old):
-			self.data.releaseInstance(instance)
 			raise TashiException(d={'errno':Errors.IncorrectVmState,'msg':"VmState is not %s - it is %s" % (vmStates[old], vmStates[instance.state])})
+		if (instance.state == cur):
+			# don't do anything if we're already at current state
+			return
+
 		instance.state = cur
+		# pass something down to the NM?
 
-	def updateDecay(self, set, obj):
-		now = time.time()
-		self.decayLock.acquire()
-		if (obj.decayed and obj.id not in set):
-			set[obj.id] = now
-		elif (not obj.decayed and obj.id in set):
-			del set[obj.id]
-		self.decayLock.release()
+	def __now(self):
+		return time.time()
+
+	def __downHost(self, host):
+		self.log.warning('Host %s is down' % (host.name))
+		host.up = False
+		host.decayed = False
+
+		self.__orphanInstances(host)
+
+	def __upHost(self, host):
+		self.log.warning('Host %s is up' % (host.name))
+		host.up = True
+		host.decayed = True
+
+	def __orphanInstances(self, host):
+		# expects lock to be held on host
+		instances = [instance.id for instance in self.data.getInstances().itervalues() if instance.hostId == host.id]
+
+		for instanceId in instances:
+			instance = self.data.acquireInstance(instanceId)
+			if instance.hostId == host.id:
+				instance.decayed = True
+				self.stateTransition(instance, None, InstanceState.Orphaned)
+
+			self.data.releaseInstance(instance)
+
+	def __checkHosts(self):
+		# Check if hosts have been heard from recently
+		# Otherwise, see if it is alive
+
+		for hostId in self.hostLastContactTime.keys():
+			if (self.hostLastContactTime[hostId] < (self.__now() - self.expireHostTime)):
+				host = self.data.acquireHost(hostId)
+				string = None
+				try:
+					string = self.proxy[host.name].liveCheck()
+				except:
+					pass
+
+				if string != "alive":
+					self.__downHost(host)
+					del self.hostLastContactTime[hostId]
+				else:
+					self.__upHost(host)
+					self.hostLastContactTime[hostId] = self.__now()
+
+				self.data.releaseHost(host)
+
+	def __checkInstances(self):
+		# Reconcile instances with nodes
+
+		# obtain a list of instances I know about
+		myInstancesError = False
+		try:
+			myInstances = self.data.getInstances()
+		except:
+			myInstancesError = True
+			self.log.warning('Failure communicating with my database')
+
+		if myInstancesError == True:
+			return
+
+		# iterate through all hosts I believe are up
+		for hostId in self.hostLastContactTime.keys():
+			#self.log.warning("iterate %d" % hostId)
+			host = self.data.acquireHost(hostId)
+			if (self.hostLastContactTime[hostId] < (self.__now() - self.allowDecayed)):
+				host.decayed = True
+
+				self.log.info('Fetching state from host %s because it is decayed' % (host.name))
+				
+				myInstancesThisHost = [i for i in myInstances.values() if i.hostId == host.id]
+
+				# get a list of VMs running on host
+				try:
+					hostProxy = self.proxy[host.name]
+					remoteInstances = [hostProxy.getVmInfo(vmId) for vmId in hostProxy.listVms()]
+				except:
+					self.log.warning('Failure getting instances from host %s' % (host.name))
+					self.data.releaseHost(host)
+					continue
+
+				# register instances I don't know about
+				for instance in remoteInstances:
+					if (instance.id not in myInstances):
+						instance.hostId = host.id
+						instance = self.data.registerInstance(instance)
+						self.data.releaseInstance(instance)
+				remoteInstanceIds = [i.id for i in remoteInstances]
+				# remove instances that shouldn't be running
+				for instance in myInstancesThisHost:
+					if (instance.id not in remoteInstanceIds):
+						# XXXstroucki before 20110902 excepted here with host lock
+						try:
+							instance = self.data.acquireInstance(instance.id)
+						except:
+							continue
+
+						# XXXstroucki destroy?
+						try:
+							del self.instanceLastContactTime[instance.id]
+						except:
+							pass
+						self.data.removeInstance(instance)
+
+				self.hostLastContactTime[hostId] = self.__now()
+				host.decayed = False
+
+			self.data.releaseHost(host)
+			#self.log.warning("iterate %d done" % hostId)
 		
+		# iterate through all VMs I believe are active
+		for instanceId in self.instanceLastContactTime.keys():
+			if (self.instanceLastContactTime[instanceId] < (self.__now() - self.allowDecayed)):
+				try:
+					instance = self.data.acquireInstance(instanceId)
+				except:
+					continue
+
+				instance.decayed = True
+				self.log.info('Fetching state on instance %s because it is decayed' % (instance.name))
+				if instance.hostId is None: raise AssertionError
 
-	def monitorHosts(self):
-		# XXX: retry multiple hosts (iterate through them even with an exception)
+				# XXXstroucki check if host is down?
+				host = self.data.getHost(instance.hostId)
+
+				# get updated state on VM
+				try:
+					hostProxy = self.proxy[host.name]
+					newInstance = hostProxy.getVmInfo(instance.vmId)
+				except:
+					self.log.warning('Failure getting data for instance %s from host %s' % (instance.name, host.name))
+					self.data.releaseInstance(instance)
+					continue
+
+				# replace existing state with new state
+				# XXXstroucki more?
+				instance.state = newInstance.state
+				self.instanceLastContactTime[instanceId] = self.__now()
+				instance.decayed = False
+				self.data.releaseInstance(instance)
+
+
+
+	def monitorCluster(self):
 		while True:
-			now = time.time()
 			sleepFor = min(self.expireHostTime, self.allowDecayed)
-			try:
-				for k in self.lastContacted.keys():
-					if (self.lastContacted[k] < (now-self.expireHostTime)):
-						host = self.data.acquireHost(k)
-						try: 
-							self.log.warning('Host %s has expired after %f seconds' % (host.name, self.expireHostTime))
-							for instanceId in [instance.id for instance in self.data.getInstances().itervalues() if instance.hostId == host.id]:
-								instance = self.data.acquireInstance(instanceId)
-								instance.decayed = True
-								self.stateTransition(instance, None, InstanceState.Orphaned)
-								self.data.releaseInstance(instance)
-							host.up = False
-							host.decayed = False
-						finally:
-							self.data.releaseHost(host)
-						del self.lastContacted[k]
-					else:
-						sleepFor = min(self.lastContacted[k] + self.expireHostTime - now, sleepFor)
-				for hostId in self.decayedHosts.keys():
-					# XXXstroucki: what if the host undecays here?
-					if (self.decayedHosts[hostId] < (now-self.allowDecayed)):
-						host = self.data.getHost(hostId)
-						self.log.warning('Fetching state from host %s because it is decayed' % (host.name))
-						hostProxy = self.proxy[host.name]
-						oldInstances = [i for i in self.data.getInstances().values() if i.hostId == host.id]
-						instances = [hostProxy.getVmInfo(vmId) for vmId in hostProxy.listVms()]
-						instanceIds = [i.id for i in instances]
-						for instance in instances:
-							if (instance.id not in self.data.getInstances()):
-								instance.hostId = host.id
-								instance = self.data.registerInstance(instance)
-								self.data.releaseInstance(instance)
-						for instance in oldInstances:
-							if (instance.id not in instanceIds):
-								instance = self.data.acquireInstance(instance.id)
-								self.data.removeInstance(instance)
-						self.decayedHosts[hostId] = now
-					else:
-						sleepFor = min(self.decayedHosts[hostId] + self.allowDecayed - now, sleepFor)
-				for instanceId in self.decayedInstances.keys():
-					try:
-						if (self.decayedInstances[instanceId] < (now-self.allowDecayed)):
-							self.log.warning('Fetching state on instance %d because it is decayed' % (instanceId))
-							try:
-								instance = self.data.getInstance(instanceId)
-								if instance.hostId is None: raise AssertionError
-							except TashiException, e:
-								if (e.errno == Errors.NoSuchInstanceId):
-									del self.decayedInstances[instanceId]
-									continue
-								else:
-									raise
-							host = self.data.getHost(instance.hostId)
-							hostProxy = self.proxy[host.name]
-							instance = hostProxy.getVmInfo(instance.vmId)
-							oldInstance = self.data.acquireInstance(instanceId)
-							oldInstance.state = instance.state
-							self.data.releaseInstance(oldInstance)
-							self.decayedInstances[instanceId] = now
-						else:
-							sleepFor = min(self.decayedInstances[instanceId] + self.allowDecayed - now, sleepFor)
-					except (KeyError, TashiException):
-						self.log.warning("Don't know about instance %d anymore." % instanceId)
-						self.data.removeInstance(instance)
-					except Exception, e:
-						self.log.exception('Exception in monitorHosts trying to get instance information')
-			except Exception, e:
-				self.log.exception('Exception in monitorHosts')
 
+			try:
+				self.__checkHosts()
+				self.__checkInstances()
+			except:
+				self.log.exception('monitorCluster iteration failed')
+			#  XXXrgass too chatty.  Remove
+			#self.log.info("Sleeping for %d seconds" % sleepFor)
 			time.sleep(sleepFor)
-	
+
+
 	def normalize(self, instance):
 		instance.id = None
 		instance.vmId = None
@@ -166,7 +247,7 @@ class ClusterManagerService(object):
 		instance.decayed = False
 		instance.name = scrubString(instance.name, allowed="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-.")
 		instance.state = InstanceState.Pending
-		# At some point, check userId
+		# XXXstroucki At some point, check userId
 		if (not self.allowDuplicateNames):
 			for i in self.data.getInstances().itervalues():
 				if (i.name == instance.name):
@@ -181,7 +262,7 @@ class ClusterManagerService(object):
 			raise TashiException(d={'errno':Errors.InvalidInstance,'msg':"Amount of memory must be <= %d" % (self.maxMemory)})
 		# Make sure disk spec is valid
 		# Make sure network spec is valid
-		# Ignore hints
+		# Ignore internal hints
 		for hint in instance.hints:
 			if (hint.startswith("__")):
 				del instance.hints[hint]
@@ -189,6 +270,7 @@ class ClusterManagerService(object):
 	
 	def createVm(self, instance):
 		"""Function to add a VM to the list of pending VMs"""
+		# XXXstroucki: check for exception here
 		instance = self.normalize(instance)
 		instance = self.data.registerInstance(instance)
 		self.data.releaseInstance(instance)
@@ -217,8 +299,6 @@ class ClusterManagerService(object):
 			# XXXstroucki: This is a problem with keeping
 			# clean state.
 			self.stateTransition(instance, None, InstanceState.Destroying)
-			self.data.releaseInstance(instance)
-
 			if instance.hostId is None:
 				self.data.removeInstance(instance)
 			else:
@@ -226,8 +306,9 @@ class ClusterManagerService(object):
 				try:
 					if hostname is not None:
 						self.proxy[hostname].destroyVm(instance.vmId)
-				except Exception:
-					self.log.exception('destroyVm failed on host %s vmId %d' % (hostname, instance.vmId))
+						self.data.releaseInstance(instance)
+				except:
+					self.log.exception('destroyVm failed on host %s vmId %s' % (hostname, str(instance.vmId)))
 					self.data.removeInstance(instance)
 
 
@@ -284,11 +365,12 @@ class ClusterManagerService(object):
 		except Exception, e:
 			self.log.exception('migrateVm failed')
 			raise
-		#instance = self.data.acquireInstance(instance.id)
-		#try:
-		#	instance.hostId = targetHost.id
-		#finally:
-		#	self.data.releaseInstance(instance)
+		try:
+			instance = self.data.acquireInstance(instance.id)
+			instance.hostId = targetHost.id
+		finally:
+			self.data.releaseInstance(instance)
+
 		try:
 			# Notify the target
 			vmId = self.proxy[targetHost.name].receiveVm(instance, cookie)
@@ -338,7 +420,23 @@ class ClusterManagerService(object):
 	
 	def getInstances(self):
 		return self.data.getInstances().values()
+
+	def getImages(self):
+		return self.data.getImages()
 	
+	def copyImage(self, src, dst):
+		imageSrc = self.dfs.getLocalHandle("images/" + src)
+		imageDst = self.dfs.getLocalHandle("images/" + dst)
+		try:
+			#  Attempt to restrict to the image directory
+			if ".." not in imageSrc and ".." not in imageDst:
+				self.dfs.copy(imageSrc, imageDst)
+				self.log.info('DFS image copy: %s->%s' % (imageSrc, imageDst))
+			else:
+				self.log.warning('DFS image copy bad path: %s->%s' % (imageSrc, imageDst))
+		except Exception, e:
+			self.log.exception('DFS image copy failed: %s (%s->%s)' % (e, imageSrc, imageDst))
+
 	def vmmSpecificCall(self, instanceId, arg):
 		instance = self.data.getInstance(instanceId)
 		hostname = self.data.getHost(instance.hostId).name
@@ -352,93 +450,75 @@ class ClusterManagerService(object):
 #	@timed
 	def registerNodeManager(self, host, instances):
 		"""Called by the NM every so often as a keep-alive/state polling -- state changes here are NOT AUTHORITATIVE"""
+
+		# Handle a new registration
 		if (host.id == None):
 			hostList = [h for h in self.data.getHosts().itervalues() if h.name == host.name]
 			if (len(hostList) != 1):
 				raise TashiException(d={'errno':Errors.NoSuchHost, 'msg':'A host with name %s is not identifiable' % (host.name)})
 			host.id = hostList[0].id
+
+		# Check if remote host information matches mine
 		oldHost = self.data.acquireHost(host.id)
 		if (oldHost.name != host.name):
 			self.data.releaseHost(oldHost)
 			raise TashiException(d={'errno':Errors.NoSuchHostId, 'msg':'Host id and hostname mismatch'})
-		try:
-			try:
-				self.lastContacted[host.id] = time.time()
-				oldHost.version = host.version
-				oldHost.memory = host.memory
-				oldHost.cores = host.cores
-				oldHost.up = True
-				oldHost.decayed = False
-
-# compare whether CM / NM versions are compatible
-				if (host.version != version and not self.allowMismatchedVersions):
-					oldHost.state = HostState.VersionMismatch
-				if (host.version == version and oldHost.state == HostState.VersionMismatch):
-					oldHost.state = HostState.Normal
-				for instance in instances:
-					try:
-						oldInstance = self.data.acquireInstance(instance.id)
-					except TashiException, e:
-						if (e.errno == Errors.NoSuchInstanceId):
-							self.log.info('Host %s reported an instance %d that did not previously exist (decay)' % (host.name, instance.id))
-							oldHost.decayed = True
-							continue
-							#oldInstance = self.data.registerInstance(instance)
-						else:
-							raise
-					try:
-						if (oldInstance.hostId != host.id):
-							self.log.info('Host %s is claiming instance %d actually owned by hostId %s (decay)' % (host.name, oldInstance.id, str(oldInstance.hostId)))
-							oldHost.decayed = True
-							continue
-						oldInstance.decayed = (oldInstance.state != instance.state)
-						self.updateDecay(self.decayedInstances, oldInstance)
-						if (oldInstance.decayed):
-							self.log.info('State reported as %s instead of %s for instance %d on host %s (decay)' % (vmStates[instance.state], vmStates[oldInstance.state], instance.id, host.name))
-					finally:
-						self.data.releaseInstance(oldInstance)
-				instanceIds = [instance.id for instance in instances]
-				for instanceId in [instance.id for instance in self.data.getInstances().itervalues() if instance.hostId == host.id]:
-					if (instanceId not in instanceIds):
-						self.log.info('instance %d was not reported by host %s as expected (decay)' % (instanceId, host.name))
-						instance = self.data.acquireInstance(instanceId)
-						instance.decayed = True
-						self.updateDecay(self.decayedInstances, instance)
-						oldHost.decayed = True
-						self.data.releaseInstance(instance)
-			except Exception, e:
-				oldHost.decayed = True
-				raise
-		finally:
-			self.updateDecay(self.decayedHosts, oldHost)
-			self.data.releaseHost(oldHost)
 
+		if oldHost.up == False:
+			self.__upHost(oldHost)
+		self.hostLastContactTime[host.id] = time.time()
+		#self.hostLastUpdateTime[host.id] = time.time()
+		oldHost.version = host.version
+		oldHost.memory = host.memory
+		oldHost.cores = host.cores
+
+		# compare whether CM / NM versions are compatible
+		if (host.version != version and not self.allowMismatchedVersions):
+			oldHost.state = HostState.VersionMismatch
+		if (host.version == version and oldHost.state == HostState.VersionMismatch):
+			oldHost.state = HostState.Normal
+
+		# let the host communicate what it is running
+		# XXXrgass - This is too chatty for the console, I think we should remove this.
+		# XXXstroucki - My install depends on this, but I output to log files. This should be handled by a separate accounting server in future.
+		for instance in instances:
+			self.log.info('Accounting: id %d host %d vmId %d user %d cores %d memory %d' % (instance.id, host.id, instance.vmId, instance.userId, instance.cores, instance.memory))
+			self.instanceLastContactTime.setdefault(instance.id, 0)
+
+		self.data.releaseHost(oldHost)
 		return host.id
 	
 	def vmUpdate(self, instanceId, instance, oldState):
 		try:
 			oldInstance = self.data.acquireInstance(instanceId)
 		except TashiException, e:
+			# shouldn't have a lock to clean up after here
 			if (e.errno == Errors.NoSuchInstanceId):
-				self.log.exception('Got vmUpdate for unknown instanceId %d' % (instanceId))
+				self.log.warning('Got vmUpdate for unknown instanceId %d' % (instanceId))
 				return
-			else:
-				raise
+		except:
+			self.log.exception("Could not acquire instance")
+			raise
+
+		self.instanceLastContactTime[instanceId] = time.time()
+		oldInstance.decayed = False
+
 		if (instance.state == InstanceState.Exited):
-			oldInstance.decayed = False
-			self.updateDecay(self.decayedInstances, oldInstance)
+			# determine why a VM has exited
 			hostname = self.data.getHost(oldInstance.hostId).name
 			if (oldInstance.state not in [InstanceState.ShuttingDown, InstanceState.Destroying, InstanceState.Suspending]):
-				self.log.warning('Unexpected exit on %s of instance %d (vmId %d)' % (hostname, instanceId, oldInstance.vmId))
+				self.log.warning('Unexpected exit on %s of instance %s (vmId %d)' % (hostname, oldInstance.name, oldInstance.vmId))
 			if (oldInstance.state == InstanceState.Suspending):
 				self.stateTransition(oldInstance, InstanceState.Suspending, InstanceState.Suspended)
 				oldInstance.hostId = None
 				oldInstance.vmId = None
 				self.data.releaseInstance(oldInstance)
 			else:
+				del self.instanceLastContactTime[oldInstance.id]
 				self.data.removeInstance(oldInstance)
 		else:
 			if (instance.state):
+				# XXXstroucki does this matter?
 				if (oldState and oldInstance.state != oldState):
 					self.log.warning('Got vmUpdate of state from %s to %s, but the instance was previously %s' % (vmStates[oldState], vmStates[instance.state], vmStates[oldInstance.state]))
 				oldInstance.state = instance.state
@@ -452,13 +532,14 @@ class ClusterManagerService(object):
 						for oldNic in oldInstance.nics:
 							if (oldNic.mac == nic.mac):
 								oldNic.ip = nic.ip
-			oldInstance.decayed = False
-			self.updateDecay(self.decayedInstances, oldInstance)
+
 			self.data.releaseInstance(oldInstance)
-		return
+
+		return "success"
 	
 	def activateVm(self, instanceId, host):
 		dataHost = self.data.acquireHost(host.id)
+
 		if (dataHost.name != host.name):
 			self.data.releaseHost(dataHost)
 			raise TashiException(d={'errno':Errors.HostNameMismatch,'msg':"Mismatched target host"})
@@ -468,14 +549,20 @@ class ClusterManagerService(object):
 		if (dataHost.state != HostState.Normal):
 			self.data.releaseHost(dataHost)
 			raise TashiException(d={'errno':Errors.HostStateError,'msg':"Target host state is not normal"})
+
 		self.data.releaseHost(dataHost)
 		instance = self.data.acquireInstance(instanceId)
+
 		if ('__resume_source' in instance.hints):
 			self.stateTransition(instance, InstanceState.Pending, InstanceState.Resuming)
 		else:
-			self.stateTransition(instance, InstanceState.Pending, InstanceState.Activating)
+			# XXXstroucki should held VMs be continually tried? Or be explicitly set back to pending?
+			#self.stateTransition(instance, InstanceState.Pending, InstanceState.Activating)
+			self.stateTransition(instance, None, InstanceState.Activating)
+
 		instance.hostId = host.id
 		self.data.releaseInstance(instance)
+
 		try:
 			if ('__resume_source' in instance.hints):
 				vmId = self.proxy[host.name].resumeVm(instance, instance.hints['__resume_source'])
@@ -486,24 +573,32 @@ class ClusterManagerService(object):
 			if (instance.state is InstanceState.Destroying): # Special case for if destroyVm is called during initialization and initialization fails
 				self.data.removeInstance(instance)
 			else:
+				# XXXstroucki what can we do about pending hosts in the scheduler?
+				# put them at the end of the queue and keep trying?
 				self.stateTransition(instance, None, InstanceState.Held)
 				instance.hostId = None
 				self.data.releaseInstance(instance)
-			raise
+			return "failure"
+
 		instance = self.data.acquireInstance(instanceId)
 		instance.vmId = vmId
+
 		if (instance.state is InstanceState.Destroying): # Special case for if destroyVm is called during initialization
-			self.data.releaseInstance(instance)
 			try:
 				self.proxy[host.name].destroyVm(vmId)
+				self.data.removeInstance(instance)
 			except Exception:
 				self.log.exception('destroyVm failed for host %s vmId %d' % (host.name, instance.vmId))
-				raise
+				self.data.releaseInstance(instance)
+				return "failure"
 		else:
 			if ('__resume_source' not in instance.hints):
-				self.stateTransition(instance, InstanceState.Activating, InstanceState.Running)
-			self.data.releaseInstance(instance)
-		return
+				# XXXstroucki should we just wait for NM to update?
+				#self.stateTransition(instance, InstanceState.Activating, InstanceState.Running)
+				pass
+
+		self.data.releaseInstance(instance)
+		return "success"
 
         def registerHost(self, hostname, memory, cores, version):
                 hostId, alreadyRegistered = self.data.registerHost(hostname, memory, cores, version)

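A note on the copyImage() addition above: it confines copies to the image directory only by rejecting source or destination names that contain "..". A stricter check would canonicalize both paths before comparing them. The helper below is an illustrative sketch only (not part of this commit) and assumes the DFS local handles behave like ordinary filesystem paths:

    import os

    def confineToImages(baseDir, name):
        # Resolve the requested name against the images directory and verify
        # the result is still inside it; reject anything that escapes.
        base = os.path.realpath(baseDir)
        candidate = os.path.realpath(os.path.join(base, name))
        if candidate != base and not candidate.startswith(base + os.sep):
            raise ValueError("path escapes the images directory: %s" % name)
        return candidate

For example, confineToImages("/mnt/dfs/images", "foo.qcow2") returns the resolved path, while confineToImages("/mnt/dfs/images", "../etc/passwd") raises ValueError.
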
Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/datainterface.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/datainterface.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/datainterface.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/datainterface.py Wed Feb  8 04:44:09 2012
@@ -63,7 +63,7 @@ class DataInterface(object):
 	
 	def getUser(self, id):
 		raise NotImplementedError
-		
+
 	def registerHost(self, hostname, memory, cores, version):
 		raise NotImplementedError
 	

Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/getentoverride.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/getentoverride.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/getentoverride.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/getentoverride.py Wed Feb  8 04:44:09 2012
@@ -17,14 +17,17 @@
 
 import subprocess
 import time
-from tashi.rpycservices.rpyctypes import User
+import os
+from tashi.rpycservices.rpyctypes import User, LocalImages
 from tashi.clustermanager.data import DataInterface
-from tashi.util import instantiateImplementation
+from tashi.util import instantiateImplementation, humanReadable
 
 class GetentOverride(DataInterface):
 	def __init__(self, config):
 		DataInterface.__init__(self, config)
 		self.baseDataObject = instantiateImplementation(config.get("GetentOverride", "baseData"), config)
+		self.dfs = instantiateImplementation(config.get("ClusterManager", "dfs"), config)
+
 		self.users = {}
 		self.lastUserUpdate = 0.0
 		self.fetchThreshold = float(config.get("GetentOverride", "fetchThreshold"))
@@ -64,6 +67,17 @@ class GetentOverride(DataInterface):
 	
 	def getNetwork(self, id):
 		return self.baseDataObject.getNetwork(id)
+
+	def getImages(self):
+		count = 0
+		myList = []
+		for i in self.dfs.list("images"):
+			myFile = self.dfs.getLocalHandle("images/" + i)
+			if os.path.isfile(myFile):
+				image = LocalImages(d={'id':count, 'imageName':i, 'imageSize':humanReadable(self.dfs.stat(myFile)[6])})
+				myList.append(image)
+				count += 1
+		return myList
 	
 	def fetchFromGetent(self):
 		now = time.time()

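The getImages() method added above walks the DFS "images" directory, keeps only regular files, and wraps each in a LocalImages record with a running id and a human-readable size. A rough plain-filesystem analogue using only the standard library (the directory path and return shape are illustrative, not the DFS API):

    import os

    def listImages(imagesDir):
        # Regular files only, with a running id and the size in bytes.
        myList = []
        count = 0
        for name in sorted(os.listdir(imagesDir)):
            path = os.path.join(imagesDir, name)
            if os.path.isfile(path):
                myList.append({'id': count, 'imageName': name,
                               'imageSize': os.path.getsize(path)})
                count += 1
        return myList
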
Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/ldapoverride.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/ldapoverride.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/ldapoverride.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/ldapoverride.py Wed Feb  8 04:44:09 2012
@@ -67,7 +67,7 @@ class LdapOverride(DataInterface):
 	
 	def getNetwork(self, id):
 		return self.baseDataObject.getNetwork(id)
-	
+
 	def fetchFromLdap(self):
 		now = time.time()
 		if (now - self.lastUserUpdate > self.fetchThreshold):

Modified: incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/sql.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/sql.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/sql.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/clustermanager/data/sql.py Wed Feb  8 04:44:09 2012
@@ -19,20 +19,23 @@ import logging
 import threading
 import time
 import types
+# XXXstroucki getImages needs os?
+import os
 from tashi.rpycservices.rpyctypes import *
 from tashi.clustermanager.data.datainterface import DataInterface
-from tashi.util import stringPartition, boolean
+from tashi.util import stringPartition, boolean, instantiateImplementation, humanReadable
 
 class SQL(DataInterface):
 	def __init__(self, config):
 		DataInterface.__init__(self, config)
 		self.uri = self.config.get("SQL", "uri")
 		self.log = logging.getLogger(__name__)
+		self.dfs = instantiateImplementation(config.get("ClusterManager", "dfs"), config)
 
 		if (self.uri.startswith("sqlite://")):
 			import sqlite
 			self.dbEngine = "sqlite"
-			self.conn = sqlite.connect(self.uri[9:], autocommit=1)
+			self.conn = sqlite.connect(self.uri[9:], autocommit=1, timeout=1500)
 		elif (self.uri.startswith("mysql://")):
 			import MySQLdb
 			self.dbEngine = "mysql"
@@ -49,6 +52,7 @@ class SQL(DataInterface):
 		self.instanceLock = threading.Lock()
 		self.instanceIdLock = threading.Lock()
 		self.instanceLocks = {}
+		self.instanceBusy = {}
 		self.hostLock = threading.Lock()
 		self.hostLocks = {}
 		self.maxInstanceId = 1
@@ -63,15 +67,20 @@ class SQL(DataInterface):
 			try:
 				cur.execute(stmt)
 			except:
-				self.log.exception('Exception executing SQL statement')
+				self.log.exception('Exception executing SQL statement %s' % stmt)
 		finally:
 			self.sqlLock.release()
 		return cur
 		
 	def getNewInstanceId(self):
 		self.instanceIdLock.acquire()
-		instanceId = self.maxInstanceId
+		cur = self.executeStatement("SELECT MAX(id) FROM instances")
+		self.maxInstanceId = cur.fetchone()[0]
+		# XXXstroucki perhaps this can be handled nicer
+		if (self.maxInstanceId is None):
+			self.maxInstanceId = 0
 		self.maxInstanceId = self.maxInstanceId + 1
+		instanceId = self.maxInstanceId
 		self.instanceIdLock.release()
 		return instanceId
 	
@@ -135,6 +144,7 @@ class SQL(DataInterface):
 			instance._lock = threading.Lock()
 			self.instanceLocks[instance.id] = instance._lock
 			instance._lock.acquire()
+			self.instanceBusy[instance.id] = True
 			l = self.makeInstanceList(instance)
 			self.executeStatement("INSERT INTO instances VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" % tuple(l))
 		finally:
@@ -142,7 +152,13 @@ class SQL(DataInterface):
 		return instance
 	
 	def acquireInstance(self, instanceId):
-		self.instanceLock.acquire()
+		busyCheck = True
+		while busyCheck == True:
+			self.instanceLock.acquire()
+			busyCheck = self.instanceBusy.setdefault(instanceId, False)
+			if busyCheck:
+				self.instanceLock.release()
+
 		try:
 			cur = self.executeStatement("SELECT * from instances WHERE id = %d" % (instanceId))
 			l = cur.fetchone()
@@ -152,8 +168,10 @@ class SQL(DataInterface):
 			self.instanceLocks[instance.id] = self.instanceLocks.get(instance.id, threading.Lock())
 			instance._lock = self.instanceLocks[instance.id]
 			instance._lock.acquire()
+			self.instanceBusy[instance.id] = True
 		finally:
 			self.instanceLock.release()
+
 		return instance
 	
 	def releaseInstance(self, instance):
@@ -166,7 +184,11 @@ class SQL(DataInterface):
 				if (e < len(self.instanceOrder)-1):
 					s = s + ", "
 			self.executeStatement("UPDATE instances SET %s WHERE id = %d" % (s, instance.id))
+			self.instanceBusy[instance.id] = False
 			instance._lock.release()
+		except:
+			self.log.exception("Excepted while holding lock")
+			raise
 		finally:
 			self.instanceLock.release()
 	
@@ -174,8 +196,13 @@ class SQL(DataInterface):
 		self.instanceLock.acquire()
 		try:
 			self.executeStatement("DELETE FROM instances WHERE id = %d" % (instance.id))
-			instance._lock.release()
+			#XXXstroucki extraneous instance won't have a lock
+			try:
+				instance._lock.release()
+			except:
+				pass
 			del self.instanceLocks[instance.id]
+			del self.instanceBusy[instance.id]
 		finally:
 			self.instanceLock.release()
 	
@@ -258,6 +285,17 @@ class SQL(DataInterface):
 		r = cur.fetchone()
 		network = Network(d={'id':r[0], 'name':r[1]})
 		return network
+
+	def getImages(self):
+		count = 0
+		myList = []
+		for i in self.dfs.list("images"):
+			myFile = self.dfs.getLocalHandle("images/" + i)
+			if os.path.isfile(myFile):
+				image = LocalImages(d={'id':count, 'imageName':i, 'imageSize':humanReadable(self.dfs.stat(myFile)[6])})
+				myList.append(image)
+				count += 1
+		return myList
 	
 	def getUsers(self):
 		cur = self.executeStatement("SELECT * from users")

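The SQL data store changes above pair the table-wide instanceLock with a per-row instanceBusy flag: acquireInstance() repeatedly takes the table lock, checks whether the row is busy, and drops the lock again until the row comes free, so one long-held instance no longer blocks every other caller. A compact sketch of the same pattern with placeholder names (the sleep is an added back-off; the committed code retries immediately):

    import threading
    import time

    class RowLockTable(object):
        # Table-level lock plus per-row busy flags, mirroring instanceLock
        # and instanceBusy in the SQL data store.
        def __init__(self):
            self.tableLock = threading.Lock()
            self.busy = {}

        def acquire(self, rowId):
            while True:
                self.tableLock.acquire()
                if not self.busy.setdefault(rowId, False):
                    self.busy[rowId] = True
                    self.tableLock.release()
                    return
                self.tableLock.release()
                time.sleep(0.05)

        def release(self, rowId):
            self.tableLock.acquire()
            self.busy[rowId] = False
            self.tableLock.release()
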
Modified: incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanager.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanager.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanager.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanager.py Wed Feb  8 04:44:09 2012
@@ -28,7 +28,7 @@ from tashi import boolean
 
 from tashi.rpycservices import rpycservices
 from rpyc.utils.server import ThreadedServer
-from rpyc.utils.authenticators import VdbAuthenticator
+from rpyc.utils.authenticators import TlsliteVdbAuthenticator
 
 @signalHandler(signal.SIGTERM)
 def handleSIGTERM(signalNumber, stackFrame):
@@ -51,11 +51,11 @@ def main():
 	if boolean(config.get("Security", "authAndEncrypt")):
 		users = {}
 		users[config.get('AllowedUsers', 'clusterManagerUser')] = config.get('AllowedUsers', 'clusterManagerPassword')
-		authenticator = VdbAuthenticator.from_dict(users)
+		authenticator = TlsliteVdbAuthenticator.from_dict(users)
 		t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('NodeManagerService', 'port')), auto_register=False, authenticator=authenticator)
 	else:
 		t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0', port=int(config.get('NodeManagerService', 'port')), auto_register=False)
-	t.logger.quiet = True
+	t.logger.setLevel(logging.ERROR)
 	t.service.service = service
 	t.service._type = 'NodeManagerService'
 

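The authenticator switch above follows the usual rpyc wiring: build a credential store from a user/password dict with TlsliteVdbAuthenticator.from_dict() and hand it to ThreadedServer. A condensed sketch of that setup; the user name, password and port number are placeholders standing in for the AllowedUsers and NodeManagerService configuration values:

    import logging
    from rpyc.utils.server import ThreadedServer
    from rpyc.utils.authenticators import TlsliteVdbAuthenticator
    from tashi.rpycservices import rpycservices

    users = {'clustermanager': 'examplepassword'}   # placeholder credentials
    authenticator = TlsliteVdbAuthenticator.from_dict(users)
    t = ThreadedServer(service=rpycservices.ManagerService, hostname='0.0.0.0',
                       port=9883, auto_register=False, authenticator=authenticator)
    t.logger.setLevel(logging.ERROR)
    # t.start() would then serve requests, as main() does above.
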
Modified: incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanagerservice.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanagerservice.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanagerservice.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/nodemanager/nodemanagerservice.py Wed Feb  8 04:44:09 2012
@@ -71,12 +71,9 @@ class NodeManagerService(object):
 	
 	def loadVmInfo(self):
 		try:
-			f = open(self.infoFile, "r")
-			data = f.read()
-			f.close()
-			self.instances = cPickle.loads(data)
+			self.instances = self.vmm.getInstances()
 		except Exception, e:
-			self.log.warning('Failed to load VM info from %s' % (self.infoFile))
+			self.log.exception('Failed to obtain VM info')
 			self.instances = {}
 	
 	def saveVmInfo(self):
@@ -94,6 +91,12 @@ class NodeManagerService(object):
 			self.log.warning('VM state was %s, call indicated %s' % (vmStates[instance.state], vmStates[old]))
 		if (cur == InstanceState.Exited):
 			del self.instances[vmId]
+			return True
+
+		if (instance.state == cur):
+			# Don't do anything if state is what it should be
+			return True
+
 		instance.state = cur
 		newInst = Instance(d={'state':cur})
 		success = lambda: None
@@ -168,6 +171,8 @@ class NodeManagerService(object):
 			try:
 				host = self.vmm.getHostInfo(self)
 				instances = self.instances.values()
+				#import pprint
+				#self.log.warning("Instances: " + pprint.saferepr(instances))
 				self.id = cm.registerNodeManager(host, instances)
 			except Exception, e:
 				self.log.exception('Failed to register with the CM')
@@ -291,6 +296,9 @@ class NodeManagerService(object):
 	
 	def listVms(self):
 		return self.instances.keys()
+
+	def liveCheck(self):
+		return "alive"
 	
 	def statsThread(self):
 		if (self.statsInterval == 0):

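loadVmInfo() above now rebuilds the instance table by asking the VMM layer directly (vmm.getInstances()) rather than unpickling a state file, and falls back to an empty table when the VMM cannot answer. A minimal sketch of that recover-or-start-empty pattern, with a stand-in vmm object:

    import logging

    log = logging.getLogger(__name__)

    def loadVmInfo(vmm):
        # Prefer the hypervisor's live view over a possibly stale on-disk
        # snapshot; start empty if it cannot be obtained.
        try:
            return vmm.getInstances()   # dict keyed by vmId
        except Exception:
            log.exception('Failed to obtain VM info')
            return {}
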
Modified: incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/qemu.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/qemu.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/qemu.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/qemu.py Wed Feb  8 04:44:09 2012
@@ -27,6 +27,9 @@ import subprocess
 import sys
 import time
 
+# for scratch space support
+from os import system
+
 from tashi.rpycservices.rpyctypes import *
 from tashi.util import broken, logged, scrubString, boolean
 from tashi import version, stringPartition
@@ -102,6 +105,8 @@ class Qemu(VmControlInterface):
 		self.consolePortLock = threading.Lock()
 		self.migrationSemaphore = threading.Semaphore(int(self.config.get("Qemu", "maxParallelMigrations")))
 		self.stats = {}
+		self.scratchVg = self.config.get("Qemu", "scratchVg")
+		# XXXstroucki revise
 		self.scratchDir = self.config.get("Qemu", "scratchDir")
 		if len(self.scratchDir) == 0:
 			self.scratchDir = "/tmp"
@@ -118,7 +123,7 @@ class Qemu(VmControlInterface):
 	class anonClass:
 		def __init__(self, **attrs):
 			self.__dict__.update(attrs)
-	
+
 	def getSystemPids(self):
 		"""Utility function to get a list of system PIDs that match the QEMU_BIN specified (/proc/nnn/exe)"""
 		pids = []
@@ -130,15 +135,25 @@ class Qemu(VmControlInterface):
 			except Exception:
 				pass
 		return pids
-	
+
+	def getInstances(self):
+		"""Will return a dict of instances by vmId to the caller"""
+		return dict((x, self.controlledVMs[x].instance) for x in self.controlledVMs.keys())
+
 	def matchSystemPids(self, controlledVMs):
 		"""This is run in a separate polling thread and it must do things that are thread safe"""
+		if self.nm is None:
+			#XXXstroucki log may not be there yet either
+			#self.log.info("NM hook not yet available")
+			return
+
 		vmIds = controlledVMs.keys()
 		pids = self.getSystemPids()
 		for vmId in vmIds:
+			child = controlledVMs[vmId]
+
 			if vmId not in pids:
 				os.unlink(self.INFO_DIR + "/%d"%(vmId))
-				child = controlledVMs[vmId]
 				del controlledVMs[vmId]
 				try:
 					del self.stats[vmId]
@@ -163,11 +178,30 @@ class Qemu(VmControlInterface):
 					for i in child.monitorHistory:
 						f.write(i)
 					f.close()
+				#XXXstroucki remove scratch storage
+				try:
+					if self.scratchVg is not None:
+						scratch_name = child.instance.name
+						log.info("Removing any scratch for " + scratch_name)
+						cmd = "/sbin/lvremove -f %s" % self.scratchVg
+						result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).wait()
+				except:
+					pass
+
 				try:
 					if (not child.migratingOut):
 						self.nm.vmStateChange(vmId, None, InstanceState.Exited)
 				except Exception, e:
 					log.exception("vmStateChange failed")
+			else:
+				try:
+					if (child.migratingOut):
+						self.nm.vmStateChange(vmId, None, InstanceState.MigrateTrans)
+					else:
+						self.nm.vmStateChange(vmId, None, InstanceState.Running)
+				except:
+					#XXXstroucki nm is initialised at different time
+					log.exception("vmStateChange failed")
 						
 	
 	def scanInfoDir(self):
@@ -185,13 +219,21 @@ class Qemu(VmControlInterface):
 				self.vncPortLock.release()
 				child.monitorFd = os.open(child.ptyFile, os.O_RDWR | os.O_NOCTTY)
 				child.monitor = os.fdopen(child.monitorFd)
+
+				#XXXstroucki ensure instance has vmId
+				child.instance.vmId = vmId
+				
 				self.controlledVMs[child.pid] = child
 				log.info("Adding vmId %d" % (child.pid))
 			except Exception, e:
 				log.exception("Failed to load VM info for %d", vmId)
 			else:
 				log.info("Loaded VM info for %d", vmId)
-		self.matchSystemPids(self.controlledVMs)
+		# XXXstroucki NM may not be available yet here.
+		try:
+			self.matchSystemPids(self.controlledVMs)
+		except:
+			pass
 	
 	def pollVMsLoop(self):
 		"""Infinite loop that checks for dead VMs"""
@@ -262,7 +304,8 @@ class Qemu(VmControlInterface):
 		res = self.consumeAvailable(child)
 		os.write(child.monitorFd, command + "\n")
 		if (expectPrompt):
-			self.consumeUntil(child, command)
+			# XXXstroucki: receiving a vm can take a long time
+			self.consumeUntil(child, command, timeout=timeout)
 			res = self.consumeUntil(child, "(qemu) ", timeout=timeout)
 		return res
 
@@ -300,13 +343,8 @@ class Qemu(VmControlInterface):
 		cmd = "head -n 1 /proc/meminfo"		
 		memoryStr = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).stdout.read().strip().split()
 		if (memoryStr[2] == "kB"):
-			host.memory = int(memoryStr[1])/1024
-		elif (memoryStr[2] == "mB"):
-			host.memory = int(memoryStr[1])
-		elif (memoryStr[2] == "gB"):
-			host.memory = int(memoryStr[1])*1024
-		elif (memoryStr[2] == " B"):
-			host.memory = int(memoryStr[1])/(1024*1024)
+			# XXXstroucki should have parameter for reserved mem
+			host.memory = (int(memoryStr[1])/1024) - 512
 		else:
 			log.warning('Unable to determine amount of physical memory - reporting 0')
 			host.memory = 0
@@ -351,6 +389,8 @@ class Qemu(VmControlInterface):
 				snapshot = "on"
 				migrate = "on"
 
+			thisDiskList.append("cache=off")
+
 			thisDiskList.append("snapshot=%s" % snapshot)
 
 			if (self.useMigrateArgument):
@@ -358,9 +398,50 @@ class Qemu(VmControlInterface):
 
 			diskString = diskString + "-drive " + ",".join(thisDiskList) + " "
 
+		# scratch disk (should be integrated better)
+		scratchSize = instance.hints.get("scratchSpace", "0")
+		scratchSize = int(scratchSize)
+		scratch_file = None
 
+		try:
+			if scratchSize > 0:
+				if self.scratchVg is None:
+					raise Exception, "No scratch volume group defined"
+				# create scratch disk
+				# XXXstroucki: needs to be cleaned somewhere
+				# XXXstroucki: clean user provided instance name
+				scratch_name = "lv" + instance.name
+				# XXXstroucki hold lock
+				# XXXstroucki check for capacity
+				cmd = "/sbin/lvcreate -n" + scratch_name + " -L" + str(scratchSize) + "G " + self.scratchVg
+				result = subprocess.Popen(cmd.split(), executable=cmd.split()[0], stdout=subprocess.PIPE).wait()
+				index += 1
+
+				thisDiskList = [ "file=/dev/%s/%s" % (self.scratchVg, scratch_name) ]
+				thisDiskList.append("if=%s" % diskInterface)
+				thisDiskList.append("index=%d" % index)
+				thisDiskList.append("cache=off")
+				
+				if (True or disk.persistent):
+					snapshot = "off"
+					migrate = "off"
+				else:
+					snapshot = "on"
+					migrate = "on"
+
+				thisDiskList.append("snapshot=%s" % snapshot)
+
+				if (self.useMigrateArgument):
+					thisDiskList.append("migrate=%s" % migrate)
+
+				diskString = diskString + "-drive " + ",".join(thisDiskList) + " "
+
+		except:
+			log.exception('Failed to create scratch space')
+			raise
+	
 		#  Nic hints
-		nicModel = instance.hints.get("nicModel", "e1000")
+		nicModel = instance.hints.get("nicModel", "virtio")
 		nicString = ""
 		for i in range(0, len(instance.nics)):
 			nic = instance.nics[i]
@@ -404,6 +485,7 @@ class Qemu(VmControlInterface):
 		child = self.anonClass(pid=pid, instance=instance, stderr=os.fdopen(pipe_r, 'r'), migratingOut = False, monitorHistory=[], errorBit = True, OSchild = True)
 		child.ptyFile = None
 		child.vncPort = -1
+		child.instance.vmId = child.pid
 		self.saveChildInfo(child)
 		self.controlledVMs[child.pid] = child
 		log.info("Adding vmId %d" % (child.pid))
@@ -427,7 +509,8 @@ class Qemu(VmControlInterface):
 		child.monitor = os.fdopen(child.monitorFd)
 		self.saveChildInfo(child)
 		if (issueContinue):
-			self.enterCommand(child, "c")
+			# XXXstroucki: receiving a vm can take a long time
+			self.enterCommand(child, "c", timeout=None)
 	
 	def stopVm(self, vmId, target, stopFirst):
 		"""Universal function to stop a VM -- used by suspendVM, migrateVM """
@@ -437,7 +520,7 @@ class Qemu(VmControlInterface):
 		if (target):
 			retry = self.migrationRetries
 			while (retry > 0):
-				res = self.enterCommand(child, "migrate %s" % (target), timeout=self.migrateTimeout)
+				res = self.enterCommand(child, "migrate -i %s" % (target), timeout=self.migrateTimeout)
 				retry = retry - 1
 				if (res.find("migration failed") == -1):
 					retry = -1
@@ -459,7 +542,6 @@ class Qemu(VmControlInterface):
 		return vmId
 	
 	def suspendVm(self, vmId, target):
-		child = self.getChildFromPid(vmId)
 		tmpTarget = "/tmp/tashi_qemu_suspend_%d_%d" % (os.getpid(), vmId)
 		# XXX: Use fifo to improve performance
 		vmId = self.stopVm(vmId, "\"exec:gzip -c > %s\"" % (tmpTarget), True)
@@ -678,6 +760,7 @@ class Qemu(VmControlInterface):
 										self.stats[vmId]['%s_%s' % (device, label)] = int(val)
 					self.stats[vmId]['cpuLoad'] = cpuLoad
 					self.stats[vmId]['rss'] = rss
+					self.stats[vmId]['vsize'] = vsize
 					self.stats[vmId]['recvMBs'] = sendMBs
 					self.stats[vmId]['sendMBs'] = recvMBs
 			except:

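The scratch-space hunks above shell out to LVM: lvcreate builds a per-instance logical volume named "lv" plus the instance name when the scratchSpace hint is set, and lvremove is expected to clean it up when the VM exits. A hedged sketch of that pair using subprocess, with a hypothetical volume group name; note that naming vg/lv on removal deletes only that instance's volume:

    import subprocess

    SCRATCH_VG = "vgscratch"    # placeholder volume group name

    def createScratch(lvName, sizeGb):
        # e.g. /sbin/lvcreate -nlvmyvm -L8G vgscratch
        cmd = ["/sbin/lvcreate", "-n" + lvName, "-L%dG" % sizeGb, SCRATCH_VG]
        return subprocess.call(cmd)

    def removeScratch(lvName):
        # Naming vg/lv removes just this volume, not everything in the group.
        cmd = ["/sbin/lvremove", "-f", "%s/%s" % (SCRATCH_VG, lvName)]
        return subprocess.call(cmd)
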
Modified: incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/vmcontrolinterface.py Wed Feb  8 04:44:09 2012
@@ -27,6 +27,10 @@ class VmControlInterface(object):
 		self.config = config
 		self.dfs = dfs
 		self.nm = nm
+
+	def getInstances(self):
+		"""Will return a dict of instances by vmId to the caller"""
+		raise NotImplementedError
 	
 	def instantiateVm(self, instance):
 		"""Takes an InstanceConfiguration, creates a VM based on it, 

Modified: incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/xenpv.py
URL: http://svn.apache.org/viewvc/incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/xenpv.py?rev=1241773&r1=1241772&r2=1241773&view=diff
==============================================================================
--- incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/xenpv.py (original)
+++ incubator/tashi/branches/oldstable/src/tashi/nodemanager/vmcontrol/xenpv.py Wed Feb  8 04:44:09 2012
@@ -111,6 +111,7 @@ class XenPV(VmControlInterface, threadin
 		self.transientDir = self.config.get('XenPV', 'transientDir')
 		self.defaultVmType = self.config.get('XenPV', 'defaultVmType') 
 		self.disktype = self.config.get('XenPV', 'defaultDiskType')
+		# XXXstroucki default disktype vhd?
 		self.newvms = listVms(self.vmNamePrefix)
 		self.hostId = -1
 		self.sleeptime = 5
@@ -133,7 +134,7 @@ class XenPV(VmControlInterface, threadin
 				# If the vm had transient disks, delete them
 				for i in range(len(a.disks)):
 					if a.disks[i].persistent == False:
-						diskname = self.transientDisk(a.id, i, disktype)
+						diskname = self.transientDisk(a.id, i, self.disktype)
 						try:
 							os.unlink(diskname)
 						except:
@@ -150,7 +151,7 @@ class XenPV(VmControlInterface, threadin
 			time.sleep(self.sleeptime)
 			self.cron()
 ########################################
-# This is an ugly function, but the muti-line string literal makes it
+# This is an ugly function, but the multi-line string literal makes it
 # a lot easier
 ########################################
 	def createXenConfig(self, vmName, 
@@ -167,12 +168,7 @@ class XenPV(VmControlInterface, threadin
 		vmType = hints.get('vmtype', self.defaultVmType)
 		print 'starting vm with type: ', vmType
 
-                try:
-                   disktype = self.config.get('XenPV', 'defaultDiskType')
-                except:
-                   disktype = 'vhd'
-
-                disk0 = 'tap:%s'%disktype
+                disk0 = 'tap:%s' % self.disktype
 		diskU = 'xvda1'
 
 		try:
@@ -216,7 +212,7 @@ kernel = "%s"
      ramdisk)
 
 		elif vmType == 'hvm':
-			disk0 = 'tap:%s'%disktype
+			disk0 = 'tap:%s' % self.disktype
 			diskU = 'hda1'