You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/11/19 18:23:55 UTC

[iceberg] branch master updated: AWS: Add Glue catalog and table operations (#1633)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 0beb7d6  AWS: Add Glue catalog and table operations (#1633)
0beb7d6 is described below

commit 0beb7d663694d78e6edecb241a008e9834c82d8c
Author: jackye1995 <yz...@amazon.com>
AuthorDate: Thu Nov 19 10:23:35 2020 -0800

    AWS: Add Glue catalog and table operations (#1633)
---
 .../org/apache/iceberg/aws/AwsIntegTestUtil.java   |  18 +
 .../iceberg/aws/glue/GlueCatalogNamespaceTest.java | 143 ++++++++
 .../iceberg/aws/glue/GlueCatalogTableTest.java     | 282 ++++++++++++++
 .../org/apache/iceberg/aws/glue/GlueTestBase.java  | 102 ++++++
 .../java/org/apache/iceberg/aws/AwsClientUtil.java |   7 +
 .../java/org/apache/iceberg/aws/AwsProperties.java |  43 +++
 .../org/apache/iceberg/aws/glue/GlueCatalog.java   | 403 +++++++++++++++++++++
 .../iceberg/aws/glue/GlueTableOperations.java      | 182 ++++++++++
 .../iceberg/aws/glue/GlueToIcebergConverter.java   |  53 +++
 .../iceberg/aws/glue/IcebergToGlueConverter.java   | 135 +++++++
 .../apache/iceberg/aws/glue/GlueCatalogTest.java   | 358 ++++++++++++++++++
 .../aws/glue/GlueToIcebergConverterTest.java       |  92 +++++
 .../aws/glue/IcebergToGlueConverterTest.java       |  67 ++++
 build.gradle                                       |   9 +
 14 files changed, 1894 insertions(+)

diff --git a/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java b/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java
index a71804c..d8faaf8 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java
@@ -19,7 +19,12 @@
 
 package org.apache.iceberg.aws;
 
+import java.util.List;
 import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.Delete;
 import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
@@ -29,6 +34,8 @@ import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
 
 public class AwsIntegTestUtil {
 
+  private static final Logger LOG = LoggerFactory.getLogger(AwsIntegTestUtil.class);
+
   private AwsIntegTestUtil() {
   }
 
@@ -55,4 +62,15 @@ public class AwsIntegTestUtil {
       }
     }
   }
+
+  public static void cleanGlueCatalog(GlueClient glue, List<String> namespaces) {
+    for (String namespace : namespaces) {
+      try {
+        // delete db also delete tables
+        glue.deleteDatabase(DeleteDatabaseRequest.builder().name(namespace).build());
+      } catch (Exception e) {
+        LOG.error("Cannot delete namespace {}", namespace, e);
+      }
+    }
+  }
 }
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogNamespaceTest.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogNamespaceTest.java
new file mode 100644
index 0000000..9dc3ce5
--- /dev/null
+++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogNamespaceTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Test;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+
+public class GlueCatalogNamespaceTest extends GlueTestBase {
+
+  @Test
+  public void testCreateNamespace() {
+    String namespace = getRandomName();
+    namespaces.add(namespace);
+    AssertHelpers.assertThrows("namespace does not exist before create",
+        EntityNotFoundException.class,
+        "not found",
+        () -> glue.getDatabase(GetDatabaseRequest.builder().name(namespace).build()));
+    glueCatalog.createNamespace(Namespace.of(namespace));
+    Database database = glue.getDatabase(GetDatabaseRequest.builder().name(namespace).build()).database();
+    Assert.assertEquals("namespace must equal database name", namespace, database.name());
+  }
+
+  @Test
+  public void testCreateDuplicate() {
+    String namespace = createNamespace();
+    AssertHelpers.assertThrows("should not create namespace with the same name",
+        AlreadyExistsException.class,
+        "it already exists in Glue",
+        () -> glueCatalog.createNamespace(Namespace.of(namespace)));
+  }
+
+  @Test
+  public void testCreateBadName() {
+    List<Namespace> invalidNamespaces = Lists.newArrayList(
+        Namespace.of("db-1"),
+        Namespace.of("db", "db2")
+    );
+
+    for (Namespace namespace : invalidNamespaces) {
+      AssertHelpers.assertThrows("should not create namespace with invalid or nested names",
+          ValidationException.class,
+          "Cannot convert namespace",
+          () -> glueCatalog.createNamespace(namespace));
+    }
+  }
+
+  @Test
+  public void testNamespaceExists() {
+    String namespace = createNamespace();
+    Assert.assertTrue(glueCatalog.namespaceExists(Namespace.of(namespace)));
+  }
+
+  @Test
+  public void testListNamespace() {
+    String namespace = createNamespace();
+    List<Namespace> namespaceList = glueCatalog.listNamespaces();
+    Assert.assertTrue(namespaceList.size() > 0);
+    Assert.assertTrue(namespaceList.contains(Namespace.of(namespace)));
+    namespaceList = glueCatalog.listNamespaces(Namespace.of(namespace));
+    Assert.assertTrue(namespaceList.isEmpty());
+  }
+
+  @Test
+  public void testNamespaceProperties() {
+    String namespace = createNamespace();
+    // set properties
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("key", "val");
+    properties.put("key2", "val2");
+    glueCatalog.setProperties(Namespace.of(namespace), properties);
+    Database database = glue.getDatabase(GetDatabaseRequest.builder().name(namespace).build()).database();
+    Assert.assertTrue(database.parameters().containsKey("key"));
+    Assert.assertEquals("val", database.parameters().get("key"));
+    Assert.assertTrue(database.parameters().containsKey("key2"));
+    Assert.assertEquals("val2", database.parameters().get("key2"));
+    // remove properties
+    glueCatalog.removeProperties(Namespace.of(namespace), Sets.newHashSet("key"));
+    database = glue.getDatabase(GetDatabaseRequest.builder().name(namespace).build()).database();
+    Assert.assertFalse(database.parameters().containsKey("key"));
+    Assert.assertTrue(database.parameters().containsKey("key2"));
+    Assert.assertEquals("val2", database.parameters().get("key2"));
+    // add back
+    properties = Maps.newHashMap();
+    properties.put("key", "val");
+    glueCatalog.setProperties(Namespace.of(namespace), properties);
+    database = glue.getDatabase(GetDatabaseRequest.builder().name(namespace).build()).database();
+    Assert.assertTrue(database.parameters().containsKey("key"));
+    Assert.assertEquals("val", database.parameters().get("key"));
+    Assert.assertTrue(database.parameters().containsKey("key2"));
+    Assert.assertEquals("val2", database.parameters().get("key2"));
+  }
+
+  @Test
+  public void testDropNamespace() {
+    String namespace = createNamespace();
+    glueCatalog.dropNamespace(Namespace.of(namespace));
+    AssertHelpers.assertThrows("namespace should not exist after deletion",
+        EntityNotFoundException.class,
+        "not found",
+        () -> glue.getDatabase(GetDatabaseRequest.builder().name(namespace).build()));
+  }
+
+  @Test
+  public void testDropNamespaceNonEmpty() {
+    String namespace = createNamespace();
+    createTable(namespace);
+    AssertHelpers.assertThrows("namespace should not be dropped when still has table",
+        NamespaceNotEmptyException.class,
+        "it is not empty",
+        () -> glueCatalog.dropNamespace(Namespace.of(namespace)));
+  }
+
+}
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogTableTest.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogTableTest.java
new file mode 100644
index 0000000..3de6703
--- /dev/null
+++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogTableTest.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.GetTableVersionsRequest;
+import software.amazon.awssdk.services.glue.model.TableInput;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+public class GlueCatalogTableTest extends GlueTestBase {
+
+  @Test
+  public void testCreateTable() {
+    String namespace = createNamespace();
+    String tableName = getRandomName();
+    glueCatalog.createTable(TableIdentifier.of(namespace, tableName), schema, partitionSpec);
+    // verify table exists in Glue
+    GetTableResponse response = glue.getTable(GetTableRequest.builder()
+        .databaseName(namespace).name(tableName).build());
+    Assert.assertEquals(namespace, response.table().databaseName());
+    Assert.assertEquals(tableName, response.table().name());
+    Assert.assertEquals(BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH),
+        response.table().parameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP));
+    Assert.assertTrue(response.table().parameters().containsKey(BaseMetastoreTableOperations.METADATA_LOCATION_PROP));
+    // verify metadata file exists in S3
+    String metaLocation = response.table().parameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
+    String key = metaLocation.split(testBucketName, -1)[1].substring(1);
+    s3.headObject(HeadObjectRequest.builder().bucket(testBucketName).key(key).build());
+    Table table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName));
+    Assert.assertEquals(partitionSpec, table.spec());
+    Assert.assertEquals(schema.toString(), table.schema().toString());
+  }
+
+  @Test
+  public void testCreateTableDuplicate() {
+    String namespace = createNamespace();
+    String tableName = createTable(namespace);
+    AssertHelpers.assertThrows("should not create table with the same name",
+        AlreadyExistsException.class,
+        "Table already exists",
+        () -> glueCatalog.createTable(TableIdentifier.of(namespace, tableName), schema, partitionSpec));
+  }
+
+  @Test
+  public void testCreateTableBadName() {
+    String namespace = createNamespace();
+    AssertHelpers.assertThrows("should not create table with bad name",
+        IllegalArgumentException.class,
+        "Invalid table identifier",
+        () -> glueCatalog.createTable(TableIdentifier.of(namespace, "table-1"), schema, partitionSpec));
+  }
+
+  @Test
+  public void testListTables() {
+    String namespace = createNamespace();
+    Assert.assertTrue("list namespace should have nothing before table creation",
+        glueCatalog.listTables(Namespace.of(namespace)).isEmpty());
+    String tableName = createTable(namespace);
+    List<TableIdentifier> tables = glueCatalog.listTables(Namespace.of(namespace));
+    Assert.assertEquals(1, tables.size());
+    Assert.assertEquals(TableIdentifier.of(namespace, tableName), tables.get(0));
+  }
+
+  @Test
+  public void testTableExists() {
+    String namespace = createNamespace();
+    String tableName = createTable(namespace);
+    Assert.assertTrue(glueCatalog.tableExists(TableIdentifier.of(namespace, tableName)));
+  }
+
+  @Test
+  public void testUpdateTable() {
+    String namespace = createNamespace();
+    String tableName = getRandomName();
+    // current should be null
+    TableOperations ops = glueCatalog.newTableOps(TableIdentifier.of(namespace, tableName));
+    TableMetadata current = ops.current();
+    Assert.assertNull(current);
+    // create table, refresh should update
+    createTable(namespace, tableName);
+    current = ops.refresh();
+    Assert.assertEquals(schema.toString(), current.schema().toString());
+    Assert.assertEquals(partitionSpec, current.spec());
+    Table table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName));
+    Assert.assertTrue("initial table history should be empty", table.history().isEmpty());
+    // commit new version, should create a new snapshot
+    table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName));
+    DataFile dataFile = DataFiles.builder(partitionSpec)
+        .withPath("/path/to/data-a.parquet")
+        .withFileSizeInBytes(10)
+        .withRecordCount(1)
+        .build();
+    table.newAppend().appendFile(dataFile).commit();
+    table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName));
+    Assert.assertEquals("commit should create a new table version", 1, table.history().size());
+  }
+
+  @Test
+  public void testRenameTable() {
+    String namespace = createNamespace();
+    String tableName = createTable(namespace);
+    Table table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName));
+    // rename table
+    String newTableName = tableName + "_2";
+    glueCatalog.renameTable(TableIdentifier.of(namespace, tableName), TableIdentifier.of(namespace, newTableName));
+    Table renamedTable = glueCatalog.loadTable(TableIdentifier.of(namespace, newTableName));
+    Assert.assertEquals(table.location(), renamedTable.location());
+    Assert.assertEquals(table.schema().toString(), renamedTable.schema().toString());
+    Assert.assertEquals(table.spec(), renamedTable.spec());
+    Assert.assertEquals(table.currentSnapshot(), renamedTable.currentSnapshot());
+  }
+
+  @Test
+  public void testRenameTable_failToCreateNewTable() {
+    String namespace = createNamespace();
+    String tableName = createTable(namespace);
+    TableIdentifier id = TableIdentifier.of(namespace, tableName);
+    Table table = glueCatalog.loadTable(id);
+    // create a new table in Glue, so that rename to that table will fail
+    String newTableName = tableName + "_2";
+    glue.createTable(CreateTableRequest.builder()
+        .databaseName(namespace)
+        .tableInput(TableInput.builder().name(newTableName).build())
+        .build());
+    AssertHelpers.assertThrows("should fail to rename to an existing table",
+        software.amazon.awssdk.services.glue.model.AlreadyExistsException.class,
+        "Table already exists",
+        () -> glueCatalog.renameTable(
+            TableIdentifier.of(namespace, tableName), TableIdentifier.of(namespace, newTableName)));
+    // old table can still be read with same metadata
+    Table oldTable = glueCatalog.loadTable(id);
+    Assert.assertEquals(table.location(), oldTable.location());
+    Assert.assertEquals(table.schema().toString(), oldTable.schema().toString());
+    Assert.assertEquals(table.spec(), oldTable.spec());
+    Assert.assertEquals(table.currentSnapshot(), oldTable.currentSnapshot());
+  }
+
+  @Test
+  public void testRenameTable_failToDeleteOldTable() {
+    String namespace = createNamespace();
+    String tableName = createTable(namespace);
+    TableIdentifier id = TableIdentifier.of(namespace, tableName);
+    Table table = glueCatalog.loadTable(id);
+    // delete the old table metadata, so that drop old table will fail
+    String newTableName = tableName + "_2";
+    glue.updateTable(UpdateTableRequest.builder()
+        .databaseName(namespace)
+        .tableInput(TableInput.builder().name(tableName).parameters(Maps.newHashMap()).build())
+        .build());
+    AssertHelpers.assertThrows("should fail to rename",
+        ValidationException.class,
+        "Input Glue table is not an iceberg table",
+        () -> glueCatalog.renameTable(
+            TableIdentifier.of(namespace, tableName), TableIdentifier.of(namespace, newTableName)));
+    AssertHelpers.assertThrows("renamed table should be deleted",
+        EntityNotFoundException.class,
+        "not found",
+        () -> glue.getTable(GetTableRequest.builder().databaseName(namespace).name(newTableName).build()));
+  }
+
+  @Test
+  public void testDeleteTableWithoutPurge() {
+    String namespace = createNamespace();
+    String tableName = createTable(namespace);
+    glueCatalog.dropTable(TableIdentifier.of(namespace, tableName), false);
+    AssertHelpers.assertThrows("should not have table",
+        NoSuchTableException.class,
+        "Table does not exist",
+        () -> glueCatalog.loadTable(TableIdentifier.of(namespace, tableName)));
+    String warehouseLocation = glueCatalog.defaultWarehouseLocation(TableIdentifier.of(namespace, tableName));
+    String prefix = warehouseLocation.split(testBucketName + "/", -1)[1];
+    ListObjectsV2Response response = s3.listObjectsV2(ListObjectsV2Request.builder()
+        .bucket(testBucketName).prefix(prefix + "/metadata/").build());
+    Assert.assertTrue(response.hasContents());
+    boolean hasMetaFile = false;
+    for (S3Object s3Object : response.contents()) {
+      if (s3Object.key().contains(".json")) {
+        hasMetaFile = true;
+        break;
+      }
+    }
+    Assert.assertTrue("metadata json file exists after delete without purge", hasMetaFile);
+  }
+
+  @Test
+  public void testDeleteTableWithPurge() {
+    String namespace = createNamespace();
+    String tableName = createTable(namespace);
+    glueCatalog.dropTable(TableIdentifier.of(namespace, tableName));
+    AssertHelpers.assertThrows("should not have table",
+        NoSuchTableException.class,
+        "Table does not exist",
+        () -> glueCatalog.loadTable(TableIdentifier.of(namespace, tableName)));
+    String warehouseLocation = glueCatalog.defaultWarehouseLocation(TableIdentifier.of(namespace, tableName));
+    String prefix = warehouseLocation.split(testBucketName + "/", -1)[1];
+    ListObjectsV2Response response = s3.listObjectsV2(ListObjectsV2Request.builder()
+        .bucket(testBucketName).prefix(prefix).build());
+    if (response.hasContents()) {
+      // might have directory markers left
+      for (S3Object s3Object : response.contents()) {
+        Optional<Long> size = s3Object.getValueForField("Size", Long.class);
+        Assert.assertTrue(size.isPresent());
+        Assert.assertEquals(0L, (long) size.get());
+      }
+    }
+  }
+
+  @Test
+  public void testCommitTableSkipArchive() {
+    // create ns
+    String namespace = getRandomName();
+    namespaces.add(namespace);
+    glueCatalog.createNamespace(Namespace.of(namespace));
+    // create table and commit without skip
+    Schema schema = new Schema(Types.NestedField.required(1, "c1", Types.StringType.get(), "c1"));
+    PartitionSpec partitionSpec = PartitionSpec.builderFor(schema).build();
+    String tableName = getRandomName();
+    glueCatalog.createTable(TableIdentifier.of(namespace, tableName), schema, partitionSpec);
+    Table table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName));
+    DataFile dataFile = DataFiles.builder(partitionSpec)
+        .withPath("/path/to/data-a.parquet")
+        .withFileSizeInBytes(10)
+        .withRecordCount(1)
+        .build();
+    table.newAppend().appendFile(dataFile).commit();
+    Assert.assertEquals(2, glue.getTableVersions(GetTableVersionsRequest.builder()
+        .databaseName(namespace).tableName(tableName).build()).tableVersions().size());
+    // create table and commit with skip
+    tableName = getRandomName();
+    glueCatalogWithSkip.createTable(TableIdentifier.of(namespace, tableName), schema, partitionSpec);
+    table = glueCatalogWithSkip.loadTable(TableIdentifier.of(namespace, tableName));
+    table.newAppend().appendFile(dataFile).commit();
+    Assert.assertEquals("skipArchive should not create new version",
+        1, glue.getTableVersions(GetTableVersionsRequest.builder()
+            .databaseName(namespace).tableName(tableName).build()).tableVersions().size());
+  }
+}
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java
new file mode 100644
index 0000000..b787cb8
--- /dev/null
+++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.aws.AwsClientUtil;
+import org.apache.iceberg.aws.AwsIntegTestUtil;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.s3.S3Client;
+
+@SuppressWarnings({"VisibilityModifier", "HideUtilityClassConstructor"})
+public class GlueTestBase {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GlueTestBase.class);
+
+  // the integration test requires the following env variables
+  static final String testBucketName = AwsIntegTestUtil.testBucketName();
+
+  static final String catalogName = "glue";
+  static final String testPathPrefix = getRandomName();
+  static final List<String> namespaces = Lists.newArrayList();
+
+  // aws clients
+  static final GlueClient glue = AwsClientUtil.defaultGlueClient();
+  static final S3Client s3 = AwsClientUtil.defaultS3Client();
+
+  // iceberg
+  static GlueCatalog glueCatalog;
+  static GlueCatalog glueCatalogWithSkip;
+
+  static Schema schema = new Schema(Types.NestedField.required(1, "c1", Types.StringType.get(), "c1"));
+  static PartitionSpec partitionSpec = PartitionSpec.builderFor(schema).build();
+
+  @BeforeClass
+  public static void beforeClass() {
+    String testBucketPath = "s3://" + testBucketName + "/" + testPathPrefix;
+    S3FileIO fileIO = new S3FileIO();
+    glueCatalog = new GlueCatalog(glue);
+    glueCatalog.initialize(catalogName, testBucketPath, new AwsProperties(), fileIO);
+    AwsProperties properties = new AwsProperties();
+    properties.setGlueCatalogSkipArchive(true);
+    glueCatalogWithSkip = new GlueCatalog(glue);
+    glueCatalogWithSkip.initialize(catalogName, testBucketPath, properties, fileIO);
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    AwsIntegTestUtil.cleanGlueCatalog(glue, namespaces);
+    AwsIntegTestUtil.cleanS3Bucket(s3, testBucketName, testPathPrefix);
+  }
+
+  public static String getRandomName() {
+    return UUID.randomUUID().toString().replace("-", "");
+  }
+
+  public static String createNamespace() {
+    String namespace = getRandomName();
+    namespaces.add(namespace);
+    glueCatalog.createNamespace(Namespace.of(namespace));
+    return namespace;
+  }
+
+  public static String createTable(String namespace) {
+    String tableName = getRandomName();
+    return createTable(namespace, tableName);
+  }
+
+  public static String createTable(String namespace, String tableName) {
+    glueCatalog.createTable(TableIdentifier.of(namespace, tableName), schema, partitionSpec);
+    return tableName;
+  }
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsClientUtil.java b/aws/src/main/java/org/apache/iceberg/aws/AwsClientUtil.java
index e75b3f0..71c7ed0 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AwsClientUtil.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsClientUtil.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.aws;
 
 import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.glue.GlueClient;
 import software.amazon.awssdk.services.kms.KmsClient;
 import software.amazon.awssdk.services.s3.S3Client;
 
