You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@libcloud.apache.org by cl...@apache.org on 2021/05/07 01:53:27 UTC

[libcloud] 01/02: Enable tests to work with existing containers

This is an automated email from the ASF dual-hosted git repository.

clewolff pushed a commit to branch storage-s3-integration-tests
in repository https://gitbox.apache.org/repos/asf/libcloud.git

commit 892d97952fc76e789d18d678d0c4ea01af1b89b6
Author: Clemens Wolff <cl...@apache.org>
AuthorDate: Thu May 6 21:51:28 2021 -0400

    Enable tests to work with existing containers
---
 integration/storage/base.py | 41 +++++++++++++++++++++++------------------
 1 file changed, 23 insertions(+), 18 deletions(-)

diff --git a/integration/storage/base.py b/integration/storage/base.py
index 56786cd..902ccd6 100644
--- a/integration/storage/base.py
+++ b/integration/storage/base.py
@@ -45,6 +45,9 @@ class Integration:
         account = None
         secret = None
 
+        container_name_prefix = 'lcsit'
+        container_name_max_length = 63
+
         def setUp(self):
             for required in 'provider', 'account', 'secret':
                 value = getattr(self, required, None)
@@ -63,6 +66,9 @@ class Integration:
 
         def tearDown(self):
             for container in self.driver.list_containers():
+                if not container.name.startswith(self.container_name_prefix):
+                    continue
+
                 for obj in container.list_objects():
                     try:
                         obj.delete()
@@ -86,7 +92,7 @@ class Integration:
 
         def test_containers(self):
             # make a new container
-            container_name = random_container_name()
+            container_name = self._random_container_name()
             container = self.driver.create_container(container_name)
             self.assertEqual(container.name, container_name)
             container = self.driver.get_container(container_name)
@@ -98,7 +104,7 @@ class Integration:
 
             # check that the new container can be listed
             containers = self.driver.list_containers()
-            self.assertEqual([c.name for c in containers], [container_name])
+            self.assertIn(container_name, [c.name for c in containers])
 
             # delete the container
             self.driver.delete_container(container)
@@ -109,12 +115,12 @@ class Integration:
 
             # check that the container is deleted
             containers = self.driver.list_containers()
-            self.assertEqual([c.name for c in containers], [])
+            self.assertNotIn(container_name, [c.name for c in containers])
 
         def _test_objects(self, do_upload, do_download, size=1 * MB):
             content = os.urandom(size)
             blob_name = 'testblob'
-            container = self.driver.create_container(random_container_name())
+            container = self.driver.create_container(self._random_container_name())
 
             # upload a file
             obj = do_upload(container, blob_name, content)
@@ -167,7 +173,7 @@ class Integration:
         def test_objects_range_downloads(self):
             blob_name = 'testblob-range'
             content = b'0123456789'
-            container = self.driver.create_container(random_container_name())
+            container = self.driver.create_container(self._random_container_name())
 
             obj = self.driver.upload_object(
                 self._create_tempfile(content=content),
@@ -255,7 +261,7 @@ class Integration:
         def test_upload_via_stream_with_content_encoding(self):
             object_name = 'content_encoding.gz'
             content = gzip.compress(os.urandom(MB // 100))
-            container = self.driver.create_container(random_container_name())
+            container = self.driver.create_container(self._random_container_name())
             self.driver.upload_object_via_stream(
                 iter(content),
                 container,
@@ -269,7 +275,7 @@ class Integration:
 
         def test_cdn_url(self):
             content = os.urandom(MB // 100)
-            container = self.driver.create_container(random_container_name())
+            container = self.driver.create_container(self._random_container_name())
             obj = self.driver.upload_object_via_stream(iter(content), container, 'cdn')
 
             response = requests.get(self.driver.get_object_cdn_url(obj))
@@ -284,6 +290,16 @@ class Integration:
             self.addCleanup(os.remove, path)
             return path
 
+        @classmethod
+        def _random_container_name(cls):
+            suffix = random_string(cls.container_name_max_length)
+            name = cls.container_name_prefix + suffix
+            name = re.sub('[^a-z0-9-]', '-', name)
+            name = re.sub('-+', '-', name)
+            name = name[:cls.container_name_max_length]
+            name = name.lower()
+            return name
+
     class ContainerTestBase(TestBase):
         image = None
         version = 'latest'
@@ -373,17 +389,6 @@ def random_string(length, alphabet=string.ascii_lowercase + string.digits):
     return ''.join(random.choice(alphabet) for _ in range(length))
 
 
-def random_container_name(prefix='test'):
-    max_length = 63
-    suffix = random_string(max_length)
-    name = prefix + suffix
-    name = re.sub('[^a-z0-9-]', '-', name)
-    name = re.sub('-+', '-', name)
-    name = name[:max_length]
-    name = name.lower()
-    return name
-
-
 def read_stream(stream):
     buffer = io.BytesIO()
     buffer.writelines(stream)