You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "vahmed-hamdy (via GitHub)" <gi...@apache.org> on 2023/04/24 16:20:56 UTC

[GitHub] [flink-connector-aws] vahmed-hamdy commented on a diff in pull request #47: [FLINK-30481][FLIP-277] GlueCatalog Implementation

vahmed-hamdy commented on code in PR #47:
URL: https://github.com/apache/flink-connector-aws/pull/47#discussion_r1175426325


##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,1073 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.aws.config.AWSConfig;
+import org.apache.flink.connector.aws.util.AwsClientFactories;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedCatalogView;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.util.GlueOperator;
+import org.apache.flink.table.catalog.glue.util.GlueUtils;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.Table;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A Glue catalog implementation that uses glue catalog. */
+@PublicEvolving
+public class GlueCatalog extends AbstractCatalog {
+
+    /** instance of GlueOperator to facilitate glue related actions. */
+    public GlueOperator glueOperator;
+
+    /** Default database name if not passed as part of catalog. */
+    public static final String DEFAULT_DB = "default";
+
+    private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class);
+
+    public GlueCatalog(String catalogName, String databaseName, ReadableConfig catalogConfig) {
+        super(catalogName, databaseName);
+        checkNotNull(catalogConfig, "Catalog config cannot be null.");
+        AWSConfig awsConfig = new AWSConfig(catalogConfig);
+        GlueClient glueClient = AwsClientFactories.factory(awsConfig).glue();
+        String catalogPath = catalogConfig.get(GlueCatalogOptions.PATH);
+        this.glueOperator = new GlueOperator(getName(), catalogPath, awsConfig, glueClient);
+    }
+
+    @VisibleForTesting
+    public GlueCatalog(String catalogName, String databaseName, GlueOperator glueOperator) {
+        super(catalogName, databaseName);
+        this.glueOperator = glueOperator;
+    }
+
+    /**
+     * Open the catalog. Used for any required preparation in initialization phase.
+     *
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void open() throws CatalogException {}
+
+    /**
+     * Close the catalog when it is no longer needed and release any resource that it might be
+     * holding.
+     *
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void close() throws CatalogException {
+        try {
+            glueOperator.closeClient();
+        } catch (Exception e) {

Review Comment:
   Can we catch `CatalogException` instead, we don't want to mask other exceptions 



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueOperator.java:
##########
@@ -0,0 +1,1359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.connector.aws.config.AWSConfig;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.glue.GlueCatalogConstants;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.ManagedTableFactory;
+import org.apache.flink.table.resource.ResourceType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.CreatePartitionRequest;
+import software.amazon.awssdk.services.glue.model.CreatePartitionResponse;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.CreateTableResponse;
+import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.DatabaseInput;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.DeletePartitionRequest;
+import software.amazon.awssdk.services.glue.model.DeletePartitionResponse;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.DeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.GetDatabasesRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabasesResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionsRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionsResponse;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse;
+import software.amazon.awssdk.services.glue.model.GlueException;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.PartitionInput;
+import software.amazon.awssdk.services.glue.model.ResourceUri;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+import software.amazon.awssdk.services.glue.model.UpdateTableResponse;
+import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunction;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunctionInput;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/**
+ * Utilities for Glue catalog operations. Important Note :
+ * https://aws.amazon.com/premiumsupport/knowledge-center/glue-crawler-internal-service-exception/
+ */
+public class GlueOperator {
+
+    /**
+     * Instance of AwsProperties which holds the configs related to configure glue and aws setup.
+     */
+    private final AWSConfig awsConfig;
+
+    /** http client for glue client. Current implementation for client is sync type. */
+    private final GlueClient glueClient;
+
+    private final String catalogName;
+
+    public final String catalogPath;
+
+    private static final Logger LOG = LoggerFactory.getLogger(GlueOperator.class);
+
+    public GlueOperator(
+            String catalogName, String catalogPath, AWSConfig awsConfig, GlueClient glueClient) {
+        this.awsConfig = awsConfig;
+        this.glueClient = glueClient;
+        this.catalogPath = catalogPath;
+        this.catalogName = catalogName;
+    }
+
+    public void closeClient() {
+        glueClient.close();
+    }
+
+    // -------------- Database related operations.
+
+    /**
+     * List all databases present.
+     *
+     * @return List of fully qualified database names
+     */
+    public List<String> listGlueDatabases() throws CatalogException {
+        try {
+            GetDatabasesRequest.Builder databasesRequestBuilder =
+                    GetDatabasesRequest.builder().catalogId(getGlueCatalogId());
+
+            GetDatabasesResponse response =
+                    glueClient.getDatabases(databasesRequestBuilder.build());
+            List<String> glueDatabases =
+                    response.databaseList().stream()
+                            .map(Database::name)
+                            .collect(Collectors.toList());
+            String dbResultNextToken = response.nextToken();
+            if (Optional.ofNullable(dbResultNextToken).isPresent()) {
+                do {
+                    databasesRequestBuilder.nextToken(dbResultNextToken);
+                    response = glueClient.getDatabases(databasesRequestBuilder.build());
+                    glueDatabases.addAll(
+                            response.databaseList().stream()
+                                    .map(Database::name)
+                                    .collect(Collectors.toList()));
+                    dbResultNextToken = response.nextToken();
+                } while (Optional.ofNullable(dbResultNextToken).isPresent());
+            }
+            return glueDatabases;
+        } catch (GlueException e) {
+            throw new CatalogException(
+                    GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e.getCause());
+        }
+    }
+
+    /**
+     * Create database in glue data catalog.
+     *
+     * @param databaseName fully qualified name of database.
+     * @param database Instance of {@link CatalogDatabase}.
+     * @throws CatalogException when unknown error from glue servers.
+     * @throws DatabaseAlreadyExistException when database exists already in glue data catalog.
+     */
+    public void createGlueDatabase(String databaseName, CatalogDatabase database)
+            throws CatalogException, DatabaseAlreadyExistException {
+
+        GlueUtils.validate(databaseName);
+        Map<String, String> properties = new HashMap<>(database.getProperties());
+        String str =
+                properties.entrySet().stream()
+                        .map(e -> e.getKey() + ":" + e.getValue())
+                        .collect(Collectors.joining("|"));
+        LOG.info("Create DB MYLOG:- " + str);

Review Comment:
   What is MYLOG?



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,1073 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.aws.config.AWSConfig;
+import org.apache.flink.connector.aws.util.AwsClientFactories;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedCatalogView;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.util.GlueOperator;
+import org.apache.flink.table.catalog.glue.util.GlueUtils;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.Table;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A Glue catalog implementation that uses glue catalog. */
+@PublicEvolving
+public class GlueCatalog extends AbstractCatalog {
+
+    /** instance of GlueOperator to facilitate glue related actions. */
+    public GlueOperator glueOperator;
+
+    /** Default database name if not passed as part of catalog. */
+    public static final String DEFAULT_DB = "default";
+
+    private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class);

Review Comment:
   nit: public static members should go on top of member definitions



##########
flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/config/AWSConfigConstants.java:
##########
@@ -144,6 +145,194 @@ public enum CredentialProvider {
     /** Read Request timeout for {@link SdkAsyncHttpClient}. */
     public static final String HTTP_CLIENT_READ_TIMEOUT_MILLIS = "aws.http-client.read-timeout";
 
+    /**
+     * The type of {@link software.amazon.awssdk.http.SdkHttpClient} implementation used by {@link
+     * AwsClientFactory} If set, all AWS clients will use this specified HTTP client. If not set,
+     * HTTP_CLIENT_TYPE_DEFAULT will be used. For specific types supported, see HTTP_CLIENT_TYPE_*
+     * defined below.
+     */
+    public static final String HTTP_CLIENT_TYPE = "http-client.type";
+
+    /**
+     * Used to configure the connection acquisition timeout in milliseconds for {@link
+     * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when
+     * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE
+     *
+     * <p>For more details, see
+     * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html
+     */
+    public static final String HTTP_CLIENT_APACHE_CONNECTION_ACQUISITION_TIMEOUT_MS =
+            "http-client.apache.connection-acquisition-timeout-ms";
+
+    /**
+     * If Glue should skip name validations It is recommended to stick to Glue best practice in
+     * https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html to make sure operations
+     * are Hive compatible. This is only added for users that have existing conventions using
+     * non-standard characters. When database name and table name validation are skipped, there is
+     * no guarantee that downstream systems would all support the names.
+     */
+    public static final String GLUE_CATALOG_SKIP_NAME_VALIDATION = "glue.skip-name-validation";
+
+    /**
+     * Used to configure the connection max idle time in milliseconds for {@link
+     * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when
+     * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE
+     *
+     * <p>For more details, see
+     * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html
+     */
+    public static final String HTTP_CLIENT_APACHE_CONNECTION_MAX_IDLE_TIME_MS =
+            "http-client.apache.connection-max-idle-time-ms";
+
+    /**
+     * Used to configure the connection time to live in milliseconds for {@link
+     * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when
+     * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE
+     *
+     * <p>For more details, see
+     * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html
+     */
+    public static final String HTTP_CLIENT_APACHE_CONNECTION_TIME_TO_LIVE_MS =
+            "http-client.apache.connection-time-to-live-ms";
+
+    // ---- glue configs
+
+    /**
+     * Used to configure the connection timeout in milliseconds for {@link
+     * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when
+     * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE
+     *
+     * <p>For more details, see
+     * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html
+     */
+    public static final String HTTP_CLIENT_APACHE_CONNECTION_TIMEOUT_MS =
+            "http-client.apache.connection-timeout-ms";
+
+    /**
+     * Used to configure whether to enable the expect continue setting for {@link
+     * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when
+     * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE
+     *
+     * <p>In default, this is disabled.
+     *
+     * <p>For more details, see
+     * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html
+     */
+    public static final String HTTP_CLIENT_APACHE_EXPECT_CONTINUE_ENABLED =
+            "http-client.apache.expect-continue-enabled";
+
+    /**
+     * Used to configure the max connections number for {@link
+     * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when
+     * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE
+     *
+     * <p>For more details, see
+     * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html
+     */
+    public static final String HTTP_CLIENT_APACHE_MAX_CONNECTIONS =
+            "http-client.apache.max-connections";
+
+    /**
+     * Used to configure the socket timeout in milliseconds for {@link
+     * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when
+     * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE
+     *
+     * <p>For more details, see
+     * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html
+     */
+    public static final String HTTP_CLIENT_APACHE_SOCKET_TIMEOUT_MS =
+            "http-client.apache.socket-timeout-ms";
+
+    /**
+     * Used to configure whether to enable the tcp keep alive setting for {@link
+     * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when
+     * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE.
+     *
+     * <p>In default, this is disabled.
+     *
+     * <p>For more details, see
+     * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html
+     */
+    public static final String HTTP_CLIENT_APACHE_TCP_KEEP_ALIVE_ENABLED =
+            "http-client.apache.tcp-keep-alive-enabled";
+
+    /**
+     * Used to configure whether to use idle connection reaper for {@link
+     * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when
+     * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE.
+     *
+     * <p>In default, this is enabled.
+     *
+     * <p>For more details, see
+     * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html
+     */
+    public static final String HTTP_CLIENT_APACHE_USE_IDLE_CONNECTION_REAPER_ENABLED =
+            "http-client.apache.use-idle-connection-reaper-enabled";
+
+    /**
+     * Configure an alternative endpoint of the Glue service for GlueCatalog to access.
+     *
+     * <p>This could be used to use GlueCatalog with any glue-compatible metastore service that has
+     * a different endpoint
+     */
+    public static final String GLUE_CATALOG_ENDPOINT = "glue.endpoint";

Review Comment:
   We mainly use this package for common AWS utils and constants, this seems Glue specific IMO



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueOperator.java:
##########
@@ -0,0 +1,1359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.connector.aws.config.AWSConfig;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.glue.GlueCatalogConstants;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.ManagedTableFactory;
+import org.apache.flink.table.resource.ResourceType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.CreatePartitionRequest;
+import software.amazon.awssdk.services.glue.model.CreatePartitionResponse;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.CreateTableResponse;
+import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.DatabaseInput;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.DeletePartitionRequest;
+import software.amazon.awssdk.services.glue.model.DeletePartitionResponse;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.DeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.GetDatabasesRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabasesResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionsRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionsResponse;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse;
+import software.amazon.awssdk.services.glue.model.GlueException;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.PartitionInput;
+import software.amazon.awssdk.services.glue.model.ResourceUri;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+import software.amazon.awssdk.services.glue.model.UpdateTableResponse;
+import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunction;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunctionInput;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/**
+ * Utilities for Glue catalog operations. Important Note :
+ * https://aws.amazon.com/premiumsupport/knowledge-center/glue-crawler-internal-service-exception/
+ */
+public class GlueOperator {
+
+    /**
+     * Instance of AwsProperties which holds the configs related to configure glue and aws setup.
+     */
+    private final AWSConfig awsConfig;
+
+    /** http client for glue client. Current implementation for client is sync type. */
+    private final GlueClient glueClient;
+
+    private final String catalogName;
+
+    public final String catalogPath;
+
+    private static final Logger LOG = LoggerFactory.getLogger(GlueOperator.class);
+
+    public GlueOperator(
+            String catalogName, String catalogPath, AWSConfig awsConfig, GlueClient glueClient) {
+        this.awsConfig = awsConfig;
+        this.glueClient = glueClient;
+        this.catalogPath = catalogPath;
+        this.catalogName = catalogName;
+    }
+
+    public void closeClient() {
+        glueClient.close();
+    }
+
+    // -------------- Database related operations.
+
+    /**
+     * List all databases present.
+     *
+     * @return List of fully qualified database names
+     */
+    public List<String> listGlueDatabases() throws CatalogException {
+        try {
+            GetDatabasesRequest.Builder databasesRequestBuilder =
+                    GetDatabasesRequest.builder().catalogId(getGlueCatalogId());
+
+            GetDatabasesResponse response =
+                    glueClient.getDatabases(databasesRequestBuilder.build());
+            List<String> glueDatabases =
+                    response.databaseList().stream()
+                            .map(Database::name)
+                            .collect(Collectors.toList());
+            String dbResultNextToken = response.nextToken();
+            if (Optional.ofNullable(dbResultNextToken).isPresent()) {
+                do {
+                    databasesRequestBuilder.nextToken(dbResultNextToken);
+                    response = glueClient.getDatabases(databasesRequestBuilder.build());
+                    glueDatabases.addAll(
+                            response.databaseList().stream()
+                                    .map(Database::name)
+                                    .collect(Collectors.toList()));
+                    dbResultNextToken = response.nextToken();
+                } while (Optional.ofNullable(dbResultNextToken).isPresent());
+            }
+            return glueDatabases;
+        } catch (GlueException e) {
+            throw new CatalogException(
+                    GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e.getCause());
+        }
+    }
+
+    /**
+     * Create database in glue data catalog.
+     *
+     * @param databaseName fully qualified name of database.
+     * @param database Instance of {@link CatalogDatabase}.
+     * @throws CatalogException when unknown error from glue servers.
+     * @throws DatabaseAlreadyExistException when database exists already in glue data catalog.
+     */
+    public void createGlueDatabase(String databaseName, CatalogDatabase database)
+            throws CatalogException, DatabaseAlreadyExistException {
+
+        GlueUtils.validate(databaseName);
+        Map<String, String> properties = new HashMap<>(database.getProperties());
+        String str =
+                properties.entrySet().stream()
+                        .map(e -> e.getKey() + ":" + e.getValue())
+                        .collect(Collectors.joining("|"));
+        LOG.info("Create DB MYLOG:- " + str);
+        DatabaseInput.Builder databaseInputBuilder =
+                DatabaseInput.builder()
+                        .name(databaseName)
+                        .description(database.getComment())
+                        .locationUri(
+                                GlueUtils.extractDatabaseLocation(
+                                        properties, databaseName, catalogPath))
+                        .parameters(properties);
+        CreateDatabaseRequest.Builder requestBuilder =
+                CreateDatabaseRequest.builder()
+                        .databaseInput(databaseInputBuilder.build())
+                        .catalogId(getGlueCatalogId());
+        LOG.info(
+                String.format(
+                        "Database Properties Listing :- %s",
+                        properties.entrySet().stream()
+                                .map(e -> e.getKey() + e.getValue())
+                                .collect(Collectors.joining(","))));
+        try {
+            CreateDatabaseResponse response = glueClient.createDatabase(requestBuilder.build());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(GlueUtils.getDebugLog(response));
+            }
+            GlueUtils.validateGlueResponse(response);
+            LOG.info(String.format("%s Database created.", databaseName));
+        } catch (EntityNotFoundException e) {
+            throw new DatabaseAlreadyExistException(catalogName, databaseName, e);
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Delete a database from Glue data catalog only when database is empty.
+     *
+     * @param databaseName fully qualified name of database
+     * @throws CatalogException Any Exception thrown due to glue error
+     * @throws DatabaseNotExistException when database doesn't exists in glue catalog.
+     */
+    public void dropGlueDatabase(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+
+        GlueUtils.validate(databaseName);
+        DeleteDatabaseRequest deleteDatabaseRequest =
+                DeleteDatabaseRequest.builder()
+                        .name(databaseName)
+                        .catalogId(getGlueCatalogId())
+                        .build();
+        try {
+            DeleteDatabaseResponse response = glueClient.deleteDatabase(deleteDatabaseRequest);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(GlueUtils.getDebugLog(response));
+            }
+            GlueUtils.validateGlueResponse(response);
+            LOG.info(String.format("Database Dropped %s", databaseName));
+        } catch (EntityNotFoundException e) {
+            throw new DatabaseNotExistException(catalogName, databaseName);
+        } catch (GlueException e) {
+            throw new CatalogException(catalogName, e);
+        }
+    }
+
+    /**
+     * Drops list of table in database from glue data catalog.
+     *
+     * @param databaseName fully qualified name of database
+     * @param tables List of tables to remove from database.
+     * @throws CatalogException Any Exception thrown due to glue error
+     */
+    public void deleteTablesFromDatabase(String databaseName, Collection<String> tables)
+            throws CatalogException {
+        GlueUtils.validate(databaseName);
+        BatchDeleteTableRequest batchTableRequest =
+                BatchDeleteTableRequest.builder()
+                        .databaseName(databaseName)
+                        .catalogId(getGlueCatalogId())
+                        .tablesToDelete(tables)
+                        .build();
+
+        try {
+            BatchDeleteTableResponse response = glueClient.batchDeleteTable(batchTableRequest);
+            if (response.hasErrors()) {
+                String errorMsg =
+                        String.format(
+                                "Glue Table errors:- %s",
+                                response.errors().stream()
+                                        .map(
+                                                e ->
+                                                        "Table: "
+                                                                + e.tableName()
+                                                                + "\nErrorDetail:  "
+                                                                + e.errorDetail().errorMessage())
+                                        .collect(Collectors.joining("\n")));
+                LOG.error(errorMsg);
+                throw new CatalogException(errorMsg);
+            }
+            GlueUtils.validateGlueResponse(response);
+            LOG.info("Tables deleted.");
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Drops list of user defined function in database from glue data catalog.
+     *
+     * @param databaseName fully qualified name of database
+     * @param functions List of tables to remove from database.
+     * @throws CatalogException Any Exception thrown due to glue error
+     */
+    public void deleteFunctionsFromDatabase(String databaseName, Collection<String> functions)
+            throws CatalogException {
+        GlueUtils.validate(databaseName);
+        try {
+            DeleteUserDefinedFunctionRequest.Builder requestBuilder =
+                    DeleteUserDefinedFunctionRequest.builder()
+                            .databaseName(databaseName)
+                            .catalogId(getGlueCatalogId());
+            for (String functionName : functions) {
+                requestBuilder.functionName(functionName);
+                DeleteUserDefinedFunctionResponse response =
+                        glueClient.deleteUserDefinedFunction(requestBuilder.build());
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(GlueUtils.getDebugLog(response));
+                }
+                GlueUtils.validateGlueResponse(response);
+                LOG.info(String.format("Dropped Function %s", functionName));
+            }
+
+        } catch (GlueException e) {
+            LOG.error(String.format("Error deleting functions in database: %s", databaseName));
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Check if database is empty. i.e. it should not contain 1. table 2. functions
+     *
+     * @param databaseName name of database.
+     * @return boolean True/False based on the content of database.
+     * @throws CatalogException Any Exception thrown due to glue error
+     */
+    public boolean isDatabaseEmpty(String databaseName) throws CatalogException {
+        checkArgument(!isNullOrWhitespaceOnly(databaseName));
+        GlueUtils.validate(databaseName);
+        GetTablesRequest tablesRequest =
+                GetTablesRequest.builder()
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(databaseName)
+                        .maxResults(1)
+                        .build();
+        try {
+            GetTablesResponse response = glueClient.getTables(tablesRequest);
+            return response.sdkHttpResponse().isSuccessful()
+                    && response.tableList().size() == 0
+                    && listGlueFunctions(databaseName).size() == 0;
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Get a database from this glue data catalog.
+     *
+     * @param databaseName fully qualified name of database.
+     * @return Instance of {@link CatalogDatabase } .
+     * @throws DatabaseNotExistException when database doesn't exists in Glue data catalog.
+     * @throws CatalogException when any unknown error occurs in glue.
+     */
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        GlueUtils.validate(databaseName);
+        GetDatabaseRequest getDatabaseRequest =
+                GetDatabaseRequest.builder()
+                        .name(databaseName)
+                        .catalogId(getGlueCatalogId())
+                        .build();
+        try {
+            GetDatabaseResponse response = glueClient.getDatabase(getDatabaseRequest);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER
+                                + ": existing database. Client call response :- "
+                                + response.sdkHttpResponse().statusText());
+            }
+            GlueUtils.validateGlueResponse(response);
+            return GlueUtils.getCatalogDatabase(response.database());
+        } catch (EntityNotFoundException e) {
+            throw new DatabaseNotExistException(catalogName, databaseName);
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    public void updateGlueDatabase(String databaseName, CatalogDatabase newDatabase)
+            throws CatalogException {
+
+        GlueUtils.validate(databaseName);
+        Map<String, String> properties = new HashMap<>(newDatabase.getProperties());
+        DatabaseInput.Builder databaseInputBuilder =
+                DatabaseInput.builder()
+                        .parameters(properties)
+                        .description(newDatabase.getComment())
+                        .name(databaseName)
+                        .locationUri(
+                                GlueUtils.extractDatabaseLocation(
+                                        properties, databaseName, catalogPath));
+
+        UpdateDatabaseRequest updateRequest =
+                UpdateDatabaseRequest.builder()
+                        .databaseInput(databaseInputBuilder.build())
+                        .name(databaseName)
+                        .catalogId(getGlueCatalogId())
+                        .build();
+        UpdateDatabaseResponse response = glueClient.updateDatabase(updateRequest);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(GlueUtils.getDebugLog(response));
+        }
+        GlueUtils.validateGlueResponse(response);
+        LOG.info(String.format("Database Updated. %s", databaseName));
+    }
+
+    // -------------- Table related operations.
+
+    /**
+     * Create table in glue data catalog.
+     *
+     * @param tablePath Fully qualified name of table. {@link ObjectPath}
+     * @param table instance of {@link CatalogBaseTable} containing table related information.
+     * @param managedTable identifier if managed table.
+     * @throws CatalogException Any Exception thrown due to glue error
+     */
+    public void createGlueTable(
+            final ObjectPath tablePath, final CatalogBaseTable table, final boolean managedTable)
+            throws CatalogException {
+
+        checkNotNull(table, "Table cannot be Null");
+        checkNotNull(tablePath, "Table Path cannot be Null");
+        final Map<String, String> properties = new HashMap<>(table.getOptions());
+        String tableOwner = GlueUtils.extractTableOwner(properties);
+        if (managedTable) {
+            properties.put(CONNECTOR.key(), ManagedTableFactory.DEFAULT_IDENTIFIER);
+        }
+        Set<Column> glueColumns = GlueUtils.getGlueColumnsFromCatalogTable(table);
+        String tableLocation = "";
+        if (properties.containsKey(GlueCatalogConstants.LOCATION_URI)) {
+            tableLocation = properties.remove(GlueCatalogConstants.LOCATION_URI);
+        } else {
+            try {
+                tableLocation =
+                        getDatabase(tablePath.getDatabaseName())
+                                        .getProperties()
+                                        .get(GlueCatalogConstants.LOCATION_URI)
+                                + GlueCatalogConstants.LOCATION_SEPARATOR
+                                + tablePath.getObjectName();
+            } catch (DatabaseNotExistException e) {
+                LOG.warn("Database doesn't Exists");
+            }
+        }
+
+        StorageDescriptor.Builder storageDescriptorBuilder =
+                StorageDescriptor.builder()
+                        .inputFormat(GlueUtils.extractInputFormat(properties))
+                        .outputFormat(GlueUtils.extractOutputFormat(properties))
+                        .location(tableLocation);
+
+        TableInput.Builder tableInputBuilder =
+                TableInput.builder()
+                        .name(tablePath.getObjectName())
+                        .description(table.getComment())
+                        .tableType(table.getTableKind().name())
+                        .lastAccessTime(Instant.now())
+                        .owner(tableOwner);
+
+        if (table.getTableKind().name().equalsIgnoreCase(CatalogBaseTable.TableKind.VIEW.name())) {
+            tableInputBuilder.viewExpandedText(GlueUtils.getExpandedQuery(table));
+            tableInputBuilder.viewOriginalText(GlueUtils.getOriginalQuery(table));
+        }
+
+        CreateTableRequest.Builder requestBuilder =
+                CreateTableRequest.builder()
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(tablePath.getDatabaseName());
+
+        if (table instanceof CatalogTable) {
+            CatalogTable catalogTable = (CatalogTable) table;
+            if (catalogTable.isPartitioned()) {
+                LOG.info("Catalog table is partitioned");
+                Collection<Column> partitionKeys =
+                        GlueUtils.getPartitionKeys(catalogTable, glueColumns);
+                LOG.info(
+                        "Partition columns are -> "
+                                + partitionKeys.stream()
+                                        .map(Column::name)
+                                        .collect(Collectors.joining(",")));
+                tableInputBuilder.partitionKeys(partitionKeys);
+            }
+        }
+
+        try {
+            storageDescriptorBuilder.columns(glueColumns);
+            tableInputBuilder.storageDescriptor(storageDescriptorBuilder.build());
+            tableInputBuilder.parameters(properties);
+            requestBuilder.tableInput(tableInputBuilder.build());
+            CreateTableResponse response = glueClient.createTable(requestBuilder.build());
+            LOG.debug(GlueUtils.getDebugLog(response));
+            GlueUtils.validateGlueResponse(response);
+            LOG.info(String.format("Table created. %s", tablePath.getFullName()));
+
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * @param tablePath fully Qualified table path.
+     * @param newTable instance of {@link CatalogBaseTable} containing information for table.
+     * @param managedTable identifier for managed table.
+     * @throws CatalogException Glue related exception.
+     */
+    public void alterGlueTable(
+            ObjectPath tablePath, CatalogBaseTable newTable, boolean managedTable)
+            throws CatalogException {
+
+        Map<String, String> properties = new HashMap<>(newTable.getOptions());
+        String tableOwner = GlueUtils.extractTableOwner(properties);
+
+        if (managedTable) {
+            properties.put(CONNECTOR.key(), ManagedTableFactory.DEFAULT_IDENTIFIER);
+        }
+
+        Set<Column> glueColumns = GlueUtils.getGlueColumnsFromCatalogTable(newTable);
+
+        // create StorageDescriptor for table
+        StorageDescriptor.Builder storageDescriptorBuilder =
+                StorageDescriptor.builder()
+                        .inputFormat(GlueUtils.extractInputFormat(properties))
+                        .outputFormat(GlueUtils.extractOutputFormat(properties))
+                        .location(
+                                GlueUtils.extractTableLocation(properties, tablePath, catalogPath))
+                        .parameters(properties)
+                        .columns(glueColumns);
+
+        // create TableInput Builder with available information.
+        TableInput.Builder tableInputBuilder =
+                TableInput.builder()
+                        .name(tablePath.getObjectName())
+                        .description(newTable.getComment())
+                        .tableType(newTable.getTableKind().name())
+                        .lastAccessTime(Instant.now())
+                        .owner(tableOwner);
+
+        UpdateTableRequest.Builder requestBuilder =
+                UpdateTableRequest.builder()
+                        .tableInput(tableInputBuilder.build())
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(tablePath.getDatabaseName());
+
+        if (newTable instanceof CatalogTable) {
+            CatalogTable catalogTable = (CatalogTable) newTable;
+            if (catalogTable.isPartitioned()) {
+                tableInputBuilder.partitionKeys(
+                        GlueUtils.getPartitionKeys(catalogTable, glueColumns));
+            }
+        }
+
+        // apply storage descriptor and tableInput for request
+        tableInputBuilder.storageDescriptor(storageDescriptorBuilder.build());
+        requestBuilder.tableInput(tableInputBuilder.build());
+
+        try {
+            UpdateTableResponse response = glueClient.updateTable(requestBuilder.build());
+            LOG.debug(GlueUtils.getDebugLog(response));
+            GlueUtils.validateGlueResponse(response);
+            LOG.info(String.format("Table updated. %s", tablePath.getFullName()));
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Get names of all tables or views under this database based on type identifier. An empty list
+     * is returned if none exists.
+     *
+     * @param databaseName fully qualified database name.
+     * @return a list of the names of all tables or views in this database based on type identifier.
+     * @throws CatalogException in case of any runtime exception
+     */
+    public List<String> getGlueTableList(String databaseName, String type) throws CatalogException {
+        GetTablesRequest.Builder tablesRequestBuilder =
+                GetTablesRequest.builder().databaseName(databaseName).catalogId(getGlueCatalogId());
+        GetTablesResponse response = glueClient.getTables(tablesRequestBuilder.build());
+        GlueUtils.validateGlueResponse(response);
+        List<String> finalTableList =
+                response.tableList().stream()
+                        .filter(table -> table.tableType().equalsIgnoreCase(type))
+                        .map(Table::name)
+                        .collect(Collectors.toList());
+        String tableResultNextToken = response.nextToken();
+
+        if (Optional.ofNullable(tableResultNextToken).isPresent()) {
+            do {
+                // update token in requestBuilder to fetch next batch
+                tablesRequestBuilder.nextToken(tableResultNextToken);
+                response = glueClient.getTables(tablesRequestBuilder.build());
+                GlueUtils.validateGlueResponse(response);
+                finalTableList.addAll(
+                        response.tableList().stream()
+                                .filter(table -> table.tableType().equalsIgnoreCase(type))
+                                .map(Table::name)
+                                .collect(Collectors.toList()));
+                tableResultNextToken = response.nextToken();
+            } while (Optional.ofNullable(tableResultNextToken).isPresent());
+        }
+        return finalTableList;
+    }
+
+    /**
+     * Returns a {@link Table} identified by the given table Path. {@link ObjectPath}.
+     *
+     * @param tablePath Path of the table or view
+     * @return The requested table. Glue encapsulates whether table or view in its attribute called
+     *     type.
+     * @throws TableNotExistException if the target does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    public Table getGlueTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        GetTableRequest tablesRequest =
+                GetTableRequest.builder()
+                        .databaseName(tablePath.getDatabaseName())
+                        .name(tablePath.getObjectName())
+                        .catalogId(getGlueCatalogId())
+                        .build();
+        try {
+            GetTableResponse response = glueClient.getTable(tablesRequest);
+            LOG.info(String.format("Glue table Found %s", response.table().name()));
+            GlueUtils.validateGlueResponse(response);
+            return response.table();
+        } catch (EntityNotFoundException e) {
+            throw new TableNotExistException(catalogName, tablePath, e);
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Check if a table or view exists in glue data catalog.
+     *
+     * @param tablePath Path of the table or view
+     * @return true if the given table exists in the catalog false otherwise
+     * @throws CatalogException in case of any runtime exception
+     */
+    public boolean glueTableExists(ObjectPath tablePath) throws CatalogException {
+        try {
+            Table glueTable = getGlueTable(tablePath);
+            return glueTable != null && glueTable.name().equals(tablePath.getObjectName());
+        } catch (TableNotExistException e) {
+            LOG.warn(
+                    String.format(
+                            "%s\nDatabase: %s Table: %s",
+                            GlueCatalogConstants.TABLE_NOT_EXISTS_IDENTIFIER,
+                            tablePath.getDatabaseName(),
+                            tablePath.getObjectName()));
+            return false;
+        } catch (CatalogException e) {
+            LOG.error(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e.getCause());
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Drop a table or view from glue data catalog.
+     *
+     * @param tablePath fully qualified table path
+     * @throws CatalogException on runtime errors.
+     */
+    public void dropGlueTable(ObjectPath tablePath) throws CatalogException {
+        DeleteTableRequest.Builder tableRequestBuilder =
+                DeleteTableRequest.builder()
+                        .databaseName(tablePath.getDatabaseName())
+                        .name(tablePath.getObjectName())
+                        .catalogId(getGlueCatalogId());
+        try {
+            DeleteTableResponse response = glueClient.deleteTable(tableRequestBuilder.build());
+            GlueUtils.validateGlueResponse(response);
+            LOG.info(String.format("Dropped Table %s.", tablePath.getObjectName()));
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    // -------------- Function related operations.
+
+    /**
+     * Create a function. Function name should be handled in a case-insensitive way.
+     *
+     * @param functionPath path of the function
+     * @param function Flink function to be created
+     * @throws CatalogException in case of any runtime exception
+     */
+    public void createGlueFunction(ObjectPath functionPath, CatalogFunction function)
+            throws CatalogException, FunctionAlreadyExistException {
+
+        UserDefinedFunctionInput functionInput =
+                GlueUtils.createFunctionInput(functionPath, function);
+        CreateUserDefinedFunctionRequest.Builder requestBuilder =
+                CreateUserDefinedFunctionRequest.builder()
+                        .databaseName(functionPath.getDatabaseName())
+                        .catalogId(getGlueCatalogId())
+                        .functionInput(functionInput);
+        try {
+            CreateUserDefinedFunctionResponse response =
+                    glueClient.createUserDefinedFunction(requestBuilder.build());
+            GlueUtils.validateGlueResponse(response);
+            LOG.info(String.format("Function created. %s", functionPath.getFullName()));
+        } catch (AlreadyExistsException e) {
+            LOG.error(
+                    String.format(
+                            "%s.%s already Exists. Function language of type: %s",
+                            functionPath.getDatabaseName(),
+                            functionPath.getObjectName(),
+                            function.getFunctionLanguage()));
+            throw new FunctionAlreadyExistException(catalogName, functionPath);
+        } catch (GlueException e) {
+            LOG.error("Error creating glue function.", e.getCause());
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Get the user defined function from glue Catalog. Function name should be handled in a
+     * case-insensitive way.
+     *
+     * @param functionPath path of the function
+     * @return the requested function
+     * @throws CatalogException in case of any runtime exception
+     */
+    public CatalogFunction getGlueFunction(ObjectPath functionPath) {
+        GetUserDefinedFunctionRequest request =
+                GetUserDefinedFunctionRequest.builder()
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(functionPath.getDatabaseName())
+                        .functionName(functionPath.getObjectName())
+                        .build();
+        GetUserDefinedFunctionResponse response = glueClient.getUserDefinedFunction(request);
+        GlueUtils.validateGlueResponse(response);
+        UserDefinedFunction udf = response.userDefinedFunction();
+
+        List<org.apache.flink.table.resource.ResourceUri> resourceUris = new LinkedList<>();
+        for (ResourceUri resourceUri : udf.resourceUris()) {
+            resourceUris.add(
+                    new org.apache.flink.table.resource.ResourceUri(
+                            ResourceType.valueOf(resourceUri.resourceType().name()),
+                            resourceUri.uri()));
+        }
+
+        return new CatalogFunctionImpl(
+                GlueUtils.getCatalogFunctionClassName(udf),
+                GlueUtils.getFunctionalLanguage(udf),
+                resourceUris);
+    }
+
+    public List<String> listGlueFunctions(String databaseName) {
+        GetUserDefinedFunctionsRequest.Builder functionsRequest =
+                GetUserDefinedFunctionsRequest.builder()
+                        .databaseName(databaseName)
+                        .catalogId(getGlueCatalogId());
+
+        List<String> glueFunctions;
+        try {
+            GetUserDefinedFunctionsResponse functionsResponse =
+                    glueClient.getUserDefinedFunctions(functionsRequest.build());
+            String token = functionsResponse.nextToken();
+            glueFunctions =
+                    functionsResponse.userDefinedFunctions().stream()
+                            .map(UserDefinedFunction::functionName)
+                            .collect(Collectors.toCollection(LinkedList::new));
+            if (Optional.ofNullable(token).isPresent()) {
+                do {
+                    functionsRequest.nextToken(token);
+                    functionsResponse =
+                            glueClient.getUserDefinedFunctions(functionsRequest.build());
+                    glueFunctions.addAll(
+                            functionsResponse.userDefinedFunctions().stream()
+                                    .map(UserDefinedFunction::functionName)
+                                    .collect(Collectors.toCollection(LinkedList::new)));
+                    token = functionsResponse.nextToken();
+                } while (Optional.ofNullable(token).isPresent());
+            }
+
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+        return glueFunctions;
+    }
+
+    public boolean glueFunctionExists(ObjectPath functionPath) {
+        GetUserDefinedFunctionRequest request =
+                GetUserDefinedFunctionRequest.builder()
+                        .functionName(functionPath.getObjectName())
+                        .databaseName(functionPath.getDatabaseName())
+                        .catalogId(getGlueCatalogId())
+                        .build();
+
+        try {
+            GetUserDefinedFunctionResponse response = glueClient.getUserDefinedFunction(request);
+            GlueUtils.validateGlueResponse(response);
+            return response.userDefinedFunction()
+                    .functionName()
+                    .equalsIgnoreCase(functionPath.getObjectName());
+        } catch (EntityNotFoundException e) {
+            LOG.warn(
+                    String.format(
+                            "Entry not found for function %s.%s",
+                            functionPath.getObjectName(), functionPath.getDatabaseName()));
+            return false;
+        } catch (GlueException e) {
+            LOG.error(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Modify an existing function. Function name should be handled in a case-insensitive way.
+     *
+     * @param functionPath path of function.
+     * @param newFunction modified function.
+     * @throws CatalogException on runtime errors.
+     */
+    public void alterGlueFunction(ObjectPath functionPath, CatalogFunction newFunction)
+            throws CatalogException {
+        UserDefinedFunctionInput functionInput =
+                GlueUtils.createFunctionInput(functionPath, newFunction);
+        UpdateUserDefinedFunctionRequest updateUserDefinedFunctionRequest =
+                UpdateUserDefinedFunctionRequest.builder()
+                        .functionName(functionPath.getObjectName())
+                        .databaseName(functionPath.getDatabaseName())
+                        .catalogId(getGlueCatalogId())
+                        .functionInput(functionInput)
+                        .build();
+        UpdateUserDefinedFunctionResponse response =
+                glueClient.updateUserDefinedFunction(updateUserDefinedFunctionRequest);
+        GlueUtils.validateGlueResponse(response);
+        LOG.info(String.format("Function altered. %s", functionPath.getFullName()));
+    }
+
+    /**
+     * @param functionPath fully qualified function path
+     * @throws CatalogException
+     */
+    public void dropGlueFunction(ObjectPath functionPath) throws CatalogException {
+        DeleteUserDefinedFunctionRequest request =
+                DeleteUserDefinedFunctionRequest.builder()
+                        .catalogId(getGlueCatalogId())
+                        .functionName(functionPath.getObjectName())
+                        .databaseName(functionPath.getDatabaseName())
+                        .build();
+        DeleteUserDefinedFunctionResponse response = glueClient.deleteUserDefinedFunction(request);
+        GlueUtils.validateGlueResponse(response);
+        LOG.info(String.format("Dropped Function. %s", functionPath.getFullName()));
+    }
+
+    // -------------- Partition related operations.
+
+    public void ensurePartitionedTable(ObjectPath tablePath, Table glueTable)
+            throws TableNotPartitionedException {
+        if (!glueTable.hasPartitionKeys()) {
+            throw new TableNotPartitionedException(catalogName, tablePath);
+        }
+    }
+
+    /**
+     * create partition in glue data catalog.
+     *
+     * @param glueTable glue table
+     * @param partitionSpec partition spec
+     * @param catalogPartition partition to add
+     */
+    public void createGluePartition(
+            Table glueTable, CatalogPartitionSpec partitionSpec, CatalogPartition catalogPartition)
+            throws CatalogException, PartitionSpecInvalidException {
+
+        List<String> partCols = GlueUtils.getColumnNames(glueTable.partitionKeys());
+        LOG.info(String.format("Partition Columns are : %s", String.join(", ", partCols)));
+        List<String> partitionValues =
+                getOrderedFullPartitionValues(
+                        partitionSpec,
+                        partCols,
+                        new ObjectPath(glueTable.databaseName(), glueTable.name()));
+
+        // validate partition values
+        for (int i = 0; i < partCols.size(); i++) {
+            if (isNullOrWhitespaceOnly(partitionValues.get(i))) {
+                throw new PartitionSpecInvalidException(
+                        catalogName,
+                        partCols,
+                        new ObjectPath(glueTable.databaseName(), glueTable.name()),
+                        partitionSpec);
+            }
+        }
+        StorageDescriptor.Builder sdBuilder = glueTable.storageDescriptor().toBuilder();
+        Map<String, String> partitionProperties = catalogPartition.getProperties();
+
+        sdBuilder.location(extractPartitionLocation(partitionProperties));
+        sdBuilder.parameters(partitionSpec.getPartitionSpec());
+        String comment = catalogPartition.getComment();
+        if (comment != null) {
+            partitionProperties.put(GlueCatalogConstants.COMMENT, comment);
+        }
+        PartitionInput.Builder partitionInput =
+                PartitionInput.builder()
+                        .parameters(partitionProperties)
+                        .lastAccessTime(Instant.now())
+                        .storageDescriptor(sdBuilder.build())
+                        .values(partitionValues);
+        CreatePartitionRequest createPartitionRequest =
+                CreatePartitionRequest.builder()
+                        .partitionInput(partitionInput.build())
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(glueTable.databaseName())
+                        .tableName(glueTable.name())
+                        .build();
+
+        try {
+            CreatePartitionResponse response = glueClient.createPartition(createPartitionRequest);
+            GlueUtils.validateGlueResponse(response);
+            LOG.info("Partition Created.");
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    private String extractPartitionLocation(Map<String, String> properties) {
+        return properties.containsKey(GlueCatalogConstants.LOCATION_URI)
+                ? properties.remove(GlueCatalogConstants.LOCATION_URI)
+                : null;
+    }
+
+    /**
+     * Get a list of ordered partition values by re-arranging them based on the given list of
+     * partition keys. If the partition value is null, it'll be converted into default partition
+     * name.
+     *
+     * @param partitionSpec a partition spec.
+     * @param partitionKeys a list of partition keys.
+     * @param tablePath path of the table to which the partition belongs.
+     * @return A list of partition values ordered according to partitionKeys.
+     * @throws PartitionSpecInvalidException thrown if partitionSpec and partitionKeys have
+     *     different sizes, or any key in partitionKeys doesn't exist in partitionSpec.
+     */
+    private List<String> getOrderedFullPartitionValues(
+            CatalogPartitionSpec partitionSpec, List<String> partitionKeys, ObjectPath tablePath)
+            throws PartitionSpecInvalidException {
+        Map<String, String> spec = partitionSpec.getPartitionSpec();
+        if (spec.size() != partitionKeys.size()) {
+            throw new PartitionSpecInvalidException(
+                    catalogName, partitionKeys, tablePath, partitionSpec);
+        }
+
+        List<String> values = new ArrayList<>(spec.size());
+        for (String key : partitionKeys) {
+            if (!spec.containsKey(key)) {
+                throw new PartitionSpecInvalidException(
+                        catalogName, partitionKeys, tablePath, partitionSpec);
+            } else {
+                String value = spec.get(key);
+                if (value == null) {
+                    value = GlueCatalogConstants.DEFAULT_PARTITION_NAME;
+                }
+                values.add(value);
+            }
+        }
+
+        return values;
+    }
+
+    /**
+     * Update glue table.
+     *
+     * @param tablePath contains database name and table name.
+     * @param partitionSpec Existing partition information.
+     * @param newPartition Partition information with new changes.
+     * @throws CatalogException Exception in failure.
+     */
+    public void alterGluePartition(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition)
+            throws CatalogException {
+        // todo has to implement
+    }
+
+    /**
+     * Get CatalogPartitionSpec of all partitions from glue data catalog.
+     *
+     * @param tablePath fully qualified table path.
+     * @return List of PartitionSpec
+     */
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) {
+
+        GetPartitionsRequest.Builder request =
+                GetPartitionsRequest.builder()
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(tablePath.getDatabaseName())
+                        .tableName(tablePath.getObjectName());
+        try {
+            GetPartitionsResponse response = glueClient.getPartitions(request.build());
+            GlueUtils.validateGlueResponse(response);
+            List<CatalogPartitionSpec> finalPartitionsList =
+                    response.partitions().stream()
+                            .map(this::getCatalogPartitionSpec)
+                            .collect(Collectors.toList());
+            String partitionsResultNextToken = response.nextToken();
+            if (Optional.ofNullable(partitionsResultNextToken).isPresent()) {
+                do {
+                    // creating a new GetPartitionsResult using next token.
+                    request.nextToken(partitionsResultNextToken);
+                    response = glueClient.getPartitions(request.build());
+                    finalPartitionsList.addAll(
+                            response.partitions().stream()
+                                    .map(this::getCatalogPartitionSpec)
+                                    .collect(Collectors.toList()));
+                    partitionsResultNextToken = response.nextToken();
+                } while (Optional.ofNullable(partitionsResultNextToken).isPresent());
+            }
+
+            return finalPartitionsList;
+
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Drop partition in Glue data catalog.
+     *
+     * @param tablePath fully qualified table path
+     * @param partitionSpec partition spec details
+     * @throws CatalogException unknown errors
+     * @throws PartitionNotExistException partition doesnt exists
+     */
+    public void dropGluePartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws CatalogException, PartitionNotExistException {
+
+        try {
+            Table glueTable = getGlueTable(tablePath);
+            List<String> partCols = GlueUtils.getColumnNames(glueTable.partitionKeys());
+            DeletePartitionRequest deletePartitionRequest =
+                    DeletePartitionRequest.builder()
+                            .catalogId(getGlueCatalogId())
+                            .databaseName(tablePath.getDatabaseName())
+                            .tableName(tablePath.getObjectName())
+                            .partitionValues(
+                                    getOrderedFullPartitionValues(
+                                            partitionSpec, partCols, tablePath))
+                            .build();
+            DeletePartitionResponse response = glueClient.deletePartition(deletePartitionRequest);
+            GlueUtils.validateGlueResponse(response);
+            LOG.info("Partition Dropped.");
+        } catch (TableNotExistException | PartitionSpecInvalidException e) {
+            e.printStackTrace();
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * @param tablePath
+     * @param filters
+     * @return List of Partition Spec
+     */
+    public List<CatalogPartitionSpec> listGluePartitionsByFilter(
+            ObjectPath tablePath, List<Expression> filters) {
+        String expressionString =
+                filters.stream()
+                        .map(x -> getExpressionString(x, new StringBuilder()))
+                        .collect(
+                                Collectors.joining(
+                                        GlueCatalogConstants.SPACE
+                                                + GlueCatalogConstants.AND
+                                                + GlueCatalogConstants.SPACE));
+        try {
+            GetPartitionsRequest request =
+                    GetPartitionsRequest.builder()
+                            .databaseName(tablePath.getDatabaseName())
+                            .tableName(tablePath.getObjectName())
+                            .catalogId(getGlueCatalogId())
+                            .expression(expressionString)
+                            .build();
+            GetPartitionsResponse response = glueClient.getPartitions(request);
+            List<CatalogPartitionSpec> catalogPartitionSpecList =
+                    response.partitions().stream()
+                            .map(this::getCatalogPartitionSpec)
+                            .collect(Collectors.toList());
+            GlueUtils.validateGlueResponse(response);
+            String nextToken = response.nextToken();
+            if (Optional.ofNullable(nextToken).isPresent()) {
+                do {
+                    // creating a new GetPartitionsResult using next token.
+                    request =
+                            GetPartitionsRequest.builder()
+                                    .databaseName(tablePath.getDatabaseName())
+                                    .tableName(tablePath.getObjectName())
+                                    .catalogId(getGlueCatalogId())
+                                    .expression(expressionString)
+                                    .nextToken(nextToken)
+                                    .build();
+                    response = glueClient.getPartitions(request);
+                    catalogPartitionSpecList.addAll(
+                            response.partitions().stream()
+                                    .map(this::getCatalogPartitionSpec)
+                                    .collect(Collectors.toList()));
+                    nextToken = response.nextToken();
+                } while (Optional.ofNullable(nextToken).isPresent());
+            }
+            return catalogPartitionSpecList;
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    private String getExpressionString(Expression expression, StringBuilder sb) {
+
+        for (Expression childExpression : expression.getChildren()) {
+            if (childExpression.getChildren() != null && childExpression.getChildren().size() > 0) {
+                getExpressionString(childExpression, sb);
+            }
+        }
+        return sb.insert(
+                        0,
+                        expression.asSummaryString()
+                                + GlueCatalogConstants.SPACE
+                                + GlueCatalogConstants.AND)
+                .toString();
+    }
+
+    public Partition getGluePartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws PartitionNotExistException {
+        try {
+            GetPartitionRequest request =
+                    GetPartitionRequest.builder()
+                            .catalogId(getGlueCatalogId())
+                            .databaseName(tablePath.getDatabaseName())
+                            .tableName(tablePath.getObjectName())
+                            .build();
+            GetPartitionResponse response = glueClient.getPartition(request);
+            GlueUtils.validateGlueResponse(response);
+            Partition partition = response.partition();
+            if (partition.hasValues()
+                    && GlueUtils.specSubset(
+                            partitionSpec.getPartitionSpec(),
+                            partition.storageDescriptor().parameters())) {
+                return partition;
+            }
+        } catch (EntityNotFoundException e) {
+            throw new PartitionNotExistException(catalogName, tablePath, partitionSpec);
+        }
+        return null;
+    }
+
+    public boolean gluePartitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws CatalogException {
+        GetPartitionRequest request =
+                GetPartitionRequest.builder()
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(tablePath.getDatabaseName())
+                        .tableName(tablePath.getObjectName())
+                        .build();
+        try {
+            GetPartitionResponse response = glueClient.getPartition(request);
+            GlueUtils.validateGlueResponse(response);
+            return response.partition()
+                    .parameters()
+                    .keySet()
+                    .containsAll(partitionSpec.getPartitionSpec().keySet());
+        } catch (EntityNotFoundException e) {
+            LOG.warn(String.format("%s is not found", partitionSpec.getPartitionSpec()));
+        } catch (GlueException e) {
+            throw new CatalogException(catalogName, e);
+        }
+        return false;
+    }
+
+    private String getGlueCatalogId() {
+        return awsConfig.getGlueCatalogId();
+    }
+
+    /**
+     * Rename glue table. Glue catalog don't support renaming table. For renaming in Flink, it has

Review Comment:
   Should we throw `UnsupportedOperationException` instead for now?
   Also Please add Jira to track TODO items.



##########
flink-catalog-aws/flink-catalog-aws-glue/src/test/java/org/apache/flink/table/catalog/glue/GlueCatalogTest.java:
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.aws.config.AWSConfig;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.glue.util.GlueOperator;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import software.amazon.awssdk.http.SdkHttpResponse;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.GetDatabasesRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabasesResponse;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse;
+import software.amazon.awssdk.services.glue.model.Table;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** GlueCatalog Test. */
+public class GlueCatalogTest {

Review Comment:
   I would rather have separate tests for `GlueOperator` and `GlueCatalog`. 



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueUtils.java:
##########
@@ -0,0 +1,365 @@
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.FunctionLanguage;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.glue.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.GlueCatalogOptions;
+import org.apache.flink.table.types.DataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.GlueResponse;
+import software.amazon.awssdk.services.glue.model.PrincipalType;
+import software.amazon.awssdk.services.glue.model.ResourceUri;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunction;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunctionInput;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/** Utilities related glue Operation. */
+public class GlueUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(GlueUtils.class);
+
+    /**
+     * Glue supports lowercase naming convention.
+     *
+     * @param name fully qualified name.
+     * @return modified name according to glue convention.
+     */
+    public static String getGlueConventionalName(String name) {
+        return name.toLowerCase(Locale.ROOT);
+    }
+
+    /**
+     * Extract location from database properties if present and remove location from properties.
+     * fallback to create default location if not present
+     *
+     * @param databaseProperties database properties.
+     * @param databaseName fully qualified name for database.
+     * @return location for database.
+     */
+    public static String extractDatabaseLocation(
+            final Map<String, String> databaseProperties,
+            final String databaseName,
+            final String catalogPath) {
+        if (databaseProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) {
+            return databaseProperties.remove(GlueCatalogConstants.LOCATION_URI);
+        } else {
+            LOG.info("No location URI Set. Using Catalog Path as default");
+            return catalogPath + GlueCatalogConstants.LOCATION_SEPARATOR + databaseName;
+        }
+    }
+
+    /**
+     * Extract location from database properties if present and remove location from properties.
+     * fallback to create default location if not present
+     *
+     * @param tableProperties table properties.
+     * @param tablePath fully qualified object for table.
+     * @return location for table.
+     */
+    public static String extractTableLocation(
+            final Map<String, String> tableProperties,
+            final ObjectPath tablePath,
+            final String catalogPath) {
+        if (tableProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) {
+            return tableProperties.remove(GlueCatalogConstants.LOCATION_URI);
+        } else {
+            return catalogPath
+                    + GlueCatalogConstants.LOCATION_SEPARATOR
+                    + tablePath.getDatabaseName()
+                    + GlueCatalogConstants.LOCATION_SEPARATOR
+                    + tablePath.getObjectName();
+        }
+    }
+
+    /**
+     * Build CatalogDatabase instance using information from glue Database instance.
+     *
+     * @param glueDatabase {@link Database }
+     * @return {@link CatalogDatabase } instance.
+     */
+    public static CatalogDatabase getCatalogDatabase(final Database glueDatabase) {
+        Map<String, String> properties = new HashMap<>(glueDatabase.parameters());
+        properties.put(GlueCatalogConstants.LOCATION_URI, glueDatabase.locationUri());
+        String comment = glueDatabase.description();
+        return new CatalogDatabaseImpl(properties, comment);
+    }
+
+    /**
+     * A Glue database name cannot be longer than 252 characters. The only acceptable characters are
+     * lowercase letters, numbers, and the underscore character. More details:
+     * https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html
+     *
+     * @param name name
+     */
+    public static void validate(String name) {
+        checkArgument(
+                name != null && GlueCatalogConstants.GLUE_DB_PATTERN.matcher(name).find(),
+                "Database name is not according to Glue Norms, "
+                        + "check here https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html");
+    }
+
+    /** validate response from client call. */
+    public static void validateGlueResponse(GlueResponse response) {
+        if (response != null && !response.sdkHttpResponse().isSuccessful()) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER);
+        }
+    }
+
+    /**
+     * @param udf Instance of UserDefinedFunction
+     * @return ClassName for function
+     */
+    public static String getCatalogFunctionClassName(final UserDefinedFunction udf) {
+        validateUDFClassName(udf.className());
+        return udf.functionName().split(GlueCatalogConstants.DEFAULT_SEPARATOR)[1];
+    }
+
+    private static void validateUDFClassName(final String name) {
+        checkArgument(!isNullOrWhitespaceOnly(name));
+        if (name.split(GlueCatalogConstants.DEFAULT_SEPARATOR).length != 3) {

Review Comment:
   nit: we can check against a pattern instead



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueOperator.java:
##########
@@ -0,0 +1,1359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.connector.aws.config.AWSConfig;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.glue.GlueCatalogConstants;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.ManagedTableFactory;
+import org.apache.flink.table.resource.ResourceType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.CreatePartitionRequest;
+import software.amazon.awssdk.services.glue.model.CreatePartitionResponse;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.CreateTableResponse;
+import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.DatabaseInput;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.DeletePartitionRequest;
+import software.amazon.awssdk.services.glue.model.DeletePartitionResponse;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.DeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.GetDatabasesRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabasesResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionsRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionsResponse;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse;
+import software.amazon.awssdk.services.glue.model.GlueException;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.PartitionInput;
+import software.amazon.awssdk.services.glue.model.ResourceUri;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+import software.amazon.awssdk.services.glue.model.UpdateTableResponse;
+import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunction;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunctionInput;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/**
+ * Utilities for Glue catalog operations. Important Note :
+ * https://aws.amazon.com/premiumsupport/knowledge-center/glue-crawler-internal-service-exception/
+ */
+public class GlueOperator {
+
+    /**
+     * Instance of AwsProperties which holds the configs related to configure glue and aws setup.
+     */
+    private final AWSConfig awsConfig;
+
+    /** http client for glue client. Current implementation for client is sync type. */
+    private final GlueClient glueClient;
+
+    private final String catalogName;
+
+    public final String catalogPath;
+
+    private static final Logger LOG = LoggerFactory.getLogger(GlueOperator.class);
+
+    public GlueOperator(
+            String catalogName, String catalogPath, AWSConfig awsConfig, GlueClient glueClient) {
+        this.awsConfig = awsConfig;
+        this.glueClient = glueClient;
+        this.catalogPath = catalogPath;
+        this.catalogName = catalogName;
+    }
+
+    public void closeClient() {
+        glueClient.close();
+    }
+
+    // -------------- Database related operations.
+
+    /**
+     * List all databases present.
+     *
+     * @return List of fully qualified database names
+     */
+    public List<String> listGlueDatabases() throws CatalogException {
+        try {
+            GetDatabasesRequest.Builder databasesRequestBuilder =
+                    GetDatabasesRequest.builder().catalogId(getGlueCatalogId());
+
+            GetDatabasesResponse response =
+                    glueClient.getDatabases(databasesRequestBuilder.build());
+            List<String> glueDatabases =
+                    response.databaseList().stream()
+                            .map(Database::name)
+                            .collect(Collectors.toList());
+            String dbResultNextToken = response.nextToken();
+            if (Optional.ofNullable(dbResultNextToken).isPresent()) {

Review Comment:
   nit: can use a while loop



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,1073 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.aws.config.AWSConfig;
+import org.apache.flink.connector.aws.util.AwsClientFactories;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedCatalogView;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.util.GlueOperator;
+import org.apache.flink.table.catalog.glue.util.GlueUtils;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.Table;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A Glue catalog implementation that uses glue catalog. */
+@PublicEvolving
+public class GlueCatalog extends AbstractCatalog {
+
+    /** instance of GlueOperator to facilitate glue related actions. */
+    public GlueOperator glueOperator;
+
+    /** Default database name if not passed as part of catalog. */
+    public static final String DEFAULT_DB = "default";
+
+    private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class);
+
+    public GlueCatalog(String catalogName, String databaseName, ReadableConfig catalogConfig) {
+        super(catalogName, databaseName);
+        checkNotNull(catalogConfig, "Catalog config cannot be null.");
+        AWSConfig awsConfig = new AWSConfig(catalogConfig);
+        GlueClient glueClient = AwsClientFactories.factory(awsConfig).glue();
+        String catalogPath = catalogConfig.get(GlueCatalogOptions.PATH);
+        this.glueOperator = new GlueOperator(getName(), catalogPath, awsConfig, glueClient);
+    }
+
+    @VisibleForTesting
+    public GlueCatalog(String catalogName, String databaseName, GlueOperator glueOperator) {
+        super(catalogName, databaseName);
+        this.glueOperator = glueOperator;
+    }
+
+    /**
+     * Open the catalog. Used for any required preparation in initialization phase.
+     *
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void open() throws CatalogException {}
+
+    /**
+     * Close the catalog when it is no longer needed and release any resource that it might be
+     * holding.
+     *
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void close() throws CatalogException {
+        try {
+            glueOperator.closeClient();
+        } catch (Exception e) {
+            LOG.warn("Glue Client is not closed properly!");
+        }
+    }
+
+    // ------ databases ------
+
+    /**
+     * Create a database.
+     *
+     * @param name Name of the database to be created
+     * @param database The database definition
+     * @param ignoreIfExists Flag to specify behavior when a database with the given name already
+     *     exists: if set to false, throw a DatabaseAlreadyExistException, if set to true, do
+     *     nothing.
+     * @throws DatabaseAlreadyExistException if the given database already exists and ignoreIfExists
+     *     is false
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(name),
+                "Database name cannot be null or empty.");
+        checkNotNull(database, "Database cannot be null.");
+
+        name = GlueUtils.getGlueConventionalName(name);
+        if (databaseExists(name)) {
+            if (!ignoreIfExists) {
+                throw new DatabaseAlreadyExistException(getName(), name);
+            }
+        } else {
+            glueOperator.createGlueDatabase(name, database);
+        }
+    }
+
+    /**
+     * Drop a database.
+     *
+     * @param name Name of the database to be dropped.
+     * @param ignoreIfNotExists Flag to specify behavior when the database does not exist: if set to
+     *     false, throw an exception, if set to true, do nothing.
+     * @param cascade Flag to specify behavior when the database contains table or function: if set
+     *     to true, delete all tables and functions in the database and then delete the database, if
+     *     set to false, throw an exception.
+     * @throws DatabaseNotExistException if the given database does not exist
+     * @throws DatabaseNotEmptyException if the given database is not empty and isRestrict is true
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(name),
+                "Database name cannot be null or empty.");
+
+        name = GlueUtils.getGlueConventionalName(name);
+
+        if (databaseExists(name)) {
+            if (cascade) {
+                // delete all tables from database.
+                glueOperator.deleteTablesFromDatabase(name, listTables(name));
+                LOG.info("All Tables deleted from Database.");
+                // delete all functions from database.
+                glueOperator.deleteFunctionsFromDatabase(name, listFunctions(name));
+                LOG.info("All Functions deleted from Database.");
+            }
+
+            if (glueOperator.isDatabaseEmpty(name)) {
+                glueOperator.dropGlueDatabase(name);
+                LOG.info("Database Dropped.");
+            } else {
+                throw new DatabaseNotEmptyException(getName(), name);
+            }
+        } else if (!ignoreIfNotExists) {
+            throw new DatabaseNotExistException(getName(), name);
+        }
+    }
+
+    /**
+     * Modify an existing database.
+     *
+     * @param name Name of the database to be modified
+     * @param newDatabase The new database definition
+     * @param ignoreIfNotExists Flag to specify behavior when the given database does not exist: if
+     *     set to false, throw an exception, if set to true, do nothing.
+     * @throws DatabaseNotExistException if the given database does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+
+        name = GlueUtils.getGlueConventionalName(name);
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(name),
+                "Database name cannot be null or empty.");
+        checkNotNull(newDatabase, "Database cannot be Empty");
+
+        CatalogDatabase existingDatabase = glueOperator.getDatabase(name);
+        if (existingDatabase != null) {
+            if (existingDatabase.getClass() != newDatabase.getClass()) {
+                throw new CatalogException(
+                        String.format(
+                                "Database types don't match. Existing database is '%s' and new database is '%s'.",
+                                existingDatabase.getClass().getName(),
+                                newDatabase.getClass().getName()));
+            }
+            glueOperator.updateGlueDatabase(name, newDatabase);
+            LOG.info("Database updated.");
+        } else if (!ignoreIfNotExists) {
+            throw new DatabaseNotExistException(getName(), name);
+        }
+    }
+
+    /**
+     * Get the names of all databases in this catalog.
+     *
+     * @return a list of the names of all databases
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return glueOperator.listGlueDatabases();
+    }
+
+    /**
+     * Get a database from this catalog.
+     *
+     * @param databaseName Name of the database
+     * @return The requested database
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+
+        databaseName = GlueUtils.getGlueConventionalName(databaseName);
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(databaseName),
+                "Database name cannot be null or empty.");
+        return glueOperator.getDatabase(databaseName);
+    }
+
+    /**
+     * Check if a database exists in this catalog.
+     *
+     * @param databaseName Name of the database
+     * @return true if the given database exists in the catalog false otherwise
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(databaseName),
+                "Database name cannot be null or empty.");
+        try {
+            CatalogDatabase database = getDatabase(databaseName);
+            return database != null;
+        } catch (DatabaseNotExistException e) {
+            return false;
+        }
+    }
+
+    // ------ tables ------
+
+    /**
+     * Creates a new table or view.
+     *
+     * <p>The framework will make sure to call this method with fully validated {@link
+     * ResolvedCatalogTable} or {@link ResolvedCatalogView}. Those instances are easy to serialize
+     * for a durable catalog implementation.
+     *
+     * @param tablePath path of the table or view to be created
+     * @param table the table definition
+     * @param ignoreIfExists flag to specify behavior when a table or view already exists at the
+     *     given path: if set to false, it throws a TableAlreadyExistException, if set to true, do
+     *     nothing.
+     * @throws TableAlreadyExistException if table already exists and ignoreIfExists is false
+     * @throws DatabaseNotExistException if the database in tablePath doesn't exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+
+        checkNotNull(tablePath, "TablePath cannot be null");
+        checkNotNull(table, "Table can't be null.");
+
+        if (!databaseExists(tablePath.getDatabaseName())) {
+            throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
+        }
+
+        if (tableExists(tablePath)) {
+            if (!ignoreIfExists) {
+                throw new TableAlreadyExistException(getName(), tablePath);
+            }
+        } else {
+            glueOperator.createGlueTable(tablePath, table, false);
+            LOG.info("Table Created!");
+        }
+    }
+
+    /**
+     * Modifies an existing table or view. Note that the new and old {@link CatalogBaseTable} must
+     * be of the same kind. For example, this doesn't allow altering a regular table to partitioned
+     * table, or altering a view to a table, and vice versa.
+     *
+     * <p>The framework will make sure to call this method with fully validated {@link
+     * ResolvedCatalogTable} or {@link ResolvedCatalogView}. Those instances are easy to serialize
+     * for a durable catalog implementation.
+     *
+     * @param tablePath path of the table or view to be modified
+     * @param newTable the new table definition
+     * @param ignoreIfNotExists flag to specify behavior when the table or view does not exist: if
+     *     set to false, throw an exception, if set to true, do nothing.
+     * @throws TableNotExistException if the table does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void alterTable(
+            ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+
+        checkNotNull(tablePath, "TablePath can't be null");
+        checkNotNull(newTable, "Table can't be null.");
+
+        CatalogBaseTable existingTable = getTable(tablePath);
+
+        if (existingTable != null) {
+            if (existingTable.getTableKind() != newTable.getTableKind()) {
+                throw new CatalogException(
+                        String.format(
+                                "Table types don't match. Existing table is '%s' and new table is '%s'.",
+                                existingTable.getTableKind(), newTable.getTableKind()));
+            }
+            glueOperator.alterGlueTable(tablePath, newTable, false);
+        } else if (!ignoreIfNotExists) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+    }
+
+    // ------ tables and views ------
+
+    /**
+     * Drop a table or view.
+     *
+     * @param tablePath Path of the table or view to be dropped
+     * @param ignoreIfNotExists Flag to specify behavior when the table or view does not exist: if
+     *     set to false, throw an exception, if set to true, do nothing.
+     * @throws TableNotExistException if the table or view does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+
+        checkNotNull(tablePath, "TablePath can't be null");
+
+        if (tableExists(tablePath)) {
+            glueOperator.dropGlueTable(tablePath);
+            LOG.info("Table Dropped!");
+        } else if (!ignoreIfNotExists) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+    }
+
+    /**
+     * Rename an existing table or view.
+     *
+     * @param tablePath Path of the table or view to be renamed
+     * @param newTableName the new name of the table or view
+     * @param ignoreIfNotExists Flag to specify behavior when the table or view does not exist: if
+     *     set to false, throw an exception, if set to true, do nothing.
+     * @throws TableNotExistException if the table does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+            throws TableNotExistException, TableAlreadyExistException, CatalogException {
+
+        checkNotNull(tablePath, "TablePath can't be null");
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(newTableName),
+                "Table name can't be null or empty.");
+
+        if (tableExists(tablePath)) {
+            ObjectPath newTablePath = new ObjectPath(tablePath.getDatabaseName(), newTableName);
+            if (tableExists(newTablePath)) {
+                throw new TableAlreadyExistException(getName(), newTablePath);
+            }
+            glueOperator.renameGlueTable(tablePath, newTablePath);
+        } else if (!ignoreIfNotExists) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+    }
+
+    /**
+     * Get names of all tables and views under this database. An empty list is returned if none
+     * exists.
+     *
+     * @param databaseName fully qualified database name.
+     * @return a list of the names of all tables and views in this database
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(databaseName),
+                "Database name can't be null or empty.");
+
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        List<String> results =
+                glueOperator.getGlueTableList(
+                        databaseName, CatalogBaseTable.TableKind.TABLE.name());
+        // include all views as well.
+        results.addAll(listViews(databaseName));
+        return results;
+    }
+
+    /**
+     * Get names of all views under this database. An empty list is returned if none exists.
+     *
+     * @param databaseName the name of the given database
+     * @return a list of the names of all views in the given database
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public List<String> listViews(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(databaseName),
+                "Database name can't be null or empty");
+
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+        return glueOperator.getGlueTableList(databaseName, CatalogBaseTable.TableKind.VIEW.name());
+    }
+
+    /**
+     * Returns a {@link CatalogTable} or {@link CatalogView} identified by the given {@link
+     * ObjectPath}. The framework will resolve the metadata objects when necessary.
+     *
+     * @param tablePath Path of the table or view
+     * @return The requested table or view
+     * @throws TableNotExistException if the target does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+
+        checkNotNull(tablePath, "Table path can't be null");
+
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+
+        return glueOperator.getCatalogBaseTableFromGlueTable(glueOperator.getGlueTable(tablePath));
+    }
+
+    /**
+     * Check if a table or view exists in this catalog.
+     *
+     * @param tablePath Path of the table or view
+     * @return true if the given table exists in the catalog false otherwise
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+
+        checkNotNull(tablePath, "Table path can't be null.");
+
+        return databaseExists(tablePath.getDatabaseName())
+                && glueOperator.glueTableExists(tablePath);
+    }
+
+    // ------ functions ------
+
+    /**
+     * Create a function. Function name should be handled in a case-insensitive way.
+     *
+     * @param path path of the function
+     * @param function the function to be created
+     * @param ignoreIfExists flag to specify behavior if a function with the given name already
+     *     exists: if set to false, it throws a FunctionAlreadyExistException, if set to true,
+     *     nothing happens.
+     * @throws FunctionAlreadyExistException if the function already exist
+     * @throws DatabaseNotExistException if the given database does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void createFunction(ObjectPath path, CatalogFunction function, boolean ignoreIfExists)
+            throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
+
+        checkNotNull(path, "Function path can't be null.");
+        checkNotNull(function, "Catalog Function can't be null.");
+
+        ObjectPath functionPath = normalize(path);
+        if (!databaseExists(functionPath.getDatabaseName())) {
+            throw new DatabaseNotExistException(getName(), functionPath.getDatabaseName());
+        }
+
+        if (!functionExists(functionPath)) {
+            glueOperator.createGlueFunction(functionPath, function);
+            LOG.info("Function Created.");
+        } else {
+            if (!ignoreIfExists) {
+                throw new FunctionAlreadyExistException(getName(), functionPath);
+            }
+        }
+    }
+
+    private ObjectPath normalize(ObjectPath path) {
+        return new ObjectPath(
+                path.getDatabaseName(), FunctionIdentifier.normalizeName(path.getObjectName()));
+    }
+
+    /**
+     * Modify an existing function. Function name should be handled in a case-insensitive way.
+     *
+     * @param path path of the function
+     * @param newFunction the function to be modified
+     * @param ignoreIfNotExists flag to specify behavior if the function does not exist: if set to
+     *     false, throw an exception if set to true, nothing happens
+     * @throws FunctionNotExistException if the function does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void alterFunction(
+            ObjectPath path, CatalogFunction newFunction, boolean ignoreIfNotExists)
+            throws FunctionNotExistException, CatalogException {
+        checkNotNull(path, "Function path can't be null.");
+        checkNotNull(newFunction, "Catalog Function can't be null.");
+
+        ObjectPath functionPath = normalize(path);
+
+        CatalogFunction existingFunction = getFunction(functionPath);
+
+        if (existingFunction != null) {
+            if (existingFunction.getClass() != newFunction.getClass()) {
+                throw new CatalogException(
+                        String.format(
+                                "Function types don't match. Existing function is '%s' and new function is '%s'.",
+                                existingFunction.getClass().getName(),
+                                newFunction.getClass().getName()));
+            }
+
+            glueOperator.alterGlueFunction(functionPath, newFunction);
+        } else if (!ignoreIfNotExists) {
+            throw new FunctionNotExistException(getName(), functionPath);
+        }
+    }
+
+    /**
+     * Drop a function. Function name should be handled in a case-insensitive way.
+     *
+     * @param path path of the function to be dropped
+     * @param ignoreIfNotExists flag to specify behavior if the function does not exist: if set to
+     *     false, throw an exception if set to true, nothing happens
+     * @throws FunctionNotExistException if the function does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void dropFunction(ObjectPath path, boolean ignoreIfNotExists)
+            throws FunctionNotExistException, CatalogException {
+
+        checkNotNull(path, "Function path can't be null.");
+
+        ObjectPath functionPath = normalize(path);
+
+        if (functionExists(functionPath)) {
+            glueOperator.dropGlueFunction(functionPath);
+            LOG.info("Function Dropped!");
+        } else if (!ignoreIfNotExists) {
+            throw new FunctionNotExistException(getName(), functionPath);
+        }
+    }
+
+    /**
+     * List the names of all functions in the given database. An empty list is returned if none is
+     * registered.
+     *
+     * @param databaseName name of the database.
+     * @return a list of the names of the functions in this database
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public List<String> listFunctions(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+
+        databaseName = GlueUtils.getGlueConventionalName(databaseName);
+
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(databaseName),
+                "Database name can't be null or empty.");
+
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        return glueOperator.listGlueFunctions(databaseName);
+    }
+
+    /**
+     * Get the function. Function name should be handled in a case-insensitive way.
+     *
+     * @param path path of the function
+     * @return the requested function
+     * @throws FunctionNotExistException if the function does not exist in the catalog
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public CatalogFunction getFunction(ObjectPath path)
+            throws FunctionNotExistException, CatalogException {
+
+        checkNotNull(path, "Function path can't be null.");
+
+        ObjectPath functionPath = normalize(path);
+
+        if (!functionExists(functionPath)) {
+            throw new FunctionNotExistException(getName(), functionPath);
+        } else {
+            return glueOperator.getGlueFunction(functionPath);
+        }
+    }
+
+    /**
+     * Check whether a function exists or not. Function name should be handled in a case-insensitive
+     * way.
+     *
+     * @param path path of the function
+     * @return true if the function exists in the catalog false otherwise
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public boolean functionExists(ObjectPath path) throws CatalogException {
+
+        checkNotNull(path, "Function path can't be null.");
+
+        ObjectPath functionPath = normalize(path);
+        return databaseExists(functionPath.getDatabaseName())
+                && glueOperator.glueFunctionExists(functionPath);
+    }
+
+    /**
+     * Create a partition.
+     *
+     * @param tablePath path of the table.
+     * @param partitionSpec partition spec of the partition
+     * @param partition the partition to add.
+     * @param ignoreIfExists flag to specify behavior if a table with the given name already exists:
+     *     if set to false, it throws a TableAlreadyExistException, if set to true, nothing happens.
+     * @throws TableNotExistException thrown if the target table does not exist
+     * @throws TableNotPartitionedException thrown if the target table is not partitioned
+     * @throws PartitionSpecInvalidException thrown if the given partition spec is invalid
+     * @throws PartitionAlreadyExistsException thrown if the target partition already exists
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void createPartition(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogPartition partition,
+            boolean ignoreIfExists)
+            throws TableNotExistException, TableNotPartitionedException,
+                    PartitionSpecInvalidException, PartitionAlreadyExistsException,
+                    CatalogException {
+
+        checkNotNull(tablePath, "Table path can't be null.");
+        checkNotNull(partitionSpec, "PartitionSpec can't be null.");
+        checkNotNull(partition, "Partition can't be null.");
+
+        Table glueTable = glueOperator.getGlueTable(tablePath);
+        glueOperator.ensurePartitionedTable(tablePath, glueTable);
+
+        if (!partitionExists(tablePath, partitionSpec)) {
+            glueOperator.createGluePartition(glueTable, partitionSpec, partition);
+            LOG.info("Partition Created!");
+        } else {
+            if (!ignoreIfExists) {
+                throw new PartitionAlreadyExistsException(getName(), tablePath, partitionSpec);
+            }
+        }
+    }
+
+    /**
+     * Get CatalogPartitionSpec of all partitions of the table.
+     *
+     * @param tablePath path of the table
+     * @return a list of CatalogPartitionSpec of the table
+     * @throws TableNotExistException thrown if the table does not exist in the catalog
+     * @throws TableNotPartitionedException thrown if the table is not partitioned
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+            throws TableNotExistException, TableNotPartitionedException, CatalogException {
+
+        checkNotNull(tablePath, "Table path can't be null");
+
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+
+        if (isPartitionedTable(tablePath)) {
+            return glueOperator.listPartitions(tablePath);
+        }
+        throw new TableNotPartitionedException(getName(), tablePath);
+    }
+
+    private boolean isPartitionedTable(ObjectPath tablePath) {
+        CatalogBaseTable table;
+        try {
+            table = getTable(tablePath);
+            if (table instanceof CatalogTable) {
+                CatalogTable catalogTable = (CatalogTable) table;
+                return catalogTable.isPartitioned();
+            } else {
+                return false;
+            }
+        } catch (TableNotExistException e) {
+            return false;
+        }
+    }
+
+    /**
+     * Get CatalogPartitionSpec of all partitions that is under the given CatalogPartitionSpec in
+     * the table.
+     *
+     * @param tablePath path of the table
+     * @param partitionSpec the partition spec to list
+     * @return a list of CatalogPartitionSpec that is under the given CatalogPartitionSpec in the
+     *     table
+     * @throws TableNotExistException thrown if the table does not exist in the catalog
+     * @throws TableNotPartitionedException thrown if the table is not partitioned
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws TableNotExistException, TableNotPartitionedException,
+                    PartitionSpecInvalidException, CatalogException {
+
+        checkNotNull(tablePath, "Table path can't be null.");
+        checkNotNull(partitionSpec, "Patition spec can't be null.");
+
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+
+        if (!isPartitionedTable(tablePath)) {
+            throw new TableNotPartitionedException(getName(), tablePath);
+        }
+        return glueOperator.listGluePartitions(tablePath, partitionSpec);
+    }
+
+    /**
+     * Get CatalogPartitionSpec of partitions by expression filters in the table.
+     *
+     * <p>NOTE: For FieldReferenceExpression, the field index is based on schema of this table
+     * instead of partition columns only.
+     *
+     * <p>The passed in predicates have been translated in conjunctive form.
+     *
+     * <p>If catalog does not support this interface at present, throw an {@link
+     * UnsupportedOperationException} directly. If the catalog does not have a valid filter, throw
+     * the {@link UnsupportedOperationException} directly. Planner will fallback to get all
+     * partitions and filter by itself.
+     *
+     * @param tablePath path of the table
+     * @param filters filters to push down filter to catalog
+     * @return a list of CatalogPartitionSpec that is under the given CatalogPartitionSpec in the
+     *     table
+     * @throws TableNotExistException thrown if the table does not exist in the catalog
+     * @throws TableNotPartitionedException thrown if the table is not partitioned
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public List<CatalogPartitionSpec> listPartitionsByFilter(
+            ObjectPath tablePath, List<Expression> filters)
+            throws TableNotExistException, TableNotPartitionedException, CatalogException {
+        checkNotNull(tablePath, "Table path cannot be null");
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+
+        if (!isPartitionedTable(tablePath)) {
+            throw new TableNotPartitionedException(getName(), tablePath);
+        }
+
+        return glueOperator.listGluePartitionsByFilter(tablePath, filters);
+    }
+
+    /**
+     * Get a partition of the given table. The given partition spec keys and values need to be
+     * matched exactly for a result.
+     *
+     * @param tablePath path of the table
+     * @param partitionSpec partition spec of partition to get
+     * @return the requested partition
+     * @throws PartitionNotExistException thrown if the partition doesn't exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws PartitionNotExistException, CatalogException {
+
+        checkNotNull(tablePath, "Table path can't be null.");
+        checkNotNull(partitionSpec, "CatalogPartitionSpec can't be null.");
+
+        Partition gluePartition = glueOperator.getGluePartition(tablePath, partitionSpec);
+
+        if (gluePartition == null) {
+            throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
+        }
+
+        Map<String, String> properties =
+                new HashMap<>(gluePartition.storageDescriptor().parameters());
+
+        properties.put(
+                GlueCatalogConstants.LOCATION_URI, gluePartition.storageDescriptor().location());
+
+        String comment = properties.remove(GlueCatalogConstants.COMMENT);
+        return new CatalogPartitionImpl(properties, comment);
+    }
+
+    /**
+     * Check whether a partition exists or not.
+     *
+     * @param tablePath path of the table
+     * @param partitionSpec partition spec of the partition to check
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws CatalogException {
+        checkNotNull(tablePath, "Table path can't be null");
+
+        if (!databaseExists(tablePath.getDatabaseName())) {
+            throw new CatalogException("Database doesn't exists.");
+        } else if (!tableExists(tablePath)) {
+            throw new CatalogException("Table doesn't exists.");
+        }
+        return glueOperator.gluePartitionExists(tablePath, partitionSpec);
+    }
+
+    /**
+     * Drop a partition.
+     *
+     * @param tablePath path of the table.
+     * @param partitionSpec partition spec of the partition to drop
+     * @param ignoreIfNotExists flag to specify behavior if the database does not exist: if set to
+     *     false, throw an exception, if set to true, nothing happens.
+     * @throws PartitionNotExistException thrown if the target partition does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void dropPartition(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+
+        checkNotNull(tablePath, "Table path can't be null.");
+        checkNotNull(partitionSpec, "PartitionSpec can't be null.");
+
+        if (partitionExists(tablePath, partitionSpec)) {
+            glueOperator.dropGluePartition(tablePath, partitionSpec);
+            LOG.info("Partition Dropped!");
+        } else if (!ignoreIfNotExists) {
+            throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
+        }
+    }
+
+    /**
+     * Alter a partition.
+     *
+     * @param tablePath path of the table
+     * @param partitionSpec partition spec of the partition
+     * @param newPartition new partition to replace the old one
+     * @param ignoreIfNotExists flag to specify behavior if the database does not exist: if set to
+     *     false, throw an exception, if set to true, nothing happens.
+     * @throws PartitionNotExistException thrown if the target partition does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void alterPartition(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogPartition newPartition,
+            boolean ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+
+        checkNotNull(tablePath, "Table path can't be null.");
+        checkNotNull(partitionSpec, "CatalogPartitionSpec can't be null.");
+        checkNotNull(newPartition, "New partition can't be null.");
+
+        CatalogPartition existingPartition = getPartition(tablePath, partitionSpec);
+        if (existingPartition != null) {
+            glueOperator.alterGluePartition(tablePath, partitionSpec, newPartition);
+        } else if (!ignoreIfNotExists) {
+            throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
+        }
+    }
+
+    /**
+     * Get the statistics of a table.
+     *
+     * @param tablePath path of the table
+     * @return statistics of the given table
+     * @throws TableNotExistException if the table does not exist in the catalog
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        return null;

Review Comment:
   Should we return unknown instead? like in JDBC Catalog, can you verify it is nullable otherwise?
   https://github.com/apache/flink-connector-jdbc/blob/c0e3eaa08f5c7fbc26cfa104e38c0eb887b7e297/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java#L432



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueOperator.java:
##########
@@ -0,0 +1,1359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.connector.aws.config.AWSConfig;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.glue.GlueCatalogConstants;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.ManagedTableFactory;
+import org.apache.flink.table.resource.ResourceType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.CreatePartitionRequest;
+import software.amazon.awssdk.services.glue.model.CreatePartitionResponse;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.CreateTableResponse;
+import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.DatabaseInput;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.DeletePartitionRequest;
+import software.amazon.awssdk.services.glue.model.DeletePartitionResponse;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.DeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.GetDatabasesRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabasesResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionsRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionsResponse;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse;
+import software.amazon.awssdk.services.glue.model.GlueException;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.PartitionInput;
+import software.amazon.awssdk.services.glue.model.ResourceUri;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+import software.amazon.awssdk.services.glue.model.UpdateTableResponse;
+import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunction;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunctionInput;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/**
+ * Utilities for Glue catalog operations. Important Note :
+ * https://aws.amazon.com/premiumsupport/knowledge-center/glue-crawler-internal-service-exception/
+ */
+public class GlueOperator {

Review Comment:
   nit: This class is a bit unreadable, it would be great if we can break it down.



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueOperator.java:
##########
@@ -0,0 +1,1359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.connector.aws.config.AWSConfig;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.glue.GlueCatalogConstants;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.ManagedTableFactory;
+import org.apache.flink.table.resource.ResourceType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.CreatePartitionRequest;
+import software.amazon.awssdk.services.glue.model.CreatePartitionResponse;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.CreateTableResponse;
+import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.DatabaseInput;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.DeletePartitionRequest;
+import software.amazon.awssdk.services.glue.model.DeletePartitionResponse;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.DeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.GetDatabasesRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabasesResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionsRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionsResponse;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse;
+import software.amazon.awssdk.services.glue.model.GlueException;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.PartitionInput;
+import software.amazon.awssdk.services.glue.model.ResourceUri;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+import software.amazon.awssdk.services.glue.model.UpdateTableResponse;
+import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunction;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunctionInput;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/**
+ * Utilities for Glue catalog operations. Important Note :
+ * https://aws.amazon.com/premiumsupport/knowledge-center/glue-crawler-internal-service-exception/
+ */
+public class GlueOperator {
+
+    /**
+     * Instance of AwsProperties which holds the configs related to configure glue and aws setup.
+     */
+    private final AWSConfig awsConfig;
+
+    /** http client for glue client. Current implementation for client is sync type. */
+    private final GlueClient glueClient;
+
+    private final String catalogName;
+
+    public final String catalogPath;
+
+    private static final Logger LOG = LoggerFactory.getLogger(GlueOperator.class);
+
+    public GlueOperator(
+            String catalogName, String catalogPath, AWSConfig awsConfig, GlueClient glueClient) {
+        this.awsConfig = awsConfig;
+        this.glueClient = glueClient;
+        this.catalogPath = catalogPath;
+        this.catalogName = catalogName;
+    }
+
+    public void closeClient() {
+        glueClient.close();
+    }
+
+    // -------------- Database related operations.
+
+    /**
+     * List all databases present.
+     *
+     * @return List of fully qualified database names
+     */
+    public List<String> listGlueDatabases() throws CatalogException {
+        try {
+            GetDatabasesRequest.Builder databasesRequestBuilder =
+                    GetDatabasesRequest.builder().catalogId(getGlueCatalogId());
+
+            GetDatabasesResponse response =
+                    glueClient.getDatabases(databasesRequestBuilder.build());
+            List<String> glueDatabases =
+                    response.databaseList().stream()
+                            .map(Database::name)
+                            .collect(Collectors.toList());
+            String dbResultNextToken = response.nextToken();
+            if (Optional.ofNullable(dbResultNextToken).isPresent()) {
+                do {
+                    databasesRequestBuilder.nextToken(dbResultNextToken);
+                    response = glueClient.getDatabases(databasesRequestBuilder.build());
+                    glueDatabases.addAll(
+                            response.databaseList().stream()
+                                    .map(Database::name)
+                                    .collect(Collectors.toList()));
+                    dbResultNextToken = response.nextToken();
+                } while (Optional.ofNullable(dbResultNextToken).isPresent());
+            }
+            return glueDatabases;
+        } catch (GlueException e) {
+            throw new CatalogException(
+                    GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e.getCause());
+        }
+    }
+
+    /**
+     * Create database in glue data catalog.
+     *
+     * @param databaseName fully qualified name of database.
+     * @param database Instance of {@link CatalogDatabase}.
+     * @throws CatalogException when unknown error from glue servers.
+     * @throws DatabaseAlreadyExistException when database exists already in glue data catalog.
+     */
+    public void createGlueDatabase(String databaseName, CatalogDatabase database)
+            throws CatalogException, DatabaseAlreadyExistException {
+
+        GlueUtils.validate(databaseName);
+        Map<String, String> properties = new HashMap<>(database.getProperties());
+        String str =
+                properties.entrySet().stream()
+                        .map(e -> e.getKey() + ":" + e.getValue())
+                        .collect(Collectors.joining("|"));
+        LOG.info("Create DB MYLOG:- " + str);
+        DatabaseInput.Builder databaseInputBuilder =
+                DatabaseInput.builder()
+                        .name(databaseName)
+                        .description(database.getComment())
+                        .locationUri(
+                                GlueUtils.extractDatabaseLocation(
+                                        properties, databaseName, catalogPath))
+                        .parameters(properties);
+        CreateDatabaseRequest.Builder requestBuilder =
+                CreateDatabaseRequest.builder()
+                        .databaseInput(databaseInputBuilder.build())
+                        .catalogId(getGlueCatalogId());
+        LOG.info(
+                String.format(
+                        "Database Properties Listing :- %s",
+                        properties.entrySet().stream()
+                                .map(e -> e.getKey() + e.getValue())
+                                .collect(Collectors.joining(","))));
+        try {
+            CreateDatabaseResponse response = glueClient.createDatabase(requestBuilder.build());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(GlueUtils.getDebugLog(response));
+            }
+            GlueUtils.validateGlueResponse(response);
+            LOG.info(String.format("%s Database created.", databaseName));
+        } catch (EntityNotFoundException e) {
+            throw new DatabaseAlreadyExistException(catalogName, databaseName, e);
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Delete a database from Glue data catalog only when database is empty.
+     *
+     * @param databaseName fully qualified name of database
+     * @throws CatalogException Any Exception thrown due to glue error
+     * @throws DatabaseNotExistException when database doesn't exists in glue catalog.
+     */
+    public void dropGlueDatabase(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+
+        GlueUtils.validate(databaseName);
+        DeleteDatabaseRequest deleteDatabaseRequest =
+                DeleteDatabaseRequest.builder()
+                        .name(databaseName)
+                        .catalogId(getGlueCatalogId())
+                        .build();
+        try {
+            DeleteDatabaseResponse response = glueClient.deleteDatabase(deleteDatabaseRequest);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(GlueUtils.getDebugLog(response));
+            }
+            GlueUtils.validateGlueResponse(response);
+            LOG.info(String.format("Database Dropped %s", databaseName));
+        } catch (EntityNotFoundException e) {
+            throw new DatabaseNotExistException(catalogName, databaseName);
+        } catch (GlueException e) {
+            throw new CatalogException(catalogName, e);
+        }
+    }
+
+    /**
+     * Drops list of table in database from glue data catalog.
+     *
+     * @param databaseName fully qualified name of database
+     * @param tables List of tables to remove from database.
+     * @throws CatalogException Any Exception thrown due to glue error
+     */
+    public void deleteTablesFromDatabase(String databaseName, Collection<String> tables)
+            throws CatalogException {
+        GlueUtils.validate(databaseName);
+        BatchDeleteTableRequest batchTableRequest =
+                BatchDeleteTableRequest.builder()
+                        .databaseName(databaseName)
+                        .catalogId(getGlueCatalogId())
+                        .tablesToDelete(tables)
+                        .build();
+
+        try {
+            BatchDeleteTableResponse response = glueClient.batchDeleteTable(batchTableRequest);
+            if (response.hasErrors()) {
+                String errorMsg =
+                        String.format(
+                                "Glue Table errors:- %s",
+                                response.errors().stream()
+                                        .map(
+                                                e ->
+                                                        "Table: "
+                                                                + e.tableName()
+                                                                + "\nErrorDetail:  "
+                                                                + e.errorDetail().errorMessage())
+                                        .collect(Collectors.joining("\n")));
+                LOG.error(errorMsg);
+                throw new CatalogException(errorMsg);
+            }
+            GlueUtils.validateGlueResponse(response);
+            LOG.info("Tables deleted.");
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Drops list of user defined function in database from glue data catalog.
+     *
+     * @param databaseName fully qualified name of database
+     * @param functions List of tables to remove from database.
+     * @throws CatalogException Any Exception thrown due to glue error
+     */
+    public void deleteFunctionsFromDatabase(String databaseName, Collection<String> functions)
+            throws CatalogException {
+        GlueUtils.validate(databaseName);
+        try {
+            DeleteUserDefinedFunctionRequest.Builder requestBuilder =
+                    DeleteUserDefinedFunctionRequest.builder()
+                            .databaseName(databaseName)
+                            .catalogId(getGlueCatalogId());
+            for (String functionName : functions) {
+                requestBuilder.functionName(functionName);
+                DeleteUserDefinedFunctionResponse response =
+                        glueClient.deleteUserDefinedFunction(requestBuilder.build());
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(GlueUtils.getDebugLog(response));
+                }
+                GlueUtils.validateGlueResponse(response);
+                LOG.info(String.format("Dropped Function %s", functionName));
+            }
+
+        } catch (GlueException e) {
+            LOG.error(String.format("Error deleting functions in database: %s", databaseName));
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Check if database is empty. i.e. it should not contain 1. table 2. functions
+     *
+     * @param databaseName name of database.
+     * @return boolean True/False based on the content of database.
+     * @throws CatalogException Any Exception thrown due to glue error
+     */
+    public boolean isDatabaseEmpty(String databaseName) throws CatalogException {
+        checkArgument(!isNullOrWhitespaceOnly(databaseName));
+        GlueUtils.validate(databaseName);
+        GetTablesRequest tablesRequest =
+                GetTablesRequest.builder()
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(databaseName)
+                        .maxResults(1)
+                        .build();
+        try {
+            GetTablesResponse response = glueClient.getTables(tablesRequest);
+            return response.sdkHttpResponse().isSuccessful()
+                    && response.tableList().size() == 0
+                    && listGlueFunctions(databaseName).size() == 0;
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Get a database from this glue data catalog.
+     *
+     * @param databaseName fully qualified name of database.
+     * @return Instance of {@link CatalogDatabase } .
+     * @throws DatabaseNotExistException when database doesn't exists in Glue data catalog.
+     * @throws CatalogException when any unknown error occurs in glue.
+     */
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        GlueUtils.validate(databaseName);
+        GetDatabaseRequest getDatabaseRequest =
+                GetDatabaseRequest.builder()
+                        .name(databaseName)
+                        .catalogId(getGlueCatalogId())
+                        .build();
+        try {
+            GetDatabaseResponse response = glueClient.getDatabase(getDatabaseRequest);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER
+                                + ": existing database. Client call response :- "
+                                + response.sdkHttpResponse().statusText());
+            }
+            GlueUtils.validateGlueResponse(response);
+            return GlueUtils.getCatalogDatabase(response.database());
+        } catch (EntityNotFoundException e) {
+            throw new DatabaseNotExistException(catalogName, databaseName);
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    public void updateGlueDatabase(String databaseName, CatalogDatabase newDatabase)
+            throws CatalogException {
+
+        GlueUtils.validate(databaseName);
+        Map<String, String> properties = new HashMap<>(newDatabase.getProperties());
+        DatabaseInput.Builder databaseInputBuilder =
+                DatabaseInput.builder()
+                        .parameters(properties)
+                        .description(newDatabase.getComment())
+                        .name(databaseName)
+                        .locationUri(
+                                GlueUtils.extractDatabaseLocation(
+                                        properties, databaseName, catalogPath));
+
+        UpdateDatabaseRequest updateRequest =
+                UpdateDatabaseRequest.builder()
+                        .databaseInput(databaseInputBuilder.build())
+                        .name(databaseName)
+                        .catalogId(getGlueCatalogId())
+                        .build();
+        UpdateDatabaseResponse response = glueClient.updateDatabase(updateRequest);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(GlueUtils.getDebugLog(response));
+        }
+        GlueUtils.validateGlueResponse(response);
+        LOG.info(String.format("Database Updated. %s", databaseName));
+    }
+
+    // -------------- Table related operations.
+
+    /**
+     * Create table in glue data catalog.
+     *
+     * @param tablePath Fully qualified name of table. {@link ObjectPath}
+     * @param table instance of {@link CatalogBaseTable} containing table related information.
+     * @param managedTable identifier if managed table.
+     * @throws CatalogException Any Exception thrown due to glue error
+     */
+    public void createGlueTable(
+            final ObjectPath tablePath, final CatalogBaseTable table, final boolean managedTable)
+            throws CatalogException {
+
+        checkNotNull(table, "Table cannot be Null");
+        checkNotNull(tablePath, "Table Path cannot be Null");
+        final Map<String, String> properties = new HashMap<>(table.getOptions());
+        String tableOwner = GlueUtils.extractTableOwner(properties);
+        if (managedTable) {
+            properties.put(CONNECTOR.key(), ManagedTableFactory.DEFAULT_IDENTIFIER);
+        }
+        Set<Column> glueColumns = GlueUtils.getGlueColumnsFromCatalogTable(table);
+        String tableLocation = "";
+        if (properties.containsKey(GlueCatalogConstants.LOCATION_URI)) {
+            tableLocation = properties.remove(GlueCatalogConstants.LOCATION_URI);
+        } else {
+            try {
+                tableLocation =
+                        getDatabase(tablePath.getDatabaseName())
+                                        .getProperties()
+                                        .get(GlueCatalogConstants.LOCATION_URI)
+                                + GlueCatalogConstants.LOCATION_SEPARATOR
+                                + tablePath.getObjectName();
+            } catch (DatabaseNotExistException e) {
+                LOG.warn("Database doesn't Exists");
+            }
+        }
+
+        StorageDescriptor.Builder storageDescriptorBuilder =
+                StorageDescriptor.builder()
+                        .inputFormat(GlueUtils.extractInputFormat(properties))
+                        .outputFormat(GlueUtils.extractOutputFormat(properties))
+                        .location(tableLocation);
+
+        TableInput.Builder tableInputBuilder =
+                TableInput.builder()
+                        .name(tablePath.getObjectName())
+                        .description(table.getComment())
+                        .tableType(table.getTableKind().name())
+                        .lastAccessTime(Instant.now())
+                        .owner(tableOwner);
+
+        if (table.getTableKind().name().equalsIgnoreCase(CatalogBaseTable.TableKind.VIEW.name())) {
+            tableInputBuilder.viewExpandedText(GlueUtils.getExpandedQuery(table));
+            tableInputBuilder.viewOriginalText(GlueUtils.getOriginalQuery(table));
+        }
+
+        CreateTableRequest.Builder requestBuilder =
+                CreateTableRequest.builder()
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(tablePath.getDatabaseName());
+
+        if (table instanceof CatalogTable) {
+            CatalogTable catalogTable = (CatalogTable) table;
+            if (catalogTable.isPartitioned()) {
+                LOG.info("Catalog table is partitioned");
+                Collection<Column> partitionKeys =
+                        GlueUtils.getPartitionKeys(catalogTable, glueColumns);
+                LOG.info(
+                        "Partition columns are -> "
+                                + partitionKeys.stream()
+                                        .map(Column::name)
+                                        .collect(Collectors.joining(",")));
+                tableInputBuilder.partitionKeys(partitionKeys);
+            }
+        }
+
+        try {
+            storageDescriptorBuilder.columns(glueColumns);
+            tableInputBuilder.storageDescriptor(storageDescriptorBuilder.build());
+            tableInputBuilder.parameters(properties);
+            requestBuilder.tableInput(tableInputBuilder.build());
+            CreateTableResponse response = glueClient.createTable(requestBuilder.build());
+            LOG.debug(GlueUtils.getDebugLog(response));
+            GlueUtils.validateGlueResponse(response);
+            LOG.info(String.format("Table created. %s", tablePath.getFullName()));
+
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * @param tablePath fully Qualified table path.
+     * @param newTable instance of {@link CatalogBaseTable} containing information for table.
+     * @param managedTable identifier for managed table.
+     * @throws CatalogException Glue related exception.
+     */
+    public void alterGlueTable(
+            ObjectPath tablePath, CatalogBaseTable newTable, boolean managedTable)
+            throws CatalogException {
+
+        Map<String, String> properties = new HashMap<>(newTable.getOptions());
+        String tableOwner = GlueUtils.extractTableOwner(properties);
+
+        if (managedTable) {
+            properties.put(CONNECTOR.key(), ManagedTableFactory.DEFAULT_IDENTIFIER);
+        }
+
+        Set<Column> glueColumns = GlueUtils.getGlueColumnsFromCatalogTable(newTable);
+
+        // create StorageDescriptor for table
+        StorageDescriptor.Builder storageDescriptorBuilder =
+                StorageDescriptor.builder()
+                        .inputFormat(GlueUtils.extractInputFormat(properties))
+                        .outputFormat(GlueUtils.extractOutputFormat(properties))
+                        .location(
+                                GlueUtils.extractTableLocation(properties, tablePath, catalogPath))
+                        .parameters(properties)
+                        .columns(glueColumns);
+
+        // create TableInput Builder with available information.
+        TableInput.Builder tableInputBuilder =
+                TableInput.builder()
+                        .name(tablePath.getObjectName())
+                        .description(newTable.getComment())
+                        .tableType(newTable.getTableKind().name())
+                        .lastAccessTime(Instant.now())
+                        .owner(tableOwner);
+
+        UpdateTableRequest.Builder requestBuilder =
+                UpdateTableRequest.builder()
+                        .tableInput(tableInputBuilder.build())
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(tablePath.getDatabaseName());
+
+        if (newTable instanceof CatalogTable) {
+            CatalogTable catalogTable = (CatalogTable) newTable;
+            if (catalogTable.isPartitioned()) {
+                tableInputBuilder.partitionKeys(
+                        GlueUtils.getPartitionKeys(catalogTable, glueColumns));
+            }
+        }
+
+        // apply storage descriptor and tableInput for request
+        tableInputBuilder.storageDescriptor(storageDescriptorBuilder.build());
+        requestBuilder.tableInput(tableInputBuilder.build());
+
+        try {
+            UpdateTableResponse response = glueClient.updateTable(requestBuilder.build());
+            LOG.debug(GlueUtils.getDebugLog(response));
+            GlueUtils.validateGlueResponse(response);
+            LOG.info(String.format("Table updated. %s", tablePath.getFullName()));
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Get names of all tables or views under this database based on type identifier. An empty list
+     * is returned if none exists.
+     *
+     * @param databaseName fully qualified database name.
+     * @return a list of the names of all tables or views in this database based on type identifier.
+     * @throws CatalogException in case of any runtime exception
+     */
+    public List<String> getGlueTableList(String databaseName, String type) throws CatalogException {
+        GetTablesRequest.Builder tablesRequestBuilder =
+                GetTablesRequest.builder().databaseName(databaseName).catalogId(getGlueCatalogId());
+        GetTablesResponse response = glueClient.getTables(tablesRequestBuilder.build());
+        GlueUtils.validateGlueResponse(response);
+        List<String> finalTableList =
+                response.tableList().stream()
+                        .filter(table -> table.tableType().equalsIgnoreCase(type))
+                        .map(Table::name)
+                        .collect(Collectors.toList());
+        String tableResultNextToken = response.nextToken();
+
+        if (Optional.ofNullable(tableResultNextToken).isPresent()) {
+            do {
+                // update token in requestBuilder to fetch next batch
+                tablesRequestBuilder.nextToken(tableResultNextToken);
+                response = glueClient.getTables(tablesRequestBuilder.build());
+                GlueUtils.validateGlueResponse(response);
+                finalTableList.addAll(
+                        response.tableList().stream()
+                                .filter(table -> table.tableType().equalsIgnoreCase(type))
+                                .map(Table::name)
+                                .collect(Collectors.toList()));
+                tableResultNextToken = response.nextToken();
+            } while (Optional.ofNullable(tableResultNextToken).isPresent());
+        }
+        return finalTableList;
+    }
+
+    /**
+     * Returns a {@link Table} identified by the given table Path. {@link ObjectPath}.
+     *
+     * @param tablePath Path of the table or view
+     * @return The requested table. Glue encapsulates whether table or view in its attribute called
+     *     type.
+     * @throws TableNotExistException if the target does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    public Table getGlueTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        GetTableRequest tablesRequest =
+                GetTableRequest.builder()
+                        .databaseName(tablePath.getDatabaseName())
+                        .name(tablePath.getObjectName())
+                        .catalogId(getGlueCatalogId())
+                        .build();
+        try {
+            GetTableResponse response = glueClient.getTable(tablesRequest);
+            LOG.info(String.format("Glue table Found %s", response.table().name()));
+            GlueUtils.validateGlueResponse(response);
+            return response.table();
+        } catch (EntityNotFoundException e) {
+            throw new TableNotExistException(catalogName, tablePath, e);
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Check if a table or view exists in glue data catalog.
+     *
+     * @param tablePath Path of the table or view
+     * @return true if the given table exists in the catalog false otherwise
+     * @throws CatalogException in case of any runtime exception
+     */
+    public boolean glueTableExists(ObjectPath tablePath) throws CatalogException {
+        try {
+            Table glueTable = getGlueTable(tablePath);
+            return glueTable != null && glueTable.name().equals(tablePath.getObjectName());
+        } catch (TableNotExistException e) {
+            LOG.warn(
+                    String.format(
+                            "%s\nDatabase: %s Table: %s",
+                            GlueCatalogConstants.TABLE_NOT_EXISTS_IDENTIFIER,
+                            tablePath.getDatabaseName(),
+                            tablePath.getObjectName()));
+            return false;
+        } catch (CatalogException e) {
+            LOG.error(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e.getCause());
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Drop a table or view from glue data catalog.
+     *
+     * @param tablePath fully qualified table path
+     * @throws CatalogException on runtime errors.
+     */
+    public void dropGlueTable(ObjectPath tablePath) throws CatalogException {
+        DeleteTableRequest.Builder tableRequestBuilder =
+                DeleteTableRequest.builder()
+                        .databaseName(tablePath.getDatabaseName())
+                        .name(tablePath.getObjectName())
+                        .catalogId(getGlueCatalogId());
+        try {
+            DeleteTableResponse response = glueClient.deleteTable(tableRequestBuilder.build());
+            GlueUtils.validateGlueResponse(response);
+            LOG.info(String.format("Dropped Table %s.", tablePath.getObjectName()));
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    // -------------- Function related operations.
+
+    /**
+     * Create a function. Function name should be handled in a case-insensitive way.
+     *
+     * @param functionPath path of the function
+     * @param function Flink function to be created
+     * @throws CatalogException in case of any runtime exception
+     */
+    public void createGlueFunction(ObjectPath functionPath, CatalogFunction function)
+            throws CatalogException, FunctionAlreadyExistException {
+
+        UserDefinedFunctionInput functionInput =
+                GlueUtils.createFunctionInput(functionPath, function);
+        CreateUserDefinedFunctionRequest.Builder requestBuilder =
+                CreateUserDefinedFunctionRequest.builder()
+                        .databaseName(functionPath.getDatabaseName())
+                        .catalogId(getGlueCatalogId())
+                        .functionInput(functionInput);
+        try {
+            CreateUserDefinedFunctionResponse response =
+                    glueClient.createUserDefinedFunction(requestBuilder.build());
+            GlueUtils.validateGlueResponse(response);
+            LOG.info(String.format("Function created. %s", functionPath.getFullName()));
+        } catch (AlreadyExistsException e) {
+            LOG.error(
+                    String.format(
+                            "%s.%s already Exists. Function language of type: %s",
+                            functionPath.getDatabaseName(),
+                            functionPath.getObjectName(),
+                            function.getFunctionLanguage()));
+            throw new FunctionAlreadyExistException(catalogName, functionPath);
+        } catch (GlueException e) {
+            LOG.error("Error creating glue function.", e.getCause());
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Get the user defined function from glue Catalog. Function name should be handled in a
+     * case-insensitive way.
+     *
+     * @param functionPath path of the function
+     * @return the requested function
+     * @throws CatalogException in case of any runtime exception
+     */
+    public CatalogFunction getGlueFunction(ObjectPath functionPath) {
+        GetUserDefinedFunctionRequest request =
+                GetUserDefinedFunctionRequest.builder()
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(functionPath.getDatabaseName())
+                        .functionName(functionPath.getObjectName())
+                        .build();
+        GetUserDefinedFunctionResponse response = glueClient.getUserDefinedFunction(request);
+        GlueUtils.validateGlueResponse(response);
+        UserDefinedFunction udf = response.userDefinedFunction();
+
+        List<org.apache.flink.table.resource.ResourceUri> resourceUris = new LinkedList<>();
+        for (ResourceUri resourceUri : udf.resourceUris()) {
+            resourceUris.add(
+                    new org.apache.flink.table.resource.ResourceUri(
+                            ResourceType.valueOf(resourceUri.resourceType().name()),
+                            resourceUri.uri()));
+        }
+
+        return new CatalogFunctionImpl(
+                GlueUtils.getCatalogFunctionClassName(udf),
+                GlueUtils.getFunctionalLanguage(udf),
+                resourceUris);
+    }
+
+    public List<String> listGlueFunctions(String databaseName) {
+        GetUserDefinedFunctionsRequest.Builder functionsRequest =
+                GetUserDefinedFunctionsRequest.builder()
+                        .databaseName(databaseName)
+                        .catalogId(getGlueCatalogId());
+
+        List<String> glueFunctions;
+        try {
+            GetUserDefinedFunctionsResponse functionsResponse =
+                    glueClient.getUserDefinedFunctions(functionsRequest.build());
+            String token = functionsResponse.nextToken();
+            glueFunctions =
+                    functionsResponse.userDefinedFunctions().stream()
+                            .map(UserDefinedFunction::functionName)
+                            .collect(Collectors.toCollection(LinkedList::new));
+            if (Optional.ofNullable(token).isPresent()) {
+                do {
+                    functionsRequest.nextToken(token);
+                    functionsResponse =
+                            glueClient.getUserDefinedFunctions(functionsRequest.build());
+                    glueFunctions.addAll(
+                            functionsResponse.userDefinedFunctions().stream()
+                                    .map(UserDefinedFunction::functionName)
+                                    .collect(Collectors.toCollection(LinkedList::new)));
+                    token = functionsResponse.nextToken();
+                } while (Optional.ofNullable(token).isPresent());
+            }
+
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+        return glueFunctions;
+    }
+
+    public boolean glueFunctionExists(ObjectPath functionPath) {
+        GetUserDefinedFunctionRequest request =
+                GetUserDefinedFunctionRequest.builder()
+                        .functionName(functionPath.getObjectName())
+                        .databaseName(functionPath.getDatabaseName())
+                        .catalogId(getGlueCatalogId())
+                        .build();
+
+        try {
+            GetUserDefinedFunctionResponse response = glueClient.getUserDefinedFunction(request);
+            GlueUtils.validateGlueResponse(response);
+            return response.userDefinedFunction()
+                    .functionName()
+                    .equalsIgnoreCase(functionPath.getObjectName());
+        } catch (EntityNotFoundException e) {
+            LOG.warn(
+                    String.format(
+                            "Entry not found for function %s.%s",
+                            functionPath.getObjectName(), functionPath.getDatabaseName()));
+            return false;
+        } catch (GlueException e) {
+            LOG.error(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Modify an existing function. Function name should be handled in a case-insensitive way.
+     *
+     * @param functionPath path of function.
+     * @param newFunction modified function.
+     * @throws CatalogException on runtime errors.
+     */
+    public void alterGlueFunction(ObjectPath functionPath, CatalogFunction newFunction)
+            throws CatalogException {
+        UserDefinedFunctionInput functionInput =
+                GlueUtils.createFunctionInput(functionPath, newFunction);
+        UpdateUserDefinedFunctionRequest updateUserDefinedFunctionRequest =
+                UpdateUserDefinedFunctionRequest.builder()
+                        .functionName(functionPath.getObjectName())
+                        .databaseName(functionPath.getDatabaseName())
+                        .catalogId(getGlueCatalogId())
+                        .functionInput(functionInput)
+                        .build();
+        UpdateUserDefinedFunctionResponse response =
+                glueClient.updateUserDefinedFunction(updateUserDefinedFunctionRequest);
+        GlueUtils.validateGlueResponse(response);
+        LOG.info(String.format("Function altered. %s", functionPath.getFullName()));
+    }
+
+    /**
+     * @param functionPath fully qualified function path
+     * @throws CatalogException
+     */
+    public void dropGlueFunction(ObjectPath functionPath) throws CatalogException {
+        DeleteUserDefinedFunctionRequest request =
+                DeleteUserDefinedFunctionRequest.builder()
+                        .catalogId(getGlueCatalogId())
+                        .functionName(functionPath.getObjectName())
+                        .databaseName(functionPath.getDatabaseName())
+                        .build();
+        DeleteUserDefinedFunctionResponse response = glueClient.deleteUserDefinedFunction(request);
+        GlueUtils.validateGlueResponse(response);
+        LOG.info(String.format("Dropped Function. %s", functionPath.getFullName()));
+    }
+
+    // -------------- Partition related operations.
+
+    public void ensurePartitionedTable(ObjectPath tablePath, Table glueTable)
+            throws TableNotPartitionedException {
+        if (!glueTable.hasPartitionKeys()) {
+            throw new TableNotPartitionedException(catalogName, tablePath);
+        }
+    }
+
+    /**
+     * create partition in glue data catalog.
+     *
+     * @param glueTable glue table
+     * @param partitionSpec partition spec
+     * @param catalogPartition partition to add
+     */
+    public void createGluePartition(
+            Table glueTable, CatalogPartitionSpec partitionSpec, CatalogPartition catalogPartition)
+            throws CatalogException, PartitionSpecInvalidException {
+
+        List<String> partCols = GlueUtils.getColumnNames(glueTable.partitionKeys());
+        LOG.info(String.format("Partition Columns are : %s", String.join(", ", partCols)));
+        List<String> partitionValues =
+                getOrderedFullPartitionValues(
+                        partitionSpec,
+                        partCols,
+                        new ObjectPath(glueTable.databaseName(), glueTable.name()));
+
+        // validate partition values
+        for (int i = 0; i < partCols.size(); i++) {
+            if (isNullOrWhitespaceOnly(partitionValues.get(i))) {
+                throw new PartitionSpecInvalidException(
+                        catalogName,
+                        partCols,
+                        new ObjectPath(glueTable.databaseName(), glueTable.name()),
+                        partitionSpec);
+            }
+        }
+        StorageDescriptor.Builder sdBuilder = glueTable.storageDescriptor().toBuilder();
+        Map<String, String> partitionProperties = catalogPartition.getProperties();
+
+        sdBuilder.location(extractPartitionLocation(partitionProperties));
+        sdBuilder.parameters(partitionSpec.getPartitionSpec());
+        String comment = catalogPartition.getComment();
+        if (comment != null) {
+            partitionProperties.put(GlueCatalogConstants.COMMENT, comment);
+        }
+        PartitionInput.Builder partitionInput =
+                PartitionInput.builder()
+                        .parameters(partitionProperties)
+                        .lastAccessTime(Instant.now())
+                        .storageDescriptor(sdBuilder.build())
+                        .values(partitionValues);
+        CreatePartitionRequest createPartitionRequest =
+                CreatePartitionRequest.builder()
+                        .partitionInput(partitionInput.build())
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(glueTable.databaseName())
+                        .tableName(glueTable.name())
+                        .build();
+
+        try {
+            CreatePartitionResponse response = glueClient.createPartition(createPartitionRequest);
+            GlueUtils.validateGlueResponse(response);
+            LOG.info("Partition Created.");
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    private String extractPartitionLocation(Map<String, String> properties) {
+        return properties.containsKey(GlueCatalogConstants.LOCATION_URI)
+                ? properties.remove(GlueCatalogConstants.LOCATION_URI)
+                : null;
+    }
+
+    /**
+     * Get a list of ordered partition values by re-arranging them based on the given list of
+     * partition keys. If the partition value is null, it'll be converted into default partition
+     * name.
+     *
+     * @param partitionSpec a partition spec.
+     * @param partitionKeys a list of partition keys.
+     * @param tablePath path of the table to which the partition belongs.
+     * @return A list of partition values ordered according to partitionKeys.
+     * @throws PartitionSpecInvalidException thrown if partitionSpec and partitionKeys have
+     *     different sizes, or any key in partitionKeys doesn't exist in partitionSpec.
+     */
+    private List<String> getOrderedFullPartitionValues(
+            CatalogPartitionSpec partitionSpec, List<String> partitionKeys, ObjectPath tablePath)
+            throws PartitionSpecInvalidException {
+        Map<String, String> spec = partitionSpec.getPartitionSpec();
+        if (spec.size() != partitionKeys.size()) {
+            throw new PartitionSpecInvalidException(
+                    catalogName, partitionKeys, tablePath, partitionSpec);
+        }
+
+        List<String> values = new ArrayList<>(spec.size());
+        for (String key : partitionKeys) {
+            if (!spec.containsKey(key)) {
+                throw new PartitionSpecInvalidException(
+                        catalogName, partitionKeys, tablePath, partitionSpec);
+            } else {
+                String value = spec.get(key);
+                if (value == null) {
+                    value = GlueCatalogConstants.DEFAULT_PARTITION_NAME;
+                }
+                values.add(value);
+            }
+        }
+
+        return values;
+    }
+
+    /**
+     * Update glue table.
+     *
+     * @param tablePath contains database name and table name.
+     * @param partitionSpec Existing partition information.
+     * @param newPartition Partition information with new changes.
+     * @throws CatalogException Exception in failure.
+     */
+    public void alterGluePartition(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition)
+            throws CatalogException {
+        // todo has to implement

Review Comment:
   Can we raise a follow up please?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org