You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/11/19 18:23:55 UTC
[iceberg] branch master updated: AWS: Add Glue catalog and table
operations (#1633)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 0beb7d6 AWS: Add Glue catalog and table operations (#1633)
0beb7d6 is described below
commit 0beb7d663694d78e6edecb241a008e9834c82d8c
Author: jackye1995 <yz...@amazon.com>
AuthorDate: Thu Nov 19 10:23:35 2020 -0800
AWS: Add Glue catalog and table operations (#1633)
---
.../org/apache/iceberg/aws/AwsIntegTestUtil.java | 18 +
.../iceberg/aws/glue/GlueCatalogNamespaceTest.java | 143 ++++++++
.../iceberg/aws/glue/GlueCatalogTableTest.java | 282 ++++++++++++++
.../org/apache/iceberg/aws/glue/GlueTestBase.java | 102 ++++++
.../java/org/apache/iceberg/aws/AwsClientUtil.java | 7 +
.../java/org/apache/iceberg/aws/AwsProperties.java | 43 +++
.../org/apache/iceberg/aws/glue/GlueCatalog.java | 403 +++++++++++++++++++++
.../iceberg/aws/glue/GlueTableOperations.java | 182 ++++++++++
.../iceberg/aws/glue/GlueToIcebergConverter.java | 53 +++
.../iceberg/aws/glue/IcebergToGlueConverter.java | 135 +++++++
.../apache/iceberg/aws/glue/GlueCatalogTest.java | 358 ++++++++++++++++++
.../aws/glue/GlueToIcebergConverterTest.java | 92 +++++
.../aws/glue/IcebergToGlueConverterTest.java | 67 ++++
build.gradle | 9 +
14 files changed, 1894 insertions(+)
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java b/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java
index a71804c..d8faaf8 100644
--- a/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java
+++ b/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java
@@ -19,7 +19,12 @@
package org.apache.iceberg.aws;
+import java.util.List;
import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
@@ -29,6 +34,8 @@ import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
public class AwsIntegTestUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(AwsIntegTestUtil.class);
+
private AwsIntegTestUtil() {
}
@@ -55,4 +62,15 @@ public class AwsIntegTestUtil {
}
}
}
+
+ public static void cleanGlueCatalog(GlueClient glue, List<String> namespaces) {
+ for (String namespace : namespaces) {
+ try {
+ // deleting the database also deletes the tables inside it, so no per-table cleanup is done here
+ glue.deleteDatabase(DeleteDatabaseRequest.builder().name(namespace).build());
+ } catch (Exception e) { // best-effort cleanup: log and continue with the remaining namespaces
+ LOG.error("Cannot delete namespace {}", namespace, e);
+ }
+ }
+ }
}
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogNamespaceTest.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogNamespaceTest.java
new file mode 100644
index 0000000..9dc3ce5
--- /dev/null
+++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogNamespaceTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Test;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+
+public class GlueCatalogNamespaceTest extends GlueTestBase {
+
+ @Test
+ public void testCreateNamespace() {
+ String namespace = getRandomName();
+ namespaces.add(namespace); // tracked so GlueTestBase.afterClass() deletes it from Glue
+ AssertHelpers.assertThrows("namespace does not exist before create",
+ EntityNotFoundException.class,
+ "not found",
+ () -> glue.getDatabase(GetDatabaseRequest.builder().name(namespace).build()));
+ glueCatalog.createNamespace(Namespace.of(namespace));
+ Database database = glue.getDatabase(GetDatabaseRequest.builder().name(namespace).build()).database();
+ Assert.assertEquals("namespace must equal database name", namespace, database.name());
+ }
+
+ @Test
+ public void testCreateDuplicate() {
+ String namespace = createNamespace();
+ AssertHelpers.assertThrows("should not create namespace with the same name",
+ AlreadyExistsException.class,
+ "it already exists in Glue",
+ () -> glueCatalog.createNamespace(Namespace.of(namespace)));
+ }
+
+ @Test
+ public void testCreateBadName() {
+ List<Namespace> invalidNamespaces = Lists.newArrayList(
+ Namespace.of("db-1"), // name containing a hyphen
+ Namespace.of("db", "db2") // nested (two-level) namespace
+ );
+
+ for (Namespace namespace : invalidNamespaces) {
+ AssertHelpers.assertThrows("should not create namespace with invalid or nested names",
+ ValidationException.class,
+ "Cannot convert namespace",
+ () -> glueCatalog.createNamespace(namespace));
+ }
+ }
+
+ @Test
+ public void testNamespaceExists() {
+ String namespace = createNamespace();
+ Assert.assertTrue(glueCatalog.namespaceExists(Namespace.of(namespace)));
+ }
+
+ @Test
+ public void testListNamespace() {
+ String namespace = createNamespace();
+ List<Namespace> namespaceList = glueCatalog.listNamespaces();
+ Assert.assertTrue(namespaceList.size() > 0);
+ Assert.assertTrue(namespaceList.contains(Namespace.of(namespace)));
+ namespaceList = glueCatalog.listNamespaces(Namespace.of(namespace)); // listing under a namespace must be empty
+ Assert.assertTrue(namespaceList.isEmpty());
+ }
+
+ @Test
+ public void testNamespaceProperties() {
+ String namespace = createNamespace();
+ // set two properties and verify both show up as parameters of the Glue database
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("key", "val");
+ properties.put("key2", "val2");
+ glueCatalog.setProperties(Namespace.of(namespace), properties);
+ Database database = glue.getDatabase(GetDatabaseRequest.builder().name(namespace).build()).database();
+ Assert.assertTrue(database.parameters().containsKey("key"));
+ Assert.assertEquals("val", database.parameters().get("key"));
+ Assert.assertTrue(database.parameters().containsKey("key2"));
+ Assert.assertEquals("val2", database.parameters().get("key2"));
+ // remove one property; the other one must be left untouched
+ glueCatalog.removeProperties(Namespace.of(namespace), Sets.newHashSet("key"));
+ database = glue.getDatabase(GetDatabaseRequest.builder().name(namespace).build()).database();
+ Assert.assertFalse(database.parameters().containsKey("key"));
+ Assert.assertTrue(database.parameters().containsKey("key2"));
+ Assert.assertEquals("val2", database.parameters().get("key2"));
+ // add the removed property back; both properties must be present again
+ properties = Maps.newHashMap();
+ properties.put("key", "val");
+ glueCatalog.setProperties(Namespace.of(namespace), properties);
+ database = glue.getDatabase(GetDatabaseRequest.builder().name(namespace).build()).database();
+ Assert.assertTrue(database.parameters().containsKey("key"));
+ Assert.assertEquals("val", database.parameters().get("key"));
+ Assert.assertTrue(database.parameters().containsKey("key2"));
+ Assert.assertEquals("val2", database.parameters().get("key2"));
+ }
+
+ @Test
+ public void testDropNamespace() {
+ String namespace = createNamespace();
+ glueCatalog.dropNamespace(Namespace.of(namespace));
+ AssertHelpers.assertThrows("namespace should not exist after deletion",
+ EntityNotFoundException.class,
+ "not found",
+ () -> glue.getDatabase(GetDatabaseRequest.builder().name(namespace).build()));
+ }
+
+ @Test
+ public void testDropNamespaceNonEmpty() {
+ String namespace = createNamespace();
+ createTable(namespace); // a single table is enough to make the namespace non-empty
+ AssertHelpers.assertThrows("namespace should not be dropped when still has table",
+ NamespaceNotEmptyException.class,
+ "it is not empty",
+ () -> glueCatalog.dropNamespace(Namespace.of(namespace)));
+ }
+
+}
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogTableTest.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogTableTest.java
new file mode 100644
index 0000000..3de6703
--- /dev/null
+++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueCatalogTableTest.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.GetTableVersionsRequest;
+import software.amazon.awssdk.services.glue.model.TableInput;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+public class GlueCatalogTableTest extends GlueTestBase {
+
+ @Test
+ public void testCreateTable() {
+ String namespace = createNamespace();
+ String tableName = getRandomName();
+ glueCatalog.createTable(TableIdentifier.of(namespace, tableName), schema, partitionSpec);
+ // verify the table is registered in Glue with the Iceberg table type and a metadata location
+ GetTableResponse response = glue.getTable(GetTableRequest.builder()
+ .databaseName(namespace).name(tableName).build());
+ Assert.assertEquals(namespace, response.table().databaseName());
+ Assert.assertEquals(tableName, response.table().name());
+ Assert.assertEquals(BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH),
+ response.table().parameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP));
+ Assert.assertTrue(response.table().parameters().containsKey(BaseMetastoreTableOperations.METADATA_LOCATION_PROP));
+ // verify the metadata file referenced by Glue exists in S3
+ String metaLocation = response.table().parameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
+ String key = metaLocation.split(testBucketName, -1)[1].substring(1); // NOTE(review): split() takes a regex; a '.' in the bucket name is not matched literally
+ s3.headObject(HeadObjectRequest.builder().bucket(testBucketName).key(key).build());
+ Table table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName));
+ Assert.assertEquals(partitionSpec, table.spec());
+ Assert.assertEquals(schema.toString(), table.schema().toString());
+ }
+
+ @Test
+ public void testCreateTableDuplicate() {
+ String namespace = createNamespace();
+ String tableName = createTable(namespace);
+ AssertHelpers.assertThrows("should not create table with the same name",
+ AlreadyExistsException.class,
+ "Table already exists",
+ () -> glueCatalog.createTable(TableIdentifier.of(namespace, tableName), schema, partitionSpec));
+ }
+
+ @Test
+ public void testCreateTableBadName() {
+ String namespace = createNamespace();
+ AssertHelpers.assertThrows("should not create table with bad name",
+ IllegalArgumentException.class,
+ "Invalid table identifier",
+ () -> glueCatalog.createTable(TableIdentifier.of(namespace, "table-1"), schema, partitionSpec));
+ }
+
+ @Test
+ public void testListTables() {
+ String namespace = createNamespace();
+ Assert.assertTrue("list namespace should have nothing before table creation",
+ glueCatalog.listTables(Namespace.of(namespace)).isEmpty());
+ String tableName = createTable(namespace);
+ List<TableIdentifier> tables = glueCatalog.listTables(Namespace.of(namespace));
+ Assert.assertEquals(1, tables.size());
+ Assert.assertEquals(TableIdentifier.of(namespace, tableName), tables.get(0));
+ }
+
+ @Test
+ public void testTableExists() {
+ String namespace = createNamespace();
+ String tableName = createTable(namespace);
+ Assert.assertTrue(glueCatalog.tableExists(TableIdentifier.of(namespace, tableName)));
+ }
+
+ @Test
+ public void testUpdateTable() {
+ String namespace = createNamespace();
+ String tableName = getRandomName();
+ // the table has not been created yet, so the current metadata should be null
+ TableOperations ops = glueCatalog.newTableOps(TableIdentifier.of(namespace, tableName));
+ TableMetadata current = ops.current();
+ Assert.assertNull(current);
+ // create the table; refreshing the same operations object should now pick up its metadata
+ createTable(namespace, tableName);
+ current = ops.refresh();
+ Assert.assertEquals(schema.toString(), current.schema().toString());
+ Assert.assertEquals(partitionSpec, current.spec());
+ Table table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName));
+ Assert.assertTrue("initial table history should be empty", table.history().isEmpty());
+ // commit an append; the reloaded table should then have exactly one history entry
+ table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName));
+ DataFile dataFile = DataFiles.builder(partitionSpec)
+ .withPath("/path/to/data-a.parquet")
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+ table.newAppend().appendFile(dataFile).commit();
+ table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName));
+ Assert.assertEquals("commit should create a new table version", 1, table.history().size());
+ }
+
+ @Test
+ public void testRenameTable() {
+ String namespace = createNamespace();
+ String tableName = createTable(namespace);
+ Table table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName));
+ // rename the table and check the renamed table exposes the same location, schema, spec and snapshot
+ String newTableName = tableName + "_2";
+ glueCatalog.renameTable(TableIdentifier.of(namespace, tableName), TableIdentifier.of(namespace, newTableName));
+ Table renamedTable = glueCatalog.loadTable(TableIdentifier.of(namespace, newTableName));
+ Assert.assertEquals(table.location(), renamedTable.location());
+ Assert.assertEquals(table.schema().toString(), renamedTable.schema().toString());
+ Assert.assertEquals(table.spec(), renamedTable.spec());
+ Assert.assertEquals(table.currentSnapshot(), renamedTable.currentSnapshot());
+ }
+
+ @Test
+ public void testRenameTable_failToCreateNewTable() {
+ String namespace = createNamespace();
+ String tableName = createTable(namespace);
+ TableIdentifier id = TableIdentifier.of(namespace, tableName);
+ Table table = glueCatalog.loadTable(id);
+ // create the target table directly in Glue first, so that renaming to that name fails
+ String newTableName = tableName + "_2";
+ glue.createTable(CreateTableRequest.builder()
+ .databaseName(namespace)
+ .tableInput(TableInput.builder().name(newTableName).build())
+ .build());
+ AssertHelpers.assertThrows("should fail to rename to an existing table",
+ software.amazon.awssdk.services.glue.model.AlreadyExistsException.class,
+ "Table already exists",
+ () -> glueCatalog.renameTable(
+ TableIdentifier.of(namespace, tableName), TableIdentifier.of(namespace, newTableName)));
+ // the old table must still be readable with unchanged metadata after the failed rename
+ Table oldTable = glueCatalog.loadTable(id);
+ Assert.assertEquals(table.location(), oldTable.location());
+ Assert.assertEquals(table.schema().toString(), oldTable.schema().toString());
+ Assert.assertEquals(table.spec(), oldTable.spec());
+ Assert.assertEquals(table.currentSnapshot(), oldTable.currentSnapshot());
+ }
+
+ @Test
+ public void testRenameTable_failToDeleteOldTable() {
+ String namespace = createNamespace();
+ String tableName = createTable(namespace);
+ TableIdentifier id = TableIdentifier.of(namespace, tableName);
+ Table table = glueCatalog.loadTable(id);
+ // replace the old table's Glue parameters with an empty map, so that dropping the old table fails
+ String newTableName = tableName + "_2";
+ glue.updateTable(UpdateTableRequest.builder()
+ .databaseName(namespace)
+ .tableInput(TableInput.builder().name(tableName).parameters(Maps.newHashMap()).build())
+ .build());
+ AssertHelpers.assertThrows("should fail to rename",
+ ValidationException.class,
+ "Input Glue table is not an iceberg table",
+ () -> glueCatalog.renameTable(
+ TableIdentifier.of(namespace, tableName), TableIdentifier.of(namespace, newTableName)));
+ AssertHelpers.assertThrows("renamed table should be deleted",
+ EntityNotFoundException.class,
+ "not found",
+ () -> glue.getTable(GetTableRequest.builder().databaseName(namespace).name(newTableName).build()));
+ }
+
+ @Test
+ public void testDeleteTableWithoutPurge() {
+ String namespace = createNamespace();
+ String tableName = createTable(namespace);
+ glueCatalog.dropTable(TableIdentifier.of(namespace, tableName), false);
+ AssertHelpers.assertThrows("should not have table",
+ NoSuchTableException.class,
+ "Table does not exist",
+ () -> glueCatalog.loadTable(TableIdentifier.of(namespace, tableName)));
+ String warehouseLocation = glueCatalog.defaultWarehouseLocation(TableIdentifier.of(namespace, tableName));
+ String prefix = warehouseLocation.split(testBucketName + "/", -1)[1]; // NOTE(review): split() takes a regex, see bucket-name caveat
+ ListObjectsV2Response response = s3.listObjectsV2(ListObjectsV2Request.builder()
+ .bucket(testBucketName).prefix(prefix + "/metadata/").build());
+ Assert.assertTrue(response.hasContents());
+ boolean hasMetaFile = false;
+ for (S3Object s3Object : response.contents()) {
+ if (s3Object.key().contains(".json")) {
+ hasMetaFile = true;
+ break;
+ }
+ }
+ Assert.assertTrue("metadata json file exists after delete without purge", hasMetaFile);
+ }
+
+ @Test
+ public void testDeleteTableWithPurge() {
+ String namespace = createNamespace();
+ String tableName = createTable(namespace);
+ glueCatalog.dropTable(TableIdentifier.of(namespace, tableName));
+ AssertHelpers.assertThrows("should not have table",
+ NoSuchTableException.class,
+ "Table does not exist",
+ () -> glueCatalog.loadTable(TableIdentifier.of(namespace, tableName)));
+ String warehouseLocation = glueCatalog.defaultWarehouseLocation(TableIdentifier.of(namespace, tableName));
+ String prefix = warehouseLocation.split(testBucketName + "/", -1)[1]; // NOTE(review): split() takes a regex, see bucket-name caveat
+ ListObjectsV2Response response = s3.listObjectsV2(ListObjectsV2Request.builder()
+ .bucket(testBucketName).prefix(prefix).build());
+ if (response.hasContents()) {
+ // only zero-byte objects (such as leftover directory markers) are allowed to remain
+ for (S3Object s3Object : response.contents()) {
+ Optional<Long> size = s3Object.getValueForField("Size", Long.class);
+ Assert.assertTrue(size.isPresent());
+ Assert.assertEquals(0L, (long) size.get());
+ }
+ }
+ }
+
+ @Test
+ public void testCommitTableSkipArchive() {
+ // create the namespace and track it for cleanup
+ String namespace = getRandomName();
+ namespaces.add(namespace);
+ glueCatalog.createNamespace(Namespace.of(namespace));
+ // create a table and commit with the default catalog (archiving not skipped)
+ Schema schema = new Schema(Types.NestedField.required(1, "c1", Types.StringType.get(), "c1")); // NOTE(review): shadows GlueTestBase.schema
+ PartitionSpec partitionSpec = PartitionSpec.builderFor(schema).build(); // NOTE(review): shadows GlueTestBase.partitionSpec
+ String tableName = getRandomName();
+ glueCatalog.createTable(TableIdentifier.of(namespace, tableName), schema, partitionSpec);
+ Table table = glueCatalog.loadTable(TableIdentifier.of(namespace, tableName));
+ DataFile dataFile = DataFiles.builder(partitionSpec)
+ .withPath("/path/to/data-a.parquet")
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+ table.newAppend().appendFile(dataFile).commit();
+ Assert.assertEquals(2, glue.getTableVersions(GetTableVersionsRequest.builder()
+ .databaseName(namespace).tableName(tableName).build()).tableVersions().size());
+ // create a second table and commit with the catalog that has skip-archive enabled
+ tableName = getRandomName();
+ glueCatalogWithSkip.createTable(TableIdentifier.of(namespace, tableName), schema, partitionSpec);
+ table = glueCatalogWithSkip.loadTable(TableIdentifier.of(namespace, tableName));
+ table.newAppend().appendFile(dataFile).commit();
+ Assert.assertEquals("skipArchive should not create new version",
+ 1, glue.getTableVersions(GetTableVersionsRequest.builder()
+ .databaseName(namespace).tableName(tableName).build()).tableVersions().size());
+ }
+}
diff --git a/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java
new file mode 100644
index 0000000..b787cb8
--- /dev/null
+++ b/aws/src/integration/java/org/apache/iceberg/aws/glue/GlueTestBase.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.aws.AwsClientUtil;
+import org.apache.iceberg.aws.AwsIntegTestUtil;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.s3.S3Client;
+
+@SuppressWarnings({"VisibilityModifier", "HideUtilityClassConstructor"})
+public class GlueTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GlueTestBase.class);
+
+ // the integration test requires the following env variables
+ static final String testBucketName = AwsIntegTestUtil.testBucketName();
+
+ static final String catalogName = "glue";
+ static final String testPathPrefix = getRandomName(); // random S3 key prefix, cleaned up in afterClass()
+ static final List<String> namespaces = Lists.newArrayList(); // namespaces created by tests, deleted in afterClass()
+
+ // AWS clients shared by all tests in the class
+ static final GlueClient glue = AwsClientUtil.defaultGlueClient();
+ static final S3Client s3 = AwsClientUtil.defaultS3Client();
+
+ // Iceberg catalogs under test: one with default properties, one with skip-archive enabled
+ static GlueCatalog glueCatalog;
+ static GlueCatalog glueCatalogWithSkip;
+
+ static Schema schema = new Schema(Types.NestedField.required(1, "c1", Types.StringType.get(), "c1"));
+ static PartitionSpec partitionSpec = PartitionSpec.builderFor(schema).build();
+
+ @BeforeClass
+ public static void beforeClass() {
+ String testBucketPath = "s3://" + testBucketName + "/" + testPathPrefix;
+ S3FileIO fileIO = new S3FileIO(); // the same FileIO instance is passed to both catalogs
+ glueCatalog = new GlueCatalog(glue);
+ glueCatalog.initialize(catalogName, testBucketPath, new AwsProperties(), fileIO);
+ AwsProperties properties = new AwsProperties();
+ properties.setGlueCatalogSkipArchive(true);
+ glueCatalogWithSkip = new GlueCatalog(glue);
+ glueCatalogWithSkip.initialize(catalogName, testBucketPath, properties, fileIO);
+ }
+
+ @AfterClass
+ public static void afterClass() {
+ AwsIntegTestUtil.cleanGlueCatalog(glue, namespaces);
+ AwsIntegTestUtil.cleanS3Bucket(s3, testBucketName, testPathPrefix);
+ }
+
+ public static String getRandomName() {
+ return UUID.randomUUID().toString().replace("-", ""); // random UUID with the dashes stripped
+ }
+
+ public static String createNamespace() {
+ String namespace = getRandomName();
+ namespaces.add(namespace); // recorded before creation so cleanup is attempted even if creation fails
+ glueCatalog.createNamespace(Namespace.of(namespace));
+ return namespace;
+ }
+
+ public static String createTable(String namespace) {
+ String tableName = getRandomName();
+ return createTable(namespace, tableName);
+ }
+
+ public static String createTable(String namespace, String tableName) {
+ glueCatalog.createTable(TableIdentifier.of(namespace, tableName), schema, partitionSpec); // uses the shared schema and spec
+ return tableName;
+ }
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsClientUtil.java b/aws/src/main/java/org/apache/iceberg/aws/AwsClientUtil.java
index e75b3f0..71c7ed0 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AwsClientUtil.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsClientUtil.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.aws;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.s3.S3Client;
@@ -47,4 +48,10 @@ public class AwsClientUtil {
.httpClient(UrlConnectionHttpClient.create())
.build();
}
+
+ public static GlueClient defaultGlueClient() { // builds a Glue client; only the HTTP client is set explicitly
+ return GlueClient.builder()
+ .httpClient(UrlConnectionHttpClient.create())
+ .build();
+ }
}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
index b9c33df..358a8c9 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.aws;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.util.PropertyUtil;
public class AwsProperties {
@@ -65,14 +66,36 @@ public class AwsProperties {
*/
public static final String S3FILEIO_SSE_MD5 = "s3fileio.sse.md5";
+ /**
+ * The ID of the Glue Data Catalog where the tables reside.
+ * If none is provided, Glue automatically uses the caller's AWS account ID by default.
+ * For more details, see https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-databases.html
+ */
+ public static final String GLUE_CATALOG_ID = "gluecatalog.id";
+
+ /**
+ * Whether Glue should skip archiving the old table version when a commit creates a new version.
+ * By default Glue archives all old table versions after an UpdateTable call,
+ * but Glue has a default maximum number of archived table versions (which can be increased).
+ * For streaming use cases with many commits, setting this value to true is recommended.
+ */
+ public static final String GLUE_CATALOG_SKIP_ARCHIVE = "gluecatalog.skip-archive";
+ public static final boolean GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT = false;
+
private String s3FileIoSseType;
private String s3FileIoSseKey;
private String s3FileIoSseMd5;
+ private String glueCatalogId;
+ private boolean glueCatalogSkipArchive;
+
public AwsProperties() {
this.s3FileIoSseType = S3FILEIO_SSE_TYPE_NONE;
this.s3FileIoSseKey = null;
this.s3FileIoSseMd5 = null;
+
+ this.glueCatalogId = null;
+ this.glueCatalogSkipArchive = GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT;
}
public AwsProperties(Map<String, String> properties) {
@@ -84,6 +107,10 @@ public class AwsProperties {
Preconditions.checkNotNull(s3FileIoSseKey, "Cannot initialize SSE-C S3FileIO with null encryption key");
Preconditions.checkNotNull(s3FileIoSseMd5, "Cannot initialize SSE-C S3FileIO with null encryption key MD5");
}
+
+ this.glueCatalogId = properties.get(GLUE_CATALOG_ID);
+ this.glueCatalogSkipArchive = PropertyUtil.propertyAsBoolean(properties,
+ AwsProperties.GLUE_CATALOG_SKIP_ARCHIVE, AwsProperties.GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT);
}
public String s3FileIoSseType() {
@@ -109,4 +136,20 @@ public class AwsProperties {
public void setS3FileIoSseMd5(String sseMd5) {
this.s3FileIoSseMd5 = sseMd5;
}
+
+ public String glueCatalogId() { // null when no catalog ID was configured
+ return glueCatalogId;
+ }
+
+ public void setGlueCatalogId(String id) { // replaces any value read from the gluecatalog.id property
+ this.glueCatalogId = id;
+ }
+
+ public boolean glueCatalogSkipArchive() { // GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT (false) unless configured otherwise
+ return glueCatalogSkipArchive;
+ }
+
+ public void setGlueCatalogSkipArchive(boolean skipArchive) { // replaces any value read from gluecatalog.skip-archive
+ this.glueCatalogSkipArchive = skipArchive;
+ }
}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
new file mode 100644
index 0000000..86ce858
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.aws.AwsClientUtil;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.GetDatabasesRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabasesResponse;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import software.amazon.awssdk.services.glue.model.InvalidInputException;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;
+
+/**
+ * Iceberg catalog implementation backed by the AWS Glue Data Catalog.
+ * <p>
+ * Iceberg namespaces map to Glue databases and Iceberg tables map to Glue tables.
+ * Only single-level namespaces are accepted, see {@link IcebergToGlueConverter#isValidNamespace(Namespace)}.
+ */
+public class GlueCatalog extends BaseMetastoreCatalog implements Closeable, SupportsNamespaces, Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class);
+
+  private final GlueClient glue;
+  private Configuration hadoopConf;
+  private String catalogName;
+  private String warehousePath;
+  private AwsProperties awsProperties;
+  private FileIO fileIO;
+
+  /**
+   * No-arg constructor to load the catalog dynamically.
+   * <p>
+   * Only the AWS Glue client is initialized.
+   * Other fields must be initialized by calling {@link GlueCatalog#initialize(String, Map)} later.
+   */
+  public GlueCatalog() {
+    this(AwsClientUtil.defaultGlueClient());
+  }
+
+  @VisibleForTesting
+  GlueCatalog(GlueClient glue) {
+    this.glue = glue;
+  }
+
+  /**
+   * Initialize the catalog from catalog properties.
+   * @param name catalog name
+   * @param properties catalog properties; must contain a non-empty {@link CatalogProperties#WAREHOUSE_LOCATION}
+   */
+  @Override
+  public void initialize(String name, Map<String, String> properties) {
+    String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
+    initialize(
+        name,
+        properties.get(CatalogProperties.WAREHOUSE_LOCATION),
+        new AwsProperties(properties),
+        initializeFileIO(fileIOImpl, properties));
+  }
+
+  /**
+   * Create the FileIO for this catalog: the configured implementation if one is given, otherwise S3FileIO.
+   */
+  private FileIO initializeFileIO(String fileIOImpl, Map<String, String> properties) {
+    if (fileIOImpl != null) {
+      return CatalogUtil.loadFileIO(fileIOImpl, properties, hadoopConf);
+    }
+
+    // pass the catalog properties to the default FileIO as well,
+    // otherwise S3FileIO settings supplied to the catalog would be silently ignored
+    FileIO io = new S3FileIO();
+    io.initialize(properties);
+    return io;
+  }
+
+  @VisibleForTesting
+  void initialize(String name, String path, AwsProperties properties, FileIO io) {
+    this.catalogName = name;
+    this.awsProperties = properties;
+    this.warehousePath = cleanWarehousePath(path);
+    this.fileIO = io;
+  }
+
+  /**
+   * Validate the warehouse path and strip a single trailing slash so that locations can be joined with '/'.
+   * @param path warehouse path
+   * @return path without trailing slash
+   * @throws IllegalArgumentException if the path is null or empty
+   */
+  private String cleanWarehousePath(String path) {
+    Preconditions.checkArgument(path != null && path.length() > 0,
+        "Cannot initialize GlueCatalog because warehousePath must not be null or empty");
+    int len = path.length();
+    if (path.charAt(len - 1) == '/') {
+      return path.substring(0, len - 1);
+    } else {
+      return path;
+    }
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    return new GlueTableOperations(glue, catalogName, awsProperties, fileIO, tableIdentifier);
+  }
+
+  /**
+   * This method produces the same result as using a HiveCatalog.
+   * If databaseUri exists for the Glue database URI, the default location is databaseUri/tableName.
+   * If not, the default location is warehousePath/databaseName.db/tableName
+   * @param tableIdentifier table id
+   * @return default warehouse path
+   */
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
+    String databaseName = IcebergToGlueConverter.getDatabaseName(tableIdentifier);
+    // check if value is set in database; use the configured catalog ID like every other Glue call
+    GetDatabaseResponse response = glue.getDatabase(GetDatabaseRequest.builder()
+        .catalogId(awsProperties.glueCatalogId())
+        .name(databaseName)
+        .build());
+    String dbLocationUri = response.database().locationUri();
+    if (dbLocationUri != null) {
+      return String.format("%s/%s", dbLocationUri, tableIdentifier.name());
+    }
+
+    return String.format(
+        "%s/%s.db/%s",
+        warehousePath,
+        databaseName,
+        tableIdentifier.name());
+  }
+
+  /**
+   * List all tables in a namespace, following Glue pagination until exhausted.
+   * @param namespace namespace to list
+   * @return identifiers of all Glue tables in the database
+   * @throws NoSuchNamespaceException if the namespace does not exist in Glue
+   */
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    if (!namespaceExists(namespace)) {
+      throw new NoSuchNamespaceException("Cannot list tables in %s because the namespace does not exist in Glue",
+          namespace);
+    }
+
+    // should be safe to list all before returning the list, instead of dynamically load the list.
+    String nextToken = null;
+    List<TableIdentifier> results = Lists.newArrayList();
+    do {
+      GetTablesResponse response = glue.getTables(GetTablesRequest.builder()
+          .catalogId(awsProperties.glueCatalogId())
+          .databaseName(IcebergToGlueConverter.toDatabaseName(namespace))
+          .nextToken(nextToken)
+          .build());
+      nextToken = response.nextToken();
+      if (response.hasTableList()) {
+        results.addAll(response.tableList().stream()
+            .map(GlueToIcebergConverter::toTableId)
+            .collect(Collectors.toList()));
+      }
+    } while (nextToken != null);
+
+    LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, results);
+    return results;
+  }
+
+  /**
+   * Drop a table from Glue, optionally purging its data and metadata files.
+   * @param identifier table to drop
+   * @param purge if true, delete the table's files after the Glue table is removed
+   * @return true if the table was dropped, false if Glue reports it as not found
+   */
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    try {
+      TableOperations ops = newTableOps(identifier);
+      // load the metadata before deleting the Glue entry, it is needed afterwards for the purge
+      TableMetadata lastMetadata = ops.current();
+      glue.deleteTable(DeleteTableRequest.builder()
+          .catalogId(awsProperties.glueCatalogId())
+          .databaseName(IcebergToGlueConverter.getDatabaseName(identifier))
+          .name(identifier.name())
+          .build());
+      LOG.info("Successfully dropped table {} from Glue", identifier);
+      if (purge && lastMetadata != null) {
+        CatalogUtil.dropTableData(ops.io(), lastMetadata);
+        LOG.info("Glue table {} data purged", identifier);
+      }
+      LOG.info("Dropped table: {}", identifier);
+      return true;
+    } catch (EntityNotFoundException e) {
+      LOG.error("Cannot drop table {} because table not found or not accessible", identifier, e);
+      return false;
+    } catch (Exception e) {
+      LOG.error("Cannot complete drop table operation for {} due to unexpected exception", identifier, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Rename table in Glue is a drop table and create table.
+   * <p>
+   * The destination table is created first. If dropping the source table then fails,
+   * the destination table is deleted again and the failure is rethrown.
+   * @param from identifier of the table to rename
+   * @param to new table name
+   * @throws NoSuchNamespaceException if the destination namespace does not exist
+   * @throws NoSuchTableException if the source table does not exist in Glue
+   */
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    // check new namespace exists
+    if (!namespaceExists(to.namespace())) {
+      throw new NoSuchNamespaceException("Cannot rename %s to %s because namespace %s does not exist",
+          from, to, to.namespace());
+    }
+    // keep metadata
+    Table fromTable = null;
+    String fromTableDbName = IcebergToGlueConverter.getDatabaseName(from);
+    String fromTableName = IcebergToGlueConverter.getTableName(from);
+    String toTableDbName = IcebergToGlueConverter.getDatabaseName(to);
+    String toTableName = IcebergToGlueConverter.getTableName(to);
+    try {
+      GetTableResponse response = glue.getTable(GetTableRequest.builder()
+          .catalogId(awsProperties.glueCatalogId())
+          .databaseName(fromTableDbName)
+          .name(fromTableName)
+          .build());
+      fromTable = response.table();
+    } catch (EntityNotFoundException e) {
+      throw new NoSuchTableException(e, "Cannot rename %s because the table does not exist in Glue", from);
+    }
+
+    // use the same Glue info to create the new table, pointing to the old metadata
+    TableInput.Builder tableInputBuilder = TableInput.builder()
+        .owner(fromTable.owner())
+        .tableType(fromTable.tableType())
+        .parameters(fromTable.parameters());
+
+    glue.createTable(CreateTableRequest.builder()
+        .catalogId(awsProperties.glueCatalogId())
+        .databaseName(toTableDbName)
+        .tableInput(tableInputBuilder.name(toTableName).build())
+        .build());
+    LOG.info("created rename destination table {}", to);
+
+    try {
+      dropTable(from, false);
+    } catch (Exception e) {
+      // rollback, delete renamed table
+      LOG.error("Fail to drop old table {} after renaming to {}, rollback to use the old table", from, to, e);
+      glue.deleteTable(DeleteTableRequest.builder()
+          .catalogId(awsProperties.glueCatalogId())
+          .databaseName(toTableDbName)
+          .name(toTableName)
+          .build());
+      throw e;
+    }
+
+    LOG.info("Successfully renamed table from {} to {}", from, to);
+  }
+
+  /**
+   * Create a Glue database for the namespace.
+   * @param namespace namespace to create
+   * @param metadata properties stored as Glue database parameters
+   * @throws AlreadyExistsException if the Glue database already exists
+   */
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> metadata) {
+    try {
+      glue.createDatabase(CreateDatabaseRequest.builder()
+          .catalogId(awsProperties.glueCatalogId())
+          .databaseInput(IcebergToGlueConverter.toDatabaseInput(namespace, metadata))
+          .build());
+      LOG.info("Created namespace: {}", namespace);
+    } catch (software.amazon.awssdk.services.glue.model.AlreadyExistsException e) {
+      // keep the Glue exception as the cause so the AWS request details are not lost
+      throw new AlreadyExistsException(e, "Cannot create namespace %s because it already exists in Glue", namespace);
+    }
+  }
+
+  /**
+   * List namespaces. Listing under a non-empty namespace returns an empty list if that namespace exists.
+   * @param namespace parent namespace, or the empty namespace to list all Glue databases
+   * @return child namespaces
+   * @throws NoSuchNamespaceException if a non-empty parent namespace does not exist
+   */
+  @Override
+  public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
+    if (!namespace.isEmpty()) {
+      // if it is not a list all op, just check if the namespace exists and return empty.
+      if (namespaceExists(namespace)) {
+        return Lists.newArrayList();
+      }
+      throw new NoSuchNamespaceException(
+          "Glue does not support nested namespace, cannot list namespaces under %s", namespace);
+    }
+
+    // should be safe to list all before returning the list, instead of dynamically load the list.
+    String nextToken = null;
+    List<Namespace> results = Lists.newArrayList();
+    do {
+      GetDatabasesResponse response = glue.getDatabases(GetDatabasesRequest.builder()
+          .catalogId(awsProperties.glueCatalogId())
+          .nextToken(nextToken)
+          .build());
+      nextToken = response.nextToken();
+      if (response.hasDatabaseList()) {
+        results.addAll(response.databaseList().stream()
+            .map(GlueToIcebergConverter::toNamespace)
+            .collect(Collectors.toList()));
+      }
+    } while (nextToken != null);
+
+    LOG.debug("Listing namespace {} returned namespaces: {}", namespace, results);
+    return results;
+  }
+
+  /**
+   * Load the Glue database parameters of a namespace.
+   * @param namespace namespace to load
+   * @return Glue database parameters
+   * @throws NoSuchNamespaceException if Glue rejects the input or cannot find the database
+   */
+  @Override
+  public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
+    String databaseName = IcebergToGlueConverter.toDatabaseName(namespace);
+    try {
+      GetDatabaseResponse response = glue.getDatabase(GetDatabaseRequest.builder()
+          .catalogId(awsProperties.glueCatalogId())
+          .name(databaseName)
+          .build());
+      Map<String, String> result = response.database().parameters();
+      LOG.debug("Loaded metadata for namespace {} found {}", namespace, result);
+      return result;
+    } catch (InvalidInputException e) {
+      throw new NoSuchNamespaceException(e, "invalid input for namespace %s, error message: %s",
+          namespace, e.getMessage());
+    } catch (EntityNotFoundException e) {
+      throw new NoSuchNamespaceException(e, "fail to find Glue database for namespace %s, error message: %s",
+          databaseName, e.getMessage());
+    }
+  }
+
+  /**
+   * Drop an empty namespace.
+   * @param namespace namespace to drop
+   * @return true if the Glue database was deleted, false if the namespace does not exist
+   * @throws NamespaceNotEmptyException if the namespace still contains tables
+   */
+  @Override
+  public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
+    if (!namespaceExists(namespace)) {
+      LOG.info("Cannot drop namespace {} because it does not exist in Glue", namespace);
+      return false;
+    }
+
+    List<TableIdentifier> tableIdentifiers = listTables(namespace);
+    if (tableIdentifiers != null && !tableIdentifiers.isEmpty()) {
+      throw new NamespaceNotEmptyException("Cannot drop namespace %s because it is not empty. " +
+          "The following tables still exist under the namespace: %s", namespace, tableIdentifiers);
+    }
+
+    glue.deleteDatabase(DeleteDatabaseRequest.builder()
+        .catalogId(awsProperties.glueCatalogId())
+        .name(IcebergToGlueConverter.toDatabaseName(namespace))
+        .build());
+    LOG.info("Dropped namespace: {}", namespace);
+    return true;
+  }
+
+  /**
+   * Merge the given properties into the existing Glue database parameters; given values win on conflict.
+   * @param namespace namespace to update
+   * @param properties properties to add or overwrite
+   * @return always true, failures are reported as exceptions
+   * @throws NoSuchNamespaceException if the namespace does not exist
+   */
+  @Override
+  public boolean setProperties(Namespace namespace, Map<String, String> properties) throws NoSuchNamespaceException {
+    Map<String, String> newProperties = Maps.newHashMap();
+    newProperties.putAll(loadNamespaceMetadata(namespace));
+    newProperties.putAll(properties);
+    glue.updateDatabase(UpdateDatabaseRequest.builder()
+        .catalogId(awsProperties.glueCatalogId())
+        .name(IcebergToGlueConverter.toDatabaseName(namespace))
+        .databaseInput(IcebergToGlueConverter.toDatabaseInput(namespace, newProperties))
+        .build());
+    LOG.debug("Successfully set properties {} for {}", properties.keySet(), namespace);
+    // Always successful, otherwise exception is thrown
+    return true;
+  }
+
+  /**
+   * Remove the given keys from the Glue database parameters; keys that are not present are ignored.
+   * @param namespace namespace to update
+   * @param properties property keys to remove
+   * @return always true, failures are reported as exceptions
+   * @throws NoSuchNamespaceException if the namespace does not exist
+   */
+  @Override
+  public boolean removeProperties(Namespace namespace, Set<String> properties) throws NoSuchNamespaceException {
+    Map<String, String> metadata = Maps.newHashMap(loadNamespaceMetadata(namespace));
+    for (String property : properties) {
+      metadata.remove(property);
+    }
+
+    glue.updateDatabase(UpdateDatabaseRequest.builder()
+        .catalogId(awsProperties.glueCatalogId())
+        .name(IcebergToGlueConverter.toDatabaseName(namespace))
+        .databaseInput(IcebergToGlueConverter.toDatabaseInput(namespace, metadata))
+        .build());
+    LOG.debug("Successfully removed properties {} from {}", properties, namespace);
+    // Always successful, otherwise exception is thrown
+    return true;
+  }
+
+  @Override
+  protected boolean isValidIdentifier(TableIdentifier tableIdentifier) {
+    return IcebergToGlueConverter.isValidNamespace(tableIdentifier.namespace()) &&
+        IcebergToGlueConverter.isValidTableName(tableIdentifier.name());
+  }
+
+  @Override
+  public String name() {
+    return catalogName;
+  }
+
+  @Override
+  public void close() throws IOException {
+    glue.close();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.hadoopConf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return hadoopConf;
+  }
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
new file mode 100644
index 0000000..92f0d55
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.ConcurrentModificationException;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+
+class GlueTableOperations extends BaseMetastoreTableOperations {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GlueTableOperations.class);
+
+ // same as org.apache.hadoop.hive.metastore.TableType.EXTERNAL_TABLE
+ // more details: https://docs.aws.amazon.com/glue/latest/webapi/API_TableInput.html
+ private static final String GLUE_EXTERNAL_TABLE_TYPE = "EXTERNAL_TABLE";
+
+ private final GlueClient glue;
+ private final AwsProperties awsProperties;
+ private final String databaseName;
+ private final String tableName;
+ private final String fullTableName;
+ private final FileIO fileIO;
+
+ GlueTableOperations(GlueClient glue, String catalogName, AwsProperties awsProperties,
+ FileIO fileIO, TableIdentifier tableIdentifier) {
+ this.glue = glue;
+ this.awsProperties = awsProperties;
+ this.databaseName = IcebergToGlueConverter.getDatabaseName(tableIdentifier);
+ this.tableName = IcebergToGlueConverter.getTableName(tableIdentifier);
+ this.fullTableName = String.format("%s.%s.%s", catalogName, databaseName, tableName);
+ this.fileIO = fileIO;
+ }
+
+ @Override
+ public FileIO io() {
+ return fileIO;
+ }
+
+ @Override
+ protected String tableName() {
+ return fullTableName;
+ }
+
+ @Override
+ protected void doRefresh() {
+ String metadataLocation = null;
+ Table table = getGlueTable();
+ if (table != null) {
+ GlueToIcebergConverter.validateTable(table, tableName());
+ metadataLocation = table.parameters().get(METADATA_LOCATION_PROP);
+ } else {
+ if (currentMetadataLocation() != null) {
+ throw new NoSuchTableException("Cannot find Glue table %s after refresh, " +
+ "maybe another process deleted it or revoked your access permission", tableName());
+ }
+ }
+
+ refreshFromMetadataLocation(metadataLocation);
+ }
+
+ @Override
+ protected void doCommit(TableMetadata base, TableMetadata metadata) {
+ String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
+ boolean exceptionThrown = true;
+ Table glueTable = getGlueTable();
+ checkMetadataLocation(glueTable, base);
+ Map<String, String> properties = prepareProperties(glueTable, newMetadataLocation);
+ try {
+ persistGlueTable(glueTable, properties);
+ exceptionThrown = false;
+ } catch (ConcurrentModificationException e) {
+ throw new CommitFailedException(e, "Cannot commit %s because Glue detected concurrent update", tableName());
+ } catch (software.amazon.awssdk.services.glue.model.AlreadyExistsException e) {
+ throw new AlreadyExistsException(e,
+ "Cannot commit %s because its Glue table already exists when trying to create one", tableName());
+ } catch (SdkException e) {
+ throw new CommitFailedException(e, "Cannot commit %s because unexpected exception contacting AWS", tableName());
+ } finally {
+ if (exceptionThrown) {
+ io().deleteFile(newMetadataLocation);
+ }
+ }
+ }
+
+ private void checkMetadataLocation(Table glueTable, TableMetadata base) {
+ String glueMetadataLocation = glueTable != null ? glueTable.parameters().get(METADATA_LOCATION_PROP) : null;
+ String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
+ if (!Objects.equals(baseMetadataLocation, glueMetadataLocation)) {
+ throw new CommitFailedException(
+ "Cannot commit %s because base metadata location '%s' is not same as the current Glue location '%s'",
+ tableName(), baseMetadataLocation, glueMetadataLocation);
+ }
+ }
+
+ private Table getGlueTable() {
+ try {
+ GetTableResponse response = glue.getTable(GetTableRequest.builder()
+ .catalogId(awsProperties.glueCatalogId())
+ .databaseName(databaseName)
+ .name(tableName)
+ .build());
+ return response.table();
+ } catch (EntityNotFoundException e) {
+ return null;
+ }
+ }
+
+ private Map<String, String> prepareProperties(Table glueTable, String newMetadataLocation) {
+ Map<String, String> properties = glueTable != null ? Maps.newHashMap(glueTable.parameters()) : Maps.newHashMap();
+ properties.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE.toUpperCase(Locale.ENGLISH));
+ properties.put(METADATA_LOCATION_PROP, newMetadataLocation);
+ if (currentMetadataLocation() != null && !currentMetadataLocation().isEmpty()) {
+ properties.put(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation());
+ }
+
+ return properties;
+ }
+
+ private void persistGlueTable(Table glueTable, Map<String, String> parameters) {
+ if (glueTable != null) {
+ LOG.debug("Committing existing Glue table: {}", tableName());
+ glue.updateTable(UpdateTableRequest.builder()
+ .catalogId(awsProperties.glueCatalogId())
+ .databaseName(databaseName)
+ .skipArchive(awsProperties.glueCatalogSkipArchive())
+ .tableInput(TableInput.builder()
+ .name(tableName)
+ .parameters(parameters)
+ .build())
+ .build());
+ } else {
+ LOG.debug("Committing new Glue table: {}", tableName());
+ glue.createTable(CreateTableRequest.builder()
+ .catalogId(awsProperties.glueCatalogId())
+ .databaseName(databaseName)
+ .tableInput(TableInput.builder()
+ .name(tableName)
+ .tableType(GLUE_EXTERNAL_TABLE_TYPE)
+ .parameters(parameters)
+ .build())
+ .build());
+ }
+ }
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueToIcebergConverter.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueToIcebergConverter.java
new file mode 100644
index 0000000..5078ba3
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueToIcebergConverter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ValidationException;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.Table;
+
+class GlueToIcebergConverter {
+
+ private GlueToIcebergConverter() {
+ }
+
+ static Namespace toNamespace(Database database) {
+ return Namespace.of(database.name());
+ }
+
+ static TableIdentifier toTableId(Table table) {
+ return TableIdentifier.of(table.databaseName(), table.name());
+ }
+
+ /**
+ * Validate the Glue table is Iceberg table by checking its parameters
+ * @param table glue table
+ * @param fullName full table name for logging
+ */
+ static void validateTable(Table table, String fullName) {
+ String tableType = table.parameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP);
+ ValidationException.check(tableType != null && tableType.equalsIgnoreCase(
+ BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE),
+ "Input Glue table is not an iceberg table: %s (type=%s)", fullName, tableType);
+ }
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/IcebergToGlueConverter.java b/aws/src/main/java/org/apache/iceberg/aws/glue/IcebergToGlueConverter.java
new file mode 100644
index 0000000..ecb94b9
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/glue/IcebergToGlueConverter.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ValidationException;
+import software.amazon.awssdk.services.glue.model.DatabaseInput;
+
+/**
+ * Static helpers that validate Iceberg identifiers against Glue naming rules and convert them to Glue inputs.
+ */
+class IcebergToGlueConverter {
+
+  private IcebergToGlueConverter() {
+  }
+
+  private static final Pattern GLUE_DB_PATTERN = Pattern.compile("^[a-z0-9_]{1,252}$");
+  private static final Pattern GLUE_TABLE_PATTERN = Pattern.compile("^[a-z0-9_]{1,255}$");
+
+  /**
+   * A Glue database name cannot be longer than 252 characters.
+   * The only acceptable characters are lowercase letters, numbers, and the underscore character.
+   * More details: https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html
+   * @param namespace namespace
+   * @return if namespace can be accepted by Glue
+   */
+  static boolean isValidNamespace(Namespace namespace) {
+    if (namespace.levels().length != 1) {
+      return false;
+    }
+    String dbName = namespace.level(0);
+    // matches() instead of find(): with find(), '$' also matches before a trailing line terminator,
+    // so a name such as "db\n" would be accepted
+    return dbName != null && GLUE_DB_PATTERN.matcher(dbName).matches();
+  }
+
+  /**
+   * Validate if an Iceberg namespace is valid in Glue
+   * @param namespace namespace
+   * @throws ValidationException if namespace is not valid in Glue
+   *     (note that this is not a {@link NoSuchNamespaceException})
+   */
+  static void validateNamespace(Namespace namespace) {
+    ValidationException.check(isValidNamespace(namespace), "Cannot convert namespace %s to Glue database name, " +
+        "because it must be 1-252 chars of lowercase letters, numbers, underscore", namespace);
+  }
+
+  /**
+   * Validate and convert Iceberg namespace to Glue database name
+   * @param namespace Iceberg namespace
+   * @return database name
+   * @throws ValidationException if namespace is not valid in Glue
+   */
+  static String toDatabaseName(Namespace namespace) {
+    validateNamespace(namespace);
+    return namespace.level(0);
+  }
+
+  /**
+   * Validate and get Glue database name from Iceberg TableIdentifier
+   * @param tableIdentifier Iceberg table identifier
+   * @return database name
+   * @throws ValidationException if the identifier's namespace is not valid in Glue
+   */
+  static String getDatabaseName(TableIdentifier tableIdentifier) {
+    return toDatabaseName(tableIdentifier.namespace());
+  }
+
+  /**
+   * Validate and convert Iceberg name to Glue DatabaseInput
+   * @param namespace Iceberg namespace
+   * @param metadata metadata map
+   * @return Glue DatabaseInput
+   * @throws ValidationException if namespace is not valid in Glue
+   */
+  static DatabaseInput toDatabaseInput(Namespace namespace, Map<String, String> metadata) {
+    return DatabaseInput.builder()
+        .name(toDatabaseName(namespace))
+        .parameters(metadata)
+        .build();
+  }
+
+  /**
+   * A Glue table name cannot be longer than 255 characters.
+   * The only acceptable characters are lowercase letters, numbers, and the underscore character.
+   * More details: https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html
+   * @param tableName table name
+   * @return if a table name can be accepted by Glue
+   */
+  static boolean isValidTableName(String tableName) {
+    // matches() instead of find(), so a trailing line terminator is rejected (see isValidNamespace)
+    return tableName != null && GLUE_TABLE_PATTERN.matcher(tableName).matches();
+  }
+
+  /**
+   * Validate if a table name is valid in Glue
+   * @param tableName table name
+   * @throws ValidationException if table name not valid in Glue
+   *     (note that this is not a {@link NoSuchTableException})
+   */
+  static void validateTableName(String tableName) {
+    ValidationException.check(isValidTableName(tableName), "Cannot use %s as Glue table name, " +
+        "because it must be 1-255 chars of lowercase letters, numbers, underscore", tableName);
+  }
+
+  /**
+   * Validate and get Glue table name from Iceberg TableIdentifier
+   * @param tableIdentifier table identifier
+   * @return table name
+   * @throws ValidationException if the table name is not valid in Glue
+   */
+  static String getTableName(TableIdentifier tableIdentifier) {
+    validateTableName(tableIdentifier.name());
+    return tableIdentifier.name();
+  }
+
+  /**
+   * Validate Iceberg TableIdentifier is valid in Glue
+   * @param tableIdentifier Iceberg table identifier
+   * @throws ValidationException if the namespace or the table name is not valid in Glue
+   */
+  static void validateTableIdentifier(TableIdentifier tableIdentifier) {
+    validateNamespace(tableIdentifier.namespace());
+    validateTableName(tableIdentifier.name());
+  }
+}
diff --git a/aws/src/test/java/org/apache/iceberg/aws/glue/GlueCatalogTest.java b/aws/src/test/java/org/apache/iceberg/aws/glue/GlueCatalogTest.java
new file mode 100644
index 0000000..a80b168
--- /dev/null
+++ b/aws/src/test/java/org/apache/iceberg/aws/glue/GlueCatalogTest.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.DeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.GetDatabasesRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabasesResponse;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseResponse;
+
+/**
+ * Unit tests for {@link GlueCatalog} that run against a Mockito mock of {@link GlueClient}.
+ * Every Glue response is stubbed per test; calls that are not stubbed return null from the mock.
+ */
+public class GlueCatalogTest {
+
+  private static final String WAREHOUSE_PATH = "s3://bucket";
+  private static final String CATALOG_NAME = "glue";
+  private GlueClient glue;
+  private GlueCatalog glueCatalog;
+
+  @Before
+  public void before() {
+    glue = Mockito.mock(GlueClient.class);
+    glueCatalog = new GlueCatalog(glue);
+    glueCatalog.initialize(CATALOG_NAME, WAREHOUSE_PATH, new AwsProperties(), null);
+  }
+
+  /** Initializing the catalog with a null warehouse path must be rejected. */
+  @Test
+  public void constructor_emptyWarehousePath() {
+    AssertHelpers.assertThrows("warehouse path cannot be null",
+        IllegalArgumentException.class,
+        "Cannot initialize GlueCatalog because warehousePath must not be null",
+        () -> {
+          GlueCatalog catalog = new GlueCatalog(glue);
+          catalog.initialize(CATALOG_NAME, null, new AwsProperties(), null);
+        });
+  }
+
+  /** A trailing slash on the warehouse path must not leak into the default table location. */
+  @Test
+  public void constructor_warehousePathWithEndSlash() {
+    GlueCatalog catalogWithSlash = new GlueCatalog(glue);
+    catalogWithSlash.initialize(CATALOG_NAME, WAREHOUSE_PATH + "/", new AwsProperties(), null);
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db").build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    String location = catalogWithSlash.defaultWarehouseLocation(TableIdentifier.of("db", "table"));
+    Assert.assertEquals(WAREHOUSE_PATH + "/db.db/table", location);
+  }
+
+  /** Without a database location URI, the table location is derived from the warehouse path. */
+  @Test
+  public void defaultWarehouseLocation_noDbUri() {
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db").build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    String location = glueCatalog.defaultWarehouseLocation(TableIdentifier.of("db", "table"));
+    Assert.assertEquals(WAREHOUSE_PATH + "/db.db/table", location);
+  }
+
+  /** When the database carries a location URI, the table location is placed under that URI. */
+  @Test
+  public void defaultWarehouseLocation_dbUri() {
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db").locationUri("s3://bucket2/db").build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    String location = glueCatalog.defaultWarehouseLocation(TableIdentifier.of("db", "table"));
+    Assert.assertEquals("s3://bucket2/db/table", location);
+  }
+
+  @Test
+  public void listTables() {
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db1").build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    Mockito.doReturn(GetTablesResponse.builder()
+        .tableList(
+            Table.builder().databaseName("db1").name("t1").build(),
+            Table.builder().databaseName("db1").name("t2").build()
+        ).build())
+        .when(glue).getTables(Mockito.any(GetTablesRequest.class));
+    Assert.assertEquals(
+        Lists.newArrayList(
+            TableIdentifier.of("db1", "t1"),
+            TableIdentifier.of("db1", "t2")
+        ),
+        glueCatalog.listTables(Namespace.of("db1"))
+    );
+  }
+
+  /**
+   * The stub serves nine single-table pages that carry a next token, then a final page without one,
+   * so following the tokens to the end must yield ten tables.
+   */
+  @Test
+  public void listTables_pagination() {
+    AtomicInteger counter = new AtomicInteger(10);
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db1").build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    Mockito.doAnswer(new Answer<GetTablesResponse>() {
+      @Override
+      public GetTablesResponse answer(InvocationOnMock invocation) throws Throwable {
+        if (counter.decrementAndGet() > 0) {
+          return GetTablesResponse.builder()
+              .tableList(
+                  Table.builder().databaseName("db1").name(
+                      UUID.randomUUID().toString().replace("-", "")).build()
+              )
+              .nextToken("token")
+              .build();
+        } else {
+          return GetTablesResponse.builder()
+              .tableList(Table.builder().databaseName("db1").name("tb1").build())
+              .build();
+        }
+      }
+    }).when(glue).getTables(Mockito.any(GetTablesRequest.class));
+    Assert.assertEquals(10, glueCatalog.listTables(Namespace.of("db1")).size());
+  }
+
+  @Test
+  public void dropTable() {
+    Map<String, String> properties = new HashMap<>();
+    properties.put(BaseMetastoreTableOperations.TABLE_TYPE_PROP,
+        BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE);
+    Mockito.doReturn(GetTableResponse.builder()
+        .table(Table.builder().databaseName("db1").name("t1").parameters(properties).build()).build())
+        .when(glue).getTable(Mockito.any(GetTableRequest.class));
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db1").build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    Mockito.doReturn(DeleteTableResponse.builder().build())
+        .when(glue).deleteTable(Mockito.any(DeleteTableRequest.class));
+    glueCatalog.dropTable(TableIdentifier.of("db1", "t1"));
+    // dropping the table must issue exactly one delete request to Glue
+    Mockito.verify(glue).deleteTable(Mockito.any(DeleteTableRequest.class));
+  }
+
+  /**
+   * Renaming must remove the source table: the counter starts at 1 and every deleteTable call
+   * decrements it, so it has to be 0 once the rename finishes.
+   */
+  @Test
+  public void renameTable() {
+    AtomicInteger counter = new AtomicInteger(1);
+    Map<String, String> properties = new HashMap<>();
+    properties.put(BaseMetastoreTableOperations.TABLE_TYPE_PROP,
+        BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE);
+    Mockito.doReturn(GetTableResponse.builder()
+        .table(Table.builder().databaseName("db1").name("t1").parameters(properties).build()).build())
+        .when(glue).getTable(Mockito.any(GetTableRequest.class));
+    Mockito.doReturn(GetTablesResponse.builder().build())
+        .when(glue).getTables(Mockito.any(GetTablesRequest.class));
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db1").build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    Mockito.doAnswer(new Answer<DeleteTableResponse>() {
+      @Override
+      public DeleteTableResponse answer(InvocationOnMock invocation) throws Throwable {
+        counter.decrementAndGet();
+        return DeleteTableResponse.builder().build();
+      }
+    }).when(glue).deleteTable(Mockito.any(DeleteTableRequest.class));
+    // exercise the rename path itself; the destination name uses only characters accepted by Glue
+    glueCatalog.renameTable(TableIdentifier.of("db1", "t1"), TableIdentifier.of("db1", "x_renamed"));
+    Assert.assertEquals(0, counter.get());
+  }
+
+  @Test
+  public void createNamespace() {
+    Mockito.doReturn(CreateDatabaseResponse.builder().build())
+        .when(glue).createDatabase(Mockito.any(CreateDatabaseRequest.class));
+    glueCatalog.createNamespace(Namespace.of("db"));
+  }
+
+  /** Namespaces with a dash in the name or with more than one level must be rejected. */
+  @Test
+  public void createNamespace_badName() {
+    Mockito.doReturn(CreateDatabaseResponse.builder().build())
+        .when(glue).createDatabase(Mockito.any(CreateDatabaseRequest.class));
+    List<Namespace> invalidNamespaces = Lists.newArrayList(
+        Namespace.of("db-1"),
+        Namespace.of("db", "db2")
+    );
+
+    for (Namespace namespace : invalidNamespaces) {
+      AssertHelpers.assertThrows("should not create namespace with invalid or nested names",
+          ValidationException.class,
+          "Cannot convert namespace",
+          () -> glueCatalog.createNamespace(namespace));
+    }
+  }
+
+  @Test
+  public void listNamespaces_all() {
+    Mockito.doReturn(GetDatabasesResponse.builder()
+        .databaseList(
+            Database.builder().name("db1").build(),
+            Database.builder().name("db2").build()
+        ).build())
+        .when(glue).getDatabases(Mockito.any(GetDatabasesRequest.class));
+    Assert.assertEquals(
+        Lists.newArrayList(
+            Namespace.of("db1"),
+            Namespace.of("db2")
+        ),
+        glueCatalog.listNamespaces()
+    );
+  }
+
+  /**
+   * The stub serves nine single-database pages that carry a next token, then a final page without
+   * one, so following the tokens to the end must yield ten namespaces.
+   */
+  @Test
+  public void listNamespaces_pagination() {
+    AtomicInteger counter = new AtomicInteger(10);
+    Mockito.doAnswer(new Answer<GetDatabasesResponse>() {
+      @Override
+      public GetDatabasesResponse answer(InvocationOnMock invocation) throws Throwable {
+        if (counter.decrementAndGet() > 0) {
+          return GetDatabasesResponse.builder()
+              .databaseList(
+                  Database.builder().name(UUID.randomUUID().toString().replace("-", "")).build()
+              )
+              .nextToken("token")
+              .build();
+        } else {
+          return GetDatabasesResponse.builder()
+              .databaseList(Database.builder().name("db").build())
+              .build();
+        }
+      }
+    }).when(glue).getDatabases(Mockito.any(GetDatabasesRequest.class));
+    Assert.assertEquals(10, glueCatalog.listNamespaces().size());
+  }
+
+  @Test
+  public void listNamespaces_self() {
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db1").build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    Assert.assertEquals(
+        "list self should return empty list",
+        Lists.newArrayList(),
+        glueCatalog.listNamespaces(Namespace.of("db1"))
+    );
+  }
+
+  @Test
+  public void listNamespaces_selfInvalid() {
+    AssertHelpers.assertThrows("namespace name invalid",
+        ValidationException.class,
+        "Cannot convert namespace",
+        () -> glueCatalog.listNamespaces(Namespace.of("db-1")));
+  }
+
+  @Test
+  public void loadNamespaceMetadata() {
+    Map<String, String> parameters = new HashMap<>();
+    parameters.put("key", "val");
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db1")
+            .parameters(parameters)
+            .build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    Assert.assertEquals(parameters, glueCatalog.loadNamespaceMetadata(Namespace.of("db1")));
+  }
+
+  @Test
+  public void dropNamespace() {
+    Mockito.doReturn(GetTablesResponse.builder().build())
+        .when(glue).getTables(Mockito.any(GetTablesRequest.class));
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db1").build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    Mockito.doReturn(DeleteDatabaseResponse.builder().build())
+        .when(glue).deleteDatabase(Mockito.any(DeleteDatabaseRequest.class));
+    glueCatalog.dropNamespace(Namespace.of("db1"));
+  }
+
+  /** A namespace that still lists tables must not be dropped. */
+  @Test
+  public void dropNamespace_notEmpty() {
+    Mockito.doReturn(GetTablesResponse.builder()
+        .tableList(
+            Table.builder().databaseName("db1").name("t1").build(),
+            Table.builder().databaseName("db1").name("t2").build()
+        ).build())
+        .when(glue).getTables(Mockito.any(GetTablesRequest.class));
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db1").build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    Mockito.doReturn(DeleteDatabaseResponse.builder().build())
+        .when(glue).deleteDatabase(Mockito.any(DeleteDatabaseRequest.class));
+    AssertHelpers.assertThrows("namespace should not be dropped when still has table",
+        NamespaceNotEmptyException.class,
+        "Cannot drop namespace",
+        () -> glueCatalog.dropNamespace(Namespace.of("db1")));
+  }
+
+  @Test
+  public void setProperties() {
+    Map<String, String> parameters = new HashMap<>();
+    parameters.put("key", "val");
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db1")
+            .parameters(parameters)
+            .build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    Mockito.doReturn(UpdateDatabaseResponse.builder().build())
+        .when(glue).updateDatabase(Mockito.any(UpdateDatabaseRequest.class));
+    glueCatalog.setProperties(Namespace.of("db1"), parameters);
+  }
+
+  @Test
+  public void removeProperties() {
+    Map<String, String> parameters = new HashMap<>();
+    parameters.put("key", "val");
+    Mockito.doReturn(GetDatabaseResponse.builder()
+        .database(Database.builder().name("db1")
+            .parameters(parameters)
+            .build()).build())
+        .when(glue).getDatabase(Mockito.any(GetDatabaseRequest.class));
+    Mockito.doReturn(UpdateDatabaseResponse.builder().build())
+        .when(glue).updateDatabase(Mockito.any(UpdateDatabaseRequest.class));
+    glueCatalog.removeProperties(Namespace.of("db1"), Sets.newHashSet("key"));
+  }
+}
diff --git a/aws/src/test/java/org/apache/iceberg/aws/glue/GlueToIcebergConverterTest.java b/aws/src/test/java/org/apache/iceberg/aws/glue/GlueToIcebergConverterTest.java
new file mode 100644
index 0000000..88a9eee
--- /dev/null
+++ b/aws/src/test/java/org/apache/iceberg/aws/glue/GlueToIcebergConverterTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.junit.Assert;
+import org.junit.Test;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.Table;
+
+/**
+ * Tests for the static conversion and validation helpers of {@link GlueToIcebergConverter}.
+ */
+public class GlueToIcebergConverterTest {
+
+  @Test
+  public void toNamespace() {
+    Database glueDatabase = Database.builder().name("db").build();
+    Namespace expected = Namespace.of("db");
+    Assert.assertEquals(expected, GlueToIcebergConverter.toNamespace(glueDatabase));
+  }
+
+  @Test
+  public void toTableId() {
+    Table glueTable = Table.builder().databaseName("db").name("name").build();
+    TableIdentifier expected = TableIdentifier.of("db", "name");
+    Assert.assertEquals(expected, GlueToIcebergConverter.toTableId(glueTable));
+  }
+
+  @Test
+  public void validateTable() {
+    Map<String, String> icebergParameters = new HashMap<>();
+    icebergParameters.put(
+        BaseMetastoreTableOperations.TABLE_TYPE_PROP,
+        BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE);
+    // the test passes as long as validation does not throw
+    GlueToIcebergConverter.validateTable(tableWithParameters(icebergParameters), "name");
+  }
+
+  @Test
+  public void validateTable_icebergPropertyNotFound() {
+    // no table type parameter at all
+    Table untypedTable = tableWithParameters(new HashMap<>());
+    AssertHelpers.assertThrows("Iceberg property not found",
+        ValidationException.class,
+        "Input Glue table is not an iceberg table",
+        () -> GlueToIcebergConverter.validateTable(untypedTable, "name")
+    );
+  }
+
+  @Test
+  public void validateTable_icebergPropertyValueWrong() {
+    // table type parameter present, but with a value other than the Iceberg one
+    Map<String, String> foreignParameters = new HashMap<>();
+    foreignParameters.put(BaseMetastoreTableOperations.TABLE_TYPE_PROP, "other");
+    Table foreignTable = tableWithParameters(foreignParameters);
+    AssertHelpers.assertThrows("Iceberg property value wrong",
+        ValidationException.class,
+        "Input Glue table is not an iceberg table",
+        () -> GlueToIcebergConverter.validateTable(foreignTable, "name")
+    );
+  }
+
+  /** Build a Glue table that carries nothing but the given parameters. */
+  private static Table tableWithParameters(Map<String, String> parameters) {
+    return Table.builder().parameters(parameters).build();
+  }
+}
diff --git a/aws/src/test/java/org/apache/iceberg/aws/glue/IcebergToGlueConverterTest.java b/aws/src/test/java/org/apache/iceberg/aws/glue/IcebergToGlueConverterTest.java
new file mode 100644
index 0000000..d4c20b7
--- /dev/null
+++ b/aws/src/test/java/org/apache/iceberg/aws/glue/IcebergToGlueConverterTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.glue;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+import software.amazon.awssdk.services.glue.model.DatabaseInput;
+
+/**
+ * Tests for the namespace related helpers of {@link IcebergToGlueConverter}.
+ */
+public class IcebergToGlueConverterTest {
+
+  @Test
+  public void toDatabaseName() {
+    String databaseName = IcebergToGlueConverter.toDatabaseName(Namespace.of("db"));
+    Assert.assertEquals("db", databaseName);
+  }
+
+  @Test
+  public void toDatabaseName_fail() {
+    // a name made of 600 'a' characters
+    String longName = new String(new char[600]).replace("\0", "a");
+    List<Namespace> rejectedNamespaces = Lists.newArrayList(
+        Namespace.of("db", "a"),
+        Namespace.of("db-1"),
+        Namespace.empty(),
+        Namespace.of(""),
+        Namespace.of(longName));
+    for (Namespace rejected : rejectedNamespaces) {
+      AssertHelpers.assertThrows("bad namespace name",
+          ValidationException.class,
+          "Cannot convert namespace",
+          () -> IcebergToGlueConverter.toDatabaseName(rejected)
+      );
+    }
+  }
+
+  @Test
+  public void toDatabaseInput() {
+    Map<String, String> parameters = new HashMap<>();
+    DatabaseInput expected = DatabaseInput.builder().name("db").parameters(parameters).build();
+    DatabaseInput actual = IcebergToGlueConverter.toDatabaseInput(Namespace.of("db"), parameters);
+    Assert.assertEquals(expected, actual);
+  }
+}
diff --git a/build.gradle b/build.gradle
index 7df7c9f..a385685 100644
--- a/build.gradle
+++ b/build.gradle
@@ -252,7 +252,16 @@ project(':iceberg-aws') {
compile 'software.amazon.awssdk:url-connection-client'
compile 'software.amazon.awssdk:s3'
compile 'software.amazon.awssdk:kms'
+ compile 'software.amazon.awssdk:glue'
+ compileOnly("org.apache.hadoop:hadoop-common") {
+ exclude group: 'org.apache.avro', module: 'avro'
+ exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+ exclude group: 'javax.servlet', module: 'servlet-api'
+ exclude group: 'com.google.code.gson', module: 'gson'
+ }
+
+ testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
testCompile("com.adobe.testing:s3mock-junit4") {
exclude module: "spring-boot-starter-logging"
exclude module: "logback-classic"