You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "dannycranmer (via GitHub)" <gi...@apache.org> on 2023/03/24 11:25:54 UTC

[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #47: [FLINK-30481][FLIP-277] GlueCatalog Implementation

dannycranmer commented on code in PR #47:
URL: https://github.com/apache/flink-connector-aws/pull/47#discussion_r1147301118


##########
flink-catalog-aws-glue/pom.xml:
##########
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-aws-parent</artifactId>
+        <version>4.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-catalog-aws-glue</artifactId>
+    <name>Flink : Catalog : AWS : Glue</name>

Review Comment:
   ```suggestion
       <name>Flink : Catalog : AWS : Glue Data Catalog</name>
   ```



##########
flink-catalog-aws-glue/pom.xml:
##########
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-aws-parent</artifactId>
+        <version>4.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-catalog-aws-glue</artifactId>
+    <name>Flink : Catalog : AWS : Glue</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-aws-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>glue</artifactId>
+            <version>${aws.sdkv2.version}</version>

Review Comment:
   You can remove this line as it should be managed by the bom



##########
flink-catalog-aws-glue/pom.xml:
##########
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>

Review Comment:
   With the formats we have started to add parent modules to split connectors/formats, we should do the same for catalog. Can you move beneath `flink-catalog-aws` like so  https://github.com/apache/flink-connector-aws/tree/main/flink-formats-aws



##########
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,1030 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedCatalogView;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.util.AwsClientFactories;
+import org.apache.flink.table.catalog.glue.util.AwsProperties;
+import org.apache.flink.table.catalog.glue.util.GlueOperator;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.Table;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A Glue catalog implementation that uses glue catalog. */
+public class GlueCatalog extends AbstractCatalog {

Review Comment:
   Missing support annotation. This one should probably be `@PublicEvolving`. Please check all files



##########
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueOperator.java:
##########
@@ -0,0 +1,1639 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.FunctionLanguage;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.glue.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.GlueCatalogOptions;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.ManagedTableFactory;
+import org.apache.flink.table.resource.ResourceType;
+import org.apache.flink.table.types.DataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.CreatePartitionRequest;
+import software.amazon.awssdk.services.glue.model.CreatePartitionResponse;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.CreateTableResponse;
+import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.DatabaseInput;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.DeletePartitionRequest;
+import software.amazon.awssdk.services.glue.model.DeletePartitionResponse;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.DeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.GetDatabasesRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabasesResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionsRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionsResponse;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse;
+import software.amazon.awssdk.services.glue.model.GlueException;
+import software.amazon.awssdk.services.glue.model.GlueResponse;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.PartitionInput;
+import software.amazon.awssdk.services.glue.model.PrincipalType;
+import software.amazon.awssdk.services.glue.model.ResourceUri;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+import software.amazon.awssdk.services.glue.model.UpdateTableResponse;
+import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunction;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunctionInput;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/**
+ * Utilities for Glue catalog operations. Important Note :
+ * https://aws.amazon.com/premiumsupport/knowledge-center/glue-crawler-internal-service-exception/
+ */
+public class GlueOperator {

Review Comment:
   This class is huge and I wonder if it is worth splitting into smaller classes? We could split by Database/Table/etc?



##########
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogConstants.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.catalog.glue.util.AwsClientFactory;
+
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+
+import java.util.regex.Pattern;
+
+/** Configs for catalog meta-objects in {@link GlueCatalog}. */
+public class GlueCatalogConstants {
+
+    public static final String COMMENT = "comment";
+    public static final String DEFAULT_SEPARATOR = ":";
+    public static final String LOCATION_SEPARATOR = "/";
+    public static final String LOCATION_URI = "location-uri";
+    public static final String AND = "and";
+    public static final String NEXT_LINE = "\n";
+    public static final String SPACE = " ";
+
+    public static final String TABLE_OWNER = "owner";
+    public static final String TABLE_INPUT_FORMAT = "table.input.format";
+    public static final String TABLE_OUTPUT_FORMAT = "table.output.format";
+
+    public static final String FLINK_SCALA_FUNCTION_PREFIX = "flink:scala:";
+    public static final String FLINK_PYTHON_FUNCTION_PREFIX = "flink:python:";
+    public static final String FLINK_JAVA_FUNCTION_PREFIX = "flink:java:";
+
+    public static final String FLINK_CATALOG = "FLINK_CATALOG";
+
+    public static final Pattern GLUE_DB_PATTERN = Pattern.compile("^[a-z0-9_]{1,252}$");
+    public static final String GLUE_EXCEPTION_MSG_IDENTIFIER = "GLUE EXCEPTION";
+    public static final String TABLE_NOT_EXISTS_IDENTIFIER = "TABLE DOESN'T EXISTS";
+    public static final String DEFAULT_PARTITION_NAME = "__GLUE_DEFAULT_PARTITION__";
+
+    // ---- glue configs
+    /**
+     * The type of {@link software.amazon.awssdk.http.SdkHttpClient} implementation used by {@link
+     * AwsClientFactory} If set, all AWS clients will use this specified HTTP client. If not set,
+     * HTTP_CLIENT_TYPE_DEFAULT will be used. For specific types supported, see HTTP_CLIENT_TYPE_*
+     * defined below.
+     */
+    public static final String HTTP_CLIENT_TYPE = "http-client.type";
+
+    /**
+     * Used to configure the connection acquisition timeout in milliseconds for {@link
+     * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when
+     * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE
+     *
+     * <p>For more details, see
+     * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html
+     */
+    public static final String HTTP_CLIENT_APACHE_CONNECTION_ACQUISITION_TIMEOUT_MS =
+            "http-client.apache.connection-acquisition-timeout-ms";
+
+    /**
+     * If Glue should skip name validations It is recommended to stick to Glue best practice in
+     * https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html to make sure operations
+     * are Hive compatible. This is only added for users that have existing conventions using
+     * non-standard characters. When database name and table name validation are skipped, there is
+     * no guarantee that downstream systems would all support the names.
+     */
+    public static final String GLUE_CATALOG_SKIP_NAME_VALIDATION = "glue.skip-name-validation";
+
+    /**
+     * Used to configure the connection max idle time in milliseconds for {@link
+     * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when
+     * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE
+     *
+     * <p>For more details, see
+     * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html
+     */
+    public static final String HTTP_CLIENT_APACHE_CONNECTION_MAX_IDLE_TIME_MS =
+            "http-client.apache.connection-max-idle-time-ms";
+
+    /**
+     * Used to configure the connection time to live in milliseconds for {@link
+     * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when
+     * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE
+     *
+     * <p>For more details, see
+     * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html
+     */
+    public static final String HTTP_CLIENT_APACHE_CONNECTION_TIME_TO_LIVE_MS =
+            "http-client.apache.connection-time-to-live-ms";
+
+    /**
+     * Used to configure the connection timeout in milliseconds for {@link
+     * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when
+     * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE
+     *
+     * <p>For more details, see
+     * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html
+     */
+    public static final String HTTP_CLIENT_APACHE_CONNECTION_TIMEOUT_MS =

Review Comment:
   All of the AWS SDK/client specific things should ideally go in the AWS base module so they can be reused across projects. We are aiming for consistent configuration keys across AWS integrations. Can you please refactor?



##########
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,1030 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedCatalogView;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.util.AwsClientFactories;
+import org.apache.flink.table.catalog.glue.util.AwsProperties;
+import org.apache.flink.table.catalog.glue.util.GlueOperator;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.Table;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A Glue catalog implementation that uses glue catalog. */
+public class GlueCatalog extends AbstractCatalog {
+
+    /** instance of GlueOperator to facilitate glue related actions. */
+    public GlueOperator glueOperator;
+
+    /** Default database name if not passed as part of catalog. */
+    public static final String DEFAULT_DB = "default";
+
+    private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class);
+
+    public GlueCatalog(String catalogName, String databaseName, ReadableConfig glueProperties) {
+        super(catalogName, databaseName);
+        checkNotNull(glueProperties);
+        // setLocationUri for the database level
+        String locationUri = "";
+        if (glueProperties.getOptional(GlueCatalogOptions.LOCATION_KEY).isPresent()) {
+            locationUri = glueProperties.getOptional(GlueCatalogOptions.LOCATION_KEY).get();
+        }
+        // initialize aws client factories
+        AwsProperties awsProperties = new AwsProperties(glueProperties);
+
+        // create glue client
+        GlueClient glueClient = AwsClientFactories.factory(awsProperties).glue();
+        this.glueOperator = new GlueOperator(locationUri, getName(), awsProperties, glueClient);
+    }
+
+    /**
+     * Open the catalog. Used for any required preparation in initialization phase.
+     *
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void open() throws CatalogException {}
+
+    /**
+     * Close the catalog when it is no longer needed and release any resource that it might be
+     * holding.
+     *
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void close() throws CatalogException {
+        try {
+            glueOperator.closeClient();
+        } catch (Exception e) {
+            LOG.warn("Glue Client is not closed properly!");

Review Comment:
   1. Please include the error in the log `LOG.warn("..", e);`
   2. Do we not want to rethrow this here? `throw new CatalogException("...", e);`



##########
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,1030 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedCatalogView;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.util.AwsClientFactories;
+import org.apache.flink.table.catalog.glue.util.AwsProperties;
+import org.apache.flink.table.catalog.glue.util.GlueOperator;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.Table;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A Glue catalog implementation that uses glue catalog. */
+public class GlueCatalog extends AbstractCatalog {
+
+    /** instance of GlueOperator to facilitate glue related actions. */
+    public GlueOperator glueOperator;
+
+    /** Default database name if not passed as part of catalog. */
+    public static final String DEFAULT_DB = "default";
+
+    private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class);
+
+    public GlueCatalog(String catalogName, String databaseName, ReadableConfig glueProperties) {
+        super(catalogName, databaseName);
+        checkNotNull(glueProperties);
+        // setLocationUri for the database level
+        String locationUri = "";
+        if (glueProperties.getOptional(GlueCatalogOptions.LOCATION_KEY).isPresent()) {
+            locationUri = glueProperties.getOptional(GlueCatalogOptions.LOCATION_KEY).get();
+        }
+        // initialize aws client factories

Review Comment:
   Please review the guidance on comments https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#comments
   
   These type of comments do not add any extra context/info and can be removed



##########
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsClientFactories.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.configuration.ReadableConfig;
+
+import software.amazon.awssdk.services.glue.GlueClient;
+
+/** Default factories. */
+public class AwsClientFactories {
+
+    private AwsClientFactories() {}
+
+    public static AwsClientFactory factory(AwsProperties properties) {
+        return new DefaultAwsClientFactory(properties);
+    }
+
+    static class DefaultAwsClientFactory implements AwsClientFactory {
+
+        /** instance that holds provides. */
+        private AwsProperties awsProperties;
+
+        DefaultAwsClientFactory(AwsProperties properties) {
+            awsProperties = properties;
+        }
+
+        @Override
+        public GlueClient glue() {
+            return GlueClient.builder()

Review Comment:
   How about credential providers? Can we use the util classes [#1](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSClientUtil.java), [#2](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java) to build clients and credential providers so the configuration/options/features is consistent/reused for all integrations?



##########
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsClientFactory.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.configuration.ReadableConfig;
+
+import software.amazon.awssdk.services.glue.GlueClient;
+
+/**
+ * Interface to customize AWS clients used by Flink. A custom factory must have a no-arg.
+ * constructor, and use {@link #initialize(ReadableConfig)} to initialize the factory.

Review Comment:
   Interesting idea, I like this. Can we extract anything common to aws-base for reuse with other integrations?



##########
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,1030 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedCatalogView;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.util.AwsClientFactories;
+import org.apache.flink.table.catalog.glue.util.AwsProperties;
+import org.apache.flink.table.catalog.glue.util.GlueOperator;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.Table;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A Glue catalog implementation that uses glue catalog. */
+public class GlueCatalog extends AbstractCatalog {
+
+    /** instance of GlueOperator to facilitate glue related actions. */
+    public GlueOperator glueOperator;
+
+    /** Default database name if not passed as part of catalog. */
+    public static final String DEFAULT_DB = "default";
+
+    private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class);
+
+    public GlueCatalog(String catalogName, String databaseName, ReadableConfig glueProperties) {
+        super(catalogName, databaseName);
+        checkNotNull(glueProperties);
+        // setLocationUri for the database level
+        String locationUri = "";
+        if (glueProperties.getOptional(GlueCatalogOptions.LOCATION_KEY).isPresent()) {
+            locationUri = glueProperties.getOptional(GlueCatalogOptions.LOCATION_KEY).get();
+        }
+        // initialize aws client factories
+        AwsProperties awsProperties = new AwsProperties(glueProperties);
+
+        // create glue client
+        GlueClient glueClient = AwsClientFactories.factory(awsProperties).glue();
+        this.glueOperator = new GlueOperator(locationUri, getName(), awsProperties, glueClient);
+    }
+
+    /**
+     * Open the catalog. Used for any required preparation in initialization phase.
+     *
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void open() throws CatalogException {}
+
+    /**
+     * Close the catalog when it is no longer needed and release any resource that it might be
+     * holding.
+     *
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void close() throws CatalogException {
+        try {
+            glueOperator.closeClient();
+        } catch (Exception e) {
+            LOG.warn("Glue Client is not closed properly!");
+        }
+    }
+
+    // ------ databases ------
+
+    /**
+     * Create a database.
+     *
+     * @param name Name of the database to be created
+     * @param database The database definition
+     * @param ignoreIfExists Flag to specify behavior when a database with the given name already
+     *     exists: if set to false, throw a DatabaseAlreadyExistException, if set to true, do
+     *     nothing.
+     * @throws DatabaseAlreadyExistException if the given database already exists and ignoreIfExists
+     *     is false
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(name));
+        checkNotNull(database, "Database cannot be null.");
+
+        // glue supports lowercase naming convention
+        name = name.toLowerCase(Locale.ROOT);

Review Comment:
   Suggest extracting this to a helper method since it is repeated 



##########
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueOperator.java:
##########
@@ -0,0 +1,1639 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.FunctionLanguage;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.glue.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.GlueCatalogOptions;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.ManagedTableFactory;
+import org.apache.flink.table.resource.ResourceType;
+import org.apache.flink.table.types.DataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.CreatePartitionRequest;
+import software.amazon.awssdk.services.glue.model.CreatePartitionResponse;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.CreateTableResponse;
+import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.DatabaseInput;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.DeletePartitionRequest;
+import software.amazon.awssdk.services.glue.model.DeletePartitionResponse;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.DeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.GetDatabasesRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabasesResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionsRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionsResponse;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse;
+import software.amazon.awssdk.services.glue.model.GlueException;
+import software.amazon.awssdk.services.glue.model.GlueResponse;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.PartitionInput;
+import software.amazon.awssdk.services.glue.model.PrincipalType;
+import software.amazon.awssdk.services.glue.model.ResourceUri;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+import software.amazon.awssdk.services.glue.model.UpdateTableResponse;
+import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunction;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunctionInput;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/**
+ * Utilities for Glue catalog operations. Important Note :
+ * https://aws.amazon.com/premiumsupport/knowledge-center/glue-crawler-internal-service-exception/
+ */
+public class GlueOperator {
+
+    /** Defines the location URI for database. This locationUri is high level path for catalog */
+    public final String locationUri;
+
+    /**
+     * Instance of AwsProperties which holds the configs related to configure glue and aws setup.
+     */
+    private final AwsProperties awsProperties;
+
+    /** http client for glue client. Current implementation for client is sync type. */
+    private final GlueClient glueClient;
+
+    private final String catalogName;
+
+    private static final Logger LOG = LoggerFactory.getLogger(GlueOperator.class);
+
+    public GlueOperator(
+            String locationUri,
+            String catalogName,
+            AwsProperties awsProperties,
+            GlueClient glueClient) {
+        this.locationUri = locationUri;

Review Comment:
   Validate public args for blank/null etc



##########
flink-catalog-aws-glue/pom.xml:
##########
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-aws-parent</artifactId>
+        <version>4.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-catalog-aws-glue</artifactId>
+    <name>Flink : Catalog : AWS : Glue</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-aws-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>glue</artifactId>
+            <version>${aws.sdkv2.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>apache-client</artifactId>
+            <version>${aws.sdkv2.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>url-connection-client</artifactId>
+            <version>${aws.sdkv2.version}</version>
+        </dependency>
+
+        <!-- ArchUit test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-architecture-tests-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>

Review Comment:
   Is it expected to shade this? Usually we have a plain and Uber (`sql`) jar. I would rather not shade this module incase users want to use it with the Table API.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org