Posted to commits@iceberg.apache.org by fo...@apache.org on 2022/11/21 14:35:36 UTC

[iceberg] branch master updated: Python: Add support for GlueCatalog (#6034)

This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new f7ac12f5a4 Python: Add support for GlueCatalog (#6034)
f7ac12f5a4 is described below

commit f7ac12f5a47caa9b9e442c39fbb0b3f7cd87f696
Author: Rushan Jiang <ru...@andrew.cmu.edu>
AuthorDate: Mon Nov 21 09:35:30 2022 -0500

    Python: Add support for GlueCatalog (#6034)
    
    * add GlueCatalog skeleton
    
    * add boto3
    
    * first draft based on previous PR
    
    * add personal test and implement load_table
    
    * test create table functionality
    
    * tested drop table functionality
    
    * reformat create_table and load_table and perform some tests
    
    * add some TODO items
    
    * reimplement create_table and load_table
    
    * update load_table's io initialization
    
    * apply suggestions 1-4
    
    * remaining: refactor constant names
    
    * refactor code structure and add exception handling
    
    * fix some typos and define constants at the top
    
    * format fix partially
    
    * update environment
    
    * fix style issue
    
    * make comments more specific
    
    * add ignore missing boto3 to mypy config
    
    * skip test_glue temporarily
    
    * fix typo PROP_GLUE_TABLE_TYPE
    
    * change dict[str, str] to properties
    
    * fix all sanity issues
    
    * make boto3 optional
    
    * implement list namespaces, change aws account for test
    
    * fix KeyError in default warehouse location
    
    * fix KeyError in default warehouse location
    
    * update Apache header
    
    * add unit test prototype
    
    * format unit test
    
    * fix the fixture scope issue
    
    * add comment for the gist
    
    * add Apache header
    
    * move fixtures to conftest in utils
    
    * roll back changes to Hive Apache license header
    
    * update nits and error messages, fix case-sensitivity error, remove duplicate code in create_table
    
    * update format
    
    * move io instantiation into _convert_glue_to_iceberg and make it consistent
    
    * make boto3 and moto extra
    
    * update Makefile to include boto3 and moto
    
    * add NoSuchIcebergTableError
    
    * use NoSuchPropertyException
    
    * make global variables consistent with Hive
    
    * move common global variable to pyiceberg.catalog.base
    
    * complete the rest of the GlueCatalog first draft
    
    * add comments for class methods
    
    * add integration test and fix some small bugs
    
    * add next_token for list_tables
    
    * add next_token for list_namespaces
    
    * make create_table's description key consistent with that of create_namespace
    
    * add unit tests for table-related operations
    
    * add unit tests for namespace-related operations
    
    * formalize integration test
    
    * aggregate all GlueCatalog instances into a fixture
    
    * remove redundant print
    
    * optimize test for rename table
    
    * refactor duplicated code segment and details
    
    * fix some potential KeyError issues
    
    * reimplement purge_table with correct logic
    
    * add load glue catalog and move shared variables to __init__.py
    
    * fix format issue
    
    * move delete_data_files, delete_files to __init__.py
    
    * rename and turn constant strings into variables
    
    * fix minor style issues
    
    * fix configuration after rebase
    
    * add doc for glue catalog
    
    * handle loading databases created by the AWS Glue Console
    
    * add comment to helper functions
    
    * add check to disallow hierarchical namespaces; strip trailing slashes from database_location and warehouse path
    
    * refactor the rename_table logic to check whether the table is a valid Iceberg table
    
    * revise comments
    
    * fix create table input KeyError
    
    * persist table description when renaming the table
    
    * update after rebase
---
 python/Makefile                               |   2 +-
 python/mkdocs/docs/index.md                   |   9 +
 python/poetry.lock                            | 286 +++++++++++++-
 python/pyiceberg/catalog/__init__.py          |  59 +++
 python/pyiceberg/catalog/glue.py              | 549 ++++++++++++++++++++++++++
 python/pyiceberg/exceptions.py                |   4 +
 python/pyproject.toml                         |  19 +
 python/tests/catalog/integration_test_glue.py | 324 +++++++++++++++
 python/tests/catalog/test_glue.py             | 471 ++++++++++++++++++++++
 python/tests/conftest.py                      | 113 ++++++
 10 files changed, 1831 insertions(+), 5 deletions(-)

diff --git a/python/Makefile b/python/Makefile
index c934a563d0..32cd895f53 100644
--- a/python/Makefile
+++ b/python/Makefile
@@ -17,7 +17,7 @@
 
 install:
 	pip install poetry
-	poetry install -E pyarrow -E hive -E s3fs
+	poetry install -E pyarrow -E hive -E s3fs -E glue
 
 check-license:
 	./dev/check-license
diff --git a/python/mkdocs/docs/index.md b/python/mkdocs/docs/index.md
index b7597d0f00..c05af4effa 100644
--- a/python/mkdocs/docs/index.md
+++ b/python/mkdocs/docs/index.md
@@ -40,6 +40,7 @@ You can mix and match optional dependencies:
 | Key       | Description:                                                         |
 |-----------|----------------------------------------------------------------------|
 | hive      | Support for the Hive metastore                                       |
+| glue      | Support for AWS Glue                                                 |
 | pyarrow   | PyArrow as a FileIO implementation to interact with the object store |
 | s3fs      | S3FS as a FileIO implementation to interact with the object store    |
 | snappy    | Support for snappy Avro compression                                  |
@@ -99,6 +100,9 @@ catalog:
         cert: /absolute/path/to/client.crt
         key: /absolute/path/to/client.key
       cabundle: /absolute/path/to/cabundle.pem
+
+  glue:
+    type: glue
 ```
 
 Lastly, you can also set it using environment variables:
@@ -108,10 +112,15 @@ export PYICEBERG_CATALOG__DEFAULT__URI=thrift://localhost:9083
 
 export PYICEBERG_CATALOG__REST__URI=http://rest-catalog/ws/
 export PYICEBERG_CATALOG__REST__CREDENTIAL=t-1234:secret
+
+export PYICEBERG_CATALOG__GLUE__TYPE=glue
 ```
 
 Where the structure is equivalent to the YAML. The levels are separated using a double underscore (`__`).
 
+To use AWS Glue as the catalog, configure pyiceberg using either of the two approaches above, and refer to
+[How to configure AWS credentials](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-configure.html) to set up your AWS credentials locally.
+
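For illustration, a minimal sketch of loading the configured Glue catalog from Python, assuming one of the two configurations above is in place:

```python
from pyiceberg.catalog import load_catalog

# Resolved from the YAML configuration or the PYICEBERG_CATALOG__GLUE__* environment variables
catalog = load_catalog("glue")

# Properties can also be passed directly instead of being configured externally
catalog = load_catalog("glue", type="glue")
```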
 ## FileIO configuration
 
 For the FileIO there are several configuration options available:
diff --git a/python/poetry.lock b/python/poetry.lock
index e5157bda8c..caccecda60 100644
--- a/python/poetry.lock
+++ b/python/poetry.lock
@@ -80,12 +80,28 @@ docs = ["furo", "sphinx", "sphinx-notfound-page", "zope.interface"]
 tests = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy (>=0.900,!=0.940)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "zope.interface"]
 tests-no-zope = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy (>=0.900,!=0.940)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins"]
 
+[[package]]
+name = "boto3"
+version = "1.24.59"
+description = "The AWS SDK for Python"
+category = "main"
+optional = false
+python-versions = ">= 3.7"
+
+[package.dependencies]
+botocore = ">=1.27.59,<1.28.0"
+jmespath = ">=0.7.1,<2.0.0"
+s3transfer = ">=0.6.0,<0.7.0"
+
+[package.extras]
+crt = ["botocore[crt] (>=1.21.0,<2.0a0)"]
+
 [[package]]
 name = "botocore"
 version = "1.27.59"
 description = "Low-level, data-driven core of boto 3."
 category = "main"
-optional = true
+optional = false
 python-versions = ">= 3.7"
 
 [package.dependencies]
@@ -198,6 +214,25 @@ tomli = {version = "*", optional = true, markers = "python_full_version <= \"3.1
 [package.extras]
 toml = ["tomli"]
 
+[[package]]
+name = "cryptography"
+version = "38.0.3"
+description = "cryptography is a package which provides cryptographic recipes and primitives to Python developers."
+category = "dev"
+optional = false
+python-versions = ">=3.6"
+
+[package.dependencies]
+cffi = ">=1.12"
+
+[package.extras]
+docs = ["sphinx (>=1.6.5,!=1.8.0,!=3.1.0,!=3.1.1)", "sphinx-rtd-theme"]
+docstest = ["pyenchant (>=1.6.11)", "sphinxcontrib-spelling (>=4.0.1)", "twine (>=1.12.0)"]
+pep8test = ["black", "flake8", "flake8-import-order", "pep8-naming"]
+sdist = ["setuptools-rust (>=0.11.4)"]
+ssh = ["bcrypt (>=3.1.5)"]
+test = ["hypothesis (>=1.11.4,!=3.79.2)", "iso8601", "pretend", "pytest (>=6.2.0)", "pytest-benchmark", "pytest-cov", "pytest-subtests", "pytest-xdist", "pytz"]
+
 [[package]]
 name = "distlib"
 version = "0.3.6"
@@ -333,12 +368,34 @@ category = "dev"
 optional = false
 python-versions = "*"
 
+[[package]]
+name = "jinja2"
+version = "3.1.2"
+description = "A very fast and expressive template engine."
+category = "dev"
+optional = false
+python-versions = ">=3.7"
+
+[package.dependencies]
+MarkupSafe = ">=2.0"
+
+[package.extras]
+i18n = ["Babel (>=2.7)"]
+
 [[package]]
 name = "jmespath"
 version = "1.0.1"
 description = "JSON Matching Expressions"
 category = "main"
-optional = true
+optional = false
+python-versions = ">=3.7"
+
+[[package]]
+name = "markupsafe"
+version = "2.1.1"
+description = "Safely add untrusted strings to HTML/XML markup."
+category = "dev"
+optional = false
 python-versions = ">=3.7"
 
 [[package]]
@@ -349,6 +406,51 @@ category = "main"
 optional = false
 python-versions = "*"
 
+[[package]]
+name = "moto"
+version = "4.0.9"
+description = "A library that allows your python tests to easily mock out the boto library"
+category = "dev"
+optional = false
+python-versions = ">=3.6"
+
+[package.dependencies]
+boto3 = ">=1.9.201"
+botocore = ">=1.12.201"
+cryptography = ">=3.3.1"
+Jinja2 = ">=2.10.1"
+MarkupSafe = "!=2.0.0a1"
+python-dateutil = ">=2.1,<3.0.0"
+pytz = "*"
+requests = ">=2.5"
+responses = ">=0.13.0"
+werkzeug = ">=0.5,<2.2.0 || >2.2.0,<2.2.1 || >2.2.1"
+xmltodict = "*"
+
+[package.extras]
+all = ["PyYAML (>=5.1)", "aws-xray-sdk (>=0.93,!=0.96)", "cfn-lint (>=0.4.0)", "docker (>=2.5.1)", "ecdsa (!=0.15)", "graphql-core", "idna (>=2.5,<4)", "jsondiff (>=1.1.2)", "openapi-spec-validator (>=0.2.8)", "pyparsing (>=3.0.7)", "python-jose[cryptography] (>=3.1.0,<4.0.0)", "setuptools", "sshpubkeys (>=3.1.0)"]
+apigateway = ["PyYAML (>=5.1)", "ecdsa (!=0.15)", "openapi-spec-validator (>=0.2.8)", "python-jose[cryptography] (>=3.1.0,<4.0.0)"]
+apigatewayv2 = ["PyYAML (>=5.1)"]
+appsync = ["graphql-core"]
+awslambda = ["docker (>=2.5.1)"]
+batch = ["docker (>=2.5.1)"]
+cloudformation = ["PyYAML (>=5.1)", "aws-xray-sdk (>=0.93,!=0.96)", "cfn-lint (>=0.4.0)", "docker (>=2.5.1)", "ecdsa (!=0.15)", "graphql-core", "idna (>=2.5,<4)", "jsondiff (>=1.1.2)", "openapi-spec-validator (>=0.2.8)", "pyparsing (>=3.0.7)", "python-jose[cryptography] (>=3.1.0,<4.0.0)", "setuptools", "sshpubkeys (>=3.1.0)"]
+cognitoidp = ["ecdsa (!=0.15)", "python-jose[cryptography] (>=3.1.0,<4.0.0)"]
+ds = ["sshpubkeys (>=3.1.0)"]
+dynamodb = ["docker (>=2.5.1)"]
+dynamodb2 = ["docker (>=2.5.1)"]
+dynamodbstreams = ["docker (>=2.5.1)"]
+ebs = ["sshpubkeys (>=3.1.0)"]
+ec2 = ["sshpubkeys (>=3.1.0)"]
+efs = ["sshpubkeys (>=3.1.0)"]
+glue = ["pyparsing (>=3.0.7)"]
+iotdata = ["jsondiff (>=1.1.2)"]
+route53resolver = ["sshpubkeys (>=3.1.0)"]
+s3 = ["PyYAML (>=5.1)"]
+server = ["PyYAML (>=5.1)", "aws-xray-sdk (>=0.93,!=0.96)", "cfn-lint (>=0.4.0)", "docker (>=2.5.1)", "ecdsa (!=0.15)", "flask (!=2.2.0,!=2.2.1)", "flask-cors", "graphql-core", "idna (>=2.5,<4)", "jsondiff (>=1.1.2)", "openapi-spec-validator (>=0.2.8)", "pyparsing (>=3.0.7)", "python-jose[cryptography] (>=3.1.0,<4.0.0)", "setuptools", "sshpubkeys (>=3.1.0)"]
+ssm = ["PyYAML (>=5.1)", "dataclasses"]
+xray = ["aws-xray-sdk (>=0.93,!=0.96)", "setuptools"]
+
 [[package]]
 name = "multidict"
 version = "6.0.2"
@@ -536,7 +638,7 @@ name = "python-dateutil"
 version = "2.8.2"
 description = "Extensions to the standard Python datetime module"
 category = "main"
-optional = true
+optional = false
 python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7"
 
 [package.dependencies]
@@ -550,6 +652,14 @@ category = "main"
 optional = true
 python-versions = "*"
 
+[[package]]
+name = "pytz"
+version = "2022.6"
+description = "World timezone definitions, modern and historical"
+category = "dev"
+optional = false
+python-versions = "*"
+
 [[package]]
 name = "pyyaml"
 version = "6.0"
@@ -592,6 +702,23 @@ six = "*"
 fixture = ["fixtures"]
 test = ["fixtures", "mock", "purl", "pytest", "requests-futures", "sphinx", "testrepository (>=0.0.18)", "testtools"]
 
+[[package]]
+name = "responses"
+version = "0.22.0"
+description = "A utility library for mocking out the `requests` Python library."
+category = "dev"
+optional = false
+python-versions = ">=3.7"
+
+[package.dependencies]
+requests = ">=2.22.0,<3.0"
+toml = "*"
+types-toml = "*"
+urllib3 = ">=1.25.10"
+
+[package.extras]
+tests = ["coverage (>=6.0.0)", "flake8", "mypy", "pytest (>=7.0.0)", "pytest-asyncio", "pytest-cov", "pytest-httpserver", "types-requests"]
+
 [[package]]
 name = "rich"
 version = "12.6.0"
@@ -625,6 +752,20 @@ fsspec = "2022.10.0"
 awscli = ["aiobotocore[awscli] (>=2.4.0,<2.5.0)"]
 boto3 = ["aiobotocore[boto3] (>=2.4.0,<2.5.0)"]
 
+[[package]]
+name = "s3transfer"
+version = "0.6.0"
+description = "An Amazon S3 Transfer Manager"
+category = "main"
+optional = false
+python-versions = ">= 3.7"
+
+[package.dependencies]
+botocore = ">=1.12.36,<2.0a.0"
+
+[package.extras]
+crt = ["botocore[crt] (>=1.20.29,<2.0a.0)"]
+
 [[package]]
 name = "setuptools"
 version = "65.5.1"
@@ -678,6 +819,14 @@ category = "dev"
 optional = false
 python-versions = ">=3.7"
 
+[[package]]
+name = "types-toml"
+version = "0.10.8.1"
+description = "Typing stubs for toml"
+category = "dev"
+optional = false
+python-versions = "*"
+
 [[package]]
 name = "typing-extensions"
 version = "4.4.0"
@@ -716,6 +865,20 @@ platformdirs = ">=2.4,<3"
 docs = ["proselint (>=0.13)", "sphinx (>=5.3)", "sphinx-argparse (>=0.3.2)", "sphinx-rtd-theme (>=1)", "towncrier (>=22.8)"]
 testing = ["coverage (>=6.2)", "coverage-enable-subprocess (>=1)", "flaky (>=3.7)", "packaging (>=21.3)", "pytest (>=7.0.1)", "pytest-env (>=0.6.2)", "pytest-freezegun (>=0.4.2)", "pytest-mock (>=3.6.1)", "pytest-randomly (>=3.10.3)", "pytest-timeout (>=2.1)"]
 
+[[package]]
+name = "werkzeug"
+version = "2.2.2"
+description = "The comprehensive WSGI web application library."
+category = "dev"
+optional = false
+python-versions = ">=3.7"
+
+[package.dependencies]
+MarkupSafe = ">=2.1.1"
+
+[package.extras]
+watchdog = ["watchdog"]
+
 [[package]]
 name = "wrapt"
 version = "1.14.1"
@@ -724,6 +887,14 @@ category = "main"
 optional = true
 python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,>=2.7"
 
+[[package]]
+name = "xmltodict"
+version = "0.13.0"
+description = "Makes working with XML feel like you are working with JSON"
+category = "dev"
+optional = false
+python-versions = ">=3.4"
+
 [[package]]
 name = "yarl"
 version = "1.8.1"
@@ -763,6 +934,7 @@ cffi = {version = ">=1.11", markers = "platform_python_implementation == \"PyPy\
 cffi = ["cffi (>=1.11)"]
 
 [extras]
+glue = ["boto3"]
 hive = ["thrift"]
 pyarrow = ["pyarrow"]
 s3fs = ["s3fs"]
@@ -771,7 +943,7 @@ snappy = ["python-snappy"]
 [metadata]
 lock-version = "1.1"
 python-versions = "^3.8"
-content-hash = "fda6f8ad5abce655e854325c9e0e944c6ad818487479c97078d1dce93496465e"
+content-hash = "588c954b75cf70e3c695dfe27b859be09c183323becda37ebfdde00caa5730fd"
 
 [metadata.files]
 aiobotocore = [
@@ -883,6 +1055,10 @@ attrs = [
     {file = "attrs-22.1.0-py2.py3-none-any.whl", hash = "sha256:86efa402f67bf2df34f51a335487cf46b1ec130d02b8d39fd248abfd30da551c"},
     {file = "attrs-22.1.0.tar.gz", hash = "sha256:29adc2665447e5191d0e7c568fde78b21f9672d344281d0c6e1ab085429b22b6"},
 ]
+boto3 = [
+    {file = "boto3-1.24.59-py3-none-any.whl", hash = "sha256:34ab44146a2c4e7f4e72737f4b27e6eb5e0a7855c2f4599e3d9199b6a0a2d575"},
+    {file = "boto3-1.24.59.tar.gz", hash = "sha256:a50b4323f9579cfe22fcf5531fbd40b567d4d74c1adce06aeb5c95fce2a6fb40"},
+]
 botocore = [
     {file = "botocore-1.27.59-py3-none-any.whl", hash = "sha256:69d756791fc024bda54f6c53f71ae34e695ee41bbbc1743d9179c4837a4929da"},
     {file = "botocore-1.27.59.tar.gz", hash = "sha256:eda4aed6ee719a745d1288eaf1beb12f6f6448ad1fa12f159405db14ba9c92cf"},
@@ -1033,6 +1209,34 @@ coverage = [
     {file = "coverage-6.5.0-pp36.pp37.pp38-none-any.whl", hash = "sha256:1431986dac3923c5945271f169f59c45b8802a114c8f548d611f2015133df77a"},
     {file = "coverage-6.5.0.tar.gz", hash = "sha256:f642e90754ee3e06b0e7e51bce3379590e76b7f76b708e1a71ff043f87025c84"},
 ]
+cryptography = [
+    {file = "cryptography-38.0.3-cp36-abi3-macosx_10_10_universal2.whl", hash = "sha256:984fe150f350a3c91e84de405fe49e688aa6092b3525f407a18b9646f6612320"},
+    {file = "cryptography-38.0.3-cp36-abi3-macosx_10_10_x86_64.whl", hash = "sha256:ed7b00096790213e09eb11c97cc6e2b757f15f3d2f85833cd2d3ec3fe37c1722"},
+    {file = "cryptography-38.0.3-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_24_aarch64.whl", hash = "sha256:bbf203f1a814007ce24bd4d51362991d5cb90ba0c177a9c08825f2cc304d871f"},
+    {file = "cryptography-38.0.3-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:554bec92ee7d1e9d10ded2f7e92a5d70c1f74ba9524947c0ba0c850c7b011828"},
+    {file = "cryptography-38.0.3-cp36-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b1b52c9e5f8aa2b802d48bd693190341fae201ea51c7a167d69fc48b60e8a959"},
+    {file = "cryptography-38.0.3-cp36-abi3-manylinux_2_24_x86_64.whl", hash = "sha256:728f2694fa743a996d7784a6194da430f197d5c58e2f4e278612b359f455e4a2"},
+    {file = "cryptography-38.0.3-cp36-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:dfb4f4dd568de1b6af9f4cda334adf7d72cf5bc052516e1b2608b683375dd95c"},
+    {file = "cryptography-38.0.3-cp36-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:5419a127426084933076132d317911e3c6eb77568a1ce23c3ac1e12d111e61e0"},
+    {file = "cryptography-38.0.3-cp36-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:9b24bcff7853ed18a63cfb0c2b008936a9554af24af2fb146e16d8e1aed75748"},
+    {file = "cryptography-38.0.3-cp36-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:25c1d1f19729fb09d42e06b4bf9895212292cb27bb50229f5aa64d039ab29146"},
+    {file = "cryptography-38.0.3-cp36-abi3-win32.whl", hash = "sha256:7f836217000342d448e1c9a342e9163149e45d5b5eca76a30e84503a5a96cab0"},
+    {file = "cryptography-38.0.3-cp36-abi3-win_amd64.whl", hash = "sha256:c46837ea467ed1efea562bbeb543994c2d1f6e800785bd5a2c98bc096f5cb220"},
+    {file = "cryptography-38.0.3-pp37-pypy37_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:06fc3cc7b6f6cca87bd56ec80a580c88f1da5306f505876a71c8cfa7050257dd"},
+    {file = "cryptography-38.0.3-pp37-pypy37_pp73-manylinux_2_24_x86_64.whl", hash = "sha256:65535bc550b70bd6271984d9863a37741352b4aad6fb1b3344a54e6950249b55"},
+    {file = "cryptography-38.0.3-pp37-pypy37_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:5e89468fbd2fcd733b5899333bc54d0d06c80e04cd23d8c6f3e0542358c6060b"},
+    {file = "cryptography-38.0.3-pp38-pypy38_pp73-macosx_10_10_x86_64.whl", hash = "sha256:6ab9516b85bebe7aa83f309bacc5f44a61eeb90d0b4ec125d2d003ce41932d36"},
+    {file = "cryptography-38.0.3-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:068147f32fa662c81aebab95c74679b401b12b57494872886eb5c1139250ec5d"},
+    {file = "cryptography-38.0.3-pp38-pypy38_pp73-manylinux_2_24_x86_64.whl", hash = "sha256:402852a0aea73833d982cabb6d0c3bb582c15483d29fb7085ef2c42bfa7e38d7"},
+    {file = "cryptography-38.0.3-pp38-pypy38_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:b1b35d9d3a65542ed2e9d90115dfd16bbc027b3f07ee3304fc83580f26e43249"},
+    {file = "cryptography-38.0.3-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:6addc3b6d593cd980989261dc1cce38263c76954d758c3c94de51f1e010c9a50"},
+    {file = "cryptography-38.0.3-pp39-pypy39_pp73-macosx_10_10_x86_64.whl", hash = "sha256:be243c7e2bfcf6cc4cb350c0d5cdf15ca6383bbcb2a8ef51d3c9411a9d4386f0"},
+    {file = "cryptography-38.0.3-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:78cf5eefac2b52c10398a42765bfa981ce2372cbc0457e6bf9658f41ec3c41d8"},
+    {file = "cryptography-38.0.3-pp39-pypy39_pp73-manylinux_2_24_x86_64.whl", hash = "sha256:4e269dcd9b102c5a3d72be3c45d8ce20377b8076a43cbed6f660a1afe365e436"},
+    {file = "cryptography-38.0.3-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:8d41a46251bf0634e21fac50ffd643216ccecfaf3701a063257fe0b2be1b6548"},
+    {file = "cryptography-38.0.3-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:785e4056b5a8b28f05a533fab69febf5004458e20dad7e2e13a3120d8ecec75a"},
+    {file = "cryptography-38.0.3.tar.gz", hash = "sha256:bfbe6ee19615b07a98b1d2287d6a6073f734735b49ee45b11324d85efc4d5cbd"},
+]
 distlib = [
     {file = "distlib-0.3.6-py2.py3-none-any.whl", hash = "sha256:f35c4b692542ca110de7ef0bea44d73981caeb34ca0b9b6b2e6d7790dda8f80e"},
     {file = "distlib-0.3.6.tar.gz", hash = "sha256:14bad2d9b04d3a36127ac97f30b12a19268f211063d8f8ee4f47108896e11b46"},
@@ -1164,10 +1368,56 @@ iniconfig = [
     {file = "iniconfig-1.1.1-py2.py3-none-any.whl", hash = "sha256:011e24c64b7f47f6ebd835bb12a743f2fbe9a26d4cecaa7f53bc4f35ee9da8b3"},
     {file = "iniconfig-1.1.1.tar.gz", hash = "sha256:bc3af051d7d14b2ee5ef9969666def0cd1a000e121eaea580d4a313df4b37f32"},
 ]
+jinja2 = [
+    {file = "Jinja2-3.1.2-py3-none-any.whl", hash = "sha256:6088930bfe239f0e6710546ab9c19c9ef35e29792895fed6e6e31a023a182a61"},
+    {file = "Jinja2-3.1.2.tar.gz", hash = "sha256:31351a702a408a9e7595a8fc6150fc3f43bb6bf7e319770cbc0db9df9437e852"},
+]
 jmespath = [
     {file = "jmespath-1.0.1-py3-none-any.whl", hash = "sha256:02e2e4cc71b5bcab88332eebf907519190dd9e6e82107fa7f83b1003a6252980"},
     {file = "jmespath-1.0.1.tar.gz", hash = "sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe"},
 ]
+markupsafe = [
+    {file = "MarkupSafe-2.1.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:86b1f75c4e7c2ac2ccdaec2b9022845dbb81880ca318bb7a0a01fbf7813e3812"},
+    {file = "MarkupSafe-2.1.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:f121a1420d4e173a5d96e47e9a0c0dcff965afdf1626d28de1460815f7c4ee7a"},
+    {file = "MarkupSafe-2.1.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a49907dd8420c5685cfa064a1335b6754b74541bbb3706c259c02ed65b644b3e"},
+    {file = "MarkupSafe-2.1.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:10c1bfff05d95783da83491be968e8fe789263689c02724e0c691933c52994f5"},
+    {file = "MarkupSafe-2.1.1-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b7bd98b796e2b6553da7225aeb61f447f80a1ca64f41d83612e6139ca5213aa4"},
+    {file = "MarkupSafe-2.1.1-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:b09bf97215625a311f669476f44b8b318b075847b49316d3e28c08e41a7a573f"},
+    {file = "MarkupSafe-2.1.1-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:694deca8d702d5db21ec83983ce0bb4b26a578e71fbdbd4fdcd387daa90e4d5e"},
+    {file = "MarkupSafe-2.1.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:efc1913fd2ca4f334418481c7e595c00aad186563bbc1ec76067848c7ca0a933"},
+    {file = "MarkupSafe-2.1.1-cp310-cp310-win32.whl", hash = "sha256:4a33dea2b688b3190ee12bd7cfa29d39c9ed176bda40bfa11099a3ce5d3a7ac6"},
+    {file = "MarkupSafe-2.1.1-cp310-cp310-win_amd64.whl", hash = "sha256:dda30ba7e87fbbb7eab1ec9f58678558fd9a6b8b853530e176eabd064da81417"},
+    {file = "MarkupSafe-2.1.1-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:671cd1187ed5e62818414afe79ed29da836dde67166a9fac6d435873c44fdd02"},
+    {file = "MarkupSafe-2.1.1-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3799351e2336dc91ea70b034983ee71cf2f9533cdff7c14c90ea126bfd95d65a"},
+    {file = "MarkupSafe-2.1.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e72591e9ecd94d7feb70c1cbd7be7b3ebea3f548870aa91e2732960fa4d57a37"},
+    {file = "MarkupSafe-2.1.1-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6fbf47b5d3728c6aea2abb0589b5d30459e369baa772e0f37a0320185e87c980"},
+    {file = "MarkupSafe-2.1.1-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:d5ee4f386140395a2c818d149221149c54849dfcfcb9f1debfe07a8b8bd63f9a"},
+    {file = "MarkupSafe-2.1.1-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:bcb3ed405ed3222f9904899563d6fc492ff75cce56cba05e32eff40e6acbeaa3"},
+    {file = "MarkupSafe-2.1.1-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:e1c0b87e09fa55a220f058d1d49d3fb8df88fbfab58558f1198e08c1e1de842a"},
+    {file = "MarkupSafe-2.1.1-cp37-cp37m-win32.whl", hash = "sha256:8dc1c72a69aa7e082593c4a203dcf94ddb74bb5c8a731e4e1eb68d031e8498ff"},
+    {file = "MarkupSafe-2.1.1-cp37-cp37m-win_amd64.whl", hash = "sha256:97a68e6ada378df82bc9f16b800ab77cbf4b2fada0081794318520138c088e4a"},
+    {file = "MarkupSafe-2.1.1-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:e8c843bbcda3a2f1e3c2ab25913c80a3c5376cd00c6e8c4a86a89a28c8dc5452"},
+    {file = "MarkupSafe-2.1.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:0212a68688482dc52b2d45013df70d169f542b7394fc744c02a57374a4207003"},
+    {file = "MarkupSafe-2.1.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8e576a51ad59e4bfaac456023a78f6b5e6e7651dcd383bcc3e18d06f9b55d6d1"},
+    {file = "MarkupSafe-2.1.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4b9fe39a2ccc108a4accc2676e77da025ce383c108593d65cc909add5c3bd601"},
+    {file = "MarkupSafe-2.1.1-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:96e37a3dc86e80bf81758c152fe66dbf60ed5eca3d26305edf01892257049925"},
+    {file = "MarkupSafe-2.1.1-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:6d0072fea50feec76a4c418096652f2c3238eaa014b2f94aeb1d56a66b41403f"},
+    {file = "MarkupSafe-2.1.1-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:089cf3dbf0cd6c100f02945abeb18484bd1ee57a079aefd52cffd17fba910b88"},
+    {file = "MarkupSafe-2.1.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:6a074d34ee7a5ce3effbc526b7083ec9731bb3cbf921bbe1d3005d4d2bdb3a63"},
+    {file = "MarkupSafe-2.1.1-cp38-cp38-win32.whl", hash = "sha256:421be9fbf0ffe9ffd7a378aafebbf6f4602d564d34be190fc19a193232fd12b1"},
+    {file = "MarkupSafe-2.1.1-cp38-cp38-win_amd64.whl", hash = "sha256:fc7b548b17d238737688817ab67deebb30e8073c95749d55538ed473130ec0c7"},
+    {file = "MarkupSafe-2.1.1-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:e04e26803c9c3851c931eac40c695602c6295b8d432cbe78609649ad9bd2da8a"},
+    {file = "MarkupSafe-2.1.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:b87db4360013327109564f0e591bd2a3b318547bcef31b468a92ee504d07ae4f"},
+    {file = "MarkupSafe-2.1.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:99a2a507ed3ac881b975a2976d59f38c19386d128e7a9a18b7df6fff1fd4c1d6"},
+    {file = "MarkupSafe-2.1.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:56442863ed2b06d19c37f94d999035e15ee982988920e12a5b4ba29b62ad1f77"},
+    {file = "MarkupSafe-2.1.1-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:3ce11ee3f23f79dbd06fb3d63e2f6af7b12db1d46932fe7bd8afa259a5996603"},
+    {file = "MarkupSafe-2.1.1-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:33b74d289bd2f5e527beadcaa3f401e0df0a89927c1559c8566c066fa4248ab7"},
+    {file = "MarkupSafe-2.1.1-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:43093fb83d8343aac0b1baa75516da6092f58f41200907ef92448ecab8825135"},
+    {file = "MarkupSafe-2.1.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:8e3dcf21f367459434c18e71b2a9532d96547aef8a871872a5bd69a715c15f96"},
+    {file = "MarkupSafe-2.1.1-cp39-cp39-win32.whl", hash = "sha256:d4306c36ca495956b6d568d276ac11fdd9c30a36f1b6eb928070dc5360b22e1c"},
+    {file = "MarkupSafe-2.1.1-cp39-cp39-win_amd64.whl", hash = "sha256:46d00d6cfecdde84d40e572d63735ef81423ad31184100411e6e3388d405e247"},
+    {file = "MarkupSafe-2.1.1.tar.gz", hash = "sha256:7f91197cc9e48f989d12e4e6fbc46495c446636dfc81b9ccf50bb0ec74b91d4b"},
+]
 mmhash3 = [
     {file = "mmhash3-3.0.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:47deea30cd8d3d5cd52dc740902a4c70383bfe8248eac29d0877fe63e03c2713"},
     {file = "mmhash3-3.0.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:ecdaf4d1de617818bf05cd526ca558db6010beeba7ea9e19f695f2bdcac0e0a4"},
@@ -1204,6 +1454,10 @@ mmhash3 = [
     {file = "mmhash3-3.0.1-cp39-cp39-win_amd64.whl", hash = "sha256:0baeaa20cac5f75ed14f28056826bd9d9c8b2354b382073f3fd5190708992a0d"},
     {file = "mmhash3-3.0.1.tar.gz", hash = "sha256:a00d68f4a1cc434b9501513c8a29e18ed1ddad383677d72b41d71d0d862348af"},
 ]
+moto = [
+    {file = "moto-4.0.9-py3-none-any.whl", hash = "sha256:2fb909d2ea1b732f89604e4268e2c2207c253e590a635a410c3c2aaebb34e113"},
+    {file = "moto-4.0.9.tar.gz", hash = "sha256:ba03b638cf3b1cec64cbe9ac0d184ca898b69020c8e3c5b9b4961c1670629010"},
+]
 multidict = [
     {file = "multidict-6.0.2-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:0b9e95a740109c6047602f4db4da9949e6c5945cefbad34a1299775ddc9a62e2"},
     {file = "multidict-6.0.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:ac0e27844758d7177989ce406acc6a83c16ed4524ebc363c1f748cba184d89d3"},
@@ -1455,6 +1709,10 @@ python-snappy = [
     {file = "python_snappy-0.6.1-pp37-pypy37_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0bdb6942180660bda7f7d01f4c0def3cfc72b1c6d99aad964801775a3e379aba"},
     {file = "python_snappy-0.6.1-pp37-pypy37_pp73-win_amd64.whl", hash = "sha256:03bb511380fca2a13325b6f16fe8234c8e12da9660f0258cd45d9a02ffc916af"},
 ]
+pytz = [
+    {file = "pytz-2022.6-py2.py3-none-any.whl", hash = "sha256:222439474e9c98fced559f1709d89e6c9cbf8d79c794ff3eb9f8800064291427"},
+    {file = "pytz-2022.6.tar.gz", hash = "sha256:e89512406b793ca39f5971bc999cc538ce125c0e51c27941bef4568b460095e2"},
+]
 pyyaml = [
     {file = "PyYAML-6.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:d4db7c7aef085872ef65a8fd7d6d09a14ae91f691dec3e87ee5ee0539d516f53"},
     {file = "PyYAML-6.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:9df7ed3b3d2e0ecfe09e14741b857df43adb5a3ddadc919a2d94fbdf78fea53c"},
@@ -1505,6 +1763,10 @@ requests-mock = [
     {file = "requests-mock-1.10.0.tar.gz", hash = "sha256:59c9c32419a9fb1ae83ec242d98e889c45bd7d7a65d48375cc243ec08441658b"},
     {file = "requests_mock-1.10.0-py2.py3-none-any.whl", hash = "sha256:2fdbb637ad17ee15c06f33d31169e71bf9fe2bdb7bc9da26185be0dd8d842699"},
 ]
+responses = [
+    {file = "responses-0.22.0-py3-none-any.whl", hash = "sha256:dcf294d204d14c436fddcc74caefdbc5764795a40ff4e6a7740ed8ddbf3294be"},
+    {file = "responses-0.22.0.tar.gz", hash = "sha256:396acb2a13d25297789a5866b4881cf4e46ffd49cc26c43ab1117f40b973102e"},
+]
 rich = [
     {file = "rich-12.6.0-py3-none-any.whl", hash = "sha256:a4eb26484f2c82589bd9a17c73d32a010b1e29d89f1604cd9bf3a2097b81bb5e"},
     {file = "rich-12.6.0.tar.gz", hash = "sha256:ba3a3775974105c221d31141f2c116f4fd65c5ceb0698657a11e9f295ec93fd0"},
@@ -1513,6 +1775,10 @@ s3fs = [
     {file = "s3fs-2022.10.0-py3-none-any.whl", hash = "sha256:1e134c3577171699feb7c1a0c4713260d5b48296e1708737ff940baef6e2c153"},
     {file = "s3fs-2022.10.0.tar.gz", hash = "sha256:e8deb80f20bd0b2059141b874fdb9d6aeb8cce35312ea5f2c02b225a78a00406"},
 ]
+s3transfer = [
+    {file = "s3transfer-0.6.0-py3-none-any.whl", hash = "sha256:06176b74f3a15f61f1b4f25a1fc29a4429040b7647133a463da8fa5bd28d5ecd"},
+    {file = "s3transfer-0.6.0.tar.gz", hash = "sha256:2ed07d3866f523cc561bf4a00fc5535827981b117dd7876f036b0c1aca42c947"},
+]
 setuptools = [
     {file = "setuptools-65.5.1-py3-none-any.whl", hash = "sha256:d0b9a8433464d5800cbe05094acf5c6d52a91bfac9b52bcfc4d41382be5d5d31"},
     {file = "setuptools-65.5.1.tar.gz", hash = "sha256:e197a19aa8ec9722928f2206f8de752def0e4c9fc6953527360d1c36d94ddb2f"},
@@ -1532,6 +1798,10 @@ tomli = [
     {file = "tomli-2.0.1-py3-none-any.whl", hash = "sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc"},
     {file = "tomli-2.0.1.tar.gz", hash = "sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"},
 ]
+types-toml = [
+    {file = "types-toml-0.10.8.1.tar.gz", hash = "sha256:171bdb3163d79a520560f24ba916a9fc9bff81659c5448a9fea89240923722be"},
+    {file = "types_toml-0.10.8.1-py3-none-any.whl", hash = "sha256:b7b5c4977f96ab7b5ac06d8a6590d17c0bf252a96efc03b109c2711fb3e0eafd"},
+]
 typing-extensions = [
     {file = "typing_extensions-4.4.0-py3-none-any.whl", hash = "sha256:16fa4864408f655d35ec496218b85f79b3437c829e93320c7c9215ccfd92489e"},
     {file = "typing_extensions-4.4.0.tar.gz", hash = "sha256:1511434bb92bf8dd198c12b1cc812e800d4181cfcb867674e0f8279cc93087aa"},
@@ -1544,6 +1814,10 @@ virtualenv = [
     {file = "virtualenv-20.16.7-py3-none-any.whl", hash = "sha256:efd66b00386fdb7dbe4822d172303f40cd05e50e01740b19ea42425cbe653e29"},
     {file = "virtualenv-20.16.7.tar.gz", hash = "sha256:8691e3ff9387f743e00f6bb20f70121f5e4f596cae754531f2b3b3a1b1ac696e"},
 ]
+werkzeug = [
+    {file = "Werkzeug-2.2.2-py3-none-any.whl", hash = "sha256:f979ab81f58d7318e064e99c4506445d60135ac5cd2e177a2de0089bfd4c9bd5"},
+    {file = "Werkzeug-2.2.2.tar.gz", hash = "sha256:7ea2d48322cc7c0f8b3a215ed73eabd7b5d75d0b50e31ab006286ccff9e00b8f"},
+]
 wrapt = [
     {file = "wrapt-1.14.1-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:1b376b3f4896e7930f1f772ac4b064ac12598d1c38d04907e696cc4d794b43d3"},
     {file = "wrapt-1.14.1-cp27-cp27m-manylinux1_i686.whl", hash = "sha256:903500616422a40a98a5a3c4ff4ed9d0066f3b4c951fa286018ecdf0750194ef"},
@@ -1610,6 +1884,10 @@ wrapt = [
     {file = "wrapt-1.14.1-cp39-cp39-win_amd64.whl", hash = "sha256:dee60e1de1898bde3b238f18340eec6148986da0455d8ba7848d50470a7a32fb"},
     {file = "wrapt-1.14.1.tar.gz", hash = "sha256:380a85cf89e0e69b7cfbe2ea9f765f004ff419f34194018a6827ac0e3edfed4d"},
 ]
+xmltodict = [
+    {file = "xmltodict-0.13.0-py2.py3-none-any.whl", hash = "sha256:aa89e8fd76320154a40d19a0df04a4695fb9dc5ba977cbb68ab3e4eb225e7852"},
+    {file = "xmltodict-0.13.0.tar.gz", hash = "sha256:341595a488e3e01a85a9d8911d8912fd922ede5fecc4dce437eb4b6c8d037e56"},
+]
 yarl = [
     {file = "yarl-1.8.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:abc06b97407868ef38f3d172762f4069323de52f2b70d133d096a48d72215d28"},
     {file = "yarl-1.8.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:07b21e274de4c637f3e3b7104694e53260b5fc10d51fb3ec5fed1da8e0f754e3"},
diff --git a/python/pyiceberg/catalog/__init__.py b/python/pyiceberg/catalog/__init__.py
index 01fa6f3cf6..9a88d59e25 100644
--- a/python/pyiceberg/catalog/__init__.py
+++ b/python/pyiceberg/catalog/__init__.py
@@ -25,12 +25,14 @@ from typing import (
     Callable,
     List,
     Optional,
+    Set,
     Union,
     cast,
 )
 
 from pyiceberg.exceptions import NotInstalledError
 from pyiceberg.io import FileIO, load_file_io
+from pyiceberg.manifest import ManifestFile
 from pyiceberg.schema import Schema
 from pyiceberg.table import Table
 from pyiceberg.table.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
@@ -49,12 +51,21 @@ _ENV_CONFIG = Config()
 
 TOKEN = "token"
 TYPE = "type"
+ICEBERG = "iceberg"
+TABLE_TYPE = "table_type"
+WAREHOUSE = "warehouse"
+METADATA_LOCATION = "metadata_location"
+MANIFEST = "manifest"
+MANIFEST_LIST = "manifest list"
+PREVIOUS_METADATA = "previous metadata"
+METADATA = "metadata"
 URI = "uri"
 
 
 class CatalogType(Enum):
     REST = "rest"
     HIVE = "hive"
+    GLUE = "glue"
 
 
 def load_rest(name: str, conf: Properties) -> Catalog:
@@ -72,9 +83,19 @@ def load_hive(name: str, conf: Properties) -> Catalog:
         raise NotInstalledError("Apache Hive support not installed: pip install 'pyiceberg[hive]'") from exc
 
 
+def load_glue(name: str, conf: Properties) -> Catalog:
+    try:
+        from pyiceberg.catalog.glue import GlueCatalog
+
+        return GlueCatalog(name, **conf)
+    except ImportError as exc:
+        raise NotInstalledError("AWS glue support not installed: pip install 'pyiceberg[glue]'") from exc
+
+
 AVAILABLE_CATALOGS: dict[CatalogType, Callable[[str, Properties], Catalog]] = {
     CatalogType.REST: load_rest,
     CatalogType.HIVE: load_hive,
+    CatalogType.GLUE: load_glue,
 }
 
 
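For illustration, a sketch of how the registry above is exercised: `load_catalog` maps the `type` property to `CatalogType.GLUE` and dispatches to `load_glue`, which imports `GlueCatalog` lazily so boto3 remains an optional dependency (the warehouse URI below is hypothetical):

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.exceptions import NotInstalledError

try:
    # Dispatches through AVAILABLE_CATALOGS[CatalogType.GLUE] -> load_glue -> GlueCatalog
    catalog = load_catalog("glue", type="glue", warehouse="s3://my-bucket/warehouse")
except NotInstalledError:
    # Raised when boto3 is missing: pip install 'pyiceberg[glue]'
    raise
```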
@@ -138,6 +159,44 @@ def load_catalog(name: str, **properties: Optional[str]) -> Catalog:
     raise ValueError(f"Could not initialize catalog with the following properties: {properties}")
 
 
+def delete_files(io: FileIO, files_to_delete: Set[str], file_type: str) -> None:
+    """Helper to delete files.
+
+    Logs a warning for each file that fails to delete
+
+    Args:
+        io: The FileIO used to delete the object
+        files_to_delete: A set of file paths to be deleted
+        file_type: The type of the file
+    """
+    for file in files_to_delete:
+        try:
+            io.delete(file)
+        except OSError as exc:
+            logger.warning(msg=f"Failed to delete {file_type} file {file}", exc_info=exc)
+
+
+def delete_data_files(io: FileIO, manifests_to_delete: List[ManifestFile]) -> None:
+    """Helper to delete data files linked to given manifests.
+
+    Logs a warning for each file that fails to delete
+
+    Args:
+        io: The FileIO used to delete the object
+        manifests_to_delete: A list of manifests containing the paths of data files to be deleted
+    """
+    deleted_files: dict[str, bool] = {}
+    for manifest_file in manifests_to_delete:
+        for entry in manifest_file.fetch_manifest_entry(io):
+            path = entry.data_file.file_path
+            if not deleted_files.get(path, False):
+                try:
+                    io.delete(path)
+                except OSError as exc:
+                    logger.warning(msg=f"Failed to delete data file {path}", exc_info=exc)
+                deleted_files[path] = True
+
+
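A minimal sketch of calling these helpers directly (the paths here are hypothetical); `purge_table` in the Glue catalog below uses the same pattern:

```python
from pyiceberg.catalog import MANIFEST, delete_files
from pyiceberg.io import load_file_io

# Deletion failures are logged as warnings rather than raised,
# so a purge keeps going even if some objects are already gone.
io = load_file_io(properties={}, location="s3://my-bucket/warehouse/db.db/tbl")
delete_files(io, {"s3://my-bucket/warehouse/db.db/tbl/metadata/snap-1.avro"}, MANIFEST)
```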
 @dataclass
 class PropertiesUpdateSummary:
     removed: List[str]
diff --git a/python/pyiceberg/catalog/glue.py b/python/pyiceberg/catalog/glue.py
new file mode 100644
index 0000000000..45af6ef97f
--- /dev/null
+++ b/python/pyiceberg/catalog/glue.py
@@ -0,0 +1,549 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+
+import uuid
+from typing import (
+    Any,
+    Dict,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+    Union,
+)
+
+import boto3
+
+from pyiceberg.catalog import (
+    ICEBERG,
+    MANIFEST,
+    MANIFEST_LIST,
+    METADATA,
+    METADATA_LOCATION,
+    PREVIOUS_METADATA,
+    TABLE_TYPE,
+    WAREHOUSE,
+    Catalog,
+    Identifier,
+    Properties,
+    PropertiesUpdateSummary,
+    delete_data_files,
+    delete_files,
+)
+from pyiceberg.exceptions import (
+    NamespaceAlreadyExistsError,
+    NamespaceNotEmptyError,
+    NoSuchIcebergTableError,
+    NoSuchNamespaceError,
+    NoSuchPropertyException,
+    NoSuchTableError,
+    TableAlreadyExistsError,
+)
+from pyiceberg.io import FileIO, load_file_io
+from pyiceberg.schema import Schema
+from pyiceberg.serializers import FromInputFile, ToOutputFile
+from pyiceberg.table import Table
+from pyiceberg.table.metadata import TableMetadata, new_table_metadata
+from pyiceberg.table.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
+from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
+from pyiceberg.typedef import EMPTY_DICT
+
+EXTERNAL_TABLE_TYPE = "EXTERNAL_TABLE"
+GLUE_CLIENT = "glue"
+
+PROP_GLUE_TABLE = "Table"
+PROP_GLUE_TABLE_TYPE = "TableType"
+PROP_GLUE_TABLE_DESCRIPTION = "Description"
+PROP_GLUE_TABLE_PARAMETERS = "Parameters"
+PROP_GLUE_TABLE_DATABASE_NAME = "DatabaseName"
+PROP_GLUE_TABLE_NAME = "Name"
+PROP_GLUE_TABLE_OWNER = "Owner"
+PROP_GLUE_TABLE_STORAGE_DESCRIPTOR = "StorageDescriptor"
+
+PROP_GLUE_TABLELIST = "TableList"
+
+PROP_GLUE_DATABASE = "Database"
+PROP_GLUE_DATABASE_LIST = "DatabaseList"
+PROP_GLUE_DATABASE_NAME = "Name"
+PROP_GLUE_DATABASE_LOCATION = "LocationUri"
+PROP_GLUE_DATABASE_DESCRIPTION = "Description"
+PROP_GLUE_DATABASE_PARAMETERS = "Parameters"
+
+PROP_GLUE_NEXT_TOKEN = "NextToken"
+
+GLUE_DESCRIPTION_KEY = "comment"
+GLUE_DATABASE_LOCATION_KEY = "location"
+
+
+def _construct_parameters(metadata_location: str) -> Properties:
+    return {TABLE_TYPE: ICEBERG.upper(), METADATA_LOCATION: metadata_location}
+
+
+def _construct_table_input(table_name: str, metadata_location: str, properties: Properties) -> Dict[str, Any]:
+    table_input = {
+        PROP_GLUE_TABLE_NAME: table_name,
+        PROP_GLUE_TABLE_TYPE: EXTERNAL_TABLE_TYPE,
+        PROP_GLUE_TABLE_PARAMETERS: _construct_parameters(metadata_location),
+    }
+
+    if table_description := properties.get(GLUE_DESCRIPTION_KEY):
+        table_input[PROP_GLUE_TABLE_DESCRIPTION] = table_description
+
+    return table_input
+
+
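For illustration, a sketch of the TableInput dictionary this helper produces (the metadata path is hypothetical):

```python
_construct_table_input(
    "my_table",
    "s3://my-bucket/db.db/my_table/metadata/00000-abc.metadata.json",
    {"comment": "demo"},
)
# {
#     "Name": "my_table",
#     "TableType": "EXTERNAL_TABLE",
#     "Parameters": {
#         "table_type": "ICEBERG",
#         "metadata_location": "s3://my-bucket/db.db/my_table/metadata/00000-abc.metadata.json",
#     },
#     "Description": "demo",
# }
```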
+def _construct_database_input(database_name: str, properties: Properties) -> Dict[str, Any]:
+    database_input: Dict[str, Any] = {PROP_GLUE_DATABASE_NAME: database_name}
+    parameters = {}
+    for k, v in properties.items():
+        if k == GLUE_DESCRIPTION_KEY:
+            database_input[PROP_GLUE_DATABASE_DESCRIPTION] = v
+        elif k == GLUE_DATABASE_LOCATION_KEY:
+            database_input[PROP_GLUE_DATABASE_LOCATION] = v
+        else:
+            parameters[k] = v
+    database_input[PROP_GLUE_DATABASE_PARAMETERS] = parameters
+    return database_input
+
+
+def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str) -> None:
+    ToOutputFile.table_metadata(metadata, io.new_output(metadata_path))
+
+
+class GlueCatalog(Catalog):
+    @staticmethod
+    def identifier_to_database(
+        identifier: Union[str, Identifier], err: Union[Type[ValueError], Type[NoSuchNamespaceError]] = ValueError
+    ) -> str:
+        tuple_identifier = Catalog.identifier_to_tuple(identifier)
+        if len(tuple_identifier) != 1:
+            raise err(f"Invalid database, hierarchical namespaces are not supported: {identifier}")
+
+        return tuple_identifier[0]
+
+    @staticmethod
+    def identifier_to_database_and_table(
+        identifier: Union[str, Identifier],
+        err: Union[Type[ValueError], Type[NoSuchTableError], Type[NoSuchNamespaceError]] = ValueError,
+    ) -> Tuple[str, str]:
+        tuple_identifier = Catalog.identifier_to_tuple(identifier)
+        if len(tuple_identifier) != 2:
+            raise err(f"Invalid path, hierarchical namespaces are not supported: {identifier}")
+
+        return tuple_identifier[0], tuple_identifier[1]
+
+    def __init__(self, name: str, **properties: str):
+        super().__init__(name, **properties)
+        self.glue = boto3.client(GLUE_CLIENT)
+
+    def _convert_glue_to_iceberg(self, glue_table: Dict[str, Any]) -> Table:
+        properties: Properties = glue_table.get(PROP_GLUE_TABLE_PARAMETERS, {})
+
+        if TABLE_TYPE not in properties:
+            raise NoSuchPropertyException(
+                f"Property {TABLE_TYPE} missing, could not determine type: "
+                f"{glue_table[PROP_GLUE_TABLE_DATABASE_NAME]}.{glue_table[PROP_GLUE_TABLE_NAME]}"
+            )
+        glue_table_type = properties[TABLE_TYPE]
+
+        if glue_table_type.lower() != ICEBERG:
+            raise NoSuchIcebergTableError(
+                f"Property table_type is {glue_table_type}, expected {ICEBERG}: "
+                f"{glue_table[PROP_GLUE_TABLE_DATABASE_NAME]}.{glue_table[PROP_GLUE_TABLE_NAME]}"
+            )
+
+        if METADATA_LOCATION not in properties:
+            raise NoSuchPropertyException(
+                f"Table property {METADATA_LOCATION} is missing, cannot find metadata for: "
+                f"{glue_table[PROP_GLUE_TABLE_DATABASE_NAME]}.{glue_table[PROP_GLUE_TABLE_NAME]}"
+            )
+        metadata_location = properties[METADATA_LOCATION]
+
+        io = load_file_io(properties=self.properties, location=metadata_location)
+        file = io.new_input(metadata_location)
+        metadata = FromInputFile.table_metadata(file)
+        return Table(
+            identifier=(glue_table[PROP_GLUE_TABLE_DATABASE_NAME], glue_table[PROP_GLUE_TABLE_NAME]),
+            metadata=metadata,
+            metadata_location=metadata_location,
+            io=self._load_file_io(metadata.properties),
+        )
+
+    def _default_warehouse_location(self, database_name: str, table_name: str) -> str:
+        database_properties = self.load_namespace_properties(database_name)
+        if database_location := database_properties.get(GLUE_DATABASE_LOCATION_KEY):
+            database_location = database_location.rstrip("/")
+            return f"{database_location}/{table_name}"
+
+        if warehouse_path := self.properties.get(WAREHOUSE):
+            warehouse_path = warehouse_path.rstrip("/")
+            return f"{warehouse_path}/{database_name}.db/{table_name}"
+
+        raise ValueError("No default path is set, please specify a location when creating a table")
+
+    def _resolve_table_location(self, location: Optional[str], database_name: str, table_name: str) -> str:
+        if not location:
+            return self._default_warehouse_location(database_name, table_name)
+        return location
+
+    def _create_glue_table(self, identifier: Union[str, Identifier], table_input: Dict[str, Any]) -> None:
+        database_name, table_name = self.identifier_to_database_and_table(identifier)
+        try:
+            self.glue.create_table(DatabaseName=database_name, TableInput=table_input)
+        except self.glue.exceptions.AlreadyExistsException as e:
+            raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e
+        except self.glue.exceptions.EntityNotFoundException as e:
+            raise NoSuchNamespaceError(f"Database {database_name} does not exist") from e
+
+    def create_table(
+        self,
+        identifier: Union[str, Identifier],
+        schema: Schema,
+        location: Optional[str] = None,
+        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+        sort_order: SortOrder = UNSORTED_SORT_ORDER,
+        properties: Properties = EMPTY_DICT,
+    ) -> Table:
+        """Create an Iceberg table in Glue catalog
+
+        Args:
+            identifier: Table identifier.
+            schema: Table's schema.
+            location: Location for the table. Optional Argument.
+            partition_spec: PartitionSpec for the table.
+            sort_order: SortOrder for the table.
+            properties: Table properties that can be a string based dictionary.
+
+        Returns:
+            Table: the created table instance
+
+        Raises:
+            TableAlreadyExistsError: If a table with the name already exists
+            ValueError: If the identifier is invalid, or no path is given to store metadata
+
+        """
+        database_name, table_name = self.identifier_to_database_and_table(identifier)
+
+        location = self._resolve_table_location(location, database_name, table_name)
+        metadata_location = f"{location}/metadata/00000-{uuid.uuid4()}.metadata.json"
+        metadata = new_table_metadata(
+            location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
+        )
+        io = load_file_io(properties=self.properties, location=metadata_location)
+        _write_metadata(metadata, io, metadata_location)
+
+        self._create_glue_table(
+            identifier=identifier, table_input=_construct_table_input(table_name, metadata_location, properties)
+        )
+        loaded_table = self.load_table(identifier=(database_name, table_name))
+        return loaded_table
+
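A minimal usage sketch, assuming a configured Glue catalog, an existing database, and hypothetical identifiers:

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType

catalog = load_catalog("glue", type="glue", warehouse="s3://my-bucket/warehouse")
schema = Schema(
    NestedField(field_id=1, name="id", field_type=LongType(), required=True),
    NestedField(field_id=2, name="data", field_type=StringType(), required=False),
)
# Writes 00000-<uuid>.metadata.json under the resolved location,
# then registers the table in Glue and loads it back.
table = catalog.create_table(("mydb", "mytable"), schema=schema)
```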
+    def load_table(self, identifier: Union[str, Identifier]) -> Table:
+        """Loads the table's metadata and returns the table instance.
+
+        You can also use this method to check for table existence using 'try catalog.load_table() except NoSuchTableError'
+        Note: This method doesn't scan data stored in the table.
+
+        Args:
+            identifier: Table identifier.
+
+        Returns:
+            Table: the table instance with its metadata
+
+        Raises:
+            NoSuchTableError: If a table with the name does not exist, or the identifier is invalid
+        """
+        database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
+        try:
+            load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name)
+        except self.glue.exceptions.EntityNotFoundException as e:
+            raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e
+
+        return self._convert_glue_to_iceberg(load_table_response.get(PROP_GLUE_TABLE, {}))
+
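The existence check mentioned in the docstring, sketched out (assumes `catalog` from the earlier examples):

```python
from pyiceberg.exceptions import NoSuchTableError

try:
    table = catalog.load_table(("mydb", "mytable"))
except NoSuchTableError:
    table = None  # the table does not exist
```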
+    def drop_table(self, identifier: Union[str, Identifier]) -> None:
+        """Drop a table.
+
+        Args:
+            identifier: Table identifier.
+
+        Raises:
+            NoSuchTableError: If a table with the name does not exist, or the identifier is invalid
+        """
+        database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
+        try:
+            self.glue.delete_table(DatabaseName=database_name, Name=table_name)
+        except self.glue.exceptions.EntityNotFoundException as e:
+            raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e
+
+    def purge_table(self, identifier: Union[str, Identifier]) -> None:
+        """Drop a table and purge all data and metadata files.
+
+        Note: This method only logs a warning rather than raising an exception when a file deletion fails
+
+        Args:
+            identifier (str | Identifier): Table identifier.
+
+        Raises:
+            NoSuchTableError: If a table with the name does not exist, or the identifier is invalid
+        """
+        table = self.load_table(identifier)
+        self.drop_table(identifier)
+        io = load_file_io(self.properties, table.metadata_location)
+        metadata = table.metadata
+        manifest_lists_to_delete = set()
+        manifests_to_delete = []
+        for snapshot in metadata.snapshots:
+            manifests_to_delete += snapshot.fetch_manifest_list(io)
+            if snapshot.manifest_list is not None:
+                manifest_lists_to_delete.add(snapshot.manifest_list)
+
+        manifest_paths_to_delete = {manifest.manifest_path for manifest in manifests_to_delete}
+        prev_metadata_files = {log.metadata_file for log in metadata.metadata_log}
+
+        delete_data_files(io, manifests_to_delete)
+        delete_files(io, manifest_paths_to_delete, MANIFEST)
+        delete_files(io, manifest_lists_to_delete, MANIFEST_LIST)
+        delete_files(io, prev_metadata_files, PREVIOUS_METADATA)
+        delete_files(io, {table.metadata_location}, METADATA)
+
+    def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
+        """Rename a fully classified table name
+
+        This method can only rename Iceberg tables in AWS Glue
+
+        Args:
+            from_identifier: Existing table identifier.
+            to_identifier: New table identifier.
+
+        Returns:
+            Table: the updated table instance with its metadata
+
+        Raises:
+            ValueError: When the from table identifier is invalid
+            NoSuchTableError: When a table with the name does not exist
+            NoSuchIcebergTableError: When the from table is not a valid Iceberg table
+            NoSuchPropertyException: When the from table is missing some required properties
+            NoSuchNamespaceError: When the destination namespace doesn't exist
+        """
+        from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
+        to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
+        try:
+            get_table_response = self.glue.get_table(DatabaseName=from_database_name, Name=from_table_name)
+        except self.glue.exceptions.EntityNotFoundException as e:
+            raise NoSuchTableError(f"Table does not exist: {from_database_name}.{from_table_name}") from e
+
+        glue_table = get_table_response[PROP_GLUE_TABLE]
+
+        try:
+            # verify that from_identifier is a valid Iceberg table
+            self._convert_glue_to_iceberg(glue_table)
+        except NoSuchPropertyException as e:
+            raise NoSuchPropertyException(
+                f"Failed to rename table {from_database_name}.{from_table_name} since it miss required properties"
+            ) from e
+        except NoSuchIcebergTableError as e:
+            raise NoSuchIcebergTableError(
+                f"Failed to rename table {from_database_name}.{from_table_name} since it is not a valid iceberg table"
+            ) from e
+
+        new_table_input = {PROP_GLUE_TABLE_NAME: to_table_name}
+        # use the same Glue info to create the new table, pointing to the old metadata
+        if table_type := glue_table.get(PROP_GLUE_TABLE_TYPE):
+            new_table_input[PROP_GLUE_TABLE_TYPE] = table_type
+        if table_parameters := glue_table.get(PROP_GLUE_TABLE_PARAMETERS):
+            new_table_input[PROP_GLUE_TABLE_PARAMETERS] = table_parameters
+        if table_owner := glue_table.get(PROP_GLUE_TABLE_OWNER):
+            new_table_input[PROP_GLUE_TABLE_OWNER] = table_owner
+        if table_storage_descriptor := glue_table.get(PROP_GLUE_TABLE_STORAGE_DESCRIPTOR):
+            new_table_input[PROP_GLUE_TABLE_STORAGE_DESCRIPTOR] = table_storage_descriptor
+        if table_description := glue_table.get(PROP_GLUE_TABLE_DESCRIPTION):
+            new_table_input[PROP_GLUE_TABLE_DESCRIPTION] = table_description
+
+        self._create_glue_table(identifier=to_identifier, table_input=new_table_input)
+        try:
+            self.drop_table(from_identifier)
+        except Exception as e:
+            self.drop_table(to_identifier)
+            raise ValueError(
+                f"Fail to drop old table {from_database_name}.{from_table_name}, "
+                f"after renaming to {to_database_name}.{to_table_name} roll back to use the old one"
+            ) from e
+        return self.load_table(to_identifier)
+
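For example (hypothetical identifiers): the rename registers a new Glue table pointing at the existing metadata, then drops the old entry, removing the new table again if that drop fails:

```python
renamed = catalog.rename_table(("mydb", "mytable"), ("mydb", "mytable_v2"))
```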
+    def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
+        """Create a namespace in the catalog.
+
+        Args:
+            namespace: Namespace identifier
+            properties: A string dictionary of properties for the given namespace
+
+        Raises:
+            ValueError: If the identifier is invalid
+            AlreadyExistsError: If a namespace with the given name already exists
+        """
+        database_name = self.identifier_to_database(namespace)
+        try:
+            self.glue.create_database(DatabaseInput=_construct_database_input(database_name, properties))
+        except self.glue.exceptions.AlreadyExistsException as e:
+            raise NamespaceAlreadyExistsError(f"Database {database_name} already exists") from e
+
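A sketch of creating a namespace (the S3 path is hypothetical); the reserved `comment` and `location` keys map onto Glue's Description and LocationUri, and everything else lands in Parameters:

```python
catalog.create_namespace(
    "mydb",
    properties={"comment": "demo database", "location": "s3://my-bucket/warehouse/mydb.db"},
)
```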
+    def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
+        """Drop a namespace.
+
+        A Glue namespace can only be dropped if it is empty
+
+        Args:
+            namespace: Namespace identifier
+
+        Raises:
+            NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid
+            NamespaceNotEmptyError: If the namespace is not empty
+        """
+        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
+        try:
+            table_list = self.list_tables(namespace=database_name)
+        except NoSuchNamespaceError as e:
+            raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e
+
+        if len(table_list) > 0:
+            raise NamespaceNotEmptyError(f"Database {database_name} is not empty")
+
+        self.glue.delete_database(Name=database_name)
+
+    def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
+        """List tables under the given namespace in the catalog (including non-Iceberg tables).
+
+        Args:
+            namespace (str | Identifier): Namespace identifier to search.
+
+        Returns:
+            List[Identifier]: list of table identifiers.
+
+        Raises:
+            NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid
+        """
+
+        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
+        table_list = []
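+        # page through get_tables results using Glue's NextToken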
+        try:
+            table_list_response = self.glue.get_tables(DatabaseName=database_name)
+            next_token = table_list_response.get(PROP_GLUE_NEXT_TOKEN)
+            table_list += table_list_response.get(PROP_GLUE_TABLELIST, [])
+            while next_token:
+                table_list_response = self.glue.get_tables(DatabaseName=database_name, NextToken=next_token)
+                next_token = table_list_response.get(PROP_GLUE_NEXT_TOKEN)
+                table_list += table_list_response.get(PROP_GLUE_TABLELIST, [])
+        except self.glue.exceptions.EntityNotFoundException as e:
+            raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e
+        return [(database_name, table.get(PROP_GLUE_TABLE_NAME)) for table in table_list]
+
+    def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
+        """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog.
+
+        Returns:
+            List[Identifier]: a List of namespace identifiers
+        """
+        # Glue does not support hierarchical namespaces, so return an empty list
+        if namespace:
+            return []
+        database_list = []
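+        # page through get_databases results using Glue's NextToken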
+        databases_response = self.glue.get_databases()
+        next_token = databases_response.get(PROP_GLUE_NEXT_TOKEN)
+        database_list += databases_response.get(PROP_GLUE_DATABASE_LIST, [])
+        while next_token:
+            databases_response = self.glue.get_databases(NextToken=next_token)
+            next_token = databases_response.get(PROP_GLUE_NEXT_TOKEN)
+            database_list += databases_response.get(PROP_GLUE_DATABASE_LIST, [])
+        return [self.identifier_to_tuple(database.get(PROP_GLUE_DATABASE_NAME)) for database in database_list]
+
+    def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
+        """Get properties for a namespace.
+
+        Args:
+            namespace: Namespace identifier
+
+        Returns:
+            Properties: Properties for the given namespace
+
+        Raises:
+            NoSuchNamespaceError: If a namespace with the given name does not exist, or identifier is invalid
+        """
+        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
+        try:
+            database_response = self.glue.get_database(Name=database_name)
+        except self.glue.exceptions.EntityNotFoundException as e:
+            raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e
+        except self.glue.exceptions.InvalidInputException as e:
+            raise NoSuchNamespaceError(f"Invalid input for namespace {database_name}") from e
+
+        database = database_response[PROP_GLUE_DATABASE]
+        if PROP_GLUE_DATABASE_PARAMETERS not in database:
+            return {}
+
+        properties = dict(database[PROP_GLUE_DATABASE_PARAMETERS])
+        if database_location := database.get(PROP_GLUE_DATABASE_LOCATION):
+            properties[GLUE_DATABASE_LOCATION_KEY] = database_location
+        if database_description := database.get(PROP_GLUE_DATABASE_DESCRIPTION):
+            properties[GLUE_DESCRIPTION_KEY] = database_description
+
+        return properties
+
+    def update_namespace_properties(
+        self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT
+    ) -> PropertiesUpdateSummary:
+        """Remove provided property keys and update properties for a namespace.
+
+        Args:
+            namespace: Namespace identifier
+            removals: Set of property keys that need to be removed. Optional argument.
+            updates: Properties to be updated for the given namespace.
+
+        Raises:
+            NoSuchNamespaceError: If a namespace with the given name does not exist, or identifier is invalid
+            ValueError: If removals and updates have overlapping keys.
+        """
+        removed: Set[str] = set()
+        updated: Set[str] = set()
+
+        if updates and removals:
+            overlap = set(removals) & set(updates.keys())
+            if overlap:
+                raise ValueError(f"Updates and deletes have an overlap: {overlap}")
+        database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
+        current_properties = self.load_namespace_properties(namespace=database_name)
+        new_properties = dict(current_properties)
+
+        if removals:
+            for key in removals:
+                if key in new_properties:
+                    new_properties.pop(key)
+                    removed.add(key)
+        if updates:
+            for key, value in updates.items():
+                new_properties[key] = value
+                updated.add(key)
+
+        self.glue.update_database(Name=database_name, DatabaseInput=_construct_database_input(database_name, new_properties))
+
+        expected_to_change = (removals or set()).difference(removed)
+
+        return PropertiesUpdateSummary(
+            removed=list(removed or []), updated=list(updated or []), missing=list(expected_to_change)
+        )
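
A minimal usage sketch of the namespace API added above, assuming AWS credentials
are already configured and that "my-bucket" is a hypothetical writable S3 bucket:

    from pyiceberg.catalog.glue import GlueCatalog

    catalog = GlueCatalog("glue", warehouse="s3://my-bucket")
    catalog.create_namespace("demo_db", properties={"comment": "demo database"})
    assert ("demo_db",) in catalog.list_namespaces()
    print(catalog.load_namespace_properties("demo_db"))
    catalog.update_namespace_properties("demo_db", updates={"owner": "demo"})
    catalog.drop_namespace("demo_db")  # succeeds only while the namespace is empty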
diff --git a/python/pyiceberg/exceptions.py b/python/pyiceberg/exceptions.py
index b3cc2778e2..0438a5322a 100644
--- a/python/pyiceberg/exceptions.py
+++ b/python/pyiceberg/exceptions.py
@@ -36,6 +36,10 @@ class NoSuchTableError(Exception):
     """Raises when the table can't be found in the REST catalog"""
 
 
+class NoSuchIcebergTableError(NoSuchTableError):
+    """Raised when the table found in the catalog is not an Iceberg table"""
+
+
 class NoSuchNamespaceError(Exception):
     """Raised when a referenced name-space is not found"""
 
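
Because NoSuchIcebergTableError subclasses NoSuchTableError, existing handlers that
catch NoSuchTableError keep working. A sketch of how a caller might distinguish the
two cases (catalog and the identifier are hypothetical):

    from pyiceberg.exceptions import NoSuchIcebergTableError, NoSuchTableError

    try:
        table = catalog.load_table(("demo_db", "demo_table"))  # hypothetical identifier
    except NoSuchIcebergTableError:
        print("entry exists in the catalog but is not an Iceberg table")
    except NoSuchTableError:
        print("no such table in the catalog")

The subclass must be listed first; a broader NoSuchTableError clause would otherwise
shadow it.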
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 181093e469..514ad9e74f 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -64,6 +64,7 @@ python-snappy = { version = "0.6.1", optional = true }
 thrift = { version = "0.16.0", optional = true }
 
 s3fs = { version = "2022.10.0", optional = true }
+boto3 = { version = "1.24.59", optional = true }
 
 [tool.poetry.dev-dependencies]
 pytest = "7.2.0"
@@ -72,6 +73,7 @@ pre-commit = "2.20.0"
 fastavro = "1.6.1"
 coverage = { version = "^6.5.0", extras = ["toml"] }
 requests-mock = "1.10.0"
+moto = "^4.0.6"
 typing-extensions = '4.4.0'
 
 [tool.poetry.scripts]
@@ -87,6 +89,7 @@ pyarrow = ["pyarrow"]
 snappy = ["python-snappy"]
 hive = ["thrift"]
 s3fs = ["s3fs"]
+glue = ["boto3"]
 
 [tool.pytest.ini_options]
 markers = [
@@ -178,9 +181,25 @@ ignore_missing_imports = true
 module = "tests.*"
 ignore_missing_imports = true
 
+[[tool.mypy.overrides]]
+module = "boto3"
+ignore_missing_imports = true
+
 [[tool.mypy.overrides]]
 module = "botocore.*"
 ignore_missing_imports = true
 
+[[tool.mypy.overrides]]
+module = "moto"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "aiobotocore.*"
+ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+module = "aiohttp.*"
+ignore_missing_imports = true
+
 [tool.coverage.run]
 source = ['pyiceberg/']
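
Since boto3 is declared optional above, Glue support is installed on demand through
the new extra, e.g. pip install "pyiceberg[glue]". A sketch of the import guard such
optional dependencies typically need (not necessarily the exact guard used in glue.py):

    try:
        import boto3  # optional dependency, provided by the "glue" extra
    except ImportError as exc:
        raise ModuleNotFoundError(
            "boto3 is required for GlueCatalog; install it with: pip install 'pyiceberg[glue]'"
        ) from exc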
diff --git a/python/tests/catalog/integration_test_glue.py b/python/tests/catalog/integration_test_glue.py
new file mode 100644
index 0000000000..70ac3a00a4
--- /dev/null
+++ b/python/tests/catalog/integration_test_glue.py
@@ -0,0 +1,324 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+import os
+
+import boto3
+import pytest
+from botocore.exceptions import ClientError
+
+from pyiceberg.catalog.glue import GlueCatalog
+from pyiceberg.exceptions import (
+    NamespaceAlreadyExistsError,
+    NamespaceNotEmptyError,
+    NoSuchNamespaceError,
+    NoSuchTableError,
+    TableAlreadyExistsError,
+)
+from pyiceberg.schema import Schema
+from tests.catalog.test_glue import (
+    get_random_database_name,
+    get_random_databases,
+    get_random_table_name,
+    get_random_tables,
+)
+
+# The number of random characters in generated table/database name
+RANDOM_LENGTH = 20
+# The number of tables/databases used in list_table/namespace test
+LIST_TEST_NUMBER = 2
+
+
+def get_bucket_name():
+    """
+    Return the test bucket name, as configured via the environment variable AWS_TEST_BUCKET
+    """
+    bucket_name = os.getenv("AWS_TEST_BUCKET")
+    if bucket_name is None:
+        raise ValueError("Please specify a bucket to run the test by setting the environment variable AWS_TEST_BUCKET")
+    return bucket_name
+
+
+def get_s3_path(bucket_name, database_name=None, table_name=None):
+    result_path = f"s3://{bucket_name}"
+    if database_name is not None:
+        result_path += f"/{database_name}.db"
+
+    if table_name is not None:
+        result_path += f"/{table_name}"
+    return result_path
+
+
+@pytest.fixture(name="s3", scope="module")
+def fixture_s3_client():
+    yield boto3.client("s3")
+
+
+@pytest.fixture(name="glue", scope="module")
+def fixture_glue_client():
+    yield boto3.client("glue")
+
+
+def clean_up(test_catalog):
+    """Clean up all databases and tables created during the integration test"""
+    for database_name in test_catalog.list_namespaces():
+        database_name = database_name[0]
+        if "my_iceberg_database-" in database_name:
+            for identifier in test_catalog.list_tables(database_name):
+                test_catalog.purge_table(identifier)
+            test_catalog.drop_namespace(database_name)
+
+
+@pytest.fixture(name="test_catalog", scope="module")
+def fixture_test_catalog():
+    """Set up the catalog for the AWS integration test and clean up afterwards"""
+    test_catalog = GlueCatalog("glue", warehouse=get_s3_path(get_bucket_name()))
+    yield test_catalog
+    clean_up(test_catalog)
+
+
+def test_create_table(test_catalog, s3, table_schema_nested: Schema):
+    table_name = get_random_table_name()
+    database_name = get_random_database_name()
+    identifier = (database_name, table_name)
+    test_catalog.create_namespace(database_name)
+    test_catalog.create_table(identifier, table_schema_nested, get_s3_path(get_bucket_name(), database_name, table_name))
+    table = test_catalog.load_table(identifier)
+    assert table.identifier == identifier
+    metadata_location = table.metadata_location.split(get_bucket_name())[1][1:]
+    s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)
+
+
+def test_create_table_with_invalid_location(table_schema_nested: Schema):
+    table_name = get_random_table_name()
+    database_name = get_random_database_name()
+    identifier = (database_name, table_name)
+    test_catalog_no_warehouse = GlueCatalog("glue")
+    test_catalog_no_warehouse.create_namespace(database_name)
+    with pytest.raises(ValueError):
+        test_catalog_no_warehouse.create_table(identifier, table_schema_nested)
+    test_catalog_no_warehouse.drop_namespace(database_name)
+
+
+def test_create_table_with_default_location(test_catalog, s3, table_schema_nested: Schema):
+    table_name = get_random_table_name()
+    database_name = get_random_database_name()
+    identifier = (database_name, table_name)
+    test_catalog.create_namespace(database_name)
+    test_catalog.create_table(identifier, table_schema_nested)
+    table = test_catalog.load_table(identifier)
+    assert table.identifier == identifier
+    metadata_location = table.metadata_location.split(get_bucket_name())[1][1:]
+    s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)
+
+
+def test_create_table_with_invalid_database(test_catalog, table_schema_nested: Schema):
+    table_name = get_random_table_name()
+    identifier = ("invalid", table_name)
+    with pytest.raises(NoSuchNamespaceError):
+        test_catalog.create_table(identifier, table_schema_nested)
+
+
+def test_create_duplicated_table(test_catalog, table_schema_nested: Schema):
+    table_name = get_random_table_name()
+    database_name = get_random_database_name()
+    test_catalog.create_namespace(database_name)
+    test_catalog.create_table((database_name, table_name), table_schema_nested)
+    with pytest.raises(TableAlreadyExistsError):
+        test_catalog.create_table((database_name, table_name), table_schema_nested)
+
+
+def test_load_table(test_catalog, table_schema_nested):
+    table_name = get_random_table_name()
+    database_name = get_random_database_name()
+    identifier = (database_name, table_name)
+    test_catalog.create_namespace(database_name)
+    table = test_catalog.create_table(identifier, table_schema_nested)
+    loaded_table = test_catalog.load_table(identifier)
+    assert table.identifier == loaded_table.identifier
+    assert table.metadata_location == loaded_table.metadata_location
+    assert table.metadata == loaded_table.metadata
+
+
+def test_list_tables(test_catalog, table_schema_nested):
+    test_tables = get_random_tables(LIST_TEST_NUMBER)
+    database_name = get_random_database_name()
+    test_catalog.create_namespace(database_name)
+    for table_name in test_tables:
+        test_catalog.create_table((database_name, table_name), table_schema_nested)
+    identifier_list = test_catalog.list_tables(database_name)
+    assert len(identifier_list) == LIST_TEST_NUMBER
+    for table_name in test_tables:
+        assert (database_name, table_name) in identifier_list
+
+
+def test_rename_table(test_catalog, s3, table_schema_nested):
+    table_name = get_random_table_name()
+    database_name = get_random_database_name()
+    new_database_name = get_random_database_name()
+    test_catalog.create_namespace(database_name)
+    test_catalog.create_namespace(new_database_name)
+    new_table_name = f"rename-{table_name}"
+    identifier = (database_name, table_name)
+    table = test_catalog.create_table(identifier, table_schema_nested)
+    assert table.identifier == identifier
+    new_identifier = (new_database_name, new_table_name)
+    test_catalog.rename_table(identifier, new_identifier)
+    new_table = test_catalog.load_table(new_identifier)
+    assert new_table.identifier == new_identifier
+    assert new_table.metadata_location == table.metadata_location
+    metadata_location = new_table.metadata_location.split(get_bucket_name())[1][1:]
+    s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)
+    with pytest.raises(NoSuchTableError):
+        test_catalog.load_table(identifier)
+
+
+def test_drop_table(test_catalog, table_schema_nested):
+    table_name = get_random_table_name()
+    database_name = get_random_database_name()
+    identifier = (database_name, table_name)
+    test_catalog.create_namespace(database_name)
+    table = test_catalog.create_table(identifier, table_schema_nested)
+    assert table.identifier == identifier
+    test_catalog.drop_table(identifier)
+    with pytest.raises(NoSuchTableError):
+        test_catalog.load_table(identifier)
+
+
+def test_purge_table(test_catalog, s3, table_schema_nested):
+    table_name = get_random_table_name()
+    database_name = get_random_database_name()
+    identifier = (database_name, table_name)
+    test_catalog.create_namespace(database_name)
+    test_catalog.create_table(identifier, table_schema_nested)
+    table = test_catalog.load_table(identifier)
+    assert table.identifier == identifier
+    metadata_location = table.metadata_location.split(get_bucket_name())[1][1:]
+    s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)
+    test_catalog.purge_table(identifier)
+    with pytest.raises(NoSuchTableError):
+        test_catalog.load_table(identifier)
+    with pytest.raises(ClientError):
+        s3.head_object(Bucket=get_bucket_name(), Key=metadata_location)
+
+
+def test_create_namespace(test_catalog):
+    database_name = get_random_database_name()
+    test_catalog.create_namespace(database_name)
+    assert (database_name,) in test_catalog.list_namespaces()
+
+
+def test_create_duplicate_namespace(test_catalog):
+    database_name = get_random_database_name()
+    test_catalog.create_namespace(database_name)
+    with pytest.raises(NamespaceAlreadyExistsError):
+        test_catalog.create_namespace(database_name)
+
+
+def test_create_namespace_with_comment_and_location(test_catalog):
+    database_name = get_random_database_name()
+    test_location = get_s3_path(get_bucket_name(), database_name)
+    test_properties = {
+        "comment": "this is a test description",
+        "location": test_location,
+    }
+    test_catalog.create_namespace(namespace=database_name, properties=test_properties)
+    loaded_database_list = test_catalog.list_namespaces()
+    assert (database_name,) in loaded_database_list
+    properties = test_catalog.load_namespace_properties(database_name)
+    assert properties["comment"] == "this is a test description"
+    assert properties["location"] == test_location
+
+
+def test_list_namespaces(test_catalog):
+    database_list = get_random_databases(LIST_TEST_NUMBER)
+    for database_name in database_list:
+        test_catalog.create_namespace(database_name)
+    db_list = test_catalog.list_namespaces()
+    for database_name in database_list:
+        assert (database_name,) in db_list
+    assert len(test_catalog.list_namespaces(list(database_list)[0])) == 0
+
+
+def test_drop_namespace(test_catalog, table_schema_nested: Schema):
+    table_name = get_random_table_name()
+    database_name = get_random_database_name()
+    test_catalog.create_namespace(database_name)
+    assert (database_name,) in test_catalog.list_namespaces()
+    test_catalog.create_table((database_name, table_name), table_schema_nested)
+    with pytest.raises(NamespaceNotEmptyError):
+        test_catalog.drop_namespace(database_name)
+    test_catalog.drop_table((database_name, table_name))
+    test_catalog.drop_namespace(database_name)
+    assert (database_name,) not in test_catalog.list_namespaces()
+
+
+def test_load_namespace_properties(test_catalog):
+    warehouse_location = get_s3_path(get_bucket_name())
+    database_name = get_random_database_name()
+    test_properties = {
+        "comment": "this is a test description",
+        "location": f"{warehouse_location}/{database_name}.db",
+        "test_property1": "1",
+        "test_property2": "2",
+        "test_property3": "3",
+    }
+
+    test_catalog.create_namespace(database_name, test_properties)
+    listed_properties = test_catalog.load_namespace_properties(database_name)
+    for k, v in listed_properties.items():
+        assert k in test_properties
+        assert v == test_properties[k]
+
+
+def test_load_empty_namespace_properties(test_catalog):
+    database_name = get_random_database_name()
+    test_catalog.create_namespace(database_name)
+    listed_properties = test_catalog.load_namespace_properties(database_name)
+    assert listed_properties == {}
+
+
+def test_load_default_namespace_properties(test_catalog, glue):
+    database_name = get_random_database_name()
+    # simulate creating a database with default settings through the AWS Glue web console
+    glue.create_database(DatabaseInput={"Name": database_name})
+    listed_properties = test_catalog.load_namespace_properties(database_name)
+    assert listed_properties == {}
+
+
+def test_update_namespace_properties(test_catalog):
+    warehouse_location = get_s3_path(get_bucket_name())
+    database_name = get_random_database_name()
+    test_properties = {
+        "comment": "this is a test description",
+        "location": f"{warehouse_location}/{database_name}.db",
+        "test_property1": "1",
+        "test_property2": "2",
+        "test_property3": "3",
+    }
+    removals = {"test_property1", "test_property2", "test_property3", "should_not_removed"}
+    updates = {"test_property4": "4", "test_property5": "5", "comment": "updated test description"}
+    test_catalog.create_namespace(database_name, test_properties)
+    update_report = test_catalog.update_namespace_properties(database_name, removals, updates)
+    for k in updates.keys():
+        assert k in update_report.updated
+    for k in removals:
+        if k == "should_not_removed":
+            assert k in update_report.missing
+        else:
+            assert k in update_report.removed
+    assert "updated test description" == test_catalog.load_namespace_properties(database_name)["comment"]
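
For reference, the get_s3_path helper above composes warehouse paths as follows
(a small worked example with a hypothetical bucket):

    assert get_s3_path("my-bucket") == "s3://my-bucket"
    assert get_s3_path("my-bucket", "db1") == "s3://my-bucket/db1.db"
    assert get_s3_path("my-bucket", "db1", "tbl1") == "s3://my-bucket/db1.db/tbl1"

Unlike the moto-based unit tests below, this module talks to real AWS resources, so
AWS_TEST_BUCKET must point at a bucket the test credentials can write to.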
diff --git a/python/tests/catalog/test_glue.py b/python/tests/catalog/test_glue.py
new file mode 100644
index 0000000000..ccdeaff033
--- /dev/null
+++ b/python/tests/catalog/test_glue.py
@@ -0,0 +1,471 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+import random
+import re
+import string
+
+import pytest
+from moto import mock_glue
+
+from pyiceberg.catalog.glue import GlueCatalog
+from pyiceberg.exceptions import (
+    NamespaceAlreadyExistsError,
+    NamespaceNotEmptyError,
+    NoSuchIcebergTableError,
+    NoSuchNamespaceError,
+    NoSuchPropertyException,
+    NoSuchTableError,
+    TableAlreadyExistsError,
+)
+
+BUCKET_NAME = "test_bucket"
+RANDOM_LENGTH = 20
+LIST_TEST_NUMBER = 100
+table_metadata_location_regex = re.compile(
+    r"""s3://test_bucket/my_iceberg_database-[a-z]{20}.db/
+    my_iceberg_table-[a-z]{20}/metadata/
+    00000-[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}.metadata.json""",
+    re.X,
+)
+
+
+def get_random_table_name():
+    prefix = "my_iceberg_table-"
+    random_tag = "".join(random.choice(string.ascii_letters) for _ in range(RANDOM_LENGTH))
+    return (prefix + random_tag).lower()
+
+
+def get_random_tables(n):
+    result = set()
+    for _ in range(n):
+        result.add(get_random_table_name())
+    return result
+
+
+def get_random_database_name():
+    prefix = "my_iceberg_database-"
+    random_tag = "".join(random.choice(string.ascii_letters) for _ in range(RANDOM_LENGTH))
+    return (prefix + random_tag).lower()
+
+
+def get_random_databases(n):
+    result = set()
+    for _ in range(n):
+        result.add(get_random_database_name())
+    return result
+
+
+@pytest.fixture(name="_bucket_initialize")
+def fixture_s3_bucket(_s3):
+    _s3.create_bucket(Bucket=BUCKET_NAME)
+
+
+@mock_glue
+def test_create_table_with_database_location(_bucket_initialize, _patch_aiobotocore, table_schema_nested):
+    database_name = get_random_database_name()
+    table_name = get_random_table_name()
+    identifier = (database_name, table_name)
+    test_catalog = GlueCatalog("glue")
+    test_catalog.create_namespace(namespace=database_name, properties={"location": f"s3://{BUCKET_NAME}/{database_name}.db"})
+    table = test_catalog.create_table(identifier, table_schema_nested)
+    assert table.identifier == identifier
+    assert table_metadata_location_regex.match(table.metadata_location)
+
+
+@mock_glue
+def test_create_table_with_default_warehouse(_bucket_initialize, _patch_aiobotocore, table_schema_nested):
+    database_name = get_random_database_name()
+    table_name = get_random_table_name()
+    identifier = (database_name, table_name)
+    test_catalog = GlueCatalog("glue", warehouse=f"s3://{BUCKET_NAME}")
+    test_catalog.create_namespace(namespace=database_name)
+    table = test_catalog.create_table(identifier, table_schema_nested)
+    assert table.identifier == identifier
+    assert table_metadata_location_regex.match(table.metadata_location)
+
+
+@mock_glue
+def test_create_table_with_given_location(_bucket_initialize, _patch_aiobotocore, table_schema_nested):
+    database_name = get_random_database_name()
+    table_name = get_random_table_name()
+    identifier = (database_name, table_name)
+    test_catalog = GlueCatalog("glue")
+    test_catalog.create_namespace(namespace=database_name)
+    table = test_catalog.create_table(
+        identifier=identifier, schema=table_schema_nested, location=f"s3://{BUCKET_NAME}/{database_name}.db/{table_name}"
+    )
+    assert table.identifier == identifier
+    assert table_metadata_location_regex.match(table.metadata_location)
+
+
+@mock_glue
+def test_create_table_with_no_location(_bucket_initialize, _patch_aiobotocore, table_schema_nested):
+    database_name = get_random_database_name()
+    table_name = get_random_table_name()
+    identifier = (database_name, table_name)
+    test_catalog = GlueCatalog("glue")
+    test_catalog.create_namespace(namespace=database_name)
+    with pytest.raises(ValueError):
+        test_catalog.create_table(identifier=identifier, schema=table_schema_nested)
+
+
+@mock_glue
+def test_create_table_with_strips(_bucket_initialize, _patch_aiobotocore, table_schema_nested):
+    database_name = get_random_database_name()
+    table_name = get_random_table_name()
+    identifier = (database_name, table_name)
+    test_catalog = GlueCatalog("glue")
+    test_catalog.create_namespace(namespace=database_name, properties={"location": f"s3://{BUCKET_NAME}/{database_name}.db/"})
+    table = test_catalog.create_table(identifier, table_schema_nested)
+    assert table.identifier == identifier
+    assert table_metadata_location_regex.match(table.metadata_location)
+    database_name = get_random_database_name()
+    table_name = get_random_table_name()
+    identifier = (database_name, table_name)
+    test_catalog_strip = GlueCatalog("glue", warehouse=f"s3://{BUCKET_NAME}/")
+    test_catalog_strip.create_namespace(namespace=database_name)
+    table_strip = test_catalog_strip.create_table(identifier, table_schema_nested)
+    assert table_strip.identifier == identifier
+    assert table_metadata_location_regex.match(table_strip.metadata_location)
+
+
+@mock_glue
+def test_create_table_with_no_database(_bucket_initialize, _patch_aiobotocore, table_schema_nested):
+    database_name = get_random_database_name()
+    table_name = get_random_table_name()
+    identifier = (database_name, table_name)
+    test_catalog = GlueCatalog("glue")
+    with pytest.raises(NoSuchNamespaceError):
+        test_catalog.create_table(identifier=identifier, schema=table_schema_nested)
+
+
+@mock_glue
+def test_create_duplicated_table(_bucket_initialize, _patch_aiobotocore, table_schema_nested):
+    database_name = get_random_database_name()
+    table_name = get_random_table_name()
+    identifier = (database_name, table_name)
+    test_catalog = GlueCatalog("glue", warehouse=f"s3://{BUCKET_NAME}")
+    test_catalog.create_namespace(namespace=database_name)
+    test_catalog.create_table(identifier, table_schema_nested)
+    with pytest.raises(TableAlreadyExistsError):
+        test_catalog.create_table(identifier, table_schema_nested)
+
+
+@mock_glue
+def test_load_table(_bucket_initialize, _patch_aiobotocore, table_schema_nested):
+    database_name = get_random_database_name()
+    table_name = get_random_table_name()
+    identifier = (database_name, table_name)
+    test_catalog = GlueCatalog("glue", warehouse=f"s3://{BUCKET_NAME}")
+    test_catalog.create_namespace(namespace=database_name)
+    test_catalog.create_table(identifier, table_schema_nested)
+    table = test_catalog.load_table(identifier)
+    assert table.identifier == identifier
+    assert table_metadata_location_regex.match(table.metadata_location)
+
+
+@mock_glue
+def test_load_non_exist_table(_bucket_initialize, _patch_aiobotocore):
+    database_name = get_random_database_name()
+    table_name = get_random_table_name()
+    identifier = (database_name, table_name)
+    test_catalog = GlueCatalog("glue", warehouse=f"s3://{BUCKET_NAME}")
+    test_catalog.create_namespace(namespace=database_name)
+    with pytest.raises(NoSuchTableError):
+        test_catalog.load_table(identifier)
+
+
+@mock_glue
+def test_drop_table(_bucket_initialize, _patch_aiobotocore, table_schema_nested):
+    database_name = get_random_database_name()
+    table_name = get_random_table_name()
+    identifier = (database_name, table_name)
+    test_catalog = GlueCatalog("glue", warehouse=f"s3://{BUCKET_NAME}")
+    test_catalog.create_namespace(namespace=database_name)
+    test_catalog.create_table(identifier, table_schema_nested)
+    table = test_catalog.load_table(identifier)
+    assert table.identifier == identifier
+    assert table_metadata_location_regex.match(table.metadata_location)
+    test_catalog.drop_table(identifier)
+    with pytest.raises(NoSuchTableError):
+        test_catalog.load_table(identifier)
+
+
+@mock_glue
+def test_drop_non_exist_table(_bucket_initialize, _patch_aiobotocore):
+    database_name = get_random_database_name()
+    table_name = get_random_table_name()
+    identifier = (database_name, table_name)
+    test_catalog = GlueCatalog("glue", warehouse=f"s3://{BUCKET_NAME}")
+    with pytest.raises(NoSuchTableError):
+        test_catalog.drop_table(identifier)
+
+
+@mock_glue
+def test_rename_table(_bucket_initialize, _patch_aiobotocore, table_schema_nested):
+    database_name = get_random_database_name()
+    table_name = get_random_table_name()
+    new_table_name = get_random_table_name()
+    identifier = (database_name, table_name)
+    new_identifier = (database_name, new_table_name)
+    test_catalog = GlueCatalog("glue", warehouse=f"s3://{BUCKET_NAME}")
+    test_catalog.create_namespace(namespace=database_name)
+    table = test_catalog.create_table(identifier, table_schema_nested)
+    assert table.identifier == identifier
+    assert table_metadata_location_regex.match(table.metadata_location)
+    test_catalog.rename_table(identifier, new_identifier)
+    new_table = test_catalog.load_table(new_identifier)
+    assert new_table.identifier == new_identifier
+    # the metadata_location should not change
+    assert new_table.metadata_location == table.metadata_location
+    # old table should be dropped
+    with pytest.raises(NoSuchTableError):
+        test_catalog.load_table(identifier)
+
+
+@mock_glue
+def test_rename_table_no_params(_glue, _bucket_initialize, _patch_aiobotocore):
+    database_name = get_random_database_name()
+    new_database_name = get_random_database_name()
+    table_name = get_random_table_name()
+    new_table_name = get_random_table_name()
+    identifier = (database_name, table_name)
+    new_identifier = (new_database_name, new_table_name)
+    test_catalog = GlueCatalog("glue", warehouse=f"s3://{BUCKET_NAME}")
+    test_catalog.create_namespace(namespace=database_name)
+    test_catalog.create_namespace(namespace=new_database_name)
+    _glue.create_table(
+        DatabaseName=database_name,
+        TableInput={"Name": table_name, "TableType": "EXTERNAL_TABLE", "Parameters": {"table_type": "iceberg"}},
+    )
+    with pytest.raises(NoSuchPropertyException):
+        test_catalog.rename_table(identifier, new_identifier)
+
+
+@mock_glue
+def test_rename_non_iceberg_table(_glue, _bucket_initialize, _patch_aiobotocore):
+    database_name = get_random_database_name()
+    new_database_name = get_random_database_name()
+    table_name = get_random_table_name()
+    new_table_name = get_random_table_name()
+    identifier = (database_name, table_name)
+    new_identifier = (new_database_name, new_table_name)
+    test_catalog = GlueCatalog("glue", warehouse=f"s3://{BUCKET_NAME}")
+    test_catalog.create_namespace(namespace=database_name)
+    test_catalog.create_namespace(namespace=new_database_name)
+    _glue.create_table(
+        DatabaseName=database_name,
+        TableInput={
+            "Name": table_name,
+            "TableType": "EXTERNAL_TABLE",
+            "Parameters": {"table_type": "noniceberg", "metadata_location": "test"},
+        },
+    )
+    with pytest.raises(NoSuchIcebergTableError):
+        test_catalog.rename_table(identifier, new_identifier)
+
+
+@mock_glue
+def test_list_tables(_bucket_initialize, _patch_aiobotocore, table_schema_nested):
+    database_name = get_random_database_name()
+    table_list = get_random_tables(LIST_TEST_NUMBER)
+    test_catalog = GlueCatalog("glue", warehouse=f"s3://{BUCKET_NAME}")
+    test_catalog.create_namespace(namespace=database_name)
+    for table_name in table_list:
+        test_catalog.create_table((database_name, table_name), table_schema_nested)
+    loaded_table_list = test_catalog.list_tables(database_name)
+    for table_name in table_list:
+        assert (database_name, table_name) in loaded_table_list
+
+
+@mock_glue
+def test_list_namespaces(_bucket_initialize, _patch_aiobotocore):
+    database_list = get_random_databases(LIST_TEST_NUMBER)
+    test_catalog = GlueCatalog("glue")
+    for database_name in database_list:
+        test_catalog.create_namespace(namespace=database_name)
+    loaded_database_list = test_catalog.list_namespaces()
+    for database_name in database_list:
+        assert (database_name,) in loaded_database_list
+
+
+@mock_glue
+def test_create_namespace_no_properties(_bucket_initialize, _patch_aiobotocore):
+    database_name = get_random_database_name()
+    test_catalog = GlueCatalog("glue")
+    test_catalog.create_namespace(namespace=database_name)
+    loaded_database_list = test_catalog.list_namespaces()
+    assert len(loaded_database_list) == 1
+    assert (database_name,) in loaded_database_list
+    properties = test_catalog.load_namespace_properties(database_name)
+    assert properties == {}
+
+
+@mock_glue
+def test_create_namespace_with_comment_and_location(_bucket_initialize, _patch_aiobotocore):
+    database_name = get_random_database_name()
+    test_location = f"s3://{BUCKET_NAME}/{database_name}.db"
+    test_properties = {
+        "comment": "this is a test description",
+        "location": test_location,
+    }
+    test_catalog = GlueCatalog("glue")
+    test_catalog.create_namespace(namespace=database_name, properties=test_properties)
+    loaded_database_list = test_catalog.list_namespaces()
+    assert len(loaded_database_list) == 1
+    assert (database_name,) in loaded_database_list
+    properties = test_catalog.load_namespace_properties(database_name)
+    assert properties["comment"] == "this is a test description"
+    assert properties["location"] == test_location
+
+
+@mock_glue
+def test_create_duplicated_namespace(_bucket_initialize, _patch_aiobotocore):
+    database_name = get_random_database_name()
+    test_catalog = GlueCatalog("glue")
+    test_catalog.create_namespace(namespace=database_name)
+    loaded_database_list = test_catalog.list_namespaces()
+    assert len(loaded_database_list) == 1
+    assert (database_name,) in loaded_database_list
+    with pytest.raises(NamespaceAlreadyExistsError):
+        test_catalog.create_namespace(namespace=database_name, properties={"test": "test"})
+
+
+@mock_glue
+def test_drop_namespace(_bucket_initialize, _patch_aiobotocore):
+    database_name = get_random_database_name()
+    test_catalog = GlueCatalog("glue")
+    test_catalog.create_namespace(namespace=database_name)
+    loaded_database_list = test_catalog.list_namespaces()
+    assert len(loaded_database_list) == 1
+    assert (database_name,) in loaded_database_list
+    test_catalog.drop_namespace(database_name)
+    loaded_database_list = test_catalog.list_namespaces()
+    assert len(loaded_database_list) == 0
+
+
+@mock_glue
+def test_drop_non_empty_namespace(_bucket_initialize, _patch_aiobotocore, table_schema_nested):
+    database_name = get_random_database_name()
+    table_name = get_random_table_name()
+    identifier = (database_name, table_name)
+    test_catalog = GlueCatalog("glue", warehouse=f"s3://{BUCKET_NAME}")
+    test_catalog.create_namespace(namespace=database_name)
+    test_catalog.create_table(identifier, table_schema_nested)
+    assert len(test_catalog.list_tables(database_name)) == 1
+    with pytest.raises(NamespaceNotEmptyError):
+        test_catalog.drop_namespace(database_name)
+
+
+@mock_glue
+def test_drop_non_exist_namespace(_bucket_initialize, _patch_aiobotocore):
+    database_name = get_random_database_name()
+    test_catalog = GlueCatalog("glue")
+    with pytest.raises(NoSuchNamespaceError):
+        test_catalog.drop_namespace(database_name)
+
+
+@mock_glue
+def test_load_namespace_properties(_bucket_initialize, _patch_aiobotocore):
+    database_name = get_random_database_name()
+    test_location = f"s3://{BUCKET_NAME}/{database_name}.db"
+    test_properties = {
+        "comment": "this is a test description",
+        "location": test_location,
+        "test_property1": "1",
+        "test_property2": "2",
+        "test_property3": "3",
+    }
+    test_catalog = GlueCatalog("glue")
+    test_catalog.create_namespace(database_name, test_properties)
+    listed_properties = test_catalog.load_namespace_properties(database_name)
+    for k, v in listed_properties.items():
+        assert k in test_properties
+        assert v == test_properties[k]
+
+
+@mock_glue
+def test_load_non_exist_namespace_properties(_bucket_initialize, _patch_aiobotocore):
+    database_name = get_random_database_name()
+    test_catalog = GlueCatalog("glue")
+    with pytest.raises(NoSuchNamespaceError):
+        test_catalog.load_namespace_properties(database_name)
+
+
+@mock_glue
+def test_update_namespace_properties(_bucket_initialize, _patch_aiobotocore):
+    database_name = get_random_database_name()
+    test_properties = {
+        "comment": "this is a test description",
+        "location": f"s3://{BUCKET_NAME}/{database_name}.db",
+        "test_property1": "1",
+        "test_property2": "2",
+        "test_property3": "3",
+    }
+    removals = {"test_property1", "test_property2", "test_property3", "should_not_removed"}
+    updates = {"test_property4": "4", "test_property5": "5", "comment": "updated test description"}
+    test_catalog = GlueCatalog("glue")
+    test_catalog.create_namespace(database_name, test_properties)
+    update_report = test_catalog.update_namespace_properties(database_name, removals, updates)
+    for k in updates.keys():
+        assert k in update_report.updated
+    for k in removals:
+        if k == "should_not_removed":
+            assert k in update_report.missing
+        else:
+            assert k in update_report.removed
+    assert "updated test description" == test_catalog.load_namespace_properties(database_name)["comment"]
+    test_catalog.drop_namespace(database_name)
+
+
+@mock_glue
+def test_load_empty_namespace_properties(_bucket_initialize, _patch_aiobotocore):
+    database_name = get_random_database_name()
+    test_catalog = GlueCatalog("glue")
+    test_catalog.create_namespace(database_name)
+    listed_properties = test_catalog.load_namespace_properties(database_name)
+    assert listed_properties == {}
+
+
+@mock_glue
+def test_load_default_namespace_properties(_glue, _bucket_initialize, _patch_aiobotocore):
+    database_name = get_random_database_name()
+    # simulate creating a database with default settings through the AWS Glue web console
+    _glue.create_database(DatabaseInput={"Name": database_name})
+    test_catalog = GlueCatalog("glue")
+    listed_properties = test_catalog.load_namespace_properties(database_name)
+    assert listed_properties == {}
+
+
+@mock_glue
+def test_update_namespace_properties_overlap_update_removal(_bucket_initialize, _patch_aiobotocore):
+    database_name = get_random_database_name()
+    test_properties = {
+        "comment": "this is a test description",
+        "location": f"s3://{BUCKET_NAME}/{database_name}.db",
+        "test_property1": "1",
+        "test_property2": "2",
+        "test_property3": "3",
+    }
+    removals = {"test_property1", "test_property2", "test_property3", "should_not_removed"}
+    updates = {"test_property1": "4", "test_property5": "5", "comment": "updated test description"}
+    test_catalog = GlueCatalog("glue")
+    test_catalog.create_namespace(database_name, test_properties)
+    with pytest.raises(ValueError):
+        test_catalog.update_namespace_properties(database_name, removals, updates)
+    # should not modify the properties
+    assert test_catalog.load_namespace_properties(database_name) == test_properties
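
The unit tests above use moto's mock_glue decorator to emulate the Glue API
in-process. A minimal standalone sketch of that pattern, with dummy credentials so
no call ever reaches AWS:

    import os

    import boto3
    from moto import mock_glue

    # moto intercepts the API calls, but botocore still needs credentials to sign with
    os.environ.setdefault("AWS_ACCESS_KEY_ID", "testing")
    os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "testing")
    os.environ.setdefault("AWS_DEFAULT_REGION", "us-east-1")

    @mock_glue
    def check_database_roundtrip():
        glue = boto3.client("glue", region_name="us-east-1")
        glue.create_database(DatabaseInput={"Name": "demo_db"})
        assert glue.get_database(Name="demo_db")["Database"]["Name"] == "demo_db"

    check_database_roundtrip()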
diff --git a/python/tests/conftest.py b/python/tests/conftest.py
index be51c90357..d11d155b47 100644
--- a/python/tests/conftest.py
+++ b/python/tests/conftest.py
@@ -28,13 +28,24 @@ import os
 from tempfile import TemporaryDirectory
 from typing import (
     Any,
+    Callable,
     Dict,
     Generator,
     Union,
 )
+from unittest.mock import MagicMock
 from urllib.parse import urlparse
 
+import aiobotocore.awsrequest
+import aiobotocore.endpoint
+import aiohttp
+import aiohttp.client_reqrep
+import aiohttp.typedefs
+import boto3
+import botocore.awsrequest
+import botocore.model
 import pytest
+from moto import mock_glue, mock_s3
 
 from pyiceberg import schema
 from pyiceberg.io import (
@@ -1142,3 +1153,105 @@ def fsspec_fileio(request):
         "s3.secret-access-key": request.config.getoption("--s3.secret-access-key"),
     }
     return fsspec.FsspecFileIO(properties=properties)
+
+
+class MockAWSResponse(aiobotocore.awsrequest.AioAWSResponse):
+    """
+    A mocked AWS response implementation (for test use only)
+    See https://github.com/aio-libs/aiobotocore/issues/755
+    """
+
+    def __init__(self, response: botocore.awsrequest.AWSResponse):
+        self._moto_response = response
+        self.status_code = response.status_code
+        self.raw = MockHttpClientResponse(response)
+
+    # adapt async methods to use moto's response
+    async def _content_prop(self) -> bytes:
+        return self._moto_response.content
+
+    async def _text_prop(self) -> str:
+        return self._moto_response.text
+
+
+class MockHttpClientResponse(aiohttp.client_reqrep.ClientResponse):
+    """
+    A mocked HTTP client response implementation (for test use only)
+    See https://github.com/aio-libs/aiobotocore/issues/755
+    """
+
+    def __init__(self, response: botocore.awsrequest.AWSResponse):
+        async def read(*_) -> bytes:
+            # streaming/range requests, as used by s3fs
+            return response.content
+
+        self.content = MagicMock(aiohttp.StreamReader)
+        self.content.read = read
+        self.response = response
+
+    @property
+    def raw_headers(self) -> aiohttp.typedefs.RawHeaders:
+        # Return the headers encoded the way that aiobotocore expects them
+        return {k.encode("utf-8"): str(v).encode("utf-8") for k, v in self.response.headers.items()}.items()
+
+
+def patch_aiobotocore():
+    """
+    Patch aiobotocore to work with moto
+    See https://github.com/aio-libs/aiobotocore/issues/755
+    """
+
+    def factory(original: Callable) -> Callable:
+        def patched_convert_to_response_dict(
+            http_response: botocore.awsrequest.AWSResponse, operation_model: botocore.model.OperationModel
+        ):
+            return original(MockAWSResponse(http_response), operation_model)
+
+        return patched_convert_to_response_dict
+
+    aiobotocore.endpoint.convert_to_response_dict = factory(aiobotocore.endpoint.convert_to_response_dict)
+
+
+@pytest.fixture(name="_patch_aiobotocore")
+def fixture_aiobotocore():
+    """
+    Patch aiobotocore to work with moto
+    pending close of this issue: https://github.com/aio-libs/aiobotocore/issues/755
+    """
+    stored_method = aiobotocore.endpoint.convert_to_response_dict
+    yield patch_aiobotocore()
+    # restore the changed method after the fixture is destroyed
+    aiobotocore.endpoint.convert_to_response_dict = stored_method
+
+
+def aws_credentials():
+    os.environ["AWS_ACCESS_KEY_ID"] = "testing"
+    os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
+    os.environ["AWS_SECURITY_TOKEN"] = "testing"
+    os.environ["AWS_SESSION_TOKEN"] = "testing"
+    os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
+
+
+@pytest.fixture(name="_aws_credentials")
+def fixture_aws_credentials():
+    """Mocked AWS Credentials for moto."""
+    yield aws_credentials()
+    os.environ.pop("AWS_ACCESS_KEY_ID")
+    os.environ.pop("AWS_SECRET_ACCESS_KEY")
+    os.environ.pop("AWS_SECURITY_TOKEN")
+    os.environ.pop("AWS_SESSION_TOKEN")
+    os.environ.pop("AWS_DEFAULT_REGION")
+
+
+@pytest.fixture(name="_s3")
+def fixture_s3(_aws_credentials):
+    """Mocked S3 client"""
+    with mock_s3():
+        yield boto3.client("s3", region_name="us-east-1")
+
+
+@pytest.fixture(name="_glue")
+def fixture_glue(_aws_credentials):
+    """Mocked glue client"""
+    with mock_glue():
+        yield boto3.client("glue", region_name="us-east-1")