You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by gi...@apache.org on 2014/05/19 14:45:02 UTC
[4/7] Revert "CLOUDSTACK-6282 - Divided test_escalations.py into
individual files based on functionality and added automed tests for Public IP
Addresses"
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/d43d28ee/test/integration/component/test_escalations_isos.py
----------------------------------------------------------------------
diff --git a/test/integration/component/test_escalations_isos.py b/test/integration/component/test_escalations_isos.py
deleted file mode 100644
index 387a681..0000000
--- a/test/integration/component/test_escalations_isos.py
+++ /dev/null
@@ -1,783 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-#Import Local Modules
-from marvin.cloudstackTestCase import cloudstackTestCase
-from marvin.cloudstackAPI import (createVolume,
- createTemplate)
-from marvin.lib.base import (Volume,
- Iso,
- VirtualMachine,
- Template,
- Snapshot,
- SecurityGroup,
- Account,
- Zone,
- Network,
- NetworkOffering,
- DiskOffering,
- ServiceOffering,
- VmSnapshot,
- SnapshotPolicy,
- SSHKeyPair,
- Resources,
- Configurations,
- VpnCustomerGateway,
- Hypervisor,
- VpcOffering,
- VPC,
- NetworkACL)
-from marvin.lib.common import (get_zone,
- get_domain,
- get_template,
- list_os_types)
-from marvin.lib.utils import (validateList,
- cleanup_resources,
- random_gen)
-from marvin.codes import (PASS, FAIL, EMPTY_LIST)
-from nose.plugins.attrib import attr
-import time
-
-class TestIsos(cloudstackTestCase):
-
- @classmethod
- def setUpClass(cls):
- try:
- cls._cleanup = []
- cls.testClient = super(TestIsos, cls).getClsTestClient()
- cls.api_client = cls.testClient.getApiClient()
- cls.services = cls.testClient.getParsedTestDataConfig()
- # Get Domain, Zone, Template
- cls.domain = get_domain(cls.api_client)
- cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests())
- cls.template = get_template(
- cls.api_client,
- cls.zone.id,
- cls.services["ostype"]
- )
- cls.hypervisor = cls.testClient.getHypervisorInfo()
- cls.services['mode'] = cls.zone.networktype
- cls.account = Account.create(
- cls.api_client,
- cls.services["account"],
- domainid=cls.domain.id
- )
- # Getting authentication for user in newly created Account
- cls.user = cls.account.user[0]
- cls.userapiclient = cls.testClient.getUserApiClient(cls.user.username, cls.domain.name)
- cls._cleanup.append(cls.account)
- except Exception as e:
- cls.tearDownClass()
- raise Exception("Warning: Exception in setup : %s" % e)
- return
-
- def setUp(self):
-
- self.apiClient = self.testClient.getApiClient()
- self.cleanup = []
-
- def tearDown(self):
- #Clean up, terminate the created resources
- cleanup_resources(self.apiClient, self.cleanup)
- return
-
- @classmethod
- def tearDownClass(cls):
- try:
- cleanup_resources(cls.api_client, cls._cleanup)
- except Exception as e:
- raise Exception("Warning: Exception during cleanup : %s" % e)
-
- return
-
- def __verify_values(self, expected_vals, actual_vals):
- """
- @Desc: Function to verify expected and actual values
- @Steps:
- Step1: Initializing return flag to True
- Step1: Verifying length of expected and actual dictionaries is matching.
- If not matching returning false
- Step2: Listing all the keys from expected dictionary
- Step3: Looping through each key from step2 and verifying expected and actual dictionaries have same value
- If not making return flag to False
- Step4: returning the return flag after all the values are verified
- """
- return_flag = True
-
- if len(expected_vals) != len(actual_vals):
- return False
-
- keys = expected_vals.keys()
- for i in range(0, len(expected_vals)):
- exp_val = expected_vals[keys[i]]
- act_val = actual_vals[keys[i]]
- if exp_val == act_val:
- return_flag = return_flag and True
- else:
- return_flag = return_flag and False
- self.debug("expected Value: %s, is not matching with actual value: %s" % (
- exp_val,
- act_val
- ))
- return return_flag
-
- @attr(tags=["advanced", "basic", "provisioning"])
- def test_01_list_isos_pagination(self):
- """
- @Desc: Test to List ISO's pagination
- @steps:
- Step1: Listing all the ISO's for a user
- Step2: Verifying that no ISO's are listed
- Step3: Creating (page size + 1) number of ISO's
- Step4: Listing all the ISO's again for a user
- Step5: Verifying that list size is (page size + 1)
- Step6: Listing all the ISO's in page1
- Step7: Verifying that list size is (page size)
- Step8: Listing all the ISO's in page2
- Step9: Verifying that list size is 1
- Step10: Listing the ISO's by Id
- Step11: Verifying if the ISO is downloaded and ready.
- If yes the continuing
- If not waiting and checking for iso to be ready till timeout
- Step12: Deleting the ISO present in page 2
- Step13: Listing all the ISO's in page2
- Step14: Verifying that no ISO's are listed
- """
- # Listing all the ISO's for a User
- list_iso_before = Iso.list(
- self.userapiclient,
- listall=self.services["listall"],
- isofilter=self.services["templatefilter"]
- )
- # Verifying that no ISOs are listed
- self.assertIsNone(
- list_iso_before,
- "ISOs listed for newly created User"
- )
- self.services["iso"]["zoneid"] = self.zone.id
- # Creating pagesize + 1 number of ISO's
- for i in range(0, (self.services["pagesize"] + 1)):
- iso_created = Iso.create(
- self.userapiclient,
- self.services["iso"]
- )
- self.assertIsNotNone(
- iso_created,
- "ISO creation failed"
- )
- if(i < self.services["pagesize"]):
- self.cleanup.append(iso_created)
-
- # Listing all the ISO's for a User
- list_iso_after = Iso.list(
- self.userapiclient,
- listall=self.services["listall"],
- isofilter=self.services["templatefilter"]
- )
- status = validateList(list_iso_after)
- self.assertEquals(
- PASS,
- status[0],
- "ISO's creation failed"
- )
- # Verifying that list size is pagesize + 1
- self.assertEquals(
- self.services["pagesize"] + 1,
- len(list_iso_after),
- "Failed to create pagesize + 1 number of ISO's"
- )
- # Listing all the ISO's in page 1
- list_iso_page1 = Iso.list(
- self.userapiclient,
- listall=self.services["listall"],
- isofilter=self.services["templatefilter"],
- page=1,
- pagesize=self.services["pagesize"]
- )
- status = validateList(list_iso_page1)
- self.assertEquals(
- PASS,
- status[0],
- "Failed to list ISO's in page 1"
- )
- # Verifying the list size to be equal to pagesize
- self.assertEquals(
- self.services["pagesize"],
- len(list_iso_page1),
- "Size of ISO's in page 1 is not matching"
- )
- # Listing all the Templates in page 2
- list_iso_page2 = Iso.list(
- self.userapiclient,
- listall=self.services["listall"],
- isofilter=self.services["templatefilter"],
- page=2,
- pagesize=self.services["pagesize"]
- )
- status = validateList(list_iso_page2)
- self.assertEquals(
- PASS,
- status[0],
- "Failed to list ISo's in page 2"
- )
- # Verifying the list size to be equal to 1
- self.assertEquals(
- 1,
- len(list_iso_page2),
- "Size of ISO's in page 2 is not matching"
- )
- # Verifying the state of the ISO to be ready. If not waiting for state to become ready
- iso_ready = False
- count = 0
- while iso_ready is False:
- list_iso = Iso.list(
- self.userapiclient,
- listall=self.services["listall"],
- isofilter=self.services["templatefilter"],
- id=iso_created.id
- )
- status = validateList(list_iso)
- self.assertEquals(
- PASS,
- status[0],
- "Failed to list ISO by Id"
- )
- if list_iso[0].isready is True:
- iso_ready = True
- elif (str(list_iso[0].status) == "Error"):
- self.fail("Created ISO is in Errored state")
- break
- elif count > 10:
- self.fail("Timed out before ISO came into ready state")
- break
- else:
- time.sleep(self.services["sleep"])
- count = count + 1
-
- # Deleting the ISO present in page 2
- Iso.delete(
- iso_created,
- self.userapiclient
- )
- # Listing all the ISO's in page 2 again
- list_iso_page2 = Iso.list(
- self.userapiclient,
- listall=self.services["listall"],
- isofilter=self.services["templatefilter"],
- page=2,
- pagesize=self.services["pagesize"]
- )
- # Verifying that there are no ISO's listed
- self.assertIsNone(
- list_iso_page2,
- "ISO's not deleted from page 2"
- )
- del self.services["iso"]["zoneid"]
- return
-
- @attr(tags=["advanced", "basic", "provisioning"])
- def test_02_download_iso(self):
- """
- @Desc: Test to Download ISO
- @steps:
- Step1: Listing all the ISO's for a user
- Step2: Verifying that no ISO's are listed
- Step3: Creating an ISO
- Step4: Listing all the ISO's again for a user
- Step5: Verifying that list size is 1
- Step6: Verifying if the ISO is in ready state.
- If yes the continuing
- If not waiting and checking for template to be ready till timeout
- Step7: Downloading the ISO (Extract)
- Step8: Verifying the details of downloaded ISO
- """
- # Listing all the ISO's for a User
- list_iso_before = Iso.list(
- self.userapiclient,
- listall=self.services["listall"],
- isofilter=self.services["templatefilter"]
- )
- # Verifying that no ISOs are listed
- self.assertIsNone(
- list_iso_before,
- "ISOs listed for newly created User"
- )
- self.services["iso"]["zoneid"] = self.zone.id
- self.services["iso"]["isextractable"] = True
- # Creating an ISO's
- iso_created = Iso.create(
- self.userapiclient,
- self.services["iso"]
- )
- self.assertIsNotNone(
- iso_created,
- "ISO creation failed"
- )
- self.cleanup.append(iso_created)
- # Listing all the ISO's for a User
- list_iso_after = Iso.list(
- self.userapiclient,
- listall=self.services["listall"],
- isofilter=self.services["templatefilter"]
- )
- status = validateList(list_iso_after)
- self.assertEquals(
- PASS,
- status[0],
- "ISO's creation failed"
- )
- # Verifying that list size is 1
- self.assertEquals(
- 1,
- len(list_iso_after),
- "Failed to create an ISO's"
- )
- # Verifying the state of the ISO to be ready. If not waiting for state to become ready
- iso_ready = False
- count = 0
- while iso_ready is False:
- list_iso = Iso.list(
- self.userapiclient,
- listall=self.services["listall"],
- isofilter=self.services["templatefilter"],
- id=iso_created.id
- )
- status = validateList(list_iso)
- self.assertEquals(
- PASS,
- status[0],
- "Failed to list ISO by Id"
- )
- if list_iso[0].isready is True:
- iso_ready = True
- elif (str(list_iso[0].status) == "Error"):
- self.fail("Created ISO is in Errored state")
- break
- elif count > 10:
- self.fail("Timed out before ISO came into ready state")
- break
- else:
- time.sleep(self.services["sleep"])
- count = count + 1
-
- # Downloading the ISO
- download_iso = Iso.extract(
- self.userapiclient,
- iso_created.id,
- mode="HTTP_DOWNLOAD",
- zoneid=self.zone.id
- )
- self.assertIsNotNone(
- download_iso,
- "Download ISO failed"
- )
- # Verifying the details of downloaded ISO
- self.assertEquals(
- "DOWNLOAD_URL_CREATED",
- download_iso.state,
- "Download URL not created for ISO"
- )
- self.assertIsNotNone(
- download_iso.url,
- "Download URL not created for ISO"
- )
- self.assertEquals(
- iso_created.id,
- download_iso.id,
- "Download ISO details are not same as ISO created"
- )
- del self.services["iso"]["zoneid"]
- del self.services["iso"]["isextractable"]
- return
-
- @attr(tags=["advanced", "basic", "provisioning"])
- def test_03_edit_iso_details(self):
- """
- @Desc: Test to Edit ISO name, displaytext, OSType
- @steps:
- Step1: Listing all the ISO's for a user
- Step2: Verifying that no ISO's are listed
- Step3: Creating an ISO
- Step4: Listing all the ISO's again for a user
- Step5: Verifying that list size is 1
- Step6: Verifying if the ISO is in ready state.
- If yes the continuing
- If not waiting and checking for template to be ready till timeout
- Step7: Editing the ISO's name, displaytext
- Step8: Verifying that ISO name and displaytext are edited
- Step9: Editing the ISO name, displaytext, ostypeid
- Step10: Verifying that ISO name, displaytext and ostypeid are edited
- """
- # Listing all the ISO's for a User
- list_iso_before = Iso.list(
- self.userapiclient,
- listall=self.services["listall"],
- isofilter=self.services["templatefilter"]
- )
- # Verifying that no ISOs are listed
- self.assertIsNone(
- list_iso_before,
- "ISOs listed for newly created User"
- )
- self.services["iso"]["zoneid"] = self.zone.id
- # Creating an ISO's
- iso_created = Iso.create(
- self.userapiclient,
- self.services["iso"]
- )
- self.assertIsNotNone(
- iso_created,
- "ISO creation failed"
- )
- self.cleanup.append(iso_created)
- # Listing all the ISO's for a User
- list_iso_after = Iso.list(
- self.userapiclient,
- listall=self.services["listall"],
- isofilter=self.services["templatefilter"]
- )
- status = validateList(list_iso_after)
- self.assertEquals(
- PASS,
- status[0],
- "ISO's creation failed"
- )
- # Verifying that list size is 1
- self.assertEquals(
- 1,
- len(list_iso_after),
- "Failed to create an ISO's"
- )
- # Verifying the state of the ISO to be ready. If not waiting for state to become ready
- iso_ready = False
- count = 0
- while iso_ready is False:
- list_iso = Iso.list(
- self.userapiclient,
- listall=self.services["listall"],
- isofilter=self.services["templatefilter"],
- id=iso_created.id
- )
- status = validateList(list_iso)
- self.assertEquals(
- PASS,
- status[0],
- "Failed to list ISO by Id"
- )
- if list_iso[0].isready is True:
- iso_ready = True
- elif (str(list_iso[0].status) == "Error"):
- self.fail("Created ISO is in Errored state")
- break
- elif count > 10:
- self.fail("Timed out before ISO came into ready state")
- break
- else:
- time.sleep(self.services["sleep"])
- count = count + 1
-
- # Editing the ISO name, displaytext
- edited_iso = Iso.update(
- iso_created,
- self.userapiclient,
- name="NewISOName",
- displaytext="NewISODisplayText"
- )
- self.assertIsNotNone(
- edited_iso,
- "Editing ISO failed"
- )
- # Verifying the details of edited template
- expected_dict = {
- "id":iso_created.id,
- "name":"NewISOName",
- "displaytest":"NewISODisplayText",
- "account":iso_created.account,
- "domainid":iso_created.domainid,
- "isfeatured":iso_created.isfeatured,
- "ostypeid":iso_created.ostypeid,
- "ispublic":iso_created.ispublic,
- }
- actual_dict = {
- "id":edited_iso.id,
- "name":edited_iso.name,
- "displaytest":edited_iso.displaytext,
- "account":edited_iso.account,
- "domainid":edited_iso.domainid,
- "isfeatured":edited_iso.isfeatured,
- "ostypeid":edited_iso.ostypeid,
- "ispublic":edited_iso.ispublic,
- }
- edit_iso_status = self.__verify_values(
- expected_dict,
- actual_dict
- )
- self.assertEqual(
- True,
- edit_iso_status,
- "Edited ISO details are not as expected"
- )
- # Editing the ISO name, displaytext, ostypeid
- ostype_list = list_os_types(self.userapiclient)
- status = validateList(ostype_list)
- self.assertEquals(
- PASS,
- status[0],
- "Failed to list OS Types"
- )
- for i in range(0, len(ostype_list)):
- if ostype_list[i].id != iso_created.ostypeid:
- newostypeid = ostype_list[i].id
- break
-
- edited_iso = Iso.update(
- iso_created,
- self.userapiclient,
- name=iso_created.name,
- displaytext=iso_created.displaytext,
- ostypeid=newostypeid
- )
- self.assertIsNotNone(
- edited_iso,
- "Editing ISO failed"
- )
- # Verifying the details of edited template
- expected_dict = {
- "id":iso_created.id,
- "name":iso_created.name,
- "displaytest":iso_created.displaytext,
- "account":iso_created.account,
- "domainid":iso_created.domainid,
- "isfeatured":iso_created.isfeatured,
- "ostypeid":newostypeid,
- "ispublic":iso_created.ispublic,
- }
- actual_dict = {
- "id":edited_iso.id,
- "name":edited_iso.name,
- "displaytest":edited_iso.displaytext,
- "account":edited_iso.account,
- "domainid":edited_iso.domainid,
- "isfeatured":edited_iso.isfeatured,
- "ostypeid":edited_iso.ostypeid,
- "ispublic":edited_iso.ispublic,
- }
- edit_iso_status = self.__verify_values(
- expected_dict,
- actual_dict
- )
- self.assertEqual(
- True,
- edit_iso_status,
- "Edited ISO details are not as expected"
- )
- del self.services["iso"]["zoneid"]
- return
-
- @attr(tags=["advanced", "basic", "provisioning"])
- def test_04_copy_iso(self):
- """
- @Desc: Test to copy ISO from one zone to another
- @steps:
- Step1: Listing Zones available for a user
- Step2: Verifying if the zones listed are greater than 1.
- If Yes continuing.
- If not halting the test.
- Step3: Listing all the ISO's for a user in zone1
- Step4: Verifying that no ISO's are listed
- Step5: Listing all the ISO's for a user in zone2
- Step6: Verifying that no ISO's are listed
- Step7: Creating an ISO in zone 1
- Step8: Listing all the ISO's again for a user in zone1
- Step9: Verifying that list size is 1
- Step10: Listing all the ISO's for a user in zone2
- Step11: Verifying that no ISO's are listed
- Step12: Copying the ISO created in step7 from zone1 to zone2
- Step13: Listing all the ISO's for a user in zone2
- Step14: Verifying that list size is 1
- Step15: Listing all the ISO's for a user in zone1
- Step16: Verifying that list size is 1
- """
- # Listing Zones available for a user
- zones_list = Zone.list(
- self.userapiclient,
- available=True
- )
- status = validateList(zones_list)
- self.assertEquals(
- PASS,
- status[0],
- "Failed to list Zones"
- )
- if not len(zones_list) > 1:
- self.fail("Enough zones doesnot exists to copy iso")
- else:
- # Listing all the ISO's for a User in Zone 1
- list_isos_zone1 = Iso.list(
- self.userapiclient,
- listall=self.services["listall"],
- isofilter=self.services["templatefilter"],
- zoneid=zones_list[0].id
- )
- # Verifying that no ISO's are listed
- self.assertIsNone(
- list_isos_zone1,
- "ISO's listed for newly created User in Zone1"
- )
- # Listing all the ISO's for a User in Zone 2
- list_isos_zone2 = Iso.list(
- self.userapiclient,
- listall=self.services["listall"],
- isofilter=self.services["templatefilter"],
- zoneid=zones_list[1].id
- )
- # Verifying that no ISO's are listed
- self.assertIsNone(
- list_isos_zone2,
- "ISO's listed for newly created User in Zone2"
- )
- self.services["iso"]["zoneid"] = zones_list[0].id
- # Creating an ISO in Zone 1
- iso_created = Iso.create(
- self.userapiclient,
- self.services["iso"]
- )
- self.assertIsNotNone(
- iso_created,
- "ISO creation failed"
- )
- self.cleanup.append(iso_created)
- # Listing all the ISO's for a User in Zone 1
- list_isos_zone1 = Iso.list(
- self.userapiclient,
- listall=self.services["listall"],
- isofilter=self.services["templatefilter"],
- zoneid=zones_list[0].id
- )
- status = validateList(list_isos_zone1)
- self.assertEquals(
- PASS,
- status[0],
- "ISO creation failed in Zone1"
- )
- # Verifying that list size is 1
- self.assertEquals(
- 1,
- len(list_isos_zone1),
- "Failed to create a Template"
- )
- # Listing all the ISO's for a User in Zone 2
- list_isos_zone2 = Iso.list(
- self.userapiclient,
- listall=self.services["listall"],
- isofilter=self.services["templatefilter"],
- zoneid=zones_list[1].id
- )
- # Verifying that no ISO's are listed
- self.assertIsNone(
- list_isos_zone2,
- "ISO's listed for newly created User in Zone2"
- )
- # Verifying the state of the ISO to be ready. If not waiting for state to become ready
- iso_ready = False
- count = 0
- while iso_ready is False:
- list_iso = Iso.list(
- self.userapiclient,
- listall=self.services["listall"],
- isofilter=self.services["templatefilter"],
- id=iso_created.id
- )
- status = validateList(list_iso)
- self.assertEquals(
- PASS,
- status[0],
- "Failed to list ISO by Id"
- )
- if list_iso[0].isready is True:
- iso_ready = True
- elif (str(list_iso[0].status) == "Error"):
- self.fail("Created ISO is in Errored state")
- break
- elif count > 10:
- self.fail("Timed out before ISO came into ready state")
- break
- else:
- time.sleep(self.services["sleep"])
- count = count + 1
-
- # Copying the ISO from Zone1 to Zone2
- copied_iso = Iso.copy(
- self.userapiclient,
- iso_created.id,
- sourcezoneid=iso_created.zoneid,
- destzoneid=zones_list[1].id
- )
- self.assertIsNotNone(
- copied_iso,
- "Copying ISO from Zone1 to Zone2 failed"
- )
- # Listing all the ISO's for a User in Zone 1
- list_isos_zone1 = Iso.list(
- self.userapiclient,
- listall=self.services["listall"],
- isofilter=self.services["templatefilter"],
- zoneid=zones_list[0].id
- )
- status = validateList(list_isos_zone1)
- self.assertEquals(
- PASS,
- status[0],
- "ISO creation failed in Zone1"
- )
- # Verifying that list size is 1
- self.assertEquals(
- 1,
- len(list_isos_zone1),
- "Failed to create a Template"
- )
- # Listing all the ISO's for a User in Zone 2
- list_isos_zone2 = Iso.list(
- self.userapiclient,
- listall=self.services["listall"],
- isofilter=self.services["templatefilter"],
- zoneid=zones_list[1].id
- )
- status = validateList(list_isos_zone2)
- self.assertEquals(
- PASS,
- status[0],
- "ISO failed to copy into Zone2"
- )
- # Verifying that list size is 1
- self.assertEquals(
- 1,
- len(list_isos_zone2),
- "ISO failed to copy into Zone2"
- )
- self.assertNotEquals(
- "Connection refused",
- list_isos_zone2[0].status,
- "Failed to copy ISO"
- )
- self.assertEquals(
- True,
- list_isos_zone2[0].isready,
- "Failed to copy ISO"
- )
- del self.services["iso"]["zoneid"]
- return
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/d43d28ee/test/integration/component/test_escalations_securitygroups.py
----------------------------------------------------------------------
diff --git a/test/integration/component/test_escalations_securitygroups.py b/test/integration/component/test_escalations_securitygroups.py
deleted file mode 100644
index 8934088..0000000
--- a/test/integration/component/test_escalations_securitygroups.py
+++ /dev/null
@@ -1,588 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-#Import Local Modules
-from marvin.cloudstackTestCase import cloudstackTestCase
-from marvin.cloudstackAPI import (createVolume,
- createTemplate)
-from marvin.lib.base import (Volume,
- Iso,
- VirtualMachine,
- Template,
- Snapshot,
- SecurityGroup,
- Account,
- Zone,
- Network,
- NetworkOffering,
- DiskOffering,
- ServiceOffering,
- VmSnapshot,
- SnapshotPolicy,
- SSHKeyPair,
- Resources,
- Configurations,
- VpnCustomerGateway,
- Hypervisor,
- VpcOffering,
- VPC,
- NetworkACL)
-from marvin.lib.common import (get_zone,
- get_domain,
- get_template,
- list_os_types)
-from marvin.lib.utils import (validateList,
- cleanup_resources,
- random_gen)
-from marvin.codes import (PASS, FAIL, EMPTY_LIST)
-from nose.plugins.attrib import attr
-import time
-
-class TestSecurityGroups(cloudstackTestCase):
-
- @classmethod
- def setUpClass(cls):
- try:
- cls._cleanup = []
- cls.testClient = super(TestSecurityGroups, cls).getClsTestClient()
- cls.api_client = cls.testClient.getApiClient()
- cls.services = cls.testClient.getParsedTestDataConfig()
- # Get Domain, Zone, Template
- cls.domain = get_domain(cls.api_client)
- cls.zone = get_zone(cls.api_client, cls.testClient.getZoneForTests())
- cls.template = get_template(
- cls.api_client,
- cls.zone.id,
- cls.services["ostype"]
- )
- cls.services['mode'] = cls.zone.networktype
- cls.account = Account.create(
- cls.api_client,
- cls.services["account"],
- domainid=cls.domain.id
- )
- # Getting authentication for user in newly created Account
- cls.user = cls.account.user[0]
- cls.userapiclient = cls.testClient.getUserApiClient(cls.user.username, cls.domain.name)
- cls._cleanup.append(cls.account)
- except Exception as e:
- cls.tearDownClass()
- raise Exception("Warning: Exception in setup : %s" % e)
- return
-
- def setUp(self):
-
- self.apiClient = self.testClient.getApiClient()
- self.cleanup = []
-
- def tearDown(self):
- #Clean up, terminate the created resources
- cleanup_resources(self.apiClient, self.cleanup)
- return
-
- @classmethod
- def tearDownClass(cls):
- try:
- cleanup_resources(cls.api_client, cls._cleanup)
- except Exception as e:
- raise Exception("Warning: Exception during cleanup : %s" % e)
-
- return
-
- def __verify_values(self, expected_vals, actual_vals):
- """
- @Desc: Function to verify expected and actual values
- @Steps:
- Step1: Initializing return flag to True
- Step1: Verifying length of expected and actual dictionaries is matching.
- If not matching returning false
- Step2: Listing all the keys from expected dictionary
- Step3: Looping through each key from step2 and verifying expected and actual dictionaries have same value
- If not making return flag to False
- Step4: returning the return flag after all the values are verified
- """
- return_flag = True
-
- if len(expected_vals) != len(actual_vals):
- return False
-
- keys = expected_vals.keys()
- for i in range(0, len(expected_vals)):
- exp_val = expected_vals[keys[i]]
- act_val = actual_vals[keys[i]]
- if exp_val == act_val:
- return_flag = return_flag and True
- else:
- return_flag = return_flag and False
- self.debug("expected Value: %s, is not matching with actual value: %s" % (
- exp_val,
- act_val
- ))
- return return_flag
-
- @attr(tags=["basic", "provisioning"])
- def test_01_list_securitygroups_pagination(self):
- """
- @Desc: Test to List Security Groups pagination
- @steps:
- Step1: Listing all the Security Groups for a user
- Step2: Verifying that list size is 1
- Step3: Creating (page size) number of Security Groups
- Step4: Listing all the Security Groups again for a user
- Step5: Verifying that list size is (page size + 1)
- Step6: Listing all the Security Groups in page1
- Step7: Verifying that list size is (page size)
- Step8: Listing all the Security Groups in page2
- Step9: Verifying that list size is 1
- Step10: Deleting the Security Group present in page 2
- Step11: Listing all the Security Groups in page2
- Step12: Verifying that no security groups are listed
- """
- # Listing all the Security Groups for a User
- list_securitygroups_before = SecurityGroup.list(
- self.userapiclient,
- listall=self.services["listall"]
- )
- # Verifying that default security group is created
- status = validateList(list_securitygroups_before)
- self.assertEquals(
- PASS,
- status[0],
- "Default Security Groups creation failed"
- )
- # Verifying the size of the list is 1
- self.assertEquals(
- 1,
- len(list_securitygroups_before),
- "Count of Security Groups list is not matching"
- )
- # Creating pagesize number of security groups
- for i in range(0, (self.services["pagesize"])):
- securitygroup_created = SecurityGroup.create(
- self.userapiclient,
- self.services["security_group"],
- account=self.account.name,
- domainid=self.domain.id,
- description=self.services["security_group"]["name"]
- )
- self.assertIsNotNone(
- securitygroup_created,
- "Security Group creation failed"
- )
- if (i < self.services["pagesize"]):
- self.cleanup.append(securitygroup_created)
-
- # Listing all the security groups for user again
- list_securitygroups_after = SecurityGroup.list(
- self.userapiclient,
- listall=self.services["listall"]
- )
- status = validateList(list_securitygroups_after)
- self.assertEquals(
- PASS,
- status[0],
- "Security Groups creation failed"
- )
- # Verifying that list size is pagesize + 1
- self.assertEquals(
- self.services["pagesize"] + 1,
- len(list_securitygroups_after),
- "Failed to create pagesize + 1 number of Security Groups"
- )
- # Listing all the security groups in page 1
- list_securitygroups_page1 = SecurityGroup.list(
- self.userapiclient,
- listall=self.services["listall"],
- page=1,
- pagesize=self.services["pagesize"]
- )
- status = validateList(list_securitygroups_page1)
- self.assertEquals(
- PASS,
- status[0],
- "Failed to list security groups in page 1"
- )
- # Verifying the list size to be equal to pagesize
- self.assertEquals(
- self.services["pagesize"],
- len(list_securitygroups_page1),
- "Size of security groups in page 1 is not matching"
- )
- # Listing all the security groups in page 2
- list_securitygroups_page2 = SecurityGroup.list(
- self.userapiclient,
- listall=self.services["listall"],
- page=2,
- pagesize=self.services["pagesize"]
- )
- status = validateList(list_securitygroups_page2)
- self.assertEquals(
- PASS,
- status[0],
- "Failed to list security groups in page 2"
- )
- # Verifying the list size to be equal to pagesize
- self.assertEquals(
- 1,
- len(list_securitygroups_page2),
- "Size of security groups in page 2 is not matching"
- )
- # Deleting the security group present in page 2
- SecurityGroup.delete(
- securitygroup_created,
- self.userapiclient)
- # Listing all the security groups in page 2 again
- list_securitygroups_page2 = SecurityGroup.list(
- self.userapiclient,
- listall=self.services["listall"],
- page=2,
- pagesize=self.services["pagesize"]
- )
- # Verifying that there are no security groups listed
- self.assertIsNone(
- list_securitygroups_page2,
- "Security Groups not deleted from page 2"
- )
- return
-
- @attr(tags=["basic", "provisioning"])
- def test_02_securitygroups_authorize_revoke_ingress(self):
- """
- @Desc: Test to Authorize and Revoke Ingress for Security Group
- @steps:
- Step1: Listing all the Security Groups for a user
- Step2: Verifying that list size is 1
- Step3: Creating a Security Groups
- Step4: Listing all the Security Groups again for a user
- Step5: Verifying that list size is 2
- Step6: Authorizing Ingress for the security group created in step3
- Step7: Listing the security groups by passing id of security group created in step3
- Step8: Verifying that list size is 1
- Step9: Verifying that Ingress is authorized to the security group
- Step10: Verifying the details of the Ingress rule are as expected
- Step11: Revoking Ingress for the security group created in step3
- Step12: Listing the security groups by passing id of security group created in step3
- Step13: Verifying that list size is 1
- Step14: Verifying that Ingress is revoked from the security group
- """
- # Listing all the Security Groups for a User
- list_securitygroups_before = SecurityGroup.list(
- self.userapiclient,
- listall=self.services["listall"]
- )
- # Verifying that default security group is created
- status = validateList(list_securitygroups_before)
- self.assertEquals(
- PASS,
- status[0],
- "Default Security Groups creation failed"
- )
- # Verifying the size of the list is 1
- self.assertEquals(
- 1,
- len(list_securitygroups_before),
- "Count of Security Groups list is not matching"
- )
- # Creating a security group
- securitygroup_created = SecurityGroup.create(
- self.userapiclient,
- self.services["security_group"],
- account=self.account.name,
- domainid=self.domain.id,
- description=self.services["security_group"]["name"]
- )
- self.assertIsNotNone(
- securitygroup_created,
- "Security Group creation failed"
- )
- self.cleanup.append(securitygroup_created)
-
- # Listing all the security groups for user again
- list_securitygroups_after = SecurityGroup.list(
- self.userapiclient,
- listall=self.services["listall"]
- )
- status = validateList(list_securitygroups_after)
- self.assertEquals(
- PASS,
- status[0],
- "Security Groups creation failed"
- )
- # Verifying that list size is 2
- self.assertEquals(
- 2,
- len(list_securitygroups_after),
- "Failed to create Security Group"
- )
- # Authorizing Ingress for the security group created in step3
- securitygroup_created.authorize(
- self.userapiclient,
- self.services["ingress_rule"],
- self.account.name,
- self.domain.id,
- )
- # Listing the security group by Id
- list_securitygroups_byid = SecurityGroup.list(
- self.userapiclient,
- listall=self.services["listall"],
- id=securitygroup_created.id,
- domainid=self.domain.id
- )
- # Verifying that security group is listed
- status = validateList(list_securitygroups_byid)
- self.assertEquals(
- PASS,
- status[0],
- "Listing of Security Groups by id failed"
- )
- # Verifying size of the list is 1
- self.assertEquals(
- 1,
- len(list_securitygroups_byid),
- "Count of the listing security group by id is not matching"
- )
- securitygroup_ingress = list_securitygroups_byid[0].ingressrule
- # Validating the Ingress rule
- status = validateList(securitygroup_ingress)
- self.assertEquals(
- PASS,
- status[0],
- "Security Groups Ingress rule authorization failed"
- )
- self.assertEquals(
- 1,
- len(securitygroup_ingress),
- "Security Group Ingress rules count is not matching"
- )
- # Verifying the details of the Ingress rule are as expected
- #Creating expected and actual values dictionaries
- expected_dict = {
- "cidr":self.services["ingress_rule"]["cidrlist"],
- "protocol":self.services["ingress_rule"]["protocol"],
- "startport":self.services["ingress_rule"]["startport"],
- "endport":self.services["ingress_rule"]["endport"],
- }
- actual_dict = {
- "cidr":str(securitygroup_ingress[0].cidr),
- "protocol":str(securitygroup_ingress[0].protocol.upper()),
- "startport":str(securitygroup_ingress[0].startport),
- "endport":str(securitygroup_ingress[0].endport),
- }
- ingress_status = self.__verify_values(
- expected_dict,
- actual_dict
- )
- self.assertEqual(
- True,
- ingress_status,
- "Listed Security group Ingress rule details are not as expected"
- )
- # Revoking the Ingress rule from Security Group
- securitygroup_created.revoke(self.userapiclient, securitygroup_ingress[0].ruleid)
- # Listing the security group by Id
- list_securitygroups_byid = SecurityGroup.list(
- self.userapiclient,
- listall=self.services["listall"],
- id=securitygroup_created.id,
- domainid=self.domain.id
- )
- # Verifying that security group is listed
- status = validateList(list_securitygroups_byid)
- self.assertEquals(
- PASS,
- status[0],
- "Listing of Security Groups by id failed"
- )
- # Verifying size of the list is 1
- self.assertEquals(
- 1,
- len(list_securitygroups_byid),
- "Count of the listing security group by id is not matching"
- )
- securitygroup_ingress = list_securitygroups_byid[0].ingressrule
- # Verifying that Ingress rule is empty(revoked)
- status = validateList(securitygroup_ingress)
- self.assertEquals(
- EMPTY_LIST,
- status[2],
- "Security Groups Ingress rule is not revoked"
- )
- return
-
- @attr(tags=["basic", "provisioning"])
- def test_03_securitygroups_authorize_revoke_egress(self):
- """
- @Desc: Test to Authorize and Revoke Egress for Security Group
- @steps:
- Step1: Listing all the Security Groups for a user
- Step2: Verifying that list size is 1
- Step3: Creating a Security Groups
- Step4: Listing all the Security Groups again for a user
- Step5: Verifying that list size is 2
- Step6: Authorizing Egress for the security group created in step3
- Step7: Listing the security groups by passing id of security group created in step3
- Step8: Verifying that list size is 1
- Step9: Verifying that Egress is authorized to the security group
- Step10: Verifying the details of the Egress rule are as expected
- Step11: Revoking Egress for the security group created in step3
- Step12: Listing the security groups by passing id of security group created in step3
- Step13: Verifying that list size is 1
- Step14: Verifying that Egress is revoked from the security group
- """
- # Listing all the Security Groups for a User
- list_securitygroups_before = SecurityGroup.list(
- self.userapiclient,
- listall=self.services["listall"]
- )
- # Verifying that default security group is created
- status = validateList(list_securitygroups_before)
- self.assertEquals(
- PASS,
- status[0],
- "Default Security Groups creation failed"
- )
- # Verifying the size of the list is 1
- self.assertEquals(
- 1,
- len(list_securitygroups_before),
- "Count of Security Groups list is not matching"
- )
- # Creating a security group
- securitygroup_created = SecurityGroup.create(
- self.userapiclient,
- self.services["security_group"],
- account=self.account.name,
- domainid=self.domain.id,
- description=self.services["security_group"]["name"]
- )
- self.assertIsNotNone(
- securitygroup_created,
- "Security Group creation failed"
- )
- self.cleanup.append(securitygroup_created)
-
- # Listing all the security groups for user again
- list_securitygroups_after = SecurityGroup.list(
- self.userapiclient,
- listall=self.services["listall"]
- )
- status = validateList(list_securitygroups_after)
- self.assertEquals(
- PASS,
- status[0],
- "Security Groups creation failed"
- )
- # Verifying that list size is 2
- self.assertEquals(
- 2,
- len(list_securitygroups_after),
- "Failed to create Security Group"
- )
- # Authorizing Egress for the security group created in step3
- securitygroup_created.authorizeEgress(
- self.userapiclient,
- self.services["ingress_rule"],
- self.account.name,
- self.domain.id,
- )
- # Listing the security group by Id
- list_securitygroups_byid = SecurityGroup.list(
- self.userapiclient,
- listall=self.services["listall"],
- id=securitygroup_created.id,
- domainid=self.domain.id
- )
- # Verifying that security group is listed
- status = validateList(list_securitygroups_byid)
- self.assertEquals(
- PASS,
- status[0],
- "Listing of Security Groups by id failed"
- )
- # Verifying size of the list is 1
- self.assertEquals(
- 1,
- len(list_securitygroups_byid),
- "Count of the listing security group by id is not matching"
- )
- securitygroup_egress = list_securitygroups_byid[0].egressrule
- # Validating the Ingress rule
- status = validateList(securitygroup_egress)
- self.assertEquals(
- PASS,
- status[0],
- "Security Groups Egress rule authorization failed"
- )
- self.assertEquals(
- 1,
- len(securitygroup_egress),
- "Security Group Egress rules count is not matching"
- )
- # Verifying the details of the Egress rule are as expected
- #Creating expected and actual values dictionaries
- expected_dict = {
- "cidr":self.services["ingress_rule"]["cidrlist"],
- "protocol":self.services["ingress_rule"]["protocol"],
- "startport":self.services["ingress_rule"]["startport"],
- "endport":self.services["ingress_rule"]["endport"],
- }
- actual_dict = {
- "cidr":str(securitygroup_egress[0].cidr),
- "protocol":str(securitygroup_egress[0].protocol.upper()),
- "startport":str(securitygroup_egress[0].startport),
- "endport":str(securitygroup_egress[0].endport),
- }
- ingress_status = self.__verify_values(
- expected_dict,
- actual_dict
- )
- self.assertEqual(
- True,
- ingress_status,
- "Listed Security group Egress rule details are not as expected"
- )
- # Revoking the Egress rule from Security Group
- securitygroup_created.revokeEgress(self.userapiclient, securitygroup_egress[0].ruleid)
- # Listing the security group by Id
- list_securitygroups_byid = SecurityGroup.list(
- self.userapiclient,
- listall=self.services["listall"],
- id=securitygroup_created.id,
- domainid=self.domain.id
- )
- # Verifying that security group is listed
- status = validateList(list_securitygroups_byid)
- self.assertEquals(
- PASS,
- status[0],
- "Listing of Security Groups by id failed"
- )
- # Verifying size of the list is 1
- self.assertEquals(
- 1,
- len(list_securitygroups_byid),
- "Count of the listing security group by id is not matching"
- )
- securitygroup_egress = list_securitygroups_byid[0].egressrule
- # Verifying that Ingress rule is empty(revoked)
- status = validateList(securitygroup_egress)
- self.assertEquals(
- EMPTY_LIST,
- status[2],
- "Security Groups Egress rule is not revoked"
- )
- return
\ No newline at end of file