You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2020/06/19 21:14:55 UTC

[impala] branch master updated (2c76ff5 -> 17fd15c)

This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 2c76ff5  IMPALA-2515: support parquet decimal with extra padding
     new 8fcad90  IMPALA-9688: Support create iceberg table by impala
     new b02fad2  IMPALA-7538: Support HDFS caching with LocalCatalog
     new 950e51f  IMPALA-9739: Fix data race during impala graceful shutdown
     new 004e3c8  IMPALA-8830: Fix executor group assignment of coordinator only queries
     new 17fd15c  IMPALA-5904: (part 4) Fix more TSAN bugs

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/runtime/runtime-state.cc                    |  11 +-
 be/src/runtime/runtime-state.h                     |  10 +-
 be/src/scheduling/admission-controller.cc          |  50 ++--
 be/src/scheduling/admission-controller.h           |  11 +-
 be/src/scheduling/cluster-membership-mgr.cc        |   5 +-
 be/src/scheduling/cluster-membership-mgr.h         |   7 +
 be/src/scheduling/scheduler.cc                     |  22 +-
 be/src/scheduling/scheduler.h                      |   7 +-
 be/src/service/impala-server.cc                    |   5 +-
 be/src/service/query-options-test.cc               |   3 +-
 be/src/statestore/statestore-subscriber.cc         |   2 +-
 be/src/statestore/statestore-subscriber.h          |   5 +-
 be/src/util/metrics.cc                             |  24 +-
 bin/impala-config.sh                               |   2 +
 common/thrift/CatalogObjects.thrift                |  33 +++
 common/thrift/CatalogService.thrift                |   9 +
 common/thrift/Descriptors.thrift                   |   1 +
 common/thrift/JniCatalog.thrift                    |   3 +
 fe/pom.xml                                         |  32 +++
 fe/src/main/cup/sql-parser.cup                     |  81 +++++-
 .../java/org/apache/impala/analysis/Analyzer.java  |   4 +-
 .../apache/impala/analysis/CreateTableStmt.java    |  48 +++-
 .../impala/analysis/IcebergPartitionField.java     |  79 ++++++
 .../impala/analysis/IcebergPartitionSpec.java      |  93 +++++++
 .../org/apache/impala/analysis/ShowStatsStmt.java  |  16 ++
 .../apache/impala/analysis/TableDataLayout.java    |  19 +-
 .../java/org/apache/impala/analysis/TableDef.java  |   4 +
 .../org/apache/impala/analysis/ToSqlUtils.java     |   7 +
 .../impala/catalog/CatalogServiceCatalog.java      |   4 +-
 .../java/org/apache/impala/catalog/FeCatalog.java  |   1 -
 .../org/apache/impala/catalog/FeIcebergTable.java  |  97 +++++++
 .../org/apache/impala/catalog/HdfsFileFormat.java  |  10 +-
 .../impala/catalog/HdfsStorageDescriptor.java      |   3 +-
 .../java/org/apache/impala/catalog/HdfsTable.java  |   4 +
 .../org/apache/impala/catalog/IcebergTable.java    | 286 +++++++++++++++++++++
 .../main/java/org/apache/impala/catalog/Table.java |   2 +
 .../impala/catalog/local/CatalogdMetaProvider.java |  89 ++++++-
 .../impala/catalog/local/DirectMetaProvider.java   |  21 +-
 .../apache/impala/catalog/local/LocalCatalog.java  |  12 +-
 .../impala/catalog/local/LocalFsPartition.java     |  14 +-
 .../apache/impala/catalog/local/LocalFsTable.java  |  41 ++-
 .../impala/catalog/local/LocalIcebergTable.java    | 137 ++++++++++
 .../apache/impala/catalog/local/LocalTable.java    |   3 +
 .../apache/impala/catalog/local/MetaProvider.java  |   5 +
 .../apache/impala/planner/SingleNodePlanner.java   |   4 +
 .../apache/impala/service/CatalogOpExecutor.java   |  70 ++++-
 .../java/org/apache/impala/service/Frontend.java   |   8 +
 .../impala/service/IcebergCatalogOpExecutor.java   | 169 ++++++++++++
 .../java/org/apache/impala/util/IcebergUtil.java   | 186 ++++++++++++++
 fe/src/main/jflex/sql-scanner.flex                 |   2 +
 impala-parent/pom.xml                              |   1 +
 .../queries/QueryTest/iceberg_create.test          | 154 +++++++++++
 .../functional-query/queries/QueryTest/set.test    |   2 +-
 .../queries/QueryTest/show-create-table.test       |  19 ++
 tests/common/skip.py                               |   7 -
 tests/custom_cluster/test_coordinators.py          |  16 +-
 tests/custom_cluster/test_executor_groups.py       |  44 ++--
 .../{test_disable_features.py => test_iceberg.py}  |  11 +-
 tests/metadata/test_ddl.py                         |   1 -
 tests/query_test/test_hdfs_caching.py              |   1 -
 60 files changed, 1893 insertions(+), 124 deletions(-)
 create mode 100644 fe/src/main/java/org/apache/impala/analysis/IcebergPartitionField.java
 create mode 100644 fe/src/main/java/org/apache/impala/analysis/IcebergPartitionSpec.java
 create mode 100644 fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
 create mode 100644 fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
 create mode 100644 fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
 create mode 100644 fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
 create mode 100644 fe/src/main/java/org/apache/impala/util/IcebergUtil.java
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/iceberg_create.test
 copy tests/custom_cluster/{test_disable_features.py => test_iceberg.py} (75%)


[impala] 04/05: IMPALA-8830: Fix executor group assignment of coordinator only queries

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 004e3c897e4890adb7d751b881b31ca71f5a533d
Author: Bikramjeet Vig <bi...@gmail.com>
AuthorDate: Mon Jun 8 12:22:34 2020 -0700

    IMPALA-8830: Fix executor group assignment of coordinator only queries
    
    With this fix, coordinator only queries are submitted to a pseudo
    executor group named "empty group (using coordinator only)" which
    is empty. This allows running coordinator only queries regardless
    of the presence of any healthy executor groups.
    
    Testing:
    Added a custom cluster test and modified tests that relied on
    coordinator only queries to be queued in absence of executor groups.
    
    Change-Id: I8fe098032744aa20bbbe4faddfc67e7a46ce03d5
    Reviewed-on: http://gerrit.cloudera.org:8080/14183
    Reviewed-by: Bikramjeet Vig <bi...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/scheduling/admission-controller.cc    | 50 ++++++++++++++++------------
 be/src/scheduling/admission-controller.h     | 11 +++---
 be/src/scheduling/cluster-membership-mgr.cc  |  5 ++-
 be/src/scheduling/cluster-membership-mgr.h   |  7 ++++
 be/src/scheduling/scheduler.cc               | 22 ++++++++++--
 be/src/scheduling/scheduler.h                |  7 +++-
 tests/custom_cluster/test_coordinators.py    | 16 ++++++---
 tests/custom_cluster/test_executor_groups.py | 44 ++++++++++++++----------
 8 files changed, 110 insertions(+), 52 deletions(-)

diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 6337288..a5c1402 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -210,8 +210,10 @@ const string REASON_THREAD_RESERVATION_AGG_LIMIT_EXCEEDED =
 // $0 is the error message returned by the scheduler.
 const string REASON_SCHEDULER_ERROR = "Error during scheduling: $0";
 const string REASON_LOCAL_BACKEND_NOT_STARTED = "Local backend has not started up yet.";
-const string REASON_NO_EXECUTOR_GROUPS = "Waiting for executors to start. Only DDL "
-    "queries can currently run.";
+const string REASON_NO_EXECUTOR_GROUPS =
+    "Waiting for executors to start. Only DDL queries and queries scheduled only on the "
+    "coordinator (either NUM_NODES set to 1 or when small query optimization is "
+    "triggered) can currently run.";
 
 // Queue decision details
 // $0 = num running queries, $1 = num queries limit, $2 = staleness detail
