You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ra...@apache.org on 2014/11/05 11:45:13 UTC
[19/50] [abbrv] git commit: updated refs/heads/master to 4c5f792
CLOUDSTACK-7632:Automation for volume life cycle testPath
Signed-off-by: SrikanteswaraRao Talluri <ta...@apache.org>
This closes #32
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/a69950f6
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/a69950f6
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/a69950f6
Branch: refs/heads/master
Commit: a69950f6a97819c9c071ad9f278377f2a5ca77ca
Parents: aa1e260
Author: Prashant Kumar Mishra <pr...@citrix.comm>
Authored: Thu Oct 9 11:42:56 2014 +0530
Committer: SrikanteswaraRao Talluri <ta...@apache.org>
Committed: Thu Oct 30 18:30:25 2014 +0530
----------------------------------------------------------------------
.../testpaths/testpath_volumelifecycle.py | 967 ++++++++++++++-----
1 file changed, 732 insertions(+), 235 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/a69950f6/test/integration/testpaths/testpath_volumelifecycle.py
----------------------------------------------------------------------
diff --git a/test/integration/testpaths/testpath_volumelifecycle.py b/test/integration/testpaths/testpath_volumelifecycle.py
index d30f2b0..526d20c 100644
--- a/test/integration/testpaths/testpath_volumelifecycle.py
+++ b/test/integration/testpaths/testpath_volumelifecycle.py
@@ -16,11 +16,12 @@
# under the License.
"""Utilities functions
"""
-#All tests inherit from cloudstackTestCase
+# All tests inherit from cloudstackTestCase
from marvin.cloudstackTestCase import cloudstackTestCase
-#Import Integration Libraries
+# Import Integration Libraries
from marvin.codes import FAILED, PASS
-#base - contains all resources as entities and defines create, delete, list operations on them
+# base - contains all resources as entities and defines create, delete,
+# list operations on them
from marvin.lib.base import (Account,
VirtualMachine,
ServiceOffering,
@@ -28,10 +29,11 @@ from marvin.lib.base import (Account,
DiskOffering,
Volume,
Template,
- StoragePool)
+ StoragePool,
+ Resources)
from marvin.lib.utils import cleanup_resources, validateList
-#common - commonly used methods for all tests are listed here
+# common - commonly used methods for all tests are listed here
from marvin.lib.common import (get_zone,
get_domain,
get_template,
@@ -43,54 +45,97 @@ import tempfile
def verify_attach_volume(self, vmid, volid):
- list_volumes = Volume.list(self.userapiclient,
- id=volid
- )
- self.assertEqual(validateList(list_volumes)[0], PASS, "Check List volume response for volume %s" % volid)
- self.assertEqual(len(list_volumes), 1, "There is no data disk attached to vm id:%s" % vmid)
- self.assertEqual(list_volumes[0].virtualmachineid, vmid, "Check if volume state (attached) is reflected")
- self.debug("volume id:%s successfully attached to vm id%s" % (volid, vmid))
- return
+ list_volumes = Volume.list(self.userapiclient,
+ id=volid
+ )
+ self.assertEqual(
+ validateList(list_volumes)[0],
+ PASS,
+ "Check List volume response for volume %s" %
+ volid)
+ self.assertEqual(
+ len(list_volumes),
+ 1,
+ "There is no data disk attached to vm id:%s" %
+ vmid)
+ self.assertEqual(
+ list_volumes[0].virtualmachineid,
+ vmid,
+ "Check if volume state (attached) is reflected")
+ self.debug("volume id:%s successfully attached to vm id%s" % (volid, vmid))
+ return
def verify_detach_volume(self, vmid, volid):
- list_volumes = Volume.list(self.userapiclient,
- id=volid
- )
- self.assertEqual(validateList(list_volumes)[0], PASS, "Check List volume response for volume %s" % volid)
- self.assertEqual(len(list_volumes), 1, "Detach data disk id: %s for vm id :%s was not successful" % (volid, vmid))
- self.assertEqual(list_volumes[0].virtualmachineid, None, "Check if volume state (attached) is reflected")
- self.debug("volume id: %s successfully detached from vm id:%s" % (volid, vmid))
+ list_volumes = Volume.list(self.userapiclient,
+ id=volid
+ )
+ self.assertEqual(
+ validateList(list_volumes)[0],
+ PASS,
+ "Check List volume response for volume %s" %
+ volid)
+ self.assertEqual(
+ len(list_volumes),
+ 1,
+ "Detach data disk id: %s for vm id :%s was not successful" %
+ (volid,
+ vmid))
+ self.assertEqual(
+ list_volumes[0].virtualmachineid,
+ None,
+ "Check if volume state (attached) is reflected")
+ self.debug(
+ "volume id: %s successfully detached from vm id:%s" %
+ (volid, vmid))
def verify_vm(self, vmid):
- list_vm = list_virtual_machines(self.userapiclient,
- account=self.account.name,
- domainid=self.account.domainid,
- id=vmid
- )
- self.assertEqual(validateList(list_vm)[0], PASS, "Check List vm response for vmid: %s" % vmid)
- self.assertGreater(len(list_vm), 0, "Check the list vm response for vm id: %s" % vmid)
- vm = list_vm[0]
- self.assertEqual(vm.id, str(vmid), "Vm deployed is different from the test")
- self.assertEqual(vm.state, "Running", "VM is not in Running state")
- self.debug("VM got created successfully %s" % vmid)
+ list_vm = list_virtual_machines(self.userapiclient,
+ account=self.account.name,
+ domainid=self.account.domainid,
+ id=vmid
+ )
+ self.assertEqual(
+ validateList(list_vm)[0],
+ PASS,
+ "Check List vm response for vmid: %s" %
+ vmid)
+ self.assertGreater(
+ len(list_vm),
+ 0,
+ "Check the list vm response for vm id: %s" %
+ vmid)
+ vm = list_vm[0]
+ self.assertEqual(
+ vm.id,
+ str(vmid),
+ "Vm deployed is different from the test")
+ self.assertEqual(vm.state, "Running", "VM is not in Running state")
+ self.debug("VM got created successfully %s" % vmid)
class TestPathVolume(cloudstackTestCase):
+
@classmethod
def setUpClass(cls):
testClient = super(TestPathVolume, cls).getClsTestClient()
cls.apiclient = testClient.getApiClient()
cls.testdata = testClient.getParsedTestDataConfig()
- #Get Zone,Domain and templates
+ # Get Zone,Domain and templates
cls.domain = get_domain(cls.apiclient)
cls.zone = get_zone(cls.apiclient)
cls.testdata["mode"] = cls.zone.networktype
- cls.template = get_template(cls.apiclient, cls.zone.id, cls.testdata["ostype"])
+ cls.template = get_template(
+ cls.apiclient,
+ cls.zone.id,
+ cls.testdata["ostype"])
cls.testdata["template"]["ostypeid"] = cls.template.ostypeid
if cls.template == FAILED:
- cls.fail("get_template() failed to return template with description %s" % cls.testdata["ostype"])
+ cls.fail(
+ "get_template() failed to return template with description \
+ %s" %
+ cls.testdata["ostype"])
cls._cleanup = []
try:
cls.account = Account.create(cls.apiclient,
@@ -98,61 +143,77 @@ class TestPathVolume(cloudstackTestCase):
domainid=cls.domain.id
)
cls._cleanup.append(cls.account)
- #createa two service offerings
- cls.service_offering_1 = ServiceOffering.create(cls.apiclient, cls.testdata["service_offerings"]["small"])
+ # createa two service offerings
+ cls.service_offering_1 = ServiceOffering.create(
+ cls.apiclient,
+ cls.testdata["service_offerings"]["small"])
cls._cleanup.append(cls.service_offering_1)
# Create Disk offerings
- cls.disk_offering_1 = DiskOffering.create(cls.apiclient, cls.testdata["disk_offering"])
+ cls.disk_offering_1 = DiskOffering.create(
+ cls.apiclient,
+ cls.testdata["disk_offering"])
cls._cleanup.append(cls.disk_offering_1)
- #check if zone wide storage is enable
+ # check if zone wide storage is enable
cls.list_storage = StoragePool.list(cls.apiclient,
scope="ZONE"
)
if cls.list_storage:
cls.zone_wide_storage = cls.list_storage[0]
- cls.debug("zone wide storage id is %s" % cls.zone_wide_storage.id)
+ cls.debug(
+ "zone wide storage id is %s" %
+ cls.zone_wide_storage.id)
cls.testdata["tags"] = "zp"
update1 = StoragePool.update(cls.apiclient,
id=cls.zone_wide_storage.id,
tags=cls.testdata["tags"]
)
- cls.debug("Storage %s pool tag%s" % (cls.zone_wide_storage.id, update1.tags))
+ cls.debug(
+ "Storage %s pool tag%s" %
+ (cls.zone_wide_storage.id, update1.tags))
cls.testdata["service_offerings"]["tags"] = "zp"
- cls.tagged_so = ServiceOffering.create(cls.apiclient, cls.testdata["service_offerings"])
+ cls.tagged_so = ServiceOffering.create(
+ cls.apiclient,
+ cls.testdata["service_offerings"])
cls.testdata["service_offerings"]["tags"] = " "
cls._cleanup.append(cls.tagged_so)
- #create tagged disk offerings
+ # create tagged disk offerings
cls.testdata["disk_offering"]["tags"] = "zp"
- cls.disk_offering_tagged = DiskOffering.create(cls.apiclient, cls.testdata["disk_offering"])
+ cls.disk_offering_tagged = DiskOffering.create(
+ cls.apiclient,
+ cls.testdata["disk_offering"])
cls._cleanup.append(cls.disk_offering_tagged)
else:
cls.debug("No zone wide storage found")
- #check if local storage is enable
+ # check if local storage is enable
if cls.zone.localstorageenabled:
cls.testdata["disk_offering"]["tags"] = " "
cls.testdata["service_offerings"]["storagetype"] = 'local'
- cls.service_offering_2 = ServiceOffering.create(cls.apiclient, cls.testdata["service_offerings"])
+ cls.service_offering_2 = ServiceOffering.create(
+ cls.apiclient,
+ cls.testdata["service_offerings"])
cls._cleanup.append(cls.service_offering_2)
- #craete a compute offering with local storage
+ # craete a compute offering with local storage
cls.testdata["disk_offering"]["storagetype"] = 'local'
- cls.disk_offering_local = DiskOffering.create(cls.apiclient, cls.testdata["disk_offering"])
+ cls.disk_offering_local = DiskOffering.create(
+ cls.apiclient,
+ cls.testdata["disk_offering"])
cls._cleanup.append(cls.disk_offering_local)
cls.testdata["disk_offering"]["storagetype"] = ' '
else:
cls.debug("No local storage found")
- cls.userapiclient = testClient.getUserApiClient(UserName=cls.account.name,
- DomainName=cls.account.domain
- )
- #Check if login is successful with new account
+ cls.userapiclient = testClient.getUserApiClient(
+ UserName=cls.account.name,
+ DomainName=cls.account.domain)
+ # Check if login is successful with new account
response = User.login(cls.userapiclient,
username=cls.account.name,
password=cls.testdata["account"]["password"]
)
assert response.sessionkey is not None
- #response should have non null value
+ # response should have non null value
except Exception as e:
- cls.tearDownClass()
- raise e
+ cls.tearDownClass()
+ raise e
return
@classmethod
@@ -162,13 +223,19 @@ class TestPathVolume(cloudstackTestCase):
except Exception as e:
raise Exception("Warning:Exception during cleanup: %s" % e)
- @attr(tags=["advanced", "advancedsg", "basic", ], required_hardware="false")
- def test_01_positive_test_1(self):
+ @attr(
+ tags=[
+ "advanced",
+ "advancedsg",
+ "basic",
+ ],
+ required_hardware="false")
+ def test_01_positive_path(self):
"""
positive test for volume life cycle
# 1. Deploy a vm [vm1] with shared storage and data disk
# 2. Deploy a vm [vm2]with shared storage without data disk
- # 3.
+ # 3. TBD
# 4. Create a new volume and attache to vm2
# 5. Detach data disk from vm1 and download it
# Variance(1-9)
@@ -179,50 +246,63 @@ class TestPathVolume(cloudstackTestCase):
# 10. Create new VM using the template created in step 9
# 11. Delete the template
# 12. Detach the disk from VM2 and re-attach the disk to VM1
- # 13.
- # 14.
+ # 13.TBD
+ # 14.TBD
# 15.Migrate volume(detached) and then attach to a vm and live-migrate
- # 16.Upload volume of size smaller than storage.max.volume.upload.size(leaving the negative case)
- # 17.NA
- # 18.
- # 19.NA
+ # 16.Upload volume of size smaller than
+ storage.max.volume.upload.size(leaving the negative case)
+ # 17.TBD
+ # 18.TBD
+ # 19.TBD
# 20.Detach data disks from VM2 and delete volume
"""
+
# 1. Deploy a vm [vm1] with shared storage and data disk
- self.virtual_machine_1 = VirtualMachine.create(self.userapiclient,
- self.testdata["small"],
- templateid=self.template.id,
- accountid=self.account.name,
- domainid=self.account.domainid,
- serviceofferingid=self.service_offering_1.id,
- zoneid=self.zone.id,
- diskofferingid=self.disk_offering_1.id,
- mode=self.testdata["mode"]
- )
+ self.virtual_machine_1 = VirtualMachine.create(
+ self.userapiclient,
+ self.testdata["small"],
+ templateid=self.template.id,
+ accountid=self.account.name,
+ domainid=self.account.domainid,
+ serviceofferingid=self.service_offering_1.id,
+ zoneid=self.zone.id,
+ diskofferingid=self.disk_offering_1.id,
+ mode=self.testdata["mode"])
verify_vm(self, self.virtual_machine_1.id)
# List data volume for vm1
list_volume = Volume.list(self.userapiclient,
virtualmachineid=self.virtual_machine_1.id,
type='DATADISK'
)
- self.assertEqual(validateList(list_volume)[0], PASS, "Check List volume response for vm id %s" % self.virtual_machine_1.id)
+ self.assertEqual(
+ validateList(list_volume)[0],
+ PASS,
+ "Check List volume response for vm id %s" %
+ self.virtual_machine_1.id)
list_data_volume_for_vm1 = list_volume[0]
- self.assertEqual(len(list_volume), 1, "There is no data disk attached to vm id:%s" % self.virtual_machine_1.id)
- self.assertEqual(list_data_volume_for_vm1.virtualmachineid, str(self.virtual_machine_1.id), "Check if volume state (attached) is reflected")
+ self.assertEqual(
+ len(list_volume),
+ 1,
+ "There is no data disk attached to vm id:%s" %
+ self.virtual_machine_1.id)
+ self.assertEqual(
+ list_data_volume_for_vm1.virtualmachineid, str(
+ self.virtual_machine_1.id),
+ "Check if volume state (attached) is reflected")
# 2. Deploy a vm [vm2]with shared storage without data disk
- self.virtual_machine_2 = VirtualMachine.create(self.userapiclient,
- self.testdata["small"],
- templateid=self.template.id,
- accountid=self.account.name,
- domainid=self.account.domainid,
- serviceofferingid=self.service_offering_1.id,
- zoneid=self.zone.id,
- mode=self.testdata["mode"]
- )
+ self.virtual_machine_2 = VirtualMachine.create(
+ self.userapiclient,
+ self.testdata["small"],
+ templateid=self.template.id,
+ accountid=self.account.name,
+ domainid=self.account.domainid,
+ serviceofferingid=self.service_offering_1.id,
+ zoneid=self.zone.id,
+ mode=self.testdata["mode"])
verify_vm(self, self.virtual_machine_2.id)
- #4. Create a new volume and attache to vm2
+ # 4. Create a new volume and attache to vm2
self.volume = Volume.create(self.userapiclient,
services=self.testdata["volume"],
diskofferingid=self.disk_offering_1.id,
@@ -232,152 +312,217 @@ class TestPathVolume(cloudstackTestCase):
list_data_volume = Volume.list(self.userapiclient,
id=self.volume.id
)
- self.assertEqual(validateList(list_data_volume)[0], PASS, "Check List volume response for volume %s" % self.volume.id)
- self.assertEqual(list_data_volume[0].id, self.volume.id, "check list volume response for volume id: %s" % self.volume.id)
- self.debug("volume id %s got created successfully" % list_data_volume[0].id)
+ self.assertEqual(
+ validateList(list_data_volume)[0],
+ PASS,
+ "Check List volume response for volume %s" %
+ self.volume.id)
+ self.assertEqual(
+ list_data_volume[0].id,
+ self.volume.id,
+ "check list volume response for volume id: %s" %
+ self.volume.id)
+ self.debug(
+ "volume id %s got created successfully" %
+ list_data_volume[0].id)
# Attach volume to vm2
self.virtual_machine_2.attach_volume(self.userapiclient,
self.volume
)
verify_attach_volume(self, self.virtual_machine_2.id, self.volume.id)
- #Variance
+ # Variance
if self.zone.localstorageenabled:
# V1.Create vm3 with local storage offering
- self.virtual_machine_local_3=VirtualMachine.create(self.userapiclient,
- self.testdata["small"],
- templateid=self.template.id,
- accountid=self.account.name,
- domainid=self.account.domainid,
- serviceofferingid=self.service_offering_2.id,
- zoneid=self.zone.id,
- mode=self.testdata["mode"]
- )
+ self.virtual_machine_local_3 = VirtualMachine.create(
+ self.userapiclient,
+ self.testdata["small"],
+ templateid=self.template.id,
+ accountid=self.account.name,
+ domainid=self.account.domainid,
+ serviceofferingid=self.service_offering_2.id,
+ zoneid=self.zone.id,
+ mode=self.testdata["mode"])
verify_vm(self, self.virtual_machine_local_3.id)
# V2.create two data disk on local storage
self.local_volumes = []
for i in range(2):
- local_volume = Volume.create(self.userapiclient,
- services=self.testdata["volume"],
- diskofferingid=self.disk_offering_local.id,
- zoneid=self.zone.id
- )
+ local_volume = Volume.create(
+ self.userapiclient,
+ services=self.testdata["volume"],
+ diskofferingid=self.disk_offering_local.id,
+ zoneid=self.zone.id)
- list_local_data_volume = Volume.list(self.userapiclient,
- id=local_volume.id
- )
- self.assertEqual(validateList(list_local_data_volume)[0], PASS, "Check List volume response for volume %s" % local_volume.id)
- self.assertEqual(list_local_data_volume[0].id, local_volume.id, "check list volume response for volume id: %s" % local_volume.id)
- self.debug("volume id %s got created successfully" % list_local_data_volume[0].id)
- self.local_volumes.append(local_volume)
+ list_local_data_volume = Volume.list(self.userapiclient,
+ id=local_volume.id
+ )
+ self.assertEqual(
+ validateList(list_local_data_volume)[0],
+ PASS,
+ "Check List volume response for volume %s" %
+ local_volume.id)
+ self.assertEqual(
+ list_local_data_volume[0].id,
+ local_volume.id,
+ "check list volume response for volume id: %s" %
+ local_volume.id)
+ self.debug(
+ "volume id %s got created successfully" %
+ list_local_data_volume[0].id)
+ self.local_volumes.append(local_volume)
# V3.Attach local disk to vm1
self.virtual_machine_1.attach_volume(self.userapiclient,
self.local_volumes[0]
)
- verify_attach_volume(self, self.virtual_machine_1.id, self.local_volumes[0].id)
+ verify_attach_volume(
+ self,
+ self.virtual_machine_1.id,
+ self.local_volumes[0].id)
if self.list_storage:
# V4.create vm4 with zone wide storage
- self.virtual_machine_zone_4 = VirtualMachine.create(self.userapiclient,
- self.testdata["small"],
- templateid=self.template.id,
- accountid=self.account.name,
- domainid=self.account.domainid,
- serviceofferingid=self.tagged_so.id,
- zoneid=self.zone.id,
- mode=self.testdata["mode"]
- )
+ self.virtual_machine_zone_4 = VirtualMachine.create(
+ self.userapiclient,
+ self.testdata["small"],
+ templateid=self.template.id,
+ accountid=self.account.name,
+ domainid=self.account.domainid,
+ serviceofferingid=self.tagged_so.id,
+ zoneid=self.zone.id,
+ mode=self.testdata["mode"])
verify_vm(self, self.virtual_machine_zone_4.id)
# V5.Create two data disk on zone wide storage
self.zone_volumes = []
for i in range(2):
- zone_volume = Volume.create(self.userapiclient,
- services=self.testdata["volume"],
- diskofferingid=self.disk_offering_tagged.id,
- zoneid=self.zone.id
- )
+ zone_volume = Volume.create(
+ self.userapiclient,
+ services=self.testdata["volume"],
+ diskofferingid=self.disk_offering_tagged.id,
+ zoneid=self.zone.id)
- list_zone_data_volume = Volume.list(self.userapiclient,
- id=zone_volume.id
- )
- self.assertEqual(validateList(list_zone_data_volume)[0], PASS, "Check List volume response for volume %s" % zone_volume.id)
- self.assertEqual(list_zone_data_volume[0].id, zone_volume.id, "check list volume response for volume id: %s" % zone_volume.id)
- self.debug("volume id:%s got created successfully" % list_zone_data_volume[0].id)
- self.zone_volumes.append(zone_volume)
+ list_zone_data_volume = Volume.list(self.userapiclient,
+ id=zone_volume.id
+ )
+ self.assertEqual(
+ validateList(list_zone_data_volume)[0],
+ PASS,
+ "Check List volume response for volume %s" %
+ zone_volume.id)
+ self.assertEqual(
+ list_zone_data_volume[0].id,
+ zone_volume.id,
+ "check list volume response for volume id: %s" %
+ zone_volume.id)
+ self.debug(
+ "volume id:%s got created successfully" %
+ list_zone_data_volume[0].id)
+ self.zone_volumes.append(zone_volume)
# V6.Attach data disk running on ZWPS to VM1 (root disk on shared)
self.virtual_machine_1.attach_volume(self.userapiclient,
self.zone_volumes[0]
)
- verify_attach_volume(self, self.virtual_machine_1.id, self.zone_volumes[0].id)
- # V7. Create a cluster wide volume and attach to vm running on zone wide storage
- self.cluster_volume = Volume.create(self.userapiclient,
- services=self.testdata["volume"],
- diskofferingid=self.disk_offering_1.id,
- zoneid=self.zone.id
- )
+ verify_attach_volume(
+ self,
+ self.virtual_machine_1.id,
+ self.zone_volumes[0].id)
+ # V7. Create a cluster wide volume and attach to vm running on zone
+ # wide storage
+ self.cluster_volume = Volume.create(
+ self.userapiclient,
+ services=self.testdata["volume"],
+ diskofferingid=self.disk_offering_1.id,
+ zoneid=self.zone.id)
list_cluster_volume = Volume.list(self.userapiclient,
id=self.cluster_volume.id
)
- self.assertEqual(validateList(list_cluster_volume)[0], PASS, "Check List volume response for volume %s" % self.cluster_volume.id)
- self.assertEqual(list_cluster_volume[0].id, str(self.cluster_volume.id), "volume does not exist %s" % self.cluster_volume.id)
- self.debug("volume id %s got created successfuly" % list_cluster_volume[0].id)
+ self.assertEqual(
+ validateList(list_cluster_volume)[0],
+ PASS,
+ "Check List volume response for volume %s" %
+ self.cluster_volume.id)
+ self.assertEqual(
+ list_cluster_volume[0].id, str(
+ self.cluster_volume.id), "volume does not exist %s" %
+ self.cluster_volume.id)
+ self.debug(
+ "volume id %s got created successfuly" %
+ list_cluster_volume[0].id)
self.virtual_machine_zone_4.attach_volume(self.userapiclient,
self.cluster_volume
)
- verify_attach_volume(self, self.virtual_machine_zone_4.id, self.cluster_volume.id)
+ verify_attach_volume(
+ self,
+ self.virtual_machine_zone_4.id,
+ self.cluster_volume.id)
if self.list_storage and self.zone.localstorageenabled:
- #V8.Attach zone wide volume to vm running on local storage
+ # V8.Attach zone wide volume to vm running on local storage
self.virtual_machine_local_3.attach_volume(self.userapiclient,
self.zone_volumes[1]
)
- verify_attach_volume(self, self.virtual_machine_local_3.id, self.zone_volumes[1].id)
+ verify_attach_volume(
+ self,
+ self.virtual_machine_local_3.id,
+ self.zone_volumes[1].id)
# V9.Attach local volume to a vm running on zone wide storage
self.virtual_machine_zone_4.attach_volume(self.userapiclient,
self.local_volumes[1]
)
- verify_attach_volume(self, self.virtual_machine_zone_4.id, self.local_volumes[1].id)
+ verify_attach_volume(
+ self,
+ self.virtual_machine_zone_4.id,
+ self.local_volumes[1].id)
# 5. Detach data disk from vm1 and download it
self.virtual_machine_1.detach_volume(self.userapiclient,
volume=list_data_volume_for_vm1
)
- verify_detach_volume(self, self.virtual_machine_1.id, list_data_volume_for_vm1.id)
+ verify_detach_volume(
+ self,
+ self.virtual_machine_1.id,
+ list_data_volume_for_vm1.id)
# download detached volume
- self.extract_volume = Volume.extract(self.userapiclient,
- volume_id=list_data_volume_for_vm1.id,
- zoneid=self.zone.id,
- mode='HTTP_DOWNLOAD'
- )
+ self.extract_volume = Volume.extract(
+ self.userapiclient,
+ volume_id=list_data_volume_for_vm1.id,
+ zoneid=self.zone.id,
+ mode='HTTP_DOWNLOAD')
self.debug("extracted url is%s :" % self.extract_volume.url)
try:
formatted_url = urllib.unquote_plus(self.extract_volume.url)
- self.debug("Attempting to download volume at url %s" % formatted_url)
+ self.debug(
+ "Attempting to download volume at url %s" %
+ formatted_url)
response = urllib.urlopen(formatted_url)
self.debug("response from volume url %s" % response.getcode())
fd, path = tempfile.mkstemp()
- self.debug("Saving volume %s to path %s" % (list_data_volume_for_vm1.id, path))
+ self.debug(
+ "Saving volume %s to path %s" %
+ (list_data_volume_for_vm1.id, path))
os.close(fd)
with open(path, 'wb') as fd:
fd.write(response.read())
self.debug("Saved volume successfully")
except Exception:
- self.fail("Extract Volume Failed with invalid URL %s (vol id: %s)" % (self.extract_volume, list_data_volume_for_vm1.id))
- #Need to get format for downloaded volume ,for now using default format VHD
+ self.fail(
+ "Extract Volume Failed with invalid URL %s (vol id: %s)" %
+ (self.extract_volume, list_data_volume_for_vm1.id))
+ # checking format of downloaded volume and assigning to
+ # testdata["volume_upload"]
if "OVA" in self.extract_volume.url.upper():
self.testdata["upload_volume"]["format"] = "OVA"
if "QCOW2" in self.extract_volume.url.upper():
self.testdata["upload_volume"]["format"] = "QCOW2"
# 6. Upload volume by providing url of downloaded volume in step 5
- self.upload_response = Volume.upload(self.userapiclient,
- zoneid=self.zone.id,
- url=self.extract_volume.url,
- services=self.testdata["upload_volume"]
- )
+ self.upload_response = Volume.upload(
+ self.userapiclient,
+ zoneid=self.zone.id,
+ url=self.extract_volume.url,
+ services=self.testdata["upload_volume"])
self.upload_response.wait_for_upload(self.userapiclient
)
self.debug("uploaded volume id is %s" % self.upload_response.id)
@@ -385,122 +530,201 @@ class TestPathVolume(cloudstackTestCase):
self.virtual_machine_2.attach_volume(self.userapiclient,
volume=self.upload_response
)
- verify_attach_volume(self, self.virtual_machine_2.id, self.upload_response.id)
+ verify_attach_volume(
+ self,
+ self.virtual_machine_2.id,
+ self.upload_response.id)
# 8. Try to delete an attached volume
try:
self.volume.delete(self.userapiclient
)
- self.fail("Volume got deleted in attached state %s " % self.volume.id)
+ self.fail(
+ "Volume got deleted in attached state %s " %
+ self.volume.id)
except Exception as e:
self.debug("Attached volume deletion failed because %s" % e)
- #9. Create template from root volume of VM1(stop VM->create template -> start vm)
+ # 9. Create template from root volume of VM1(stop VM->create template
+ # -> start vm)
self.virtual_machine_1.stop(self.userapiclient
)
- self.list_root_disk_for_vm1 = Volume.list(self.userapiclient,
- virtualmachineid=self.virtual_machine_1.id,
- type='ROOT'
- )
- self.assertEqual(validateList(self.list_root_disk_for_vm1)[0], PASS, "Check List volume response for vm %s" % self.virtual_machine_1.id)
- self.assertEqual(len(self.list_root_disk_for_vm1), 1, "list root disk for vm1 is empty : %s" % self.virtual_machine_1.id)
- self.template_from_vm1_root_disk = Template.create(self.userapiclient,
- self.testdata["template"],
- self.list_root_disk_for_vm1[0].id,
- account=self.account.name,
- domainid=self.account.domainid
- )
- list_template = Template.list(self.userapiclient,
- templatefilter=self.testdata["templatefilter"],
- id=self.template_from_vm1_root_disk.id
- )
- self.assertEqual(validateList(list_template)[0], PASS, "Check List template response for template id %s" % self.template_from_vm1_root_disk.id)
- self.assertEqual(len(list_template), 1, "list template response is empty for template id : %s" % list_template[0].id)
- self.assertEqual(list_template[0].id, self.template_from_vm1_root_disk.id, "list template id is not same as created template")
- self.debug("Template id:%s got created successfully" % self.template_from_vm1_root_disk.id)
+ self.list_root_disk_for_vm1 = Volume.list(
+ self.userapiclient,
+ virtualmachineid=self.virtual_machine_1.id,
+ type='ROOT')
+ self.assertEqual(
+ validateList(
+ self.list_root_disk_for_vm1)[0],
+ PASS,
+ "Check List volume response for vm %s" %
+ self.virtual_machine_1.id)
+ self.assertEqual(
+ len(
+ self.list_root_disk_for_vm1),
+ 1,
+ "list root disk for vm1 is empty : %s" %
+ self.virtual_machine_1.id)
+ self.template_from_vm1_root_disk = Template.create(
+ self.userapiclient,
+ self.testdata["template"],
+ self.list_root_disk_for_vm1[0].id,
+ account=self.account.name,
+ domainid=self.account.domainid)
+ list_template = Template.list(
+ self.userapiclient,
+ templatefilter=self.testdata["templatefilter"],
+ id=self.template_from_vm1_root_disk.id)
+ self.assertEqual(
+ validateList(list_template)[0],
+ PASS,
+ "Check List template response for template id %s" %
+ self.template_from_vm1_root_disk.id)
+ self.assertEqual(
+ len(list_template),
+ 1,
+ "list template response is empty for template id : %s" %
+ list_template[0].id)
+ self.assertEqual(
+ list_template[0].id,
+ self.template_from_vm1_root_disk.id,
+ "list template id is not same as created template")
+ self.debug(
+ "Template id:%s got created successfully" %
+ self.template_from_vm1_root_disk.id)
self.virtual_machine_1.start(self.userapiclient
)
# 10. Deploy a vm using template ,created from vm1's root disk
- self.virtual_machine_3 = VirtualMachine.create(self.userapiclient,
- self.testdata["small"],
- templateid=self.template_from_vm1_root_disk.id,
- accountid=self.account.name,
- domainid=self.account.domainid,
- serviceofferingid=self.service_offering_1.id,
- zoneid=self.zone.id,
- mode=self.testdata["mode"]
- )
+ self.virtual_machine_3 = VirtualMachine.create(
+ self.userapiclient,
+ self.testdata["small"],
+ templateid=self.template_from_vm1_root_disk.id,
+ accountid=self.account.name,
+ domainid=self.account.domainid,
+ serviceofferingid=self.service_offering_1.id,
+ zoneid=self.zone.id,
+ mode=self.testdata["mode"])
verify_vm(self, self.virtual_machine_3.id)
# 11.delete the template created from root disk of vm1
try:
self.template_from_vm1_root_disk.delete(self.userapiclient
)
- self.debug("Template id: %s got deleted successfuly" % self.template_from_vm1_root_disk.id)
+ self.debug(
+ "Template id: %s got deleted successfuly" %
+ self.template_from_vm1_root_disk.id)
except Exception as e:
raise Exception("Template deletion failed with error %s" % e)
- list_template = Template.list(self.userapiclient,
- templatefilter=self.testdata["templatefilter"],
- id=self.template_from_vm1_root_disk.id
- )
- self.assertEqual(list_template, None, "Template is not deleted, id %s:" % self.template_from_vm1_root_disk.id)
- self.debug("Template id%s got deleted successfully" % self.template_from_vm1_root_disk.id)
+ list_template = Template.list(
+ self.userapiclient,
+ templatefilter=self.testdata["templatefilter"],
+ id=self.template_from_vm1_root_disk.id)
+ self.assertEqual(
+ list_template,
+ None,
+ "Template is not deleted, id %s:" %
+ self.template_from_vm1_root_disk.id)
+ self.debug(
+ "Template id%s got deleted successfully" %
+ self.template_from_vm1_root_disk.id)
# List vm and check the state of vm
verify_vm(self, self.virtual_machine_3.id)
- #12.Detach the disk from VM2 and re-attach the disk to VM1
+ # 12.Detach the disk from VM2 and re-attach the disk to VM1
self.virtual_machine_2.detach_volume(self.userapiclient,
volume=self.upload_response
)
- verify_detach_volume(self, self.virtual_machine_2.id, self.upload_response.id)
+ verify_detach_volume(
+ self,
+ self.virtual_machine_2.id,
+ self.upload_response.id)
self.virtual_machine_1.attach_volume(self.userapiclient,
volume=self.upload_response
)
- verify_attach_volume(self, self.virtual_machine_1.id, self.upload_response.id)
+ verify_attach_volume(
+ self,
+ self.virtual_machine_1.id,
+ self.upload_response.id)
# 15.Migrate volume(detached) and then attach to a vm and live-migrate
- self.migrate_volume = Volume.create(self.userapiclient,
- services=self.testdata["volume"],
- diskofferingid=self.disk_offering_1.id,
- zoneid=self.zone.id
- )
+ self.migrate_volume = Volume.create(
+ self.userapiclient,
+ services=self.testdata["volume"],
+ diskofferingid=self.disk_offering_1.id,
+ zoneid=self.zone.id)
list_volume = Volume.list(self.apiclient,
id=self.migrate_volume.id
)
- self.assertEqual(validateList(list_volume)[0], PASS, "Check List volume response for volume %s" % self.migrate_volume.id)
- self.assertEqual(list_volume[0].id, str(self.migrate_volume.id), "volume does not exist %s" % self.migrate_volume.id)
+ self.assertEqual(
+ validateList(list_volume)[0],
+ PASS,
+ "Check List volume response for volume %s" %
+ self.migrate_volume.id)
+ self.assertEqual(
+ list_volume[0].id, str(
+ self.migrate_volume.id), "volume does not exist %s" %
+ self.migrate_volume.id)
self.debug("volume id %s got created successfuly" % list_volume[0].id)
self.virtual_machine_1.attach_volume(self.userapiclient,
self.migrate_volume
)
- verify_attach_volume(self, self.virtual_machine_1.id, self.migrate_volume.id)
+ verify_attach_volume(
+ self,
+ self.virtual_machine_1.id,
+ self.migrate_volume.id)
self.virtual_machine_1.detach_volume(self.userapiclient,
volume=self.migrate_volume
)
- verify_detach_volume(self, self.virtual_machine_1.id, self.migrate_volume.id)
+ verify_detach_volume(
+ self,
+ self.virtual_machine_1.id,
+ self.migrate_volume.id)
list_volume = Volume.list(self.apiclient,
id=self.migrate_volume.id
)
- self.assertEqual(validateList(list_volume)[0], PASS, "Check List volume response for volume %s" % self.migrate_volume.id)
- self.assertEqual(list_volume[0].id, str(self.migrate_volume.id), "volume does not exist %s" % self.migrate_volume.id)
+ self.assertEqual(
+ validateList(list_volume)[0],
+ PASS,
+ "Check List volume response for volume %s" %
+ self.migrate_volume.id)
+ self.assertEqual(
+ list_volume[0].id, str(
+ self.migrate_volume.id), "volume does not exist %s" %
+ self.migrate_volume.id)
self.debug("volume id %s got created successfuly" % list_volume[0].id)
list_pool = StoragePool.list(self.apiclient,
id=list_volume[0].storageid
)
- self.assertEqual(validateList(list_pool)[0], PASS, "Check List pool response for storage id %s" % list_volume[0].storageid)
- self.assertGreater(len(list_pool), 0, "Check the list list storagepoolresponse for vm id: %s" % list_volume[0].storageid)
+ self.assertEqual(
+ validateList(list_pool)[0],
+ PASS,
+ "Check List pool response for storage id %s" %
+ list_volume[0].storageid)
+ self.assertGreater(
+ len(list_pool),
+ 0,
+ "Check the list list storagepoolresponse for vm id: %s" %
+ list_volume[0].storageid)
list_pools = StoragePool.list(self.apiclient,
scope=list_pool[0].scope
)
- self.assertEqual(validateList(list_pools)[0], PASS, "Check List pool response for scope %s" % list_pool[0].scope)
- self.assertGreater(len(list_pools), 0, "Check the list vm response for scope :%s" % list_volume[0].scope)
+ self.assertEqual(
+ validateList(list_pools)[0],
+ PASS,
+ "Check List pool response for scope %s" %
+ list_pool[0].scope)
+ self.assertGreater(
+ len(list_pools),
+ 0,
+ "Check the list vm response for scope :%s" %
+ list_volume[0].scope)
storagepoolid = None
for i in range(len(list_pools)):
if list_volume[0].storageid != list_pools[i].id:
@@ -521,29 +745,43 @@ class TestPathVolume(cloudstackTestCase):
self.virtual_machine_2.attach_volume(self.userapiclient,
self.migrate_volume
)
- verify_attach_volume(self, self.virtual_machine_2.id, self.migrate_volume.id)
+ verify_attach_volume(
+ self,
+ self.virtual_machine_2.id,
+ self.migrate_volume.id)
- pool_for_migration = StoragePool.listForMigration(self.apiclient,
- id=self.migrate_volume.id
- )
- self.assertEqual(validateList(pool_for_migration)[0], PASS, "Check list pool For Migration response for volume %s" % self.migrate_volume.id)
- self.assertGreater(len(pool_for_migration), 0, "Check the listForMigration response for volume :%s" % self.migrate_volume.id)
+ pool_for_migration = StoragePool.listForMigration(
+ self.apiclient,
+ id=self.migrate_volume.id)
+ self.assertEqual(
+ validateList(pool_for_migration)[0],
+ PASS,
+ "Check list pool For Migration response for volume %s" %
+ self.migrate_volume.id)
+ self.assertGreater(
+ len(pool_for_migration),
+ 0,
+ "Check the listForMigration response for volume :%s" %
+ self.migrate_volume.id)
try:
- volume_migrate = Volume.migrate(self.apiclient,
- storageid=pool_for_migration[0].id,
- volumeid=self.migrate_volume.id,
- livemigrate=True
- )
+ volume_migrate = Volume.migrate(
+ self.apiclient,
+ storageid=pool_for_migration[0].id,
+ volumeid=self.migrate_volume.id,
+ livemigrate=True)
except Exception as e:
raise Exception("Volume migration failed with error %s" % e)
else:
try:
self.migrate_volume.delete(self.userapiclient
)
- self.debug("volume id:%s got deleted successfully " % self.migrate_volume.id)
+ self.debug(
+ "volume id:%s got deleted successfully " %
+ self.migrate_volume.id)
except Exception as e:
raise Exception("Volume deletion failed with error %s" % e)
- # 16.Upload volume of size smaller than storage.max.volume.upload.size(leaving the negative case)
+ # 16.Upload volume of size smaller than
+ # storage.max.volume.upload.size(leaving the negative case)
self.testdata["upload_volume"]["format"] = "VHD"
volume_upload = Volume.upload(self.userapiclient,
self.testdata["upload_volume"],
@@ -551,7 +789,9 @@ class TestPathVolume(cloudstackTestCase):
)
volume_upload.wait_for_upload(self.userapiclient
)
- self.debug("volume id :%s got uploaded successfully is " % volume_upload.id)
+ self.debug(
+ "volume id :%s got uploaded successfully is " %
+ volume_upload.id)
# 20.Detach data disk from vm 2 and delete the volume
self.virtual_machine_2.detach_volume(self.userapiclient,
@@ -562,6 +802,263 @@ class TestPathVolume(cloudstackTestCase):
try:
self.volume.delete(self.userapiclient
)
- self.debug("volume id:%s got deleted successfully " % self.volume.id)
+ self.debug("volume id:%s got deleted successfully " %
+ self.volume.id)
+ except Exception as e:
+ raise Exception("Volume deletion failed with error %s" % e)
+
+ @attr(
+ tags=[
+ "advanced",
+ "advancedsg",
+ "basic",
+ ],
+ required_hardware="false")
+ def test_02_negative_path(self):
+ """
+ negative test for volume life cycle
+ # 1. Deploy a vm [vm1] with shared storage and data disk
+ #v1. Create VM2 with local storage offering disk offerings
+ # 2.TBD
+ # 3. Detach the data disk from VM1 and Download the volume
+ # 4.TBD
+ # 5. Attach volume with deviceid = 0
+ # 6. Attach volume, specify a VM which is destroyed
+ # 7.TBD
+ # 8.TBD
+ # 9.TBD
+ # 10.TBD
+ # 11.Upload the volume from T3 by providing the URL of the downloaded
+ volume, but specify a wrong format (not supported by the
+ hypervisor)
+ # 12.Upload the same volume from T4 by providing a wrong URL
+ # 13.Upload volume, provide wrong checksum
+ # 14.Upload a volume when maximum limit for the account is reached
+ # 15.TBD
+ # 16.Upload volume with all correct parameters
+ (covered in positive test path)
+ # 17.TBD
+ # 18.TBD
+ # 19.Now attach the volume with all correct parameters
+ (covered in positive test path)
+ # 20.Destroy and expunge all VMs
+
+ """
+
+ # 1. Deploy a vm [vm1] with shared storage and data disk
+ self.virtual_machine_1 = VirtualMachine.create(
+ self.userapiclient,
+ self.testdata["small"],
+ templateid=self.template.id,
+ accountid=self.account.name,
+ domainid=self.account.domainid,
+ serviceofferingid=self.service_offering_1.id,
+ zoneid=self.zone.id,
+ diskofferingid=self.disk_offering_1.id,
+ mode=self.testdata["mode"])
+ verify_vm(self, self.virtual_machine_1.id)
+ # List data volume for vm1
+ list_volume = Volume.list(self.userapiclient,
+ virtualmachineid=self.virtual_machine_1.id,
+ type='DATADISK'
+ )
+ self.assertEqual(
+ validateList(list_volume)[0],
+ PASS,
+ "Check List volume response for vm id %s" %
+ self.virtual_machine_1.id)
+ list_data_volume_for_vm1 = list_volume[0]
+ self.assertEqual(
+ len(list_volume),
+ 1,
+ "There is no data disk attached to vm id:%s" %
+ self.virtual_machine_1.id)
+ self.assertEqual(
+ list_data_volume_for_vm1.virtualmachineid, str(
+ self.virtual_machine_1.id),
+ "Check if volume state (attached) is reflected")
+ # Variance
+ if self.zone.localstorageenabled:
+ # V1.Create vm3 with local storage offering
+ self.virtual_machine_local_2 = VirtualMachine.create(
+ self.userapiclient,
+ self.testdata["small"],
+ templateid=self.template.id,
+ accountid=self.account.name,
+ domainid=self.account.domainid,
+ serviceofferingid=self.service_offering_2.id,
+ zoneid=self.zone.id,
+ mode=self.testdata["mode"])
+ verify_vm(self, self.virtual_machine_local_2.id)
+
+ # 3. Detach the data disk from VM1 and Download the volume
+ self.virtual_machine_1.detach_volume(self.userapiclient,
+ volume=list_data_volume_for_vm1
+ )
+ verify_detach_volume(
+ self,
+ self.virtual_machine_1.id,
+ list_data_volume_for_vm1.id)
+ # download detached volume
+ self.extract_volume = Volume.extract(
+ self.userapiclient,
+ volume_id=list_data_volume_for_vm1.id,
+ zoneid=self.zone.id,
+ mode='HTTP_DOWNLOAD')
+
+ self.debug("extracted url is%s :" % self.extract_volume.url)
+ try:
+
+ formatted_url = urllib.unquote_plus(self.extract_volume.url)
+ self.debug(
+ "Attempting to download volume at url %s" %
+ formatted_url)
+ response = urllib.urlopen(formatted_url)
+ self.debug("response from volume url %s" % response.getcode())
+ fd, path = tempfile.mkstemp()
+ self.debug(
+ "Saving volume %s to path %s" %
+ (list_data_volume_for_vm1.id, path))
+ os.close(fd)
+ with open(path, 'wb') as fd:
+ fd.write(response.read())
+ self.debug("Saved volume successfully")
+ except Exception:
+ self.fail(
+ "Extract Volume Failed with invalid URL %s (vol id: %s)" %
+ (self.extract_volume, list_data_volume_for_vm1.id))
+
+ # 6. Attach volume, specify a VM which is destroyed
+ self.virtual_machine_2 = VirtualMachine.create(
+ self.userapiclient,
+ self.testdata["small"],
+ templateid=self.template.id,
+ accountid=self.account.name,
+ domainid=self.account.domainid,
+ serviceofferingid=self.service_offering_1.id,
+ zoneid=self.zone.id,
+ mode=self.testdata["mode"])
+ verify_vm(self, self.virtual_machine_2.id)
+ try:
+ self.virtual_machine_2.delete(self.apiclient)
+ except Exception as e:
+ raise Exception("Vm deletion failed with error %s" % e)
+ # Create a new volume
+ self.volume = Volume.create(self.userapiclient,
+ services=self.testdata["volume"],
+ diskofferingid=self.disk_offering_1.id,
+ zoneid=self.zone.id
+ )
+
+ list_data_volume = Volume.list(self.userapiclient,
+ id=self.volume.id
+ )
+ self.assertEqual(
+ validateList(list_data_volume)[0],
+ PASS,
+ "Check List volume response for volume %s" %
+ self.volume.id)
+ self.assertEqual(
+ list_data_volume[0].id,
+ self.volume.id,
+ "check list volume response for volume id: %s" %
+ self.volume.id)
+ self.debug(
+ "volume id %s got created successfully" %
+ list_data_volume[0].id)
+ # try Attach volume to vm2
+ try:
+ self.virtual_machine_2.attach_volume(self.userapiclient,
+ self.volume
+ )
+ self.fail("Volume got attached to a destroyed vm ")
+ except Exception:
+ self.debug("Volume cant not be attached to a destroyed vm ")
+
+ # 11.Upload the volume by providing the URL of the downloaded
+ # volume, but specify a wrong format (not supported by the hypervisor)
+ if "OVA" in self.extract_volume.url.upper():
+ self.testdata["upload_volume"]["format"] = "VHD"
+ else:
+ self.testdata["upload_volume"]["format"] = "OVA"
+ try:
+ self.upload_response = Volume.upload(
+ self.userapiclient,
+ zoneid=self.zone.id,
+ url=self.extract_volume.url,
+ services=self.testdata["upload_volume"])
+ self.fail("Volume got uploaded with invalid format")
+ except Exception as e:
+ self.debug("upload volume failed due %s" % e)
+ # 12. Upload the same volume from T4 by providing a wrong URL
+ self.testdata["upload_volume"]["format"] = "VHD"
+ if "OVA" in self.extract_volume.url.upper():
+ self.testdata["upload_volume"]["format"] = "OVA"
+ if "QCOW2" in self.extract_volume.url.upper():
+ self.testdata["upload_volume"]["format"] = "QCOW2"
+ u1 = self.extract_volume.url.split('.')
+ u1[-2] = "wrong"
+ wrong_url = ".".join(u1)
+ try:
+ self.upload_response = Volume.upload(
+ self.userapiclient,
+ zoneid=self.zone.id,
+ url=wrong_url,
+ services=self.testdata["upload_volume"])
+ self.upload_response.wait_for_upload(self.userapiclient
+ )
+ self.fail("volume got uploaded with wrong url")
+ except Exception as e:
+ self.debug("upload volume failed due to %s" % e)
+ # 13.Upload volume, provide wrong checksum
+ try:
+ self.upload_response = Volume.upload(
+ self.userapiclient,
+ zoneid=self.zone.id,
+ url=self.extract_volume.url,
+ services=self.testdata["upload_volume"],
+ checksome="123456")
+ self.upload_response.wait_for_upload(self.userapiclient
+ )
+ self.fail("volume got uploaded with wrong checksome")
+ except Exception as e:
+ self.debug("upload volume failed due to %s" % e)
+
+ # 14.Upload a volume when maximum limit for the account is reached
+ account_update = Resources.updateLimit(self.apiclient,
+ resourcetype=2,
+ account=self.account.name,
+ domainid=self.account.domainid,
+ max=1
+ )
+ list_resource = Resources.list(self.apiclient,
+ account=self.account.name,
+ domainid=self.account.domainid,
+ resourcetype=2
+ )
+ self.assertEqual(
+ validateList(list_resource)[0],
+ PASS,
+ "Check List resource response for volume %s" %
+ self.account.name)
+ self.assertEqual(
+ str(
+ list_resource[0].max),
+ '1',
+ "check list List resource response for account id: %s" %
+ self.account.name)
+ self.debug(
+ "Max resources got updated successfully for account %s" %
+ self.account.name)
+ try:
+ self.upload_response = Volume.upload(
+ self.userapiclient,
+ zoneid=self.zone.id,
+ url=self.extract_volume.url,
+ services=self.testdata["upload_volume"])
+ self.upload_response.wait_for_upload(self.userapiclient
+ )
+ self.fail("volume got uploaded after account reached max limit for\
+ volumes ")
except Exception as e:
- raise Exception("Volume deletion failed with error %s" % e)
\ No newline at end of file
+ self.debug("upload volume failed due to %s" % e)