You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/02/25 14:57:25 UTC

[GitHub] [iceberg] rymurr commented on a change in pull request #2266: Implement python drop_table

rymurr commented on a change in pull request #2266:
URL: https://github.com/apache/iceberg/pull/2266#discussion_r582884677



##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
     def new_table_ops(self, conf, database, table):
         return HiveTableOperations(conf, self.get_client(), database, table)
 
-    def drop(self, database, table):
-        raise RuntimeError("Not yet implemented")
+    def drop(self, database, table, purge=False):
+        ops = self.new_table_ops(self.conf, database, table)
+        metadata = ops.current()
+
+        with self.get_client() as open_client:
+            _logger.info("Deleting {database}.{table} from Hive Metastore".format(database=database, table=table))
+            open_client.drop_table(database, table, deleteData=False)
+
+        manifest_lists_to_delete = []
+        manifests_to_delete = itertools.chain()
+
+        if purge:
+            if metadata is not None:
+                for s in metadata.snapshots:
+                    manifests_to_delete = itertools.chain(manifests_to_delete, s.manifests)
+                    if s.manifest_location is not None:
+                        manifest_lists_to_delete.append(s.manifest_location())

Review comment:
       `s.manifest_location` is a property, so `s.manifest_location()` should throw an error when called like that, shouldn't it?

##########
File path: python/tox.ini
##########
@@ -88,7 +88,7 @@ commands =
 #     python -m http.server {posargs}
 
 [flake8]
-ignore = E501,W503
+ignore = E501,W503,C901

Review comment:
       this can be removed once #2268 is merged

##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
     def new_table_ops(self, conf, database, table):
         return HiveTableOperations(conf, self.get_client(), database, table)
 
-    def drop(self, database, table):
-        raise RuntimeError("Not yet implemented")
+    def drop(self, database, table, purge=False):

Review comment:
       can you add mypy annotations to any new code please?

##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
     def new_table_ops(self, conf, database, table):
         return HiveTableOperations(conf, self.get_client(), database, table)
 
