Posted to commits@impala.apache.org by db...@apache.org on 2023/07/04 11:14:27 UTC

[impala] branch master updated: IMPALA-11013: Support 'MIGRATE TABLE' for external Hive tables

This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 929b91ac6 IMPALA-11013: Support 'MIGRATE TABLE' for external Hive tables
929b91ac6 is described below

commit 929b91ac644561ee68da7923cf5272eb300d79de
Author: LPL <li...@apache.org>
AuthorDate: Wed Sep 7 18:37:53 2022 +0800

    IMPALA-11013: Support 'MIGRATE TABLE' for external Hive tables
    
    This patch implements the migration from legacy Hive tables to Iceberg
    tables. The target Iceberg tables inherit the location of the original
    Hive tables. The Hive table has to be a non-transactional table.
    
    To migrate a Hive-format table stored in a distributed file system or
    object store to an Iceberg table, use the following command:
    
    ALTER TABLE [dbname.]table_name CONVERT TO ICEBERG [TBLPROPERTIES(...)];
    
    Currently only 'iceberg.catalog' is allowed as a table property.
    
    For example:
         - ALTER TABLE hive_table CONVERT TO ICEBERG;
         - ALTER TABLE hive_table CONVERT TO ICEBERG TBLPROPERTIES(
           'iceberg.catalog' = 'hadoop.tables');
    
    The HDFS table to be converted must meet the following requirements:
         - table is not a transactional table
         - InputFormat must be either PARQUET, ORC, or AVRO
    
    This is an in-place migration, so the original data files of the legacy
    Hive table are re-used and not moved, copied, or re-created by this
    operation. The new Iceberg table will have the 'external.table.purge'
    property set to true after the migration.
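
    For instance, one way to check this property after a successful
    migration (the table name below is only illustrative) is:

        DESCRIBE FORMATTED hive_table;
        -- 'external.table.purge'='true' should appear among the table
        -- parameters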
    
    The NUM_THREADS_FOR_TABLE_MIGRATION query option controls the maximum
    number of threads used to execute the table conversion. The default
    value is one, meaning that the table conversion runs on a single
    thread. The option accepts values in the range [0, 1024]. Zero means
    that the number of CPU cores is used as the degree of parallelism. A
    value greater than zero specifies the number of threads used for the
    table conversion, capped at the number of CPU cores.
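
    For example, to run the conversion with more threads (the value 8 below
    is arbitrary), the option can be set before issuing the statement:

        SET NUM_THREADS_FOR_TABLE_MIGRATION=8;
        ALTER TABLE hive_table CONVERT TO ICEBERG;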
    
    Process of migration (an illustrative query sequence follows the steps):
     - Step 1: Set table properties on the HDFS table,
               e.g. 'external.table.purge'='false'.
     - Step 2: Rename the HDFS table to a temporary table name using a name
               format of "<original_table_name>_tmp_<random_ID>".
     - Step 3: Refresh the renamed HDFS table.
     - Step 4: Create an external Iceberg table via the Iceberg API using
               the data of the HDFS table.
     - Step 5 (Optional): For an Iceberg table in Hadoop Tables, run a
               CREATE TABLE query to add the Iceberg table to HMS as well.
     - Step 6 (Optional): For an Iceberg table in the Hive catalog, run an
               INVALIDATE METADATA to make the new table available to all
               coordinators right after the conversion finishes.
     - Step 7 (Optional): For an Iceberg table in Hadoop Tables, set the
               'external.table.purge' property to true in an ALTER TABLE
               query.
     - Step 8: Drop the temporary HDFS table.
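
    As a rough sketch, for a table migrated into the Hive catalog (the
    table name and the temporary suffix below are made up), the coordinator
    issues child queries similar to the following around the Iceberg API
    call:

        ALTER TABLE db.hive_table SET TBLPROPERTIES (
            'external.table.purge'='false', 'TRANSLATED_TO_EXTERNAL'='FALSE');
        ALTER TABLE db.hive_table RENAME TO db.hive_table_tmp_1a2b3c4d;
        REFRESH db.hive_table_tmp_1a2b3c4d;
        -- the Iceberg table db.hive_table is created here via the Iceberg API
        INVALIDATE METADATA db.hive_table;
        DROP TABLE db.hive_table_tmp_1a2b3c4d;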
    
    Testing:
     - Add e2e tests
     - Add FE UTs
     - Manually tested the runtime performance for a table that is
       unpartitioned and has 10k data files. The runtime is around 10-13s.
    
    Co-authored-by: lipenglin <li...@apache.org>
    
    Change-Id: Iacdad996d680fe545cc9a45e6bc64a348a64cd80
    Reviewed-on: http://gerrit.cloudera.org:8080/20077
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Tamas Mate <tm...@apache.org>
---
 be/src/service/client-request-state.cc             | 122 ++++++-
 be/src/service/client-request-state.h              |  12 +-
 be/src/service/frontend.cc                         |   5 +
 be/src/service/frontend.h                          |   4 +
 be/src/service/query-options.cc                    |   7 +
 be/src/service/query-options.h                     |   6 +-
 common/thrift/Frontend.thrift                      |  19 ++
 common/thrift/ImpalaService.thrift                 |   4 +
 common/thrift/Query.thrift                         |   2 +
 common/thrift/Types.thrift                         |   1 +
 fe/src/main/cup/sql-parser.cup                     |  17 +-
 .../apache/impala/analysis/AnalysisContext.java    |   9 +
 .../impala/analysis/ConvertTableToIcebergStmt.java | 215 +++++++++++++
 .../org/apache/impala/analysis/LoadDataStmt.java   |  21 +-
 .../apache/impala/analysis/QueryStringBuilder.java | 167 ++++++++--
 .../org/apache/impala/catalog/IcebergTable.java    |   3 +
 .../impala/catalog/events/MetastoreEvents.java     |   2 +-
 .../impala/catalog/iceberg/IcebergCatalog.java     |   9 +
 .../impala/catalog/iceberg/IcebergCatalogs.java    |  17 +-
 .../catalog/iceberg/IcebergHadoopCatalog.java      |   5 +
 .../catalog/iceberg/IcebergHadoopTables.java       |   6 +
 .../impala/catalog/iceberg/IcebergHiveCatalog.java |   5 +
 .../apache/impala/service/CatalogOpExecutor.java   |   5 +
 .../java/org/apache/impala/service/Frontend.java   |  30 +-
 .../org/apache/impala/service/JniFrontend.java     |  10 +
 .../apache/impala/util/IcebergSchemaConverter.java |  24 ++
 .../java/org/apache/impala/util/IcebergUtil.java   |   4 +-
 .../org/apache/impala/util/MigrateTableUtil.java   | 293 +++++++++++++++++
 fe/src/main/jflex/sql-scanner.flex                 |   3 +-
 .../apache/impala/analysis/AnalyzeStmtsTest.java   |  20 ++
 .../org/apache/impala/analysis/ParserTest.java     |   2 +-
 .../iceberg-migrate-from-external-hdfs-tables.test | 355 +++++++++++++++++++++
 tests/authorization/test_ranger.py                 |  94 ++++++
 tests/query_test/test_iceberg.py                   |   4 +
 34 files changed, 1442 insertions(+), 60 deletions(-)

diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 3513070e8..64b3a4fc7 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -25,11 +25,9 @@
 #include <rapidjson/rapidjson.h>
 #include <rapidjson/stringbuffer.h>
 #include <rapidjson/writer.h>
-#include <rapidjson/error/en.h>
 
 #include "catalog/catalog-service-client-wrapper.h"
 #include "common/status.h"
-#include "exec/kudu/kudu-util.h"
 #include "exprs/timezone_db.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "rpc/rpc-mgr.inline.h"
@@ -52,15 +50,12 @@
 #include "util/debug-util.h"
 #include "util/impalad-metrics.h"
 #include "util/lineage-util.h"
-#include "util/metrics.h"
 #include "util/pretty-printer.h"
-#include "util/promise.h"
 #include "util/redactor.h"
 #include "util/runtime-profile-counters.h"
 #include "util/time.h"
 #include "util/uid-util.h"
 
-#include "gen-cpp/CatalogService.h"
 #include "gen-cpp/CatalogService_types.h"
 #include "gen-cpp/control_service.pb.h"
 #include "gen-cpp/control_service.proxy.h"
@@ -313,6 +308,10 @@ Status ClientRequestState::Exec() {
       DCHECK(exec_request_->admin_request.type == TAdminRequestType::SHUTDOWN);
       RETURN_IF_ERROR(ExecShutdownRequest());
       break;
+    case TStmtType::CONVERT:
+      DCHECK(exec_request_->__isset.convert_table_request);
+      LOG_AND_RETURN_IF_ERROR(ExecMigrateRequest());
+      break;
     default:
       stringstream errmsg;
       errmsg << "Unknown exec request stmt type: " << exec_request_->stmt_type;
@@ -2219,4 +2218,117 @@ void ClientRequestState::CopyRPCs(ClientRequestState& from_request) {
   }
 }
 
+Status ClientRequestState::ExecMigrateRequest() {
+  ExecMigrateRequestImpl();
+  SetResultSet({"Table has been migrated."});
+  return query_status_;
+}
+
+void ClientRequestState::ExecMigrateRequestImpl() {
+  // A convert table request holds the query strings for the sub-queries. These are
+  // populated by ConvertTableToIcebergStmt in the Frontend during analysis.
+  TConvertTableRequest& params = exec_request_->convert_table_request;
+  {
+    RuntimeProfile* child_profile =
+        RuntimeProfile::Create(&profile_pool_, "Child Queries 1");
+    profile_->AddChild(child_profile);
+    vector<ChildQuery> child_queries;
+
+    // Prepare: SET some table properties for the original table.
+    RuntimeProfile* set_hdfs_table_profile = RuntimeProfile::Create(
+        &profile_pool_, "Set properties for HDFS table query");
+    child_profile->AddChild(set_hdfs_table_profile);
+    child_queries.emplace_back(params.set_hdfs_table_properties_query, this,
+        parent_server_, set_hdfs_table_profile, &profile_pool_);
+
+    // Prepare: RENAME the HDFS table to a temporary HDFS table.
+    RuntimeProfile* rename_hdfs_table_profile = RuntimeProfile::Create(
+        &profile_pool_, "Rename HDFS table query");
+    child_profile->AddChild(rename_hdfs_table_profile);
+    child_queries.emplace_back(params.rename_hdfs_table_to_temporary_query,
+        this, parent_server_, rename_hdfs_table_profile, &profile_pool_);
+
+    // Prepare: REFRESH the temporary HDFS table.
+    RuntimeProfile* refresh_hdfs_table_profile = RuntimeProfile::Create(
+        &profile_pool_, "Refresh temporary HDFS table query");
+    child_profile->AddChild(refresh_hdfs_table_profile);
+    child_queries.emplace_back(params.refresh_temporary_hdfs_table_query, this,
+        parent_server_, refresh_hdfs_table_profile, &profile_pool_);
+
+    // Execute child queries
+    unique_ptr<ChildQueryExecutor> query_executor(new ChildQueryExecutor());
+    RETURN_VOID_IF_ERROR(query_executor->ExecAsync(move(child_queries)));
+    vector<ChildQuery*>* completed_queries = new vector<ChildQuery*>();
+    Status query_status = query_executor->WaitForAll(completed_queries);
+    if (!query_status.ok()) AddTableResetHints(params, &query_status);
+    {
+      lock_guard<mutex> l(lock_);
+      RETURN_VOID_IF_ERROR(UpdateQueryStatus(query_status));
+    }
+  }
+  // Create an external Iceberg table using the data of the HDFS table.
+  Status status = frontend_->Convert(*exec_request_);
+  if (!status.ok()) AddTableResetHints(params, &status);
+  {
+    lock_guard<mutex> l(lock_);
+    RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
+  }
+  {
+    RuntimeProfile* child_profile =
+        RuntimeProfile::Create(&profile_pool_, "Child Queries 2");
+    profile_->AddChild(child_profile);
+    vector<ChildQuery> child_queries;
+
+    if (params.__isset.create_iceberg_table_query) {
+      // Prepare: CREATE the Iceberg table that inherits HDFS table location.
+      RuntimeProfile* create_iceberg_table_profile = RuntimeProfile::Create(
+          &profile_pool_, "Create Iceberg table query");
+      child_profile->AddChild(create_iceberg_table_profile);
+      child_queries.emplace_back(params.create_iceberg_table_query, this,
+          parent_server_, create_iceberg_table_profile, &profile_pool_);
+    } else {
+      // Prepare: Invalidate metadata for tables in Hive catalog to guarantee that it is
+      // propagated instantly to all coordinators.
+      RuntimeProfile* invalidate_metadata_profile = RuntimeProfile::Create(
+          &profile_pool_, "Invalidate metadata Iceberg table query");
+      child_profile->AddChild(invalidate_metadata_profile);
+      child_queries.emplace_back(params.invalidate_metadata_query, this,
+          parent_server_, invalidate_metadata_profile, &profile_pool_);
+    }
+
+    if (params.__isset.post_create_alter_table_query) {
+      // Prepare: ALTER TABLE query after creating the Iceberg table.
+      RuntimeProfile* post_create_alter_table_profile = RuntimeProfile::Create(
+          &profile_pool_, "ALTER TABLE after create Iceberg table query");
+      child_profile->AddChild(post_create_alter_table_profile);
+      child_queries.emplace_back(params.post_create_alter_table_query, this,
+          parent_server_, post_create_alter_table_profile, &profile_pool_);
+    }
+
+    // Prepare: DROP the temporary HDFS table.
+    RuntimeProfile* drop_tmp_hdfs_table_profile = RuntimeProfile::Create(
+        &profile_pool_, "Drop temporary HDFS table query");
+    child_profile->AddChild(drop_tmp_hdfs_table_profile);
+    child_queries.emplace_back(params.drop_temporary_hdfs_table_query, this,
+        parent_server_, drop_tmp_hdfs_table_profile, &profile_pool_);
+
+    // Execute queries
+    unique_ptr<ChildQueryExecutor> query_executor(new ChildQueryExecutor());
+    RETURN_VOID_IF_ERROR(query_executor->ExecAsync(move(child_queries)));
+    vector<ChildQuery*>* completed_queries = new vector<ChildQuery*>();
+    Status query_status = query_executor->WaitForAll(completed_queries);
+    {
+      lock_guard<mutex> l(lock_);
+      RETURN_VOID_IF_ERROR(UpdateQueryStatus(query_status));
+    }
+  }
+}
+
+void ClientRequestState::AddTableResetHints(const TConvertTableRequest& params,
+      Status* status) const {
+  string table_reset_hint("Your table might have been renamed. To reset the name "
+      "try running:\n" + params.reset_table_name_query + ";");
+  status->MergeStatus(Status(table_reset_hint));
+}
+
 }
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 4477d648c..8e5cb4bed 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -876,6 +876,16 @@ class ClientRequestState {
 
   /// Helper function to get common header
   TCatalogServiceRequestHeader GetCatalogServiceRequestHeader();
-};
 
+  /// The logic of executing a MIGRATE TABLE statement.
+  Status ExecMigrateRequest() WARN_UNUSED_RESULT;
+
+  /// Core logic of executing a MIGRATE TABLE statement.
+  void ExecMigrateRequestImpl();
+
+  /// Used when running into an error during table migration to extend 'status' with some
+  /// hints about how to reset the original table name. 'params' holds the SQL query
+  /// string the user should run.
+  void AddTableResetHints(const TConvertTableRequest& params, Status* status) const;
+};
 }
diff --git a/be/src/service/frontend.cc b/be/src/service/frontend.cc
index d06c86c8d..1d1fe51fc 100644
--- a/be/src/service/frontend.cc
+++ b/be/src/service/frontend.cc
@@ -113,6 +113,7 @@ Frontend::Frontend() {
     {"setCatalogIsReady", "()V", &set_catalog_is_ready_id_},
     {"waitForCatalog", "()V", &wait_for_catalog_id_},
     {"loadTableData", "([B)[B", &load_table_data_id_},
+    {"convertTable", "([B)V", &convertTable},
     {"getTableFiles", "([B)[B", &get_table_files_id_},
     {"showCreateFunction", "([B)Ljava/lang/String;", &show_create_function_id_},
     {"buildTestDescriptorTable", "([B)[B", &build_test_descriptor_table_id_},
@@ -360,3 +361,7 @@ Status Frontend::AbortKuduTransaction(const TUniqueId& query_id) {
 Status Frontend::CommitKuduTransaction(const TUniqueId& query_id) {
   return JniUtil::CallJniMethod(fe_, commit_kudu_txn_, query_id);
 }
+
+Status Frontend::Convert(const TExecRequest& request) {
+  return JniUtil::CallJniMethod(fe_, convertTable, request);
+}
diff --git a/be/src/service/frontend.h b/be/src/service/frontend.h
index 5e504d003..1df8dc44b 100644
--- a/be/src/service/frontend.h
+++ b/be/src/service/frontend.h
@@ -222,6 +222,9 @@ class Frontend {
   /// Commits Kudu transaction with the given query id.
   Status CommitKuduTransaction(const TUniqueId& query_id);
 
+  /// Convert external Hdfs tables to Iceberg tables
+  Status Convert(const TExecRequest& request);
+
  private:
   jobject fe_;  // instance of org.apache.impala.service.JniFrontend
   jmethodID create_exec_request_id_;  // JniFrontend.createExecRequest()
@@ -260,6 +263,7 @@ class Frontend {
   jmethodID validate_saml2_bearer_id_; // JniFrontend.validateSaml2Bearer()
   jmethodID abort_kudu_txn_; // JniFrontend.abortKuduTransaction()
   jmethodID commit_kudu_txn_; // JniFrontend.commitKuduTransaction()
+  jmethodID convertTable; // JniFrontend.convertTable
 
   // Only used for testing.
   jmethodID build_test_descriptor_table_id_; // JniFrontend.buildTestDescriptorTable()
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 09715f0d4..493b71be1 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1111,6 +1111,13 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_allow_unsafe_casts(IsTrue(value));
         break;
       }
+      case TImpalaQueryOptions::NUM_THREADS_FOR_TABLE_MIGRATION: {
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveRange<int32_t>(
+            option, value, 0, 1024, &int32_t_val));
+        query_options->__set_num_threads_for_table_migration(int32_t_val);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 0a424301e..931e2009e 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -50,7 +50,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE                                                                 \
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),                                 \
-      TImpalaQueryOptions::ALLOW_UNSAFE_CASTS + 1);                                      \
+      TImpalaQueryOptions::NUM_THREADS_FOR_TABLE_MIGRATION + 1);                         \
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)               \
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)             \
@@ -293,7 +293,9 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(max_fragment_instances_per_node, MAX_FRAGMENT_INSTANCES_PER_NODE,         \
       TQueryOptionLevel::ADVANCED)                                                       \
   QUERY_OPT_FN(max_sort_run_size, MAX_SORT_RUN_SIZE, TQueryOptionLevel::DEVELOPMENT)     \
-  QUERY_OPT_FN(allow_unsafe_casts, ALLOW_UNSAFE_CASTS, TQueryOptionLevel::DEVELOPMENT);
+  QUERY_OPT_FN(allow_unsafe_casts, ALLOW_UNSAFE_CASTS, TQueryOptionLevel::DEVELOPMENT)   \
+  QUERY_OPT_FN(num_threads_for_table_migration, NUM_THREADS_FOR_TABLE_MIGRATION,         \
+      TQueryOptionLevel::ADVANCED);
 
 /// Enforce practical limits on some query options to avoid undesired query state.
 static const int64_t SPILLABLE_BUFFER_LIMIT = 1LL << 40; // 1 TB
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 45035389f..f81034bd7 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -587,6 +587,22 @@ struct TAccessEvent {
   3: required string privilege
 }
 
+// Request for "ALTER TABLE ... CONVERT TO" statements
+struct TConvertTableRequest {
+  1: required CatalogObjects.TTableName table_name
+  2: required CatalogObjects.TTableName hdfs_table_name
+  3: required CatalogObjects.THdfsFileFormat file_format
+  4: optional map<string, string> properties
+  5: optional string set_hdfs_table_properties_query
+  6: optional string rename_hdfs_table_to_temporary_query
+  7: optional string refresh_temporary_hdfs_table_query
+  8: optional string reset_table_name_query
+  9: optional string create_iceberg_table_query
+  10: optional string invalidate_metadata_query
+  11: optional string post_create_alter_table_query
+  12: optional string drop_temporary_hdfs_table_query
+}
+
 // Result of call to createExecRequest()
 struct TExecRequest {
   1: required Types.TStmtType stmt_type
@@ -647,6 +663,9 @@ struct TExecRequest {
   // True if request pool is set by Frontend rather than user specifically setting it via
   // REQUEST_POOL query option.
   18: optional bool request_pool_set_by_frontend = false
+
+  // Request for "ALTER TABLE ... CONVERT TO" statements.
+  19: optional TConvertTableRequest convert_table_request
 }
 
 // Parameters to FeSupport.cacheJar().
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 20f83bf44..75e471f0e 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -799,6 +799,10 @@ enum TImpalaQueryOptions {
   // implicit casts between numeric and string types in set operations and insert
   // statements.
   ALLOW_UNSAFE_CASTS = 158;
+
+  // The maximum number of threads Impala can use for migrating a table to a different
+  // type. E.g. from Hive table to Iceberg table.
+  NUM_THREADS_FOR_TABLE_MIGRATION = 159;
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 36e3a827d..db046dc2b 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -642,6 +642,8 @@ struct TQueryOptions {
   // See comment in ImpalaService.thrift
   159: optional bool allow_unsafe_casts = false;
 
+  // See comment in ImpalaService.thrift
+  160: optional i32 num_threads_for_table_migration = 1;
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift
index e10ed0543..f0fc2626c 100644
--- a/common/thrift/Types.thrift
+++ b/common/thrift/Types.thrift
@@ -109,6 +109,7 @@ enum TStmtType {
   SET = 5
   ADMIN_FN = 6
   TESTCASE = 7
+  CONVERT = 8
 }
 
 enum TIcebergOperation {
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index b5d83645b..eca1ee3b8 100755
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -305,7 +305,7 @@ terminal
   KW_ARRAY, KW_AS, KW_ASC, KW_AUTHORIZATION, KW_AVRO, KW_BETWEEN, KW_BIGINT, KW_BINARY,
   KW_BLOCKSIZE, KW_BOOLEAN, KW_BY, KW_CACHED, KW_CASCADE, KW_CASE, KW_CAST, KW_CHANGE,
   KW_CHAR, KW_CLASS, KW_CLOSE_FN, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COMPRESSION,
-  KW_COMPUTE, KW_CONSTRAINT, KW_COPY, KW_CREATE, KW_CROSS, KW_CUBE, KW_CURRENT, KW_DATA,
+  KW_COMPUTE, KW_CONSTRAINT, KW_CONVERT, KW_COPY, KW_CREATE, KW_CROSS, KW_CUBE, KW_CURRENT, KW_DATA,
   KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME, KW_DECIMAL, KW_DEFAULT, KW_DELETE,
   KW_DELIMITED, KW_DESC, KW_DESCRIBE, KW_DISABLE, KW_DISTINCT, KW_DIV, KW_DOUBLE,
   KW_DROP, KW_ELSE, KW_ENABLE, KW_ENCODING, KW_END, KW_ESCAPED, KW_EXCEPT, KW_EXECUTE,
@@ -617,6 +617,9 @@ nonterminal CopyTestCaseStmt copy_testcase_stmt;
 // Admin statements.
 nonterminal AdminFnStmt admin_fn_stmt;
 
+// For "ALTER TABLE ... CONVERT TO" statements
+nonterminal ConvertTableToIcebergStmt convert_tbl_stmt;
+
 precedence left KW_LOGICAL_OR;
 precedence left KW_OR;
 precedence left KW_AND;
@@ -764,6 +767,8 @@ stmt ::=
   {: RESULT = shutdown; :}
   | stmt:s SEMICOLON
   {: RESULT = s; :}
+  | convert_tbl_stmt: convert
+  {: RESULT = convert; :}
   ;
 
 load_stmt ::=
@@ -1369,6 +1374,14 @@ alter_tbl_stmt ::=
   :}
   ;
 
+convert_tbl_stmt ::=
+  KW_ALTER KW_TABLE table_name:table KW_CONVERT KW_TO KW_ICEBERG
+  {: RESULT = new ConvertTableToIcebergStmt(table); :}
+  | KW_ALTER KW_TABLE table_name:table KW_CONVERT KW_TO KW_ICEBERG
+    KW_TBLPROPERTIES LPAREN properties_map:props RPAREN
+  {: RESULT =  new ConvertTableToIcebergStmt(table, props); :}
+  ;
+
 table_property_type ::=
   KW_TBLPROPERTIES
   {: RESULT = TTablePropertyType.TBL_PROPERTY; :}
@@ -4211,6 +4224,8 @@ word ::=
   {: RESULT = r.toString(); :}
   | KW_CONSTRAINT:r
   {: RESULT = r.toString(); :}
+  | KW_CONVERT:r
+  {: RESULT = r.toString(); :}
   | KW_COPY:r
   {: RESULT = r.toString(); :}
   | KW_CREATE:r
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
index f151cf19c..a712cbaa8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
@@ -212,6 +212,10 @@ public class AnalysisContext {
           || isShowTablesStmt() || isAlterTableStmt() || isShowFunctionsStmt();
     }
 
+    public boolean isConvertTableToIcebergStmt() {
+      return stmt_ instanceof ConvertTableToIcebergStmt;
+    }
+
     public AlterTableStmt getAlterTableStmt() {
       Preconditions.checkState(isAlterTableStmt());
       return (AlterTableStmt) stmt_;
@@ -381,6 +385,11 @@ public class AnalysisContext {
       return (AdminFnStmt) stmt_;
     }
 
+    public ConvertTableToIcebergStmt getConvertTableToIcebergStmt() {
+      Preconditions.checkState(isConvertTableToIcebergStmt());
+      return (ConvertTableToIcebergStmt) stmt_;
+    }
+
     public StatementBase getStmt() { return stmt_; }
     public Analyzer getAnalyzer() { return analyzer_; }
     public Set<TAccessEvent> getAccessEvents() { return analyzer_.getAccessEvents(); }
diff --git a/fe/src/main/java/org/apache/impala/analysis/ConvertTableToIcebergStmt.java b/fe/src/main/java/org/apache/impala/analysis/ConvertTableToIcebergStmt.java
new file mode 100644
index 000000000..e5f00ea6b
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/ConvertTableToIcebergStmt.java
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.impala.analysis.QueryStringBuilder.Create;
+import org.apache.impala.analysis.QueryStringBuilder.Drop;
+import org.apache.impala.analysis.QueryStringBuilder.Invalidate;
+import org.apache.impala.analysis.QueryStringBuilder.Refresh;
+import org.apache.impala.analysis.QueryStringBuilder.Rename;
+import org.apache.impala.analysis.QueryStringBuilder.SetTblProps;
+import org.apache.impala.authorization.Privilege;
+import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.IcebergTable;
+import org.apache.impala.catalog.PrunablePartition;
+import org.apache.impala.catalog.Table;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.thrift.THdfsFileFormat;
+import org.apache.impala.thrift.TIcebergCatalog;
+import org.apache.impala.thrift.TConvertTableRequest;
+import org.apache.impala.util.AcidUtils;
+import org.apache.impala.util.IcebergUtil;
+import org.apache.impala.util.MigrateTableUtil;
+
+/**
+ * Represents an "ALTER TABLE ... CONVERT TO" statement for the migration from HDFS table
+ * to an Iceberg table:
+ * ALTER TABLE <table name> CONVERT TO ICEBERG
+ * [TBLPROPERTIES (prop1=val1, prop2=val2 ...)]
+ */
+public class ConvertTableToIcebergStmt extends StatementBase {
+
+  private TableName tableName_;
+  private TableName tmpHdfsTableName_;
+  private final Map<String, String> properties_;
+  private String setHdfsTablePropertiesQuery_;
+  private String renameHdfsTableToTemporaryQuery_;
+  private String refreshTemporaryHdfsTableQuery_;
+  private String resetTableNameQuery_;
+  private String createIcebergTableQuery_;
+  private String invalidateMetadataQuery_;
+  private String postCreateAlterTableQuery_;
+  private String dropTemporaryHdfsTableQuery_;
+
+  public ConvertTableToIcebergStmt(TableName tableName, Map<String, String> properties) {
+    tableName_ = tableName;
+    properties_ = properties;
+  }
+
+  public ConvertTableToIcebergStmt(TableName tableName) {
+    this(tableName, Maps.newHashMap());
+  }
+
+  @Override
+  public void collectTableRefs(List<TableRef> tblRefs) {
+    tblRefs.add(new TableRef(tableName_.toPath(), null));
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    tableName_ = analyzer.getFqTableName(tableName_);
+    // TODO: Until IMPALA-12190 is fixed, user needs ALL privileges on the DB to migrate a
+    // table. Once it's fixed, ALL privileges on the table are enough.
+    analyzer.getDb(tableName_.getDb(), Privilege.ALL);
+    FeTable table = analyzer.getTable(tableName_, Privilege.ALL);
+    if (!(table instanceof FeFsTable)) {
+      throw new AnalysisException("CONVERT TO ICEBERG is not supported for " +
+          table.getClass().getSimpleName());
+    }
+
+    if (table.getMetaStoreTable().getParameters() != null &&
+        AcidUtils.isTransactionalTable(table.getMetaStoreTable().getParameters())) {
+      throw new AnalysisException(
+          "CONVERT TO ICEBERG is not supported for transactional tables");
+    }
+
+    if (!MetaStoreUtils.isExternalTable(table.getMetaStoreTable())) {
+      throw new AnalysisException(
+              "CONVERT TO ICEBERG is not supported for managed tables");
+    }
+
+    StorageDescriptor sd = table.getMetaStoreTable().getSd();
+    if (MigrateTableUtil.getFileFormat(sd) == null) {
+      throw new AnalysisException("CONVERT TO ICEBERG is not supported for " +
+          sd.getInputFormat());
+    }
+
+    if (properties_.size() > 1 ||
+        properties_.keySet().stream().anyMatch(
+            key -> !key.equalsIgnoreCase(IcebergTable.ICEBERG_CATALOG)) ) {
+      throw new AnalysisException(String.format(
+          "CONVERT TO ICEBERG only accepts '%s' as TBLPROPERTY.",
+          IcebergTable.ICEBERG_CATALOG));
+    }
+
+    if (TIcebergCatalog.HADOOP_CATALOG == IcebergUtil.getTIcebergCatalog(properties_)) {
+      throw new AnalysisException("The Hadoop Catalog is not supported because the " +
+          "location may change");
+    }
+
+    // TODO: this is a temporary check until https://github.com/apache/iceberg/issues/7612
+    // is fixed.
+    for (PrunablePartition partition : ((FeFsTable) table).getPartitions()) {
+      for (LiteralExpr partitionExpr : partition.getPartitionValues()) {
+        if (!partitionExpr.getType().isStringType()) continue;
+        String partitionValue = partitionExpr.getStringValue();
+        if (partitionValue == null) continue;
+        if (partitionValue.contains("/")) {
+          throw new AnalysisException ("Can't migrate table with '/' in the partition " +
+              "values until Iceberg #7612 is fixed. '" + partitionValue + "'");
+        }
+      }
+    }
+
+    createSubQueryStrings((FeFsTable) table);
+  }
+
+  private void createSubQueryStrings(FeFsTable table)  {
+    setHdfsTablePropertiesQuery_ = SetTblProps.builder()
+              .table(table.getFullName())
+              .property(Table.TBL_PROP_EXTERNAL_TABLE_PURGE, "false")
+              .property("TRANSLATED_TO_EXTERNAL", "FALSE").build();
+
+    tmpHdfsTableName_ = createTmpTableName();
+    Preconditions.checkState(tmpHdfsTableName_.isFullyQualified());
+
+    renameHdfsTableToTemporaryQuery_ = Rename.builder()
+        .source(table.getFullName())
+        .target(tmpHdfsTableName_.toString()).build();
+
+    refreshTemporaryHdfsTableQuery_ = Refresh.builder()
+        .table(tmpHdfsTableName_.toString())
+        .build();
+
+    resetTableNameQuery_ = Rename.builder()
+            .source(tmpHdfsTableName_.toString())
+            .target(table.getFullName()).build();
+
+    if (!IcebergUtil.isHiveCatalog(properties_)) {
+      Preconditions.checkState(tableName_.isFullyQualified());
+      Create create = Create.builder()
+          .table(tableName_.toString(), true)
+          .storedAs(THdfsFileFormat.ICEBERG.toString())
+          .tableLocation(table.getLocation());
+      for (Map.Entry<String, String> propEntry : properties_.entrySet()) {
+        create.property(propEntry.getKey(), propEntry.getValue());
+      }
+      createIcebergTableQuery_ = create.build();
+
+      postCreateAlterTableQuery_ = SetTblProps.builder()
+          .table(tableName_.toString())
+          .property(Table.TBL_PROP_EXTERNAL_TABLE_PURGE, "true").build();
+    } else {
+      // In HiveCatalog we invoke an IM after creating the table to immediately propagate
+      // the existence of the new Iceberg table and avoid timing issues.
+      invalidateMetadataQuery_ = Invalidate.builder()
+          .table(tableName_.toString())
+          .build();
+    }
+
+    dropTemporaryHdfsTableQuery_ = Drop.builder()
+        .table(tmpHdfsTableName_.toString()).build();
+  }
+
+  private TableName createTmpTableName() {
+    String tmpTableNameStr = QueryStringBuilder.createTmpTableName(
+        tableName_.getDb(), tableName_.getTbl());
+    return TableName.parse(tmpTableNameStr);
+  }
+
+  public TConvertTableRequest toThrift() {
+    Preconditions.checkNotNull(tableName_);
+    Preconditions.checkNotNull(tmpHdfsTableName_);
+    TConvertTableRequest params = new TConvertTableRequest(tableName_.toThrift(),
+        tmpHdfsTableName_.toThrift(), THdfsFileFormat.ICEBERG);
+    params.setProperties(properties_);
+    params.setSet_hdfs_table_properties_query(setHdfsTablePropertiesQuery_);
+    params.setRename_hdfs_table_to_temporary_query(renameHdfsTableToTemporaryQuery_);
+    params.setRefresh_temporary_hdfs_table_query(refreshTemporaryHdfsTableQuery_);
+    params.setReset_table_name_query(resetTableNameQuery_);
+    if (!Strings.isNullOrEmpty(createIcebergTableQuery_)) {
+      params.setCreate_iceberg_table_query(createIcebergTableQuery_);
+    }
+    if (!Strings.isNullOrEmpty(invalidateMetadataQuery_)) {
+      params.setInvalidate_metadata_query(invalidateMetadataQuery_);
+    }
+    if (!Strings.isNullOrEmpty(postCreateAlterTableQuery_)) {
+      params.setPost_create_alter_table_query(postCreateAlterTableQuery_);
+    }
+    params.setDrop_temporary_hdfs_table_query(dropTemporaryHdfsTableQuery_);
+    return params;
+  }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java b/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
index a828d41b8..a6dcc6946 100644
--- a/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
@@ -17,10 +17,11 @@
 
 package org.apache.impala.analysis;
 
+import com.google.common.base.Preconditions;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.List;
-import java.util.UUID;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -39,8 +40,6 @@ import org.apache.impala.util.FsPermissionChecker;
 import org.apache.orc.OrcFile;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 
-import com.google.common.base.Preconditions;
-
 /**
  * Represents a LOAD DATA statement for moving data into an existing table:
  * LOAD DATA INPATH 'filepath' [OVERWRITE] INTO TABLE <table name>
@@ -241,10 +240,10 @@ public class LoadDataStmt extends StatementBase {
    */
   private void analyzeLoadIntoIcebergTable() throws AnalysisException {
     Path sourcePath = sourceDataPath_.getPath();
-    String tmpTableName = dbName_ + "." + tableName_ + "_tmp" +
-        UUID.randomUUID().toString().substring(0, 8);
+    String tmpTableName = QueryStringBuilder.createTmpTableName(dbName_,
+        tableName_.getTbl());
     QueryStringBuilder.Create createTableQueryBuilder =
-        new QueryStringBuilder.Create().table(tmpTableName, true);
+        QueryStringBuilder.Create.builder().table(tmpTableName, true);
     try {
       FileSystem fs = sourcePath.getFileSystem(FileSystemUtil.getConfiguration());
       Path filePathForLike = sourcePath;
@@ -272,20 +271,20 @@ public class LoadDataStmt extends StatementBase {
             + "format, file '%s' has '%s' magic string.", filePathForLike, magicString));
       }
       createTableQueryBuilder.tableLocation("%s");
-      createTableQueryBuilder.addTableProperty("TEMPORARY", "true");
+      createTableQueryBuilder.property("TEMPORARY", "true");
     } catch (IOException e) {
       throw new AnalysisException("Failed to generate CREATE TABLE subquery "
           + "statement. ", e);
     }
     createTmpTblQuery_ = createTableQueryBuilder.build();
-    QueryStringBuilder.Insert insertTblQueryBuilder =
-        new QueryStringBuilder.Insert().overwrite(overwrite_)
+    QueryStringBuilder.Insert insertTblQueryBuilder = QueryStringBuilder.Insert.builder()
+        .overwrite(overwrite_)
         .table(tableName_.toString());
     QueryStringBuilder.Select insertSelectTblQueryBuilder =
-        new QueryStringBuilder.Select().selectList("*").from(tmpTableName);
+        QueryStringBuilder.Select.builder().selectList("*").from(tmpTableName);
     insertTblQueryBuilder.select(insertSelectTblQueryBuilder);
     insertTblQuery_ = insertTblQueryBuilder.build();
-    dropTmpTblQuery_ = new QueryStringBuilder.Drop().table(tmpTableName).build();
+    dropTmpTblQuery_ = QueryStringBuilder.Drop.builder().table(tmpTableName).build();
   }
 
   public TLoadDataReq toThrift() {
diff --git a/fe/src/main/java/org/apache/impala/analysis/QueryStringBuilder.java b/fe/src/main/java/org/apache/impala/analysis/QueryStringBuilder.java
index 6515bf872..20ac1aea4 100644
--- a/fe/src/main/java/org/apache/impala/analysis/QueryStringBuilder.java
+++ b/fe/src/main/java/org/apache/impala/analysis/QueryStringBuilder.java
@@ -17,33 +17,53 @@
 
 package org.apache.impala.analysis;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
 
 /**
- * This class standardizes the query string building process. At this point only used for
- * child query creation for Iceberg LOAD DATA INPATH queries. Each inner class is
- * responsible for a specific query type, while the outer class can be used to instantiate
- * the inner classes. The methods of the inner classes are supposed to be chainable.
+ * This class standardizes the query string building process. Each inner class is
+ * responsible for a specific query type, while the outer class can be used to
+ * instantiate the inner classes. The methods of the inner classes are supposed to be
+ * chainable.
  */
 public class QueryStringBuilder {
 
+  public static String createTmpTableName(String dbName, String tableName) {
+    return dbName + "." + tableName + "_tmp_" + UUID.randomUUID().toString()
+        .substring(0, 8);
+  }
+
+  private static String appendProps(StringBuilder builder, Map<String, String> props_) {
+    builder.append(" TBLPROPERTIES (");
+    for (Entry<String, String> prop : props_.entrySet()) {
+      builder.append("'").append(prop.getKey()).append("'='").append(prop.getValue())
+          .append("',");
+    }
+    builder.deleteCharAt(builder.length() - 1);
+    builder.append(")");
+    return builder.toString();
+  }
+
   public static class Create {
     private String tableName_;
-    private Boolean external_;
-    private Boolean like_;
+    private boolean external_;
+    private boolean like_;
     private String likeFileFormat_;
     private String likeLocation_;
     private String storedAsFileFormat_;
     private String tableLocation_;
-    private List<String> tableProperties_;
+    private final Map<String, String> props_= Maps.newHashMap();
 
-    public Create() {
-      tableProperties_ = new ArrayList<String>();
+    public Create() {}
+
+    public static Create builder() {
+      return new Create();
     }
 
-    public Create table(String tableName, Boolean external) {
+    public Create table(String tableName, boolean external) {
       tableName_ = tableName;
       external_ = external;
       return this;
@@ -66,8 +86,8 @@ public class QueryStringBuilder {
       return this;
     }
 
-    public Create addTableProperty(String key, String value) {
-      tableProperties_.add("'" + key + "'='" + value + "'");
+    public Create property(String k, String v) {
+      props_.put(k, v);
       return this;
     }
 
@@ -84,32 +104,28 @@ public class QueryStringBuilder {
       }
       builder.append("STORED AS " + storedAsFileFormat_ + " ");
       builder.append("LOCATION '" + tableLocation_ + "'");
-      if (!tableProperties_.isEmpty()) {
-        builder.append(" TBLPROPERTIES (");
-        Iterator<String> it = tableProperties_.iterator();
-        while(it.hasNext()) {
-          builder.append(it.next());
-          if(it.hasNext()) builder.append(", ");
-        }
-        builder.append(")");
+      if (props_.isEmpty()) {
+        return builder.toString();
       }
-      return builder.toString();
+      return appendProps(builder, props_);
     }
   }
 
   public static class Insert {
     private String tableName_;
-    private Boolean overwrite_;
+    private boolean overwrite_;
     private Select select_;
 
-    public Insert() {}
+    public static Insert builder() {
+      return new Insert();
+    }
 
     public Insert table(String tableName) {
       tableName_ = tableName + " ";
       return this;
     }
 
-    public Insert overwrite(Boolean overwrite) {
+    public Insert overwrite(boolean overwrite) {
       overwrite_ = overwrite;
       return this;
     }
@@ -136,7 +152,9 @@ public class QueryStringBuilder {
     private String selectList_;
     private String tableName_;
 
-    public Select() {}
+    public static Select builder() {
+      return new Select();
+    }
 
     public Select selectList(String selectList) {
       selectList_ = selectList;
@@ -159,7 +177,9 @@ public class QueryStringBuilder {
   public static class Drop {
     private String tableName_;
 
-    public Drop() {}
+    public static Drop builder() {
+      return new Drop();
+    }
 
     public Drop table(String tableName) {
       tableName_ = tableName;
@@ -173,4 +193,93 @@ public class QueryStringBuilder {
       return builder.toString();
     }
   }
+
+  public static class SetTblProps {
+    private String tableName_;
+    private final Map<String, String> props_ = Maps.newHashMap();;
+
+    public static SetTblProps builder() {
+      return new SetTblProps();
+    }
+
+    public SetTblProps table(String tableName) {
+      tableName_ = tableName;
+      return this;
+    }
+
+    public SetTblProps property(String k, String v) {
+      props_.put(k, v);
+      return this;
+    }
+
+    public String build() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("ALTER TABLE ");
+      builder.append(tableName_);
+      builder.append(" SET");
+      Preconditions.checkState(props_.size() >= 1);
+      return appendProps(builder, props_);
+    }
+  }
+
+  public static class Rename {
+
+    private String sourceTableName_;
+    private String targetTableName_;
+
+    public static Rename builder() {
+      return new Rename();
+    }
+
+    public Rename source(String tableName) {
+      sourceTableName_ = tableName;
+      return this;
+    }
+
+    public Rename target(String tableName) {
+      targetTableName_ = tableName;
+      return this;
+    }
+
+    public String build() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("ALTER TABLE ");
+      builder.append(sourceTableName_);
+      builder.append(" RENAME TO ");
+      builder.append(targetTableName_);
+      return builder.toString();
+    }
+  }
+
+  public static class Refresh {
+    private String tableName_;
+
+    public static Refresh builder() {
+      return new Refresh();
+    }
+
+    public Refresh table(String tableName) {
+      tableName_ = tableName;
+      return this;
+    }
+
+    public String build() {
+      return "REFRESH " + tableName_;
+    }
+  }
+
+  public static class Invalidate {
+    private String tableName_;
+
+    public static Invalidate builder() { return new Invalidate(); }
+
+    public Invalidate table(String tableName) {
+      tableName_ = tableName;
+      return this;
+    }
+
+    public String build() {
+      return "INVALIDATE METADATA " + tableName_;
+    }
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index 368f67de4..91eefeee9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -134,6 +134,9 @@ public class IcebergTable extends Table implements FeIcebergTable {
   public static final int V2_FILE_PATH_FIELD_ID = 2147483546;
   public static final int V2_POS_FIELD_ID = 2147483545;
 
+  // The name of the folder where Iceberg metadata lives.
+  public static final String METADATA_FOLDER_NAME = "metadata";
+
   // Iceberg catalog type dependent on table properties
   private TIcebergCatalog icebergCatalog_;
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 11aa1e407..e26acaff4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -2051,7 +2051,7 @@ public class MetastoreEvents {
         }
       } catch (CatalogException e) {
         throw new MetastoreNotificationNeedsInvalidateException(debugString("Failed to "
-                + "refresh newly added partitions of table {}. Event processing cannot "
+                + "refresh newly added partitions of table '%s'. Event processing cannot "
                 + "continue. Issue an invalidate metadata command to reset event "
                 + "processor.", getFullyQualifiedTblName()), e);
       }
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java
index 18bc7c981..d75e7359c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalog.java
@@ -67,6 +67,15 @@ public interface IcebergCatalog {
    */
   boolean dropTable(FeIcebergTable feTable, boolean purge);
 
+  /**
+   * Drops the table from this catalog using database name and table name.
+   * @param dbName the database name
+   * @param tblName the table name
+   * @param purge whether to drop data/metadata files or not
+   * @return true if table was dropped, false if the table did not exist
+   */
+  boolean dropTable(String dbName, String tblName, boolean purge);
+
   /**
    * Renames Iceberg table.
    * For HadoopTables, Iceberg does not supported 'renameTable' method
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java
index 99beb6794..3c87dcf82 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCatalogs.java
@@ -17,12 +17,13 @@
 
 package org.apache.impala.catalog.iceberg;
 
+
 import static org.apache.impala.catalog.Table.TBL_PROP_EXTERNAL_TABLE_PURGE;
 import static org.apache.impala.catalog.Table.TBL_PROP_EXTERNAL_TABLE_PURGE_DEFAULT;
 
+import com.google.common.base.Preconditions;
 import java.util.Map;
 import java.util.Properties;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.iceberg.CatalogUtil;
@@ -43,8 +44,6 @@ import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.util.IcebergUtil;
 
-import com.google.common.base.Preconditions;
-
 /**
  * Implementation of IcebergCatalog for tables handled by Iceberg's Catalogs API.
  */
@@ -134,6 +133,12 @@ public class IcebergCatalogs implements IcebergCatalog {
     return Catalogs.dropTable(configuration_, properties);
   }
 
+  @Override
+  public boolean dropTable(String dbName, String tblName, boolean purge) {
+    throw new UnsupportedOperationException(
+        "'Catalogs' doesn't support dropping table by name");
+  }
+
   @Override
   public void renameTable(FeIcebergTable feTable, TableIdentifier newTableId) {
     // Iceberg's Catalogs class has no renameTable() method
@@ -150,8 +155,8 @@ public class IcebergCatalogs implements IcebergCatalog {
     return configuration_.get(propKey);
   }
 
-  private Properties createPropsForCatalogs(TableIdentifier tableId, String location,
-      Map<String, String> tableProps) {
+  public static Properties createPropsForCatalogs(TableIdentifier tableId,
+      String location, Map<String, String> tableProps) {
     Properties properties = new Properties();
     properties.putAll(tableProps);
     if (tableId != null) {
@@ -160,7 +165,7 @@ public class IcebergCatalogs implements IcebergCatalog {
       properties.setProperty(Catalogs.LOCATION, location);
     }
     properties.setProperty(IcebergTable.ICEBERG_CATALOG,
-                           tableProps.get(IcebergTable.ICEBERG_CATALOG));
+        tableProps.get(IcebergTable.ICEBERG_CATALOG));
     return properties;
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java
index e7bf4dd63..409aa78f1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopCatalog.java
@@ -116,6 +116,11 @@ public class IcebergHadoopCatalog implements IcebergCatalog {
     return hadoopCatalog.dropTable(tableId, purge);
   }
 
+  @Override
+  public boolean dropTable(String dbName, String tblName, boolean purge) {
+    return hadoopCatalog.dropTable(TableIdentifier.of(dbName, tblName), purge);
+  }
+
   @Override
   public void renameTable(FeIcebergTable feTable, TableIdentifier newTableId) {
     TableIdentifier oldTableId = IcebergUtil.getIcebergTableIdentifier(feTable);
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java
index a0c6d2df0..09934c0e6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHadoopTables.java
@@ -124,6 +124,12 @@ public class IcebergHadoopTables implements IcebergCatalog {
     return true;
   }
 
+  @Override
+  public boolean dropTable(String dbName, String tblName, boolean purge) {
+    throw new UnsupportedOperationException(
+        "Hadoop Tables doesn't support dropping table by name");
+  }
+
   @Override
   public void renameTable(FeIcebergTable feTable, TableIdentifier newTableId) {
     // HadoopTables no renameTable method in Iceberg
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java
index ed1757b4a..40e01b028 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergHiveCatalog.java
@@ -102,6 +102,11 @@ public class IcebergHiveCatalog implements IcebergCatalog {
     return hiveCatalog_.dropTable(tableId, purge);
   }
 
+  @Override
+  public boolean dropTable(String dbName, String tblName, boolean purge) {
+    return hiveCatalog_.dropTable(TableIdentifier.of(dbName, tblName), purge);
+  }
+
   @Override
   public void renameTable(FeIcebergTable feTable, TableIdentifier newTableId) {
     TableIdentifier oldTableId = IcebergUtil.getIcebergTableIdentifier(feTable);
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 35057b9f9..b4efe74ee 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -4304,6 +4304,11 @@ public class CatalogOpExecutor {
           table.getFullName());
       return 0;
     }
+    if (table instanceof IcebergTable) {
+      LOG.info("EventId: {} Table {} is an Iceberg table. Partitioning is handled by " +
+          "Iceberg. Skipping add partitions", eventId, table.getFullName());
+      return 0;
+    }
     if(!(table instanceof HdfsTable)) {
       throw new CatalogException(
           "Partition event " + eventId + " received on a non-hdfs table");
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 9786f1856..18365487d 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -48,7 +48,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -133,6 +132,7 @@ import org.apache.impala.catalog.local.InconsistentMetadataFetchException;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.KuduTransactionManager;
 import org.apache.impala.common.NotImplementedException;
@@ -183,6 +183,7 @@ import org.apache.impala.thrift.TLineageGraph;
 import org.apache.impala.thrift.TLoadDataReq;
 import org.apache.impala.thrift.TLoadDataResp;
 import org.apache.impala.thrift.TMetadataOpRequest;
+import org.apache.impala.thrift.TConvertTableRequest;
 import org.apache.impala.thrift.TPlanExecInfo;
 import org.apache.impala.thrift.TPlanFragment;
 import org.apache.impala.thrift.TPoolConfig;
@@ -208,6 +209,7 @@ import org.apache.impala.util.EventSequence;
 import org.apache.impala.util.ExecutorMembershipSnapshot;
 import org.apache.impala.util.IcebergUtil;
 import org.apache.impala.util.KuduUtil;
+import org.apache.impala.util.MigrateTableUtil;
 import org.apache.impala.util.PatternMatcher;
 import org.apache.impala.util.RequestPoolService;
 import org.apache.impala.util.TResultRowBuilder;
@@ -913,6 +915,25 @@ public class Frontend {
     }
   }
 
+  /**
+   * Migrate external Hdfs tables to Iceberg tables.
+   */
+  public void convertTable(TExecRequest execRequest)
+      throws DatabaseNotFoundException, ImpalaRuntimeException, InternalException {
+    Preconditions.checkState(execRequest.isSetConvert_table_request());
+    TQueryOptions queryOptions = execRequest.query_options;
+    TConvertTableRequest convertTableRequest = execRequest.convert_table_request;
+    TTableName tableName = convertTableRequest.getHdfs_table_name();
+    FeTable table = getCatalog().getTable(tableName.getDb_name(),
+        tableName.getTable_name());
+    Preconditions.checkNotNull(table);
+    Preconditions.checkState(table instanceof FeFsTable);
+    try (MetaStoreClient client = metaStoreClientPool_.getClient()) {
+      MigrateTableUtil.migrateToIcebergTable(client.getHiveClient(), convertTableRequest,
+          (FeFsTable) table, queryOptions);
+    }
+  }
+
   private TLoadDataResp doLoadTableData(TLoadDataReq request) throws ImpalaException,
       IOException {
     TableName tableName = TableName.fromThrift(request.getTable_name());
@@ -2394,6 +2415,13 @@ public class Frontend {
             new TColumn("summary", Type.STRING.toThrift()))));
         result.setAdmin_request(analysisResult.getAdminFnStmt().toThrift());
         return result;
+      } else if (analysisResult.isConvertTableToIcebergStmt()) {
+        result.stmt_type = TStmtType.CONVERT;
+        result.setResult_set_metadata(new TResultSetMetadata(
+            Collections.singletonList(new TColumn("summary", Type.STRING.toThrift()))));
+        result.setConvert_table_request(
+            analysisResult.getConvertTableToIcebergStmt().toThrift());
+        return result;
       } else if (analysisResult.isTestCaseStmt()) {
         CopyTestCaseStmt testCaseStmt = ((CopyTestCaseStmt) stmt);
         if (testCaseStmt.isTestCaseExport()) {
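
For orientation, the sketch below shows roughly how the TConvertTableRequest attached
above by ConvertTableToIcebergStmt.toThrift() could be populated for an
ALTER TABLE ... CONVERT TO ICEBERG statement. Illustration only, not part of the patch:
the table names, the temporary-name suffix and the catalog value are invented, and only
the getters used in Frontend#convertTable are relied on.

    import java.util.Collections;

    import org.apache.impala.thrift.TConvertTableRequest;
    import org.apache.impala.thrift.TTableName;

    final class ConvertRequestSketch {
      static TConvertTableRequest example() {
        TConvertTableRequest request = new TConvertTableRequest();
        // HDFS table whose schema and data files are read during conversion; per the
        // migration steps it may already carry the temporary "<name>_tmp_<ID>" name.
        request.setHdfs_table_name(new TTableName("db1", "hive_tbl_tmp_a1b2c3"));
        // Name under which the Iceberg table is created.
        request.setTable_name(new TTableName("db1", "hive_tbl"));
        // Only 'iceberg.catalog' is accepted as a table property.
        request.setProperties(Collections.singletonMap("iceberg.catalog", "hadoop.tables"));
        return request;
      }
    }
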
diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
index 8d7860849..b672cb6a8 100644
--- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
+++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
@@ -80,6 +80,7 @@ import org.apache.impala.thrift.TLoadDataReq;
 import org.apache.impala.thrift.TLoadDataResp;
 import org.apache.impala.thrift.TLogLevel;
 import org.apache.impala.thrift.TMetadataOpRequest;
+import org.apache.impala.thrift.TConvertTableRequest;
 import org.apache.impala.thrift.TQueryCompleteContext;
 import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TResultSet;
@@ -216,6 +217,15 @@ public class JniFrontend {
     }
   }
 
+  /**
+   * Jni wrapper for Frontend#convertTable(TExecRequest).
+   */
+  public void convertTable(byte[] params) throws ImpalaException {
+    TExecRequest execRequest = new TExecRequest();
+    JniUtil.deserializeThrift(protocolFactory_, execRequest, params);
+    frontend_.convertTable(execRequest);
+  }
+
   /**
    * Return an explain plan based on thriftQueryContext, a serialized TQueryContext.
    * This call is thread-safe.
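
Like the other byte[]-based entry points in JniFrontend, convertTable() above expects a
Thrift-serialized TExecRequest that already carries convert_table_request. A minimal
sketch of producing such a payload, assuming the frontend's usual binary Thrift protocol
(illustration only, not part of the patch):

    import org.apache.impala.thrift.TExecRequest;
    import org.apache.thrift.TSerializer;
    import org.apache.thrift.protocol.TBinaryProtocol;

    final class ConvertTableJniCallSketch {
      // Serializes an exec request into the byte[] form that
      // JniFrontend#convertTable(byte[]) deserializes via JniUtil.deserializeThrift().
      static byte[] serializeForJni(TExecRequest execRequest) throws Exception {
        TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
        return serializer.serialize(execRequest);
      }
    }
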
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergSchemaConverter.java b/fe/src/main/java/org/apache/impala/util/IcebergSchemaConverter.java
index ddb18f00f..78d8b4458 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergSchemaConverter.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergSchemaConverter.java
@@ -20,9 +20,17 @@ package org.apache.impala.util;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
+import org.apache.impala.analysis.IcebergPartitionField;
+import org.apache.impala.analysis.IcebergPartitionSpec;
+import org.apache.impala.analysis.IcebergPartitionTransform;
 import org.apache.impala.catalog.ArrayType;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.IcebergColumn;
@@ -35,6 +43,7 @@ import org.apache.impala.catalog.Type;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TColumnType;
+import org.apache.impala.thrift.TIcebergPartitionTransformType;
 
 /**
  * Utility class for converting between Iceberg and Impala schemas and types.
@@ -142,6 +151,21 @@ public class IcebergSchemaConverter {
     return ret;
   }
 
+  public static Schema convertToIcebergSchema(Table table) {
+    List<FieldSchema> columns = Lists.newArrayList(table.getSd().getCols());
+    columns.addAll(table.getPartitionKeys());
+    return HiveSchemaUtil.convert(columns, false);
+  }
+
+  public static PartitionSpec createIcebergPartitionSpec(Table table,
+      Schema schema) {
+    PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema);
+    for (FieldSchema partitionKey : table.getPartitionKeys()) {
+      specBuilder.identity(partitionKey.getName());
+    }
+    return specBuilder.build();
+  }
+
   /**
    * Generates Iceberg schema from given columns. It also assigns a unique 'field id' for
    * each schema element, although Iceberg will reassign the ids.
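
The two helpers added above handle the schema side of the migration:
convertToIcebergSchema() merges the data columns and partition keys into a single
Iceberg schema, and createIcebergPartitionSpec() turns every Hive partition key into an
identity partition field. A minimal sketch of that mapping on a made-up two-column table
(illustration only, not part of the patch):

    import java.util.Arrays;

    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
    import org.apache.hadoop.hive.metastore.api.Table;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.impala.util.IcebergSchemaConverter;

    final class SchemaConversionSketch {
      public static void main(String[] args) {
        Table hmsTable = new Table();
        StorageDescriptor sd = new StorageDescriptor();
        sd.setCols(Arrays.asList(new FieldSchema("id", "int", "")));
        hmsTable.setSd(sd);
        hmsTable.setPartitionKeys(Arrays.asList(new FieldSchema("year", "int", "")));

        // Data column 'id' and partition key 'year' end up in one Iceberg schema.
        Schema schema = IcebergSchemaConverter.convertToIcebergSchema(hmsTable);
        // The partition key becomes identity(year) in the resulting partition spec.
        PartitionSpec spec =
            IcebergSchemaConverter.createIcebergPartitionSpec(hmsTable, schema);
        System.out.println(schema + "\n" + spec);
      }
    }
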
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index 3378d635a..a1e4729d7 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -101,12 +101,14 @@ import org.apache.impala.thrift.TIcebergPartitionTransformType;
 
 @SuppressWarnings("UnstableApiUsage")
 public class IcebergUtil {
+
   private static final int ICEBERG_EPOCH_YEAR = 1970;
   private static final int ICEBERG_EPOCH_MONTH = 1;
   @SuppressWarnings("unused")
   private static final int ICEBERG_EPOCH_DAY = 1;
   @SuppressWarnings("unused")
   private static final int ICEBERG_EPOCH_HOUR = 0;
+  public static final String HIVE_CATALOG = "hive.catalog";
 
   /**
    * Returns the corresponding catalog implementation for 'feTable'.
@@ -277,7 +279,7 @@ public class IcebergUtil {
       return TIcebergCatalog.HADOOP_TABLES;
     } else if ("hadoop.catalog".equalsIgnoreCase(catalog)) {
       return TIcebergCatalog.HADOOP_CATALOG;
-    } else if ("hive.catalog".equalsIgnoreCase(catalog) ||
+    } else if (HIVE_CATALOG.equalsIgnoreCase(catalog) ||
                catalog == null) {
       return TIcebergCatalog.HIVE_CATALOG;
     }
diff --git a/fe/src/main/java/org/apache/impala/util/MigrateTableUtil.java b/fe/src/main/java/org/apache/impala/util/MigrateTableUtil.java
new file mode 100644
index 000000000..443233d18
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/util/MigrateTableUtil.java
@@ -0,0 +1,293 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import static org.apache.impala.catalog.Table.TBL_PROP_EXTERNAL_TABLE_PURGE;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.TableMigrationUtil;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.impala.analysis.TableName;
+import org.apache.impala.catalog.FeCatalog;
+import org.apache.impala.catalog.FeFsPartition;
+import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.IcebergTable;
+import org.apache.impala.catalog.iceberg.IcebergCatalog;
+import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.thrift.TConvertTableRequest;
+import org.apache.impala.thrift.TIcebergCatalog;
+import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.thrift.TTableName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Executes the migration of a legacy Hive table to an Iceberg table.
+ *
+ * This is an in-place migration where an Iceberg table is created at the same location
+ * as the Hive table and the existing files are appended to the Iceberg table. The file
+ * metadata creation is done using TableMigrationUtil from Iceberg.
+ */
+public class MigrateTableUtil {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MigrateTableUtil.class);
+  private static final long RETRY_TIMEOUT_MS = 3600000; // 1 hour
+  private static final int RETRY_DELAY_MS = 300;
+
+  private MigrateTableUtil() { }
+
+  /**
+   * Create an external Iceberg table using the data of the HDFS table.
+   */
+  public static void migrateToIcebergTable(
+      IMetaStoreClient hmsClient,
+      TConvertTableRequest request,
+      FeFsTable table,
+      TQueryOptions queryOptions) throws ImpalaRuntimeException {
+    LOG.info("Migrating table to Iceberg: " + table.getFullName());
+    Schema schema =
+        IcebergSchemaConverter.convertToIcebergSchema(table.getMetaStoreTable());
+    PartitionSpec spec = IcebergSchemaConverter.createIcebergPartitionSpec(
+        table.getMetaStoreTable(), schema);
+    String fileFormat = getFileFormat(table.getMetaStoreTable().getSd());
+    Preconditions.checkNotNull(fileFormat);
+    Map<String, String> props = Maps.newHashMap(request.getProperties());
+    props.put(IcebergTable.ICEBERG_FILE_FORMAT, fileFormat);
+    if (IcebergUtil.isHiveCatalog(props)) {
+      props.put(TBL_PROP_EXTERNAL_TABLE_PURGE, "true");
+    }
+    String location = table.getLocation();
+    TIcebergCatalog tCatalog = IcebergUtil.getTIcebergCatalog(props);
+    IcebergCatalog catalog = IcebergUtil.getIcebergCatalog(tCatalog, location);
+    TTableName name = request.getTable_name();
+    TableIdentifier id = TableIdentifier.of(name.getDb_name(), name.getTable_name());
+
+    Table icebergTable = catalog.createTable(id, schema, spec, location, props);
+    Preconditions.checkNotNull(icebergTable);
+
+    TableName tableName = TableName.fromThrift(name);
+    try {
+      if (IcebergUtil.isHiveCatalog(props)) {
+        waitForTableToBeCreated(hmsClient, tableName);
+      }
+
+      importDataFilesInHdfsTable(table, icebergTable, queryOptions);
+    } catch (Exception e) {
+      // If the migration failed for some reason, we clean up the Iceberg table.
+      if (IcebergUtil.isHiveCatalog(props)) {
+        // For tables in Hive Catalog we have to use the Iceberg API to make sure it's
+        // removed from HMS. Using purge='false' to guarantee that the files remain on
+        // disk.
+        catalog.dropTable(tableName.getDb(), tableName.getTbl(), false);
+      }
+      // We drop the metadata folder, otherwise running MIGRATE TABLE the next time could
+      // fail because Iceberg might think that there is already an existing Iceberg table.
+      Path metadataPath = new Path(location, IcebergTable.METADATA_FOLDER_NAME);
+      FileSystemUtil.deleteIfExists(metadataPath);
+
+      throw new ImpalaRuntimeException("Failed to import data into Iceberg table\n", e);
+    }
+  }
+
+  private static void waitForTableToBeCreated(IMetaStoreClient hmsClient,
+      TableName tableName) throws ImpalaRuntimeException {
+    if (getHmsTableNoThrow(hmsClient, tableName.getDb(), tableName.getTbl()) != null) {
+      return;
+    }
+    long att = 0;
+    try (ThreadNameAnnotator nameAnnotator = new ThreadNameAnnotator(
+        "waiting for " + tableName + " to be created")) {
+      long begin = System.currentTimeMillis();
+      long end;
+      do {
+        try {
+          Thread.sleep(RETRY_DELAY_MS);
+          LOG.info("Waiting for " + tableName + " to be created, attempt: " + ++att);
+        } catch (InterruptedException e) {
+          // Ignore
+        }
+        end = System.currentTimeMillis();
+      } while (getHmsTableNoThrow(hmsClient, tableName.getDb(),
+          tableName.getTbl()) == null && (end - begin < RETRY_TIMEOUT_MS));
+    }
+
+    if (getHmsTableNoThrow(hmsClient, tableName.getDb(), tableName.getTbl()) == null) {
+      throw new ImpalaRuntimeException("Timed out waiting for " + tableName +
+          " to be created");
+    }
+  }
+
+  private static org.apache.hadoop.hive.metastore.api.Table getHmsTableNoThrow(
+      IMetaStoreClient hiveClient, String dbName, String tblName) {
+    try {
+      return hiveClient.getTable(dbName, tblName);
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  public static String getFileFormat(StorageDescriptor sd) {
+    for (String fileFormat : ImmutableList.of(FileFormat.PARQUET.name().toLowerCase(),
+        FileFormat.ORC.name().toLowerCase(),
+        FileFormat.AVRO.name().toLowerCase())) {
+      if (sd.getInputFormat().toLowerCase().contains(fileFormat)) {
+        return fileFormat;
+      }
+    }
+    return null;
+  }
+
+  private static void importDataFilesInHdfsTable(FeFsTable hdfsTable, Table icebergTable,
+      TQueryOptions queryOptions) throws ImpalaRuntimeException {
+    Params params = Params.of(
+        hdfsTable.getMetaStoreTable().getSd().getInputFormat(),
+        icebergTable.spec(),
+        icebergTable.schema(),
+        MetricsConfig.forTable(icebergTable),
+        getDegreeOfParallelism(queryOptions),
+        icebergTable.newAppend(),
+        getDebugAction(queryOptions));
+
+    if (hdfsTable.isPartitioned()) {
+      importDataFiles(hdfsTable, params);
+    } else {
+      importDataFiles(hdfsTable.getMetaStoreTable().getSd().getLocation(), params);
+    }
+    params.append_.commit();
+  }
+
+  private static void importDataFiles(FeFsTable hdfsTable, Params params)
+      throws ImpalaRuntimeException {
+    List<? extends FeFsPartition> partitions =
+        hdfsTable.loadPartitions(hdfsTable.getPartitionIds());
+    for (FeFsPartition part : partitions) {
+      String partitionName = part.getPartitionName();
+      Map<String, String> partitionKeys = Collections.emptyMap();
+      try {
+        partitionKeys = Warehouse.makeSpecFromName(partitionName);
+      } catch (MetaException e) {
+        throw new ImpalaRuntimeException(
+            "Unable to create partition keys for " + partitionName, e);
+      }
+
+      importDataFilesImpl(partitionKeys, part.getLocationPath(), params);
+    }
+  }
+
+  private static void importDataFiles(String location, Params params)
+          throws ImpalaRuntimeException {
+    importDataFilesImpl(Collections.emptyMap(), new Path(location), params);
+  }
+
+  private static void importDataFilesImpl(Map<String, String> partitionKeys,
+      Path location, Params params) throws ImpalaRuntimeException {
+    try {
+      LOG.info("Creating Iceberg metadata for folder: " + location.toString() + " using "
+          + params.threadNum_ + " thread(s).");
+
+      if (params.debugAction_.equalsIgnoreCase("CONVERT_TABLE_FAIL_ICEBERG_CALL")) {
+        throw new IllegalArgumentException("Exception thrown by debug action.");
+      }
+
+      List<DataFile> dataFiles = TableMigrationUtil.listPartition(
+              partitionKeys,
+              location.toString(),
+              params.format_,
+              params.spec_,
+              FileSystemUtil.getConfiguration(),
+              params.metricsConfig_,
+              null, // NameMapping mapping
+              params.threadNum_);
+
+      dataFiles.forEach(params.append_::appendFile);
+    } catch (Exception e) {
+      throw new ImpalaRuntimeException(
+          "Unable load data files for location: " + location.toString(), e);
+    }
+  }
+
+  private static int getDegreeOfParallelism(TQueryOptions queryOptions) {
+    int threadNum = Runtime.getRuntime().availableProcessors();
+    if (queryOptions.isSetNum_threads_for_table_migration() &&
+        queryOptions.num_threads_for_table_migration > 0) {
+      threadNum = Math.min(queryOptions.num_threads_for_table_migration,
+          Runtime.getRuntime().availableProcessors());
+    }
+    return threadNum;
+  }
+
+  private static String getDebugAction(TQueryOptions queryOptions) {
+    if (!queryOptions.isSetDebug_action()) return "";
+    return queryOptions.getDebug_action();
+  }
+
+  private static class Params {
+    final String format_;
+    final PartitionSpec spec_;
+    final Schema schema_;
+    final MetricsConfig metricsConfig_;
+    final int threadNum_;
+    final AppendFiles append_;
+    final String debugAction_;
+
+    private Params(String format, PartitionSpec spec, Schema schema,
+        MetricsConfig metricsConfig, int threadNum, AppendFiles append,
+        String debugAction) {
+      format_ = format;
+      spec_ = spec;
+      schema_ = schema;
+      metricsConfig_ = metricsConfig;
+      threadNum_ = threadNum;
+      append_ = append;
+      debugAction_ = debugAction;
+    }
+
+    static Params of(String format, PartitionSpec spec, Schema schema,
+        MetricsConfig metricsConfig, int threadNum, AppendFiles append,
+        String debugAction) {
+      return new Params(
+          format,
+          spec,
+          schema,
+          metricsConfig,
+          threadNum,
+          append,
+          debugAction);
+    }
+  }
+}
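
One behavior of MigrateTableUtil worth restating: getDegreeOfParallelism() honors the
NUM_THREADS_FOR_TABLE_MIGRATION query option only up to the machine's core count, and
zero or an unset option falls back to all cores. A standalone restatement of that
capping rule (illustration only, not part of the patch):

    final class MigrationParallelismSketch {
      // Mirrors getDegreeOfParallelism() above: positive values are capped at the
      // number of available cores; zero or unset means "use all cores".
      static int degreeOfParallelism(int requestedThreads) {
        int cores = Runtime.getRuntime().availableProcessors();
        return requestedThreads > 0 ? Math.min(requestedThreads, cores) : cores;
      }
    }
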
diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex
index ef01747c0..45cf79da9 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -99,6 +99,7 @@ import org.apache.impala.thrift.TReservedWordsVersion;
     keywordMap.put("compression", SqlParserSymbols.KW_COMPRESSION);
     keywordMap.put("compute", SqlParserSymbols.KW_COMPUTE);
     keywordMap.put("constraint", SqlParserSymbols.KW_CONSTRAINT);
+    keywordMap.put("convert", SqlParserSymbols.KW_CONVERT);
     keywordMap.put("copy", SqlParserSymbols.KW_COPY);
     keywordMap.put("create", SqlParserSymbols.KW_CREATE);
     keywordMap.put("cross", SqlParserSymbols.KW_CROSS);
@@ -592,4 +593,4 @@ EndOfLineComment = "--" !({HintContent}|{ContainsLineTerminator}) {LineTerminato
 
 // Provide a default error token when nothing matches, otherwise the user sees
 // "Error: could not match input" which is confusing.
-[^] { return newToken(SqlParserSymbols.UNEXPECTED_CHAR, yytext()); }
+[^] { return newToken(SqlParserSymbols.UNEXPECTED_CHAR, yytext()); }
\ No newline at end of file
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index c453c0aef..8307dc6b0 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -5184,4 +5184,24 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
         "Selectivity hints are ignored for 'AND' compound predicates, either in the SQL "
             + "query or internally generated.");
   }
+
+  @Test
+  public void TestConvertTable() {
+    AnalyzesOk("alter table functional_parquet.alltypes convert to iceberg");
+    AnalyzesOk("alter table functional_parquet.alltypes convert to iceberg"
+            + " tblproperties('iceberg.catalog'='hadoop.tables')");
+    AnalyzesOk("alter table functional_parquet.alltypes convert to iceberg"
+            + " tblproperties('iceberg.catalog'='hive.catalog')");
+    AnalysisError("alter table functional_parquet.alltypes convert to iceberg"
+            + " tblproperties('iceberg.catalog'='hadoop.catalog')",
+        "The Hadoop Catalog is not supported because the location may change");
+    AnalysisError("alter table functional_kudu.alltypes convert to iceberg",
+        "CONVERT TO ICEBERG is not supported for KuduTable");
+    AnalysisError("alter table functional.alltypes convert to iceberg",
+        "CONVERT TO ICEBERG is not supported for " +
+        "org.apache.hadoop.mapred.TextInputFormat");
+    AnalysisError("alter table functional_parquet.alltypes convert to iceberg"
+            + " tblproperties('metadata.generator.threads'='a1')",
+        "CONVERT TO ICEBERG only accepts 'iceberg.catalog' as TBLPROPERTY.");
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index a38405e02..e7c5214ff 100755
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -4420,4 +4420,4 @@ public class ParserTest extends FrontendTestBase {
     ParsesOk("--test\nSELECT 1\n");
     ParsesOk("--test\nSELECT 1\n  ");
   }
-}
+}
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-migrate-from-external-hdfs-tables.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-migrate-from-external-hdfs-tables.test
new file mode 100644
index 000000000..a2918744f
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-migrate-from-external-hdfs-tables.test
@@ -0,0 +1,355 @@
+====
+---- QUERY
+create table alltypes (
+    id int,
+    bool_col boolean,
+    tinyint_col int,
+    smallint_col int,
+    int_col int,
+    bigint_col bigint,
+    float_col float,
+    double_col double,
+    string_col string,
+    timestamp_col timestamp)
+partitioned by (year int, month int, date_col date, date_string_col string)
+stored as parquet;
+insert into alltypes partition (year, month, date_col, date_string_col)
+    select
+        id,
+        bool_col,
+        cast(tinyint_col as int) as tinyint_col,
+        cast(smallint_col as int) as smallint_col,
+        int_col,
+        bigint_col,
+        float_col,
+        double_col,
+        string_col,
+        timestamp_col,
+        year,
+        month,
+        cast(date_string_col as date format 'MM/DD/YY') as date_col,
+        # removing '/' until Iceberg issue #7612 is fixed
+        replace(date_string_col, "/", "")
+    from functional.alltypes t;
+insert into alltypes partition (year, month, date_col, date_string_col)
+values (10000, true, 1, 2, 3, 4, 5.1, 6.2, "str", "2023-05-01 01:02:03", 2023, 5,
+    cast("2023-05-02" as date) as date_col, null as date_string_col);
+describe alltypes;
+---- RESULTS
+'id','int',regex:'.*'
+'bool_col','boolean',regex:'.*'
+'tinyint_col','int',regex:'.*'
+'smallint_col','int',regex:'.*'
+'int_col','int',regex:'.*'
+'bigint_col','bigint',regex:'.*'
+'float_col','float',regex:'.*'
+'double_col','double',regex:'.*'
+'string_col','string',regex:'.*'
+'timestamp_col','timestamp',regex:'.*'
+'year','int',regex:'.*'
+'month','int',regex:'.*'
+'date_col','date',regex:'.*'
+'date_string_col','string',regex:'.*'
+---- TYPES
+string,string,string
+====
+---- QUERY
+create table parquet_partitioned like alltypes stored as parquet;
+insert into parquet_partitioned partition(year, month, date_col, date_string_col)
+    select * from alltypes;
+select count(*) from parquet_partitioned;
+---- RESULTS
+7301
+---- TYPES
+bigint
+====
+---- QUERY
+describe formatted parquet_partitioned;
+---- RESULTS: VERIFY_IS_SUBSET
+'Location:           ','$NAMENODE/test-warehouse/$DATABASE.db/parquet_partitioned','NULL'
+'SerDe Library:      ','org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe','NULL'
+'InputFormat:        ','org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat','NULL'
+'OutputFormat:       ','org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat','NULL'
+---- TYPES
+string, string, string
+====
+---- QUERY
+alter table parquet_partitioned convert to iceberg tblproperties('iceberg.catalog' = 'hadoop.tables');
+---- RESULTS
+'Table has been migrated.'
+====
+---- QUERY
+select count(*) from parquet_partitioned;
+---- RESULTS
+7301
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+====
+---- QUERY
+describe formatted parquet_partitioned;
+---- RESULTS: VERIFY_IS_SUBSET
+'Location:           ','$NAMENODE/test-warehouse/$DATABASE.db/parquet_partitioned','NULL'
+'','iceberg.catalog     ','hadoop.tables       '
+'','external.table.purge','true                '
+'','storage_handler     ','org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
+'','write.format.default','parquet             '
+'SerDe Library:      ','org.apache.iceberg.mr.hive.HiveIcebergSerDe','NULL'
+'InputFormat:        ','org.apache.iceberg.mr.hive.HiveIcebergInputFormat','NULL'
+'OutputFormat:       ','org.apache.iceberg.mr.hive.HiveIcebergOutputFormat','NULL'
+---- TYPES
+string, string, string
+====
+---- QUERY
+create table alltypesnopart like functional.alltypesnopart stored as parquet;
+alter table alltypesnopart change column smallint_col smallint_col int comment 'changed to int col';
+alter table alltypesnopart change column tinyint_col tinyint_col int comment 'changed to int col';
+describe alltypesnopart;
+---- RESULTS
+'id','int',regex:'.*'
+'bool_col','boolean',regex:'.*'
+'tinyint_col','int',regex:'.*'
+'smallint_col','int',regex:'.*'
+'int_col','int',regex:'.*'
+'bigint_col','bigint',regex:'.*'
+'float_col','float',regex:'.*'
+'double_col','double',regex:'.*'
+'date_string_col','string',regex:'.*'
+'string_col','string',regex:'.*'
+'timestamp_col','timestamp',regex:'.*'
+---- TYPES
+string,string,string
+====
+---- QUERY
+create table parquet_nopartitioned like alltypesnopart stored as parquet;
+insert into parquet_nopartitioned
+select id,
+       bool_col,
+       tinyint_col,
+       smallint_col,
+       int_col,
+       bigint_col,
+       float_col,
+       double_col,
+       date_string_col,
+       string_col,
+       timestamp_col
+from alltypes;
+select count(*) from parquet_nopartitioned;
+---- RESULTS
+7301
+---- TYPES
+bigint
+====
+---- QUERY
+describe formatted parquet_nopartitioned;
+---- RESULTS: VERIFY_IS_SUBSET
+'Location:           ','$NAMENODE/test-warehouse/$DATABASE.db/parquet_nopartitioned','NULL'
+'SerDe Library:      ','org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe','NULL'
+'InputFormat:        ','org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat','NULL'
+'OutputFormat:       ','org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat','NULL'
+---- TYPES
+string, string, string
+====
+---- QUERY
+alter table parquet_nopartitioned convert to iceberg tblproperties('iceberg.catalog' = 'hadoop.catalog');
+---- CATCH
+AnalysisException: The Hadoop Catalog is not supported because the location may change
+====
+---- QUERY
+alter table parquet_nopartitioned convert to iceberg;
+---- RESULTS
+'Table has been migrated.'
+====
+---- QUERY
+select count(*) from parquet_nopartitioned;
+---- RESULTS
+7301
+---- TYPES
+bigint
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 0
+====
+---- QUERY
+describe formatted parquet_nopartitioned;
+---- RESULTS: VERIFY_IS_NOT_IN
+'','iceberg.catalog     ','hadoop.tables       '
+---- RESULTS: VERIFY_IS_SUBSET
+'Location:           ','$NAMENODE/test-warehouse/$DATABASE.db/parquet_nopartitioned','NULL'
+row_regex: '','metadata_location   ','$NAMENODE/test-warehouse/$DATABASE.db/parquet_nopartitioned/metadata/.*.metadata.json'
+'','external.table.purge','true                '
+'','storage_handler     ','org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
+'','write.format.default','parquet             '
+'SerDe Library:      ','org.apache.iceberg.mr.hive.HiveIcebergSerDe','NULL'
+'InputFormat:        ','org.apache.iceberg.mr.hive.HiveIcebergInputFormat','NULL'
+'OutputFormat:       ','org.apache.iceberg.mr.hive.HiveIcebergOutputFormat','NULL'
+---- TYPES
+string, string, string
+====
+---- QUERY
+create table hdfs_table (col int);
+alter table hdfs_table set tblproperties ('EXTERNAL'='FALSE');
+alter table hdfs_table convert to iceberg;
+---- CATCH
+AnalysisException: CONVERT TO ICEBERG is not supported for managed tables
+====
+---- QUERY
+alter table hdfs_table set tblproperties ('EXTERNAL'='TRUE', 'transactional'='true', 'transactional_properties'='insert_only');
+alter table hdfs_table convert to iceberg;
+---- CATCH
+AnalysisException: CONVERT TO ICEBERG is not supported for transactional tables
+====
+---- QUERY
+# Check that we get an error when converting a table that has a column type that is invalid in Iceberg.
+create table hdfs_table2 (col tinyint) stored as parquet;
+alter table hdfs_table2 convert to iceberg;
+---- CATCH
+Unsupported Hive type: BYTE, use integer instead
+====
+---- QUERY
+# Test table migration for decimal partitioned table.
+create table decimal_tbl (
+    d2 decimal(10,0),
+    d3 decimal(20,10),
+    d4 decimal(38,38),
+    d5 decimal(10,5),
+    d6 decimal(9,0))
+partitioned by (d1 decimal(9,0))
+stored as parquet;
+insert into decimal_tbl partition (d1)
+    select d2, d3, d4, d5, d6, d1 from functional_parquet.decimal_tbl;
+describe formatted decimal_tbl;
+---- RESULTS: VERIFY_IS_SUBSET
+'Location:           ','$NAMENODE/test-warehouse/$DATABASE.db/decimal_tbl','NULL'
+'SerDe Library:      ','org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe','NULL'
+'InputFormat:        ','org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat','NULL'
+'OutputFormat:       ','org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat','NULL'
+---- TYPES
+string,string,string
+====
+---- QUERY
+alter table decimal_tbl convert to iceberg;
+---- RESULTS
+'Table has been migrated.'
+====
+---- QUERY
+describe formatted decimal_tbl;
+---- RESULTS: VERIFY_IS_SUBSET
+'Location:           ','$NAMENODE/test-warehouse/$DATABASE.db/decimal_tbl','NULL'
+row_regex: '','metadata_location   ','$NAMENODE/test-warehouse/$DATABASE.db/decimal_tbl/metadata/.*.metadata.json'
+'','external.table.purge','true                '
+'','storage_handler     ','org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
+'','write.format.default','parquet             '
+'SerDe Library:      ','org.apache.iceberg.mr.hive.HiveIcebergSerDe','NULL'
+'InputFormat:        ','org.apache.iceberg.mr.hive.HiveIcebergInputFormat','NULL'
+'OutputFormat:       ','org.apache.iceberg.mr.hive.HiveIcebergOutputFormat','NULL'
+---- TYPES
+string, string, string
+====
+---- QUERY
+# Test table migration when the table is at a different location than the table name
+# would imply.
+create table table_at_random_location (i int, s string)
+    stored as parquet
+    location '$NAMENODE/test-warehouse/$DATABASE.db/random_location/';
+insert into table_at_random_location values (1, "str1"), (2, "str2"), (3, "str3");
+select * from table_at_random_location
+---- RESULTS
+1,'str1'
+2,'str2'
+3,'str3'
+---- TYPES
+int, string
+====
+---- QUERY
+describe formatted table_at_random_location;
+---- RESULTS: VERIFY_IS_SUBSET
+'Location:           ','$NAMENODE/test-warehouse/$DATABASE.db/random_location','NULL'
+'SerDe Library:      ','org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe','NULL'
+'InputFormat:        ','org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat','NULL'
+'OutputFormat:       ','org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat','NULL'
+---- TYPES
+string, string, string
+====
+---- QUERY
+alter table table_at_random_location convert to iceberg;
+---- RESULTS
+'Table has been migrated.'
+====
+---- QUERY
+describe formatted table_at_random_location;
+---- RESULTS: VERIFY_IS_SUBSET
+'Location:           ','$NAMENODE/test-warehouse/$DATABASE.db/random_location','NULL'
+row_regex: '','metadata_location   ','$NAMENODE/test-warehouse/$DATABASE.db/random_location/metadata/.*.metadata.json'
+'','external.table.purge','true                '
+'','storage_handler     ','org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
+'','write.format.default','parquet             '
+'SerDe Library:      ','org.apache.iceberg.mr.hive.HiveIcebergSerDe','NULL'
+'InputFormat:        ','org.apache.iceberg.mr.hive.HiveIcebergInputFormat','NULL'
+'OutputFormat:       ','org.apache.iceberg.mr.hive.HiveIcebergOutputFormat','NULL'
+---- TYPES
+string, string, string
+====
+---- QUERY
+select * from table_at_random_location;
+---- RESULTS
+1,'str1'
+2,'str2'
+3,'str3'
+---- TYPES
+int, string
+====
+---- QUERY
+# Currently not feasible to convert directly into a V2 Iceberg table.
+create table converted_into_v2 (i int) partitioned by (s string) stored as parquet;
+alter table converted_into_v2 convert to iceberg tblproperties ('format-version'='2');
+---- CATCH
+AnalysisException: CONVERT TO ICEBERG only accepts 'iceberg.catalog' as TBLPROPERTY.
+====
+---- QUERY
+create table simple_tbl (i int) stored as parquet;
+set debug_action="CONVERT_TABLE_FAIL_ICEBERG_CALL";
+alter table simple_tbl convert to iceberg;
+---- CATCH
+ImpalaRuntimeException: Unable to load data files for location:
+====
+---- QUERY
+create table special_chars (i int) partitioned by (s string) stored as parquet;
+insert into special_chars partition (s='11 22-33&44%55"') values (1);
+insert into special_chars partition (s='aa - bb') values (2);
+insert into special_chars partition (s=null) values (3);
+alter table special_chars convert to iceberg;
+---- RESULTS
+'Table has been migrated.'
+====
+---- QUERY
+select * from special_chars;
+---- RESULTS
+1,'11 22-33&44%55"'
+2,'aa - bb'
+3,'NULL'
+---- TYPES
+int, string
+====
+---- QUERY
+select * from special_chars where s='aa - bb';
+---- RESULTS
+2,'aa - bb'
+---- TYPES
+int, string
+====
+---- QUERY
+select * from special_chars where s is null;
+---- RESULTS
+3,'NULL'
+---- TYPES
+int, string
+====
+---- QUERY
+create table special_chars_with_slash (i int) partitioned by (s1 string, s2 string) stored as parquet;
+insert into special_chars_with_slash partition (s1='abcde', s2='11/22/33') values (1);
+alter table special_chars_with_slash convert to iceberg;
+---- CATCH
+AnalysisException: Can't migrate table with '/' in the partition values until Iceberg #7612 is fixed. '11/22/33'
+====
diff --git a/tests/authorization/test_ranger.py b/tests/authorization/test_ranger.py
index 949fd4cf1..ae5a39065 100644
--- a/tests/authorization/test_ranger.py
+++ b/tests/authorization/test_ranger.py
@@ -1932,6 +1932,100 @@ class TestRanger(CustomClusterTestSuite):
       admin_client.execute("drop database if exists {0} cascade".format(unique_database),
                            user=ADMIN)
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
+  def test_convert_table_to_iceberg(self, unique_name):
+    """Test that autorization is taken into account when performing a table migration to
+    Iceberg."""
+    user = getuser()
+    admin_client = self.create_impala_client()
+    non_admin_client = self.create_impala_client()
+    unique_database = unique_name + "_db"
+    tbl_name = unique_database + "." + "hive_tbl_to_convert"
+
+    try:
+      admin_client.execute("drop database if exists {0} cascade"
+                           .format(unique_database), user=ADMIN)
+      admin_client.execute("create database {0}".format(unique_database), user=ADMIN)
+
+      # create table using admin user.
+      admin_client.execute("create table {0} (a int, b string) stored as parquet".format(
+          tbl_name), user=ADMIN)
+      admin_client.execute("insert into {0} values (1, 'one')".format(tbl_name),
+                           user=ADMIN)
+
+      try:
+        # non-admin user can't convert table by default.
+        result = self.execute_query_expect_failure(
+            non_admin_client, "alter table {0} convert to iceberg".format(tbl_name),
+            user=user)
+        assert "User '{0}' does not have privileges to access: {1}".format(
+            user, unique_database) in str(result)
+
+        # Grant ALL privileges on the table for non-admin user. Even with this the query
+        # should fail as we expect DB level ALL privileges for table migration. Once
+        # https://issues.apache.org/jira/browse/IMPALA-12190 is fixed, this should also
+        # pass with table-level ALL privileges.
+        admin_client.execute("grant all on table {0} to user {1}".format(tbl_name, user),
+            user=ADMIN)
+        result = self.execute_query_expect_failure(
+            non_admin_client, "alter table {0} convert to iceberg".format(tbl_name),
+            user=user)
+        assert "User '{0}' does not have privileges to access: {1}".format(
+            user, unique_database) in str(result)
+
+        # After granting ALL privileges on the DB, the table migration should succeed.
+        admin_client.execute("grant all on database {0} to user {1}"
+            .format(unique_database, user), user=ADMIN)
+        self.execute_query_expect_success(
+            non_admin_client, "alter table {0} convert to iceberg".format(tbl_name),
+            user=user)
+
+        result = non_admin_client.execute("describe formatted {0}".format(tbl_name),
+            user=user)
+        assert "org.apache.iceberg.mr.hive.HiveIcebergSerDe" in str(result)
+        assert "org.apache.iceberg.mr.hive.HiveIcebergInputFormat" in str(result)
+        assert "org.apache.iceberg.mr.hive.HiveIcebergOutputFormat" in str(result)
+      finally:
+        # Revoke privileges
+        admin_client.execute("revoke all on table {0} from user {1}"
+                            .format(tbl_name, user), user=ADMIN)
+        admin_client.execute("revoke all on database {0} from user {1}"
+                            .format(unique_database, user), user=ADMIN)
+
+      tbl_name2 = unique_database + "." + "hive_tbl_to_convert2"
+      # create table using admin user.
+      admin_client.execute("create table {0} (a int, b string) stored as parquet".format(
+          tbl_name2), user=ADMIN)
+      admin_client.execute("insert into {0} values (1, 'one')".format(tbl_name2),
+                           user=ADMIN)
+
+      try:
+        admin_client.execute("grant all on table {0} to user {1}"
+                             .format(tbl_name2, user), user=ADMIN)
+        result = self.execute_query_expect_success(
+            non_admin_client, "select count(*) from {0}".format(tbl_name2), user=user)
+        assert result.get_data() == "1"
+
+        # Migrate the table as admin and check that the non-admin user still has
+        # privileges.
+        self.execute_query_expect_success(
+            admin_client, "alter table {0} convert to iceberg".format(tbl_name2),
+            user=ADMIN)
+
+        result = self.execute_query_expect_success(
+            non_admin_client, "select count(*) from {0}".format(tbl_name2), user=user)
+        assert result.get_data() == "1"
+      finally:
+        # Revoke privileges
+        admin_client.execute("revoke all on table {0} from user {1}"
+                            .format(tbl_name2, user), user=ADMIN)
+
+    finally:
+      admin_client.execute("drop database if exists {0} cascade".format(unique_database),
+                           user=ADMIN)
+
   @pytest.mark.execute_serially
   @SkipIfFS.hive
   @SkipIfHive2.ranger_auth
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 273faa5f8..831f70c0a 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -1102,6 +1102,10 @@ class TestIcebergTable(IcebergTestSuite):
   def test_avro_file_format(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-avro', vector, unique_database)
 
+  def test_convert_table(self, vector, unique_database):
+    self.run_test_case('QueryTest/iceberg-migrate-from-external-hdfs-tables',
+                       vector, unique_database)
+
 
 class TestIcebergV2Table(IcebergTestSuite):
   """Tests related to Iceberg V2 tables."""