You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2022/12/15 14:11:09 UTC

[impala] 01/02: IMPALA-11339: Add Iceberg LOAD DATA INPATH statement

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 05a4b778d395c8813988610b78b71bcd920be037
Author: Tamas Mate <tm...@apache.org>
AuthorDate: Mon Nov 28 12:07:00 2022 +0100

    IMPALA-11339: Add Iceberg LOAD DATA INPATH statement
    
    Extend LOAD DATA INPATH statement to support Iceberg tables. Native
    parquet tables need Iceberg field ids, therefore to add files this
    change uses child queries to load and rewrite the data. The child
    queries create > insert > drop the temporary table over the specified
    directory.
    
    The create part depends on LIKE PARQUET/ORC clauses to infer the file
    format. This requires identifying a file in the directory and using that
    to create the temporary table.
    
    The target file or directory is moved to a staging directory before
    ingestion similar to native file formats. In case of a query failure the
    files are moved back to the original location. Child query executor will
    return the error message of the failing query and the child query
    profiles will be available through the WebUI.
    
    At this point the PARTITION clause it not supported because it would
    require analysis of the PartitionSpec (IMPALA-11750).
    
    Testing:
     - Added e2e tests
     - Added fe unit tests
    
    Change-Id: I8499945fa57ea0499f65b455976141dcd6d789eb
    Reviewed-on: http://gerrit.cloudera.org:8080/19145
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/client-request-state.cc             |  53 +++++++
 be/src/service/client-request-state.h              |   3 +
 common/thrift/Frontend.thrift                      |  23 +++
 .../org/apache/impala/analysis/LoadDataStmt.java   | 105 ++++++++++++--
 .../apache/impala/analysis/QueryStringBuilder.java | 155 +++++++++++++++++++++
 .../org/apache/impala/common/FileSystemUtil.java   |  22 ++-
 .../java/org/apache/impala/service/Frontend.java   |  50 ++++++-
 .../apache/impala/analysis/AnalyzeStmtsTest.java   |  16 +++
 .../queries/QueryTest/iceberg-load.test            | 142 +++++++++++++++++++
 tests/query_test/test_iceberg.py                   |  52 ++++++-
 10 files changed, 607 insertions(+), 14 deletions(-)

diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index f03a454f1..a641600b9 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -789,6 +789,9 @@ void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
 
   TLoadDataResp response;
   Status status = frontend_->LoadData(exec_request_->load_data_request, &response);
+  if (exec_request_->load_data_request.iceberg_tbl) {
+    ExecLoadIcebergDataRequestImpl(response);
+  }
   {
     lock_guard<mutex> l(lock_);
     RETURN_VOID_IF_ERROR(UpdateQueryStatus(status));
@@ -838,6 +841,56 @@ void ClientRequestState::ExecLoadDataRequestImpl(bool exec_in_worker_thread) {
   }
 }
 