-    def drop(self, database, table):
-        raise RuntimeError("Not yet implemented")
+    def drop(self, database, table, purge=False):
+        ops = self.new_table_ops(self.conf, database, table)
+        metadata = ops.current()
+
+        with self.get_client() as open_client:
+            _logger.info("Deleting {database}.{table} from Hive Metastore".format(database=database, table=table))
+            open_client.drop_table(database, table, deleteData=False)
+
+        manifest_lists_to_delete = []
+        manifests_to_delete = itertools.chain()
+
+        if purge:
+            if metadata is not None:
+                for s in metadata.snapshots:
+                    manifests_to_delete = itertools.chain(manifests_to_delete, s.manifests)
+                    if s.manifest_location is not None:
+                        manifest_lists_to_delete.append(s.manifest_location())
+
+        # Make a copy, as it is drained as we explore the manifest to list files.
+        (manifests, manifests_to_delete) = itertools.tee(manifests_to_delete)
+
+        with Pool(self.conf.get(WORKER_THREAD_POOL_SIZE_PROP,

Review comment:
       I think I would prefer this to explicitly use the threading or asyncio APIs. I know this is using the threading version of multiprocessing, but I don't know why.
   
   @rdblue @TGooch44 is there a historical reason to use the `dummy` multiprocessing API for IO-bound processes? I would think that using threading directly would be clearer and asyncio would be 'cooler'.

##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
     def new_table_ops(self, conf, database, table):
         return HiveTableOperations(conf, self.get_client(), database, table)
 
-    def drop(self, database, table):
-        raise RuntimeError("Not yet implemented")
+    def drop(self, database, table, purge=False):
+        ops = self.new_table_ops(self.conf, database, table)
+        metadata = ops.current()
+
+        with self.get_client() as open_client:
+            _logger.info("Deleting {database}.{table} from Hive Metastore".format(database=database, table=table))
+            open_client.drop_table(database, table, deleteData=False)
+
+        manifest_lists_to_delete = []
+        manifests_to_delete = itertools.chain()
+
+        if purge:
+            if metadata is not None:
+                for s in metadata.snapshots:
+                    manifests_to_delete = itertools.chain(manifests_to_delete, s.manifests)
+                    if s.manifest_location is not None:
+                        manifest_lists_to_delete.append(s.manifest_location())
+
+        # Make a copy, as it is drained as we explore the manifest to list files.
+        (manifests, manifests_to_delete) = itertools.tee(manifests_to_delete)
+
+        with Pool(self.conf.get(WORKER_THREAD_POOL_SIZE_PROP,
+                                cpu_count())) as delete_pool:
+            delete_pool.map(self.__delete_file, self.__get_files(manifests))
+            delete_pool.map(self.__delete_file, (m.manifest_path for m in manifests_to_delete))
+            delete_pool.map(self.__delete_file, manifest_lists_to_delete)
+            delete_pool.map(self.__delete_file, [ops.current_metadata_location])
+
+    def __get_files(self, manifests):
+        return itertools.chain.from_iterable((self.__get_data_files_by_manifest(m) for m in manifests))
+
+    def __get_data_files_by_manifest(self, manifest):
+        file = FileSystemInputFile.from_location(manifest.manifest_path, self.conf)
+        reader = ManifestReader.read(file)
+        return (i.path() for i in reader.iterator())
+
+    def __delete_file(self, path):

Review comment:
       This is also available through the table ops.

##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
     def new_table_ops(self, conf, database, table):
         return HiveTableOperations(conf, self.get_client(), database, table)
 
-    def drop(self, database, table):
-        raise RuntimeError("Not yet implemented")
+    def drop(self, database, table, purge=False):
+        ops = self.new_table_ops(self.conf, database, table)
+        metadata = ops.current()
+
+        with self.get_client() as open_client:
+            _logger.info("Deleting {database}.{table} from Hive Metastore".format(database=database, table=table))
+            open_client.drop_table(database, table, deleteData=False)
+
+        manifest_lists_to_delete = []
+        manifests_to_delete = itertools.chain()

Review comment:
       Broadly speaking, I prefer the use of iterators/itertools where possible. Currently, I think the underlying data structures are all `list` structures and aren't lazy, so the use of iterators to collect files for deletion is a bit distracting.

##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
     def new_table_ops(self, conf, database, table):
         return HiveTableOperations(conf, self.get_client(), database, table)
 
-    def drop(self, database, table):
-        raise RuntimeError("Not yet implemented")
+    def drop(self, database, table, purge=False):
+        ops = self.new_table_ops(self.conf, database, table)
+        metadata = ops.current()
+
+        with self.get_client() as open_client:
+            _logger.info("Deleting {database}.{table} from Hive Metastore".format(database=database, table=table))
+            open_client.drop_table(database, table, deleteData=False)
+
+        manifest_lists_to_delete = []
+        manifests_to_delete = itertools.chain()
+
+        if purge:
+            if metadata is not None:
+                for s in metadata.snapshots:
+                    manifests_to_delete = itertools.chain(manifests_to_delete, s.manifests)
+                    if s.manifest_location is not None:
+                        manifest_lists_to_delete.append(s.manifest_location())
+
+        # Make a copy, as it is drained as we explore the manifest to list files.
+        (manifests, manifests_to_delete) = itertools.tee(manifests_to_delete)
+
+        with Pool(self.conf.get(WORKER_THREAD_POOL_SIZE_PROP,
+                                cpu_count())) as delete_pool:
+            delete_pool.map(self.__delete_file, self.__get_files(manifests))

Review comment:
       I believe in other places we denote private methods with one `_`

##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
     def new_table_ops(self, conf, database, table):
         return HiveTableOperations(conf, self.get_client(), database, table)
 
-    def drop(self, database, table):
-        raise RuntimeError("Not yet implemented")
+    def drop(self, database, table, purge=False):
+        ops = self.new_table_ops(self.conf, database, table)
+        metadata = ops.current()
+
+        with self.get_client() as open_client:
+            _logger.info("Deleting {database}.{table} from Hive Metastore".format(database=database, table=table))
+            open_client.drop_table(database, table, deleteData=False)
+
+        manifest_lists_to_delete = []
+        manifests_to_delete = itertools.chain()
+
+        if purge:
+            if metadata is not None:
+                for s in metadata.snapshots:
+                    manifests_to_delete = itertools.chain(manifests_to_delete, s.manifests)
+                    if s.manifest_location is not None:
+                        manifest_lists_to_delete.append(s.manifest_location())
+
+        # Make a copy, as it is drained as we explore the manifest to list files.
+        (manifests, manifests_to_delete) = itertools.tee(manifests_to_delete)
+
+        with Pool(self.conf.get(WORKER_THREAD_POOL_SIZE_PROP,
+                                cpu_count())) as delete_pool:
+            delete_pool.map(self.__delete_file, self.__get_files(manifests))
+            delete_pool.map(self.__delete_file, (m.manifest_path for m in manifests_to_delete))

Review comment:
       manifests can be in multiple snapshots and `__delete_file` will throw an exception if a file doesn't exist

##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
     def new_table_ops(self, conf, database, table):
         return HiveTableOperations(conf, self.get_client(), database, table)
 
-    def drop(self, database, table):
-        raise RuntimeError("Not yet implemented")
+    def drop(self, database, table, purge=False):
+        ops = self.new_table_ops(self.conf, database, table)
+        metadata = ops.current()
+
+        with self.get_client() as open_client:
+            _logger.info("Deleting {database}.{table} from Hive Metastore".format(database=database, table=table))
+            open_client.drop_table(database, table, deleteData=False)
+
+        manifest_lists_to_delete = []
+        manifests_to_delete = itertools.chain()
+
+        if purge:
+            if metadata is not None:
+                for s in metadata.snapshots:
+                    manifests_to_delete = itertools.chain(manifests_to_delete, s.manifests)
+                    if s.manifest_location is not None:
+                        manifest_lists_to_delete.append(s.manifest_location())
+
+        # Make a copy, as it is drained as we explore the manifest to list files.
+        (manifests, manifests_to_delete) = itertools.tee(manifests_to_delete)
+
+        with Pool(self.conf.get(WORKER_THREAD_POOL_SIZE_PROP,
+                                cpu_count())) as delete_pool:
+            delete_pool.map(self.__delete_file, self.__get_files(manifests))
+            delete_pool.map(self.__delete_file, (m.manifest_path for m in manifests_to_delete))
+            delete_pool.map(self.__delete_file, manifest_lists_to_delete)
+            delete_pool.map(self.__delete_file, [ops.current_metadata_location])
+
+    def __get_files(self, manifests):
+        return itertools.chain.from_iterable((self.__get_data_files_by_manifest(m) for m in manifests))
+
+    def __get_data_files_by_manifest(self, manifest):
+        file = FileSystemInputFile.from_location(manifest.manifest_path, self.conf)

Review comment:
       I believe we can get the reader directly from `BaseSnapshot.get_filtered_manifest()`.

##########
File path: python/tests/hive/test_hive_tables.py
##########
@@ -157,3 +157,21 @@ def test_create_tables(client, current_call, base_scan_schema, base_scan_partiti
     current_call.return_value = tbl[0].args[0].parameters['metadata_location']
 
     tables.load("test.test_123")
+
+
+@mock.patch("iceberg.hive.HiveTableOperations.refresh_from_metadata_location")
+@mock.patch("iceberg.hive.HiveTables.delete_file")
+@mock.patch("iceberg.hive.HiveTableOperations.current")
+@mock.patch("iceberg.hive.HiveTables.get_client")
+def test_drop_tables(client, metadata, delete_file, refresh_call, tmpdir):

Review comment:
       I think it would be good to test purge as well tbh, most of the code complexity in this change lives there.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org