You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Samrat002 (via GitHub)" <gi...@apache.org> on 2023/04/24 17:16:15 UTC

[GitHub] [flink-connector-aws] Samrat002 commented on a diff in pull request #47: [FLINK-30481][FLIP-277] GlueCatalog Implementation

Samrat002 commented on code in PR #47:
URL: https://github.com/apache/flink-connector-aws/pull/47#discussion_r1175579217


##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueOperator.java:
##########
@@ -0,0 +1,1359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.connector.aws.config.AWSConfig;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.glue.GlueCatalogConstants;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.ManagedTableFactory;
+import org.apache.flink.table.resource.ResourceType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.CreatePartitionRequest;
+import software.amazon.awssdk.services.glue.model.CreatePartitionResponse;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.CreateTableResponse;
+import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.DatabaseInput;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.DeletePartitionRequest;
+import software.amazon.awssdk.services.glue.model.DeletePartitionResponse;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.DeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.GetDatabasesRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabasesResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionsRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionsResponse;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse;
+import software.amazon.awssdk.services.glue.model.GlueException;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.PartitionInput;
+import software.amazon.awssdk.services.glue.model.ResourceUri;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+import software.amazon.awssdk.services.glue.model.UpdateTableResponse;
+import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunction;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunctionInput;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/**
+ * Utilities for Glue catalog operations.
+ *
+ * <p>Important note:
+ * https://aws.amazon.com/premiumsupport/knowledge-center/glue-crawler-internal-service-exception/
+ */
+public class GlueOperator {
+
+    /**
+     * Instance of {@link AWSConfig} which holds the configs related to configure glue and aws
+     * setup.
+     */
+    private final AWSConfig awsConfig;
+
+    /** http client for glue client. Current implementation for client is sync type. */
+    private final GlueClient glueClient;
+
+    /** Name of the catalog; passed to the catalog exceptions raised by this class. */
+    private final String catalogName;
+
+    /** Catalog path; handed to the {@code GlueUtils} helpers that derive locations. */
+    public final String catalogPath;
+
+    /** Logger shared by all operations of this class. */
+    private static final Logger LOG = LoggerFactory.getLogger(GlueOperator.class);
+
+    /**
+     * Creates an operator that issues its requests through the supplied Glue client.
+     *
+     * @param catalogName name of the catalog, used when reporting catalog exceptions.
+     * @param catalogPath catalog path used when deriving database and table locations.
+     * @param awsConfig aws related configuration.
+     * @param glueClient client used for every glue call made by this operator.
+     */
+    public GlueOperator(
+            String catalogName, String catalogPath, AWSConfig awsConfig, GlueClient glueClient) {
+        this.catalogName = catalogName;
+        this.catalogPath = catalogPath;
+        this.glueClient = glueClient;
+        this.awsConfig = awsConfig;
+    }
+
+    /** Closes the glue client held by this operator. */
+    public void closeClient() {
+        this.glueClient.close();
+    }
+
+    // -------------- Database related operations.
+
+    /**
+     * List all databases present in the glue data catalog. Pages are fetched one after another
+     * until glue stops returning a continuation token.
+     *
+     * @return List of database names
+     * @throws CatalogException when glue reports an error; the glue exception is attached as cause.
+     */
+    public List<String> listGlueDatabases() throws CatalogException {
+        try {
+            GetDatabasesRequest.Builder databasesRequestBuilder =
+                    GetDatabasesRequest.builder().catalogId(getGlueCatalogId());
+            List<String> glueDatabases = new ArrayList<>();
+            String dbResultNextToken = null;
+            do {
+                // A null token on the first iteration asks for the first page.
+                GetDatabasesResponse response =
+                        glueClient.getDatabases(
+                                databasesRequestBuilder.nextToken(dbResultNextToken).build());
+                response.databaseList().stream().map(Database::name).forEach(glueDatabases::add);
+                dbResultNextToken = response.nextToken();
+            } while (dbResultNextToken != null);
+            return glueDatabases;
+        } catch (GlueException e) {
+            // Chain the GlueException itself: its own cause may be null, which would hide the
+            // actual failure from the caller.
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Create database in glue data catalog.
+     *
+     * @param databaseName fully qualified name of database.
+     * @param database Instance of {@link CatalogDatabase}.
+     * @throws CatalogException when unknown error from glue servers.
+     * @throws DatabaseAlreadyExistException when database exists already in glue data catalog.
+     */
+    public void createGlueDatabase(String databaseName, CatalogDatabase database)
+            throws CatalogException, DatabaseAlreadyExistException {
+
+        GlueUtils.validate(databaseName);
+        Map<String, String> properties = new HashMap<>(database.getProperties());
+        String str =
+                properties.entrySet().stream()
+                        .map(e -> e.getKey() + ":" + e.getValue())
+                        .collect(Collectors.joining("|"));
+        LOG.info("Create DB MYLOG:- " + str);
+        DatabaseInput.Builder databaseInputBuilder =
+                DatabaseInput.builder()
+                        .name(databaseName)
+                        .description(database.getComment())
+                        .locationUri(
+                                GlueUtils.extractDatabaseLocation(
+                                        properties, databaseName, catalogPath))
+                        .parameters(properties);
+        CreateDatabaseRequest.Builder requestBuilder =
+                CreateDatabaseRequest.builder()
+                        .databaseInput(databaseInputBuilder.build())
+                        .catalogId(getGlueCatalogId());
+        LOG.info(
+                String.format(
+                        "Database Properties Listing :- %s",
+                        properties.entrySet().stream()
+                                .map(e -> e.getKey() + e.getValue())
+                                .collect(Collectors.joining(","))));
+        try {
+            CreateDatabaseResponse response = glueClient.createDatabase(requestBuilder.build());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(GlueUtils.getDebugLog(response));
+            }
+            GlueUtils.validateGlueResponse(response);
+            LOG.info(String.format("%s Database created.", databaseName));
+        } catch (EntityNotFoundException e) {
+            throw new DatabaseAlreadyExistException(catalogName, databaseName, e);
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Delete a database from Glue data catalog only when database is empty.
+     *
+     * @param databaseName fully qualified name of database
+     * @throws CatalogException Any Exception thrown due to glue error
+     * @throws DatabaseNotExistException when database doesn't exists in glue catalog.
+     */
+    public void dropGlueDatabase(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+
+        GlueUtils.validate(databaseName);
+        DeleteDatabaseRequest deleteDatabaseRequest =
+                DeleteDatabaseRequest.builder()
+                        .name(databaseName)
+                        .catalogId(getGlueCatalogId())
+                        .build();
+        try {
+            DeleteDatabaseResponse response = glueClient.deleteDatabase(deleteDatabaseRequest);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(GlueUtils.getDebugLog(response));
+            }
+            GlueUtils.validateGlueResponse(response);
+            LOG.info(String.format("Database Dropped %s", databaseName));
+        } catch (EntityNotFoundException e) {
+            throw new DatabaseNotExistException(catalogName, databaseName);
+        } catch (GlueException e) {
+            throw new CatalogException(catalogName, e);
+        }
+    }
+
+    /**
+     * Removes the given tables of a database from the glue data catalog with one batch request.
+     *
+     * @param databaseName fully qualified name of database
+     * @param tables names of the tables to remove from the database.
+     * @throws CatalogException when glue reports per-table errors or the request itself fails.
+     */
+    public void deleteTablesFromDatabase(String databaseName, Collection<String> tables)
+            throws CatalogException {
+        GlueUtils.validate(databaseName);
+        BatchDeleteTableRequest deleteRequest =
+                BatchDeleteTableRequest.builder()
+                        .databaseName(databaseName)
+                        .catalogId(getGlueCatalogId())
+                        .tablesToDelete(tables)
+                        .build();
+
+        try {
+            BatchDeleteTableResponse deleteResponse = glueClient.batchDeleteTable(deleteRequest);
+            if (!deleteResponse.hasErrors()) {
+                GlueUtils.validateGlueResponse(deleteResponse);
+                LOG.info("Tables deleted.");
+                return;
+            }
+            // One entry per table that glue failed to delete.
+            String failedTables =
+                    deleteResponse.errors().stream()
+                            .map(
+                                    tableError ->
+                                            "Table: "
+                                                    + tableError.tableName()
+                                                    + "\nErrorDetail:  "
+                                                    + tableError.errorDetail().errorMessage())
+                            .collect(Collectors.joining("\n"));
+            String errorMsg = String.format("Glue Table errors:- %s", failedTables);
+            LOG.error(errorMsg);
+            throw new CatalogException(errorMsg);
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Removes the given user defined functions of a database from the glue data catalog, issuing
+     * one delete request per function.
+     *
+     * @param databaseName fully qualified name of database
+     * @param functions names of the functions to remove from the database.
+     * @throws CatalogException Any Exception thrown due to glue error
+     */
+    public void deleteFunctionsFromDatabase(String databaseName, Collection<String> functions)
+            throws CatalogException {
+        GlueUtils.validate(databaseName);
+        try {
+            final String glueCatalogId = getGlueCatalogId();
+            for (String functionName : functions) {
+                DeleteUserDefinedFunctionRequest deleteRequest =
+                        DeleteUserDefinedFunctionRequest.builder()
+                                .databaseName(databaseName)
+                                .catalogId(glueCatalogId)
+                                .functionName(functionName)
+                                .build();
+                DeleteUserDefinedFunctionResponse deleteResponse =
+                        glueClient.deleteUserDefinedFunction(deleteRequest);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(GlueUtils.getDebugLog(deleteResponse));
+                }
+                GlueUtils.validateGlueResponse(deleteResponse);
+                LOG.info(String.format("Dropped Function %s", functionName));
+            }
+        } catch (GlueException e) {
+            LOG.error(String.format("Error deleting functions in database: %s", databaseName));
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Tells whether a database holds neither tables nor functions. At most one table is requested
+     * from glue, since a single hit already proves the database is not empty.
+     *
+     * @param databaseName name of database.
+     * @return true when the table request succeeded and no table and no function was found.
+     * @throws CatalogException Any Exception thrown due to glue error
+     */
+    public boolean isDatabaseEmpty(String databaseName) throws CatalogException {
+        checkArgument(!isNullOrWhitespaceOnly(databaseName));
+        GlueUtils.validate(databaseName);
+        GetTablesRequest singleTableProbe =
+                GetTablesRequest.builder()
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(databaseName)
+                        .maxResults(1)
+                        .build();
+        try {
+            GetTablesResponse probeResponse = glueClient.getTables(singleTableProbe);
+            if (!probeResponse.sdkHttpResponse().isSuccessful()) {
+                return false;
+            }
+            if (!probeResponse.tableList().isEmpty()) {
+                return false;
+            }
+            // Functions are only looked up once the database is known to hold no table.
+            return listGlueFunctions(databaseName).size() == 0;
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Get a database from this glue data catalog.
+     *
+     * @param databaseName fully qualified name of database.
+     * @return Instance of {@link CatalogDatabase } .
+     * @throws DatabaseNotExistException when database doesn't exists in Glue data catalog.
+     * @throws CatalogException when any unknown error occurs in glue.
+     */
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        GlueUtils.validate(databaseName);
+        GetDatabaseRequest getDatabaseRequest =
+                GetDatabaseRequest.builder()
+                        .name(databaseName)
+                        .catalogId(getGlueCatalogId())
+                        .build();
+        try {
+            GetDatabaseResponse response = glueClient.getDatabase(getDatabaseRequest);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER
+                                + ": existing database. Client call response :- "
+                                + response.sdkHttpResponse().statusText());
+            }
+            GlueUtils.validateGlueResponse(response);
+            return GlueUtils.getCatalogDatabase(response.database());
+        } catch (EntityNotFoundException e) {
+            // Attach the glue exception as cause, as getGlueTable does for missing tables.
+            throw new DatabaseNotExistException(catalogName, databaseName, e);
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Update an existing database in glue data catalog. The parameters, description and location
+     * sent to glue are all taken from the new database definition.
+     *
+     * @param databaseName fully qualified name of database.
+     * @param newDatabase Instance of {@link CatalogDatabase} holding the new definition.
+     * @throws CatalogException Any Exception thrown due to glue error
+     */
+    public void updateGlueDatabase(String databaseName, CatalogDatabase newDatabase)
+            throws CatalogException {
+
+        GlueUtils.validate(databaseName);
+        Map<String, String> properties = new HashMap<>(newDatabase.getProperties());
+        DatabaseInput.Builder databaseInputBuilder =
+                DatabaseInput.builder()
+                        .parameters(properties)
+                        .description(newDatabase.getComment())
+                        .name(databaseName)
+                        .locationUri(
+                                GlueUtils.extractDatabaseLocation(
+                                        properties, databaseName, catalogPath));
+
+        UpdateDatabaseRequest updateRequest =
+                UpdateDatabaseRequest.builder()
+                        .databaseInput(databaseInputBuilder.build())
+                        .name(databaseName)
+                        .catalogId(getGlueCatalogId())
+                        .build();
+        try {
+            UpdateDatabaseResponse response = glueClient.updateDatabase(updateRequest);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(GlueUtils.getDebugLog(response));
+            }
+            GlueUtils.validateGlueResponse(response);
+            LOG.info("Database Updated. {}", databaseName);
+        } catch (GlueException e) {
+            // Translate glue failures like the other database operations do, instead of
+            // leaking the SDK exception through a method that declares CatalogException.
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    // -------------- Table related operations.
+
+    /**
+     * Create table in glue data catalog.
+     *
+     * <p>The table location is taken from the table options when present. Otherwise it is derived
+     * from the location of the owning database; if that cannot be determined the table is created
+     * with an empty location.
+     *
+     * @param tablePath Fully qualified name of table. {@link ObjectPath}
+     * @param table instance of {@link CatalogBaseTable} containing table related information.
+     * @param managedTable identifier if managed table.
+     * @throws CatalogException Any Exception thrown due to glue error
+     */
+    public void createGlueTable(
+            final ObjectPath tablePath, final CatalogBaseTable table, final boolean managedTable)
+            throws CatalogException {
+
+        checkNotNull(table, "Table cannot be Null");
+        checkNotNull(tablePath, "Table Path cannot be Null");
+        final Map<String, String> properties = new HashMap<>(table.getOptions());
+        String tableOwner = GlueUtils.extractTableOwner(properties);
+        if (managedTable) {
+            properties.put(CONNECTOR.key(), ManagedTableFactory.DEFAULT_IDENTIFIER);
+        }
+        Set<Column> glueColumns = GlueUtils.getGlueColumnsFromCatalogTable(table);
+        String tableLocation = "";
+        if (properties.containsKey(GlueCatalogConstants.LOCATION_URI)) {
+            tableLocation = properties.remove(GlueCatalogConstants.LOCATION_URI);
+        } else {
+            try {
+                String databaseLocation =
+                        getDatabase(tablePath.getDatabaseName())
+                                .getProperties()
+                                .get(GlueCatalogConstants.LOCATION_URI);
+                if (databaseLocation != null) {
+                    tableLocation =
+                            databaseLocation
+                                    + GlueCatalogConstants.LOCATION_SEPARATOR
+                                    + tablePath.getObjectName();
+                } else {
+                    // Concatenating a missing location would store the literal "null/<table>".
+                    LOG.warn(
+                            "Database {} has no location, table {} gets an empty location",
+                            tablePath.getDatabaseName(),
+                            tablePath.getObjectName());
+                }
+            } catch (DatabaseNotExistException e) {
+                // Best effort: the create request below reports the missing database.
+                LOG.warn("Database doesn't Exists");
+            }
+        }
+
+        StorageDescriptor.Builder storageDescriptorBuilder =
+                StorageDescriptor.builder()
+                        .inputFormat(GlueUtils.extractInputFormat(properties))
+                        .outputFormat(GlueUtils.extractOutputFormat(properties))
+                        .location(tableLocation);
+
+        TableInput.Builder tableInputBuilder =
+                TableInput.builder()
+                        .name(tablePath.getObjectName())
+                        .description(table.getComment())
+                        .tableType(table.getTableKind().name())
+                        .lastAccessTime(Instant.now())
+                        .owner(tableOwner);
+
+        if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
+            tableInputBuilder.viewExpandedText(GlueUtils.getExpandedQuery(table));
+            tableInputBuilder.viewOriginalText(GlueUtils.getOriginalQuery(table));
+        }
+
+        CreateTableRequest.Builder requestBuilder =
+                CreateTableRequest.builder()
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(tablePath.getDatabaseName());
+
+        if (table instanceof CatalogTable) {
+            CatalogTable catalogTable = (CatalogTable) table;
+            if (catalogTable.isPartitioned()) {
+                LOG.info("Catalog table is partitioned");
+                Collection<Column> partitionKeys =
+                        GlueUtils.getPartitionKeys(catalogTable, glueColumns);
+                LOG.info(
+                        "Partition columns are -> "
+                                + partitionKeys.stream()
+                                        .map(Column::name)
+                                        .collect(Collectors.joining(",")));
+                tableInputBuilder.partitionKeys(partitionKeys);
+            }
+        }
+
+        try {
+            storageDescriptorBuilder.columns(glueColumns);
+            tableInputBuilder.storageDescriptor(storageDescriptorBuilder.build());
+            tableInputBuilder.parameters(properties);
+            requestBuilder.tableInput(tableInputBuilder.build());
+            CreateTableResponse response = glueClient.createTable(requestBuilder.build());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(GlueUtils.getDebugLog(response));
+            }
+            GlueUtils.validateGlueResponse(response);
+            LOG.info("Table created. {}", tablePath.getFullName());
+
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Update an existing table or view in glue data catalog with the given definition.
+     *
+     * <p>The request carries the complete new definition, so the table options and, for views,
+     * the query text are included in the same way as when the table is created.
+     *
+     * @param tablePath fully Qualified table path.
+     * @param newTable instance of {@link CatalogBaseTable} containing information for table.
+     * @param managedTable identifier for managed table.
+     * @throws CatalogException Glue related exception.
+     */
+    public void alterGlueTable(
+            ObjectPath tablePath, CatalogBaseTable newTable, boolean managedTable)
+            throws CatalogException {
+
+        checkNotNull(tablePath, "Table Path cannot be Null");
+        checkNotNull(newTable, "Table cannot be Null");
+        Map<String, String> properties = new HashMap<>(newTable.getOptions());
+        String tableOwner = GlueUtils.extractTableOwner(properties);
+
+        if (managedTable) {
+            properties.put(CONNECTOR.key(), ManagedTableFactory.DEFAULT_IDENTIFIER);
+        }
+
+        Set<Column> glueColumns = GlueUtils.getGlueColumnsFromCatalogTable(newTable);
+
+        // create StorageDescriptor for table
+        StorageDescriptor.Builder storageDescriptorBuilder =
+                StorageDescriptor.builder()
+                        .inputFormat(GlueUtils.extractInputFormat(properties))
+                        .outputFormat(GlueUtils.extractOutputFormat(properties))
+                        .location(
+                                GlueUtils.extractTableLocation(properties, tablePath, catalogPath))
+                        .parameters(properties)
+                        .columns(glueColumns);
+
+        // create TableInput Builder with available information.
+        TableInput.Builder tableInputBuilder =
+                TableInput.builder()
+                        .name(tablePath.getObjectName())
+                        .description(newTable.getComment())
+                        .tableType(newTable.getTableKind().name())
+                        .lastAccessTime(Instant.now())
+                        .owner(tableOwner)
+                        // createGlueTable stores the options on the table itself; send them
+                        // here as well so the update request carries table-level parameters.
+                        .parameters(properties);
+
+        if (newTable.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
+            // Mirror createGlueTable so the update request carries the view query text.
+            tableInputBuilder.viewExpandedText(GlueUtils.getExpandedQuery(newTable));
+            tableInputBuilder.viewOriginalText(GlueUtils.getOriginalQuery(newTable));
+        }
+
+        if (newTable instanceof CatalogTable) {
+            CatalogTable catalogTable = (CatalogTable) newTable;
+            if (catalogTable.isPartitioned()) {
+                tableInputBuilder.partitionKeys(
+                        GlueUtils.getPartitionKeys(catalogTable, glueColumns));
+            }
+        }
+
+        // apply storage descriptor and tableInput for request
+        tableInputBuilder.storageDescriptor(storageDescriptorBuilder.build());
+        UpdateTableRequest.Builder requestBuilder =
+                UpdateTableRequest.builder()
+                        .tableInput(tableInputBuilder.build())
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(tablePath.getDatabaseName());
+
+        try {
+            UpdateTableResponse response = glueClient.updateTable(requestBuilder.build());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(GlueUtils.getDebugLog(response));
+            }
+            GlueUtils.validateGlueResponse(response);
+            LOG.info("Table updated. {}", tablePath.getFullName());
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Get names of all tables or views under this database based on type identifier. An empty list
+     * is returned if none exists. Pages are fetched until glue stops returning a continuation
+     * token.
+     *
+     * @param databaseName fully qualified database name.
+     * @param type table type to keep, compared case-insensitively with the glue table type.
+     * @return a list of the names of all tables or views in this database based on type identifier.
+     * @throws CatalogException in case of any runtime exception
+     */
+    public List<String> getGlueTableList(String databaseName, String type) throws CatalogException {
+        GetTablesRequest.Builder tablesRequestBuilder =
+                GetTablesRequest.builder().databaseName(databaseName).catalogId(getGlueCatalogId());
+        List<String> finalTableList = new ArrayList<>();
+        String tableResultNextToken = null;
+        try {
+            do {
+                // A null token on the first iteration asks for the first page.
+                GetTablesResponse response =
+                        glueClient.getTables(
+                                tablesRequestBuilder.nextToken(tableResultNextToken).build());
+                GlueUtils.validateGlueResponse(response);
+                for (Table table : response.tableList()) {
+                    // Compare from the requested type's side: a table that carries no type
+                    // is then skipped instead of raising a NullPointerException.
+                    if (type != null && type.equalsIgnoreCase(table.tableType())) {
+                        finalTableList.add(table.name());
+                    }
+                }
+                tableResultNextToken = response.nextToken();
+            } while (tableResultNextToken != null);
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+        return finalTableList;
+    }
+
+    /**
+     * Fetches the glue {@link Table} stored under the given {@link ObjectPath}.
+     *
+     * @param tablePath Path of the table or view
+     * @return The table as returned by glue. Whether it is a table or a view is encoded in its
+     *     type attribute.
+     * @throws TableNotExistException if glue reports that the target does not exist
+     * @throws CatalogException for any other glue error
+     */
+    public Table getGlueTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        final String databaseName = tablePath.getDatabaseName();
+        final String tableName = tablePath.getObjectName();
+        GetTableRequest lookupRequest =
+                GetTableRequest.builder()
+                        .databaseName(databaseName)
+                        .name(tableName)
+                        .catalogId(getGlueCatalogId())
+                        .build();
+        try {
+            GetTableResponse lookupResponse = glueClient.getTable(lookupRequest);
+            Table foundTable = lookupResponse.table();
+            LOG.info(String.format("Glue table Found %s", foundTable.name()));
+            GlueUtils.validateGlueResponse(lookupResponse);
+            return foundTable;
+        } catch (EntityNotFoundException e) {
+            throw new TableNotExistException(catalogName, tablePath, e);
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Check if a table or view exists in glue data catalog.
+     *
+     * @param tablePath Path of the table or view
+     * @return true if the given table exists in the catalog false otherwise
+     * @throws CatalogException in case of any runtime exception
+     */
+    public boolean glueTableExists(ObjectPath tablePath) throws CatalogException {
+        try {
+            Table glueTable = getGlueTable(tablePath);
+            return glueTable != null && glueTable.name().equals(tablePath.getObjectName());
+        } catch (TableNotExistException e) {
+            LOG.warn(
+                    String.format(
+                            "%s\nDatabase: %s Table: %s",
+                            GlueCatalogConstants.TABLE_NOT_EXISTS_IDENTIFIER,
+                            tablePath.getDatabaseName(),
+                            tablePath.getObjectName()));
+            return false;
+        } catch (CatalogException e) {
+            LOG.error(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e.getCause());
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Deletes a table or view from the glue data catalog.
+     *
+     * @param tablePath fully qualified table path
+     * @throws CatalogException if the Glue client raises a {@link GlueException}
+     */
+    public void dropGlueTable(ObjectPath tablePath) throws CatalogException {
+        DeleteTableRequest deleteRequest =
+                DeleteTableRequest.builder()
+                        .databaseName(tablePath.getDatabaseName())
+                        .name(tablePath.getObjectName())
+                        .catalogId(getGlueCatalogId())
+                        .build();
+        try {
+            DeleteTableResponse deleteResponse = glueClient.deleteTable(deleteRequest);
+            GlueUtils.validateGlueResponse(deleteResponse);
+        } catch (GlueException glueFailure) {
+            throw new CatalogException(
+                    GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, glueFailure);
+        }
+        // Only reached when both the delete call and the validation completed normally.
+        LOG.info(String.format("Dropped Table %s.", tablePath.getObjectName()));
+    }
+
+    // -------------- Function related operations.
+
+    /**
+     * Registers a Flink function as a user defined function in the glue data catalog.
+     *
+     * @param functionPath path of the function
+     * @param function Flink function to be created
+     * @throws FunctionAlreadyExistException if Glue reports that the function already exists
+     * @throws CatalogException if any other {@link GlueException} is raised by the client
+     */
+    public void createGlueFunction(ObjectPath functionPath, CatalogFunction function)
+            throws CatalogException, FunctionAlreadyExistException {
+
+        UserDefinedFunctionInput functionInput =
+                GlueUtils.createFunctionInput(functionPath, function);
+        CreateUserDefinedFunctionRequest.Builder requestBuilder =
+                CreateUserDefinedFunctionRequest.builder()
+                        .databaseName(functionPath.getDatabaseName())
+                        .catalogId(getGlueCatalogId())
+                        .functionInput(functionInput);
+        try {
+            CreateUserDefinedFunctionResponse response =
+                    glueClient.createUserDefinedFunction(requestBuilder.build());
+            GlueUtils.validateGlueResponse(response);
+            LOG.info(String.format("Function created. %s", functionPath.getFullName()));
+        } catch (AlreadyExistsException e) {
+            LOG.error(
+                    String.format(
+                            "%s.%s already Exists. Function language of type: %s",
+                            functionPath.getDatabaseName(),
+                            functionPath.getObjectName(),
+                            function.getFunctionLanguage()));
+            // Keep the Glue exception as the cause so the service-side details are not lost.
+            throw new FunctionAlreadyExistException(catalogName, functionPath, e);
+        } catch (GlueException e) {
+            // Log the exception itself; its cause may be absent, which would drop the stack trace.
+            LOG.error("Error creating glue function.", e);
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Reads a user defined function from the glue data catalog and converts it into a Flink
+     * {@link CatalogFunction}.
+     *
+     * <p>Each Glue resource URI is mapped to a Flink resource URI whose type is looked up by the
+     * name of the Glue resource type.
+     *
+     * @param functionPath path of the function
+     * @return the requested function
+     * @throws CatalogException if the Glue client raises a {@link GlueException}
+     */
+    public CatalogFunction getGlueFunction(ObjectPath functionPath) {
+        GetUserDefinedFunctionRequest request =
+                GetUserDefinedFunctionRequest.builder()
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(functionPath.getDatabaseName())
+                        .functionName(functionPath.getObjectName())
+                        .build();
+        final UserDefinedFunction udf;
+        try {
+            GetUserDefinedFunctionResponse response = glueClient.getUserDefinedFunction(request);
+            GlueUtils.validateGlueResponse(response);
+            udf = response.userDefinedFunction();
+        } catch (GlueException e) {
+            // Surface Glue failures the same way the other operations of this class do.
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+
+        List<org.apache.flink.table.resource.ResourceUri> resourceUris = new LinkedList<>();
+        for (ResourceUri resourceUri : udf.resourceUris()) {
+            resourceUris.add(
+                    new org.apache.flink.table.resource.ResourceUri(
+                            ResourceType.valueOf(resourceUri.resourceType().name()),
+                            resourceUri.uri()));
+        }
+
+        return new CatalogFunctionImpl(
+                GlueUtils.getCatalogFunctionClassName(udf),
+                GlueUtils.getFunctionalLanguage(udf),
+                resourceUris);
+    }
+
+    /**
+     * Lists the names of all user defined functions of a database in the glue data catalog.
+     *
+     * <p>Pages are fetched until Glue stops returning a next token; every page is validated
+     * before its content is read.
+     *
+     * @param databaseName name of the database whose functions are listed
+     * @return function names in the order Glue returned them
+     * @throws CatalogException if the Glue client raises a {@link GlueException}
+     */
+    public List<String> listGlueFunctions(String databaseName) {
+        GetUserDefinedFunctionsRequest.Builder functionsRequest =
+                GetUserDefinedFunctionsRequest.builder()
+                        .databaseName(databaseName)
+                        .catalogId(getGlueCatalogId());
+
+        List<String> glueFunctions = new LinkedList<>();
+        try {
+            String token;
+            do {
+                GetUserDefinedFunctionsResponse functionsResponse =
+                        glueClient.getUserDefinedFunctions(functionsRequest.build());
+                GlueUtils.validateGlueResponse(functionsResponse);
+                for (UserDefinedFunction udf : functionsResponse.userDefinedFunctions()) {
+                    glueFunctions.add(udf.functionName());
+                }
+                token = functionsResponse.nextToken();
+                if (token != null) {
+                    // update token in the request builder to fetch the next batch
+                    functionsRequest.nextToken(token);
+                }
+            } while (token != null);
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+        return glueFunctions;
+    }
+
+    /**
+     * Checks whether a user defined function is present in the glue data catalog.
+     *
+     * @param functionPath path of the function
+     * @return true if Glue returns a function whose name equals the requested name ignoring case,
+     *     false if Glue reports the entity as not found
+     * @throws CatalogException if any other {@link GlueException} is raised by the client
+     */
+    public boolean glueFunctionExists(ObjectPath functionPath) {
+        GetUserDefinedFunctionRequest request =
+                GetUserDefinedFunctionRequest.builder()
+                        .functionName(functionPath.getObjectName())
+                        .databaseName(functionPath.getDatabaseName())
+                        .catalogId(getGlueCatalogId())
+                        .build();
+
+        try {
+            GetUserDefinedFunctionResponse response = glueClient.getUserDefinedFunction(request);
+            GlueUtils.validateGlueResponse(response);
+            return response.userDefinedFunction()
+                    .functionName()
+                    .equalsIgnoreCase(functionPath.getObjectName());
+        } catch (EntityNotFoundException e) {
+            // Report the path as database.function, matching the other messages of this class.
+            LOG.warn(
+                    String.format(
+                            "Entry not found for function %s.%s",
+                            functionPath.getDatabaseName(), functionPath.getObjectName()));
+            return false;
+        } catch (GlueException e) {
+            LOG.error(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Replaces the definition of an existing user defined function in the glue data catalog.
+     *
+     * @param functionPath path of function.
+     * @param newFunction modified function.
+     * @throws CatalogException if the Glue client raises a {@link GlueException}
+     */
+    public void alterGlueFunction(ObjectPath functionPath, CatalogFunction newFunction)
+            throws CatalogException {
+        UserDefinedFunctionInput functionInput =
+                GlueUtils.createFunctionInput(functionPath, newFunction);
+        UpdateUserDefinedFunctionRequest updateUserDefinedFunctionRequest =
+                UpdateUserDefinedFunctionRequest.builder()
+                        .functionName(functionPath.getObjectName())
+                        .databaseName(functionPath.getDatabaseName())
+                        .catalogId(getGlueCatalogId())
+                        .functionInput(functionInput)
+                        .build();
+        try {
+            UpdateUserDefinedFunctionResponse response =
+                    glueClient.updateUserDefinedFunction(updateUserDefinedFunctionRequest);
+            GlueUtils.validateGlueResponse(response);
+            LOG.info(String.format("Function altered. %s", functionPath.getFullName()));
+        } catch (GlueException e) {
+            // Surface Glue failures the same way the other operations of this class do.
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Deletes a user defined function from the glue data catalog.
+     *
+     * @param functionPath fully qualified function path
+     * @throws CatalogException if the Glue client raises a {@link GlueException}
+     */
+    public void dropGlueFunction(ObjectPath functionPath) throws CatalogException {
+        DeleteUserDefinedFunctionRequest request =
+                DeleteUserDefinedFunctionRequest.builder()
+                        .catalogId(getGlueCatalogId())
+                        .functionName(functionPath.getObjectName())
+                        .databaseName(functionPath.getDatabaseName())
+                        .build();
+        try {
+            DeleteUserDefinedFunctionResponse response =
+                    glueClient.deleteUserDefinedFunction(request);
+            GlueUtils.validateGlueResponse(response);
+            LOG.info(String.format("Dropped Function. %s", functionPath.getFullName()));
+        } catch (GlueException e) {
+            // Surface Glue failures the same way the other operations of this class do.
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    // -------------- Partition related operations.
+
+    /**
+     * Verifies that the given glue table declares partition keys.
+     *
+     * @param tablePath path reported in the exception
+     * @param glueTable glue table to inspect
+     * @throws TableNotPartitionedException if the table has no partition keys
+     */
+    public void ensurePartitionedTable(ObjectPath tablePath, Table glueTable)
+            throws TableNotPartitionedException {
+        if (glueTable.hasPartitionKeys()) {
+            return;
+        }
+        throw new TableNotPartitionedException(catalogName, tablePath);
+    }
+
+    /**
+     * Creates a partition of the given table in the glue data catalog.
+     *
+     * <p>The partition values are ordered by the table's partition keys. The storage descriptor
+     * of the table is reused for the partition, with its location taken from the partition
+     * properties and its parameters set to the partition spec. The partition comment, when
+     * present, is stored among the partition parameters.
+     *
+     * <p>The properties of {@code catalogPartition} are copied before being adjusted, so the
+     * caller's map is left untouched.
+     *
+     * @param glueTable glue table
+     * @param partitionSpec partition spec
+     * @param catalogPartition partition to add
+     * @throws PartitionSpecInvalidException if the spec does not match the partition keys or any
+     *     partition value is null or whitespace only
+     * @throws CatalogException if the Glue client raises a {@link GlueException}
+     */
+    public void createGluePartition(
+            Table glueTable, CatalogPartitionSpec partitionSpec, CatalogPartition catalogPartition)
+            throws CatalogException, PartitionSpecInvalidException {
+
+        List<String> partCols = GlueUtils.getColumnNames(glueTable.partitionKeys());
+        LOG.info(String.format("Partition Columns are : %s", String.join(", ", partCols)));
+        ObjectPath tablePath = new ObjectPath(glueTable.databaseName(), glueTable.name());
+        List<String> partitionValues =
+                getOrderedFullPartitionValues(partitionSpec, partCols, tablePath);
+
+        // validate partition values
+        for (String partitionValue : partitionValues) {
+            if (isNullOrWhitespaceOnly(partitionValue)) {
+                throw new PartitionSpecInvalidException(
+                        catalogName, partCols, tablePath, partitionSpec);
+            }
+        }
+        StorageDescriptor.Builder sdBuilder = glueTable.storageDescriptor().toBuilder();
+        // Work on a private copy: the location is removed and the comment is added below, and
+        // neither change should leak into the CatalogPartition owned by the caller.
+        Map<String, String> partitionProperties =
+                new java.util.HashMap<>(catalogPartition.getProperties());
+
+        sdBuilder.location(extractPartitionLocation(partitionProperties));
+        sdBuilder.parameters(partitionSpec.getPartitionSpec());
+        String comment = catalogPartition.getComment();
+        if (comment != null) {
+            partitionProperties.put(GlueCatalogConstants.COMMENT, comment);
+        }
+        PartitionInput.Builder partitionInput =
+                PartitionInput.builder()
+                        .parameters(partitionProperties)
+                        .lastAccessTime(Instant.now())
+                        .storageDescriptor(sdBuilder.build())
+                        .values(partitionValues);
+        CreatePartitionRequest createPartitionRequest =
+                CreatePartitionRequest.builder()
+                        .partitionInput(partitionInput.build())
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(glueTable.databaseName())
+                        .tableName(glueTable.name())
+                        .build();
+
+        try {
+            CreatePartitionResponse response = glueClient.createPartition(createPartitionRequest);
+            GlueUtils.validateGlueResponse(response);
+            LOG.info("Partition Created.");
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Removes the location entry from the given properties and returns its value.
+     *
+     * @param properties partition properties; modified when the location key is present
+     * @return the value stored under the location key, or null when the key is absent
+     */
+    private String extractPartitionLocation(Map<String, String> properties) {
+        if (!properties.containsKey(GlueCatalogConstants.LOCATION_URI)) {
+            return null;
+        }
+        return properties.remove(GlueCatalogConstants.LOCATION_URI);
+    }
+
+    /**
+     * Arranges the values of a partition spec in the order of the given partition keys. A null
+     * value is replaced by the default partition name.
+     *
+     * @param partitionSpec a partition spec.
+     * @param partitionKeys a list of partition keys.
+     * @param tablePath path of the table to which the partition belongs.
+     * @return the partition values, one per entry of partitionKeys and in the same order.
+     * @throws PartitionSpecInvalidException thrown if partitionSpec and partitionKeys have
+     *     different sizes, or any key in partitionKeys doesn't exist in partitionSpec.
+     */
+    private List<String> getOrderedFullPartitionValues(
+            CatalogPartitionSpec partitionSpec, List<String> partitionKeys, ObjectPath tablePath)
+            throws PartitionSpecInvalidException {
+        Map<String, String> specEntries = partitionSpec.getPartitionSpec();
+        boolean sameSize = specEntries.size() == partitionKeys.size();
+        if (!sameSize) {
+            throw new PartitionSpecInvalidException(
+                    catalogName, partitionKeys, tablePath, partitionSpec);
+        }
+
+        List<String> ordered = new ArrayList<>(specEntries.size());
+        for (String partitionKey : partitionKeys) {
+            if (!specEntries.containsKey(partitionKey)) {
+                throw new PartitionSpecInvalidException(
+                        catalogName, partitionKeys, tablePath, partitionSpec);
+            }
+            String rawValue = specEntries.get(partitionKey);
+            ordered.add(
+                    rawValue != null ? rawValue : GlueCatalogConstants.DEFAULT_PARTITION_NAME);
+        }
+        return ordered;
+    }
+
+    /**
+     * Intended to update a partition in the glue data catalog.
+     *
+     * <p>NOTE(review): not implemented yet. The body is empty, so a call returns normally without
+     * contacting Glue and without changing anything.
+     *
+     * @param tablePath contains database name and table name.
+     * @param partitionSpec Existing partition information.
+     * @param newPartition Partition information with new changes.
+     * @throws CatalogException declared for failures; never thrown by the current empty body.
+     */
+    public void alterGluePartition(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition)
+            throws CatalogException {
+        // TODO: implement; currently a silent no-op.
+    }
+
+    /**
+     * Get CatalogPartitionSpec of all partitions of a table from the glue data catalog.
+     *
+     * <p>Pages are fetched until Glue stops returning a next token; every page is validated
+     * before its content is read.
+     *
+     * @param tablePath fully qualified table path.
+     * @return List of PartitionSpec, in the order Glue returned the partitions
+     * @throws CatalogException if the Glue client raises a {@link GlueException}
+     */
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) {
+
+        GetPartitionsRequest.Builder request =
+                GetPartitionsRequest.builder()
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(tablePath.getDatabaseName())
+                        .tableName(tablePath.getObjectName());
+        List<CatalogPartitionSpec> finalPartitionsList = new ArrayList<>();
+        try {
+            String partitionsResultNextToken;
+            do {
+                GetPartitionsResponse response = glueClient.getPartitions(request.build());
+                GlueUtils.validateGlueResponse(response);
+                for (Partition partition : response.partitions()) {
+                    finalPartitionsList.add(getCatalogPartitionSpec(partition));
+                }
+                partitionsResultNextToken = response.nextToken();
+                if (partitionsResultNextToken != null) {
+                    // update token in the request builder to fetch the next batch
+                    request.nextToken(partitionsResultNextToken);
+                }
+            } while (partitionsResultNextToken != null);
+
+            return finalPartitionsList;
+
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Drop partition in Glue data catalog.
+     *
+     * <p>The table is fetched first so the partition values can be ordered by its partition
+     * keys before the delete request is sent.
+     *
+     * @param tablePath fully qualified table path
+     * @param partitionSpec partition spec details
+     * @throws CatalogException if the Glue client raises any other {@link GlueException}
+     * @throws PartitionNotExistException if the table does not exist, the spec does not match the
+     *     table's partition keys, or Glue reports the partition as not found
+     */
+    public void dropGluePartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws CatalogException, PartitionNotExistException {
+
+        try {
+            Table glueTable = getGlueTable(tablePath);
+            List<String> partCols = GlueUtils.getColumnNames(glueTable.partitionKeys());
+            DeletePartitionRequest deletePartitionRequest =
+                    DeletePartitionRequest.builder()
+                            .catalogId(getGlueCatalogId())
+                            .databaseName(tablePath.getDatabaseName())
+                            .tableName(tablePath.getObjectName())
+                            .partitionValues(
+                                    getOrderedFullPartitionValues(
+                                            partitionSpec, partCols, tablePath))
+                            .build();
+            DeletePartitionResponse response = glueClient.deletePartition(deletePartitionRequest);
+            GlueUtils.validateGlueResponse(response);
+            LOG.info("Partition Dropped.");
+        } catch (TableNotExistException
+                | PartitionSpecInvalidException
+                | EntityNotFoundException e) {
+            // Nothing was dropped in any of these cases; tell the caller instead of printing a
+            // stack trace and returning as if the drop had succeeded.
+            throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Lists the partition specs of a table that match the given filters.
+     *
+     * <p>Each filter is rendered by {@code getExpressionString} and the results are joined with
+     * the AND constant surrounded by the SPACE constant; the joined string is sent to Glue as
+     * the request expression. Pages are fetched until Glue stops returning a next token; every
+     * page is validated before its content is read.
+     *
+     * @param tablePath fully qualified table path
+     * @param filters filter expressions to combine
+     * @return List of Partition Spec, in the order Glue returned the partitions
+     * @throws CatalogException if the Glue client raises a {@link GlueException}
+     */
+    public List<CatalogPartitionSpec> listGluePartitionsByFilter(
+            ObjectPath tablePath, List<Expression> filters) {
+        String expressionString =
+                filters.stream()
+                        .map(x -> getExpressionString(x, new StringBuilder()))
+                        .collect(
+                                Collectors.joining(
+                                        GlueCatalogConstants.SPACE
+                                                + GlueCatalogConstants.AND
+                                                + GlueCatalogConstants.SPACE));
+        // One builder is reused for all pages; only the token changes between requests.
+        GetPartitionsRequest.Builder requestBuilder =
+                GetPartitionsRequest.builder()
+                        .databaseName(tablePath.getDatabaseName())
+                        .tableName(tablePath.getObjectName())
+                        .catalogId(getGlueCatalogId())
+                        .expression(expressionString);
+        List<CatalogPartitionSpec> catalogPartitionSpecList = new ArrayList<>();
+        try {
+            String nextToken;
+            do {
+                GetPartitionsResponse response = glueClient.getPartitions(requestBuilder.build());
+                GlueUtils.validateGlueResponse(response);
+                for (Partition partition : response.partitions()) {
+                    catalogPartitionSpecList.add(getCatalogPartitionSpec(partition));
+                }
+                nextToken = response.nextToken();
+                if (nextToken != null) {
+                    requestBuilder.nextToken(nextToken);
+                }
+            } while (nextToken != null);
+            return catalogPartitionSpecList;
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Renders an expression into the shared builder and returns the builder's content.
+     *
+     * <p>Children that themselves have children are rendered first, recursively, into the same
+     * builder. The summary string of {@code expression}, followed by the SPACE and AND
+     * constants, is then inserted at the front of the builder.
+     *
+     * <p>NOTE(review): every call adds a trailing AND constant, and the caller in this class joins
+     * the per-filter results with the AND constant again. This looks like it yields dangling or
+     * doubled AND tokens, and no separator is inserted between a parent and the text already in
+     * the builder. Confirm against the expression syntax Glue accepts.
+     *
+     * @param expression expression to render
+     * @param sb builder shared across the recursion; modified by this call
+     * @return the content of {@code sb} after this expression has been inserted
+     */
+    private String getExpressionString(Expression expression, StringBuilder sb) {
+
+        // Only children that have children of their own are visited; leaf children are skipped.
+        for (Expression childExpression : expression.getChildren()) {
+            if (childExpression.getChildren() != null && childExpression.getChildren().size() > 0) {
+                getExpressionString(childExpression, sb);
+            }
+        }
+        // Prepend (not append): the current expression ends up before whatever its children
+        // wrote.
+        return sb.insert(
+                        0,
+                        expression.asSummaryString()
+                                + GlueCatalogConstants.SPACE
+                                + GlueCatalogConstants.AND)
+                .toString();
+    }
+
+    /**
+     * Fetches a single partition of a table from the glue data catalog.
+     *
+     * <p>The table is fetched first so the partition values can be ordered by its partition
+     * keys; those values identify the partition in the request. The returned partition is
+     * additionally checked with {@code GlueUtils.specSubset} against the parameters of its
+     * storage descriptor.
+     *
+     * @param tablePath fully qualified table path
+     * @param partitionSpec spec identifying the partition
+     * @return the partition, or null when the returned partition has no values or does not pass
+     *     the spec check
+     * @throws PartitionNotExistException if the table does not exist, the spec does not match the
+     *     table's partition keys, or Glue reports the partition as not found
+     * @throws CatalogException if the Glue client raises any other {@link GlueException}
+     */
+    public Partition getGluePartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws PartitionNotExistException {
+        try {
+            Table glueTable = getGlueTable(tablePath);
+            List<String> partCols = GlueUtils.getColumnNames(glueTable.partitionKeys());
+            GetPartitionRequest request =
+                    GetPartitionRequest.builder()
+                            .catalogId(getGlueCatalogId())
+                            .databaseName(tablePath.getDatabaseName())
+                            .tableName(tablePath.getObjectName())
+                            // Without the values the request does not identify any partition.
+                            .partitionValues(
+                                    getOrderedFullPartitionValues(
+                                            partitionSpec, partCols, tablePath))
+                            .build();
+            GetPartitionResponse response = glueClient.getPartition(request);
+            GlueUtils.validateGlueResponse(response);
+            Partition partition = response.partition();
+            if (partition.hasValues()
+                    && GlueUtils.specSubset(
+                            partitionSpec.getPartitionSpec(),
+                            partition.storageDescriptor().parameters())) {
+                return partition;
+            }
+        } catch (TableNotExistException
+                | PartitionSpecInvalidException
+                | EntityNotFoundException e) {
+            throw new PartitionNotExistException(catalogName, tablePath, partitionSpec, e);
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+        return null;
+    }
+
+    /**
+     * Checks whether a partition is present in the glue data catalog.
+     *
+     * <p>The table is fetched first so the partition values can be ordered by its partition
+     * keys; those values identify the partition in the request.
+     *
+     * @param tablePath fully qualified table path
+     * @param partitionSpec spec identifying the partition
+     * @return true if Glue returns a partition for the values; false if the table does not exist,
+     *     the spec does not match the table's partition keys, or Glue reports the partition as
+     *     not found
+     * @throws CatalogException if the Glue client raises any other {@link GlueException}
+     */
+    public boolean gluePartitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws CatalogException {
+        try {
+            Table glueTable = getGlueTable(tablePath);
+            List<String> partCols = GlueUtils.getColumnNames(glueTable.partitionKeys());
+            GetPartitionRequest request =
+                    GetPartitionRequest.builder()
+                            .catalogId(getGlueCatalogId())
+                            .databaseName(tablePath.getDatabaseName())
+                            .tableName(tablePath.getObjectName())
+                            // Without the values the request does not identify any partition.
+                            .partitionValues(
+                                    getOrderedFullPartitionValues(
+                                            partitionSpec, partCols, tablePath))
+                            .build();
+            GetPartitionResponse response = glueClient.getPartition(request);
+            GlueUtils.validateGlueResponse(response);
+            // The request named the exact partition, so a returned partition is the answer. The
+            // partition's own parameters hold properties rather than the spec, so they are not
+            // compared here.
+            return response.partition() != null;
+        } catch (TableNotExistException
+                | PartitionSpecInvalidException
+                | EntityNotFoundException e) {
+            LOG.warn(String.format("%s is not found", partitionSpec.getPartitionSpec()));
+        } catch (GlueException e) {
+            throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+        return false;
+    }
+
+    private String getGlueCatalogId() {
+        return awsConfig.getGlueCatalogId();
+    }
+
+    /**
+     * Rename glue table. Glue catalog don't support renaming table. For renaming in Flink, it has

Review Comment:
   https://issues.apache.org/jira/browse/FLINK-31926



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org