+void ClientRequestState::ExecLoadIcebergDataRequestImpl(TLoadDataResp response) {
+  TLoadDataReq load_data_req = exec_request_->load_data_request;
+  RuntimeProfile* child_profile =
+      RuntimeProfile::Create(&profile_pool_, "Child Queries");
+  profile_->AddChild(child_profile);
+  // Add child queries for computing table and column stats.
+  vector<ChildQuery> child_queries;
+  // Prepare CREATE
+  RuntimeProfile* create_profile =
+      RuntimeProfile::Create(&profile_pool_, "Create table query");
+  child_profile->AddChild(create_profile);
+  child_queries.emplace_back(response.create_tmp_tbl_query, this, parent_server_,
+      create_profile, &profile_pool_);
+  // Prepare INSERT
+  RuntimeProfile* insert_profile =
+      RuntimeProfile::Create(&profile_pool_, "Insert query");
+  child_profile->AddChild(insert_profile);
+  child_queries.emplace_back(load_data_req.insert_into_dst_tbl_query, this,
+      parent_server_, insert_profile, &profile_pool_);
+  // Prepare DROP
+  RuntimeProfile* drop_profile =
+      RuntimeProfile::Create(&profile_pool_, "Drop table query");
+  child_profile->AddChild(drop_profile);
+  child_queries.emplace_back(load_data_req.drop_tmp_tbl_query, this,
+      parent_server_, drop_profile, &profile_pool_);
+  // Execute queries
+  RETURN_VOID_IF_ERROR(child_query_executor_->ExecAsync(move(child_queries)));
+  vector<ChildQuery*>* completed_queries = new vector<ChildQuery*>();
+  Status query_status = child_query_executor_->WaitForAll(completed_queries);
+  hdfsFS fs = hdfsConnect("default", 0);
+  if (query_status.ok()) {
+    if (hdfsDelete(fs, response.create_location.c_str(), 1)) {
+      Status hdfs_ret("Failed to remove staging data under '" + response.create_location
+          + "' after query failure: " + query_status.msg().msg());
+      lock_guard<mutex> l(lock_);
+      RETURN_VOID_IF_ERROR(UpdateQueryStatus(hdfs_ret));
+    }
+  } else {
+    for (string path : response.loaded_files) {
+      if (hdfsMove(fs, path.c_str(), fs, load_data_req.source_path.c_str())) {
+        Status hdfs_ret("Failed to revert data movement, some files might still be in '"
+            + response.create_location + "' after query failure: "
+            + query_status.msg().msg());
+        lock_guard<mutex> l(lock_);
+        RETURN_VOID_IF_ERROR(UpdateQueryStatus(hdfs_ret));
+      }
+    }
+  }
+}
+
 
 Status ClientRequestState::ExecLoadDataRequest() {
   if (exec_request_->query_options.enable_async_load_data_execution) {
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 69430df16..c0458c72b 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -716,6 +716,9 @@ class ClientRequestState {
   /// Core logic of executing a load data statement.
   void ExecLoadDataRequestImpl(bool exec_in_worker_thread);
 
+  /// Executes LOAD DATA related sub-queries for Iceberg tables.
+  void ExecLoadIcebergDataRequestImpl(TLoadDataResp response);
+
   /// Executes a shut down request.
   Status ExecShutdownRequest() WARN_UNUSED_RESULT;
 
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 23ffd69ba..d4aa0104a 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -372,6 +372,22 @@ struct TLoadDataReq {
   // An optional partition spec. Set if this operation should apply to a specific
   // partition rather than the base table.
   4: optional list<CatalogObjects.TPartitionKeyValue> partition_spec
+
+  // True if the destination table is an Iceberg table, in this case we need to insert
+  // data to the Iceberg table based on the given files.
+  5: optional bool iceberg_tbl
+
+  // For Iceberg data load. Query template to create a temporary with table location
+  // pointing to the new files. The table location is unknown during planning, these are
+  // filled during execution.
+  6: optional string create_tmp_tbl_query_template
+
+  // For Iceberg data load. Query to insert into the destination table from the
+  // temporary table.
+  7: optional string insert_into_dst_tbl_query
+
+  // For Iceberg data load. Query to drop the temporary table.
+  8: optional string drop_tmp_tbl_query
 }
 
 // Response of a LOAD DATA statement.
@@ -385,6 +401,13 @@ struct TLoadDataResp {
 
   // This is needed to issue TUpdateCatalogRequest
   3: string partition_name = ""
+
+  // For Iceberg data load. The query template after the required fields are substituted.
+  4: optional string create_tmp_tbl_query
+
+  // For Iceberg data load. The temporary table location, used to restore data in case of
+  // query failure.
+  5: optional string create_location
 }
 
 enum TCatalogOpType {
diff --git a/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java b/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
index 4ff13ec9e..d23d1951d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
@@ -20,18 +20,24 @@ package org.apache.impala.analysis;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.List;
+import java.util.UUID;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.thrift.TLoadDataReq;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.util.FsPermissionChecker;
+import org.apache.orc.OrcFile;
+import org.apache.parquet.hadoop.ParquetFileWriter;
 
 import com.google.common.base.Preconditions;
 
@@ -48,6 +54,14 @@ import com.google.common.base.Preconditions;
  * (preserving the extension).
  * Loading hidden files is not supported and any hidden files in the source or
  * destination are preserved, even if OVERWRITE is true.
+ *
+ * Notes on Iceberg data loading:
+ * Iceberg files have specific field ids, therefore native parquet tables cannot be added
+ * to the data directory with only moving the files. To rewrite the files with the
+ * necessary metadata the LOAD DATA operation will be executed with 3 sub-queries:
+ *  1. CREATE temporary table
+ *  2. INSERT INTO from the temporary table to the target table
+ *  3. DROP temporary table
  */
 public class LoadDataStmt extends StatementBase {
   private final TableName tableName_;
@@ -57,6 +71,10 @@ public class LoadDataStmt extends StatementBase {
 
   // Set during analysis
   private String dbName_;
+  private FeTable table_;
+  private String createTmpTblQuery_;
+  private String insertTblQuery_;
+  private String dropTmpTblQuery_;
 
   public LoadDataStmt(TableName tableName, HdfsUri sourceDataPath, boolean overwrite,
       PartitionSpec partitionSpec) {
@@ -99,27 +117,35 @@ public class LoadDataStmt extends StatementBase {
   @Override
   public void analyze(Analyzer analyzer) throws AnalysisException {
     dbName_ = analyzer.getTargetDbName(tableName_);
-    FeTable table = analyzer.getTable(tableName_, Privilege.INSERT);
-    if (!(table instanceof FeFsTable)) {
+    table_ = analyzer.getTable(tableName_, Privilege.INSERT);
+    if (!(table_ instanceof FeFsTable)) {
       throw new AnalysisException("LOAD DATA only supported for HDFS tables: " +
           dbName_ + "." + getTbl());
     }
-    analyzer.checkTableCapability(table, Analyzer.OperationType.WRITE);
-    analyzer.ensureTableNotTransactional(table, "LOAD DATA");
+    analyzer.checkTableCapability(table_, Analyzer.OperationType.WRITE);
+    analyzer.ensureTableNotTransactional(table_, "LOAD DATA");
 
     // Analyze the partition spec, if one was specified.
     if (partitionSpec_ != null) {
-      partitionSpec_.setTableName(tableName_);
-      partitionSpec_.setPartitionShouldExist();
-      partitionSpec_.setPrivilegeRequirement(Privilege.INSERT);
-      partitionSpec_.analyze(analyzer);
+      if (table_ instanceof FeIcebergTable) {
+        throw new AnalysisException("PARTITION clause is not supported for Iceberg " +
+            "tables.");
+      } else {
+        partitionSpec_.setTableName(tableName_);
+        partitionSpec_.setPartitionShouldExist();
+        partitionSpec_.setPrivilegeRequirement(Privilege.INSERT);
+        partitionSpec_.analyze(analyzer);
+      }
     } else {
-      if (table.getMetaStoreTable().getPartitionKeysSize() > 0) {
+      if (table_.getMetaStoreTable().getPartitionKeysSize() > 0) {
         throw new AnalysisException("Table is partitioned but no partition spec was " +
             "specified: " + dbName_ + "." + getTbl());
       }
     }
-    analyzePaths(analyzer, (FeFsTable) table);
+    analyzePaths(analyzer, (FeFsTable) table_);
+    if (table_ instanceof FeIcebergTable) {
+      analyzeLoadIntoIcebergTable();
+    }
   }
 
   /**
@@ -207,6 +233,59 @@ public class LoadDataStmt extends StatementBase {
     }
   }
 
+  /**
+   * Creates the child queries that are used to load data into Iceberg tables:
+   *   1. a temporary table is created which points to the data directory
+   *   2. an insert query copies the data from the source table to the destination table
+   *   3. a drop table deletes the temporary table and the data
+   */
+  private void analyzeLoadIntoIcebergTable() throws AnalysisException {
+    Path sourcePath = sourceDataPath_.getPath();
+    String tmpTableName = dbName_ + "." + tableName_ + "_tmp" +
+        UUID.randomUUID().toString().substring(0, 8);
+    QueryStringBuilder.Create createTableQueryBuilder =
+        new QueryStringBuilder.Create().table(tmpTableName, true);
+    try {
+      FileSystem fs = sourcePath.getFileSystem(FileSystemUtil.getConfiguration());
+      Path filePathForLike = sourcePath;
+      // LIKE <file format> clause needs a file to infer schema, for directories: list
+      // the files under the directory and pick the first one.
+      if (fs.isDirectory(sourcePath)) {
+        RemoteIterator<? extends FileStatus> fileStatuses = FileSystemUtil.listFiles(fs,
+            sourcePath, true, "");
+        while (fileStatuses.hasNext()) {
+          FileStatus fileStatus = fileStatuses.next();
+          if (fileStatus.isFile()) {
+            filePathForLike = fileStatus.getPath();
+            break;
+          }
+        }
+      }
+      String magicString = FileSystemUtil.readMagicString(filePathForLike);
+      if (magicString.substring(0, 4).equals(ParquetFileWriter.MAGIC_STR)) {
+        createTableQueryBuilder.like("PARQUET", "%s").storedAs("PARQUET");
+      } else if (magicString.substring(0, 3).equals(OrcFile.MAGIC)) {
+        createTableQueryBuilder.like("ORC", "%s").storedAs("ORC");
+      } else {
+        throw new AnalysisException(String.format("INPATH contains unsupported LOAD "
+            + "format, file '%s' has '%s' magic string.", filePathForLike, magicString));
+      }
+      createTableQueryBuilder.tableLocation("%s");
+    } catch (IOException e) {
+      throw new AnalysisException("Failed to generate CREATE TABLE subquery "
+          + "statement. ", e);
+    }
+    createTmpTblQuery_ = createTableQueryBuilder.build();
+    QueryStringBuilder.Insert insertTblQueryBuilder =
+        new QueryStringBuilder.Insert().overwrite(overwrite_)
+        .table(tableName_.toString());
+    QueryStringBuilder.Select insertSelectTblQueryBuilder =
+        new QueryStringBuilder.Select().selectList("*").from(tmpTableName);
+    insertTblQueryBuilder.select(insertSelectTblQueryBuilder);
+    insertTblQuery_ = insertTblQueryBuilder.build();
+    dropTmpTblQuery_ = new QueryStringBuilder.Drop().table(tmpTableName).build();
+  }
+
   public TLoadDataReq toThrift() {
     TLoadDataReq loadDataReq = new TLoadDataReq();
     loadDataReq.setTable_name(new TTableName(getDb(), getTbl()));
@@ -215,6 +294,12 @@ public class LoadDataStmt extends StatementBase {
     if (partitionSpec_ != null) {
       loadDataReq.setPartition_spec(partitionSpec_.toThrift());
     }
+    if (table_ instanceof FeIcebergTable) {
+      loadDataReq.setIceberg_tbl(true);
+      loadDataReq.setCreate_tmp_tbl_query_template(createTmpTblQuery_);
+      loadDataReq.setInsert_into_dst_tbl_query(insertTblQuery_);
+      loadDataReq.setDrop_tmp_tbl_query(dropTmpTblQuery_);
+    }
     return loadDataReq;
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/QueryStringBuilder.java b/fe/src/main/java/org/apache/impala/analysis/QueryStringBuilder.java
new file mode 100644
index 000000000..05131b69e
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/QueryStringBuilder.java
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+/**
+ * This class standardizes the query string building process. At this point only used for
+ * child query creation for Iceberg LOAD DATA INPATH queries. Each inner class is
+ * responsible for a specific query type, while the outer class can be used to instantiate
+ * the inner classes. The methods of the inner classes are supposed to be chainable.
+ */
+public class QueryStringBuilder {
+
+  public static class Create {
+    private String tableName_;
+    private Boolean external_;
+    private Boolean like_;
+    private String likeFileFormat_;
+    private String likeLocation_;
+    private String storedAsFileFormat_;
+    private String tableLocation_;
+
+    public Create() {}
+
+    public Create table(String tableName, Boolean external) {
+      tableName_ = tableName;
+      external_ = external;
+      return this;
+    }
+
+    public Create like(String fileFormat, String location) {
+      like_ = true;
+      likeFileFormat_ = fileFormat.toUpperCase();
+      likeLocation_ = location;
+      return this;
+    }
+
+    public Create storedAs(String fileFormat) {
+      storedAsFileFormat_ = fileFormat.toUpperCase();
+      return this;
+    }
+
+    public Create tableLocation(String location) {
+      tableLocation_ = location;
+      return this;
+    }
+
+    public String build() {
+      StringBuilder builder = new StringBuilder();
+      if (!external_) {
+        builder.append("CREATE TABLE ");
+      } else {
+        builder.append("CREATE EXTERNAL TABLE ");
+      }
+      builder.append(tableName_ + " ");
+      if (like_) {
+        builder.append("LIKE " + likeFileFormat_ + " '" + likeLocation_ + "' ");
+      }
+      builder.append("STORED AS " + storedAsFileFormat_ + " ");
+      builder.append("LOCATION '" + tableLocation_ + "'");
+      return builder.toString();
+    }
+  }
+
+  public static class Insert {
+    private String tableName_;
+    private Boolean overwrite_;
+    private Select select_;
+
+    public Insert() {}
+
+    public Insert table(String tableName) {
+      tableName_ = tableName + " ";
+      return this;
+    }
+
+    public Insert overwrite(Boolean overwrite) {
+      overwrite_ = overwrite;
+      return this;
+    }
+
+    public Insert select(Select select) {
+      select_ = select;
+      return this;
+    }
+
+    public String build() {
+      StringBuilder builder = new StringBuilder();
+      if (overwrite_) {
+        builder.append("INSERT OVERWRITE ");
+      } else {
+        builder.append("INSERT INTO ");
+      }
+      builder.append(tableName_);
+      builder.append(select_.build());
+      return builder.toString();
+    }
+  }
+
+  public static class Select {
+    private String selectList_;
+    private String tableName_;
+
+    public Select() {}
+
+    public Select selectList(String selectList) {
+      selectList_ = selectList;
+      return this;
+    }
+
+    public Select from(String tableName) {
+      tableName_ = tableName;
+      return this;
+    }
+
+    public String build() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("SELECT " + selectList_ + " ");
+      builder.append("FROM " + tableName_ + " ");
+      return builder.toString();
+    }
+  }
+
+  public static class Drop {
+    private String tableName_;
+
+    public Drop() {}
+
+    public Drop table(String tableName) {
+      tableName_ = tableName;
+      return this;
+    }
+
+    public String build() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("DROP TABLE ");
+      builder.append(tableName_);
+      return builder.toString();
+    }
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index 2bd64549a..88401d203 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -303,8 +303,9 @@ public class FileSystemUtil {
    * destination location. Instead, a UUID will be appended to the base file name,
    * preserving the existing file extension. If renameIfAlreadyExists is false, an
    * IOException will be thrown if there is a file name conflict.
+   * Returns the Path that points to the destination file.
    */
-  public static void relocateFile(Path sourceFile, Path dest,
+  public static Path relocateFile(Path sourceFile, Path dest,
       boolean renameIfAlreadyExists) throws IOException {
     FileSystem destFs = dest.getFileSystem(CONF);
 
@@ -338,7 +339,7 @@ public class FileSystemUtil {
         throw new IOException(String.format(
             "Failed to move '%s' to '%s'", sourceFile, destFile));
       }
-      return;
+      return destFile;
     }
     Preconditions.checkState(!doRename);
     if (destIsDfs && sameBucket) {
@@ -355,6 +356,7 @@ public class FileSystemUtil {
     }
     FileSystem sourceFs = sourceFile.getFileSystem(CONF);
     FileUtil.copy(sourceFs, sourceFile, destFs, destFile, true, true, CONF);
+    return destFile;
   }
 
   /**
@@ -370,6 +372,22 @@ public class FileSystemUtil {
     }
   }
 
+  /**
+   * Reads the first 4 bytes of the file and returns it as a string. It is used to
+   * identify the file format.
+   */
+  public static String readMagicString(Path file) throws IOException {
+    FileSystem fs = file.getFileSystem(CONF);
+    InputStream fileStream = fs.open(file);
+    byte[] buffer = new byte[4];
+    try {
+      IOUtils.read(fileStream, buffer, 0, 4);
+      return  new String(buffer);
+    } finally {
+      IOUtils.closeQuietly(fileStream);
+    }
+  }
+
   /**
    * Builds a new file name based on a base file name. This is done by inserting
    * the given appendStr into the base file name, preserving the file extension (if
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 398f1e123..c28398e77 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -857,7 +857,10 @@ public class Frontend {
         String.format("load table data %s.%s", tableName.db_name, tableName.table_name));
     while (true) {
       try {
-        return doLoadTableData(request);
+        if (!request.iceberg_tbl)
+          return doLoadTableData(request);
+        else
+          return doLoadIcebergTableData(request);
       } catch(InconsistentMetadataFetchException e) {
         retries.handleRetryOrThrow(e);
       }
@@ -925,6 +928,51 @@ public class Frontend {
     return response;
   }
 
+  private TLoadDataResp doLoadIcebergTableData(TLoadDataReq request)
+      throws ImpalaException, IOException {
+    TLoadDataResp response = new TLoadDataResp();
+    TableName tableName = TableName.fromThrift(request.getTable_name());
+    FeCatalog catalog = getCatalog();
+    String destPathString = catalog.getTable(tableName.getDb(), tableName.getTbl())
+        .getMetaStoreTable().getSd().getLocation();
+    Path destPath = new Path(destPathString);
+    Path sourcePath = new Path(request.source_path);
+    FileSystem sourceFs = sourcePath.getFileSystem(FileSystemUtil.getConfiguration());
+
+    // Create a temporary directory within the final destination directory to stage the
+    // file move.
+    Path tmpDestPath = FileSystemUtil.makeTmpSubdirectory(destPath);
+
+    int numFilesLoaded = 0;
+    List<Path> filesLoaded = new ArrayList<>();
+    String destFile;
+    if (sourceFs.isDirectory(sourcePath)) {
+      numFilesLoaded = FileSystemUtil.relocateAllVisibleFiles(sourcePath,
+          tmpDestPath, filesLoaded);
+      destFile = filesLoaded.get(0).toString();
+    } else {
+      Path destFilePath = FileSystemUtil.relocateFile(sourcePath, tmpDestPath,
+          true);
+      filesLoaded.add(new Path(tmpDestPath.toString() + Path.SEPARATOR
+          + sourcePath.getName()));
+      numFilesLoaded = 1;
+      destFile = destFilePath.toString();
+    }
+
+    String createTmpTblQuery = String.format(request.create_tmp_tbl_query_template,
+        destFile, tmpDestPath.toString());
+
+    TColumnValue col = new TColumnValue();
+    String loadMsg = String.format("Loaded %d file(s).", numFilesLoaded);
+    col.setString_val(loadMsg);
+    response.setLoaded_files(filesLoaded.stream().map(Path::toString)
+        .collect(Collectors.toList()));
+    response.setLoad_summary(new TResultRow(Lists.newArrayList(col)));
+    response.setCreate_tmp_tbl_query(createTmpTblQuery);
+    response.setCreate_location(tmpDestPath.toString());
+    return response;
+  }
+
   /**
    * Parses and plans a query in order to generate its explain string. This method does
    * not increase the query id counter.
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index 9fbbd1d72..15cc8a77b 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -4025,6 +4025,22 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
     }
   }
 
+  @Test
+  public void TestIcebergLoadData() throws AnalysisException {
+    AnalyzesOk("load data inpath "
+        + "'/test-warehouse/iceberg_test/iceberg_non_partitioned/data' into table "
+        + "functional_parquet.iceberg_non_partitioned");
+    AnalyzesOk("load data inpath "
+        + "'/test-warehouse/iceberg_test/iceberg_non_partitioned/data' overwrite into "
+        + "table functional_parquet.iceberg_non_partitioned");
+    AnalysisError("load data inpath "
+        + "'/test-warehouse/iceberg_test/iceberg_partitioned/data/"
+        + "event_time_hour=2020-01-01-08/action=view/' into table "
+        + "functional_parquet.iceberg_partitioned partition "
+        + "(event_time_hour='2020-01-01-08', action='view');", "PARTITION clause is not "
+        + "supported for Iceberg tables.");
+  }
+
   @Test
   public void TestInsert() throws AnalysisException {
     for (String qualifier: ImmutableList.of("INTO", "OVERWRITE")) {
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-load.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-load.test
new file mode 100644
index 000000000..a0c87aa65
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-load.test
@@ -0,0 +1,142 @@
+====
+---- QUERY
+create table test_iceberg_load_parquet like iceberg_mixed_file_format_test
+stored as iceberg;
+====
+---- QUERY
+# Test 1-2: Recovery from child query failure, first file then directory location
+set mem_limit=1;
+load data inpath '/tmp/$DATABASE/parquet/00000-0-data-gfurnstahl_20220906113044_157fc172-f5d3-4c70-8653-fff150b6136a-job_16619542960420_0002-1-00001.parquet'
+into table test_iceberg_load_parquet;
+---- CATCH
+minimum memory reservation is greater than memory available to the query for buffer reservations
+====
+---- QUERY
+select count(*) from test_iceberg_load_parquet;
+---- RESULTS
+0
+---- TYPES
+BIGINT
+====
+---- QUERY
+set mem_limit=1;
+load data inpath '/tmp/$DATABASE/parquet/' into table test_iceberg_load_parquet;
+---- CATCH
+minimum memory reservation is greater than memory available to the query for buffer reservations
+====
+---- QUERY
+select count(*) from test_iceberg_load_parquet;
+---- RESULTS
+0
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Test 3-4: Load a parquet file then a directory into test table
+load data inpath '/tmp/$DATABASE/parquet/00000-0-data-gfurnstahl_20220906113044_157fc172-f5d3-4c70-8653-fff150b6136a-job_16619542960420_0002-1-00001.parquet'
+into table test_iceberg_load_parquet;
+---- RESULTS
+'Loaded 1 file(s).'
+---- TYPES
+STRING
+====
+---- QUERY
+select count(*) from test_iceberg_load_parquet;
+---- RESULTS
+1
+---- TYPES
+BIGINT
+====
+---- QUERY
+load data inpath '/tmp/$DATABASE/parquet/' into table test_iceberg_load_parquet;
+---- RESULTS
+'Loaded 1 file(s).'
+---- TYPES
+STRING
+====
+---- QUERY
+select count(*) from test_iceberg_load_parquet;
+---- RESULTS
+2
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Test 5-6: Load an orc file then a directory into test table
+create table test_iceberg_load_orc like iceberg_mixed_file_format_test
+stored as iceberg;
+====
+---- QUERY
+load data inpath '/tmp/$DATABASE/orc/00000-0-data-gfurnstahl_20220906113255_8d49367d-e338-4996-ade5-ee500a19c1d1-job_16619542960420_0003-1-00001.orc'
+into table test_iceberg_load_orc;
+---- RESULTS
+'Loaded 1 file(s).'
+---- TYPES
+STRING
+====
+---- QUERY
+select count(*) from test_iceberg_load_orc;
+---- RESULTS
+1
+---- TYPES
+BIGINT
+====
+---- QUERY
+load data inpath '/tmp/$DATABASE/orc/' into table test_iceberg_load_orc;
+---- RESULTS
+'Loaded 1 file(s).'
+---- TYPES
+STRING
+====
+---- QUERY
+select count(*) from test_iceberg_load_orc;
+---- RESULTS
+2
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Test 7: Overwrite the existing table from Test 1
+load data inpath '/tmp/$DATABASE/overwrite/' overwrite into table test_iceberg_load_parquet;
+---- RESULTS
+'Loaded 1 file(s).'
+---- TYPES
+STRING
+====
+---- QUERY
+select count(*) from test_iceberg_load_parquet;
+---- RESULTS
+1
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Test 8: Mismatching target table and file schema
+create table test_iceberg_load_schema_mismatch (i int)
+stored as iceberg;
+====
+---- QUERY
+load data inpath '/tmp/$DATABASE/mismatching_schema/' overwrite into table test_iceberg_load_schema_mismatch;
+---- CATCH
+AnalysisException: Target table 'test_load_a61184e9.test_iceberg_load_schema_mismatch' has fewer columns (1) than the SELECT / VALUES clause returns (4)
+====
+---- QUERY
+# Test 9: Partitioned Iceberg table
+---- QUERY
+create table test_iceberg_load_partitioned like functional_parquet.iceberg_partitioned
+stored as iceberg;
+====
+---- QUERY
+load data inpath '/tmp/$DATABASE/partitioned/' overwrite into table test_iceberg_load_partitioned;
+---- RESULTS
+'Loaded 1 file(s).'
+---- TYPES
+STRING
+====
+---- QUERY
+select count(*) from test_iceberg_load_partitioned;
+---- RESULTS
+1
+---- TYPES
+BIGINT
+====
\ No newline at end of file
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 07e3f309c..38b246637 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -31,7 +31,7 @@ import json
 
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.iceberg_test_suite import IcebergTestSuite
-from tests.common.skip import SkipIf, SkipIfDockerizedCluster
+from tests.common.skip import SkipIf, SkipIfDockerizedCluster, SkipIfLocal
 from tests.common.file_utils import (
   create_iceberg_table_from_directory,
   create_table_from_parquet)
@@ -792,6 +792,56 @@ class TestIcebergTable(IcebergTestSuite):
     self.run_test_case('QueryTest/iceberg-mixed-file-format', vector,
                       unique_database)
 
+  @SkipIfLocal.hdfs_client
+  def test_load(self, vector, unique_database):
+    """Test LOAD DATA INPATH for Iceberg tables, the first part of this method inits the
+    target directory, copies existing test data to HDFS. The second part runs the test
+    cases then cleans up the test directory.
+    """
+    # Test 1-6 init: target orc/parquet file and directory
+    SRC_DIR = os.path.join(os.environ['IMPALA_HOME'],
+        "testdata/data/iceberg_test/iceberg_mixed_file_format_test/data/{0}")
+    DST_DIR = "/tmp/" + unique_database + "/parquet/"
+    self.hdfs_client.make_dir(DST_DIR, permission=777)
+    file_parq1 = "00000-0-data-gfurnstahl_20220906113044_157fc172-f5d3-4c70-8653-" \
+        "fff150b6136a-job_16619542960420_0002-1-00001.parquet"
+    file_parq2 = "00000-0-data-gfurnstahl_20220906114830_907f72c7-36ac-4135-8315-" \
+        "27ff880faff0-job_16619542960420_0004-1-00001.parquet"
+    self.hdfs_client.copy_from_local(SRC_DIR.format(file_parq1), DST_DIR)
+    self.hdfs_client.copy_from_local(SRC_DIR.format(file_parq2), DST_DIR)
+    DST_DIR = "/tmp/" + unique_database + "/orc/"
+    self.hdfs_client.make_dir(DST_DIR, permission=777)
+    file_orc1 = "00000-0-data-gfurnstahl_20220906113255_8d49367d-e338-4996-ade5-" \
+        "ee500a19c1d1-job_16619542960420_0003-1-00001.orc"
+    file_orc2 = "00000-0-data-gfurnstahl_20220906114900_9c1b7b46-5643-428f-a007-" \
+        "519c5500ed04-job_16619542960420_0004-1-00001.orc"
+    self.hdfs_client.copy_from_local(SRC_DIR.format(file_orc1), DST_DIR)
+    self.hdfs_client.copy_from_local(SRC_DIR.format(file_orc2), DST_DIR)
+    # Test 7 init: overwrite
+    DST_DIR = "/tmp/" + unique_database + "/overwrite/"
+    self.hdfs_client.make_dir(DST_DIR, permission=777)
+    self.hdfs_client.copy_from_local(SRC_DIR.format(file_parq1), DST_DIR)
+    # Test 8 init: mismatching parquet schema format
+    SRC_DIR = os.path.join(os.environ['IMPALA_HOME'], "testdata/data/iceberg_test/"
+        "iceberg_partitioned/data/event_time_hour=2020-01-01-08/action=view/{0}")
+    DST_DIR = "/tmp/" + unique_database + "/mismatching_schema/"
+    self.hdfs_client.make_dir(DST_DIR, permission=777)
+    file = "00001-1-b975a171-0911-47c2-90c8-300f23c28772-00000.parquet"
+    self.hdfs_client.copy_from_local(SRC_DIR.format(file), DST_DIR)
+    # Test 9 init: partitioned
+    DST_DIR = "/tmp/" + unique_database + "/partitioned/"
+    self.hdfs_client.make_dir(DST_DIR, permission=777)
+    self.hdfs_client.copy_from_local(SRC_DIR.format(file), DST_DIR)
+
+    # Init test table
+    create_iceberg_table_from_directory(self.client, unique_database,
+        "iceberg_mixed_file_format_test", "parquet")
+
+    # Execute tests
+    self.run_test_case('QueryTest/iceberg-load', vector, use_db=unique_database)
+    # Clean up temporary directory
+    self.hdfs_client.delete_file_dir("/tmp/{0}".format(unique_database), True)
+
   def test_table_sampling(self, vector):
     self.run_test_case('QueryTest/iceberg-tablesample', vector,
         use_db="functional_parquet")