You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by gi...@apache.org on 2014/01/14 10:05:43 UTC
[1/3] CLOUDSTACK-2237: Automation-Adding next set of test cases for
feature - SG in advanced zone
Updated Branches:
refs/heads/master 16d36dd75 -> c108d7f0d
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c108d7f0/tools/marvin/marvin/config/config.cfg
----------------------------------------------------------------------
diff --git a/tools/marvin/marvin/config/config.cfg b/tools/marvin/marvin/config/config.cfg
index 5849fe8..7840b7c 100644
--- a/tools/marvin/marvin/config/config.cfg
+++ b/tools/marvin/marvin/config/config.cfg
@@ -43,8 +43,8 @@
"memory": 128
},
"isolated_network_offering": {
- "name": "Network offering-DA services",
- "displaytext": "Network offering-DA services",
+ "name": "Isolated Network offering",
+ "displaytext": "Isolated Network offering",
"guestiptype": "Isolated",
"supportedservices": "Dhcp,Dns,SourceNat,PortForwarding,Vpn,Firewall,Lb,UserData,StaticNat",
"traffictype": "GUEST",
@@ -75,8 +75,8 @@
"protocol": "TCP"
},
"shared_network": {
- "name": "MySharedNetwork - Test",
- "displaytext": "MySharedNetwork",
+ "name": "Test Shared Network",
+ "displaytext": "Test Shared Network",
"vlan" : "",
"gateway" :"",
"netmask" :"",
@@ -86,8 +86,8 @@
"scope":"all"
},
"shared_network_offering_sg": {
- "name": "MySharedOffering-sg",
- "displaytext": "MySharedOffering-sg",
+ "name": "SharedNwOffering-sg",
+ "displaytext": "SharedNwOffering-sg",
"guestiptype": "Shared",
"supportedservices": "Dhcp,Dns,UserData,SecurityGroup",
"specifyVlan" : "False",
@@ -103,7 +103,7 @@
"shared_network_sg": {
"name": "Shared-Network-SG-Test",
"displaytext": "Shared-Network_SG-Test",
- "networkofferingid":"1",
+ "networkofferingid":"",
"vlan" : "",
"gateway" :"",
"netmask" :"255.255.255.0",
@@ -123,8 +123,8 @@
"cidr": "10.0.0.1/24"
},
"shared_network_offering": {
- "name": "MySharedOffering",
- "displaytext": "MySharedOffering",
+ "name": "SharedNwOffering",
+ "displaytext": "SharedNwOffering",
"guestiptype": "Shared",
"supportedservices": "Dhcp,Dns,UserData",
"specifyVlan" : "False",
@@ -140,7 +140,7 @@
"ingress_rule": {
"protocol": "TCP",
"startport": "22",
- "endport": "22",
+ "endport": "80",
"cidrlist": "0.0.0.0/0"
},
"ostype": "CentOS 5.3 (64-bit)",
[3/3] git commit: updated refs/heads/master to c108d7f
Posted by gi...@apache.org.
CLOUDSTACK-2237: Automation-Adding next set of test cases for feature - SG in advanced zone
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/c108d7f0
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/c108d7f0
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/c108d7f0
Branch: refs/heads/master
Commit: c108d7f0d0e1e1e746dd186e3153c2d8387da7e2
Parents: 16d36dd
Author: Ashutosh K <as...@clogeny.com>
Authored: Tue Jan 14 14:34:21 2014 +0530
Committer: Girish Shilamkar <gi...@clogeny.com>
Committed: Tue Jan 14 14:34:21 2014 +0530
----------------------------------------------------------------------
.../component/test_advancedsg_networks.py | 1973 +++++++++++++++---
tools/marvin/marvin/config/config.cfg | 20 +-
2 files changed, 1677 insertions(+), 316 deletions(-)
----------------------------------------------------------------------
[2/3] CLOUDSTACK-2237: Automation-Adding next set of test cases for
feature - SG in advanced zone
Posted by gi...@apache.org.
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/c108d7f0/test/integration/component/test_advancedsg_networks.py
----------------------------------------------------------------------
diff --git a/test/integration/component/test_advancedsg_networks.py b/test/integration/component/test_advancedsg_networks.py
index 7f3a390..207659f 100644
--- a/test/integration/component/test_advancedsg_networks.py
+++ b/test/integration/component/test_advancedsg_networks.py
@@ -28,7 +28,8 @@ from marvin.integration.lib.base import (Zone,
Domain,
VpcOffering,
VPC,
- SecurityGroup)
+ SecurityGroup,
+ Host)
from marvin.integration.lib.common import (get_domain,
get_zone,
@@ -41,10 +42,12 @@ from marvin.integration.lib.utils import (cleanup_resources,
random_gen,
validateList)
from marvin.cloudstackAPI import (authorizeSecurityGroupIngress,
- revokeSecurityGroupIngress)
+ revokeSecurityGroupIngress,
+ deleteSecurityGroup)
from nose.plugins.attrib import attr
from marvin.codes import PASS
import time
+import sys
import random
class TestCreateZoneSG(cloudstackTestCase):
@@ -532,11 +535,11 @@ class TestNetworksInAdvancedSG(cloudstackTestCase):
# Steps,
# 1. create a Domain and subdomain
# 2. Create one shared Network in parent domain with SG and set subdomain access True
- # 3. Deploy a VM in subdomain using the shared network
+ # 3. Deploy a VM in subdomain using the shared network
# Validations,
# 1. shared network should be created successfully
- # 2. Shared network should be able to be accessed within subdomain (VM should be deployed)
+ # 2. Shared network should be able to be accessed within subdomain (VM should be deployed)
#Create Domain
self.debug("Creating parent domain")
@@ -852,6 +855,53 @@ class TestNetworksInAdvancedSG(cloudstackTestCase):
return
@attr(tags = ["advancedsg"])
+ def test_29_deleteSharedNwSG_ZoneWide_InUse(self):
+ """ Test delete Zone wide shared network with SG which is in use"""
+
+ # Steps,
+ # 1. create zone wide shared network
+ # 2. Deploy vm in the shared network
+ # 3. Try to delete the shared network while its still in use
+
+ # Validations,
+ # 1. shared network deletion should fail
+
+ physical_network, vlan = get_free_vlan(self.api_client, self.zone.id)
+
+ #create network using the shared network offering created
+ self.services["shared_network_sg"]["acltype"] = "domain"
+ self.services["shared_network_sg"]["vlan"] = vlan
+ self.services["shared_network_sg"]["networkofferingid"] = self.shared_network_offering_sg.id
+ self.services["shared_network_sg"]["physicalnetworkid"] = physical_network.id
+
+ self.setSharedNetworkParams("shared_network_sg")
+
+ self.debug("Creating shared network in zone: %s" % self.zone.id)
+ shared_network_sg = Network.create(self.api_client,self.services["shared_network_sg"],
+ networkofferingid=self.shared_network_offering_sg.id,
+ zoneid=self.zone.id)
+
+ self.debug("Created shared network: %s" % shared_network_sg.id)
+
+ self.debug("Deploying vm in the shared network: %s" % shared_network_sg.id)
+
+ vm = VirtualMachine.create(self.api_client,self.services["virtual_machine"],
+ networkids=[shared_network_sg.id,],serviceofferingid=self.service_offering.id)
+ self.debug("Created vm %s" % vm.id)
+
+ self.debug("Trying to delete shared network: %s" % shared_network_sg.id)
+
+ try:
+ shared_network_sg.delete(self.api_client)
+ self.fail("Exception not raised while deleting network")
+ except Exception as e:
+ self.debug("Network deletion failed with exception: %s" % e)
+
+ self.cleanup_networks.append(shared_network_sg)
+ self.cleanup_vms.append(vm)
+ return
+
+ @attr(tags = ["advancedsg"])
def test_12_deleteSharedNwSGAccountSpecific_NotInUse(self):
""" Test delete Account specific shared network creation with SG which is not in use"""
@@ -938,6 +988,43 @@ class TestNetworksInAdvancedSG(cloudstackTestCase):
return
@attr(tags = ["advancedsg"])
+ def test_30_deleteSharedNwSG_ZoneWide_NotInUse(self):
+ """ Test delete zone wide shared network with SG which is not in use"""
+
+ # Steps,
+ # 1. create a zone wide shared network with SG
+ # 2. Try to delete the shared network
+
+ # Validations,
+ # 1. shared network deletion should succeed
+
+ physical_network, vlan = get_free_vlan(self.api_client, self.zone.id)
+
+ #create network using the shared network offering created
+ self.services["shared_network_sg"]["acltype"] = "domain"
+ self.services["shared_network_sg"]["vlan"] = vlan
+ self.services["shared_network_sg"]["networkofferingid"] = self.shared_network_offering_sg.id
+ self.services["shared_network_sg"]["physicalnetworkid"] = physical_network.id
+
+ self.setSharedNetworkParams("shared_network_sg")
+
+ self.debug("Creating shared network in zone: %s" % self.zone.id)
+ shared_network_sg = Network.create(self.api_client,self.services["shared_network_sg"],
+ networkofferingid=self.shared_network_offering_sg.id,
+ zoneid=self.zone.id)
+
+ self.debug("Created shared network: %s" % shared_network_sg.id)
+
+ self.debug("Trying to delete shared network: %s" % shared_network_sg.id)
+
+ try:
+ shared_network_sg.delete(self.api_client)
+ except Exception as e:
+ self.fail("Network deletion failed with exception: %s" % e)
+
+ return
+
+ @attr(tags = ["advancedsg"])
def test__14_createSharedNwWithSG_withoutParams(self):
""" Test create shared network with SG without specifying necessary parameters"""
@@ -1126,7 +1213,7 @@ class TestNetworksInAdvancedSG_VmOperations(cloudstackTestCase):
exceptions.append(e)
if len(exceptions) > 0:
- self.faill("There were exceptions during cleanup: %s" % exceptions)
+ self.fail("There were exceptions during cleanup: %s" % exceptions)
return
@@ -1145,6 +1232,29 @@ class TestNetworksInAdvancedSG_VmOperations(cloudstackTestCase):
return
+ def setVmState(self, vm, state):
+ """ set VM state and verify if it is reflected correctly
+ Currently takes 2 states - running and stopped"""
+
+ if state=="running":
+ vm.start(self.api_client)
+ elif state=="stopped":
+ vm.stop(self.api_client)
+ else:
+ self.fail("Invalid state passed")
+ retriesCount = 5
+ while True:
+ vm_list = list_virtual_machines(self.api_client, id=vm.id)
+ self.assertEqual(validateList(vm_list)[0], PASS, "vm list validation failed, vm list is %s" % vm_list)
+ if vm_list[0].state.lower() == state:
+ break
+ if retriesCount == 0:
+ self.fail("Failed to set VM state as %s" % state)
+ retriesCount -= 1
+ time.sleep(10)
+
+ return
+
@attr(tags = ["advancedsg"])
def test__16_AccountSpecificNwAccess(self):
""" Test account specific network access of users"""
@@ -1152,7 +1262,7 @@ class TestNetworksInAdvancedSG_VmOperations(cloudstackTestCase):
# Steps,
# 1. create multiple accounts/users and their account specific SG enabled shared networks
# 2. Deploy VM in the account specific network with user of that account
- # 3. Try to deploy VM in one account specific network from other account user
+ # 3. Try to deploy VM in one account specific network from other account user
# Validations,
# 1. VM deployment should be allowed for the users of the same account only for their account
@@ -1511,325 +1621,336 @@ class TestNetworksInAdvancedSG_VmOperations(cloudstackTestCase):
return
-class TestSecurityGroups_BasicSanity(cloudstackTestCase):
-
- @classmethod
- def setUpClass(cls):
- cloudstackTestClient = super(TestSecurityGroups_BasicSanity,cls).getClsTestClient()
- cls.api_client = cloudstackTestClient.getApiClient()
-
- cls.services = cloudstackTestClient.getConfigParser().parsedDict
-
- # Get Zone, Domain and templates
- cls.domain = get_domain(cls.api_client, cls.services)
- cls.zone = get_zone(cls.api_client, cls.services)
- cls.template = get_template(cls.api_client,cls.zone.id,cls.services["ostype"])
-
- cls.services["virtual_machine"]["zoneid"] = cls.zone.id
- cls.services["virtual_machine"]["template"] = cls.template.id
+ @attr(tags = ["advancedsg"])
+ def test_24_DeployVM_Multiple_Shared_Networks(self):
+ """ Test deploy VM in multiple zone wide shared networks"""
- cls.service_offering = ServiceOffering.create(cls.api_client,cls.services["service_offering"])
+ # Steps,
+ # 1. Create multiple zone wide shared networks
+ # 2. Try to deploy VM using all these networks
- cls.services["shared_network_offering_sg"]["specifyVlan"] = "True"
- cls.services["shared_network_offering_sg"]["specifyIpRanges"] = "True"
- #Create Network Offering
- cls.shared_network_offering_sg = NetworkOffering.create(cls.api_client,cls.services["shared_network_offering_sg"],conservemode=False)
+ # Validations,
+ # 1. VM deployment should fail saying "error 431 Only support one zone wide network per VM if security group enabled"
- #Update network offering state from disabled to enabled.
- NetworkOffering.update(cls.shared_network_offering_sg,cls.api_client,state="enabled")
+ self.services["shared_network_sg"]["acltype"] = "domain"
+ self.services["shared_network_sg"]["networkofferingid"] = self.shared_network_offering_sg.id
- physical_network, vlan = get_free_vlan(cls.api_client, cls.zone.id)
+ physical_network, vlan = get_free_vlan(self.api_client, self.zone.id)
#create network using the shared network offering created
- cls.services["shared_network_sg"]["vlan"] = vlan
- cls.services["shared_network_sg"]["physicalnetworkid"] = physical_network.id
- cls.services["shared_network_sg"]["acltype"] = "domain"
- cls.services["shared_network_sg"]["networkofferingid"] = cls.shared_network_offering_sg.id
+ self.services["shared_network_sg"]["vlan"] = vlan
+ self.services["shared_network_sg"]["physicalnetworkid"] = physical_network.id
- cls.setSharedNetworkParams("shared_network_sg")
+ self.setSharedNetworkParams("shared_network_sg")
- cls.shared_network = Network.create(cls.api_client,cls.services["shared_network_sg"],
- networkofferingid=cls.shared_network_offering_sg.id,zoneid=cls.zone.id)
+ shared_network_1 = Network.create(self.api_client,self.services["shared_network_sg"],
+ networkofferingid=self.shared_network_offering_sg.id,zoneid=self.zone.id)
+ self.cleanup_networks.append(shared_network_1)
+ self.debug("Created shared network: %s" % shared_network_1.id)
- cls._cleanup = [
- cls.service_offering,
- cls.shared_network,
- cls.shared_network_offering_sg,
- ]
- return
+ self.services["shared_network_sg"]["vlan"] = get_free_vlan(self.api_client, self.zone.id)[1]
+ self.setSharedNetworkParams("shared_network_sg")
+
+ shared_network_2 = Network.create(self.api_client,self.services["shared_network_sg"],
+ networkofferingid=self.shared_network_offering_sg.id,zoneid=self.zone.id)
+ self.cleanup_networks.append(shared_network_2)
+ self.debug("Created shared network: %s" % shared_network_2.id)
- @classmethod
- def tearDownClass(cls):
try:
- #Update network offering state from enabled to disabled.
- cls.shared_network_offering_sg.update(cls.api_client,state="disabled")
- #Cleanup resources used
- cleanup_resources(cls.api_client, cls._cleanup)
+ self.debug("Creating virtual machine in zone wide shared networks %s and %s, this should fail" %
+ (shared_network_1.id, shared_network_2.id))
+ vm = VirtualMachine.create(self.api_client,self.services["virtual_machine"],
+ templateid=self.template.id, networkids=[shared_network_1.id, shared_network_2.id],
+ serviceofferingid=self.service_offering.id)
+ self.cleanup_vms.append(vm)
+ self.fail("Vm creation should have failed, it succeded, created vm %s" % vm.id)
except Exception as e:
- raise Exception("Warning: Exception during cleanup : %s" % e)
- return
+ self.debug("VM creation failed as expected with exception: %s" % e)
- def setUp(self):
- self.api_client = self.testClient.getApiClient()
- self.dbclient = self.testClient.getDbConnection()
- self.cleanup = []
- self.cleanup_networks = []
- self.cleanup_accounts = []
- self.cleanup_domains = []
- self.cleanup_projects = []
- self.cleanup_vms = []
- self.cleanup_secGrps = []
return
- def tearDown(self):
- exceptions = []
- try:
- #Clean up, terminate the created network offerings
- cleanup_resources(self.api_client, self.cleanup)
- except Exception as e:
- self.debug("Warning: Exception during cleanup : %s" % e)
- exceptions.append(e)
+ @attr(tags = ["advancedsg"])
+ def test_25_Deploy_Multiple_VM_Different_Shared_Networks_Same_SG(self):
+ """ Test deploy Multiple VMs in different shared networks but same security group"""
- #below components is not a part of cleanup because to mandate the order and to cleanup network
- try:
- for vm in self.cleanup_vms:
- vm.delete(self.api_client)
- except Exception as e:
- self.debug("Warning: Exception during virtual machines cleanup : %s" % e)
- exceptions.append(e)
+ # Steps,
+ # 1. Create multiple zone wide shared networks
+ # 2. Create a custom security group
+ # 3. Deploy Multiple VMs in different shared networks but using the same custom
+ # security group
- # Wait for VMs to expunge
- wait_for_cleanup(self.api_client, ["expunge.delay", "expunge.interval"])
+ # Validations,
+ # 1. VM deployments should succeed
- try:
- for sec_grp in self.cleanup_secGrps:
- sec_grp.delete(self.api_client)
- except Exception as e:
- self.debug("Warning : Exception during security groups cleanup: %s" % e)
- exceptions.append(e)
+ self.services["shared_network_sg"]["acltype"] = "domain"
+ self.services["shared_network_sg"]["networkofferingid"] = self.shared_network_offering_sg.id
- try:
- for project in self.cleanup_projects:
- project.delete(self.api_client)
- except Exception as e:
- self.debug("Warning: Exception during project cleanup : %s" % e)
- exceptions.append(e)
+ physical_network, vlan = get_free_vlan(self.api_client, self.zone.id)
- try:
- for account in self.cleanup_accounts:
- account.delete(self.api_client)
- except Exception as e:
- self.debug("Warning: Exception during account cleanup : %s" % e)
- exceptions.append(e)
+ #create network using the shared network offering created
+ self.services["shared_network_sg"]["vlan"] = vlan
+ self.services["shared_network_sg"]["physicalnetworkid"] = physical_network.id
- #Wait till all resources created are cleaned up completely and then attempt to delete Network
- time.sleep(self.services["sleep"])
+ self.setSharedNetworkParams("shared_network_sg")
- try:
- for network in self.cleanup_networks:
- network.delete(self.api_client)
- except Exception as e:
- self.debug("Warning: Exception during network cleanup : %s" % e)
- exceptions.append(e)
+ shared_network_1 = Network.create(self.api_client,self.services["shared_network_sg"],
+ networkofferingid=self.shared_network_offering_sg.id,zoneid=self.zone.id)
+ self.cleanup_networks.append(shared_network_1)
+ self.debug("Created shared network: %s" % shared_network_1.id)
- try:
- for domain in self.cleanup_domains:
- domain.delete(self.api_client)
- except Exception as e:
- self.debug("Warning: Exception during domain cleanup : %s" % e)
- exceptions.append(e)
+ self.services["shared_network_sg"]["vlan"] = get_free_vlan(self.api_client, self.zone.id)[1]
+ self.setSharedNetworkParams("shared_network_sg")
- if len(exceptions) > 0:
- self.fail("There were exceptions during cleanup: %s" % exceptions)
+ shared_network_2 = Network.create(self.api_client,self.services["shared_network_sg"],
+ networkofferingid=self.shared_network_offering_sg.id,zoneid=self.zone.id)
+ self.cleanup_networks.append(shared_network_2)
+ self.debug("Created shared network: %s" % shared_network_2.id)
- return
+ self.services["security_group"]["name"] = "Custom_sec_grp_" + random_gen()
+ sec_grp_1 = SecurityGroup.create(self.api_client,self.services["security_group"])
+ self.debug("Created security groups: %s" % sec_grp_1.id)
+ self.cleanup_secGrps.append(sec_grp_1)
- @classmethod
- def setSharedNetworkParams(cls, network, range=20):
+ self.debug("Creating virtual machine in shared network %s and security group %s" %
+ (shared_network_1.id, sec_grp_1.id))
+ vm_1 = VirtualMachine.create(self.api_client,self.services["virtual_machine"],
+ templateid=self.template.id, networkids=[shared_network_1.id],
+ serviceofferingid=self.service_offering.id,
+ securitygroupids=[sec_grp_1.id])
+ self.cleanup_vms.append(vm_1)
- # @range: range decides the endip. Pass the range as "x" if you want the difference between the startip
- # and endip as "x"
- # Set the subnet number of shared networks randomly prior to execution
- # of each test case to avoid overlapping of ip addresses
- shared_network_subnet_number = random.randrange(1,254)
+ self.debug("Creating virtual machine in shared network %s and security group %s" %
+ (shared_network_2.id, sec_grp_1.id))
+ vm_2 = VirtualMachine.create(self.api_client,self.services["virtual_machine"],
+ templateid=self.template.id, networkids=[shared_network_2.id],
+ serviceofferingid=self.service_offering.id,
+ securitygroupids=[sec_grp_1.id])
+ self.cleanup_vms.append(vm_2)
- cls.services[network]["gateway"] = "172.16."+str(shared_network_subnet_number)+".1"
- cls.services[network]["startip"] = "172.16."+str(shared_network_subnet_number)+".2"
- cls.services[network]["endip"] = "172.16."+str(shared_network_subnet_number)+"."+str(range+1)
- cls.services[network]["netmask"] = "255.255.255.0"
+ vm_list = list_virtual_machines(self.api_client, listall=True)
+
+ self.assertEqual(validateList(vm_list)[0], PASS, "vm list validation failed, vm list is %s" % vm_list)
+
+ vm_ids = [vm.id for vm in vm_list]
+ self.assertTrue(vm_1.id in vm_ids, "vm %s not present in vm list %s" % (vm_1.id, vm_ids))
+ self.assertTrue(vm_1.id in vm_ids, "vm %s not present in vm list %s" % (vm_2.id, vm_ids))
return
@attr(tags = ["advancedsg"])
- def test_21_DeployVM_WithoutSG(self):
- """ Test deploy VM without passing any security group id"""
+ def test_26_Destroy_Deploy_VM_NoFreeIPs(self):
+ """ Test destroy VM in zone wide shared nw when IPs are full and then try to deploy vm"""
# Steps,
- # 1. List security groups and authorize ingress rule for the default security group
- # 2. Create custom security group and authorize ingress rule for this security group
- # 3. Deploy VM without passing any security group
- # 4. SSH to VM and try pinging outside world from VM
+ # 1. Create zone wide shared network in SG enabled zone.
+ # 2. Exhaust the free IPs in the network by deploying VM.
+ # 3. Destroy VM when IPs are full and then again try to deploy VM
# Validations,
- # 1. VM deployment should be successful
- # 2. VM should be deployed in default security group and not in custom security group
- # 3. VM should be reached through SSH using any IP and ping to outside world should be successful
-
- ingress_rule_ids = []
+ # 1. VM should be deployed as one IP gets available
- self.debug("Listing security groups")
- securitygroups = SecurityGroup.list(self.api_client)
+ self.services["shared_network_sg"]["acltype"] = "domain"
+ self.services["shared_network_sg"]["networkofferingid"] = self.shared_network_offering_sg.id
- self.assertEqual(validateList(securitygroups)[0], PASS, "securitygroups list validation\
- failed, securitygroups list is %s" % securitygroups)
+ physical_network, vlan = get_free_vlan(self.api_client, self.zone.id)
- defaultSecurityGroup = securitygroups[0]
+ #create network using the shared network offering created
+ self.services["shared_network_sg"]["vlan"] = vlan
+ self.services["shared_network_sg"]["physicalnetworkid"] = physical_network.id
- self.debug("Default security group: %s" % defaultSecurityGroup.id)
+ self.setSharedNetworkParams("shared_network_sg", range=2)
- # Authorize ingress rule for the default security group
- cmd = authorizeSecurityGroupIngress.authorizeSecurityGroupIngressCmd()
- cmd.securitygroupid = defaultSecurityGroup.id
- cmd.protocol = 'TCP'
- cmd.startport = 22
- cmd.endport = 80
- cmd.cidrlist = '0.0.0.0/0'
- ingress_rule_1 = self.api_client.authorizeSecurityGroupIngress(cmd)
- ingress_rule_ids.append(ingress_rule_1.ingressrule[0].ruleid)
+ shared_network = Network.create(self.api_client,self.services["shared_network_sg"],
+ networkofferingid=self.shared_network_offering_sg.id,zoneid=self.zone.id)
+ self.cleanup_networks.append(shared_network)
+
+ # Deploying 1 VM will exhaust the IP range because we are passing range as 2, and one of the IPs
+ # already gets consumed by the virtual router of the shared network
+
+ self.debug("Creating virtual machine shared network %s" % shared_network.id)
+ vm_1 = VirtualMachine.create(self.api_client,self.services["virtual_machine"],
+ templateid=self.template.id,networkids=[shared_network.id,],
+ serviceofferingid=self.service_offering.id)
+ try:
+ self.debug("Trying to create virtual machine when all the IPs are consumed")
+ vm = VirtualMachine.create(self.api_client,self.services["virtual_machine"],
+ templateid=self.template.id,networkids=[shared_network.id,],
+ serviceofferingid=self.service_offering.id)
+ self.cleanup_vms.append(vm)
+ self.fail("Vm creation succeded, should have failed")
+ except Exception as e:
+ self.debug("VM creation failed as expected with exception: %s" % e)
+
+ # Now delete VM to free IP in shared network
+ vm_1.delete(self.api_client)
+ # Wait for VMs to expunge
+ wait_for_cleanup(self.api_client, ["expunge.delay", "expunge.interval"])
+
+ # As IP is free now, VM deployment in the shared network should be successful
+
+ try:
+ self.debug("Trying to create virtual machine when all the IPs are consumed")
+ vm_2 = VirtualMachine.create(self.api_client,self.services["virtual_machine"],
+ templateid=self.template.id,networkids=[shared_network.id,],
+ serviceofferingid=self.service_offering.id)
+ self.cleanup_vms.append(vm_2)
+ self.debug("Deployed VM %s in the shared network %s" % (vm_2.id, shared_network.id))
+ except Exception as e:
+ self.fail("VM creation failed with exception: %s" % e)
+
+ return
+
+ @data("stopStart","reboot")
+ @attr(tags = ["advancedsg"])
+ def test_27_start_stop_vm(self, value):
+ """ Test start and stop vm"""
+
+ # Steps,
+ # 1. Create a security group and authorize ingress rule (port 22-80) for this security group
+ # 2. Create a user account and deploy VMs in this account and security group
+ # 3. Try to access VM through SSH
+ # 4. Start and stop/ Reboot VM
+ # 5. Again try to access the VM through SSH
+
+ # Validations,
+ # 1. Both the times, SSH should be successful
+
+ #Create user account
+ user_account = Account.create(self.api_client,self.services["account"],domainid=self.domain.id)
+ self.debug("Created user account : %s" % user_account.name)
+ self.cleanup_accounts.append(user_account)
+
+ ingress_rule_ids = []
# Create custom security group
self.services["security_group"]["name"] = "Custom_sec_grp_" + random_gen()
- custom_sec_grp = SecurityGroup.create(self.api_client,self.services["security_group"])
+ custom_sec_grp = SecurityGroup.create(self.api_client,self.services["security_group"], account=user_account.name, domainid=self.domain.id)
self.debug("Created security groups: %s" % custom_sec_grp.id)
self.cleanup_secGrps.append(custom_sec_grp)
# Authorize Security group to for allowing SSH to VM
- ingress_rule_2 = custom_sec_grp.authorize(self.api_client,self.services["ingress_rule"])
- ingress_rule_ids.append(ingress_rule_2["ingressrule"][0].ruleid)
+ ingress_rule = custom_sec_grp.authorize(self.api_client,self.services["ingress_rule"])
+ ingress_rule_ids.append(ingress_rule["ingressrule"][0].ruleid)
self.debug("Authorized ingress rule for security group: %s" % custom_sec_grp.id)
- self.debug(ingress_rule_ids)
-
# Create virtual machine without passing any security group id
vm = VirtualMachine.create(self.api_client,self.services["virtual_machine"],
- templateid=self.template.id,
- serviceofferingid=self.service_offering.id)
+ templateid=self.template.id,accountid=user_account.name,
+ domainid=self.domain.id,serviceofferingid=self.service_offering.id,
+ securitygroupids = [custom_sec_grp.id,])
self.debug("Created VM : %s" % vm.id)
self.cleanup_vms.append(vm)
- self.debug("listing virtual machines")
-
- vm_list = list_virtual_machines(self.api_client, id=vm.id)
+ # Should be able to SSH VM
+ try:
+ self.debug("SSH into VM: %s" % vm.nic[0].ipaddress)
+ vm.get_ssh_client(ipaddress=vm.nic[0].ipaddress)
- self.assertEqual(validateList(vm_list)[0], PASS, "vm list validation failed, list is %s" % vm_list)
+ self.debug("SSH to VM successful, proceeding for %s operation" % value)
- self.assertEqual(len(vm_list[0].securitygroup), 1, "There should be exactly one security group associated \
- with the vm, vm security group list is : %s" % vm_list[0].securitygroup)
- self.assertEqual(defaultSecurityGroup.id, vm_list[0].securitygroup[0].id, "Vm should be deployed in default security group %s \
- instead it is deployed in security group %s" % (defaultSecurityGroup.id, vm_list[0].securitygroup[0].id))
+ if value == "stopStart":
+ self.setVmState(vm, "stopped")
+ self.setVmState(vm, "running")
+ elif value == "reboot":
+ vm.reboot(self.api_client)
- # Should be able to SSH VM
- try:
self.debug("SSH into VM: %s" % vm.nic[0].ipaddress)
- ssh = vm.get_ssh_client(ipaddress=vm.nic[0].ipaddress)
+ vm.get_ssh_client(ipaddress=vm.nic[0].ipaddress)
- # Ping to outsite world
- res = ssh.execute("ping -c 1 www.google.com")
except Exception as e:
- self.fail("SSH Access failed for %s: %s" % \
- (vm.ssh_ip, e)
+ self.fail("SSH Access failed for %s: %s, failed at line %s" % \
+ (vm.nic[0].ipaddress, e, sys.exc_info()[2].tb_lineno)
)
finally:
cmd = revokeSecurityGroupIngress.revokeSecurityGroupIngressCmd()
- self.debug("ingress_rules: %s" % ingress_rule_ids)
for rule_id in ingress_rule_ids:
cmd.id = rule_id
self.api_client.revokeSecurityGroupIngress(cmd)
- result = str(res)
- self.assertEqual(
- result.count("1 received"),
- 1,
- "Ping to outside world from VM should be successful"
- )
+
return
+ @data("recover","expunge")
@attr(tags = ["advancedsg"])
- def test_22_DeployVM_WithoutSG(self):
- """ Test deploy VM without passing any security group id"""
+ def test_28_destroy_recover_expunge_vm(self, value):
+ """ Test start and stop vm"""
# Steps,
- # 1. List security groups and authorize ingress rule for the default security group
- # 2. Create custom security group and authorize ingress rule for this security group
- # 3. Deploy VM by passing only custom security group id
- # 4. SSH to VM and try pinging outside world from VM
+ # 1. Create a security group and authorize ingress rule (port 22-80) for this security group
+ # 2. Create a user account and deploy VMs in this account and security group
+ # 3. Try to access VM through SSH
+ # 4. Destroy and recover VM (Or Expunge)
+ # 5. Again try to access the VM through SSH
# Validations,
- # 1. VM deployment should be successful
- # 2. VM should be deployed in custom security group and not in any other security group
- # 3. VM should be reached through SSH using any IP and ping to outside world should be successful
-
- ingress_rule_ids = []
-
- self.debug("Listing security groups")
- securitygroups = SecurityGroup.list(self.api_client)
-
- self.assertEqual(validateList(securitygroups)[0], PASS, "securitygroups list validation\
- failed, securitygroups list is %s" % securitygroups)
-
- defaultSecurityGroup = securitygroups[0]
+ # 1. In destroy/recover case Both the times, SSH should be successful
+ # 2. In expunge case, SSH should not be suceessful after vm gets expunged
- self.debug("Default security group: %s" % defaultSecurityGroup.id)
+ #Create user account
+ user_account = Account.create(self.api_client,self.services["account"],domainid=self.domain.id)
+ self.debug("Created user account : %s" % user_account.name)
+ self.cleanup_accounts.append(user_account)
- # Authorize ingress rule for the default security group
- cmd = authorizeSecurityGroupIngress.authorizeSecurityGroupIngressCmd()
- cmd.securitygroupid = defaultSecurityGroup.id
- cmd.protocol = 'TCP'
- cmd.startport = 22
- cmd.endport = 80
- cmd.cidrlist = '0.0.0.0/0'
- ingress_rule_1 = self.api_client.authorizeSecurityGroupIngress(cmd)
- ingress_rule_ids.append(ingress_rule_1.ingressrule[0].ruleid)
+ ingress_rule_ids = []
# Create custom security group
self.services["security_group"]["name"] = "Custom_sec_grp_" + random_gen()
- custom_sec_grp = SecurityGroup.create(self.api_client,self.services["security_group"])
+ custom_sec_grp = SecurityGroup.create(self.api_client,self.services["security_group"], account=user_account.name, domainid=self.domain.id)
self.debug("Created security groups: %s" % custom_sec_grp.id)
self.cleanup_secGrps.append(custom_sec_grp)
# Authorize Security group to for allowing SSH to VM
- ingress_rule_2 = custom_sec_grp.authorize(self.api_client,self.services["ingress_rule"])
- ingress_rule_ids.append(ingress_rule_2["ingressrule"][0].ruleid)
+ ingress_rule = custom_sec_grp.authorize(self.api_client,self.services["ingress_rule"])
+ ingress_rule_ids.append(ingress_rule["ingressrule"][0].ruleid)
self.debug("Authorized ingress rule for security group: %s" % custom_sec_grp.id)
# Create virtual machine without passing any security group id
vm = VirtualMachine.create(self.api_client,self.services["virtual_machine"],
- templateid=self.template.id,
- serviceofferingid=self.service_offering.id,
+ templateid=self.template.id,accountid=user_account.name,
+ domainid=self.domain.id,serviceofferingid=self.service_offering.id,
securitygroupids = [custom_sec_grp.id,])
self.debug("Created VM : %s" % vm.id)
- self.cleanup_vms.append(vm)
-
- self.debug("listing virtual machines")
-
- vm_list = list_virtual_machines(self.api_client, id=vm.id)
-
- self.assertEqual(validateList(vm_list)[0], PASS, "vm list validation failed, list is %s" % vm_list)
-
- self.assertEqual(len(vm_list[0].securitygroup), 1, "There should be exactly one security group associated \
- with the vm, vm security group list is : %s" % vm_list[0].securitygroup)
- self.assertEqual(custom_sec_grp.id, vm_list[0].securitygroup[0].id, "Vm should be deployed in custom security group %s \
- instead it is deployed in security group %s" % (custom_sec_grp.id, vm_list[0].securitygroup[0].id))
# Should be able to SSH VM
try:
self.debug("SSH into VM: %s" % vm.nic[0].ipaddress)
- ssh = vm.get_ssh_client(ipaddress=vm.nic[0].ipaddress)
-
- # Ping to outsite world
- res = ssh.execute("ping -c 1 www.google.com")
+ vm.get_ssh_client(ipaddress=vm.nic[0].ipaddress)
+ self.debug("SSH to VM successful, proceeding for %s operation" % value)
+ vm.delete(self.api_client)
+ if value == "recover":
+ vm.recover(self.api_client)
+ vm.start(self.api_client)
+ retriesCount = 5
+ while True:
+ vm_list = VirtualMachine.list(self.api_client, id=vm.id)
+ self.assertEqual(validateList(vm_list)[0], PASS , "vm list validation failed, vm list is %s" % vm_list)
+ if str(vm_list[0].state).lower() == "running":
+ break
+ if retriesCount == 0:
+ self.fail("Failed to start vm: %s" % vm.id)
+ retriesCount -= 1
+ time.sleep(10)
+ self.debug("SSH into VM: %s" % vm.nic[0].ipaddress)
+ vm.get_ssh_client(ipaddress=vm.nic[0].ipaddress)
+ self.debug("SSH successful")
+ self.cleanup_vms.append(vm)
+ elif value == "expunge":
+ #wait till vm gets expunged
+ wait_for_cleanup(self.api_client, ["expunge.delay", "expunge.interval"])
+ try:
+ vm_list = VirtualMachine.list(self.api_client, id=vm.id)
+ self.fail("vm listing should fail, instead got vm list: %s" % vm_list)
+ except Exception as e:
+ self.debug("Vm listing failed as expected with exception: %s" % e)
+
+ try:
+ self.debug("SSH into VM: %s, this should fail" % vm.nic[0].ipaddress)
+ vm.get_ssh_client(ipaddress=vm.nic[0].ipaddress)
+ self.fail("SSH should have failed, instead it succeeded")
+ except Exception as e:
+ self.debug("SSH failed as expected with exception: %s" % e)
except Exception as e:
- self.fail("SSH Access failed for %s: %s" % \
- (vm.ssh_ip, e)
+ self.fail("SSH Access failed for %s: %s, failed at line %s" % \
+ (vm.nic[0].ipaddress, e, sys.exc_info()[2].tb_lineno)
)
finally:
cmd = revokeSecurityGroupIngress.revokeSecurityGroupIngressCmd()
@@ -1837,104 +1958,1344 @@ class TestSecurityGroups_BasicSanity(cloudstackTestCase):
cmd.id = rule_id
self.api_client.revokeSecurityGroupIngress(cmd)
- result = str(res)
- self.assertEqual(
- result.count("1 received"),
- 1,
- "Ping to outside world from VM should be successful"
- )
-
return
@attr(tags = ["advancedsg"])
- def test_23_DeployVM_MultipleSG(self):
- """ Test deploy VM in multiple security groups"""
+ def test_31_Deploy_VM_multiple_shared_networks_sg(self):
+ """ Test deploy VM in multiple SG enabled shared networks"""
# Steps,
- # 1. Create multiple security groups
- # 2. Authorize ingress rule for all these security groups
- # 3. Deploy VM with all the security groups
- # 4. SSH to VM
+ # 1. Create multiple SG enabled shared networks
+ # 3. Try to deploy VM in all of these networks
# Validations,
- # 1. VM deployment should be successful
- # 2. VM should list all the security groups
- # 3. VM should be reached through SSH using any IP in the mentioned CIDRs of the ingress rules
+ # 1. VM deployment should fail saying "Only support one network per VM if security group enabled"
- ingress_rule_ids = []
+ self.services["shared_network_sg"]["acltype"] = "domain"
+ self.services["shared_network_sg"]["networkofferingid"] = self.shared_network_offering_sg.id
- self.services["security_group"]["name"] = "Custom_sec_grp_" + random_gen()
- sec_grp_1 = SecurityGroup.create(self.api_client,self.services["security_group"])
- self.debug("Created security groups: %s" % sec_grp_1.id)
- self.cleanup_secGrps.append(sec_grp_1)
+ physical_network, vlan = get_free_vlan(self.api_client, self.zone.id)
- self.services["security_group"]["name"] = "Custom_sec_grp_" + random_gen()
- sec_grp_2 = SecurityGroup.create(self.api_client,self.services["security_group"])
- self.debug("Created security groups: %s" % sec_grp_2.id)
- self.cleanup_secGrps.append(sec_grp_2)
+ #create network using the shared network offering created
+ self.services["shared_network_sg"]["vlan"] = vlan
+ self.services["shared_network_sg"]["physicalnetworkid"] = physical_network.id
- self.services["security_group"]["name"] = "Custom_sec_grp_" + random_gen()
- sec_grp_3 = SecurityGroup.create(self.api_client,self.services["security_group"])
- self.debug("Created security groups: %s" % sec_grp_3.id)
- self.cleanup_secGrps.append(sec_grp_3)
+ self.setSharedNetworkParams("shared_network_sg")
- # Authorize Security group to for allowing SSH to VM
- ingress_rule_1 = sec_grp_1.authorize(self.api_client,self.services["ingress_rule"])
- ingress_rule_ids.append(ingress_rule_1["ingressrule"][0].ruleid)
- self.debug("Authorized ingress rule for security group: %s" % sec_grp_1.id)
+ shared_network_1 = Network.create(self.api_client,self.services["shared_network_sg"],
+ networkofferingid=self.shared_network_offering_sg.id,zoneid=self.zone.id)
+ self.cleanup_networks.append(shared_network_1)
+ self.debug("Created shared network: %s" % shared_network_1.id)
- ingress_rule_2 = sec_grp_2.authorize(self.api_client,self.services["ingress_rule"])
- ingress_rule_ids.append(ingress_rule_2["ingressrule"][0].ruleid)
- self.debug("Authorized ingress rule for security group: %s" % sec_grp_2.id)
+ self.services["shared_network_sg"]["vlan"] = get_free_vlan(self.api_client, self.zone.id)[1]
+ self.setSharedNetworkParams("shared_network_sg")
- ingress_rule_3 = sec_grp_3.authorize(self.api_client,self.services["ingress_rule"])
- ingress_rule_ids.append(ingress_rule_3["ingressrule"][0].ruleid)
- self.debug("Authorized ingress rule for security group: %s" % sec_grp_3.id)
+ shared_network_2 = Network.create(self.api_client,self.services["shared_network_sg"],
+ networkofferingid=self.shared_network_offering_sg.id,zoneid=self.zone.id)
+ self.cleanup_networks.append(shared_network_2)
+ self.debug("Created shared network: %s" % shared_network_2.id)
- vm = VirtualMachine.create(self.api_client,self.services["virtual_machine"],
- templateid=self.template.id,
- securitygroupids=[sec_grp_1.id, sec_grp_2.id, sec_grp_3.id,],
- serviceofferingid=self.service_offering.id)
- self.debug("Created VM : %s" % vm.id)
- self.cleanup_vms.append(vm)
+ try:
+ self.debug("Creating virtual machine in shared networks %s and %s, this should fail" %
+ (shared_network_1.id, shared_network_2.id))
+ vm = VirtualMachine.create(self.api_client,self.services["virtual_machine"],
+ templateid=self.template.id, networkids=[shared_network_1.id, shared_network_2.id],
+ serviceofferingid=self.service_offering.id)
+ self.cleanup_vms.append(vm)
+ self.fail("Vm deployment should have failed, instead deployed vm %s" % vm.id)
+ except Exception as e:
+ self.debug("VM deployment failed as expected with exception: %s" % e)
- self.debug("listing virtual machines")
+ return
- vm_list = list_virtual_machines(self.api_client, id=vm.id)
+ @unittest.skip("Testing pending on multihost setup")
+ @data("account","domain","zone")
+ @attr(tags = ["advancedsg"])
+ def test_33_VM_Migrate_SharedNwSG(self, value):
+ """ Test migration of VM deployed in Account specific shared network"""
- self.assertEqual(validateList(vm_list)[0], PASS, "vm list validation failed, list is %s" % vm_list)
+ # Steps,
+ # 1. create a user account
+ # 2. Create one shared Network (scope=Account/domain/zone)
+ # 3. Deploy one VM in above shared network
+ # 4. Migrate VM to another host
- sec_grp_list = [sec_grp.id for sec_grp in vm_list[0].securitygroup]
+ # Validations,
+ # 1. VM migration should be successful
- self.assertTrue(sec_grp_1.id in sec_grp_list, "%s not present in security groups of vm: %s" %
- (sec_grp_1.id, sec_grp_list))
- self.assertTrue(sec_grp_2.id in sec_grp_list, "%s not present in security groups of vm: %s" %
- (sec_grp_2.id, sec_grp_list))
- self.assertTrue(sec_grp_3.id in sec_grp_list, "%s not present in security groups of vm: %s" %
- (sec_grp_3.id, sec_grp_list))
+ #Create admin account
- # Should be able to SSH VM
- try:
- self.debug("SSH into VM: %s" % vm.ssh_ip)
- ssh = vm.get_ssh_client(ipaddress=vm.nic[0].ipaddress)
+ hosts = Host.list(self.api_client, zoneid=self.zone.id)
+ self.assertEqual(validateList(hosts)[0], PASS, "hosts list validation failed, list is %s" % hosts)
+ if len(hosts) < 2:
+ self.skipTest("This test requires at least two hosts present in the zone")
+ domain = self.domain
+ account = None
+ self.services["shared_network_sg"]["acltype"] = "domain"
+ if value == "domain":
+ domain = Domain.create(self.api_client, services=self.services["domain"],
+ parentdomainid=self.domain.id)
+ self.cleanup_domains.append(domain)
+ elif value == "account":
+ account = Account.create(self.api_client,self.services["account"],admin=True,
+ domainid=self.domain.id)
+ self.cleanup_accounts.append(account)
+ self.services["shared_network_sg"]["acltype"] = "account"
- # Ping to outsite world
- res = ssh.execute("ping -c 1 www.google.com")
+ physical_network, vlan = get_free_vlan(self.api_client, self.zone.id)
+ #create network using the shared network offering created
+ self.services["shared_network_sg"]["vlan"] = vlan
+ self.services["shared_network_sg"]["networkofferingid"] = self.shared_network_offering_sg.id
+ self.services["shared_network_sg"]["physicalnetworkid"] = physical_network.id
+
+ self.setSharedNetworkParams("shared_network_sg")
+
+ shared_network_sg = Network.create(self.api_client, self.services["shared_network_sg"],
+ accountid=account.name if account else None,
+ domainid=domain.id,
+ networkofferingid=self.shared_network_offering_sg.id,
+ zoneid=self.zone.id)
+
+ if value == "domain" or value == "zone":
+ self.cleanup_networks.append(shared_network_sg)
+
+ self.debug("Created %s wide shared network %s" % (value,shared_network_sg.id))
+
+ virtual_machine = VirtualMachine.create(self.api_client,self.services["virtual_machine"],
+ accountid=account.name if account else None,
+ domainid=domain.id,
+ networkids=[shared_network_sg.id],
+ serviceofferingid=self.service_offering.id
+ )
+ self.cleanup_vms.append(virtual_machine)
+ hosts_to_migrate = [host for host in hosts if host.id != virtual_machine.hostid]
+ self.assertTrue(len(hosts_to_migrate) > 1, "At least one suitable host should be present to migrate VM")
+
+ try:
+ self.debug("trying to migrate virtual machine from host %s to host %s" % (virtual_machine.hostid, hosts_to_migrate[0].id))
+ virtual_machine.migrate(self.api_client, hosts_to_migrate[0].id)
except Exception as e:
- self.fail("SSH Access failed for %s: %s" % \
- (vm.ssh_ip, e)
- )
- finally:
- cmd = revokeSecurityGroupIngress.revokeSecurityGroupIngressCmd()
- for rule_id in ingress_rule_ids:
- cmd.id = rule_id
- self.api_client.revokeSecurityGroupIngress(cmd)
+ self.fail("VM migration failed with exception %s" % e)
- result = str(res)
- self.assertEqual(
- result.count("1 received"),
- 1,
- "Ping to outside world from VM should be successful"
- )
+ vm_list = list_virtual_machines(self.api_client, id=virtual_machine.id)
+ self.assertEqual(validateList(vm_list)[0], PASS, "vm list validation failed, vm list is %s" % vm_list)
+ self.assertEqual(vm_list[0].hostid, hosts_to_migrate[0].id, "VM host id does not reflect the migration")
+ return
+
+class TestSecurityGroups_BasicSanity(cloudstackTestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ cloudstackTestClient = super(TestSecurityGroups_BasicSanity,cls).getClsTestClient()
+ cls.api_client = cloudstackTestClient.getApiClient()
+
+ cls.services = cloudstackTestClient.getConfigParser().parsedDict
+
+ # Get Zone, Domain and templates
+ cls.domain = get_domain(cls.api_client, cls.services)
+ cls.zone = get_zone(cls.api_client, cls.services)
+ cls.template = get_template(cls.api_client,cls.zone.id,cls.services["ostype"])
+ cls.services["virtual_machine"]["zoneid"] = cls.zone.id
+ cls.services["virtual_machine"]["template"] = cls.template.id
+
+ cls.service_offering = ServiceOffering.create(cls.api_client,cls.services["service_offering"])
+
+ cls.services["shared_network_offering_sg"]["specifyVlan"] = "True"
+ cls.services["shared_network_offering_sg"]["specifyIpRanges"] = "True"
+ #Create Network Offering
+ cls.shared_network_offering_sg = NetworkOffering.create(cls.api_client,cls.services["shared_network_offering_sg"],conservemode=False)
+
+ #Update network offering state from disabled to enabled.
+ NetworkOffering.update(cls.shared_network_offering_sg,cls.api_client,state="enabled")
+
+ physical_network, vlan = get_free_vlan(cls.api_client, cls.zone.id)
+
+ #create network using the shared network offering created
+ cls.services["shared_network_sg"]["vlan"] = vlan
+ cls.services["shared_network_sg"]["physicalnetworkid"] = physical_network.id
+ cls.services["shared_network_sg"]["acltype"] = "domain"
+ cls.services["shared_network_sg"]["networkofferingid"] = cls.shared_network_offering_sg.id
+
+ cls.setSharedNetworkParams("shared_network_sg")
+
+ cls.shared_network = Network.create(cls.api_client,cls.services["shared_network_sg"],
+ networkofferingid=cls.shared_network_offering_sg.id,zoneid=cls.zone.id)
+
+ cls._cleanup = [
+ cls.service_offering,
+ cls.shared_network,
+ cls.shared_network_offering_sg,
+ ]
+ return
+
+ @classmethod
+ def tearDownClass(cls):
+ try:
+ #Update network offering state from enabled to disabled.
+ cls.shared_network_offering_sg.update(cls.api_client,state="disabled")
+ #Cleanup resources used
+ cleanup_resources(cls.api_client, cls._cleanup)
+ except Exception as e:
+ raise Exception("Warning: Exception during cleanup : %s" % e)
+ return
+
+ def setUp(self):
+ self.api_client = self.testClient.getApiClient()
+ self.dbclient = self.testClient.getDbConnection()
+ self.cleanup = []
+ self.cleanup_networks = []
+ self.cleanup_accounts = []
+ self.cleanup_domains = []
+ self.cleanup_projects = []
+ self.cleanup_vms = []
+ self.cleanup_secGrps = []
+ return
+
+ def tearDown(self):
+ exceptions = []
+ try:
+ #Clean up, terminate the created network offerings
+ cleanup_resources(self.api_client, self.cleanup)
+ except Exception as e:
+ self.debug("Warning: Exception during cleanup : %s" % e)
+ exceptions.append(e)
+
+ #below components is not a part of cleanup because to mandate the order and to cleanup network
+ try:
+ for vm in self.cleanup_vms:
+ vm.delete(self.api_client)
+ except Exception as e:
+ self.debug("Warning: Exception during virtual machines cleanup : %s" % e)
+ exceptions.append(e)
+
+ # Wait for VMs to expunge
+ wait_for_cleanup(self.api_client, ["expunge.delay", "expunge.interval"])
+
+ try:
+ for sec_grp in self.cleanup_secGrps:
+ sec_grp.delete(self.api_client)
+ except Exception as e:
+ self.debug("Warning : Exception during security groups cleanup: %s" % e)
+ exceptions.append(e)
+
+ try:
+ for project in self.cleanup_projects:
+ project.delete(self.api_client)
+ except Exception as e:
+ self.debug("Warning: Exception during project cleanup : %s" % e)
+ exceptions.append(e)
+
+ try:
+ for account in self.cleanup_accounts:
+ account.delete(self.api_client)
+ except Exception as e:
+ self.debug("Warning: Exception during account cleanup : %s" % e)
+ exceptions.append(e)
+
+ #Wait till all resources created are cleaned up completely and then attempt to delete Network
+ time.sleep(self.services["sleep"])
+
+ try:
+ for network in self.cleanup_networks:
+ network.delete(self.api_client)
+ except Exception as e:
+ self.debug("Warning: Exception during network cleanup : %s" % e)
+ exceptions.append(e)
+
+ try:
+ for domain in self.cleanup_domains:
+ domain.delete(self.api_client)
+ except Exception as e:
+ self.debug("Warning: Exception during domain cleanup : %s" % e)
+ exceptions.append(e)
+
+ if len(exceptions) > 0:
+ self.fail("There were exceptions during cleanup: %s" % exceptions)
+
+ return
+
+ @classmethod
+ def setSharedNetworkParams(cls, network, range=20):
+
+ # @range: range decides the endip. Pass the range as "x" if you want the difference between the startip
+ # and endip as "x"
+ # Set the subnet number of shared networks randomly prior to execution
+ # of each test case to avoid overlapping of ip addresses
+ shared_network_subnet_number = random.randrange(1,254)
+
+ cls.services[network]["gateway"] = "172.16."+str(shared_network_subnet_number)+".1"
+ cls.services[network]["startip"] = "172.16."+str(shared_network_subnet_number)+".2"
+ cls.services[network]["endip"] = "172.16."+str(shared_network_subnet_number)+"."+str(range+1)
+ cls.services[network]["netmask"] = "255.255.255.0"
+
+ return
+
+ @attr(tags = ["advancedsg"])
+ def test_21_DeployVM_WithoutSG(self):
+ """ Test deploy VM without passing any security group id"""
+
+ # Steps,
+ # 1. List security groups and authorize ingress rule for the default security group
+ # 2. Create custom security group and authorize ingress rule for this security group
+ # 3. Deploy VM without passing any security group
+ # 4. SSH to VM and try pinging outside world from VM
+
+ # Validations,
+ # 1. VM deployment should be successful
+ # 2. VM should be deployed in default security group and not in custom security group
+ # 3. VM should be reached through SSH using any IP and ping to outside world should be successful
+
+ ingress_rule_ids = []
+
+ self.debug("Listing security groups")
+ securitygroups = SecurityGroup.list(self.api_client)
+
+ self.assertEqual(validateList(securitygroups)[0], PASS, "securitygroups list validation\
+ failed, securitygroups list is %s" % securitygroups)
+
+ defaultSecurityGroup = securitygroups[0]
+
+ self.debug("Default security group: %s" % defaultSecurityGroup.id)
+
+ # Authorize ingress rule for the default security group
+ cmd = authorizeSecurityGroupIngress.authorizeSecurityGroupIngressCmd()
+ cmd.securitygroupid = defaultSecurityGroup.id
+ cmd.protocol = 'TCP'
+ cmd.startport = 22
+ cmd.endport = 80
+ cmd.cidrlist = '0.0.0.0/0'
+ ingress_rule_1 = self.api_client.authorizeSecurityGroupIngress(cmd)
+ ingress_rule_ids.append(ingress_rule_1.ingressrule[0].ruleid)
+
+ # Create custom security group
+ self.services["security_group"]["name"] = "Custom_sec_grp_" + random_gen()
+ custom_sec_grp = SecurityGroup.create(self.api_client,self.services["security_group"])
+ self.debug("Created security groups: %s" % custom_sec_grp.id)
+ self.cleanup_secGrps.append(custom_sec_grp)
+
+ # Authorize Security group to for allowing SSH to VM
+ ingress_rule_2 = custom_sec_grp.authorize(self.api_client,self.services["ingress_rule"])
+ ingress_rule_ids.append(ingress_rule_2["ingressrule"][0].ruleid)
+ self.debug("Authorized ingress rule for security group: %s" % custom_sec_grp.id)
+
+ self.debug(ingress_rule_ids)
+
+ # Create virtual machine without passing any security group id
+ vm = VirtualMachine.create(self.api_client,self.services["virtual_machine"],
+ templateid=self.template.id,
+ serviceofferingid=self.service_offering.id)
+ self.debug("Created VM : %s" % vm.id)
+ self.cleanup_vms.append(vm)
+
+ self.debug("listing virtual machines")
+
+ vm_list = list_virtual_machines(self.api_client, id=vm.id)
+
+ self.assertEqual(validateList(vm_list)[0], PASS, "vm list validation failed, list is %s" % vm_list)
+
+ self.assertEqual(len(vm_list[0].securitygroup), 1, "There should be exactly one security group associated \
+ with the vm, vm security group list is : %s" % vm_list[0].securitygroup)
+ self.assertEqual(defaultSecurityGroup.id, vm_list[0].securitygroup[0].id, "Vm should be deployed in default security group %s \
+ instead it is deployed in security group %s" % (defaultSecurityGroup.id, vm_list[0].securitygroup[0].id))
+
+ # Should be able to SSH VM
+ try:
+ self.debug("SSH into VM: %s" % vm.nic[0].ipaddress)
+ ssh = vm.get_ssh_client(ipaddress=vm.nic[0].ipaddress)
+
+ # Ping to outsite world
+ res = ssh.execute("ping -c 1 www.google.com")
+ except Exception as e:
+ self.fail("SSH Access failed for %s: %s" % \
+ (vm.ssh_ip, e)
+ )
+ finally:
+ cmd = revokeSecurityGroupIngress.revokeSecurityGroupIngressCmd()
+ self.debug("ingress_rules: %s" % ingress_rule_ids)
+ for rule_id in ingress_rule_ids:
+ cmd.id = rule_id
+ self.api_client.revokeSecurityGroupIngress(cmd)
+ result = str(res)
+ self.assertEqual(
+ result.count("1 received"),
+ 1,
+ "Ping to outside world from VM should be successful"
+ )
+ return
+
+ @attr(tags = ["advancedsg"])
+ def test_22_DeployVM_With_Custom_SG(self):
+ """ Test deploy VM by passing custom security group id"""
+
+ # Steps,
+ # 1. List security groups and authorize ingress rule for the default security group
+ # 2. Create custom security group and authorize ingress rule for this security group
+ # 3. Deploy VM by passing only custom security group id
+ # 4. SSH to VM and try pinging outside world from VM
+
+ # Validations,
+ # 1. VM deployment should be successful
+ # 2. VM should be deployed in custom security group and not in any other security group
+ # 3. VM should be reached through SSH using any IP and ping to outside world should be successful
+
+ ingress_rule_ids = []
+
+ self.debug("Listing security groups")
+ securitygroups = SecurityGroup.list(self.api_client)
+
+ self.assertEqual(validateList(securitygroups)[0], PASS, "securitygroups list validation\
+ failed, securitygroups list is %s" % securitygroups)
+
+ defaultSecurityGroup = securitygroups[0]
+
+ self.debug("Default security group: %s" % defaultSecurityGroup.id)
+
+ # Authorize ingress rule for the default security group
+ cmd = authorizeSecurityGroupIngress.authorizeSecurityGroupIngressCmd()
+ cmd.securitygroupid = defaultSecurityGroup.id
+ cmd.protocol = 'TCP'
+ cmd.startport = 22
+ cmd.endport = 80
+ cmd.cidrlist = '0.0.0.0/0'
+ ingress_rule_1 = self.api_client.authorizeSecurityGroupIngress(cmd)
+ ingress_rule_ids.append(ingress_rule_1.ingressrule[0].ruleid)
+
+ # Create custom security group
+ self.services["security_group"]["name"] = "Custom_sec_grp_" + random_gen()
+ custom_sec_grp = SecurityGroup.create(self.api_client,self.services["security_group"])
+ self.debug("Created security groups: %s" % custom_sec_grp.id)
+ self.cleanup_secGrps.append(custom_sec_grp)
+
+ # Authorize Security group to for allowing SSH to VM
+ ingress_rule_2 = custom_sec_grp.authorize(self.api_client,self.services["ingress_rule"])
+ ingress_rule_ids.append(ingress_rule_2["ingressrule"][0].ruleid)
+ self.debug("Authorized ingress rule for security group: %s" % custom_sec_grp.id)
+
+ # Create virtual machine without passing any security group id
+ vm = VirtualMachine.create(self.api_client,self.services["virtual_machine"],
+ templateid=self.template.id,
+ serviceofferingid=self.service_offering.id,
+ securitygroupids = [custom_sec_grp.id,])
+ self.debug("Created VM : %s" % vm.id)
+ self.cleanup_vms.append(vm)
+
+ self.debug("listing virtual machines")
+
+ vm_list = list_virtual_machines(self.api_client, id=vm.id)
+
+ self.assertEqual(validateList(vm_list)[0], PASS, "vm list validation failed, list is %s" % vm_list)
+
+ self.assertEqual(len(vm_list[0].securitygroup), 1, "There should be exactly one security group associated \
+ with the vm, vm security group list is : %s" % vm_list[0].securitygroup)
+ self.assertEqual(custom_sec_grp.id, vm_list[0].securitygroup[0].id, "Vm should be deployed in custom security group %s \
+ instead it is deployed in security group %s" % (custom_sec_grp.id, vm_list[0].securitygroup[0].id))
+
+ # Should be able to SSH VM
+ try:
+ self.debug("SSH into VM: %s" % vm.nic[0].ipaddress)
+ ssh = vm.get_ssh_client(ipaddress=vm.nic[0].ipaddress)
+
+ # Ping to outsite world
+ res = ssh.execute("ping -c 1 www.google.com")
+ except Exception as e:
+ self.fail("SSH Access failed for %s: %s" % \
+ (vm.ssh_ip, e)
+ )
+ finally:
+ cmd = revokeSecurityGroupIngress.revokeSecurityGroupIngressCmd()
+ for rule_id in ingress_rule_ids:
+ cmd.id = rule_id
+ self.api_client.revokeSecurityGroupIngress(cmd)
+
+ result = str(res)
+ self.assertEqual(
+ result.count("1 received"),
+ 1,
+ "Ping to outside world from VM should be successful"
+ )
+
+ return
+
+ @attr(tags = ["advancedsg"])
+ def test_23_DeployVM_MultipleSG(self):
+ """ Test deploy VM in multiple security groups"""
+
+ # Steps,
+ # 1. Create multiple security groups
+ # 2. Authorize ingress rule for all these security groups
+ # 3. Deploy VM with all the security groups
+ # 4. SSH to VM
+
+ # Validations,
+ # 1. VM deployment should be successful
+ # 2. VM should list all the security groups
+ # 3. VM should be reached through SSH using any IP in the mentioned CIDRs of the ingress rules
+
+ ingress_rule_ids = []
+
+ self.services["security_group"]["name"] = "Custom_sec_grp_" + random_gen()
+ sec_grp_1 = SecurityGroup.create(self.api_client,self.services["security_group"])
+ self.debug("Created security groups: %s" % sec_grp_1.id)
+ self.cleanup_secGrps.append(sec_grp_1)
+
+ self.services["security_group"]["name"] = "Custom_sec_grp_" + random_gen()
+ sec_grp_2 = SecurityGroup.create(self.api_client,self.services["security_group"])
+ self.debug("Created security groups: %s" % sec_grp_2.id)
+ self.cleanup_secGrps.append(sec_grp_2)
+
+ self.services["security_group"]["name"] = "Custom_sec_grp_" + random_gen()
+ sec_grp_3 = SecurityGroup.create(self.api_client,self.services["security_group"])
+ self.debug("Created security groups: %s" % sec_grp_3.id)
+ self.cleanup_secGrps.append(sec_grp_3)
+
+ # Authorize Security group to for allowing SSH to VM
+ ingress_rule_1 = sec_grp_1.authorize(self.api_client,self.services["ingress_rule"])
+ ingress_rule_ids.append(ingress_rule_1["ingressrule"][0].ruleid)
+ self.debug("Authorized ingress rule for security group: %s" % sec_grp_1.id)
+
+ ingress_rule_2 = sec_grp_2.authorize(self.api_client,self.services["ingress_rule"])
+ ingress_rule_ids.append(ingress_rule_2["ingressrule"][0].ruleid)
+ self.debug("Authorized ingress rule for security group: %s" % sec_grp_2.id)
+
+ ingress_rule_3 = sec_grp_3.authorize(self.api_client,self.services["ingress_rule"])
+ ingress_rule_ids.append(ingress_rule_3["ingressrule"][0].ruleid)
+ self.debug("Authorized ingress rule for security group: %s" % sec_grp_3.id)
+
+ vm = VirtualMachine.create(self.api_client,self.services["virtual_machine"],
+ templateid=self.template.id,
+ securitygroupids=[sec_grp_1.id, sec_grp_2.id, sec_grp_3.id,],
+ serviceofferingid=self.service_offering.id)
+ self.debug("Created VM : %s" % vm.id)
+ self.cleanup_vms.append(vm)
+
+ self.debug("listing virtual machines")
+
+ vm_list = list_virtual_machines(self.api_client, id=vm.id)
+
+ self.assertEqual(validateList(vm_list)[0], PASS, "vm list validation failed, list is %s" % vm_list)
+
+ sec_grp_list = [sec_grp.id for sec_grp in vm_list[0].securitygroup]
+
+ self.assertTrue(sec_grp_1.id in sec_grp_list, "%s not present in security groups of vm: %s" %
+ (sec_grp_1.id, sec_grp_list))
+ self.assertTrue(sec_grp_2.id in sec_grp_list, "%s not present in security groups of vm: %s" %
+ (sec_grp_2.id, sec_grp_list))
+ self.assertTrue(sec_grp_3.id in sec_grp_list, "%s not present in security groups of vm: %s" %
+ (sec_grp_3.id, sec_grp_list))
+
+ # Should be able to SSH VM
+ try:
+ self.debug("SSH into VM: %s" % vm.ssh_ip)
+ ssh = vm.get_ssh_client(ipaddress=vm.nic[0].ipaddress)
+
+ # Ping to outsite world
+ res = ssh.execute("ping -c 1 www.google.com")
+ except Exception as e:
+ self.fail("SSH Access failed for %s: %s" % \
+ (vm.ssh_ip, e)
+ )
+ finally:
+ cmd = revokeSecurityGroupIngress.revokeSecurityGroupIngressCmd()
+ for rule_id in ingress_rule_ids:
+ cmd.id = rule_id
+ self.api_client.revokeSecurityGroupIngress(cmd)
+
+ result = str(res)
+ self.assertEqual(
+ result.count("1 received"),
+ 1,
+ "Ping to outside world from VM should be successful"
+ )
+ return
+
+ @attr(tags = ["advancedsg"])
+ def test_32_delete_default_security_group(self):
+ """ Test Delete the default security group when No VMs are deployed"""
+
+ # Steps,
+ # 1. create an account
+ # 2. List the security groups belonging to the account
+ # 3. Delete the default security group
+ # Validations,
+ # 1. Default security group of the account should not get deleted
+
+ #Create admin account
+ account = Account.create(self.api_client, self.services["account"], admin=True,
+ domainid=self.domain.id)
+
+ self.cleanup_accounts.append(account)
+
+ self.debug("Admin type account created: %s" % account.name)
+
+ securitygroups = SecurityGroup.list(self.api_client, account=account.name, domainid=self.domain.id)
+ self.assertEqual(validateList(securitygroups)[0], PASS, "security groups list validation failed, list is %s" % securitygroups)
+
+ defaultSecGroup = securitygroups[0]
+ cmd = deleteSecurityGroup.deleteSecurityGroupCmd()
+ cmd.id = defaultSecGroup.id
+
+ try:
+ self.debug("Deleting default security group %s in account %s" % (defaultSecGroup.id, account.id))
+ self.api_client.deleteSecurityGroup(cmd)
+ self.fail("Default security group of the account got deleted")
+ except Exception as e:
+ self.debug("Deleting the default security group failed as expected with exception: %s" % e)
+ return
+
+@ddt
+class TestSharedNetworkOperations(cloudstackTestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ cloudstackTestClient = super(TestSharedNetworkOperations,cls).getClsTestClient()
+ cls.api_client = cloudstackTestClient.getApiClient()
+
+ cls.services = cloudstackTestClient.getConfigParser().parsedDict
+
+ # Get Zone, Domain and templates
+ cls.domain = get_domain(cls.api_client, cls.services)
+ cls.zone = get_zone(cls.api_client, cls.services)
+ cls.template = get_template(cls.api_client,cls.zone.id,cls.services["ostype"])
+
+ cls.services["virtual_machine"]["zoneid"] = cls.zone.id
+ cls.services["virtual_machine"]["template"] = cls.template.id
+
+ cls.service_offering = ServiceOffering.create(cls.api_client,cls.services["service_offering"])
+
+ cls.services["shared_network_offering_sg"]["specifyVlan"] = "True"
+ cls.services["shared_network_offering_sg"]["specifyIpRanges"] = "True"
+ #Create Network Offering
+ cls.shared_network_offering_sg = NetworkOffering.create(cls.api_client,cls.services["shared_network_offering_sg"],conservemode=False)
+
+ #Update network offering state from disabled to enabled.
+ NetworkOffering.update(cls.shared_network_offering_sg,cls.api_client,state="enabled")
+
+ cls._cleanup = [
+ cls.service_offering,
+ cls.shared_network_offering_sg
+ ]
+ return
+
+ @classmethod
+ def tearDownClass(cls):
+ try:
+ #Update network offering state from enabled to disabled.
+ cls.shared_network_offering_sg.update(cls.api_client,state="disabled")
+ #Cleanup resources used
+ cleanup_resources(cls.api_client, cls._cleanup)
+ except Exception as e:
+ raise Exception("Warning: Exception during cleanup : %s" % e)
+ return
+
+ def setUp(self):
+ self.api_client = self.testClient.getApiClient()
+ self.dbclient = self.testClient.getDbConnection()
+ self.cleanup = []
+ self.cleanup_networks = []
+ self.cleanup_accounts = []
+ self.cleanup_domains = []
+ self.cleanup_projects = []
+ self.cleanup_vms = []
+ self.cleanup_secGrps = []
+ return
+
+ def tearDown(self):
+ exceptions = []
+ try:
+ #Clean up, terminate the created network offerings
+ cleanup_resources(self.api_client, self.cleanup)
+ except Exception as e:
+ self.debug("Warning: Exception during cleanup : %s" % e)
+ exceptions.append(e)
+
+ #below components is not a part of cleanup because to mandate the order and to cleanup network
+ try:
+ for vm in self.cleanup_vms:
+ vm.delete(self.api_client)
+ except Exception as e:
+ self.debug("Warning: Exception during virtual machines cleanup : %s" % e)
+ exceptions.append(e)
+
+ # Wait for VMs to expunge
+ wait_for_cleanup(self.api_client, ["expunge.delay", "expunge.interval"])
+
+ try:
+ for sec_grp in self.cleanup_secGrps:
+ sec_grp.delete(self.api_client)
+ except Exception as e:
+ self.debug("Warning : Exception during security groups cleanup: %s" % e)
+ exceptions.append(e)
+
+ try:
+ for project in self.cleanup_projects:
+ project.delete(self.api_client)
+ except Exception as e:
+ self.debug("Warning: Exception during project cleanup : %s" % e)
+ exceptions.append(e)
+
+ try:
+ for account in self.cleanup_accounts:
+ account.delete(self.api_client)
+ except Exception as e:
+ self.debug("Warning: Exception during account cleanup : %s" % e)
+ exceptions.append(e)
+
+ #Wait till all resources created are cleaned up completely and then attempt to delete Network
+ time.sleep(self.services["sleep"])
+
+ try:
+ for network in self.cleanup_networks:
+ network.delete(self.api_client)
+ except Exception as e:
+ self.debug("Warning: Exception during network cleanup : %s" % e)
+ exceptions.append(e)
+
+ try:
+ for domain in self.cleanup_domains:
+ domain.delete(self.api_client)
+ except Exception as e:
+ self.debug("Warning: Exception during domain cleanup : %s" % e)
+ exceptions.append(e)
+
+ if len(exceptions) > 0:
+ self.fail("There were exceptions during cleanup: %s" % exceptions)
+
+ return
+
+ def setSharedNetworkParams(self, network, range=20):
+
+ # @range: range decides the endip. Pass the range as "x" if you want the difference between the startip
+ # and endip as "x"
+ # Set the subnet number of shared networks randomly prior to execution
+ # of each test case to avoid overlapping of ip addresses
+ shared_network_subnet_number = random.randrange(1,254)
+
+ self.services[network]["gateway"] = "172.16."+str(shared_network_subnet_number)+".1"
+ self.services[network]["startip"] = "172.16."+str(shared_network_subnet_number)+".2"
+ self.services[network]["endip"] = "172.16."+str(shared_network_subnet_number)+"."+str(range+1)
+ self.services[network]["netmask"] = "255.255.255.0"
+
+ return
+
+ @data("account","domain","zone")
+ @attr(tags = ["advancedsg"])
+ def test_34_restart_shared_network_sg(self, value):
+ """ Test restart account/domain/zone wide shared network"""
+
+ # Steps,
+ # 1. Create account/domain/zone wide shared Network
+ # 2. Restart shared network
+
+ # Validations,
+ # 1. Network restart should be successful
+
+ domain = self.domain
+ account = None
+ self.services["shared_network_sg"]["acltype"] = "domain"
+ if value == "domain":
+ domain = Domain.create(self.api_client, services=self.services["domain"],
+ parentdomainid=self.domain.id)
+ self.cleanup_domains.append(domain)
+ elif value == "account":
+ account = Account.create(self.api_client,self.services["account"],admin=True,
+ domainid=self.domain.id)
+ self.cleanup_accounts.append(account)
+ self.services["shared_network_sg"]["acltype"] = "account"
+
+ physical_network, vlan = get_free_vlan(self.api_client, self.zone.id)
+ #create network using the shared network offering created
+ self.services["shared_network_sg"]["vlan"] = vlan
+ self.services["shared_network_sg"]["networkofferingid"] = self.shared_network_offering_sg.id
+ self.services["shared_network_sg"]["physicalnetworkid"] = physical_network.id
+
+ self.setSharedNetworkParams("shared_network_sg")
+
+ shared_network_sg = Network.create(self.api_client, self.services["shared_network_sg"],
+ accountid=account.name if account else None,
+ domainid=domain.id,
+ networkofferingid=self.shared_network_offering_sg.id,
+ zoneid=self.zone.id)
+ if value == "domain" or value == "zone":
+ self.cleanup_networks.append(shared_network_sg)
+
+ self.debug("Created %s wide shared network %s" % (value,shared_network_sg.id))
+
+ try:
+ self.debug("Restarting shared network: %s" % shared_network_sg)
+ shared_network_sg.restart(self.api_client)
+ except Exception as e:
+ self.fail("Exception while restarting the shared network: %s" % e)
+ return
+
+ @unittest.skip("Testing pending on multihost setup")
+ @data("account","domain","zone")
+ @attr(tags = ["advancedsg"])
+ def test_35_Enable_Host_Maintenance(self, value):
+ """ Test security group rules of VM after putting host in maintenance mode"""
+
+ # Steps,
+ # 1. Deploy vm in account/domain/zone wide shared network
+ # 2. Put the host of VM in maintenance mode
+ # 3. Verify the security group associated with the VM
+ # 4. Cancel the maintenance mode of the host
+ # 5. Again verify the security group of the VM
+
+ # Validations,
+ # 1. Security group should remain the same throughout all the operations
+
+ #Create admin account
+
+ hosts = Host.list(self.api_client, zoneid=self.zone.id)
+ self.assertEqual(validateList(hosts)[0], PASS, "hosts list validation failed, list is %s" % hosts)
+ if len(hosts) < 2:
+ self.skipTest("This test requires at least two hosts present in the zone")
+ domain = self.domain
+ account = None
+ self.services["shared_network_sg"]["acltype"] = "domain"
+ if value == "domain":
+ domain = Domain.create(self.api_client, services=self.services["domain"],
+ parentdomainid=self.domain.id)
+ self.cleanup_domains.append(domain)
+ elif value == "account":
+ account = Account.create(self.api_client,self.services["account"],admin=True,
+ domainid=self.domain.id)
+ self.cleanup_accounts.append(account)
+ self.services["shared_network_sg"]["acltype"] = "account"
+
+ physical_network, vlan = get_free_vlan(self.api_client, self.zone.id)
+ #create network using the shared network offering created
+ self.services["shared_network_sg"]["vlan"] = vlan
+ self.services["shared_network_sg"]["networkofferingid"] = self.shared_network_offering_sg.id
+ self.services["shared_network_sg"]["physicalnetworkid"] = physical_network.id
+
+ self.setSharedNetworkParams("shared_network_sg")
+
+ shared_network_sg = Network.create(self.api_client, self.services["shared_network_sg"],
+ accountid=account.name if account else None,
+ domainid=domain.id,
+ networkofferingid=self.shared_network_offering_sg.id,
+ zoneid=self.zone.id)
+
+ if value == "domain" or value == "zone":
+ self.cleanup_networks.append(shared_network_sg)
+
+ self.debug("Created %s wide shared network %s" % (value,shared_network_sg.id))
+
+ virtual_machine = VirtualMachine.create(self.api_client,self.services["virtual_machine"],
+ accountid=account.name if account else None,
+ domainid=domain.id,
+ networkids=[shared_network_sg.id],
+ serviceofferingid=self.service_offering.id
+ )
+ self.cleanup_vms.append(virtual_machine)
+ hostid = virtual_machine.hostid
+
+ securitygroupid = virtual_machine.securitygroup[0].id
+
+ # Put host in maintenance mode
+ Host.enableMaintenance(self.api_client, id=hostid)
+
+ vm_list = list_virtual_machines(self.api_client, id=virtual_machine.id)
+ self.assertEqual(validateList(vm_list)[0], PASS, "vm list validation failed, vm list is: %s" % vm_list)
+ self.assertEqual(vm_list[0].securitygroup[0].id, securitygroupid, "Security group id should remain same, before\
+ it was %s and after putting host in maintenance mode, it is %s" % (securitygroupid, vm_list[0].securitygroup[0].id))
+
+ # Cancel host maintenance
+ Host.cancelMaintenance(self.api_client, id=hostid)
+ vm_list = list_virtual_machines(self.api_client, id=virtual_machine.id)
+ self.assertEqual(validateList(vm_list)[0], PASS, "vm list validation failed, vm list is: %s" % vm_list)
+ self.assertEqual(vm_list[0].securitygroup[0].id, securitygroupid, "Security group id should remain same, before\
+ it was %s and after putting host in maintenance mode, it is %s" % (securitygroupid, vm_list[0].securitygroup[0].id))
+
+ return
+
+@ddt
+class TestAccountBasedIngressRules(cloudstackTestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ cloudstackTestClient = super(TestAccountBasedIngressRules,cls).getClsTestClient()
+ cls.api_client = cloudstackTestClient.getApiClient()
+
+ cls.services = cloudstackTestClient.getConfigParser().parsedDict
+
+ # Get Zone, Domain and templates
+ cls.domain = get_domain(cls.api_client, cls.services)
+ cls.zone = get_zone(cls.api_client, cls.services)
+ cls.template = get_template(cls.api_client,cls.zone.id,cls.services["ostype"])
+
+ cls.services["virtual_machine"]["zoneid"] = cls.zone.id
+ cls.services["virtual_machine"]["template"] = cls.template.id
+
+ cls.service_offering = ServiceOffering.create(cls.api_client,cls.services["service_offering"])
+
+ cls.services["shared_network_offering_sg"]["specifyVlan"] = "True"
+ cls.services["shared_network_offering_sg"]["specifyIpRanges"] = "True"
+ #Create Network Offering
+ cls.shared_network_offering_sg = NetworkOffering.create(cls.api_client,cls.services["shared_network_offering_sg"],conservemode=False)
+
+ #Update network offering state from disabled to enabled.
+ NetworkOffering.update(cls.shared_network_offering_sg,cls.api_client,state="enabled")
+
+ cls._cleanup = [
+ cls.service_offering,
+ cls.shared_network_offering_sg
+ ]
+ return
+
+ @classmethod
+ def tearDownClass(cls):
+ try:
+ #Update network offering state from enabled to disabled.
+ cls.shared_network_offering_sg.update(cls.api_client,state="disabled")
+ #Cleanup resources used
+ cleanup_resources(cls.api_client, cls._cleanup)
+ except Exception as e:
+ raise Exception("Warning: Exception during cleanup : %s" % e)
+ return
+
+ def setUp(self):
+ self.api_client = self.testClient.getApiClient()
+ self.dbclient = self.testClient.getDbConnection()
+ self.cleanup = []
+ self.cleanup_networks = []
+ self.cleanup_accounts = []
+ self.cleanup_domains = []
+ self.cleanup_projects = []
+ self.cleanup_vms = []
+ self.cleanup_secGrps = []
+
+ #Create account 1
+ self.account_1 = Account.create(self.api_client, self.services["account"],domainid=self.domain.id)
+ self.cleanup_accounts.append(self.account_1)
+ self.debug("Created account 1: %s" % self.account_1.name)
+
+ #Create account 2
+ self.account_2 = Account.create(self.api_client, self.services["account"],domainid=self.domain.id)
+ self.cleanup_accounts.append(self.account_2)
+ self.debug("Created account 2: %s" % self.account_2.name)
+
+ self.debug("Deploying virtual machine in account 1: %s" % self.account_1.name)
+ self.virtual_machine_1 = VirtualMachine.create(self.api_client,self.services["virtual_machine"],accountid=self.account_1.name,
+ domainid=self.account_1.domainid,serviceofferingid=self.service_offering.id)
+ self.cleanup_vms.append(self.virtual_machine_1)
+ self.debug("Deployed vm: %s" % self.virtual_machine_1.id)
+
+ self.debug("Deploying virtual machine in account 2: %s" % self.account_2.name)
+ self.virtual_machine_2 = VirtualMachine.create(self.api_client,self.services["virtual_machine"],accountid=self.account_2.name,
+ domainid=self.account_2.domainid,serviceofferingid=self.service_offering.id)
+ self.cleanup_vms.append(self.virtual_machine_2)
+ self.debug("Deployed vm: %s" % self.virtual_machine_2.id)
+
+ # Getting default security group of account 1
+ securitygroups_account_1 = SecurityGroup.list(self.api_client, account=self.account_1.name, domainid=self.account_1.domainid)
+
+ self.assertEqual(validateList(securitygroups_account_1)[0], PASS, "securitygroups list validation\
+ failed, securitygroups list is %s" % securitygroups_account_1)
+ self.sec_grp_1 = securitygroups_account_1[0]
+
+ # Getting default security group of account 2
+ securitygroups_account_2 = SecurityGroup.list(self.api_client, account=self.account_2.name, domainid=self.account_2.domainid)
+
+ self.assertEqual(validateList(securitygroups_account_2)[0], PASS, "securitygroups list validation\
+ failed, securitygroups list is %s" % securitygroups_account_2)
+ self.sec_grp_2 = securitygroups_account_2[0]
+ return
+
+ def tearDown(self):
+ exceptions = []
+ try:
+ #Clean up, terminate the created network offerings
+ cleanup_resources(self.api_client, self.cleanup)
+ except Exception as e:
+ self.debug("Warning: Exception during cleanup : %s" % e)
+ exceptions.append(e)
+
+ #below components is not a part of cleanup because to mandate the order and to cleanup network
+ try:
+ for vm in self.cleanup_vms:
+ vm.delete(self.api_client)
+ except Exception as e:
+ self.debug("Warning: Exception during virtual machines cleanup : %s" % e)
+ exceptions.append(e)
+
+ # Wait for VMs to expunge
+ wait_for_cleanup(self.api_client, ["expunge.delay", "expunge.interval"])
+
+ try:
+ for sec_grp in self.cleanup_secGrps:
+ sec_grp.delete(self.api_client)
+ except Exception as e:
+ self.debug("Warning : Exception during security groups cleanup: %s" % e)
+ exceptions.append(e)
+
+ try:
+ for p
<TRUNCATED>