You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/03/02 21:14:47 UTC

[GitHub] [iceberg] szehon-ho commented on a change in pull request #2266: Implement python drop_table

szehon-ho commented on a change in pull request #2266:
URL: https://github.com/apache/iceberg/pull/2266#discussion_r585913438



##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
     def new_table_ops(self, conf, database, table):
         return HiveTableOperations(conf, self.get_client(), database, table)
 
-    def drop(self, database, table):
-        raise RuntimeError("Not yet implemented")
+    def drop(self, database, table, purge=False):
+        ops = self.new_table_ops(self.conf, database, table)
+        metadata = ops.current()
+
+        with self.get_client() as open_client:
+            _logger.info("Deleting {database}.{table} from Hive Metastore".format(database=database, table=table))
+            open_client.drop_table(database, table, deleteData=False)
+
+        manifest_lists_to_delete = []
+        manifests_to_delete = itertools.chain()
+
+        if purge:
+            if metadata is not None:
+                for s in metadata.snapshots:
+                    manifests_to_delete = itertools.chain(manifests_to_delete, s.manifests)
+                    if s.manifest_location is not None:
+                        manifest_lists_to_delete.append(s.manifest_location())
+
+        # Make a copy, as it is drained as we explore the manifest to list files.
+        (manifests, manifests_to_delete) = itertools.tee(manifests_to_delete)
+
+        with Pool(self.conf.get(WORKER_THREAD_POOL_SIZE_PROP,
+                                cpu_count())) as delete_pool:
+            delete_pool.map(self.__delete_file, self.__get_files(manifests))
+            delete_pool.map(self.__delete_file, (m.manifest_path for m in manifests_to_delete))

Review comment:
       I can add a check in delete_file first.

##########
File path: python/tox.ini
##########
@@ -88,7 +88,7 @@ commands =
 #     python -m http.server {posargs}
 
 [flake8]
-ignore = E501,W503
+ignore = E501,W503,C901

Review comment:
       Will give it a try.

##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
     def new_table_ops(self, conf, database, table):
         return HiveTableOperations(conf, self.get_client(), database, table)
 
-    def drop(self, database, table):
-        raise RuntimeError("Not yet implemented")
+    def drop(self, database, table, purge=False):
+        ops = self.new_table_ops(self.conf, database, table)
+        metadata = ops.current()
+
+        with self.get_client() as open_client:
+            _logger.info("Deleting {database}.{table} from Hive Metastore".format(database=database, table=table))
+            open_client.drop_table(database, table, deleteData=False)
+
+        manifest_lists_to_delete = []
+        manifests_to_delete = itertools.chain()
+
+        if purge:
+            if metadata is not None:
+                for s in metadata.snapshots:
+                    manifests_to_delete = itertools.chain(manifests_to_delete, s.manifests)
+                    if s.manifest_location is not None:
+                        manifest_lists_to_delete.append(s.manifest_location())

Review comment:
       Yes, that is weird; I will check.

##########
File path: python/tests/hive/test_hive_tables.py
##########
@@ -157,3 +157,21 @@ def test_create_tables(client, current_call, base_scan_schema, base_scan_partiti
     current_call.return_value = tbl[0].args[0].parameters['metadata_location']
 
     tables.load("test.test_123")
+
+
+@mock.patch("iceberg.hive.HiveTableOperations.refresh_from_metadata_location")
+@mock.patch("iceberg.hive.HiveTables.delete_file")
+@mock.patch("iceberg.hive.HiveTableOperations.current")
+@mock.patch("iceberg.hive.HiveTables.get_client")
+def test_drop_tables(client, metadata, delete_file, refresh_call, tmpdir):

Review comment:
       Will do

##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
     def new_table_ops(self, conf, database, table):
         return HiveTableOperations(conf, self.get_client(), database, table)
 
-    def drop(self, database, table):
-        raise RuntimeError("Not yet implemented")
+    def drop(self, database, table, purge=False):
+        ops = self.new_table_ops(self.conf, database, table)
+        metadata = ops.current()
+
+        with self.get_client() as open_client:
+            _logger.info("Deleting {database}.{table} from Hive Metastore".format(database=database, table=table))
+            open_client.drop_table(database, table, deleteData=False)
+
+        manifest_lists_to_delete = []
+        manifests_to_delete = itertools.chain()
+
+        if purge:
+            if metadata is not None:
+                for s in metadata.snapshots:
+                    manifests_to_delete = itertools.chain(manifests_to_delete, s.manifests)
+                    if s.manifest_location is not None:
+                        manifest_lists_to_delete.append(s.manifest_location())
+
+        # Make a copy, as it is drained as we explore the manifest to list files.
+        (manifests, manifests_to_delete) = itertools.tee(manifests_to_delete)
+
+        with Pool(self.conf.get(WORKER_THREAD_POOL_SIZE_PROP,
+                                cpu_count())) as delete_pool:
+            delete_pool.map(self.__delete_file, self.__get_files(manifests))

Review comment:
       Got it

##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
     def new_table_ops(self, conf, database, table):
         return HiveTableOperations(conf, self.get_client(), database, table)
 
-    def drop(self, database, table):
-        raise RuntimeError("Not yet implemented")
+    def drop(self, database, table, purge=False):
+        ops = self.new_table_ops(self.conf, database, table)
+        metadata = ops.current()
+
+        with self.get_client() as open_client:
+            _logger.info("Deleting {database}.{table} from Hive Metastore".format(database=database, table=table))
+            open_client.drop_table(database, table, deleteData=False)
+
+        manifest_lists_to_delete = []
+        manifests_to_delete = itertools.chain()
+
+        if purge:
+            if metadata is not None:
+                for s in metadata.snapshots:
+                    manifests_to_delete = itertools.chain(manifests_to_delete, s.manifests)
+                    if s.manifest_location is not None:
+                        manifest_lists_to_delete.append(s.manifest_location())
+
+        # Make a copy, as it is drained as we explore the manifest to list files.
+        (manifests, manifests_to_delete) = itertools.tee(manifests_to_delete)
+
+        with Pool(self.conf.get(WORKER_THREAD_POOL_SIZE_PROP,

Review comment:
       OK, let me know. To be honest, I'm not familiar with async either.

##########
File path: python/iceberg/hive/hive_tables.py
##########
@@ -34,8 +41,45 @@ def __init__(self, conf):
     def new_table_ops(self, conf, database, table):
         return HiveTableOperations(conf, self.get_client(), database, table)
 
-    def drop(self, database, table):
-        raise RuntimeError("Not yet implemented")
+    def drop(self, database, table, purge=False):
+        ops = self.new_table_ops(self.conf, database, table)
+        metadata = ops.current()
+
+        with self.get_client() as open_client:
+            _logger.info("Deleting {database}.{table} from Hive Metastore".format(database=database, table=table))
+            open_client.drop_table(database, table, deleteData=False)
+
+        manifest_lists_to_delete = []
+        manifests_to_delete = itertools.chain()

Review comment:
       Hi, I was a bit confused, but I guess you mean that you would prefer not to use itertools/iterators?
   
    base_snapshot.manifests is a generator function, which is why I started this iterator stuff in the first place (believe me, I tried to avoid it :)).  But I suppose it is more performant this way?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org