@@ -1161,8 +1163,6 @@ Status AdmissionController::ComputeGroupSchedules(
   std::vector<GroupSchedule>* output_schedules = &queue_node->group_schedules;
   output_schedules->clear();
 
-  const string& pool_name = request.request.query_ctx.request_pool;
-
   // If the first statestore update arrives before the local backend has finished starting
   // up, we might not have a local backend descriptor yet. We return no schedules, which
   // will result in the query being queued.
@@ -1173,9 +1173,8 @@ Status AdmissionController::ComputeGroupSchedules(
   }
   const BackendDescriptorPB& local_be_desc = *membership_snapshot->local_be_desc;
 
-  vector<const ExecutorGroup*> executor_groups;
-  GetExecutorGroupsForPool(
-      membership_snapshot->executor_groups, pool_name, &executor_groups);
+  vector<const ExecutorGroup*> executor_groups =
+      GetExecutorGroupsForQuery(membership_snapshot->executor_groups, request);
 
   if (executor_groups.empty()) {
     queue_node->not_admitted_reason = REASON_NO_EXECUTOR_GROUPS;
@@ -1188,8 +1187,9 @@ Status AdmissionController::ComputeGroupSchedules(
   // to balance queries across executor groups equally.
   // TODO(IMPALA-8731): balance queries across executor groups more evenly
   for (const ExecutorGroup* executor_group : executor_groups) {
-    DCHECK(executor_group->IsHealthy());
-    DCHECK_GT(executor_group->NumExecutors(), 0);
+    DCHECK(executor_group->IsHealthy()
+        || cluster_membership_mgr_->GetEmptyExecutorGroup() == executor_group)
+        << executor_group->name();
     unique_ptr<QuerySchedule> group_schedule =
         make_unique<QuerySchedule>(request.query_id, request.request,
             request.query_options, request.summary_profile, request.query_events);
@@ -1232,7 +1232,6 @@ bool AdmissionController::FindGroupToAdmitOrReject(
 
   for (GroupSchedule& group_schedule : queue_node->group_schedules) {
     const ExecutorGroup& executor_group = group_schedule.executor_group;
-    DCHECK_GT(executor_group.NumExecutors(), 0);
     QuerySchedule* schedule = group_schedule.schedule.get();
     schedule->UpdateMemoryRequirements(pool_config);
 
@@ -1493,7 +1492,7 @@ void AdmissionController::AdmitQuery(QuerySchedule* schedule, bool was_queued) {
            << " coord_backend_mem_limit set to: "
            << PrintBytes(schedule->coord_backend_mem_limit())
            << " coord_backend_mem_to_admit set to: "
-           << PrintBytes(schedule->coord_backend_mem_to_admit());;
+           << PrintBytes(schedule->coord_backend_mem_to_admit());
   // Update memory and number of queries.
   UpdateStatsOnAdmission(*schedule);
   UpdateExecGroupMetric(schedule->executor_group(), 1);
@@ -1759,31 +1758,40 @@ string AdmissionController::MakePoolTopicKey(
   return Substitute("$0$1$2", pool_name, TOPIC_KEY_DELIMITER, backend_id);
 }
 
-void AdmissionController::GetExecutorGroupsForPool(
-    const ClusterMembershipMgr::ExecutorGroups& all_groups, const string& pool_name,
-    vector<const ExecutorGroup*>* matching_groups) {
+vector<const ExecutorGroup*> AdmissionController::GetExecutorGroupsForQuery(
+    const ClusterMembershipMgr::ExecutorGroups& all_groups,
+    const AdmissionRequest& request) {
+  vector<const ExecutorGroup*> matching_groups;
+  if (ExecEnv::GetInstance()->scheduler()->IsCoordinatorOnlyQuery(request.request)) {
+    // Coordinator only queries can run regardless of the presence of exec groups. This
+    // empty group works as a proxy to schedule coordinator only queries.
+    matching_groups.push_back(cluster_membership_mgr_->GetEmptyExecutorGroup());
+    return matching_groups;
+  }
+  const string& pool_name = request.request.query_ctx.request_pool;
   string prefix(pool_name + POOL_GROUP_DELIMITER);
   // We search for matching groups before the health check so that we don't fall back to
   // the default group in case there are matching but unhealthy groups.
   for (const auto& it : all_groups) {
     StringPiece name(it.first);
-    if (name.starts_with(prefix)) matching_groups->push_back(&it.second);
+    if (name.starts_with(prefix)) matching_groups.push_back(&it.second);
   }
-  if (matching_groups->empty()) {
+  if (matching_groups.empty()) {
     auto default_it = all_groups.find(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME);
-    if (default_it == all_groups.end()) return;
+    if (default_it == all_groups.end()) return matching_groups;
     VLOG(3) << "Checking default executor group for pool " << pool_name;
-    matching_groups->push_back(&default_it->second);
+    matching_groups.push_back(&default_it->second);
   }
   // Filter out unhealthy groups.
-  auto erase_from = std::remove_if(matching_groups->begin(), matching_groups->end(),
+  auto erase_from = std::remove_if(matching_groups.begin(), matching_groups.end(),
       [](const ExecutorGroup* g) { return !g->IsHealthy(); });
-  matching_groups->erase(erase_from, matching_groups->end());
+  matching_groups.erase(erase_from, matching_groups.end());
   // Sort executor groups by name.
   auto cmp = [](const ExecutorGroup* a, const ExecutorGroup* b) {
     return a->name() < b->name();
   };
-  sort(matching_groups->begin(), matching_groups->end(), cmp);
+  sort(matching_groups.begin(), matching_groups.end(), cmp);
+  return matching_groups;
 }
 
 int64_t AdmissionController::GetClusterSize(
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index abeaa13..9be49bf 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -947,10 +947,13 @@ class AdmissionController {
   /// Returns the maximum number of requests that can be queued in the pool.
   static int64_t GetMaxQueuedForPool(const TPoolConfig& pool_config);
 
-  /// Return all executor groups from 'all_groups' that can be used to run queries in
-  /// 'pool_name'.
-  void GetExecutorGroupsForPool(const ClusterMembershipMgr::ExecutorGroups& all_groups,
-      const std::string& pool_name, std::vector<const ExecutorGroup*>* matching_groups);
+  /// Return all executor groups from 'all_groups' that can be used to run the query. If
+  /// the query is a coordinator only query then a reserved empty group is returned
+  /// otherwise returns all healthy groups that can be used to run queries for the
+  /// resource pool associated with the query.
+  std::vector<const ExecutorGroup*> GetExecutorGroupsForQuery(
+      const ClusterMembershipMgr::ExecutorGroups& all_groups,
+      const AdmissionRequest& request);
 
   /// Returns the current size of the cluster.
   int64_t GetClusterSize(const ClusterMembershipMgr::Snapshot& membership_snapshot);
diff --git a/be/src/scheduling/cluster-membership-mgr.cc b/be/src/scheduling/cluster-membership-mgr.cc
index 72a473d..de8caeb 100644
--- a/be/src/scheduling/cluster-membership-mgr.cc
+++ b/be/src/scheduling/cluster-membership-mgr.cc
@@ -45,6 +45,8 @@ ExecutorGroup* FindOrInsertExecutorGroup(const ExecutorGroupDescPB& group,
 
 namespace impala {
 
+static const string EMPTY_GROUP_NAME("empty group (using coordinator only)");
+
 static const string LIVE_EXEC_GROUP_KEY("cluster-membership.executor-groups.total");
 static const string HEALTHY_EXEC_GROUP_KEY(
     "cluster-membership.executor-groups.total-healthy");
@@ -52,7 +54,8 @@ static const string TOTAL_BACKENDS_KEY("cluster-membership.backends.total");
 
 ClusterMembershipMgr::ClusterMembershipMgr(
     string local_backend_id, StatestoreSubscriber* subscriber, MetricGroup* metrics)
-  : current_membership_(std::make_shared<const Snapshot>()),
+  : empty_exec_group_(EMPTY_GROUP_NAME),
+    current_membership_(std::make_shared<const Snapshot>()),
     statestore_subscriber_(subscriber),
     local_backend_id_(move(local_backend_id)) {
   DCHECK(metrics != nullptr);
diff --git a/be/src/scheduling/cluster-membership-mgr.h b/be/src/scheduling/cluster-membership-mgr.h
index 3954fd3..e2ce538 100644
--- a/be/src/scheduling/cluster-membership-mgr.h
+++ b/be/src/scheduling/cluster-membership-mgr.h
@@ -164,6 +164,10 @@ class ClusterMembershipMgr {
   /// 'cause' is an error status representing the reason the node was blacklisted.
   void BlacklistExecutor(const BackendDescriptorPB& be_desc, const Status& cause);
 
+  /// Returns a pointer to the static empty group reserved for scheduling coord only
+  /// queries.
+  const ExecutorGroup* GetEmptyExecutorGroup() { return &empty_exec_group_; }
+
  private:
   /// Serializes and adds the local backend descriptor to 'subscriber_topic_updates'.
   void AddLocalBackendToStatestore(const BackendDescriptorPB& local_be_desc,
@@ -204,6 +208,9 @@ class ClusterMembershipMgr {
   bool IsBackendInExecutorGroups(
       const BackendDescriptorPB& be_desc, const ExecutorGroups& executor_groups);
 
+  /// An empty group used for scheduling coordinator only queries.
+  const ExecutorGroup empty_exec_group_;
+
   /// Ensures that only one thread is processing a membership update at a time, either
   /// from a statestore update or a blacklisting decision. Must be taken before any other
   /// locks in this class.
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index e11f273..b93b16c 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -163,6 +163,7 @@ Status Scheduler::ComputeScanRangeAssignment(
       const TPlanNodeId node_id = entry.first;
       const TPlanFragment& fragment = schedule->GetContainingFragment(node_id);
       bool exec_at_coord = (fragment.partition.type == TPartitionType::UNPARTITIONED);
+      DCHECK(executor_config.group.NumExecutors() > 0 || exec_at_coord);
 
       const TPlanNode& node = schedule->GetNode(node_id);
       DCHECK_EQ(node.node_id, node_id);
@@ -542,11 +543,19 @@ Status Scheduler::ComputeScanRangeAssignment(const ExecutorConfig& executor_conf
   // random rank.
   bool random_replica = query_options.schedule_random_replica || node_random_replica;
 
-  // TODO: Build this one from executor_group
+  // This temp group is necessary because of the AssignmentCtx interface. This group is
+  // used to schedule scan ranges for the plan node passed where the caller of this method
+  // has determined that it needs to be scheduled on the coordinator only. Note that this
+  // also includes queries where the whole query should run on the coordinator, as is
+  // determined by Scheduler::IsCoordinatorOnlyQuery(). For those queries, the
+  // AdmissionController will pass an empty executor group and rely on this method being
+  // called with exec_at_coord = true.
+  // TODO: Either get this from the ExecutorConfig or modify the AssignmentCtx interface
+  // to handle this case.
   ExecutorGroup coord_only_executor_group("coordinator-only-group");
   const BackendDescriptorPB& local_be_desc = executor_config.local_be_desc;
   coord_only_executor_group.AddExecutor(local_be_desc);
-  VLOG_QUERY << "Exec at coord is " << (exec_at_coord ? "true" : "false");
+  VLOG_ROW << "Exec at coord is " << (exec_at_coord ? "true" : "false");
   AssignmentCtx assignment_ctx(
       exec_at_coord ? coord_only_executor_group : executor_group, total_assignments_,
       total_local_assignments_);
@@ -742,6 +751,15 @@ Status Scheduler::Schedule(
   return Status::OK();
 }
 
+bool Scheduler::IsCoordinatorOnlyQuery(const TQueryExecRequest& exec_request) {
+  DCHECK_GT(exec_request.plan_exec_info.size(), 0);
+  const TPlanExecInfo& plan_exec_info = exec_request.plan_exec_info[0];
+  int64_t num_fragments = plan_exec_info.fragments.size();
+  DCHECK_GT(num_fragments, 0);
+  auto type = plan_exec_info.fragments[0].partition.type;
+  return num_fragments == 1 && type == TPartitionType::UNPARTITIONED;
+}
+
 void Scheduler::ComputeBackendExecParams(
     const ExecutorConfig& executor_config, QuerySchedule* schedule) {
   PerBackendExecParams per_backend_params;
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index b55dfb0..0844203 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -68,9 +68,14 @@ class Scheduler {
   };
 
   /// Populates given query schedule and assigns fragments to hosts based on scan
-  /// ranges in the query exec request.
+  /// ranges in the query exec request. 'executor_config' must contain a non-empty group
+  /// unless IsCoordinatorOnlyQuery() is true.
   Status Schedule(const ExecutorConfig& executor_config, QuerySchedule* schedule);
 
+  /// Returns true if the query is only supposed to run on the coordinator (single
+  /// unpartitioned fragment).
+  bool IsCoordinatorOnlyQuery(const TQueryExecRequest& exec_request);
+
  private:
   /// Map from a host's IP address to the next executor to be round-robin scheduled for
   /// that host (needed for setups with multiple executors on a single host)
diff --git a/tests/custom_cluster/test_coordinators.py b/tests/custom_cluster/test_coordinators.py
index b63b1f7..7a4336b 100644
--- a/tests/custom_cluster/test_coordinators.py
+++ b/tests/custom_cluster/test_coordinators.py
@@ -291,15 +291,21 @@ class TestCoordinators(CustomClusterTestSuite):
       self._stop_impala_cluster()
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(cluster_size=1, num_exclusive_coordinators=1)
+  @CustomClusterTestSuite.with_args(impalad_args="--queue_wait_timeout_ms=2000",
+                                    cluster_size=1, num_exclusive_coordinators=1)
   def test_dedicated_coordinator_without_executors(self):
     """This test verifies that a query gets queued and times out when no executors are
-    present."""
-    result = self.execute_query_expect_failure(self.client, "select 2")
-    expected_error = "Query aborted:Admission for query exceeded timeout 60000ms in " \
+    present but a coordinator only query gets executed."""
+    # Pick a non-trivial query that needs to be scheduled on executors.
+    query = "select count(*) from functional.alltypes where month + random() < 3"
+    result = self.execute_query_expect_failure(self.client, query)
+    expected_error = "Query aborted:Admission for query exceeded timeout 2000ms in " \
                      "pool default-pool. Queued reason: Waiting for executors to " \
-                     "start. Only DDL queries can currently run."
+                     "start."
     assert expected_error in str(result)
+    # Now pick a coordinator only query.
+    query = "select 1"
+    self.execute_query_expect_success(self.client, query)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(cluster_size=1, num_exclusive_coordinators=1,
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index 1ba8830..df21193 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -27,6 +27,8 @@ from time import sleep
 
 LOG = logging.getLogger("test_auto_scaling")
 
+# Non-trivial query that gets scheduled on all executors within a group.
+TEST_QUERY = "select count(*) from functional.alltypes where month + random() < 3"
 
 class TestExecutorGroups(CustomClusterTestSuite):
   """This class contains tests that exercise the logic related to scaling clusters up and
@@ -120,28 +122,35 @@ class TestExecutorGroups(CustomClusterTestSuite):
       60, 1, lambda: expected_str in self.client.get_runtime_profile(query_handle))
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(impalad_args="-queue_wait_timeout_ms=2000")
-  def test_no_group_timeout(self):
-    """Tests that a query submitted to a coordinator with no executor group times out."""
-    result = self.execute_query_expect_failure(self.client, "select sleep(2)")
+  @CustomClusterTestSuite.with_args(impalad_args="-queue_wait_timeout_ms=1000")
+  def test_no_group(self):
+    """Tests that a regular query submitted to a coordinator with no executor group
+    times out but coordinator only queries can still run."""
+    result = self.execute_query_expect_failure(self.client, TEST_QUERY)
     assert "Admission for query exceeded timeout" in str(result)
     assert self._get_num_executor_groups(only_healthy=True) == 0
+    expected_group = "Executor Group: empty group (using coordinator only)"
+    # Force the query to run on coordinator only.
+    result = self.execute_query_expect_success(self.client, TEST_QUERY,
+                                               query_options={'NUM_NODES': '1'})
+    assert expected_group in str(result.runtime_profile)
+    # Small query runs on coordinator only.
+    result = self.execute_query_expect_success(self.client, "select 1")
+    assert expected_group in str(result.runtime_profile)
 
   @pytest.mark.execute_serially
   def test_single_group(self):
     """Tests that we can start a single executor group and run a simple query."""
-    QUERY = "select count(*) from functional.alltypestiny"
     self._add_executor_group("group1", 2)
-    self.execute_query_expect_success(self.client, QUERY)
+    self.execute_query_expect_success(self.client, TEST_QUERY)
     assert self._get_num_executor_groups(only_healthy=True) == 1
 
   @pytest.mark.execute_serially
   def test_executor_group_starts_while_qeueud(self):
     """Tests that a query can stay in the queue of an empty cluster until an executor
     group comes online."""
-    QUERY = "select count(*) from functional.alltypestiny"
     client = self.client
-    handle = client.execute_async(QUERY)
+    handle = client.execute_async(TEST_QUERY)
     self._assert_eventually_in_profile(handle, "Waiting for executors to start")
     assert self._get_num_executor_groups(only_healthy=True) == 0
     self._add_executor_group("group1", 2)
@@ -151,13 +160,12 @@ class TestExecutorGroups(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   def test_executor_group_health(self):
     """Tests that an unhealthy executor group will not run queries."""
-    QUERY = "select count(*) from functional.alltypestiny"
     # Start cluster and group
     self._add_executor_group("group1", 2)
     self._wait_for_num_executor_groups(1, only_healthy=True)
     client = self.client
     # Run query to validate
-    self.execute_query_expect_success(client, QUERY)
+    self.execute_query_expect_success(client, TEST_QUERY)
     # Kill an executor
     executor = self.cluster.impalads[1]
     executor.kill()
@@ -165,14 +173,14 @@ class TestExecutorGroups(CustomClusterTestSuite):
                                                    timeout=20)
     assert self._get_num_executor_groups(only_healthy=True) == 0
     # Run query and observe timeout
-    handle = client.execute_async(QUERY)
+    handle = client.execute_async(TEST_QUERY)
     self._assert_eventually_in_profile(handle, "Waiting for executors to start")
     # Restart executor
     executor.start()
     # Query should now finish
     client.wait_for_finished_timeout(handle, 20)
     # Run query and observe success
-    self.execute_query_expect_success(client, QUERY)
+    self.execute_query_expect_success(client, TEST_QUERY)
     self._wait_for_num_executor_groups(1, only_healthy=True)
 
   @pytest.mark.execute_serially
@@ -310,7 +318,6 @@ class TestExecutorGroups(CustomClusterTestSuite):
   def test_sequential_startup_wait(self):
     """Tests that starting an executor group sequentially works as expected, i.e. queries
     don't fail and no queries are admitted until the group is in a healthy state."""
-    QUERY = "select sleep(4)"
     # Start first executor
     self._add_executor_group("group1", 3, num_executors=1)
     self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 2)
@@ -318,7 +325,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
     assert self._get_num_executor_groups(only_healthy=True) == 0
     # Run query and observe that it gets queued
     client = self.client
-    handle = client.execute_async(QUERY)
+    handle = client.execute_async(TEST_QUERY)
     self._assert_eventually_in_profile(handle, "Initial admission queue reason:"
                                                " Waiting for executors to start")
     initial_state = client.get_state(handle)
@@ -342,8 +349,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
     self._add_executor_group("", min_size=2, num_executors=2,
                              admission_control_slots=3)
     # Run query to make sure things work
-    QUERY = "select count(*) from functional.alltypestiny"
-    self.execute_query_expect_success(self.client, QUERY)
+    self.execute_query_expect_success(self.client, TEST_QUERY)
     assert self._get_num_executor_groups(only_healthy=True) == 1
     # Kill executors to make group empty
     impalads = self.cluster.impalads
@@ -351,10 +357,12 @@ class TestExecutorGroups(CustomClusterTestSuite):
     impalads[2].kill()
     self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 1)
     # Run query to make sure it times out
-    result = self.execute_query_expect_failure(self.client, QUERY)
+    result = self.execute_query_expect_failure(self.client, TEST_QUERY)
     expected_error = "Query aborted:Admission for query exceeded timeout 2000ms in " \
                      "pool default-pool. Queued reason: Waiting for executors to " \
-                     "start. Only DDL queries can currently run."
+                     "start. Only DDL queries and queries scheduled only on the " \
+                     "coordinator (either NUM_NODES set to 1 or when small query " \
+                     "optimization is triggered) can currently run."
     assert expected_error in str(result)
     assert self._get_num_executor_groups(only_healthy=True) == 0
 


[impala] 03/05: IMPALA-9739: Fix data race during impala graceful shutdown

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 950e51f9a8531b9388d8e427e5e76dcf13048362
Author: Bikramjeet Vig <bi...@gmail.com>
AuthorDate: Fri Jun 12 16:36:25 2020 -0700

    IMPALA-9739: Fix data race during impala graceful shutdown
    
    When impala does a graceful shutdown, exit() method is called at the
    end that performs cleanup which interferes with the shutdown signal
    handling thread spawned during init() and triggers a data race which
    gets caught by the thread sanitizer build. This patch fixes that by
    using an _exit() call instead.
    
    Testing:
    Ran the offending test TestGracefulShutdown on a thread sanitizer
    build and made sure no data race was flagged.
    
    Change-Id: I59bb5326791cd877df4711e23979f9bd88e4659a
    Reviewed-on: http://gerrit.cloudera.org:8080/16074
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/impala-server.cc | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 6d0a4dd..96a8857 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -2805,7 +2805,10 @@ Status ImpalaServer::StartShutdown(
     }
   }
   LOG(INFO) << "Shutdown complete, going down.";
-  exit(0);
+  // Use _exit here instead since exit() does cleanup which interferes with the shutdown
+  // signal handler thread causing a data race.
+  ShutdownLogging();
+  _exit(0);
 }
 
 // This should never be inlined to prevent it potentially being optimized, e.g.


[impala] 01/05: IMPALA-9688: Support create iceberg table by impala

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 8fcad905a12d018eb0a354f7e4793e5b0d5efd3b
Author: skyyws <sk...@163.com>
AuthorDate: Wed Apr 8 16:07:03 2020 +0800

    IMPALA-9688: Support create iceberg table by impala
    
    This patch mainly realizes the creation of iceberg table through impala,
    we can use the following sql to create a new iceberg table:
        create table iceberg_test(
            level string,
            event_time timestamp,
            message string,
            register_time date,
            telephone array <string>
        )
        partition by spec(
            level identity,
            event_time identity,
            event_time hour,
            register_time day
        )
        stored as iceberg;
    'identity' is one of Iceberg's Partition Transforms. 'identity' means that
    the source data values are used to create partitions, and other partition
    transforms would be supported in the future, such as BUCKET/TRUNCATE. We
    can also use 'show create table iceberg_test' to display table schema, and
    use 'show partitions iceberg_test' to display partition column info. By the
    way, partition column must be the source column.
    
    Testing:
    - Add test cases in metadata/test_show_create_table.py.
    - Add custom cluster test test_iceberg.py.
    
    Change-Id: I8d85db4c904a8c758c4cfb4f19cfbdab7e6ea284
    Reviewed-on: http://gerrit.cloudera.org:8080/15797
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/query-options-test.cc               |   3 +-
 bin/impala-config.sh                               |   2 +
 common/thrift/CatalogObjects.thrift                |  33 +++
 common/thrift/Descriptors.thrift                   |   1 +
 common/thrift/JniCatalog.thrift                    |   3 +
 fe/pom.xml                                         |  32 +++
 fe/src/main/cup/sql-parser.cup                     |  81 +++++-
 .../java/org/apache/impala/analysis/Analyzer.java  |   4 +-
 .../apache/impala/analysis/CreateTableStmt.java    |  48 +++-
 .../impala/analysis/IcebergPartitionField.java     |  79 ++++++
 .../impala/analysis/IcebergPartitionSpec.java      |  93 +++++++
 .../org/apache/impala/analysis/ShowStatsStmt.java  |  16 ++
 .../apache/impala/analysis/TableDataLayout.java    |  19 +-
 .../java/org/apache/impala/analysis/TableDef.java  |   4 +
 .../org/apache/impala/analysis/ToSqlUtils.java     |   7 +
 .../org/apache/impala/catalog/FeIcebergTable.java  |  97 +++++++
 .../org/apache/impala/catalog/HdfsFileFormat.java  |  10 +-
 .../impala/catalog/HdfsStorageDescriptor.java      |   3 +-
 .../org/apache/impala/catalog/IcebergTable.java    | 286 +++++++++++++++++++++
 .../main/java/org/apache/impala/catalog/Table.java |   2 +
 .../impala/catalog/local/LocalIcebergTable.java    | 137 ++++++++++
 .../apache/impala/catalog/local/LocalTable.java    |   3 +
 .../apache/impala/planner/SingleNodePlanner.java   |   4 +
 .../apache/impala/service/CatalogOpExecutor.java   |  70 ++++-
 .../java/org/apache/impala/service/Frontend.java   |   8 +
 .../impala/service/IcebergCatalogOpExecutor.java   | 169 ++++++++++++
 .../java/org/apache/impala/util/IcebergUtil.java   | 186 ++++++++++++++
 fe/src/main/jflex/sql-scanner.flex                 |   2 +
 impala-parent/pom.xml                              |   1 +
 .../queries/QueryTest/iceberg_create.test          | 154 +++++++++++
 .../functional-query/queries/QueryTest/set.test    |   2 +-
 .../queries/QueryTest/show-create-table.test       |  19 ++
 tests/custom_cluster/test_iceberg.py               |  32 +++
 33 files changed, 1594 insertions(+), 16 deletions(-)

diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 83b7833..9141560 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -221,7 +221,8 @@ TEST(QueryOptions, SetEnumOptions) {
   TestEnumCase(options, CASE(parquet_array_resolution, TParquetArrayResolution,
       (THREE_LEVEL, TWO_LEVEL, TWO_LEVEL_THEN_THREE_LEVEL)), true);
   TestEnumCase(options, CASE(default_file_format, THdfsFileFormat,
-      (TEXT, RC_FILE, SEQUENCE_FILE, AVRO, PARQUET, KUDU, ORC, HUDI_PARQUET)), true);
+      (TEXT, RC_FILE, SEQUENCE_FILE, AVRO, PARQUET, KUDU, ORC, HUDI_PARQUET, ICEBERG)),
+      true);
   TestEnumCase(options, CASE(runtime_filter_mode, TRuntimeFilterMode,
       (OFF, LOCAL, GLOBAL)), true);
   TestEnumCase(options, CASE(kudu_read_mode, TKuduReadMode,
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index 0fcf5cc..0d10496 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -188,6 +188,7 @@ export CDP_TEZ_VERSION=0.9.1.7.2.1.0-57
 export IMPALA_HUDI_VERSION=0.5.0-incubating
 export IMPALA_KITE_VERSION=1.0.0-cdh6.x-SNAPSHOT
 export IMPALA_ORC_JAVA_VERSION=1.6.2
+export IMPALA_ICEBERG_VERSION=0.8.0-incubating
 
 # When IMPALA_(CDP_COMPONENT)_URL are overridden, they may contain '$(platform_label)'
 # which will be substituted for the CDP platform label in bootstrap_toolchain.py
@@ -727,6 +728,7 @@ echo "IMPALA_HUDI_VERSION     = $IMPALA_HUDI_VERSION"
 echo "IMPALA_KUDU_VERSION     = $IMPALA_KUDU_VERSION"
 echo "IMPALA_KUDU_JAVA_VERSION= $IMPALA_KUDU_JAVA_VERSION"
 echo "IMPALA_RANGER_VERSION   = $IMPALA_RANGER_VERSION"
+echo "IMPALA_ICEBERG_VERSION  = $IMPALA_ICEBERG_VERSION"
 
 # Kerberos things.  If the cluster exists and is kerberized, source
 # the required environment.  This is required for any hadoop tool to
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index f8ddf63..7a73270 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -50,6 +50,7 @@ enum TTableType {
   VIEW = 2
   DATA_SOURCE_TABLE = 3
   KUDU_TABLE = 4
+  ICEBERG_TABLE = 5
 }
 
 // TODO: Separate the storage engines (e.g. Kudu) from the file formats.
@@ -64,6 +65,7 @@ enum THdfsFileFormat {
   KUDU = 5
   ORC = 6
   HUDI_PARQUET = 7
+  ICEBERG = 8
 }
 
 // TODO: Since compression is also enabled for Kudu columns, we should
@@ -113,6 +115,16 @@ enum TAccessLevel {
   WRITE_ONLY = 3
 }
 
+enum TIcebergPartitionTransform {
+  IDENTITY = 0
+  HOUR = 1
+  DAY = 2
+  MONTH = 3
+  YEAR = 4
+  BUCKET = 5
+  TRUNCATE = 6
+}
+
 struct TCompressionCodec {
   // Compression codec
   1: required THdfsCompression codec
@@ -474,6 +486,24 @@ struct TKuduTable {
   4: required list<TKuduPartitionParam> partition_by
 }
 
+struct TIcebergPartitionField {
+  1: required i32 source_id
+  2: required i32 field_id
+  3: required string field_name
+  4: required TIcebergPartitionTransform field_type
+}
+
+struct TIcebergPartitionSpec {
+  1: required i32 partition_id
+  2: optional list<TIcebergPartitionField> partition_fields
+}
+
+struct TIcebergTable {
+  // Iceberg file system table location
+  1: required string table_location
+  2: required list<TIcebergPartitionSpec> partition_spec
+}
+
 // Represents a table or view.
 struct TTable {
   // Name of the parent database. Case insensitive, expected to be stored as lowercase.
@@ -522,6 +552,9 @@ struct TTable {
   // Set if this table needs storage access during metadata load.
   // Time used for storage loading in nanoseconds.
   15: optional i64 storage_metadata_load_time_ns
+
+  // Set if this a iceberg table
+  16: optional TIcebergTable iceberg_table
 }
 
 // Represents a database.
diff --git a/common/thrift/Descriptors.thrift b/common/thrift/Descriptors.thrift
index 68c78bc..225e714 100644
--- a/common/thrift/Descriptors.thrift
+++ b/common/thrift/Descriptors.thrift
@@ -69,6 +69,7 @@ struct TTableDescriptor {
   6: optional CatalogObjects.THBaseTable hbaseTable
   9: optional CatalogObjects.TDataSourceTable dataSourceTable
   10: optional CatalogObjects.TKuduTable kuduTable
+  11: optional CatalogObjects.TIcebergTable icebergTable
 
   // Unqualified name of table
   7: required string tableName
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index 2b487a8..491055b 100644
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -542,6 +542,9 @@ struct TCreateTableParams {
 
   // Foreign Keys Structure for Hive API
   20: optional list<hive_metastore.SQLForeignKey> foreign_keys;
+
+  // Just one PartitionSpec when create iceberg table
+  21: optional CatalogObjects.TIcebergPartitionSpec partition_spec
 }
 
 // Parameters of a CREATE VIEW or ALTER VIEW AS SELECT command
diff --git a/fe/pom.xml b/fe/pom.xml
index 787cafa..0f5e777 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -447,6 +447,38 @@ under the License.
       <version>1.3</version>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.iceberg</groupId>
+      <artifactId>iceberg-api</artifactId>
+      <version>${iceberg.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.iceberg</groupId>
+      <artifactId>iceberg-core</artifactId>
+      <version>${iceberg.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 
   <reporting>
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index bc2613d..c0088fd 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -35,6 +35,8 @@ import org.apache.impala.analysis.UnionStmt.UnionOperand;
 import org.apache.impala.analysis.RangePartition;
 import org.apache.impala.analysis.TableSampleClause;
 import org.apache.impala.analysis.AlterTableAddDropRangePartitionStmt.Operation;
+import org.apache.impala.analysis.IcebergPartitionSpec;
+import org.apache.impala.analysis.IcebergPartitionField;
 import org.apache.impala.catalog.ArrayType;
 import org.apache.impala.catalog.MapType;
 import org.apache.impala.catalog.RowFormat;
@@ -55,7 +57,9 @@ import org.apache.impala.thrift.TShowStatsOp;
 import org.apache.impala.thrift.TTablePropertyType;
 import org.apache.impala.thrift.TPrincipalType;
 import org.apache.impala.thrift.TSortingOrder;
+import org.apache.impala.thrift.TIcebergPartitionTransform;
 import org.apache.impala.service.BackendConfig;
+import org.apache.impala.util.IcebergUtil;
 import org.apache.impala.common.NotImplementedException;
 
 parser code {:
@@ -287,8 +291,8 @@ terminal
   KW_EXTENDED, KW_EXTERNAL, KW_FALSE, KW_FIELDS, KW_FILEFORMAT, KW_FILES, KW_FINALIZE_FN,
   KW_FIRST, KW_FLOAT, KW_FOLLOWING, KW_FOR, KW_FOREIGN, KW_FORMAT, KW_FORMATTED,
   KW_FROM, KW_FULL, KW_FUNCTION, KW_FUNCTIONS, KW_GRANT, KW_GROUP, KW_HASH, KW_HUDIPARQUET,
-  KW_IGNORE, KW_HAVING, KW_IF, KW_ILIKE, KW_IN, KW_INCREMENTAL, KW_INIT_FN, KW_INNER, KW_INPATH,
-  KW_INSERT, KW_INT, KW_INTERMEDIATE, KW_INTERVAL, KW_INTO, KW_INVALIDATE, KW_IREGEXP,
+  KW_IGNORE, KW_HAVING, KW_ICEBERG, KW_IF, KW_ILIKE, KW_IN, KW_INCREMENTAL, KW_INIT_FN, KW_INNER,
+  KW_INPATH, KW_INSERT, KW_INT, KW_INTERMEDIATE, KW_INTERVAL, KW_INTO, KW_INVALIDATE, KW_IREGEXP,
   KW_IS, KW_JOIN,  KW_KUDU, KW_LAST, KW_LEFT, KW_LEXICAL, KW_LIKE, KW_LIMIT, KW_LINES,
   KW_LOAD, KW_LOCATION, KW_MAP, KW_MERGE_FN, KW_METADATA, KW_NORELY, KW_NOT,
   KW_NOVALIDATE, KW_NULL, KW_NULLS, KW_OFFSET, KW_ON, KW_OR, KW_ORC, KW_ORDER, KW_OUTER,
@@ -298,7 +302,7 @@ terminal
   KW_RENAME, KW_REPEATABLE, KW_REPLACE, KW_REPLICATION, KW_RESTRICT, KW_RETURNS,
   KW_REVOKE, KW_RIGHT, KW_RLIKE, KW_ROLE, KW_ROLES, KW_ROW, KW_ROWS, KW_SCHEMA,
   KW_SCHEMAS, KW_SELECT, KW_SEMI, KW_SEQUENCEFILE, KW_SERDEPROPERTIES, KW_SERIALIZE_FN,
-  KW_SET, KW_SHOW, KW_SMALLINT, KW_SORT, KW_STORED, KW_STRAIGHT_JOIN, KW_STRING,
+  KW_SET, KW_SHOW, KW_SMALLINT, KW_SORT, KW_SPEC, KW_STORED, KW_STRAIGHT_JOIN, KW_STRING,
   KW_STRUCT, KW_SYMBOL, KW_TABLE, KW_TABLES, KW_TABLESAMPLE, KW_TBLPROPERTIES,
   KW_TERMINATED, KW_TEXTFILE, KW_THEN, KW_TIMESTAMP, KW_TINYINT, KW_TRUNCATE, KW_STATS,
   KW_TO, KW_TRUE, KW_UNBOUNDED, KW_UNCACHED, KW_UNION, KW_UNKNOWN, KW_UPDATE,
@@ -477,6 +481,13 @@ nonterminal ColumnDef column_def, view_column_def;
 nonterminal List<ColumnDef> column_def_list, partition_column_defs,
   view_column_def_list, view_column_defs;
 nonterminal List<StructField> struct_field_def_list;
+nonterminal List<IcebergPartitionSpec> iceberg_partition_spec_list,
+  iceberg_partition_spec_defs;
+nonterminal IcebergPartitionSpec iceberg_partition_spec_def;
+nonterminal List<IcebergPartitionField> iceberg_partition_field_list,
+  iceberg_partition_field_defs;
+nonterminal IcebergPartitionField iceberg_partition_field_def;
+nonterminal TIcebergPartitionTransform iceberg_partition_transform;
 // Options for DDL commands - CREATE/DROP/ALTER
 nonterminal HdfsCachingOp cache_op_val, opt_cache_op_val;
 nonterminal BigDecimal opt_cache_op_replication;
@@ -1320,6 +1331,7 @@ create_tbl_as_select_params ::=
   {:
     tbl_def.getPrimaryKeyColumnNames().addAll(primary_keys);
     tbl_def.getKuduPartitionParams().addAll(partition_params.getKuduPartitionParams());
+    tbl_def.getIcebergPartitionSpecs().addAll(partition_params.getIcebergPartitionSpecs());
     tbl_def.setOptions(options);
     RESULT = new CreateTableAsSelectStmt.CtasParams(new CreateTableStmt(tbl_def),
         select_stmt, null);
@@ -1360,6 +1372,7 @@ create_tbl_stmt ::=
   {:
     tbl_def.getPartitionColumnDefs().addAll(data_layout.getPartitionColumnDefs());
     tbl_def.getKuduPartitionParams().addAll(data_layout.getKuduPartitionParams());
+    tbl_def.getIcebergPartitionSpecs().addAll(data_layout.getIcebergPartitionSpecs());
     tbl_def.setOptions(options);
     RESULT = new CreateTableStmt(tbl_def);
   :}
@@ -1385,6 +1398,7 @@ create_tbl_stmt ::=
   {:
     tbl_def.getPartitionColumnDefs().addAll(data_layout.getPartitionColumnDefs());
     tbl_def.getKuduPartitionParams().addAll(data_layout.getKuduPartitionParams());
+    tbl_def.getIcebergPartitionSpecs().addAll(data_layout.getIcebergPartitionSpecs());
     tbl_def.setOptions(options);
     RESULT = new CreateTableLikeFileStmt(new CreateTableStmt(tbl_def),
         schema_file_format, new HdfsUri(schema_location));
@@ -1589,6 +1603,8 @@ opt_tbl_data_layout ::=
 partitioned_data_layout ::=
   partition_param_list:partition_params
   {: RESULT = TableDataLayout.createKuduPartitionedLayout(partition_params); :}
+  | iceberg_partition_spec_list:iceberg_partition_specs
+  {: RESULT = TableDataLayout.createIcebergPartitionedLayout(iceberg_partition_specs); :}
   | /* empty */
   {: RESULT = TableDataLayout.createEmptyLayout(); :}
   ;
@@ -1708,6 +1724,59 @@ opt_upper_range_val ::=
   {: RESULT = null; :}
   ;
 
+iceberg_partition_spec_list ::=
+  iceberg_partition_spec_defs:cols
+  {: RESULT = cols; :}
+  ;
+
+iceberg_partition_spec_defs ::=
+  iceberg_partition_spec_def:col_def
+  {:
+    List<IcebergPartitionSpec> list = new ArrayList<>();
+    list.add(col_def);
+    RESULT = list;
+  :}
+  | iceberg_partition_spec_defs:list COMMA iceberg_partition_spec_def:col_def
+  {:
+    list.add(col_def);
+    RESULT = list;
+  :}
+  ;
+
+iceberg_partition_spec_def ::=
+  iceberg_partition_field_list:partition_fields
+  {: RESULT = new IcebergPartitionSpec(partition_fields); :}
+  ;
+
+iceberg_partition_field_list ::=
+  KW_PARTITION KW_BY KW_SPEC LPAREN iceberg_partition_field_defs:cols RPAREN
+  {: RESULT = cols; :}
+  ;
+
+iceberg_partition_field_defs ::=
+  iceberg_partition_field_def:col_def
+  {:
+    List<IcebergPartitionField> list = new ArrayList<>();
+    list.add(col_def);
+    RESULT = list;
+  :}
+  | iceberg_partition_field_defs:list COMMA iceberg_partition_field_def:col_def
+  {:
+    list.add(col_def);
+    RESULT = list;
+  :}
+  ;
+
+iceberg_partition_field_def ::=
+  IDENT:col_name iceberg_partition_transform:partition_transform
+  {: RESULT = new IcebergPartitionField(col_name, partition_transform); :}
+  ;
+
+iceberg_partition_transform ::=
+  IDENT:transfrom_type
+  {: RESULT = IcebergUtil.getPartitionTransform(transfrom_type); :}
+  ;
+
 create_udf_stmt ::=
   KW_CREATE KW_FUNCTION if_not_exists_val:if_not_exists
   function_name:fn_name function_def_args:fn_args
@@ -1880,6 +1949,8 @@ file_format_val ::=
   {: RESULT = THdfsFileFormat.AVRO; :}
   | KW_HUDIPARQUET
   {: RESULT = THdfsFileFormat.HUDI_PARQUET; :}
+  | KW_ICEBERG
+  {: RESULT = THdfsFileFormat.ICEBERG; :}
   ;
 
 tbl_properties ::=
@@ -3789,6 +3860,8 @@ word ::=
   {: RESULT = r.toString(); :}
   | KW_HUDIPARQUET:r
   {: RESULT = r.toString(); :}
+  | KW_ICEBERG:r
+  {: RESULT = r.toString(); :}
   | KW_IF:r
   {: RESULT = r.toString(); :}
   | KW_IGNORE:r
@@ -3955,6 +4028,8 @@ word ::=
   {: RESULT = r.toString(); :}
   | KW_SORT:r
   {: RESULT = r.toString(); :}
+  | KW_SPEC:r
+  {: RESULT = r.toString(); :}
   | KW_STORED:r
   {: RESULT = r.toString(); :}
   | KW_STRAIGHT_JOIN:r
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 367edab..59e129d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -54,6 +54,7 @@ import org.apache.impala.catalog.FeIncompleteTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeView;
+import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.Type;
@@ -764,7 +765,8 @@ public class Analyzer {
         Preconditions.checkState(table instanceof FeFsTable ||
             table instanceof FeKuduTable ||
             table instanceof FeHBaseTable ||
-            table instanceof FeDataSourceTable);
+            table instanceof FeDataSourceTable ||
+            table instanceof IcebergTable);
         resolvedTableRef = new BaseTableRef(tableRef, resolvedPath);
       }
       // Only do table masking when authorization is enabled and the authorization
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index fb9a972..920d725 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.impala.authorization.AuthorizationConfig;
 import org.apache.impala.catalog.KuduTable;
+import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.RowFormat;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.common.AnalysisException;
@@ -139,6 +140,10 @@ public class CreateTableStmt extends StatementBase {
     return tableDef_.getPrimaryKeyColumnNames();
   }
 
+  List<IcebergPartitionSpec> getIcebergPartitionSpecs() {
+    return tableDef_.getIcebergPartitionSpecs();
+  }
+
   /**
    * Get foreign keys information as strings. Useful for toSqlUtils.
    * @return List of strings of the form "(col1, col2,..) REFERENCES [pk_db].pk_table
@@ -218,6 +223,13 @@ public class CreateTableStmt extends StatementBase {
       params.addToForeign_keys(fk);
     }
     params.setServer_name(serverName_);
+
+    // Create table stmt only have one PartitionSpec
+    if (!getIcebergPartitionSpecs().isEmpty()) {
+      Preconditions.checkState(getIcebergPartitionSpecs().size() == 1);
+      params.setPartition_spec(getIcebergPartitionSpecs().get(0).toThrift());
+    }
+
     return params;
   }
 
@@ -243,7 +255,8 @@ public class CreateTableStmt extends StatementBase {
     // Avro tables can have empty column defs because they can infer them from the Avro
     // schema. Likewise for external Kudu tables, the schema can be read from Kudu.
     if (getColumnDefs().isEmpty() && getFileFormat() != THdfsFileFormat.AVRO
-        && getFileFormat() != THdfsFileFormat.KUDU) {
+        && getFileFormat() != THdfsFileFormat.KUDU && getFileFormat() !=
+        THdfsFileFormat.ICEBERG) {
       throw new AnalysisException("Table requires at least 1 column");
     }
     if (getFileFormat() == THdfsFileFormat.AVRO) {
@@ -255,6 +268,10 @@ public class CreateTableStmt extends StatementBase {
       AvroSchemaUtils.setFromSerdeComment(getColumnDefs());
     }
 
+    if (getFileFormat() == THdfsFileFormat.ICEBERG) {
+      analyzeIcebergFormat();
+    }
+
     // If lineage logging is enabled, compute minimal lineage graph.
     if (BackendConfig.INSTANCE.getComputeLineage() || RuntimeEnv.INSTANCE.isTestEnv()) {
        computeLineageGraph(analyzer);
@@ -542,4 +559,33 @@ public class CreateTableStmt extends StatementBase {
           new StringLiteral(kv.getValue()).getUnescapedValue());
     }
   }
+
+  /**
+   * For iceberg file format, add related storage handler
+   */
+  private void analyzeIcebergFormat() throws AnalysisException {
+    // A managed table cannot have 'external.table.purge' property set
+    if (!isExternal() && Boolean.parseBoolean(
+        getTblProperties().get(IcebergTable.TBL_PROP_EXTERNAL_TABLE_PURGE))) {
+      throw new AnalysisException(String.format("Table property '%s' cannot be set to " +
+          "true with a managed Iceberg table.",
+          IcebergTable.TBL_PROP_EXTERNAL_TABLE_PURGE));
+    }
+
+    if ((!isExternal() || Boolean.parseBoolean(getTblProperties().get(
+        Table.TBL_PROP_EXTERNAL_TABLE_PURGE))) && getColumnDefs().isEmpty()) {
+      // External iceberg table can have empty column, but managed iceberg table
+      // requires at least one column.
+      throw new AnalysisException("Table requires at least 1 column for " +
+          "managed iceberg table.");
+    }
+
+    String handler = getTblProperties().get(IcebergTable.KEY_STORAGE_HANDLER);
+    if (handler != null && !IcebergTable.isIcebergStorageHandler(handler)) {
+      throw new AnalysisException("Invalid storage handler " +
+          "specified for Iceberg format: " + handler);
+    }
+    putGeneratedKuduProperty(IcebergTable.KEY_STORAGE_HANDLER,
+        IcebergTable.ICEBERG_STORAGE_HANDLER);
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionField.java b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionField.java
new file mode 100644
index 0000000..2cc2d5d
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionField.java
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.thrift.TIcebergPartitionField;
+import org.apache.impala.thrift.TIcebergPartitionTransform;
+
+/**
+ * Represents a PartitionField of iceberg
+ */
+public class IcebergPartitionField extends StmtNode {
+  // The id of the source field in iceberg table Schema, you can get these source
+  // fields by Schema.columns(), the return type is List<NestedField>.
+  private int sourceId_;
+
+  // The field id from Iceberg PartitionField, which across all the table
+  // metadata's partition specs
+  private int fieldId_;
+
+  //Column name
+  private String fieldName_;
+
+  //Column partition type
+  private TIcebergPartitionTransform fieldType_;
+
+  public IcebergPartitionField(int sourceId, int fieldId, String fieldName,
+                               TIcebergPartitionTransform fieldType) {
+    sourceId_ = sourceId;
+    fieldId_ = fieldId;
+    fieldName_ = fieldName;
+    fieldType_ = fieldType;
+  }
+
+  public IcebergPartitionField(String fieldName, TIcebergPartitionTransform fieldType) {
+    this(0, 0, fieldName, fieldType);
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    return;
+  }
+
+  @Override
+  public final String toSql() {
+    return toSql(ToSqlOptions.DEFAULT);
+  }
+
+  @Override
+  public String toSql(ToSqlOptions options) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(fieldName_ + " " + fieldType_.toString());
+    return builder.toString();
+  }
+
+  public TIcebergPartitionField toThrift() {
+    TIcebergPartitionField result = new TIcebergPartitionField();
+    result.setField_id(fieldId_);
+    result.setSource_id(sourceId_);
+    result.setField_name(fieldName_);
+    result.setField_type(fieldType_);
+    return result;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionSpec.java b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionSpec.java
new file mode 100644
index 0000000..6bf2829
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionSpec.java
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import java.util.List;
+
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.compat.MetastoreShim;
+import org.apache.impala.thrift.TIcebergPartitionSpec;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Represents the partitioning of a Iceberg table as defined in the PARTITION BY SPEC
+ * clause of a CREATE TABLE statement. Iceberg supported kinds of partition.
+ * Examples:
+ * PARTITION BY SPEC
+ * (
+ * dt identity,
+ * event_time hour,
+ * event_time day,
+ * event_time month
+ * )
+ */
+public class IcebergPartitionSpec extends StmtNode {
+  // Partition id from iceberg PartitionSpec
+  private int partitionId_;
+
+  private List<IcebergPartitionField> icebergPartitionFields_;
+
+  public IcebergPartitionSpec(int partitionId, List<IcebergPartitionField> fields) {
+    partitionId_ = partitionId;
+    icebergPartitionFields_ = fields;
+  }
+
+  public IcebergPartitionSpec(List<IcebergPartitionField> fields) {
+    this(0, fields);
+  }
+
+  public List<IcebergPartitionField> getIcebergPartitionFields_() {
+    return icebergPartitionFields_;
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    for (IcebergPartitionField field : icebergPartitionFields_) {
+      field.analyze(analyzer);
+    }
+  }
+
+  @Override
+  public final String toSql() {
+    return toSql(ToSqlOptions.DEFAULT);
+  }
+
+  @Override
+  public String toSql(ToSqlOptions options) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("(");
+    if (!icebergPartitionFields_.isEmpty()) {
+      builder.append("\n");
+      for (IcebergPartitionField field : icebergPartitionFields_) {
+        builder.append(String.format("  %s,\n", field.toSql()));
+      }
+    }
+    builder.append(")");
+    return builder.toString();
+  }
+
+  public TIcebergPartitionSpec toThrift() {
+    TIcebergPartitionSpec result = new TIcebergPartitionSpec();
+    result.setPartition_id(partitionId_);
+    for (IcebergPartitionField field : icebergPartitionFields_) {
+      result.addToPartition_fields(field.toThrift());
+    }
+    return result;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/analysis/ShowStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ShowStatsStmt.java
index 14698eb..42fba21 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ShowStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ShowStatsStmt.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeView;
@@ -94,6 +95,21 @@ public class ShowStatsStmt extends StatementBase {
         throw new AnalysisException(getSqlPrefix() + " requested but table does not " +
             "have range partitions: " + table_.getFullName());
       }
+    } else if (table_ instanceof FeIcebergTable) {
+      FeIcebergTable icebergTable = (FeIcebergTable) table_;
+      if (op_ == TShowStatsOp.PARTITIONS) {
+        //Non-partition iceberg table only has an empty PartitionField set
+        Preconditions.checkArgument(!icebergTable.getPartitionSpec().isEmpty());
+        IcebergPartitionSpec spec = icebergTable.getPartitionSpec().get(0);
+        boolean emptySpec = (spec.getIcebergPartitionFields_() == null ||
+            spec.getIcebergPartitionFields_().size() == 0);
+        if (icebergTable.getPartitionSpec().size() == 1 && emptySpec) {
+          throw new AnalysisException("Iceberg table does not have PartitionSpec: "
+              + table_.getFullName());
+        }
+      } else {
+        throw new AnalysisException(getSqlPrefix() + " not supported for Iceberg table.");
+      }
     } else {
       if (op_ == TShowStatsOp.RANGE_PARTITIONS) {
         throw new AnalysisException(getSqlPrefix() + " must target a Kudu table: " +
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java b/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java
index 251cc2f..d11c2c1 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDataLayout.java
@@ -22,32 +22,43 @@ import java.util.List;
 
 /**
  * Represents the PARTITION BY and PARTITIONED BY clauses of a DDL statement.
+ * The PARTITION BY SPEC clause can be used to create Iceberg table partitions.
  */
 class TableDataLayout {
 
   private final List<ColumnDef> partitionColDefs_;
   private final List<KuduPartitionParam> kuduPartitionParams_;
+  private final List<IcebergPartitionSpec> icebergPartitionSpecs_;
 
   private TableDataLayout(List<ColumnDef> partitionColumnDefs,
-      List<KuduPartitionParam> partitionParams) {
+                          List<KuduPartitionParam> partitionParams,
+                          List<IcebergPartitionSpec> icebergPartitionSpecs) {
     partitionColDefs_ = partitionColumnDefs;
     kuduPartitionParams_ = partitionParams;
+    icebergPartitionSpecs_ = icebergPartitionSpecs;
   }
 
   static TableDataLayout createPartitionedLayout(List<ColumnDef> partitionColumnDefs) {
     return new TableDataLayout(partitionColumnDefs,
-        new ArrayList<>());
+        new ArrayList<>(), new ArrayList<>());
   }
 
   static TableDataLayout createKuduPartitionedLayout(
       List<KuduPartitionParam> partitionParams) {
-    return new TableDataLayout(new ArrayList<>(), partitionParams);
+    return new TableDataLayout(new ArrayList<>(), partitionParams, new ArrayList<>());
+  }
+
+  static TableDataLayout createIcebergPartitionedLayout(
+      List<IcebergPartitionSpec> icebergPartitionSpecs) {
+    return new TableDataLayout(new ArrayList<>(), new ArrayList<>(),
+        icebergPartitionSpecs);
   }
 
   static TableDataLayout createEmptyLayout() {
-    return new TableDataLayout(new ArrayList<>(), new ArrayList<>());
+    return new TableDataLayout(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
   }
 
   List<ColumnDef> getPartitionColumnDefs() { return partitionColDefs_; }
   List<KuduPartitionParam> getKuduPartitionParams() { return kuduPartitionParams_; }
+  List<IcebergPartitionSpec> getIcebergPartitionSpecs() { return icebergPartitionSpecs_; }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
index 71d8de4..addfa8f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
@@ -374,6 +374,10 @@ class TableDef {
   List<KuduPartitionParam> getKuduPartitionParams() {
     return dataLayout_.getKuduPartitionParams();
   }
+
+  List<IcebergPartitionSpec> getIcebergPartitionSpecs() {
+    return dataLayout_.getIcebergPartitionSpecs();
+  }
   void setOptions(Options options) {
     Preconditions.checkNotNull(options);
     options_ = options;
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index 562412b..a0a3b4c 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -35,12 +35,14 @@ import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeHBaseTable;
+import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.FeView;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.HdfsCompression;
 import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.RowFormat;
@@ -385,7 +387,12 @@ public class ToSqlUtils {
       } catch (Exception e) {
         throw new CatalogException("Could not get primary key/foreign keys sql.", e);
       }
+    } else if (table instanceof FeIcebergTable) {
+      storageHandlerClassName = null;
+      format = HdfsFileFormat.ICEBERG;
+      properties.remove(IcebergTable.KEY_STORAGE_HANDLER);
     }
+
     HdfsUri tableLocation = location == null ? null : new HdfsUri(location);
     return getCreateTableSql(table.getDb().getName(), table.getName(), comment, colsSql,
         partitionColsSql, primaryKeySql, foreignKeySql, kuduPartitionByParams,
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
new file mode 100644
index 0000000..d245d11
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.TableMetadata;
+import org.apache.impala.analysis.IcebergPartitionField;
+import org.apache.impala.analysis.IcebergPartitionSpec;
+import org.apache.impala.thrift.TColumn;
+import org.apache.impala.thrift.TResultSet;
+import org.apache.impala.thrift.TResultSetMetadata;
+import org.apache.impala.util.IcebergUtil;
+import org.apache.impala.util.TResultRowBuilder;
+
+/**
+ * Frontend interface for interacting with an Iceberg-backed table.
+ */
+public interface FeIcebergTable extends FeTable {
+  /**
+   * Return the location of the Iceberg table, usually an HDFS path
+   */
+  String getIcebergTableLocation();
+
+  /**
+   * Return the Iceberg partition spec info
+   */
+  List<IcebergPartitionSpec> getPartitionSpec();
+
+  /**
+   * Utility functions
+   */
+  public static abstract class Utils {
+    public static TResultSet getPartitionSpecs(FeIcebergTable table)
+        throws TableLoadingException {
+      TResultSet result = new TResultSet();
+      TResultSetMetadata resultSchema = new TResultSetMetadata();
+      result.setSchema(resultSchema);
+
+      resultSchema.addToColumns(new TColumn("Partition Id", Type.BIGINT.toThrift()));
+      resultSchema.addToColumns(new TColumn("Source Id", Type.BIGINT.toThrift()));
+      resultSchema.addToColumns(new TColumn("Field Id", Type.BIGINT.toThrift()));
+      resultSchema.addToColumns(new TColumn("Field Name", Type.STRING.toThrift()));
+      resultSchema.addToColumns(new TColumn("Field Partition Transform",
+          Type.STRING.toThrift()));
+
+      TableMetadata metadata = IcebergUtil.
+          getIcebergTableMetadata(table.getIcebergTableLocation());
+      if (!metadata.specs().isEmpty()) {
+        // Just show the latest PartitionSpec from iceberg table metadata
+        PartitionSpec latestSpec = metadata.specs().get(metadata.specs().size() - 1);
+        for(PartitionField field : latestSpec.fields()) {
+          TResultRowBuilder builder = new TResultRowBuilder();
+          builder.add(latestSpec.specId());
+          builder.add(field.sourceId());
+          builder.add(field.fieldId());
+          builder.add(field.name());
+          builder.add(IcebergUtil.getPartitionTransform(field).toString());
+          result.addToRows(builder.get());
+        }
+      }
+      return result;
+    }
+
+    public static List<IcebergPartitionSpec> loadPartitionSpecByIceberg(
+        TableMetadata metadata) throws TableLoadingException {
+      List<IcebergPartitionSpec> ret = new ArrayList<>();
+      for (PartitionSpec spec : metadata.specs()) {
+        List<IcebergPartitionField> fields = new ArrayList<>();;
+        for (PartitionField field : spec.fields()) {
+          fields.add(new IcebergPartitionField(field.sourceId(), field.fieldId(),
+              field.name(), IcebergUtil.getPartitionTransform(field)));
+        }
+        ret.add(new IcebergPartitionSpec(spec.specId(), fields));
+      }
+      return ret;
+    }
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java b/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
index 4eeebe1..346c09f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsFileFormat.java
@@ -76,7 +76,10 @@ public enum HdfsFileFormat {
       "org.apache.hadoop.hive.kudu.KuduSerDe", false, false, false),
   HUDI_PARQUET("org.apache.hudi.hadoop.HoodieParquetInputFormat",
       "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
-      "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", true, true, true);
+      "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", true, true, true),
+  ICEBERG("com.expediagroup.hiveberg.IcebergInputFormat",
+      "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
+      "com.expediagroup.hiveberg.IcebergSerDe", false, false, false);
 
   private final String inputFormat_;
   private final String outputFormat_;
@@ -130,6 +133,7 @@ public enum HdfsFileFormat {
           .put(KUDU.inputFormat(), KUDU)
           .put(ORC.inputFormat(), ORC)
           .put(HUDI_PARQUET.inputFormat(), HUDI_PARQUET)
+          .put(ICEBERG.inputFormat(), ICEBERG)
           .build();
 
   /**
@@ -168,6 +172,7 @@ public enum HdfsFileFormat {
       case HUDI_PARQUET: return HdfsFileFormat.HUDI_PARQUET;
       case PARQUET: return HdfsFileFormat.PARQUET;
       case KUDU: return HdfsFileFormat.KUDU;
+      case ICEBERG: return HdfsFileFormat.ICEBERG;
       default:
         throw new RuntimeException("Unknown THdfsFileFormat: "
             + thriftFormat + " - should never happen!");
@@ -184,6 +189,7 @@ public enum HdfsFileFormat {
       case HUDI_PARQUET:
       case PARQUET: return THdfsFileFormat.PARQUET;
       case KUDU: return THdfsFileFormat.KUDU;
+      case ICEBERG: return THdfsFileFormat.ICEBERG;
       default:
         throw new RuntimeException("Unknown HdfsFormat: "
             + this + " - should never happen!");
@@ -208,6 +214,7 @@ public enum HdfsFileFormat {
       case PARQUET: return "PARQUET";
       case KUDU: return "KUDU";
       case HUDI_PARQUET: return "HUDIPARQUET";
+      case ICEBERG: return "ICEBERG";
       default:
         throw new RuntimeException("Unknown HdfsFormat: "
             + this + " - should never happen!");
@@ -227,6 +234,7 @@ public enum HdfsFileFormat {
       case PARQUET:
       case HUDI_PARQUET:
       case ORC:
+      case ICEBERG:
         return true;
       case KUDU:
         return false;
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsStorageDescriptor.java b/fe/src/main/java/org/apache/impala/catalog/HdfsStorageDescriptor.java
index f53f3b0..f6ca7b0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsStorageDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsStorageDescriptor.java
@@ -71,7 +71,8 @@ public class HdfsStorageDescriptor {
       "parquet.hive.serde.ParquetHiveSerDe", // (parquet - legacy)
       // TODO: Verify the following Parquet SerDe works with Impala and add
       // support for the new input/output format classes. See IMPALA-4214.
-      "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"); // (parquet)
+      "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", // (parquet)
+      "com.expediagroup.hiveberg.IcebergSerDe"); // (iceberg)
 
   private final static Logger LOG = LoggerFactory.getLogger(HdfsStorageDescriptor.class);
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
new file mode 100644
index 0000000..d448277
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.types.Types;
+import org.apache.impala.analysis.IcebergPartitionField;
+import org.apache.impala.analysis.IcebergPartitionSpec;
+import org.apache.impala.thrift.TCatalogObjectType;
+import org.apache.impala.thrift.TIcebergPartitionField;
+import org.apache.impala.thrift.TIcebergPartitionSpec;
+import org.apache.impala.thrift.TIcebergTable;
+import org.apache.impala.thrift.TTable;
+import org.apache.impala.thrift.TTableDescriptor;
+import org.apache.impala.thrift.TTableType;
+import org.apache.impala.util.IcebergUtil;
+import org.apache.thrift.TException;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Representation of an Iceberg table in the catalog cache.
+ */
+public class IcebergTable extends Table implements FeIcebergTable {
+
+  // Alias to the string key that identifies the storage handler for Iceberg tables.
+  public static final String KEY_STORAGE_HANDLER =
+      hive_metastoreConstants.META_TABLE_STORAGE;
+
+  // Iceberg specific value for the storage handler table property keyed by
+  // KEY_STORAGE_HANDLER.
+  public static final String ICEBERG_STORAGE_HANDLER =
+      "com.expediagroup.hiveberg.IcebergStorageHandler";
+
+  // The iceberg file system table location
+  private String icebergTableLocation_;
+
+  // Partitioning schemes of this Iceberg table.
+  private List<IcebergPartitionSpec> partitionSpecs_;
+
+  // Schema of the iceberg table.
+  private org.apache.iceberg.Schema icebergSchema_;
+
+  // PartitionSpec of the iceberg table.
+  private List<PartitionSpec> icebergPartitionSpecs_;
+
+  protected IcebergTable(org.apache.hadoop.hive.metastore.api.Table msTable,
+                         Db db, String name, String owner) {
+    super(msTable, db, name, owner);
+    icebergTableLocation_ = msTable.getSd().getLocation();
+  }
+
+  /**
+   * For a managed table or an external purge table, we create the table through the
+   * Iceberg API; otherwise we only create the HMS table.
+   */
+  public static boolean needsCreateInIceberg(
+      org.apache.hadoop.hive.metastore.api.Table msTbl) {
+    Preconditions.checkState(isIcebergTable(msTbl));
+    return isManagedTable(msTbl) || isExternalPurgeTable(msTbl);
+  }
+
+  /**
+   * Returns true if this metastore table has the managed table type.
+   */
+  private static boolean isManagedTable(
+      org.apache.hadoop.hive.metastore.api.Table msTbl) {
+    return msTbl.getTableType().equalsIgnoreCase(TableType.MANAGED_TABLE.toString());
+  }
+
+  @Override
+  public TCatalogObjectType getCatalogObjectType() {
+    return TCatalogObjectType.TABLE;
+  }
+
+  @Override
+  public String getStorageHandlerClassName() {
+    return ICEBERG_STORAGE_HANDLER;
+  }
+
+  public static boolean isIcebergStorageHandler(String handler) {
+    return handler != null && handler.equals(ICEBERG_STORAGE_HANDLER);
+  }
+
+  public static boolean isIcebergTable(org.apache.hadoop.hive.metastore.api.Table msTbl) {
+    return isIcebergStorageHandler(msTbl.getParameters().get(KEY_STORAGE_HANDLER));
+  }
+
+  public org.apache.iceberg.Schema getIcebergSchema() {
+    return icebergSchema_;
+  }
+
+  public List<PartitionSpec> getIcebergPartitionSpec() {
+    return icebergPartitionSpecs_;
+  }
+
+  @Override
+  public String getIcebergTableLocation() {
+    return icebergTableLocation_;
+  }
+
+  @Override
+  public List<IcebergPartitionSpec> getPartitionSpec() {
+    Preconditions.checkState(partitionSpecs_ != null);
+    return ImmutableList.copyOf(partitionSpecs_);
+  }
+
+  @Override
+  public TTable toThrift() {
+    TTable table = super.toThrift();
+    table.setTable_type(TTableType.ICEBERG_TABLE);
+    table.setIceberg_table(getTIcebergTable());
+    return table;
+  }
+
+  /**
+   * Loads the metadata of an Iceberg table.
+   * <p>
+   * Schema and partitioning schemes are loaded directly from Iceberg whereas column stats
+   * are loaded from HMS. The function also updates the table schema in HMS in order to
+   * propagate alterations made to the Iceberg table to HMS.
+   */
+  @Override
+  public void load(boolean dummy /* not used */, IMetaStoreClient msClient,
+                   org.apache.hadoop.hive.metastore.api.Table msTbl, String reason)
+      throws TableLoadingException {
+    final Timer.Context context =
+        getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time();
+    try {
+      // Copy the table to check later if anything has changed.
+      msTable_ = msTbl.deepCopy();
+      // Load metadata from Iceberg
+      final Timer.Context ctxStorageLdTime =
+          getMetrics().getTimer(Table.LOAD_DURATION_STORAGE_METADATA).time();
+      try {
+        loadSchemaFromIceberg();
+      } catch (Exception e) {
+        throw new TableLoadingException("Error loading metadata for Iceberg table " +
+            icebergTableLocation_, e);
+      } finally {
+        storageMetadataLoadTime_ = ctxStorageLdTime.stop();
+      }
+
+      refreshLastUsedTime();
+
+      // Avoid updating HMS if the schema didn't change.
+      if (msTable_.equals(msTbl)) return;
+
+      // Update the table schema in HMS.
+      try {
+        updateTimestampProperty(msTable_, TBL_PROP_LAST_DDL_TIME);
+        msTable_.putToParameters(StatsSetupConst.DO_NOT_UPDATE_STATS,
+            StatsSetupConst.TRUE);
+        msClient.alter_table(msTable_.getDbName(), msTable_.getTableName(), msTable_);
+      } catch (TException e) {
+        throw new TableLoadingException(e.getMessage());
+      }
+    } finally {
+      context.stop();
+    }
+  }
+
+  /**
+   * Load schema and partitioning schemes directly from Iceberg.
+   */
+  public void loadSchemaFromIceberg() throws TableLoadingException {
+    TableMetadata metadata = IcebergUtil.getIcebergTableMetadata(icebergTableLocation_);
+    icebergSchema_ = metadata.schema();
+    icebergPartitionSpecs_ = metadata.specs();
+    loadSchema();
+    partitionSpecs_ = buildIcebergPartitionSpec(icebergPartitionSpecs_);
+  }
+
+  /**
+   * Loads the HMS schema and the Impala columns from the Iceberg schema.
+   */
+  private void loadSchema() throws TableLoadingException {
+    clearColumns();
+
+    List<FieldSchema> cols = msTable_.getSd().getCols();
+    cols.clear();
+
+    int pos = 0;
+    for (Types.NestedField column : icebergSchema_.columns()) {
+      Preconditions.checkNotNull(column);
+      Type colType = IcebergUtil.toImpalaType(column.type());
+      // Update sd cols by iceberg NestedField
+      cols.add(new FieldSchema(column.name(), colType.toSql().toLowerCase(),
+          column.doc()));
+      // Update impala Table columns by iceberg NestedField
+      addColumn(new Column(column.name(), colType, column.doc(), pos++));
+    }
+  }
+
+  /**
+   * Builds the IcebergPartitionSpec list from the Iceberg PartitionSpecs.
+   */
+  private List<IcebergPartitionSpec> buildIcebergPartitionSpec(
+      List<PartitionSpec> specs) throws TableLoadingException {
+    List<IcebergPartitionSpec> ret = new ArrayList<>();
+    for (PartitionSpec spec : specs) {
+      List<IcebergPartitionField> fields = new ArrayList<>();
+      for (PartitionField field : spec.fields()) {
+        fields.add(new IcebergPartitionField(field.sourceId(), field.fieldId(),
+            field.name(), IcebergUtil.getPartitionTransform(field)));
+      }
+      ret.add(new IcebergPartitionSpec(spec.specId(), fields));
+    }
+    return ret;
+  }
+
+  @Override
+  protected void loadFromThrift(TTable thriftTable) throws TableLoadingException {
+    super.loadFromThrift(thriftTable);
+    TIcebergTable ticeberg = thriftTable.getIceberg_table();
+    icebergTableLocation_ = ticeberg.getTable_location();
+    partitionSpecs_ = loadPartitionBySpecsFromThrift(ticeberg.getPartition_spec());
+  }
+
+  private List<IcebergPartitionSpec> loadPartitionBySpecsFromThrift(
+      List<TIcebergPartitionSpec> params) {
+    List<IcebergPartitionSpec> ret = new ArrayList<>();
+    for (TIcebergPartitionSpec param : params) {
+      // A non-partitioned Iceberg table only has one PartitionSpec with an empty
+      // PartitionField set and a partition id
+      if (param.getPartition_fields() != null) {
+        List<IcebergPartitionField> fields = new ArrayList<>();
+        for (TIcebergPartitionField field : param.getPartition_fields()) {
+          fields.add(new IcebergPartitionField(field.getSource_id(), field.getField_id(),
+              field.getField_name(), field.getField_type()));
+        }
+        ret.add(new IcebergPartitionSpec(param.getPartition_id(),
+            fields));
+      } else {
+        ret.add(new IcebergPartitionSpec(param.getPartition_id(), null));
+      }
+    }
+    return ret;
+  }
+
+  @Override
+  public TTableDescriptor toThriftDescriptor(int tableId,
+                                             Set<Long> referencedPartitions) {
+    TTableDescriptor desc = new TTableDescriptor(tableId, TTableType.ICEBERG_TABLE,
+        getTColumnDescriptors(), numClusteringCols_, name_, db_.getName());
+    desc.setIcebergTable(getTIcebergTable());
+    return desc;
+  }
+
+  private TIcebergTable getTIcebergTable() {
+    TIcebergTable tbl = new TIcebergTable();
+    tbl.setTable_location(icebergTableLocation_);
+    for (IcebergPartitionSpec partition : partitionSpecs_) {
+      tbl.addToPartition_spec(partition.toThrift());
+    }
+    return tbl;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index c1bb27f..62e0a7e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -386,6 +386,8 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
       table = new HBaseTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
     } else if (KuduTable.isKuduTable(msTbl)) {
       table = new KuduTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
+    } else if (IcebergTable.isIcebergTable(msTbl)) {
+      table = new IcebergTable(msTbl, db, msTbl.getTableName(), msTbl.getOwner());
     } else if (DataSourceTable.isDataSourceTable(msTbl)) {
       // It's important to check if this is a DataSourceTable before HdfsTable because
       // DataSourceTables are still represented by HDFS tables in the metastore but
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
new file mode 100644
index 0000000..83b3608
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.local;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.types.Types;
+import org.apache.impala.analysis.IcebergPartitionSpec;
+import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.FeCatalogUtils;
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.TableLoadingException;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.thrift.TIcebergTable;
+import org.apache.impala.thrift.TTableDescriptor;
+import org.apache.impala.thrift.TTableType;
+import org.apache.impala.util.IcebergUtil;
+
+import com.google.common.base.Preconditions;
+import com.google.errorprone.annotations.Immutable;
+
+/**
+ * Iceberg table for LocalCatalog
+ */
+public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
+  private final TableParams tableParams_;
+  private final List<IcebergPartitionSpec> partitionSpecs_;
+
+  static LocalTable loadFromIceberg(LocalDb db, Table msTable,
+                                    MetaProvider.TableMetaRef ref)
+      throws TableLoadingException {
+    Preconditions.checkNotNull(db);
+    Preconditions.checkNotNull(msTable);
+    String fullTableName = msTable.getDbName() + "." + msTable.getTableName();
+    TableParams params = new TableParams(msTable);
+
+    List<Column> cols = new ArrayList<>();
+    List<FieldSchema> fieldSchemas = new ArrayList<>();
+    TableMetadata metadata =
+        IcebergUtil.getIcebergTableMetadata(params.icebergTableName_);
+    convertColsFromKIceberg(metadata.schema(), cols, fieldSchemas);
+
+    msTable.getSd().setCols(fieldSchemas);
+
+    List<IcebergPartitionSpec> partitionSpecs =
+        Utils.loadPartitionSpecByIceberg(metadata);
+
+    ColumnMap cmap = new ColumnMap(cols, /*numClusteringCols=*/0, fullTableName,
+                                  false);
+    return new LocalIcebergTable(db, msTable, ref, cmap, partitionSpecs);
+  }
+
+  private static void convertColsFromKIceberg(Schema schema, List<Column> cols,
+                                              List<FieldSchema> fieldSchemas)
+      throws TableLoadingException {
+    List<Types.NestedField> columns = schema.columns();
+    int pos = 0;
+    for (Types.NestedField column : columns) {
+      Preconditions.checkNotNull(column);
+      Type colType = IcebergUtil.toImpalaType(column.type());
+      fieldSchemas.add(new FieldSchema(column.name(), colType.toSql().toLowerCase(),
+          column.doc()));
+      cols.add(new Column(column.name(), colType, column.doc(), pos++));
+    }
+  }
+
+  private LocalIcebergTable(LocalDb db, Table msTable, MetaProvider.TableMetaRef ref,
+                            ColumnMap cmap, List<IcebergPartitionSpec> partitionSpecs) {
+    super(db, msTable, ref, cmap);
+    tableParams_ = new TableParams(msTable);
+    partitionSpecs_ = partitionSpecs;
+  }
+
+  @Override
+  public String getIcebergTableLocation() {
+    return tableParams_.icebergTableName_;
+  }
+
+  @Override
+  public List<IcebergPartitionSpec> getPartitionSpec() {
+    return partitionSpecs_;
+  }
+
+  @Override
+  public TTableDescriptor toThriftDescriptor(int tableId,
+                                             Set<Long> referencedPartitions) {
+    TTableDescriptor desc = new TTableDescriptor(tableId, TTableType.ICEBERG_TABLE,
+        FeCatalogUtils.getTColumnDescriptors(this),
+        getNumClusteringCols(),
+        name_, db_.getName());
+
+    TIcebergTable tbl = new TIcebergTable();
+    tbl.setTable_location(tableParams_.icebergTableName_);
+    tbl.partition_spec = new ArrayList<>();
+    for (IcebergPartitionSpec spec : partitionSpecs_) {
+      tbl.addToPartition_spec(spec.toThrift());
+    }
+    desc.setIcebergTable(tbl);
+    return desc;
+  }
+
+  @Immutable
+  private static class TableParams {
+    private final String icebergTableName_;
+
+    TableParams(Table msTable) {
+      String fullTableName = msTable.getDbName() + "." + msTable.getTableName();
+      if (msTable.getSd().isSetLocation()) {
+        icebergTableName_ = msTable.getSd().getLocation();
+      } else {
+        throw new LocalCatalogException("Cannot find iceberg table name for table "
+            + fullTableName);
+      }
+    }
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
index 6c22898..ca2d736 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
@@ -35,6 +35,7 @@ import org.apache.impala.catalog.FeDb;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.SqlConstraints;
 import org.apache.impala.catalog.StructField;
@@ -97,6 +98,8 @@ abstract class LocalTable implements FeTable {
       t = LocalHbaseTable.loadFromHbase(db, msTbl, ref);
     } else if (KuduTable.isKuduTable(msTbl)) {
       t = LocalKuduTable.loadFromKudu(db, msTbl, ref);
+    } else if (IcebergTable.isIcebergTable(msTbl)) {
+      t = LocalIcebergTable.loadFromIceberg(db, msTbl, ref);
     } else if (DataSourceTable.isDataSourceTable(msTbl)) {
       // TODO(todd) support datasource table
     } else if (HdfsFileFormat.isHdfsInputFormatClass(
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index 72713ce..54ca1d1 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -64,6 +64,7 @@ import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
@@ -1485,6 +1486,9 @@ public class SingleNodePlanner {
           aggInfo);
       scanNode.init(analyzer);
       return scanNode;
+    } else if (tblRef.getTable() instanceof IcebergTable) {
+      // This function will be supported in the future
+      throw new NotImplementedException("Query is not supported for iceberg table now");
     } else {
       throw new NotImplementedException(
           "Planning not implemented for table ref class: " + tblRef.getClass());
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 854d1d9..a9d5ba1 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -34,9 +34,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -88,6 +85,7 @@ import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsPartition;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.HiveStorageDescriptorFactory;
+import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.IncompleteTable;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
@@ -2251,6 +2249,9 @@ public class CatalogOpExecutor {
     org.apache.hadoop.hive.metastore.api.Table tbl = createMetaStoreTable(params);
     LOG.trace("Creating table {}", tableName);
     if (KuduTable.isKuduTable(tbl)) return createKuduTable(tbl, params, response);
+    else if (IcebergTable.isIcebergTable(tbl)) {
+      return createIcebergTable(tbl, params, response);
+    }
     Preconditions.checkState(params.getColumns().size() > 0,
         "Empty column list given as argument to Catalog.createTable");
     return createTable(tbl, params.if_not_exists, params.getCache_op(),
@@ -2525,6 +2526,69 @@ public class CatalogOpExecutor {
   }
 
   /**
+   * Creates a new Iceberg table. Returns false, with a summary, if none was created.
+   */
+  private boolean createIcebergTable(org.apache.hadoop.hive.metastore.api.Table newTable,
+                                  TCreateTableParams params, TDdlExecResponse response)
+      throws ImpalaException {
+    Preconditions.checkState(IcebergTable.isIcebergTable(newTable));
+
+    try {
+      // Add the table to the HMS and the catalog cache. Acquire metastoreDdlLock_ to
+      // ensure the atomicity of these operations.
+      synchronized (metastoreDdlLock_) {
+        try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+          boolean tableInMetastore =
+              msClient.getHiveClient().tableExists(newTable.getDbName(),
+                  newTable.getTableName());
+          if (!tableInMetastore) {
+            String location = newTable.getSd().getLocation();
+            // Create the table in Iceberg if necessary.
+            if (IcebergTable.needsCreateInIceberg(newTable)) {
+              if (location == null) {
+                // No location was given in the SQL, so derive one and store it.
+                location = MetastoreShim.getPathForNewTable(
+                    msClient.getHiveClient().getDatabase(newTable.getDbName()), newTable);
+                newTable.getSd().setLocation(location);
+              }
+              IcebergCatalogOpExecutor.createTable(location, params);
+            } else {
+              if (location == null) {
+                addSummary(response, "Location is necessary for external iceberg table.");
+                return false;
+              }
+            }
+
+            // The HMS table of an Iceberg table never has partition keys. The partition
+            // columns are derived from the TCreateTableParams.partition_spec field, and
+            // could include one or more of the table columns.
+            Preconditions.checkState(newTable.getPartitionKeys() == null ||
+                newTable.getPartitionKeys().isEmpty());
+            msClient.getHiveClient().createTable(newTable);
+          } else {
+            addSummary(response, "Table already exists.");
+            return false;
+          }
+        }
+        // Add the table to the catalog cache
+        Table newTbl = catalog_.addIncompleteTable(newTable.getDbName(),
+            newTable.getTableName());
+        addTableToCatalogUpdate(newTbl, response.result);
+      }
+    } catch (Exception e) {
+      if (e instanceof AlreadyExistsException && params.if_not_exists) {
+        addSummary(response, "Table already exists.");
+        return false;
+      }
+      throw new ImpalaRuntimeException(
+          String.format(HMS_RPC_ERROR_FORMAT_STR, "createTable"), e);
+    }
+
+    addSummary(response, "Table has been created.");
+    return true;
+  }
+
+  /**
    * Creates a new table in the metastore based on the definition of an existing table.
    * No data is copied as part of this process, it is a metadata only operation. If the
    * creation succeeds, an entry is added to the metadata cache to lazily load the new
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 37578a4..7c40f69 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -94,6 +94,7 @@ import org.apache.impala.catalog.FeDataSourceTable;
 import org.apache.impala.catalog.FeDb;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeHBaseTable;
+import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.Function;
@@ -103,6 +104,7 @@ import org.apache.impala.catalog.MetaStoreClientPool;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.local.InconsistentMetadataFetchException;
+import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
@@ -1196,6 +1198,12 @@ public class Frontend {
         Preconditions.checkState(op == TShowStatsOp.TABLE_STATS);
         return FeKuduTable.Utils.getTableStats((FeKuduTable) table);
       }
+    } else if (table instanceof FeIcebergTable) {
+      if (op == TShowStatsOp.PARTITIONS) {
+        return FeIcebergTable.Utils.getPartitionSpecs((FeIcebergTable) table);
+      } else {
+        throw new AnalysisException("Iceberg table not supported table stats now.");
+      }
     } else {
       throw new InternalException("Invalid table class: " + table.getClass());
     }
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
new file mode 100644
index 0000000..4e8f2a8
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.service;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+import org.apache.impala.catalog.ArrayType;
+import org.apache.impala.catalog.MapType;
+import org.apache.impala.catalog.ScalarType;
+import org.apache.impala.catalog.StructField;
+import org.apache.impala.catalog.StructType;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.thrift.TColumn;
+import org.apache.impala.thrift.TCreateTableParams;
+import org.apache.impala.util.IcebergUtil;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This is a helper for the CatalogOpExecutor to provide Iceberg related DDL functionality
+ * such as creating and dropping tables from Iceberg.
+ */
+public class IcebergCatalogOpExecutor {
+  public static final Logger LOG = Logger.getLogger(IcebergCatalogOpExecutor.class);
+
+  // Next Iceberg field id for the table being created on this thread. Starts at zero.
+  private static ThreadLocal<Integer> iThreadLocal = ThreadLocal.withInitial(() -> 0);
+
+  public static void createTable(String metadataLoc, TCreateTableParams params)
+      throws ImpalaRuntimeException {
+    // Field ids restart from zero for every table that is created.
+    iThreadLocal.set(0);
+    HadoopTables tables = IcebergUtil.getHadoopTables();
+    Schema schema = createIcebergSchema(params);
+    tables.create(schema, IcebergUtil.createIcebergPartition(schema, params),
+        metadataLoc);
+    LOG.info("Create iceberg table successful.");
+  }
+
+  /**
+   * Builds a required Iceberg NestedField from an Impala StructField.
+   */
+  private static Types.NestedField createIcebergNestedField(StructField structField)
+      throws ImpalaRuntimeException {
+    Preconditions.checkState(structField != null);
+    // Convert the type first so that any nested ids are taken before this field's id.
+    org.apache.iceberg.types.Type fieldType = createIcebergType(structField.getType());
+    int fieldId = getNextId();
+    return Types.NestedField.required(fieldId, structField.getName(), fieldType,
+        structField.getComment());
+  }
+
+  /**
+   * Builds a required Iceberg NestedField from a thrift column definition.
+   */
+  private static Types.NestedField createIcebergNestedField(TColumn column)
+      throws ImpalaRuntimeException {
+    Type impalaType = Type.fromThrift(column.getColumnType());
+    Preconditions.checkState(impalaType != null);
+    // Convert the type first so that any nested ids are taken before this field's id.
+    org.apache.iceberg.types.Type fieldType = createIcebergType(impalaType);
+    int fieldId = getNextId();
+    return Types.NestedField.required(fieldId, column.getColumnName(), fieldType,
+        column.getComment());
+  }
+
+  /**
+   * Builds the Iceberg schema from the columns of the create-table parameters.
+   */
+  private static Schema createIcebergSchema(TCreateTableParams params)
+      throws ImpalaRuntimeException {
+    List<Types.NestedField> columnFields = new ArrayList<>();
+    for (TColumn col : params.getColumns()) {
+      columnFields.add(createIcebergNestedField(col));
+    }
+    return new Schema(columnFields);
+  }
+
+  /**
+   * Converts an Impala catalog type to the matching Iceberg type. List, map and
+   * struct elements take the ids they need from getNextId().
+   */
+  public static org.apache.iceberg.types.Type createIcebergType(Type t)
+      throws ImpalaRuntimeException {
+    if (t.isScalarType()) {
+      ScalarType s = (ScalarType) t;
+      switch (s.getPrimitiveType()) {
+        case INT:
+          return Types.IntegerType.get();
+        case BIGINT:
+          return Types.LongType.get();
+        case BOOLEAN:
+          return Types.BooleanType.get();
+        case STRING:
+          return Types.StringType.get();
+        case DOUBLE:
+          return Types.DoubleType.get();
+        case FLOAT:
+          return Types.FloatType.get();
+        case TIMESTAMP:  // Impala TIMESTAMP stores no time zone.
+          return Types.TimestampType.withoutZone();
+        case DECIMAL:
+          return Types.DecimalType.of(s.decimalPrecision(), s.decimalScale());
+        case DATE:
+          return Types.DateType.get();
+        case BINARY:
+          return Types.BinaryType.get();
+        // The remaining types have no Iceberg mapping here and are rejected.
+        case INVALID_TYPE:
+        case NULL_TYPE:
+        case DATETIME:
+        case CHAR:
+        case TINYINT:
+        case SMALLINT:
+        case VARCHAR:
+        default:
+          throw new ImpalaRuntimeException(String.format(
+              "Type %s is not supported in Iceberg", s.toSql()));
+      }
+    } else if (t.isArrayType()) {
+      ArrayType arrayType = (ArrayType) t;
+      return Types.ListType.ofRequired(getNextId(),
+          createIcebergType(arrayType.getItemType()));
+    } else if (t.isMapType()) {
+      MapType mapType = (MapType) t;
+      return Types.MapType.ofRequired(getNextId(), getNextId(),
+          createIcebergType(mapType.getKeyType()),
+          createIcebergType(mapType.getValueType()));
+    } else if (t.isStructType()) {
+      StructType structType = (StructType) t;
+      List<Types.NestedField> nestedFields = new ArrayList<Types.NestedField>();
+      List<StructField> structFields = structType.getFields();
+      for (StructField structField : structFields) {
+        nestedFields.add(createIcebergNestedField(structField));
+      }
+      return Types.StructType.of(nestedFields);
+    } else {
+      throw new ImpalaRuntimeException(String.format(
+          "Type %s is not supported in Iceberg", t.toSql()));
+    }
+  }
+
+  private static int getNextId() {
+    final int currentId = iThreadLocal.get();
+    iThreadLocal.set(currentId + 1);
+    return currentId;
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
new file mode 100644
index 0000000..79963d6
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.hadoop.HadoopTableOperations;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Type.TypeID;
+import org.apache.impala.catalog.ArrayType;
+import org.apache.impala.catalog.MapType;
+import org.apache.impala.catalog.ScalarType;
+import org.apache.impala.catalog.StructField;
+import org.apache.impala.catalog.StructType;
+import org.apache.impala.catalog.TableLoadingException;
+import org.apache.impala.catalog.Type;
+import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.thrift.TCreateTableParams;
+import org.apache.impala.thrift.TIcebergPartitionField;
+import org.apache.impala.thrift.TIcebergPartitionTransform;
+
+public class IcebergUtil {
+
+  /**
+   * Returns a HadoopTables built from FileSystemUtil.getConfiguration().
+   */
+  public static HadoopTables getHadoopTables() {
+    return new HadoopTables(FileSystemUtil.getConfiguration());
+  }
+
+  /**
+   * Loads the Iceberg table stored at 'tableLocation' and returns it as a BaseTable.
+   */
+  public static BaseTable getBaseTable(String tableLocation) {
+    HadoopTables hadoopTables = getHadoopTables();
+    return (BaseTable) hadoopTables.load(tableLocation);
+  }
+
+  /**
+   * Returns the current TableMetadata of the Iceberg table stored at 'tableLocation'.
+   */
+  public static TableMetadata getIcebergTableMetadata(String tableLocation) {
+    BaseTable baseTable = getBaseTable(tableLocation);
+    HadoopTableOperations tableOps = (HadoopTableOperations) baseTable.operations();
+    return tableOps.current();
+  }
+
+  /**
+   * Builds the Iceberg PartitionSpec for 'params'; returns null without a partition_spec.
+   * Only the IDENTITY, HOUR, DAY, MONTH and YEAR transforms are supported; others throw.
+   */
+  public static PartitionSpec createIcebergPartition(Schema schema,
+                                                     TCreateTableParams params)
+      throws ImpalaRuntimeException {
+    if (params.getPartition_spec() == null) {
+      return null;
+    }
+    PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
+    List<TIcebergPartitionField> partitionFields =
+        params.getPartition_spec().getPartition_fields();
+    for (TIcebergPartitionField partitionField : partitionFields) {
+      if (partitionField.getField_type() == TIcebergPartitionTransform.IDENTITY) {
+        builder.identity(partitionField.getField_name());
+      } else if (partitionField.getField_type() == TIcebergPartitionTransform.HOUR) {
+        builder.hour(partitionField.getField_name());
+      } else if (partitionField.getField_type() == TIcebergPartitionTransform.DAY) {
+        builder.day(partitionField.getField_name());
+      } else if (partitionField.getField_type() == TIcebergPartitionTransform.MONTH) {
+        builder.month(partitionField.getField_name());
+      } else if (partitionField.getField_type() == TIcebergPartitionTransform.YEAR) {
+        builder.year(partitionField.getField_name());
+      } else {
+        throw new ImpalaRuntimeException(String.format("Unsupported partition: %s, %s",
+            partitionField.getField_name(), partitionField.getField_type()));
+      }
+    }
+    return builder.build();
+  }
+
+  /**
+   * Maps the transform of an Iceberg PartitionField to TIcebergPartitionTransform.
+   */
+  public static TIcebergPartitionTransform getPartitionTransform(PartitionField field)
+      throws TableLoadingException {
+    String transformName = field.transform().toString();
+    return getPartitionTransform(transformName);
+  }
+
+  /**
+   * Maps the name of an Iceberg partition transform to TIcebergPartitionTransform.
+   * The match ignores case. Parameterized transforms are matched by prefix because
+   * Iceberg prints them together with their argument, e.g. "bucket[16]" or
+   * "truncate[10]".
+   * Throws TableLoadingException if the name is not one of the known transforms.
+   */
+  public static TIcebergPartitionTransform getPartitionTransform(String type)
+      throws TableLoadingException {
+    // Locale.ROOT keeps the comparison independent of the JVM's default locale.
+    type = type.toUpperCase(java.util.Locale.ROOT);
+    if ("IDENTITY".equals(type)) return TIcebergPartitionTransform.IDENTITY;
+    if ("HOUR".equals(type)) return TIcebergPartitionTransform.HOUR;
+    if ("DAY".equals(type)) return TIcebergPartitionTransform.DAY;
+    if ("MONTH".equals(type)) return TIcebergPartitionTransform.MONTH;
+    if ("YEAR".equals(type)) return TIcebergPartitionTransform.YEAR;
+    // Parameterized transforms carry their argument in the name.
+    if (type.startsWith("BUCKET")) return TIcebergPartitionTransform.BUCKET;
+    if (type.startsWith("TRUNCATE")) return TIcebergPartitionTransform.TRUNCATE;
+    throw new TableLoadingException("Unsupported iceberg partition type: " +
+        type);
+  }
+
+  /**
+   * Converts an Iceberg type to its Impala type; struct field docs become comments.
+   */
+  public static Type toImpalaType(org.apache.iceberg.types.Type t)
+      throws TableLoadingException {
+    switch (t.typeId()) {
+      case BOOLEAN:
+        return Type.BOOLEAN;
+      case INTEGER:
+        return Type.INT;
+      case LONG:
+        return Type.BIGINT;
+      case FLOAT:
+        return Type.FLOAT;
+      case DOUBLE:
+        return Type.DOUBLE;
+      case STRING:
+        return Type.STRING;
+      case DATE:
+        return Type.DATE;
+      case BINARY:
+        return Type.BINARY;
+      case TIMESTAMP:
+        return Type.TIMESTAMP;
+      case DECIMAL:
+        Types.DecimalType decimal = (Types.DecimalType) t;
+        return ScalarType.createDecimalType(decimal.precision(), decimal.scale());
+      case LIST: {
+        Types.ListType listType = (Types.ListType) t;
+        return new ArrayType(toImpalaType(listType.elementType()));
+      }
+      case MAP: {
+        Types.MapType mapType = (Types.MapType) t;
+        return new MapType(toImpalaType(mapType.keyType()),
+            toImpalaType(mapType.valueType()));
+      }
+      case STRUCT: {
+        Types.StructType structType = (Types.StructType) t;
+        List<StructField> structFields = new ArrayList<>();
+        List<Types.NestedField> nestedFields = structType.fields();
+        for (Types.NestedField nestedField : nestedFields) {
+          structFields.add(new StructField(nestedField.name(),
+              toImpalaType(nestedField.type()), nestedField.doc()));
+        }
+        return new StructType(structFields);
+      }
+      default:
+        throw new TableLoadingException(String.format(
+            "Iceberg type '%s' is not supported in Impala", t.typeId()));
+    }
+  }
+}
diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex
index 4c97823..2187dee 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -147,6 +147,7 @@ import org.apache.impala.thrift.TReservedWordsVersion;
     keywordMap.put("hash", SqlParserSymbols.KW_HASH);
     keywordMap.put("having", SqlParserSymbols.KW_HAVING);
     keywordMap.put("hudiparquet", SqlParserSymbols.KW_HUDIPARQUET);
+    keywordMap.put("iceberg", SqlParserSymbols.KW_ICEBERG);
     keywordMap.put("if", SqlParserSymbols.KW_IF);
     keywordMap.put("ilike", SqlParserSymbols.KW_ILIKE);
     keywordMap.put("ignore", SqlParserSymbols.KW_IGNORE);
@@ -233,6 +234,7 @@ import org.apache.impala.thrift.TReservedWordsVersion;
     keywordMap.put("show", SqlParserSymbols.KW_SHOW);
     keywordMap.put("smallint", SqlParserSymbols.KW_SMALLINT);
     keywordMap.put("sort", SqlParserSymbols.KW_SORT);
+    keywordMap.put("spec", SqlParserSymbols.KW_SPEC);
     keywordMap.put("stats", SqlParserSymbols.KW_STATS);
     keywordMap.put("stored", SqlParserSymbols.KW_STORED);
     keywordMap.put("straight_join", SqlParserSymbols.KW_STRAIGHT_JOIN);
diff --git a/impala-parent/pom.xml b/impala-parent/pom.xml
index e3a0db1..a6ed550 100644
--- a/impala-parent/pom.xml
+++ b/impala-parent/pom.xml
@@ -63,6 +63,7 @@ under the License.
     <derby.version>10.14.2.0</derby.version>
     <jackson-databind.version>2.10.0</jackson-databind.version>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <iceberg.version>${env.IMPALA_ICEBERG_VERSION}</iceberg.version>
   </properties>
 
   <repositories>
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg_create.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg_create.test
new file mode 100644
index 0000000..c2d2603
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg_create.test
@@ -0,0 +1,154 @@
+====
+---- QUERY
+CREATE TABLE iceberg_test1(
+  level STRING,
+  event_time TIMESTAMP,
+  register_time DATE,
+  message STRING,
+  price DECIMAL(8,1),
+  map_test MAP <STRING, array <STRING>>,
+  struct_test STRUCT <f1: BIGINT, f2: BIGINT>
+)
+PARTITION BY SPEC
+(
+  level IDENTITY,
+  event_time IDENTITY,
+  event_time HOUR,
+  register_time DAY
+)
+STORED AS ICEBERG;
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+DESCRIBE iceberg_test1;
+---- RESULTS
+'level','string',''
+'event_time','timestamp',''
+'register_time','date',''
+'message','string',''
+'price','decimal(8,1)',''
+'map_test','map<string,array<string>>',''
+'struct_test','struct<\n  f1:bigint,\n  f2:bigint\n>',''
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+SHOW PARTITIONS iceberg_test1;
+---- RESULTS
+0,1,1000,'level','IDENTITY'
+0,2,1001,'event_time','IDENTITY'
+0,2,1002,'event_time_hour','HOUR'
+0,3,1003,'register_time_day','DAY'
+---- TYPES
+BIGINT,BIGINT,BIGINT,STRING,STRING
+====
+---- QUERY
+DROP TABLE iceberg_test1;
+---- RESULTS
+'Table has been dropped.'
+====
+---- QUERY
+CREATE TABLE iceberg_test2(
+  level STRING
+)
+STORED AS ICEBERG;
+DESCRIBE iceberg_test2;
+---- RESULTS
+'level','string',''
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+SHOW PARTITIONS iceberg_test2;
+---- CATCH
+AnalysisException: Iceberg table does not have PartitionSpec: $DATABASE.iceberg_test2
+====
+---- QUERY
+CREATE TABLE iceberg_test3(
+  level STRING
+)
+PARTITION BY SPEC
+(
+  level IDENTITY,
+  event_time HOUR
+)
+STORED AS ICEBERG;
+---- CATCH
+ImpalaRuntimeException: Error making 'createTable' RPC to Hive Metastore: 
+CAUSED BY: IllegalArgumentException: Cannot find source column: event_time
+====
+---- QUERY
+CREATE TABLE iceberg_test3
+STORED AS ICEBERG;
+---- CATCH
+AnalysisException: Table requires at least 1 column for managed iceberg table.
+====
+---- QUERY
+CREATE EXTERNAL TABLE iceberg_test_external(
+  level STRING
+)
+STORED AS ICEBERG;
+---- RESULTS
+'Location is necessary for external iceberg table.'
+====
+---- QUERY
+CREATE TABLE iceberg_test4(
+  level STRING
+)
+PARTITION BY SPEC
+(
+  level IDENTITY
+)
+STORED AS ICEBERG
+LOCATION '/iceberg_test_with_location';
+CREATE EXTERNAL TABLE iceberg_test_external(
+  level STRING
+)
+STORED AS ICEBERG
+LOCATION '/iceberg_test_with_location';
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+DESCRIBE iceberg_test_external;
+---- RESULTS
+'level','string',''
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+SHOW PARTITIONS iceberg_test_external;
+---- RESULTS
+0,1,1000,'level','IDENTITY'
+---- TYPES
+BIGINT,BIGINT,BIGINT,STRING,STRING
+====
+---- QUERY
+CREATE EXTERNAL TABLE iceberg_test_external_empty_column
+STORED AS ICEBERG
+LOCATION '/iceberg_test_with_location';
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+DESCRIBE iceberg_test_external_empty_column;
+---- RESULTS
+'level','string',''
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+SHOW PARTITIONS iceberg_test_external_empty_column;
+---- RESULTS
+0,1,1000,'level','IDENTITY'
+---- TYPES
+BIGINT,BIGINT,BIGINT,STRING,STRING
+====
+---- QUERY
+DROP TABLE iceberg_test4;
+DROP TABLE iceberg_test_external;
+DROP TABLE iceberg_test_external_empty_column;
+---- RESULTS
+'Table has been dropped.'
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/set.test b/testdata/workloads/functional-query/queries/QueryTest/set.test
index cfa1bcc..794a6ab 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/set.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/set.test
@@ -142,7 +142,7 @@ Invalid Kudu read mode: 'bar'. Valid values are DEFAULT(0), READ_LATEST(1), READ
 ---- QUERY
 set default_file_format=bar
 ---- CATCH
-Invalid default file format: 'bar'. Valid values are TEXT(0), RC_FILE(1), SEQUENCE_FILE(2), AVRO(3), PARQUET(4), KUDU(5), ORC(6), HUDI_PARQUET(7).
+Invalid default file format: 'bar'. Valid values are TEXT(0), RC_FILE(1), SEQUENCE_FILE(2), AVRO(3), PARQUET(4), KUDU(5), ORC(6), HUDI_PARQUET(7), ICEBERG(8).
 ====
 ---- QUERY
 set default_transactional_type=bar
diff --git a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
index ddac51c..daaac65 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
@@ -556,3 +556,22 @@ WITH SERDEPROPERTIES ('field.delim'=',', 'serialization.format'=',')
 STORED AS TEXTFILE
 LOCATION '$$location_uri$$'
 ====
+---- CREATE_TABLE
+CREATE TABLE iceberg_test1 (
+  level STRING
+)
+STORED AS ICEBERG
+---- RESULTS-HIVE
+CREATE TABLE show_create_table_test_db.iceberg_test1 (
+  level STRING
+)
+STORED AS ICEBERG
+LOCATION '$$location_uri$$'
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_test1 (
+  level STRING
+)
+STORED AS ICEBERG
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('external.table.purge'='TRUE')
+====
diff --git a/tests/custom_cluster/test_iceberg.py b/tests/custom_cluster/test_iceberg.py
new file mode 100644
index 0000000..be2857f
--- /dev/null
+++ b/tests/custom_cluster/test_iceberg.py
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+
+class TestCreatingIcebergTable(CustomClusterTestSuite):
+  """Tests creating managed and external Iceberg tables."""
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @pytest.mark.execute_serially
+  def test_create_iceberg_tables(self, vector, unique_database):
+    self.run_test_case('QueryTest/iceberg_create', vector, use_db=unique_database)


[impala] 02/05: IMPALA-7538: Support HDFS caching with LocalCatalog

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b02fad2db48b5725483fc52098a0c6c04806394b
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Wed Jun 10 21:09:11 2020 +0800

    IMPALA-7538: Support HDFS caching with LocalCatalog
    
    This patch adds support for HDFS caching in LocalCatalog coordinators.
    We use the same way catalogd propagates HdfsCachePools in catalog-v1.
    They are cached in LocalCatalog coordinators as v1 and are not
    “fetch-on-demand” since only cache pool names are cached.
    
    The isMarkedCached markers of HdfsTable and HdfsPartition are also
    propagated to the LocalCatalog coordinators for correctly handling
    ShowTableStats and ShowPartitions statements with caching information.
    
    Tests:
     - Revive hdfs caching related tests in metadata/test_ddl.py and
       query_test/test_hdfs_caching.py for LocalCatalog.
    
    Change-Id: I661f7b76a9575f6f5b3fa2c6feebda1a5d7c3712
    Reviewed-on: http://gerrit.cloudera.org:8080/16058
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 common/thrift/CatalogService.thrift                |  9 +++
 .../impala/catalog/CatalogServiceCatalog.java      |  4 +-
 .../java/org/apache/impala/catalog/FeCatalog.java  |  1 -
 .../java/org/apache/impala/catalog/HdfsTable.java  |  4 +
 .../impala/catalog/local/CatalogdMetaProvider.java | 89 ++++++++++++++++++++--
 .../impala/catalog/local/DirectMetaProvider.java   | 21 ++++-
 .../apache/impala/catalog/local/LocalCatalog.java  | 12 ++-
 .../impala/catalog/local/LocalFsPartition.java     | 14 ++--
 .../apache/impala/catalog/local/LocalFsTable.java  | 41 +++++++---
 .../apache/impala/catalog/local/MetaProvider.java  |  5 ++
 tests/common/skip.py                               |  7 --
 tests/metadata/test_ddl.py                         |  1 -
 tests/query_test/test_hdfs_caching.py              |  1 -
 13 files changed, 175 insertions(+), 34 deletions(-)

diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index 8c42471..b099b1d 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -358,6 +358,11 @@ struct TPartialPartitionInfo {
   // TTableInfoSelector. Incremental stats data can be fetched by setting
   // 'want_partition_stats' in TTableInfoSelector.
   6: optional bool has_incremental_stats
+
+  // Set to true if the partition is marked as cached by hdfs caching. Does not
+  // necessarily mean the data is cached. Set when 'want_partition_metadata' is true in
+  // TTableInfoSelector.
+  7: optional bool is_marked_cached
 }
 
 // Returned information about a Table, as selected by TTableInfoSelector.
@@ -391,6 +396,10 @@ struct TPartialTableInfo {
 
   // Valid write id list of ACID table.
   9: optional CatalogObjects.TValidWriteIdList valid_write_ids;
+
+  // Set if this table is marked as cached by hdfs caching. Does not necessarily mean the
+  // data is cached or that all/any partitions are cached. Only used in analyzing DDLs.
+  10: optional bool is_marked_cached
 }
 
 struct TBriefTableMeta {
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index fafad2c..446c80b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -745,10 +745,12 @@ public class CatalogServiceCatalog extends Catalog {
         min.setFn(fnObject);
         break;
       case DATA_SOURCE:
-      case HDFS_CACHE_POOL:
         // These are currently not cached by v2 impalad.
         // TODO(todd): handle these items.
         return null;
+      case HDFS_CACHE_POOL:
+        // HdfsCachePools just contain the name strings. Publish them as minimal objects.
+        return obj;
       default:
         throw new AssertionError("Unexpected catalog object type: " + obj.type);
       }
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeCatalog.java b/fe/src/main/java/org/apache/impala/catalog/FeCatalog.java
index 093ddd7..4b6fc45 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeCatalog.java
@@ -74,7 +74,6 @@ public interface FeCatalog {
   // TODO(todd): introduce FeFunction
   public Function getFunction(Function desc, Function.CompareMode mode);
 
-  // TODO(todd): introduce FeFsCachePool
   /** @see Catalog#getHdfsCachePool(String) */
   public HdfsCachePool getHdfsCachePool(String poolName);
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index a5b231e..cadc269 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -1656,6 +1656,7 @@ public class HdfsTable extends Table implements FeFsTable {
           partInfo.setPartition_stats(part.getPartitionStatsCompressed());
         }
 
+        partInfo.setIs_marked_cached(part.isMarkedCached());
         resp.table_info.partitions.add(partInfo);
       }
     }
@@ -1678,6 +1679,9 @@ public class HdfsTable extends Table implements FeFsTable {
           sqlConstraints_.getForeignKeys());
       resp.table_info.setSql_constraints(sqlConstraints);
     }
+    // Publish the isMarkedCached_ marker so coordinators don't need to validate
+    // it again which requires additional HDFS RPCs.
+    resp.table_info.setIs_marked_cached(isMarkedCached_);
     return resp;
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index 9cf4f2e..b71af6c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -54,6 +54,7 @@ import org.apache.impala.catalog.CatalogDeltaLog;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogObjectCache;
 import org.apache.impala.catalog.Function;
+import org.apache.impala.catalog.HdfsCachePool;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.ImpaladCatalog.ObjectUpdateSequencer;
 import org.apache.impala.catalog.Principal;
@@ -328,6 +329,11 @@ public class CatalogdMetaProvider implements MetaProvider {
       new CatalogObjectCache<>();
   private AtomicReference<? extends AuthorizationChecker> authzChecker_;
 
+  // Cache of known HDFS cache pools. Allows for checking the existence
+  // of pools without hitting HDFS. This is _not_ "fetch-on-demand".
+  private final CatalogObjectCache<HdfsCachePool> hdfsCachePools_ =
+      new CatalogObjectCache<>(false);
+
   public CatalogdMetaProvider(TBackendGflags flags) {
     Preconditions.checkArgument(flags.isSetLocal_catalog_cache_expiration_s());
     Preconditions.checkArgument(flags.isSetLocal_catalog_cache_mb());
@@ -359,6 +365,11 @@ public class CatalogdMetaProvider implements MetaProvider {
   }
 
   @Override
+  public Iterable<HdfsCachePool> getHdfsCachePools() {
+    return hdfsCachePools_;
+  }
+
+  @Override
   public AuthorizationPolicy getAuthPolicy() {
     return authPolicy_;
   }
@@ -718,7 +729,7 @@ public class CatalogdMetaProvider implements MetaProvider {
             return new TableMetaRefImpl(
                 dbName, tableName, resp.table_info.hms_table, resp.object_version_number,
                 new SqlConstraints(primaryKeys, foreignKeys),
-                resp.table_info.valid_write_ids);
+                resp.table_info.valid_write_ids, resp.table_info.is_marked_cached);
            }
       });
     // The table list is populated based on tables in a given Db in catalogd. If a table
@@ -963,7 +974,7 @@ public class CatalogdMetaProvider implements MetaProvider {
       }
       PartitionMetadataImpl metaImpl = new PartitionMetadataImpl(msPart,
           ImmutableList.copyOf(fds), part.getPartition_stats(),
-          part.has_incremental_stats);
+          part.has_incremental_stats, part.is_marked_cached);
 
       checkResponse(partRef != null, req, "returned unexpected partition id %s", part.id);
 
@@ -1102,6 +1113,8 @@ public class CatalogdMetaProvider implements MetaProvider {
   public synchronized TUpdateCatalogCacheResponse updateCatalogCache(
       TUpdateCatalogCacheRequest req) {
     if (req.isSetCatalog_service_id()) {
+      // Requests from processing statestore updates won't have this. Only requests from
+      // DDLs will set this. See more details in ImpalaServer::ProcessCatalogUpdateResult.
       witnessCatalogServiceId(req.catalog_service_id);
     }
 
@@ -1111,6 +1124,7 @@ public class CatalogdMetaProvider implements MetaProvider {
     Long nextCatalogVersion = null;
 
     ObjectUpdateSequencer authObjectSequencer = new ObjectUpdateSequencer();
+    ObjectUpdateSequencer hdfsCachePoolSequencer = new ObjectUpdateSequencer();
 
     Pair<Boolean, ByteBuffer> update;
     while ((update = FeSupport.NativeGetNextCatalogObjectUpdate(req.native_iterator_ptr))
@@ -1137,6 +1151,16 @@ public class CatalogdMetaProvider implements MetaProvider {
 
       invalidateCacheForObject(obj);
 
+      if (obj.type == TCatalogObjectType.HDFS_CACHE_POOL) {
+        // HdfsCachePools have no dependency to each other. But we can't update
+        // hdfsCachePools_ directly since a new catalog service id (due to catalogd
+        // restart) will cause hdfsCachePools_ to be cleared (see more details in
+        // witnessCatalogServiceId()). So we have to deal with HDFS_CACHE_POOL objects
+        // after the CATALOG object.
+        hdfsCachePoolSequencer.add(obj, isDelete);
+        continue;
+      }
+
       // The sequencing of updates to authorization objects is important since they
       // may be cross-referential. So, just add them to the sequencer which ensures
       // we handle them in the right order later.
@@ -1144,6 +1168,7 @@ public class CatalogdMetaProvider implements MetaProvider {
           obj.type == TCatalogObjectType.PRIVILEGE ||
           obj.type == TCatalogObjectType.AUTHZ_CACHE_INVALIDATION) {
         authObjectSequencer.add(obj, isDelete);
+        continue;
       }
 
       // Handle CATALOG objects. These are sent only via the updates published via
@@ -1160,10 +1185,23 @@ public class CatalogdMetaProvider implements MetaProvider {
           // Detected a new reset() finishes in Catalogd, clear the cache in case some
           // tables are skipped in this topic update.
           cache_.invalidateAll();
+          // Don't need to clear hdfsCachePools_ if this comes from a catalogd restart,
+          // because we already clear it in witnessCatalogServiceId().
+          // Shouldn't clear hdfsCachePools_ if this comes from a global invalidation,
+          // because HdfsCachePool updates may already come in the previous statestore
+          // update (see how the version lock is held in CatalogServiceCatalog.reset()).
+          // Clearing hdfsCachePools_ will also clear them.
         }
       }
     }
 
+    for (TCatalogObject obj : hdfsCachePoolSequencer.getUpdatedObjects()) {
+      updateHdfsCachePools(obj, /*isDelete=*/false);
+    }
+    for (TCatalogObject obj : hdfsCachePoolSequencer.getDeletedObjects()) {
+      updateHdfsCachePools(obj, /*isDelete=*/true);
+    }
+
     for (TCatalogObject obj : authObjectSequencer.getUpdatedObjects()) {
       updateAuthPolicy(obj, /*isDelete=*/false);
     }
@@ -1193,6 +1231,29 @@ public class CatalogdMetaProvider implements MetaProvider {
     }
   }
 
+  private void updateHdfsCachePools(TCatalogObject obj, boolean isDelete) {
+    Preconditions.checkState(obj.type == TCatalogObjectType.HDFS_CACHE_POOL);
+    String poolName = obj.getCache_pool().getPool_name();
+    if (isDelete) {
+      HdfsCachePool existingItem = hdfsCachePools_.get(poolName);
+      if (existingItem != null
+          && existingItem.getCatalogVersion() <= obj.getCatalog_version()) {
+        hdfsCachePools_.remove(poolName);
+        LOG.trace("Removed HdfsCachePool {}", poolName);
+      }
+      return;
+    }
+    HdfsCachePool cachePool = new HdfsCachePool(obj.getCache_pool());
+    cachePool.setCatalogVersion(obj.getCatalog_version());
+    if (hdfsCachePools_.add(cachePool)) {
+      LOG.trace("Added HdfsCachePool name={}, version={}", poolName,
+          obj.getCatalog_version());
+      return;
+    }
+    LOG.warn("Ignored stale HdfsCachePool update: name={}, version={}", poolName,
+        obj.getCatalog_version());
+  }
+
   private void updateAuthPolicy(TCatalogObject obj, boolean isDelete) {
     LOG.trace("Updating authorization policy: {} isDelete={}", obj, isDelete);
     switch (obj.type) {
@@ -1260,6 +1321,10 @@ public class CatalogdMetaProvider implements MetaProvider {
         }
         catalogServiceId_ = serviceId;
         cache_.invalidateAll();
+        // Clear cached items from the previous catalogd instance. Otherwise, we'll
+        // ignore new updates from the new catalogd instance since they have lower
+        // versions.
+        hdfsCachePools_.clear();
         // TODO(todd): we probably need to invalidate the auth policy too.
         // we are probably better off detecting this at a higher level and
         // reinstantiating the metaprovider entirely, similar to how ImpaladCatalog
@@ -1402,13 +1467,15 @@ public class CatalogdMetaProvider implements MetaProvider {
     private final ImmutableList<FileDescriptor> fds_;
     private final byte[] partitionStats_;
     private final boolean hasIncrementalStats_;
+    private final boolean isMarkedCached_;
 
     public PartitionMetadataImpl(Partition msPartition, ImmutableList<FileDescriptor> fds,
-        byte[] partitionStats, boolean hasIncrementalStats) {
+        byte[] partitionStats, boolean hasIncrementalStats, boolean isMarkedCached) {
       this.msPartition_ = Preconditions.checkNotNull(msPartition);
       this.fds_ = fds;
       this.partitionStats_ = partitionStats;
       this.hasIncrementalStats_ = hasIncrementalStats;
+      this.isMarkedCached_ = isMarkedCached;
     }
 
     /**
@@ -1423,7 +1490,7 @@ public class CatalogdMetaProvider implements MetaProvider {
         fds.add(fd.cloneWithNewHostIndex(origIndex.getList(), dstIndex));
       }
       return new PartitionMetadataImpl(msPartition_, ImmutableList.copyOf(fds),
-          partitionStats_, hasIncrementalStats_);
+          partitionStats_, hasIncrementalStats_, isMarkedCached_);
     }
 
     @Override
@@ -1441,6 +1508,9 @@ public class CatalogdMetaProvider implements MetaProvider {
 
     @Override
     public boolean hasIncrementalStats() { return hasIncrementalStats_; }
+
+    @Override
+    public boolean isMarkedCached() { return isMarkedCached_; }
   }
 
   /**
@@ -1473,21 +1543,30 @@ public class CatalogdMetaProvider implements MetaProvider {
      */
     private final TValidWriteIdList validWriteIds_;
 
+    /**
+     * True if this table is marked as cached by hdfs caching. See comments in
+     * LocalFsTable.
+     */
+    private final boolean isMarkedCached_;
+
     public TableMetaRefImpl(String dbName, String tableName,
         Table msTable, long catalogVersion, SqlConstraints sqlConstraints,
-        TValidWriteIdList validWriteIds) {
+        TValidWriteIdList validWriteIds, boolean isMarkedCached) {
       this.dbName_ = dbName;
       this.tableName_ = tableName;
       this.msTable_ = msTable;
       this.catalogVersion_ = catalogVersion;
       this.sqlConstraints_ = sqlConstraints;
       this.validWriteIds_ = validWriteIds;
+      this.isMarkedCached_ = isMarkedCached;
     }
 
     @Override
     public String toString() {
       return String.format("TableMetaRef %s.%s@%d", dbName_, tableName_, catalogVersion_);
     }
+
+    public boolean isMarkedCached() { return isMarkedCached_; }
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index e52f399..aad788f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -39,6 +39,7 @@ import org.apache.impala.authorization.AuthorizationPolicy;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.FileMetadataLoader;
 import org.apache.impala.catalog.Function;
+import org.apache.impala.catalog.HdfsCachePool;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.MetaStoreClientPool;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
@@ -87,10 +88,16 @@ class DirectMetaProvider implements MetaProvider {
     }
   }
 
+  @Override
+  public Iterable<HdfsCachePool> getHdfsCachePools() {
+    throw new UnsupportedOperationException(
+        "HDFSCachePools are not supported in DirectMetaProvider");
+  }
 
   @Override
   public AuthorizationPolicy getAuthPolicy() {
-    throw new UnsupportedOperationException("not supported");
+    throw new UnsupportedOperationException(
+        "Authorization is not supported in DirectMetaProvider");
   }
 
   @Override
@@ -393,6 +400,12 @@ class DirectMetaProvider implements MetaProvider {
       throw new UnsupportedOperationException("Incremental stats not supported with " +
           "DirectMetaProvider implementation.");
     }
+
+    @Override
+    public boolean isMarkedCached() {
+      throw new UnsupportedOperationException("Hdfs caching not supported with " +
+          "DirectMetaProvider implementation");
+    }
   }
 
   private class TableMetaRefImpl implements TableMetaRef {
@@ -410,6 +423,12 @@ class DirectMetaProvider implements MetaProvider {
     private boolean isPartitioned() {
       return msTable_.getPartitionKeysSize() != 0;
     }
+
+    @Override
+    public boolean isMarkedCached() {
+      throw new UnsupportedOperationException("Hdfs caching not supported with " +
+          "DirectMetaProvider implementation");
+    }
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
index 16e6502..8011605 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
@@ -70,6 +70,7 @@ import com.google.common.base.Preconditions;
 public class LocalCatalog implements FeCatalog {
   private final MetaProvider metaProvider_;
   private Map<String, FeDb> dbs_ = new HashMap<>();
+  private Map<String, HdfsCachePool> hdfsCachePools_ = null;
   private String nullPartitionKeyValue_;
   private final String defaultKuduMasterHosts_;
 
@@ -213,7 +214,16 @@ public class LocalCatalog implements FeCatalog {
 
   @Override
   public HdfsCachePool getHdfsCachePool(String poolName) {
-    throw new UnsupportedOperationException("TODO");
+    loadHdfsCachePools();
+    return hdfsCachePools_.get(poolName);
+  }
+
+  private void loadHdfsCachePools() {
+    if (hdfsCachePools_ != null) return;
+    hdfsCachePools_ = new HashMap<>();
+    for (HdfsCachePool pool : metaProvider_.getHdfsCachePools()) {
+      hdfsCachePools_.put(pool.getName(), pool);
+    }
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
index 5f7ebcc..3848ced 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
@@ -59,15 +59,21 @@ public class LocalFsPartition implements FeFsPartition {
   // True if partitionStats_ has intermediate_col_stats populated.
   private final boolean hasIncrementalStats_;
 
+  // True if this partition is marked as cached by hdfs caching. Does not necessarily
+  // mean the data is cached. Only used in analyzing DDLs or constructing results of
+  // SHOW TABLE STATS / SHOW PARTITIONS.
+  private final boolean isMarkedCached_;
+
   public LocalFsPartition(LocalFsTable table, LocalPartitionSpec spec,
       Partition msPartition, ImmutableList<FileDescriptor> fileDescriptors,
-      byte [] partitionStats, boolean hasIncrementalStats) {
+      byte [] partitionStats, boolean hasIncrementalStats, boolean isMarkedCached) {
     table_ = Preconditions.checkNotNull(table);
     spec_ = Preconditions.checkNotNull(spec);
     msPartition_ = Preconditions.checkNotNull(msPartition);
     fileDescriptors_ = fileDescriptors;
     partitionStats_ = partitionStats;
     hasIncrementalStats_ = hasIncrementalStats;
+    isMarkedCached_ = isMarkedCached;
   }
 
   @Override
@@ -137,14 +143,12 @@ public class LocalFsPartition implements FeFsPartition {
 
   @Override
   public boolean isCacheable() {
-    // TODO Auto-generated method stub
-    return false;
+    return FileSystemUtil.isPathCacheable(getLocationPath());
   }
 
   @Override
   public boolean isMarkedCached() {
-    // TODO Auto-generated method stub
-    return false;
+    return isMarkedCached_;
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index 38d2fd1..4c4d6e8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -122,6 +122,13 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
    */
   private final String avroSchema_;
 
+  /**
+   * True if this table is marked as cached by hdfs caching. Does not necessarily mean
+   * the data is cached or that all/any partitions are cached. Only used in analyzing
+   * DDLs.
+   */
+  private final boolean isMarkedCached_;
+
   public static LocalFsTable load(LocalDb db, Table msTbl, TableMetaRef ref) {
     String fullName = msTbl.getDbName() + "." + msTbl.getTableName();
 
@@ -173,6 +180,7 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
         FeFsTable.DEFAULT_NULL_COLUMN_VALUE;
 
     avroSchema_ = explicitAvroSchema;
+    isMarkedCached_ = (ref != null && ref.isMarkedCached());
   }
 
   private static String loadAvroSchema(Table msTbl) throws AnalysisException {
@@ -196,23 +204,33 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
         /*explicitAvroSchema=*/null);
   }
 
-
-  @Override
+  @Override // FeFsTable
   public boolean isCacheable() {
-    // TODO Auto-generated method stub
-    return false;
+    if (!isLocationCacheable()) return false;
+    if (!isMarkedCached() && getNumClusteringCols() > 0) {
+      // Check if all partitions are cacheable.
+      // TODO: Currently we load all partitions including their file metadata in order to
+      //  detect whether they are cacheable. This is inefficient since only the partition
+      //  locations are needed. Consider decoupling msPartition and file descriptors in
+      //  LocalFsPartition so we can load the msPartition part individually.
+      loadPartitionSpecs();
+      for (FeFsPartition partition : loadPartitions(partitionSpecs_.keySet())) {
+        if (!partition.isCacheable()) {
+          return false;
+        }
+      }
+    }
+    return true;
   }
 
-  @Override
+  @Override // FeFsTable
   public boolean isLocationCacheable() {
-    // TODO Auto-generated method stub
-    return false;
+    return FileSystemUtil.isPathCacheable(new Path(getLocation()));
   }
 
   @Override
   public boolean isMarkedCached() {
-    // TODO Auto-generated method stub
-    return false;
+    return isMarkedCached_;
   }
 
   @Override
@@ -365,7 +383,7 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
         this, CatalogObjectsConstants.PROTOTYPE_PARTITION_ID);
     LocalFsPartition prototypePartition = new LocalFsPartition(
         this, spec, protoMsPartition, /*fileDescriptors=*/null, /*partitionStats=*/null,
-        /*hasIncrementalStats=*/false);
+        /*hasIncrementalStats=*/false, /*isMarkedCached=*/false);
     return prototypePartition;
   }
 
@@ -440,7 +458,8 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
       }
 
       LocalFsPartition part = new LocalFsPartition(this, spec, p.getHmsPartition(),
-          p.getFileDescriptors(), p.getPartitionStats(), p.hasIncrementalStats());
+          p.getFileDescriptors(), p.getPartitionStats(), p.hasIncrementalStats(),
+          p.isMarkedCached());
       ret.add(part);
     }
     return ret;
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
index 610d3e0..a0d4419 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.impala.authorization.AuthorizationPolicy;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.Function;
+import org.apache.impala.catalog.HdfsCachePool;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.SqlConstraints;
 import org.apache.impala.common.Pair;
@@ -128,6 +129,7 @@ public interface MetaProvider {
    * in order to perform concurrency control checks, etc.
    */
   interface TableMetaRef {
+    boolean isMarkedCached();
   }
 
   /**
@@ -147,7 +149,10 @@ public interface MetaProvider {
     ImmutableList<FileDescriptor> getFileDescriptors();
     byte[] getPartitionStats();
     boolean hasIncrementalStats();
+    boolean isMarkedCached();
   }
 
   public TValidWriteIdList getValidWriteIdList(TableMetaRef ref);
+
+  Iterable<HdfsCachePool> getHdfsCachePools();
 }
diff --git a/tests/common/skip.py b/tests/common/skip.py
index c2cf431..0911b6f 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -253,13 +253,6 @@ class SkipIfCatalogV2:
       IMPALA_TEST_CLUSTER_PROPERTIES.is_catalog_v2_cluster(),
       reason="IMPALA-7131: data sources not supported.")
 
-  # TODO: IMPALA-7538: add support or update tests to reflect expected behaviour.
-  @classmethod
-  def hdfs_caching_ddl_unsupported(self):
-    return pytest.mark.skipif(
-      IMPALA_TEST_CLUSTER_PROPERTIES.is_catalog_v2_cluster(),
-      reason="IMPALA-7538: HDFS caching DDL not supported.")
-
   # TODO: IMPALA-8489: fix this bug.
   @classmethod
   def impala_8489(self):
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index fed4ecf..f6282cb 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -445,7 +445,6 @@ class TestDdlStatements(TestDdlBase):
 
   @SkipIf.not_hdfs
   @SkipIfLocal.hdfs_client
-  @SkipIfCatalogV2.hdfs_caching_ddl_unsupported()
   @UniqueDatabase.parametrize(sync_ddl=True, num_dbs=2)
   def test_alter_table_hdfs_caching(self, vector, unique_database):
     self.run_test_case('QueryTest/alter-table-hdfs-caching', vector,
diff --git a/tests/query_test/test_hdfs_caching.py b/tests/query_test/test_hdfs_caching.py
index 107b33a..cf77b6e 100644
--- a/tests/query_test/test_hdfs_caching.py
+++ b/tests/query_test/test_hdfs_caching.py
@@ -179,7 +179,6 @@ class TestHdfsCachingFallbackPath(ImpalaTestSuite):
 @SkipIfADLS.caching
 @SkipIfIsilon.caching
 @SkipIfLocal.caching
-@SkipIfCatalogV2.hdfs_caching_ddl_unsupported()
 class TestHdfsCachingDdl(ImpalaTestSuite):
   @classmethod
   def get_workload(self):


[impala] 05/05: IMPALA-5904: (part 4) Fix more TSAN bugs

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 17fd15c6e4981499932c02d541c76757a5fdf87d
Author: Sahil Takiar <ta...@gmail.com>
AuthorDate: Thu Apr 30 10:56:44 2020 -0700

    IMPALA-5904: (part 4) Fix more TSAN bugs
    
    Fixes the following TSAN data races that come up when running custom
    cluster tests. The immediate goal is to fix all remaining data races in
    custom cluster tests and then enable custom cluster tests in the TSAN
    builds. This patch fixes about half of the remaining data races reported
    during a TSAN build of custom cluster tests.
    
    SUMMARY: ThreadSanitizer: data race util/stopwatch.h:186:9 in impala::MonotonicStopWatch::RunningTime() const
      Read of size 8 at 0x7b580000dba8 by thread T342:
        #0 impala::MonotonicStopWatch::RunningTime() const util/stopwatch.h:186:9
        #1 impala::MonotonicStopWatch::Reset() util/stopwatch.h:136:20
        #2 impala::StatestoreSubscriber::Heartbeat(impala::TUniqueId const&) statestore/statestore-subscriber.cc:358:35
      Previous write of size 8 at 0x7b580000dba8 by thread T341:
        #0 impala::MonotonicStopWatch::Reset() util/stopwatch.h:139:21 (impalad+0x1f744ab)
        #1 impala::StatestoreSubscriber::Heartbeat(impala::TUniqueId const&) statestore/statestore-subscriber.cc:358:35
    
    SUMMARY: ThreadSanitizer: data race status.h:220:10 in impala::Status::operator=(impala::Status&&)
      Write of size 8 at 0x7b50002e01e0 by thread T341 (mutexes: write M17919):
        #0 impala::Status::operator=(impala::Status&&) common/status.h:220:10
        #1 impala::RuntimeState::SetQueryStatus(std::string const&) runtime/runtime-state.h:250
        #2 impala_udf::FunctionContext::SetError(char const*) udf/udf.cc:423:47
      Previous read of size 8 at 0x7b50002e01e0 by thread T342:
        #0 impala::Status::ok() const common/status.h:236:42
        #1 impala::RuntimeState::GetQueryStatus() runtime/runtime-state.h:15
        #2 impala::HdfsScanner::CommitRows(int, impala::RowBatch*) exec/hdfs-scanner.cc:218:3
    
    SUMMARY: ThreadSanitizer: data race hashtable.h:370:58
      Read of size 8 at 0x7b2400091df8 by thread T338 (mutexes: write M106814410723061456):
    ...
        #3 impala::MetricGroup::CMCompatibleCallback() util/metrics.cc:185:40
    ...
        #9 impala::Webserver::RenderUrlWithTemplate() util/webserver.cc:801:3
        #10 impala::Webserver::BeginRequestCallback(sq_connection*, sq_request_info*) util/webserver.cc:696:5
      Previous write of size 8 at 0x7b2400091df8 by thread T364 (mutexes: write M600803201008047112, write M1046659357959855584):
    ...
        #4 impala::AtomicMetric<(impala::TMetricKind::type)0>* impala::MetricGroup::RegisterMetric<> >() util/metrics.h:366:5
        #5 impala::MetricGroup::AddGauge(std::string const&, long, std::string const&) util/metrics.h:384:12
        #6 impala::AdmissionController::PoolStats::InitMetrics() scheduling/admission-controller.cc:1714:55
    
    Testing:
    * Ran core tests
    * Re-ran TSAN tests and made sure issues were resolved
    * Ran single_node_perf_run for workload TPC-H scale factor 30;
      no regressions detected
    
    +----------+-----------------------+---------+------------+------------+----------------+
    | Workload | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
    +----------+-----------------------+---------+------------+------------+----------------+
    | TPCH(30) | parquet / none / none | 7.36    | -1.77%     | 5.01       | -1.61%         |
    +----------+-----------------------+---------+------------+------------+----------------+
    
    Change-Id: Id4244c9a7f971c96b8b8dc7d5262904a0a4b77c1
    Reviewed-on: http://gerrit.cloudera.org:8080/16079
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/runtime-state.cc            | 11 ++++++-----
 be/src/runtime/runtime-state.h             | 10 ++++++++--
 be/src/statestore/statestore-subscriber.cc |  2 +-
 be/src/statestore/statestore-subscriber.h  |  5 +++--
 be/src/util/metrics.cc                     | 24 +++++++++++++++++++-----
 5 files changed, 37 insertions(+), 15 deletions(-)

diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 05c0d65..54f46fe 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -287,15 +287,16 @@ void RuntimeState::SetMemLimitExceeded(MemTracker* tracker,
   // cannot abort the fragment immediately. It relies on callers checking status
   // periodically. This means that this function could be called a large number of times
   // (e.g. once per row) before the fragment aborts. See IMPALA-6997.
-  {
-    lock_guard<SpinLock> l(query_status_lock_);
-    if (!query_status_.ok()) return;
-  }
+  if (!is_query_status_ok_.Load()) return;
   Status status = tracker->MemLimitExceeded(this, msg == nullptr ? "" : msg->msg(),
       failed_allocation_size);
   {
     lock_guard<SpinLock> l(query_status_lock_);
-    if (query_status_.ok()) query_status_ = status;
+    if (query_status_.ok()) {
+      query_status_ = status;
+      bool set_query_status_ok_ = is_query_status_ok_.CompareAndSwap(true, false);
+      DCHECK(set_query_status_ok_);
+    }
   }
   LogError(status.msg());
   // Add warning about missing stats except for compute stats child queries.
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 27c3704..57f3806 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -152,8 +152,7 @@ class RuntimeState {
   const std::string& GetEffectiveUser() const;
 
   inline Status GetQueryStatus() {
-    // Do a racy check for query_status_ to avoid unnecessary spinlock acquisition.
-    if (UNLIKELY(!query_status_.ok())) {
+    if (UNLIKELY(!is_query_status_ok_.Load())) {
       std::lock_guard<SpinLock> l(query_status_lock_);
       return query_status_;
     }
@@ -248,6 +247,8 @@ class RuntimeState {
     std::lock_guard<SpinLock> l(query_status_lock_);
     if (!query_status_.ok()) return;
     query_status_ = Status(err_msg);
+    bool set_query_status_ok_ = is_query_status_ok_.CompareAndSwap(true, false);
+    DCHECK(set_query_status_ok_);
   }
 
   /// Sets query_status_ to MEM_LIMIT_EXCEEDED and logs all the registered trackers.
@@ -407,6 +408,11 @@ class RuntimeState {
   SpinLock query_status_lock_;
   Status query_status_;
 
+  /// True if the query_status_ is OK, false otherwise. Used to check if the
+  /// query_status_ is OK without incurring the overhead of acquiring the
+  /// query_status_lock_.
+  AtomicBool is_query_status_ok_{true};
+
   /// This is the node id of the root node for this plan fragment.
   ///
   /// This is used as the hash seed within the fragment so we do not run into hash
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index 0bf46e5..1d931bd 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -355,7 +355,7 @@ void StatestoreSubscriber::Heartbeat(const RegistrationId& registration_id) {
   const Status& status = CheckRegistrationId(registration_id);
   if (status.ok()) {
     heartbeat_interval_metric_->Update(
-        heartbeat_interval_timer_.Reset() / (1000.0 * 1000.0 * 1000.0));
+        heartbeat_interval_timer_.LapTime() / (1000.0 * 1000.0 * 1000.0));
     failure_detector_->UpdateHeartbeat(STATESTORE_ID, true);
   } else {
     VLOG_RPC << "Heartbeat: " << status.GetDetail();
diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h
index 4047b10..406022b 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -180,8 +180,9 @@ class StatestoreSubscriber {
   StatsMetric<double>* heartbeat_interval_metric_;
 
   /// Tracks the time between heartbeat messages. Only updated by Heartbeat(), which
-  /// should not run concurrently with itself.
-  MonotonicStopWatch heartbeat_interval_timer_;
+  /// should not run concurrently with itself. Use a ConcurrentStopWatch because the
+  /// watch is started in one thread, but read in another.
+  ConcurrentStopWatch heartbeat_interval_timer_;
 
   /// Current registration ID, in string form.
   StringProperty* registration_id_metric_;
diff --git a/be/src/util/metrics.cc b/be/src/util/metrics.cc
index 087f100..5182543 100644
--- a/be/src/util/metrics.cc
+++ b/be/src/util/metrics.cc
@@ -172,20 +172,34 @@ void MetricGroup::CMCompatibleCallback(const Webserver::WebRequest& req,
     return;
   }
 
+  // Add all metrics in the metric_map_ to the given document.
+  for (const MetricMap::value_type& m : metric_map_) {
+    m.second->ToLegacyJson(document);
+  }
+
+
+  // Depth-first traversal of children to flatten all metrics, which is what was
+  // expected by CM before we introduced metric groups.
   stack<MetricGroup*> groups;
-  groups.push(this);
-  do {
-    // Depth-first traversal of children to flatten all metrics, which is what was
-    // expected by CM before we introduced metric groups.
+  for (const ChildGroupMap::value_type& child : children_) {
+    groups.push(child.second);
+  }
+
+  while (!groups.empty()) {
     MetricGroup* group = groups.top();
     groups.pop();
+
+    // children_ and metric_map_ are protected by lock_, so acquire group->lock_ before
+    // adding the metrics to the given document.
+    lock_guard<SpinLock> l(group->lock_);
     for (const ChildGroupMap::value_type& child: group->children_) {
       groups.push(child.second);
     }
+
     for (const MetricMap::value_type& m: group->metric_map_) {
       m.second->ToLegacyJson(document);
     }
-  } while (!groups.empty());
+  }
 }
 
 void MetricGroup::TemplateCallback(const Webserver::WebRequest& req,