@@ -47,4 +48,10 @@ public class AwsClientUtil {
         .httpClient(UrlConnectionHttpClient.create())
         .build();
   }
+
+  public static GlueClient defaultGlueClient() {
+    return GlueClient.builder()
+        .httpClient(UrlConnectionHttpClient.create())
+        .build();
+  }
 }
diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
index b9c33df..358a8c9 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.aws;
 
 import java.util.Map;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.PropertyUtil;
 
 public class AwsProperties {
 
@@ -65,14 +66,36 @@ public class AwsProperties {
    */
   public static final String S3FILEIO_SSE_MD5 = "s3fileio.sse.md5";
 
+  /**
+   * The ID of the Glue Data Catalog where the tables reside.
+   * If none is provided, Glue automatically uses the caller's AWS account ID by default.
+   * For more details, see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-databases.html
+   */
+  public static final String GLUE_CATALOG_ID = "gluecatalog.id";
+
+  /**
+   * If Glue should skip archiving an old table version when creating a new version in a commit.
+   * By default Glue archives all old table versions after an UpdateTable call,
+   * but Glue has a default max number of archived table versions (can be increased).
+   * So for streaming use case with lots of commits, it is recommended to set this value to true.
+   */
+  public static final String GLUE_CATALOG_SKIP_ARCHIVE = "gluecatalog.skip-archive";
+  public static final boolean GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT = false;
+
   private String s3FileIoSseType;
   private String s3FileIoSseKey;
   private String s3FileIoSseMd5;
 
+  private String glueCatalogId;
+  private boolean glueCatalogSkipArchive;
+
   public AwsProperties() {
     this.s3FileIoSseType = S3FILEIO_SSE_TYPE_NONE;
     this.s3FileIoSseKey = null;
     this.s3FileIoSseMd5 = null;
+
+    this.glueCatalogId = null;
+    this.glueCatalogSkipArchive = GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT;
   }
 
   public AwsProperties(Map<String, String> properties) {
@@ -84,6 +107,10 @@ public class AwsProperties {
       Preconditions.checkNotNull(s3FileIoSseKey, "Cannot initialize SSE-C S3FileIO with null encryption key");
       Preconditions.checkNotNull(s3FileIoSseMd5, "Cannot initialize SSE-C S3FileIO with null encryption key MD5");
     }
+
+    this.glueCatalogId = properties.get(GLUE_CATALOG_ID);
+    this.glueCatalogSkipArchive = PropertyUtil.propertyAsBoolean(properties,
+        AwsProperties.GLUE_CATALOG_SKIP_ARCHIVE, AwsProperties.GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT);
   }
 
   public String s3FileIoSseType() {
@@ -109,4 +136,20 @@ public class AwsProperties {
   public void setS3FileIoSseMd5(String sseMd5) {
     this.s3FileIoSseMd5 = sseMd5;
   }
+
+  public String glueCatalogId() {
+    return glueCatalogId;
+  }
+
+  public void setGlueCatalogId(String id) {
+    this.glueCatalogId = id;
+  }
+
+  public boolean glueCatalogSkipArchive() {
+    return glueCatalogSkipArchive;
+  }
+
+  public void setGlueCatalogSkipArchive(boolean skipArchive) {
+    this.glueCatalogSkipArchive = skipArchive;
+  }
 }
diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
new file mode 100644
index 0000000..86ce858
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.AwsClientUtil;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.GetDatabasesRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabasesResponse;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import software.amazon.awssdk.services.glue.model.InvalidInputException;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;
+
+public class GlueCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class);
+
+  private final GlueClient glue;
+  private Configuration hadoopConf;
+  private String catalogName;
+  private String warehousePath;
+  private AwsProperties awsProperties;
+  private FileIO fileIO;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * Only the AWS Glue client is initialized.
+   * Other fields must be initialized by calling {@link GlueCatalog#initialize(String, Map)} later.
+   */
+  public GlueCatalog() {
+    this(AwsClientUtil.defaultGlueClient());
+  }
+
+  @VisibleForTesting
+  GlueCatalog(GlueClient glue) {
+    this.glue = glue;
+  }
+
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    initialize(
+        name,
+        properties.get(CatalogProperties.WAREHOUSE_LOCATION),
+        new AwsProperties(properties),
+        fileIOImpl == null ? new S3FileIO() : CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf));
+  }
+
+  @VisibleForTesting
+  void initialize(String name, String path, AwsProperties properties, FileIO io) {
+    this.catalogName = name;
+    this.awsProperties = properties;
+    this.warehousePath = cleanWarehousePath(path);
+    this.fileIO = io;
+  }
+
+  private String cleanWarehousePath(String path) {
+    Preconditions.checkArgument(path != null && path.length() > 0,
+        "Cannot initialize GlueCatalog because warehousePath must not be null");
+    int len = path.length();
+    if (path.charAt(len - 1) == '/') {
+      return path.substring(0, len - 1);
+    } else {
+      return path;
+    }
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    return new GlueTableOperations(glue, catalogName, awsProperties, fileIO, tableIdentifier);
+  }
+
+  /**
+   * This method produces the same result as using a HiveCatalog.
+   * If databaseUri exists for the Glue database URI, the default location is databaseUri/tableName.
+   * If not, the default location is warehousePath/databaseName.db/tableName
+   * @param tableIdentifier table id
+   * @return default warehouse path
+   */
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    // check if value is set in database
+    GetDatabaseResponse response = glue.getDatabase(GetDatabaseRequest.builder()
+        .name(IcebergToGlueConverter.getDatabaseName(tableIdentifier))
+        .build());
+    String dbLocationUri = response.database().locationUri();
+    if (dbLocationUri != null) {
+      return String.format("%s/%s", dbLocationUri, tableIdentifier.name());
+    }
+
+    return String.format(
+        "%s/%s.db/%s",
+        warehousePath,
+        IcebergToGlueConverter.getDatabaseName(tableIdentifier),
+        tableIdentifier.name());
+  }
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    namespaceExists(namespace);
+    // should be safe to list all before returning the list, instead of dynamically load the list.
+    String nextToken = null;
+    List<TableIdentifier> results = Lists.newArrayList();
+    do {
+      GetTablesResponse response = glue.getTables(GetTablesRequest.builder()
+          .catalogId(awsProperties.glueCatalogId())
+          .databaseName(IcebergToGlueConverter.toDatabaseName(namespace))
+          .nextToken(nextToken)
+          .build());
+      nextToken = response.nextToken();
+      if (response.hasTableList()) {
+        results.addAll(response.tableList().stream()
+            .map(GlueToIcebergConverter::toTableId)
+            .collect(Collectors.toList()));
+      }
+    } while (nextToken != null);
+
+    LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, results);
+    return results;
+  }
+
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    try {
+      TableOperations ops = newTableOps(identifier);
+      TableMetadata lastMetadata = ops.current();
+      glue.deleteTable(DeleteTableRequest.builder()
+          .catalogId(awsProperties.glueCatalogId())
+          .databaseName(IcebergToGlueConverter.getDatabaseName(identifier))
+          .name(identifier.name())
+          .build());
+      LOG.info("Successfully dropped table {} from Glue", identifier);
+      if (purge && lastMetadata != null) {
+        CatalogUtil.dropTableData(ops.io(), lastMetadata);
+        LOG.info("Glue table {} data purged", identifier);
+      }
+      LOG.info("Dropped table: {}", identifier);
+      return true;
+    } catch (EntityNotFoundException e) {
+      LOG.error("Cannot drop table {} because table not found or not accessible", identifier, e);
+      return false;
+    } catch (Exception e) {
+      LOG.error("Cannot complete drop table operation for {} due to unexpected exception", identifier, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Rename table in Glue is a drop table and create table.
+   * @param from identifier of the table to rename
+   * @param to new table name
+   */
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    // check new namespace exists
+    if (!namespaceExists(to.namespace())) {
+      throw new NoSuchNamespaceException("Cannot rename %s to %s because namespace %s does not exist",
+          from, to, to.namespace());
+    }
+    // keep metadata
+    Table fromTable = null;
+    String fromTableDbName = IcebergToGlueConverter.getDatabaseName(from);
+    String fromTableName = IcebergToGlueConverter.getTableName(from);
+    String toTableDbName = IcebergToGlueConverter.getDatabaseName(to);
+    String toTableName = IcebergToGlueConverter.getTableName(to);
+    try {
+      GetTableResponse response = glue.getTable(GetTableRequest.builder()
+          .catalogId(awsProperties.glueCatalogId())
+          .databaseName(fromTableDbName)
+          .name(fromTableName)
+          .build());
+      fromTable = response.table();
+    } catch (EntityNotFoundException e) {
+      throw new NoSuchTableException(e, "Cannot rename %s because the table does not exist in Glue", from);
+    }
+
+    // use the same Glue info to create the new table, pointing to the old metadata
+    TableInput.Builder tableInputBuilder = TableInput.builder()
+        .owner(fromTable.owner())
+        .tableType(fromTable.tableType())
+        .parameters(fromTable.parameters());
+
+    glue.createTable(CreateTableRequest.builder()
+        .catalogId(awsProperties.glueCatalogId())
+        .databaseName(toTableDbName)
+        .tableInput(tableInputBuilder.name(toTableName).build())
+        .build());
+    LOG.info("created rename destination table {}", to);
+
+    try {
+      dropTable(from, false);
+    } catch (Exception e) {
+      // rollback, delete renamed table
+      LOG.error("Fail to drop old table {} after renaming to {}, rollback to use the old table", from, to, e);
+      glue.deleteTable(DeleteTableRequest.builder()
+          .catalogId(awsProperties.glueCatalogId())
+          .databaseName(toTableDbName)
+          .name(toTableName)
+          .build());
+      throw e;
+    }
+
+    LOG.info("Successfully renamed table from {} to {}", from, to);
+  }
+
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> metadata) {
+    try {
+      glue.createDatabase(CreateDatabaseRequest.builder()
+          .catalogId(awsProperties.glueCatalogId())
+          .databaseInput(IcebergToGlueConverter.toDatabaseInput(namespace, metadata))
+          .build());
+      LOG.info("Created namespace: {}", namespace);
+    } catch (software.amazon.awssdk.services.glue.model.AlreadyExistsException e) {
+      throw new AlreadyExistsException("Cannot create namespace %s because it already exists in Glue", namespace);
+    }
+  }
+
+  @Override
+  public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
+    if (!namespace.isEmpty()) {
+      // if it is not a list all op, just check if the namespace exists and return empty.
+      if (namespaceExists(namespace)) {
+        return Lists.newArrayList();
+      }
+      throw new NoSuchNamespaceException(
+          "Glue does not support nested namespace, cannot list namespaces under %s", namespace);
+    }
+
+    // should be safe to list all before returning the list, instead of dynamically load the list.
+    String nextToken = null;
+    List<Namespace> results = Lists.newArrayList();
+    do {
+      GetDatabasesResponse response = glue.getDatabases(GetDatabasesRequest.builder()
+          .catalogId(awsProperties.glueCatalogId())
+          .nextToken(nextToken)
+          .build());
+      nextToken = response.nextToken();
+      if (response.hasDatabaseList()) {
+        results.addAll(response.databaseList().stream()
+            .map(GlueToIcebergConverter::toNamespace)
+            .collect(Collectors.toList()));
+      }
+    } while (nextToken != null);
+
+    LOG.debug("Listing namespace {} returned namespaces: {}", namespace, results);
+    return results;
+  }
+
+  @Override
+  public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
+    String databaseName = IcebergToGlueConverter.toDatabaseName(namespace);
+    try {
+      GetDatabaseResponse response = glue.getDatabase(GetDatabaseRequest.builder()
+          .catalogId(awsProperties.glueCatalogId())
+          .name(databaseName)
+          .build());
+      Map<String, String> result = response.database().parameters();
+      LOG.debug("Loaded metadata for namespace {} found {}", namespace, result);
+      return result;
+    } catch (InvalidInputException e) {
+      throw new NoSuchNamespaceException("invalid input for namespace %s, error message: %s",
+          namespace, e.getMessage());
+    } catch (EntityNotFoundException e) {
+      throw new NoSuchNamespaceException("fail to find Glue database for namespace %s, error message: %s",
+          databaseName, e.getMessage());
+    }
+  }
+
+  @Override
+  public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
+    namespaceExists(namespace);
+    List<TableIdentifier> tableIdentifiers = listTables(namespace);
+    if (tableIdentifiers != null && !tableIdentifiers.isEmpty()) {
+      throw new NamespaceNotEmptyException("Cannot drop namespace %s because it is not empty. " +
+          "The following tables still exist under the namespace: %s", namespace, tableIdentifiers);
+    }
+
+    glue.deleteDatabase(DeleteDatabaseRequest.builder()
+        .catalogId(awsProperties.glueCatalogId())
+        .name(IcebergToGlueConverter.toDatabaseName(namespace))
+        .build());
+    LOG.info("Dropped namespace: {}", namespace);
+    // Always successful, otherwise exception is thrown
+    return true;
+  }
+
+  @Override
+  public boolean setProperties(Namespace namespace, Map<String, String> properties) throws NoSuchNamespaceException {
+    Map<String, String> newProperties = Maps.newHashMap();
+    newProperties.putAll(loadNamespaceMetadata(namespace));
+    newProperties.putAll(properties);
+    glue.updateDatabase(UpdateDatabaseRequest.builder()
+        .catalogId(awsProperties.glueCatalogId())
+        .name(IcebergToGlueConverter.toDatabaseName(namespace))
+        .databaseInput(IcebergToGlueConverter.toDatabaseInput(namespace, newProperties))
+        .build());
+    LOG.debug("Successfully set properties {} for {}", properties.keySet(), namespace);
+    // Always successful, otherwise exception is thrown
+    return true;
+  }
+
+  @Override
+  public boolean removeProperties(Namespace namespace, Set<String> properties) throws NoSuchNamespaceException {
+    Map<String, String> metadata = Maps.newHashMap(loadNamespaceMetadata(namespace));
+    for (String property : properties) {
+      metadata.remove(property);
+    }
+
+    glue.updateDatabase(UpdateDatabaseRequest.builder()
+        .catalogId(awsProperties.glueCatalogId())
+        .name(IcebergToGlueConverter.toDatabaseName(namespace))
+        .databaseInput(IcebergToGlueConverter.toDatabaseInput(namespace, metadata))
+        .build());
+    LOG.debug("Successfully removed properties {} from {}", properties, namespace);
+    // Always successful, otherwise exception is thrown
+    return true;
+  }
+
+  @Override
+  protected boolean isValidIdentifier(TableIdentifier tableIdentifier) {
+    return IcebergToGlueConverter.isValidNamespace(tableIdentifier.namespace()) &&
+        IcebergToGlueConverter.isValidTableName(tableIdentifier.name());
+  }
+
+  @Override
+  public String name() {
+    return catalogName;
+  }
+
+  @Override
+  public void close() throws IOException {
+    glue.close();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.hadoopConf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return hadoopConf;
+  }
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
new file mode 100644
index 0000000..92f0d55
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.ConcurrentModificationException;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+
+class GlueTableOperations extends BaseMetastoreTableOperations {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GlueTableOperations.class);
+
+  // same as org.apache.hadoop.hive.metastore.TableType.EXTERNAL_TABLE
+  // more details: https://docs.aws.amazon.com/glue/latest/webapi/API_TableInput.html
+  private static final String GLUE_EXTERNAL_TABLE_TYPE = "EXTERNAL_TABLE";
+
+  private final GlueClient glue;
+  private final AwsProperties awsProperties;
+  private final String databaseName;
+  private final String tableName;
+  private final String fullTableName;
+  private final FileIO fileIO;
+
+  GlueTableOperations(GlueClient glue, String catalogName, AwsProperties awsProperties,
+                      FileIO fileIO, TableIdentifier tableIdentifier) {
+    this.glue = glue;
+    this.awsProperties = awsProperties;
+    this.databaseName = IcebergToGlueConverter.getDatabaseName(tableIdentifier);
+    this.tableName = IcebergToGlueConverter.getTableName(tableIdentifier);
+    this.fullTableName = String.format("%s.%s.%s", catalogName, databaseName, tableName);
+    this.fileIO = fileIO;
+  }
+
+  @Override
+  public FileIO io() {
+    return fileIO;
+  }
+
+  @Override
+  protected String tableName() {
+    return fullTableName;
+  }
+
+  @Override
+  protected void doRefresh() {
+    String metadataLocation = null;
+    Table table = getGlueTable();
+    if (table != null) {
+      GlueToIcebergConverter.validateTable(table, tableName());
+      metadataLocation = table.parameters().get(METADATA_LOCATION_PROP);
+    } else {
+      if (currentMetadataLocation() != null) {
+        throw new NoSuchTableException("Cannot find Glue table %s after refresh, " +
+            "maybe another process deleted it or revoked your access permission", tableName());
+      }
+    }
+
+    refreshFromMetadataLocation(metadataLocation);
+  }
+
+  @Override
+  protected void doCommit(TableMetadata base, TableMetadata metadata) {
+    String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
+    boolean exceptionThrown = true;
+    Table glueTable = getGlueTable();
+    checkMetadataLocation(glueTable, base);
+    Map<String, String> properties = prepareProperties(glueTable, newMetadataLocation);
+    try {
+      persistGlueTable(glueTable, properties);
+      exceptionThrown = false;
+    } catch (ConcurrentModificationException e) {
+      throw new CommitFailedException(e, "Cannot commit %s because Glue detected concurrent update", tableName());
+    } catch (software.amazon.awssdk.services.glue.model.AlreadyExistsException e) {
+      throw new AlreadyExistsException(e,
+          "Cannot commit %s because its Glue table already exists when trying to create one", tableName());
+    } catch (SdkException e) {
+      throw new CommitFailedException(e, "Cannot commit %s because unexpected exception contacting AWS", tableName());
+    } finally {
+      if (exceptionThrown) {
+        io().deleteFile(newMetadataLocation);
+      }
+    }
+  }
+
+  private void checkMetadataLocation(Table glueTable, TableMetadata base) {
+    String glueMetadataLocation = glueTable != null ? glueTable.parameters().get(METADATA_LOCATION_PROP) : null;
+    String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
+    if (!Objects.equals(baseMetadataLocation, glueMetadataLocation)) {
+      throw new CommitFailedException(
+          "Cannot commit %s because base metadata location '%s' is not same as the current Glue location '%s'",
+          tableName(), baseMetadataLocation, glueMetadataLocation);
+    }
+  }
+
+  private Table getGlueTable() {
+    try {
+      GetTableResponse response = glue.getTable(GetTableRequest.builder()
+          .catalogId(awsProperties.glueCatalogId())
+          .databaseName(databaseName)
+          .name(tableName)
+          .build());
+      return response.table();
+    } catch (EntityNotFoundException e) {
+      return null;
+    }
+  }
+
+  private Map<String, String> prepareProperties(Table glueTable, String newMetadataLocation) {
+    Map<String, String> properties = glueTable != null ? Maps.newHashMap(glueTable.parameters()) : Maps.newHashMap();
+    properties.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH));
+    properties.put(METADATA_LOCATION_PROP, newMetadataLocation);
+    if (currentMetadataLocation() != null && !currentMetadataLocation().isEmpty()) {
+      properties.put(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation());
+    }
+
+    return properties;
+  }
+
+  private void persistGlueTable(Table glueTable, Map<String, String> parameters) {
+    if (glueTable != null) {
+      LOG.debug("Committing existing Glue table: {}", tableName());
+      glue.updateTable(UpdateTableRequest.builder()
+          .catalogId(awsProperties.glueCatalogId())
+          .databaseName(databaseName)
+          .skipArchive(awsProperties.glueCatalogSkipArchive())
+          .tableInput(TableInput.builder()
+              .name(tableName)
+              .parameters(parameters)
+              .build())
+          .build());
+    } else {
+      LOG.debug("Committing new Glue table: {}", tableName());
+      glue.createTable(CreateTableRequest.builder()
+          .catalogId(awsProperties.glueCatalogId())
+          .databaseName(databaseName)
+          .tableInput(TableInput.builder()
+              .name(tableName)
+              .tableType(GLUE_EXTERNAL_TABLE_TYPE)
+              .parameters(parameters)
+              .build())
+          .build());
+    }
+  }
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueToIcebergConverter.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueToIcebergConverter.java
new file mode 100644
index 0000000..5078ba3
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueToIcebergConverter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ValidationException;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.Table;
+
+class GlueToIcebergConverter {
+
+  private GlueToIcebergConverter() {
+  }
+
+  static Namespace toNamespace(Database database) {
+    return Namespace.of(database.name());
+  }
+
+  static TableIdentifier toTableId(Table table) {
+    return TableIdentifier.of(table.databaseName(), table.name());
+  }
+
+  /**
+   * Validate the Glue table is Iceberg table by checking its parameters
+   * @param table glue table
+   * @param fullName full table name for logging
+   */
+  static void validateTable(Table table, String fullName) {
+    String tableType = table.parameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP);
+    ValidationException.check(tableType != null && tableType.equalsIgnoreCase(
+        BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE),
+        "Input Glue table is not an iceberg table: %s (type=%s)", fullName, tableType);
+  }
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/IcebergToGlueConverter.java b/aws/src/main/java/org/apache/iceberg/aws/glue/IcebergToGlueConverter.java
new file mode 100644
index 0000000..ecb94b9
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/IcebergToGlueConverter.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import software.amazon.awssdk.services.glue.model.DatabaseInput;
+
+class IcebergToGlueConverter {
+
+  private IcebergToGlueConverter() {
+  }
+
+  private static final Pattern GLUE_DB_PATTERN = Pattern.compile("^[a-z0-9_]{1,252}$");
+  private static final Pattern GLUE_TABLE_PATTERN = Pattern.compile("^[a-z0-9_]{1,255}$");
+
+  /**
+   * A Glue database name cannot be longer than 252 characters.
+   * The only acceptable characters are lowercase letters, numbers, and the underscore character.
+   * More details: https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html
+   * @param namespace namespace
+   * @return if namespace can be accepted by Glue
+   */
+  static boolean isValidNamespace(Namespace namespace) {
+    if (namespace.levels().length != 1) {
+      return false;
+    }
+    String dbName = namespace.level(0);
+    return dbName != null && GLUE_DB_PATTERN.matcher(dbName).find();
+  }
+
+  /**
+   * Validate if an Iceberg namespace is valid in Glue
+   * @param namespace namespace
+   * @throws NoSuchNamespaceException if namespace is not valid in Glue
+   */
+  static void validateNamespace(Namespace namespace) {
+    ValidationException.check(isValidNamespace(namespace), "Cannot convert namespace %s to Glue database name, " +
+        "because it must be 1-252 chars of lowercase letters, numbers, underscore", namespace);
+  }
+
+  /**
+   * Validate and convert Iceberg namespace to Glue database name
+   * @param namespace Iceberg namespace
+   * @return database name
+   */
+  static String toDatabaseName(Namespace namespace) {
+    validateNamespace(namespace);
+    return namespace.level(0);
+  }
+
+  /**
+   * Validate and get Glue database name from Iceberg TableIdentifier
+   * @param tableIdentifier Iceberg table identifier
+   * @return database name
+   */
+  static String getDatabaseName(TableIdentifier tableIdentifier) {
+    return toDatabaseName(tableIdentifier.namespace());
+  }
+
+  /**
+   * Validate and convert Iceberg name to Glue DatabaseInput
+   * @param namespace Iceberg namespace
+   * @param metadata metadata map
+   * @return Glue DatabaseInput
+   */
+  static DatabaseInput toDatabaseInput(Namespace namespace, Map<String, String> metadata) {
+    return DatabaseInput.builder()
+        .name(toDatabaseName(namespace))
+        .parameters(metadata)
+        .build();
+  }
+
+  /**
+   * A Glue table name cannot be longer than 255 characters.
+   * The only acceptable characters are lowercase letters, numbers, and the underscore character.
+   * More details: https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html
+   * @param tableName table name
+   * @return if a table name can be accepted by Glue
+   */
+  static boolean isValidTableName(String tableName) {
+    return tableName != null && GLUE_TABLE_PATTERN.matcher(tableName).find();
+  }
+
+  /**
+   * Validate if a table name is valid in Glue
+   * @param tableName table name
+   * @throws NoSuchTableException if table name not valid in Glue
+   */
+  static void validateTableName(String tableName) {
+    ValidationException.check(isValidTableName(tableName), "Cannot use %s as Glue table name, " +
+        "because it must be 1-255 chars of lowercase letters, numbers, underscore", tableName);
+  }
+
+  /**
+   * Validate and get Glue table name from Iceberg TableIdentifier
+   * @param tableIdentifier table identifier
+   * @return table name
+   */
+  static String getTableName(TableIdentifier tableIdentifier) {
+    validateTableName(tableIdentifier.name());
+    return tableIdentifier.name();
+  }
+
+  /**
+   * Validate Iceberg TableIdentifier is valid in Glue
+   * @param tableIdentifier Iceberg table identifier
+   */
+  static void validateTableIdentifier(TableIdentifier tableIdentifier) {
+    validateNamespace(tableIdentifier.namespace());
+    validateTableName(tableIdentifier.name());
+  }
+}
diff --git a/aws/src/test/java/org/apache/iceberg/aws/glue/GlueCatalogTest.java b/aws/src/test/java/org/apache/iceberg/aws/glue/GlueCatalogTest.java
new file mode 100644
index 0000000..a80b168
--- /dev/null
+++ b/aws/src/test/java/org/apache/iceberg/aws/glue/GlueCatalogTest.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.DeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.GetDatabasesRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabasesResponse;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseResponse;
+
+public class GlueCatalogTest {
+
+  private static final String WAREHOUSE_PATH = "s3://bucket";
+  private static final String CATALOG_NAME = "glue";
+  private GlueClient glue;
+  private GlueCatalog glueCatalog;
+
+  @Before
+  public void before() {
+    glue = Mockito.mock(GlueClient.class);
+    glueCatalog = new GlueCatalog(glue);
+    glueCatalog.initialize(CATALOG_NAME, WAREHOUSE_PATH, new AwsProperties(), null);
+  }
+
+  @Test
+  public void constructor_emptyWarehousePath() {
+    AssertHelpers.assertThrows("warehouse path cannot be null",
+        IllegalArgumentException.class,
+        "Cannot initialize GlueCatalog because warehousePath must not be null",
+        () -> {
+            GlueCatalog catalog = new GlueCatalog(glue);
+            catalog.initialize(CATALOG_NAME, null, new AwsProperties(), null);
+        });
+  }
+
+  @Test
+  public void constructor_warehousePathWithEndSlash() {
+    GlueCatalog catalogWithSlash = new GlueCatalog(glue);
+    catalogWithSlash.initialize(CATALOG_NAME, WAREHOUSE_PATH + "/", new AwsProperties(), null);
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db").build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    String location = catalogWithSlash.defaultWarehouseLocation(TableIdentifier.of("db", "table"));
+    Assert.assertEquals(WAREHOUSE_PATH + "/db.db/table", location);
+  }
+
+  @Test
+  public void defaultWarehouseLocation_noDbUri() {
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db").build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    String location = glueCatalog.defaultWarehouseLocation(TableIdentifier.of("db", "table"));
+    Assert.assertEquals(WAREHOUSE_PATH + "/db.db/table", location);
+  }
+
+  @Test
+  public void defaultWarehouseLocation_dbUri() {
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db").locationUri("s3://bucket2/db").build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    String location = glueCatalog.defaultWarehouseLocation(TableIdentifier.of("db", "table"));
+    Assert.assertEquals("s3://bucket2/db/table", location);
+  }
+
+  @Test
+  public void listTables() {
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db1").build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    Mockito.doReturn(GetTablesResponse.builder()
+        .tableList(
+            Table.builder().databaseName("db1").name("t1").build(),
+            Table.builder().databaseName("db1").name("t2").build()
+        ).build())
+        .when(glue).getTables(Mockito.any(GetTablesRequest.class));
+    Assert.assertEquals(
+        Lists.newArrayList(
+            TableIdentifier.of("db1", "t1"),
+            TableIdentifier.of("db1", "t2")
+        ),
+        glueCatalog.listTables(Namespace.of("db1"))
+    );
+  }
+
+  @Test
+  public void listTables_pagination() {
+    AtomicInteger counter = new AtomicInteger(10);
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db1").build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    Mockito.doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        if (counter.decrementAndGet() > 0) {
+          return GetTablesResponse.builder()
+              .tableList(
+                  Table.builder().databaseName("db1").name(
+                      UUID.randomUUID().toString().replace("-", "")).build()
+              )
+              .nextToken("token")
+              .build();
+        } else {
+          return GetTablesResponse.builder()
+              .tableList(Table.builder().databaseName("db1").name("tb1").build())
+              .build();
+        }
+      }
+    }).when(glue).getTables(Mockito.any(GetTablesRequest.class));
+    Assert.assertEquals(10, glueCatalog.listTables(Namespace.of("db1")).size());
+  }
+
+  @Test
+  public void dropTable() {
+    Map<String, String> properties = new HashMap<>();
+    properties.put(BaseMetastoreTableOperations.TABLE_TYPE_PROP,
+        BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE);
+    Mockito.doReturn(GetTableResponse.builder()
+        .table(Table.builder().databaseName("db1").name("t1").parameters(properties).build()).build())
+        .when(glue).getTable(Mockito.any(GetTableRequest.class));
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db1").build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    Mockito.doReturn(DeleteTableResponse.builder().build())
+        .when(glue).deleteTable(Mockito.any(DeleteTableRequest.class));
+    glueCatalog.dropTable(TableIdentifier.of("db1", "t1"));
+  }
+
+  @Test
+  public void renameTable() {
+    AtomicInteger counter = new AtomicInteger(1);
+    Map<String, String> properties = new HashMap<>();
+    properties.put(BaseMetastoreTableOperations.TABLE_TYPE_PROP,
+        BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE);
+    Mockito.doReturn(GetTableResponse.builder()
+        .table(Table.builder().databaseName("db1").name("t1").parameters(properties).build()).build())
+        .when(glue).getTable(Mockito.any(GetTableRequest.class));
+    Mockito.doReturn(GetTablesResponse.builder().build())
+        .when(glue).getTables(Mockito.any(GetTablesRequest.class));
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db1").build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    Mockito.doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        counter.decrementAndGet();
+        return DeleteTableResponse.builder().build();
+      }
+    }).when(glue).deleteTable(Mockito.any(DeleteTableRequest.class));
+    glueCatalog.dropTable(TableIdentifier.of("db1", "t1"));
+    Assert.assertEquals(0, counter.get());
+  }
+
+  @Test
+  public void createNamespace() {
+    Mockito.doReturn(CreateDatabaseResponse.builder().build())
+        .when(glue).createDatabase(Mockito.any(CreateDatabaseRequest.class));
+    glueCatalog.createNamespace(Namespace.of("db"));
+  }
+
+  @Test
+  public void createNamespace_badName() {
+    Mockito.doReturn(CreateDatabaseResponse.builder().build())
+        .when(glue).createDatabase(Mockito.any(CreateDatabaseRequest.class));
+    List<Namespace> invalidNamespaces = Lists.newArrayList(
+        Namespace.of("db-1"),
+        Namespace.of("db", "db2")
+    );
+
+    for (Namespace namespace : invalidNamespaces) {
+      AssertHelpers.assertThrows("should not create namespace with invalid or nested names",
+          ValidationException.class,
+          "Cannot convert namespace",
+          () -> glueCatalog.createNamespace(namespace));
+    }
+  }
+
+  @Test
+  public void listNamespaces_all() {
+    Mockito.doReturn(GetDatabasesResponse.builder()
+        .databaseList(
+            Database.builder().name("db1").build(),
+            Database.builder().name("db2").build()
+        ).build())
+        .when(glue).getDatabases(Mockito.any(GetDatabasesRequest.class));
+    Assert.assertEquals(
+        Lists.newArrayList(
+            Namespace.of("db1"),
+            Namespace.of("db2")
+        ),
+        glueCatalog.listNamespaces()
+    );
+  }
+
+  @Test
+  public void listNamespaces_pagination() {
+    AtomicInteger counter = new AtomicInteger(10);
+    Mockito.doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        if (counter.decrementAndGet() > 0) {
+          return GetDatabasesResponse.builder()
+              .databaseList(
+                  Database.builder().name(UUID.randomUUID().toString().replace("-", "")).build()
+              )
+              .nextToken("token")
+              .build();
+        } else {
+          return GetDatabasesResponse.builder()
+              .databaseList(Database.builder().name("db").build())
+              .build();
+        }
+      }
+    }).when(glue).getDatabases(Mockito.any(GetDatabasesRequest.class));
+    Assert.assertEquals(10, glueCatalog.listNamespaces().size());
+  }
+
+  @Test
+  public void listNamespaces_self() {
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db1").build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    Assert.assertEquals(
+        "list self should return empty list",
+        Lists.newArrayList(),
+        glueCatalog.listNamespaces(Namespace.of("db1"))
+    );
+  }
+
+  @Test
+  public void listNamespaces_selfInvalid() {
+    AssertHelpers.assertThrows("table name invalid",
+        ValidationException.class,
+        "Cannot convert namespace",
+        () -> glueCatalog.listNamespaces(Namespace.of("db-1")));
+  }
+
+  @Test
+  public void loadNamespaceMetadata() {
+    Map<String, String> parameters = new HashMap<>();
+    parameters.put("key", "val");
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db1")
+            .parameters(parameters)
+            .build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    Assert.assertEquals(parameters, glueCatalog.loadNamespaceMetadata(Namespace.of("db1")));
+  }
+
+  @Test
+  public void dropNamespace() {
+    Mockito.doReturn(GetTablesResponse.builder().build())
+        .when(glue).getTables(Mockito.any(GetTablesRequest.class));
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db1").build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    Mockito.doReturn(DeleteDatabaseResponse.builder().build())
+        .when(glue).deleteDatabase(Mockito.any(DeleteDatabaseRequest.class));
+    glueCatalog.dropNamespace(Namespace.of("db1"));
+  }
+
+  @Test
+  public void dropNamespace_notEmpty() {
+    Mockito.doReturn(GetTablesResponse.builder()
+        .tableList(
+            Table.builder().databaseName("db1").name("t1").build(),
+            Table.builder().databaseName("db1").name("t2").build()
+        ).build())
+        .when(glue).getTables(Mockito.any(GetTablesRequest.class));
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db1").build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    Mockito.doReturn(DeleteDatabaseResponse.builder().build())
+        .when(glue).deleteDatabase(Mockito.any(DeleteDatabaseRequest.class));
+    AssertHelpers.assertThrows("namespace should not be dropped when still has table",
+        NamespaceNotEmptyException.class,
+        "Cannot drop namespace",
+        () -> glueCatalog.dropNamespace(Namespace.of("db1")));
+  }
+
+  @Test
+  public void setProperties() {
+    Map<String, String> parameters = new HashMap<>();
+    parameters.put("key", "val");
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db1")
+            .parameters(parameters)
+            .build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    Mockito.doReturn(UpdateDatabaseResponse.builder().build())
+        .when(glue).updateDatabase(Mockito.any(UpdateDatabaseRequest.class));
+    glueCatalog.setProperties(Namespace.of("db1"), parameters);
+  }
+
+  @Test
+  public void removeProperties() {
+    Map<String, String> parameters = new HashMap<>();
+    parameters.put("key", "val");
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db1")
+            .parameters(parameters)
+            .build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    Mockito.doReturn(UpdateDatabaseResponse.builder().build())
+        .when(glue).updateDatabase(Mockito.any(UpdateDatabaseRequest.class));
+    glueCatalog.removeProperties(Namespace.of("db1"), Sets.newHashSet("key"));
+  }
+}
diff --git a/aws/src/test/java/org/apache/iceberg/aws/glue/GlueToIcebergConverterTest.java b/aws/src/test/java/org/apache/iceberg/aws/glue/GlueToIcebergConverterTest.java
new file mode 100644
index 0000000..88a9eee
--- /dev/null
+++ b/aws/src/test/java/org/apache/iceberg/aws/glue/GlueToIcebergConverterTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.junit.Assert;
+import org.junit.Test;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.Table;
+
+public class GlueToIcebergConverterTest {
+
+  @Test
+  public void toNamespace() {
+    Database database = Database.builder()
+        .name("db")
+        .build();
+    Namespace namespace = Namespace.of("db");
+    Assert.assertEquals(namespace, GlueToIcebergConverter.toNamespace(database));
+  }
+
+  @Test
+  public void toTableId() {
+    Table table = Table.builder()
+        .databaseName("db")
+        .name("name")
+        .build();
+    TableIdentifier icebergId = TableIdentifier.of("db", "name");
+    Assert.assertEquals(icebergId, GlueToIcebergConverter.toTableId(table));
+  }
+
+  @Test
+  public void validateTable() {
+    Map<String, String> properties = new HashMap<>();
+    properties.put(BaseMetastoreTableOperations.TABLE_TYPE_PROP,
+        BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE);
+    Table table = Table.builder()
+        .parameters(properties)
+        .build();
+    GlueToIcebergConverter.validateTable(table, "name");
+  }
+
+  @Test
+  public void validateTable_icebergPropertyNotFound() {
+    Map<String, String> properties = new HashMap<>();
+    Table table = Table.builder()
+        .parameters(properties)
+        .build();
+    AssertHelpers.assertThrows("Iceberg property not found",
+        ValidationException.class,
+        "Input Glue table is not an iceberg table",
+        () -> GlueToIcebergConverter.validateTable(table, "name")
+    );
+  }
+
+  @Test
+  public void validateTable_icebergPropertyValueWrong() {
+    Map<String, String> properties = new HashMap<>();
+    properties.put(BaseMetastoreTableOperations.TABLE_TYPE_PROP, "other");
+    Table table = Table.builder()
+        .parameters(properties)
+        .build();
+    AssertHelpers.assertThrows("Iceberg property value wrong",
+        ValidationException.class,
+        "Input Glue table is not an iceberg table",
+        () -> GlueToIcebergConverter.validateTable(table, "name")
+    );
+  }
+}
diff --git a/aws/src/test/java/org/apache/iceberg/aws/glue/IcebergToGlueConverterTest.java b/aws/src/test/java/org/apache/iceberg/aws/glue/IcebergToGlueConverterTest.java
new file mode 100644
index 0000000..d4c20b7
--- /dev/null
+++ b/aws/src/test/java/org/apache/iceberg/aws/glue/IcebergToGlueConverterTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+import software.amazon.awssdk.services.glue.model.DatabaseInput;
+
+public class IcebergToGlueConverterTest {
+
+  @Test
+  public void toDatabaseName() {
+    Assert.assertEquals("db", IcebergToGlueConverter.toDatabaseName(Namespace.of("db")));
+  }
+
+  @Test
+  public void toDatabaseName_fail() {
+    List<Namespace> badNames = Lists.newArrayList(
+        Namespace.of("db", "a"),
+        Namespace.of("db-1"),
+        Namespace.empty(),
+        Namespace.of(""),
+        Namespace.of(new String(new char[600]).replace("\0", "a")));
+    for (Namespace name : badNames) {
+      AssertHelpers.assertThrows("bad namespace name",
+          ValidationException.class,
+          "Cannot convert namespace",
+          () -> IcebergToGlueConverter.toDatabaseName(name)
+      );
+    }
+  }
+
+  @Test
+  public void toDatabaseInput() {
+    Map<String, String> param = new HashMap<>();
+    DatabaseInput input = DatabaseInput.builder()
+        .name("db")
+        .parameters(param)
+        .build();
+    Namespace namespace = Namespace.of("db");
+    Assert.assertEquals(input, IcebergToGlueConverter.toDatabaseInput(namespace, param));
+  }
+}
diff --git a/build.gradle b/build.gradle
index 7df7c9f..a385685 100644
--- a/build.gradle
+++ b/build.gradle
@@ -252,7 +252,16 @@ project(':iceberg-aws') {
     compile 'software.amazon.awssdk:url-connection-client'
     compile 'software.amazon.awssdk:s3'
     compile 'software.amazon.awssdk:kms'
+    compile 'software.amazon.awssdk:glue'
 
+    compileOnly("org.apache.hadoop:hadoop-common") {
+      exclude group: 'org.apache.avro', module: 'avro'
+      exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+      exclude group: 'javax.servlet', module: 'servlet-api'
+      exclude group: 'com.google.code.gson', module: 'gson'
+    }
+
+    testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
     testCompile("com.adobe.testing:s3mock-junit4") {
       exclude module: "spring-boot-starter-logging"
       exclude module: "logback-classic"