Posted to commits@impala.apache.org by ta...@apache.org on 2020/10/01 17:39:06 UTC

[impala] branch master updated (c9f8d25 -> 5b720a4)

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from c9f8d25  IMPALA-3335: Allow single-node optimization with joins
     new 6bb3b88  IMPALA-9180 (part 1): Remove legacy ImpalaInternalService
     new a0a25a6  IMPALA-10193: Limit the memory usage for the whole test cluster
     new 5b720a4  IMPALA-10164: Supporting HadoopCatalog for Iceberg table

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/generated-sources/gen-cpp/CMakeLists.txt        |   1 -
 be/src/benchmarks/expr-benchmark.cc                |   7 +-
 be/src/exprs/expr-test.cc                          |   4 +-
 be/src/exprs/utility-functions-ir.cc               |   4 +-
 be/src/rpc/impala-service-pool.cc                  |  16 ++-
 be/src/rpc/impala-service-pool.h                   |  10 ++
 be/src/rpc/rpc-mgr.cc                              |   7 +
 be/src/rpc/rpc-mgr.h                               |   3 +
 be/src/rpc/thrift-server-test.cc                   |  26 ----
 be/src/runtime/backend-client.h                    |  46 ------
 be/src/runtime/client-cache-types.h                |   8 --
 be/src/runtime/coordinator-backend-state.cc        |   1 -
 be/src/runtime/data-stream-test.cc                 |   1 -
 be/src/runtime/exec-env.cc                         |  30 ++--
 be/src/runtime/exec-env.h                          |  19 ++-
 be/src/runtime/fragment-instance-state.cc          |   1 -
 be/src/runtime/fragment-instance-state.h           |   1 -
 be/src/runtime/initial-reservations.cc             |   4 +-
 be/src/runtime/query-exec-mgr.cc                   |   3 +-
 be/src/runtime/query-state.cc                      |   8 +-
 be/src/runtime/runtime-filter-bank.cc              |  12 +-
 be/src/runtime/test-env.cc                         |   4 +-
 be/src/scheduling/scheduler-test-util.h            |   1 -
 be/src/service/CMakeLists.txt                      |   1 -
 be/src/service/client-request-state.cc             |   3 +-
 be/src/service/control-service.cc                  |   3 +-
 be/src/service/impala-internal-service.cc          |  46 ------
 be/src/service/impala-internal-service.h           |  40 ------
 be/src/service/impala-server.cc                    |  66 ++-------
 be/src/service/impala-server.h                     |  17 +--
 be/src/service/impalad-main.cc                     |   6 +-
 be/src/service/session-expiry-test.cc              |   1 -
 be/src/testutil/in-process-servers.cc              |  32 ++---
 be/src/testutil/in-process-servers.h               |   7 +-
 be/src/util/debug-util.cc                          |   4 +-
 bin/generate_minidump_collection_testdata.py       |   1 -
 bin/impala-config.sh                               |   3 +
 bin/start-impala-cluster.py                        |   6 +-
 common/thrift/CatalogObjects.thrift                |   6 +
 common/thrift/ImpalaInternalService.thrift         |  10 +-
 .../apache/impala/analysis/CreateTableStmt.java    |  42 ++++--
 .../org/apache/impala/analysis/ToSqlUtils.java     |  11 +-
 .../org/apache/impala/catalog/FeIcebergTable.java  |  52 ++++++-
 .../org/apache/impala/catalog/IcebergTable.java    |  32 ++++-
 .../impala/catalog/local/LocalIcebergTable.java    |  28 +++-
 .../org/apache/impala/planner/IcebergScanNode.java |   4 +-
 .../apache/impala/service/CatalogOpExecutor.java   |  41 ++++--
 .../impala/service/IcebergCatalogOpExecutor.java   |  43 +++++-
 .../java/org/apache/impala/util/IcebergUtil.java   | 125 +++++++++++++++--
 .../common/etc/hadoop/conf/yarn-site.xml.py        |   3 +-
 ...2da0-b562-4310-9001-06f9b6b0f9ae-00000.parquet} | Bin 1162 -> 1162 bytes
 ...aefa-65fc-4698-8f26-b155fc965cf6-00000.parquet} | Bin 1162 -> 1162 bytes
 ...b016-05e1-43fc-b4a0-0e0df52a5035-00000.parquet} | Bin 1162 -> 1162 bytes
 ...92523-c3b9-401d-b429-363c245dbe9c-00000.parquet | Bin 0 -> 1161 bytes
 ...70cf-10a1-4e49-86dc-b094fe739aa6-00000.parquet} | Bin 1162 -> 1162 bytes
 ...f86fa-286f-4cd3-8337-98685c48176d-00000.parquet | Bin 0 -> 1161 bytes
 ...2bbc-46a2-4040-a4a8-7488447de3b6-00000.parquet} | Bin 1162 -> 1162 bytes
 ...a250-ed1c-4868-bbf1-f2aad65fa80c-00000.parquet} | Bin 1162 -> 1162 bytes
 ...7823-ded1-4a12-9e03-4027cd43966a-00000.parquet} | Bin 1169 -> 1169 bytes
 ...d7a4-245f-44d5-8a59-ed511854c8f8-00000.parquet} | Bin 1169 -> 1169 bytes
 ...5490-91f7-47bd-a3b6-e86caa7fe47d-00000.parquet} | Bin 1169 -> 1169 bytes
 ...5fcf-4346-421f-b2ef-1f9d55fb4c84-00000.parquet} | Bin 1169 -> 1169 bytes
 ...64ed-7a99-4f43-ada7-225c92f6a993-00000.parquet} | Bin 1169 -> 1169 bytes
 ...c862-3d63-42cb-8041-0a0b14b8ca13-00000.parquet} | Bin 1169 -> 1169 bytes
 ...8e68-c862-4248-b3e5-84228a3ec39d-00000.parquet} | Bin 1190 -> 1190 bytes
 ...1dc0-b7eb-424d-9edb-dd2cedc59784-00000.parquet} | Bin 1190 -> 1190 bytes
 ...e5f3-cfa7-4190-bb30-0db1d53202fd-00000.parquet} | Bin 1190 -> 1190 bytes
 ...52b1-dc5b-4417-81b7-8e9fd992280b-00000.parquet} | Bin 1190 -> 1190 bytes
 ...ba9a-9387-4c38-bab8-a0598c400fde-00000.parquet} | Bin 1190 -> 1190 bytes
 ...83a3-b39f-4273-984b-cf7faf39dd9d-00000.parquet} | Bin 1190 -> 1190 bytes
 .../2c2fa00b-eb20-460a-835b-d69b32560e21-m0.avro}  | Bin 5579 -> 5599 bytes
 ...65-1-2c2fa00b-eb20-460a-835b-d69b32560e21.avro} | Bin 2563 -> 2588 bytes
 .../hadoop_catalog_test}/metadata/v1.metadata.json |   6 +-
 .../hadoop_catalog_test}/metadata/v2.metadata.json |  24 ++--
 .../metadata/version-hint.text                     |   0
 .../functional/functional_schema_template.sql      |  18 ++-
 .../queries/QueryTest/iceberg-create.test          | 155 ++++++++++++++++++++-
 .../queries/QueryTest/iceberg-negative.test        |  45 +++++-
 .../queries/QueryTest/iceberg-query.test           | 123 ++++++++++++++++
 .../queries/QueryTest/show-create-table.test       |  54 ++++++-
 tests/custom_cluster/test_blacklist.py             |   8 +-
 tests/custom_cluster/test_process_failures.py      |   2 +-
 tests/custom_cluster/test_query_retries.py         |   2 +-
 tests/custom_cluster/test_restart_services.py      |  11 +-
 tests/webserver/test_web_pages.py                  |   4 +-
 85 files changed, 868 insertions(+), 434 deletions(-)
 delete mode 100644 be/src/runtime/backend-client.h
 delete mode 100644 be/src/service/impala-internal-service.cc
 delete mode 100644 be/src/service/impala-internal-service.h
 copy testdata/data/iceberg_test/{iceberg_non_partitioned/data/00001-1-5dbd44ad-18bc-40f2-9dd6-aeb2cc23457c-00000.parquet => hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00001-1-bc402da0-b562-4310-9001-06f9b6b0f9ae-00000.parquet} (71%)
 copy testdata/data/iceberg_test/{iceberg_non_partitioned/data/00006-6-f75530ef-93b6-4994-b3c8-db957d44848c-00000.parquet => hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00006-6-d253aefa-65fc-4698-8f26-b155fc965cf6-00000.parquet} (71%)
 copy testdata/data/iceberg_test/{iceberg_non_partitioned/data/00009-9-f029a1f7-9024-4bc3-a030-e20861586146-00000.parquet => hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00009-9-5d04b016-05e1-43fc-b4a0-0e0df52a5035-00000.parquet} (71%)
 create mode 100644 testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00017-17-20b92523-c3b9-401d-b429-363c245dbe9c-00000.parquet
 copy testdata/data/iceberg_test/{iceberg_non_partitioned/data/00023-23-8cfc9d65-bfc3-47c4-8da7-6610b49b3305-00000.parquet => hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00023-23-c86370cf-10a1-4e49-86dc-b094fe739aa6-00000.parquet} (71%)
 create mode 100644 testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00027-27-f32f86fa-286f-4cd3-8337-98685c48176d-00000.parquet
 copy testdata/data/iceberg_test/{iceberg_non_partitioned/data/00030-30-e887707d-58db-469c-ab96-b77188f25189-00000.parquet => hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00030-30-b18d2bbc-46a2-4040-a4a8-7488447de3b6-00000.parquet} (71%)
 copy testdata/data/iceberg_test/{iceberg_non_partitioned/data/00031-31-facf6a62-c326-44f7-bdc3-b1471bd594d7-00000.parquet => hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00031-31-c9bda250-ed1c-4868-bbf1-f2aad65fa80c-00000.parquet} (71%)
 copy testdata/data/iceberg_test/{iceberg_non_partitioned/data/00004-4-f1b55628-0544-4833-8b11-1b4add53dfd6-00000.parquet => hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00004-4-0ed77823-ded1-4a12-9e03-4027cd43966a-00000.parquet} (71%)
 copy testdata/data/iceberg_test/{iceberg_non_partitioned/data/00014-14-d0cdca7f-c050-407e-b70c-2bd076f83e4e-00000.parquet => hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00014-14-f698d7a4-245f-44d5-8a59-ed511854c8f8-00000.parquet} (71%)
 copy testdata/data/iceberg_test/{iceberg_non_partitioned/data/00015-15-0e931a1f-309e-43b3-a5cf-3ef82fa4a87c-00000.parquet => hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00015-15-7c1d5490-91f7-47bd-a3b6-e86caa7fe47d-00000.parquet} (74%)
 copy testdata/data/iceberg_test/{iceberg_non_partitioned/data/00014-14-d0cdca7f-c050-407e-b70c-2bd076f83e4e-00000.parquet => hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00019-19-d2ef5fcf-4346-421f-b2ef-1f9d55fb4c84-00000.parquet} (66%)
 copy testdata/data/iceberg_test/{iceberg_non_partitioned/data/00020-20-f160c1ea-a2f5-4109-81ec-3ff9c155430f-00000.parquet => hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00020-20-a70c64ed-7a99-4f43-ada7-225c92f6a993-00000.parquet} (71%)
 copy testdata/data/iceberg_test/{iceberg_non_partitioned/data/00028-28-70f51a0c-3d30-4cdb-afcb-e8c1cfa3caa4-00000.parquet => hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00028-28-bb02c862-3d63-42cb-8041-0a0b14b8ca13-00000.parquet} (71%)
 copy testdata/data/iceberg_test/{iceberg_non_partitioned/data/00003-3-27db2521-1e8b-40c1-b846-552cd620abce-00000.parquet => hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00003-3-d5288e68-c862-4248-b3e5-84228a3ec39d-00000.parquet} (72%)
 copy testdata/data/iceberg_test/{iceberg_non_partitioned/data/00007-7-8d9b22da-5f10-4cbf-8e4d-160f829b5e48-00000.parquet => hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00007-7-92031dc0-b7eb-424d-9edb-dd2cedc59784-00000.parquet} (72%)
 copy testdata/data/iceberg_test/{iceberg_non_partitioned/data/00007-7-8d9b22da-5f10-4cbf-8e4d-160f829b5e48-00000.parquet => hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00011-11-9361e5f3-cfa7-4190-bb30-0db1d53202fd-00000.parquet} (66%)
 copy testdata/data/iceberg_test/{iceberg_non_partitioned/data/00012-12-967c70a4-bf4d-4a82-8c97-c90e2b4d9dcf-00000.parquet => hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00012-12-e82b52b1-dc5b-4417-81b7-8e9fd992280b-00000.parquet} (72%)
 copy testdata/data/iceberg_test/{iceberg_non_partitioned/data/00022-22-c1f61b8c-9d9a-4823-b64e-109770c16696-00000.parquet => hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00022-22-c646ba9a-9387-4c38-bab8-a0598c400fde-00000.parquet} (72%)
 copy testdata/data/iceberg_test/{iceberg_non_partitioned/data/00025-25-d519b09d-c730-40a6-ad03-8b36096cb234-00000.parquet => hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00025-25-7f8283a3-b39f-4273-984b-cf7faf39dd9d-00000.parquet} (72%)
 copy testdata/data/iceberg_test/{iceberg_partitioned/metadata/af797bab-2f2c-44df-a77b-d91c7198fe53-m0.avro => hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/metadata/2c2fa00b-eb20-460a-835b-d69b32560e21-m0.avro} (78%)
 copy testdata/data/iceberg_test/{iceberg_partitioned/metadata/snap-8270633197658268308-1-af797bab-2f2c-44df-a77b-d91c7198fe53.avro => hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/metadata/snap-7412008513947276465-1-2c2fa00b-eb20-460a-835b-d69b32560e21.avro} (89%)
 copy testdata/data/iceberg_test/{iceberg_partitioned => hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test}/metadata/v1.metadata.json (84%)
 copy testdata/data/iceberg_test/{iceberg_partitioned => hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test}/metadata/v2.metadata.json (62%)
 copy testdata/data/iceberg_test/{iceberg_partitioned => hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test}/metadata/version-hint.text (100%)


[impala] 02/03: IMPALA-10193: Limit the memory usage for the whole test cluster

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a0a25a61c302d864315daa7f09827b37a37419d5
Author: fifteencai <fi...@tencent.com>
AuthorDate: Wed Sep 30 13:03:08 2020 +0800

    IMPALA-10193: Limit the memory usage for the whole test cluster
    
    This patch introduces a new approach of limiting the memory usage
    for both mini-cluster and CDH cluster.
    
    Without this limit, clusters are prone to getting killed when running
    in Docker containers whose memory limit is lower than the host's memory
    size. For example, the mini-cluster may be running in a container capped
    at 32GB by cgroups while the host machine has 128GB. In that case, if
    the container is started with the '--privileged' flag, both the mini and
    CDH clusters compute their mem_limit from 128GB rather than 32GB, and
    are killed when they attempt to claim the extra memory.
    
    Currently, the mem-limit estimating algorithms for Impalad and the Node
    Manager are different:

    for Impalad:  mem_limit = 0.7 * sys_mem / cluster_size (cluster_size defaults to 3)

    for Node Manager:
            1. Set aside 24GB for other services, then fit the remainder
               into the thresholds below.
            2. The bare minimum is 4GB and the maximum is 48GB.
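
    For illustration, the arithmetic above can be sketched in Python
    (impalad_mem_limit and yarn_nm_mem_mb are hypothetical names; the
    actual implementations are compute_impalad_mem_limit() in
    bin/start-impala-cluster.py and _get_yarn_nm_ram_mb() in
    yarn-site.xml.py, shown in the diffs below):

        GB = 1024 * 1024 * 1024

        def impalad_mem_limit(avail_mem_gb, cluster_size=3):
            # 70% of the available memory, divided across the impalads and
            # capped at 12GB per daemon (the same cap as the real script).
            mem_limit = int(0.7 * avail_mem_gb * GB / cluster_size)
            return min(12 * GB, mem_limit)

        def yarn_nm_mem_mb(avail_mem_gb):
            # Leave 24GB aside for other services, but stay within the
            # 4GB minimum / 48GB maximum envelope.
            return min(max(avail_mem_gb * 1024 - 24 * 1024, 4096), 48 * 1024)

        # With IMPALA_CLUSTER_MAX_MEM_GB=32 and the default 3-node cluster:
        #   impalad_mem_limit(32) -> about 7.5GB per impalad
        #   yarn_nm_mem_mb(32)    -> 8192MB for the YARN Node Manager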
    
    To hedge against over-consumption, we

    - Added a new environment variable, IMPALA_CLUSTER_MAX_MEM_GB.
    - Modified the algorithm in 'bin/start-impala-cluster.py', making it
      take IMPALA_CLUSTER_MAX_MEM_GB rather than sys_mem into account.
    - Modified the logic in
      'testdata/cluster/node_templates/common/etc/hadoop/conf/yarn-site.xml.py'
      similarly, substituting IMPALA_CLUSTER_MAX_MEM_GB for sys_mem.

    Testing: this patch worked in a 32GB Docker container running on a 128GB
             host machine. All 1188 unit tests passed.
    
    Change-Id: I8537fd748e279d5a0e689872aeb4dbfd0c84dc93
    Reviewed-on: http://gerrit.cloudera.org:8080/16522
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 bin/impala-config.sh                                                | 3 +++
 bin/start-impala-cluster.py                                         | 6 ++++--
 .../cluster/node_templates/common/etc/hadoop/conf/yarn-site.xml.py  | 3 ++-
 3 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index e0998c1..5d9b8a6 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -112,6 +112,9 @@ unset IMPALA_LLVM_URL
 export IMPALA_LLVM_ASAN_VERSION=5.0.1-p3
 unset IMPALA_LLVM_ASAN_URL
 
+# Maximum memory available for mini-cluster and CDH cluster
+export IMPALA_CLUSTER_MAX_MEM_GB
+
 # LLVM stores some files in subdirectories that are named after what
 # version it thinks it is. We might think it is 5.0.1-p1, based on a
 # patch we have applied, but LLVM thinks its version is 5.0.1.
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index c708ce4..452700c 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -430,7 +430,7 @@ def build_kerberos_args(daemon):
 
 def compute_impalad_mem_limit(cluster_size):
   # Set mem_limit of each impalad to the smaller of 12GB or
-  # 1/cluster_size (typically 1/3) of 70% of system memory.
+  # 1/cluster_size (typically 1/3) of 70% of available memory.
   #
   # The default memory limit for an impalad is 80% of the total system memory. On a
   # mini-cluster with 3 impalads that means 240%. Since having an impalad be OOM killed
@@ -442,7 +442,9 @@ def compute_impalad_mem_limit(cluster_size):
   # memory choice here to max out at 12GB. This should be sufficient for tests.
   #
   # Beware that ASAN builds use more memory than regular builds.
-  mem_limit = int(0.7 * psutil.virtual_memory().total / cluster_size)
+  physical_mem_gb = psutil.virtual_memory().total / 1024 / 1024 / 1024
+  available_mem = int(os.getenv("IMPALA_CLUSTER_MAX_MEM_GB", str(physical_mem_gb)))
+  mem_limit = int(0.7 * available_mem * 1024 * 1024 * 1024 / cluster_size)
   return min(12 * 1024 * 1024 * 1024, mem_limit)
 
 class MiniClusterOperations(object):
diff --git a/testdata/cluster/node_templates/common/etc/hadoop/conf/yarn-site.xml.py b/testdata/cluster/node_templates/common/etc/hadoop/conf/yarn-site.xml.py
index 0987925..b286da4 100644
--- a/testdata/cluster/node_templates/common/etc/hadoop/conf/yarn-site.xml.py
+++ b/testdata/cluster/node_templates/common/etc/hadoop/conf/yarn-site.xml.py
@@ -33,11 +33,12 @@ def _get_system_ram_mb():
 
 def _get_yarn_nm_ram_mb():
   sys_ram = _get_system_ram_mb()
+  available_ram_gb = int(os.getenv("IMPALA_CLUSTER_MAX_MEM_GB", str(sys_ram / 1024)))
   # Fit into the following envelope:
   # - need 4GB at a bare minimum
   # - leave at least 24G for other services
   # - don't need more than 48G
-  ret = min(max(sys_ram - 24 * 1024, 4096), 48 * 1024)
+  ret = min(max(available_ram_gb * 1024 - 24 * 1024, 4096), 48 * 1024)
   print >>sys.stderr, "Configuring Yarn NM to use {0}MB RAM".format(ret)
   return ret
 


[impala] 01/03: IMPALA-9180 (part 1): Remove legacy ImpalaInternalService

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6bb3b88d05f89fb7a1a54f302b4d329cbf4f69ec
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Tue Aug 4 17:03:46 2020 -0700

    IMPALA-9180 (part 1): Remove legacy ImpalaInternalService
    
    The legacy Thrift-based Impala internal service has been deprecated
    and can now be removed.

    This patch removes ImpalaInternalService. All infrastructure around it
    is cleaned up, except for one use of the be_port flag:
    StatestoreSubscriber::subscriber_id includes be_port, and we cannot
    change the format of subscriber_id yet. The remaining be_port issue
    will be fixed in a succeeding patch (part 4).
    TQueryCtx.coord_address is changed to TQueryCtx.coord_hostname, since
    the port in TQueryCtx.coord_address was set to be_port and is now
    unused. TQueryCtx.coord_krpc_address is also renamed to
    TQueryCtx.coord_ip_address.
    
    Testing:
     - Passed the exhaustive test.
     - Passed Quasar-L0 test.
    
    Change-Id: I5fa83c8009590124dded4783f77ef70fa30119e6
    Reviewed-on: http://gerrit.cloudera.org:8080/16291
    Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/generated-sources/gen-cpp/CMakeLists.txt   |  1 -
 be/src/benchmarks/expr-benchmark.cc           |  7 ++-
 be/src/exprs/expr-test.cc                     |  4 +-
 be/src/exprs/utility-functions-ir.cc          |  4 +-
 be/src/rpc/impala-service-pool.cc             | 16 +++++--
 be/src/rpc/impala-service-pool.h              | 10 ++++
 be/src/rpc/rpc-mgr.cc                         |  7 +++
 be/src/rpc/rpc-mgr.h                          |  3 ++
 be/src/rpc/thrift-server-test.cc              | 26 -----------
 be/src/runtime/backend-client.h               | 46 -------------------
 be/src/runtime/client-cache-types.h           |  8 ----
 be/src/runtime/coordinator-backend-state.cc   |  1 -
 be/src/runtime/data-stream-test.cc            |  1 -
 be/src/runtime/exec-env.cc                    | 30 +++++-------
 be/src/runtime/exec-env.h                     | 19 ++++----
 be/src/runtime/fragment-instance-state.cc     |  1 -
 be/src/runtime/fragment-instance-state.h      |  1 -
 be/src/runtime/initial-reservations.cc        |  4 +-
 be/src/runtime/query-exec-mgr.cc              |  3 +-
 be/src/runtime/query-state.cc                 |  8 ++--
 be/src/runtime/runtime-filter-bank.cc         | 12 ++---
 be/src/runtime/test-env.cc                    |  4 +-
 be/src/scheduling/scheduler-test-util.h       |  1 -
 be/src/service/CMakeLists.txt                 |  1 -
 be/src/service/client-request-state.cc        |  3 +-
 be/src/service/control-service.cc             |  3 +-
 be/src/service/impala-internal-service.cc     | 46 -------------------
 be/src/service/impala-internal-service.h      | 40 ----------------
 be/src/service/impala-server.cc               | 66 ++++++---------------------
 be/src/service/impala-server.h                | 17 ++-----
 be/src/service/impalad-main.cc                |  6 +--
 be/src/service/session-expiry-test.cc         |  1 -
 be/src/testutil/in-process-servers.cc         | 32 +++++--------
 be/src/testutil/in-process-servers.h          |  7 ++-
 be/src/util/debug-util.cc                     |  4 +-
 bin/generate_minidump_collection_testdata.py  |  1 -
 common/thrift/ImpalaInternalService.thrift    | 10 ++--
 tests/custom_cluster/test_blacklist.py        |  8 ++--
 tests/custom_cluster/test_process_failures.py |  2 +-
 tests/custom_cluster/test_query_retries.py    |  2 +-
 tests/custom_cluster/test_restart_services.py | 11 ++---
 tests/webserver/test_web_pages.py             |  4 +-
 42 files changed, 130 insertions(+), 351 deletions(-)

diff --git a/be/generated-sources/gen-cpp/CMakeLists.txt b/be/generated-sources/gen-cpp/CMakeLists.txt
index 56093f4..271dcb7 100644
--- a/be/generated-sources/gen-cpp/CMakeLists.txt
+++ b/be/generated-sources/gen-cpp/CMakeLists.txt
@@ -30,7 +30,6 @@ set(SRC_FILES
   CatalogService_types.cpp
   CatalogInternalService_constants.cpp
   CatalogInternalService_types.cpp
-  ImpalaInternalService.cpp
   ImpalaInternalService_constants.cpp
   ImpalaInternalService_types.cpp
   ImpalaService.cpp
diff --git a/be/src/benchmarks/expr-benchmark.cc b/be/src/benchmarks/expr-benchmark.cc
index be42114..689295f 100644
--- a/be/src/benchmarks/expr-benchmark.cc
+++ b/be/src/benchmarks/expr-benchmark.cc
@@ -40,10 +40,8 @@
 #include "gen-cpp/Types_types.h"
 #include "gen-cpp/ImpalaService.h"
 #include "gen-cpp/ImpalaService_types.h"
-#include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/ImpalaService.h"
-#include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/Frontend_types.h"
 #include "rpc/thrift-server.h"
 #include "codegen/llvm-codegen.h"
@@ -81,8 +79,9 @@ class Planner {
     query_ctx.client_request.stmt = stmt;
     query_ctx.client_request.query_options = query_options_;
     query_ctx.__set_session(session_state_);
-    TNetworkAddress dummy;
-    ImpalaServer::PrepareQueryContext(dummy, dummy, &query_ctx);
+    string dummy_hostname = "";
+    TNetworkAddress dummy_addr;
+    ImpalaServer::PrepareQueryContext(dummy_hostname, dummy_addr, &query_ctx);
     runtime_state_.reset(new RuntimeState(query_ctx, &exec_env_));
     TPlanFragment* fragment = runtime_state_->obj_pool()->Add(new TPlanFragment());
     PlanFragmentCtxPB* fragment_ctx =
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index 27fdee8..72e7aac 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -5430,9 +5430,9 @@ TEST_P(ExprTest, UtilityFunctions) {
   TestIsNull("fnv_hash(NULL)", TYPE_BIGINT);
 }
 
-// Test that UtilityFunctions::Coordinator() will return null if coord_address is unset
+// Test that UtilityFunctions::Coordinator() will return null if coord_hostname is unset
 TEST_P(ExprTest, CoordinatorFunction) {
-  // Make a RuntimeState where the query context does not have coord_address set.
+  // Make a RuntimeState where the query context does not have coord_hostname set.
   // Note that this should never happen in a real impalad.
   RuntimeState state(TQueryCtx(), ExecEnv::GetInstance());
   MemTracker tracker;
diff --git a/be/src/exprs/utility-functions-ir.cc b/be/src/exprs/utility-functions-ir.cc
index 0006ff2..bfc143d 100644
--- a/be/src/exprs/utility-functions-ir.cc
+++ b/be/src/exprs/utility-functions-ir.cc
@@ -178,8 +178,8 @@ StringVal UtilityFunctions::CurrentSession(FunctionContext* ctx) {
 StringVal UtilityFunctions::Coordinator(FunctionContext* ctx) {
   const TQueryCtx& query_ctx = ctx->impl()->state()->query_ctx();
   // An empty string indicates the coordinator was not set in the query request.
-  return query_ctx.__isset.coord_address ?
-      AnyValUtil::FromString(ctx, query_ctx.coord_address.hostname) :
+  return query_ctx.__isset.coord_hostname ?
+      AnyValUtil::FromString(ctx, query_ctx.coord_hostname) :
       StringVal::null();
 }
 
diff --git a/be/src/rpc/impala-service-pool.cc b/be/src/rpc/impala-service-pool.cc
index 5a8b275..29125a8 100644
--- a/be/src/rpc/impala-service-pool.cc
+++ b/be/src/rpc/impala-service-pool.cc
@@ -96,16 +96,24 @@ Status ImpalaServicePool::Init(int num_threads) {
   return Status::OK();
 }
 
+void ImpalaServicePool::Join() {
+  VLOG_QUERY << "join Impala Service pool\n";
+  std::lock_guard<std::mutex> l(close_lock_);
+  if (is_joined_) return;
+  // TODO (from KRPC): Use a proper thread pool implementation.
+  for (std::unique_ptr<Thread>& thread : threads_) {
+    thread->Join();
+  }
+  is_joined_ = true;
+}
+
 void ImpalaServicePool::Shutdown() {
   service_queue_.Shutdown();
 
   lock_guard<mutex> lock(shutdown_lock_);
   if (closing_) return;
   closing_ = true;
-  // TODO (from KRPC): Use a proper thread pool implementation.
-  for (std::unique_ptr<Thread>& thread : threads_) {
-    thread->Join();
-  }
+  Join();
 
   // Now we must drain the service queue.
   kudu::Status status = kudu::Status::ServiceUnavailable("Service is shutting down");
diff --git a/be/src/rpc/impala-service-pool.h b/be/src/rpc/impala-service-pool.h
index 9721757..e61a240 100644
--- a/be/src/rpc/impala-service-pool.h
+++ b/be/src/rpc/impala-service-pool.h
@@ -56,6 +56,9 @@ class ImpalaServicePool : public kudu::rpc::RpcService {
   /// Start up the thread pool.
   Status Init(int num_threads);
 
+  /// Wait until all working threads complete execution.
+  void Join();
+
   /// Shut down the queue and the thread pool.
   void Shutdown();
 
@@ -113,6 +116,13 @@ class ImpalaServicePool : public kudu::rpc::RpcService {
   std::mutex shutdown_lock_;
   bool closing_ = false;
 
+  /// Protects 'is_joined_'.
+  std::mutex close_lock_;
+
+  /// Set as true when all working threads complete execution.
+  /// Protected by 'close_lock_'.
+  bool is_joined_ = false;
+
   /// The address this service is running on.
   const std::string hostname_;
   const std::string port_;
diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc
index 3e112a5..689121c 100644
--- a/be/src/rpc/rpc-mgr.cc
+++ b/be/src/rpc/rpc-mgr.cc
@@ -220,6 +220,13 @@ Status RpcMgr::StartServices() {
   return Status::OK();
 }
 
+void RpcMgr::Join() {
+  if (services_started_) {
+    if (messenger_.get() == nullptr) return;
+    for (auto service_pool : service_pools_) service_pool->Join();
+  }
+}
+
 void RpcMgr::Shutdown() {
   if (messenger_.get() == nullptr) return;
   for (auto service_pool : service_pools_) service_pool->Shutdown();
diff --git a/be/src/rpc/rpc-mgr.h b/be/src/rpc/rpc-mgr.h
index a141dba..113d1c4 100644
--- a/be/src/rpc/rpc-mgr.h
+++ b/be/src/rpc/rpc-mgr.h
@@ -151,6 +151,9 @@ class RpcMgr {
   Status GetProxy(const TNetworkAddress& address, const std::string& hostname,
       std::unique_ptr<P>* proxy) WARN_UNUSED_RESULT;
 
+  /// Wait until all reactor threads complete execution.
+  void Join();
+
   /// Shut down all previously registered services. All service pools are shut down.
   /// All acceptor and reactor threads within the messenger are also shut down.
   /// All unprocessed incoming requests will be replied with error messages.
diff --git a/be/src/rpc/thrift-server-test.cc b/be/src/rpc/thrift-server-test.cc
index ff89372..4e30916 100644
--- a/be/src/rpc/thrift-server-test.cc
+++ b/be/src/rpc/thrift-server-test.cc
@@ -44,7 +44,6 @@ DECLARE_string(ssl_minimum_version);
 
 DECLARE_int32(state_store_port);
 
-DECLARE_int32(be_port);
 DECLARE_int32(beeswax_port);
 
 static string IMPALA_HOME(getenv("IMPALA_HOME"));
@@ -510,31 +509,6 @@ TEST(ConcurrencyTest, MaxConcurrentConnections) {
   EXPECT_TRUE(did_reach_max);
 }
 
-/// Test disabled because requires a high ulimit -n on build machines. Since the test does
-/// not always fail, we don't lose much coverage by disabling it until we fix the build
-/// infra issue.
-TEST(ConcurrencyTest, DISABLED_ManyConcurrentConnections) {
-  // Test that a large number of concurrent connections will all succeed and not time out
-  // waiting to be accepted. (IMPALA-4135)
-  // Note that without the fix for IMPALA-4135, this test won't always fail, depending on
-  // the hardware that it is run on.
-  int port = GetServerPort();
-  ThriftServer* server;
-  EXPECT_OK(ThriftServerBuilder("DummyServer", MakeProcessor(), port).Build(&server));
-  ASSERT_OK(server->Start());
-
-  ThreadPool<int64_t> pool(
-      "group", "test", 256, 10000, [port](int tid, const int64_t& item) {
-        using Client = ThriftClient<ImpalaInternalServiceClient>;
-        Client* client = new Client("127.0.0.1", port, "", nullptr, false);
-        Status status = client->Open();
-        ASSERT_OK(status);
-      });
-  ASSERT_OK(pool.Init());
-  for (int i = 0; i < 1024 * 16; ++i) pool.Offer(i);
-  pool.DrainAndShutdown();
-}
-
 TEST(NoPasswordPemFile, BadServerCertificate) {
   int port = GetServerPort();
   ThriftServer* server;
diff --git a/be/src/runtime/backend-client.h b/be/src/runtime/backend-client.h
deleted file mode 100644
index 656bbc3..0000000
--- a/be/src/runtime/backend-client.h
+++ /dev/null
@@ -1,46 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_BACKEND_CLIENT_H
-#define IMPALA_BACKEND_CLIENT_H
-
-#include "runtime/client-cache.h"
-#include "util/runtime-profile-counters.h"
-
-#include "gen-cpp/ImpalaInternalService.h"
-
-namespace impala {
-
-/// Proxy class that extends ImpalaInternalServiceClient to allow callers to time
-/// the wall-clock time taken in TransmitData(), so that the time spent sending data
-/// between backends in a query can be measured.
-class ImpalaBackendClient : public ImpalaInternalServiceClient {
- public:
-  ImpalaBackendClient(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> prot)
-    : ImpalaInternalServiceClient(prot) {
-  }
-
-  ImpalaBackendClient(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> iprot,
-      boost::shared_ptr< ::apache::thrift::protocol::TProtocol> oprot)
-    : ImpalaInternalServiceClient(iprot, oprot) {
-  }
-
-};
-
-}
-
-#endif // IMPALA_BACKEND_CLIENT_H
diff --git a/be/src/runtime/client-cache-types.h b/be/src/runtime/client-cache-types.h
index df9a0e7..ed528a5 100644
--- a/be/src/runtime/client-cache-types.h
+++ b/be/src/runtime/client-cache-types.h
@@ -29,17 +29,9 @@ template<class T>
 class ClientConnection;
 
 /// Common cache / connection types
-class ImpalaInternalServiceClient;
-typedef ClientCache<ImpalaInternalServiceClient> ImpalaInternalServiceClientCache;
-typedef ClientConnection<ImpalaInternalServiceClient> ImpalaInternalServiceConnection;
-
 class CatalogServiceClientWrapper;
 typedef ClientCache<CatalogServiceClientWrapper> CatalogServiceClientCache;
 typedef ClientConnection<CatalogServiceClientWrapper> CatalogServiceConnection;
-
-class ImpalaBackendClient;
-typedef ClientCache<ImpalaBackendClient> ImpalaBackendClientCache;
-typedef ClientConnection<ImpalaBackendClient> ImpalaBackendConnection;
 }
 
 #endif
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 1b969a1..440a5f0 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -31,7 +31,6 @@
 #include "kudu/util/status.h"
 #include "rpc/rpc-mgr.inline.h"
 #include "rpc/sidecar-util.h"
-#include "runtime/backend-client.h"
 #include "runtime/client-cache.h"
 #include "runtime/coordinator-filter-state.h"
 #include "runtime/debug-options.h"
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 57fa713..06b9dd6 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -38,7 +38,6 @@
 #include "runtime/krpc-data-stream-sender.h"
 #include "runtime/descriptors.h"
 #include "runtime/client-cache.h"
-#include "runtime/backend-client.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/raw-value.inline.h"
 #include "service/data-stream-service.h"
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 9c1e15b..3645504 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -30,7 +30,6 @@
 #include "exec/kudu-util.h"
 #include "kudu/rpc/service_if.h"
 #include "rpc/rpc-mgr.h"
-#include "runtime/backend-client.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/client-cache.h"
@@ -228,23 +227,20 @@ struct ExecEnv::KuduClientPtr {
 ExecEnv* ExecEnv::exec_env_ = nullptr;
 
 ExecEnv::ExecEnv()
-  : ExecEnv(FLAGS_be_port, FLAGS_krpc_port,
-        FLAGS_state_store_subscriber_port, FLAGS_webserver_port,
+  : ExecEnv(FLAGS_krpc_port, FLAGS_state_store_subscriber_port, FLAGS_webserver_port,
         FLAGS_state_store_host, FLAGS_state_store_port) {}
 
-ExecEnv::ExecEnv(int backend_port, int krpc_port,
-    int subscriber_port, int webserver_port, const string& statestore_host,
-    int statestore_port)
+ExecEnv::ExecEnv(int krpc_port, int subscriber_port, int webserver_port,
+    const string& statestore_host, int statestore_port)
   : obj_pool_(new ObjectPool),
     metrics_(new MetricGroup("impala-metrics")),
     // Create the CatalogServiceClientCache with num_retries = 1 and wait_ms = 0.
     // Connections are still retried, but the retry mechanism is driven by
     // DoRpcWithRetry. Clients should always use DoRpcWithRetry rather than DoRpc to
     // ensure that both RPCs and connections are retried.
-    catalogd_client_cache_(
-        new CatalogServiceClientCache(1, 0,
-            FLAGS_catalog_client_rpc_timeout_ms, FLAGS_catalog_client_rpc_timeout_ms, "",
-            !FLAGS_ssl_client_ca_certificate.empty())),
+    catalogd_client_cache_(new CatalogServiceClientCache(1, 0,
+        FLAGS_catalog_client_rpc_timeout_ms, FLAGS_catalog_client_rpc_timeout_ms, "",
+        !FLAGS_ssl_client_ca_certificate.empty())),
     htable_factory_(new HBaseTableFactory()),
     disk_io_mgr_(new io::DiskIoMgr()),
     webserver_(new Webserver(FLAGS_webserver_interface, webserver_port, metrics_.get())),
@@ -256,7 +252,7 @@ ExecEnv::ExecEnv(int backend_port, int krpc_port,
     query_exec_mgr_(new QueryExecMgr()),
     rpc_metrics_(metrics_->GetOrCreateChildGroup("rpc")),
     enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
-    configured_backend_address_(MakeNetworkAddress(FLAGS_hostname, backend_port)) {
+    configured_backend_address_(MakeNetworkAddress(FLAGS_hostname, krpc_port)) {
   UUIDToUniqueIdPB(boost::uuids::random_generator()(), &backend_id_);
 
   // Resolve hostname to IP address.
@@ -275,9 +271,10 @@ ExecEnv::ExecEnv(int backend_port, int krpc_port,
   TNetworkAddress statestore_address =
       MakeNetworkAddress(statestore_host, statestore_port);
 
-  statestore_subscriber_.reset(new StatestoreSubscriber(
-      Substitute("impalad@$0", TNetworkAddressToString(configured_backend_address_)),
-      subscriber_address, statestore_address, metrics_.get()));
+  // Set StatestoreSubscriber::subscriber_id as hostname + be_port.
+  statestore_subscriber_.reset(
+      new StatestoreSubscriber(Substitute("impalad@$0:$1", FLAGS_hostname, FLAGS_be_port),
+          subscriber_address, statestore_address, metrics_.get()));
 
   if (FLAGS_is_coordinator) {
     hdfs_op_thread_pool_.reset(
@@ -650,11 +647,6 @@ void ExecEnv::InitSystemStateInfo() {
   });
 }
 
-TNetworkAddress ExecEnv::GetThriftBackendAddress() const {
-  DCHECK(impala_server_ != nullptr);
-  return impala_server_->GetThriftBackendAddress();
-}
-
 Status ExecEnv::GetKuduClient(
     const vector<string>& master_addresses, kudu::client::KuduClient** client) {
   string master_addr_concat = join(master_addresses, ",");
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index d0f9125..0d40ac0 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -82,9 +82,8 @@ class ExecEnv {
  public:
   ExecEnv();
 
-  ExecEnv(int backend_port, int krpc_port,
-      int subscriber_port, int webserver_port, const std::string& statestore_host,
-      int statestore_port);
+  ExecEnv(int krpc_port, int subscriber_port, int webserver_port,
+      const std::string& statestore_host, int statestore_port);
 
   /// Returns the most recently created exec env instance. In a normal impalad, this is
   /// the only instance. In test setups with multiple ExecEnv's per process,
@@ -110,10 +109,6 @@ class ExecEnv {
   /// once.
   void SetImpalaServer(ImpalaServer* server);
 
-  /// Get the address of the thrift backend service. Only valid to call if
-  /// StartServices() was successful.
-  TNetworkAddress GetThriftBackendAddress() const;
-
   const BackendIdPB& backend_id() const { return backend_id_; }
 
   KrpcDataStreamMgr* stream_mgr() { return stream_mgr_.get(); }
@@ -149,6 +144,10 @@ class ExecEnv {
   AdmissionController* admission_controller() { return admission_controller_.get(); }
   StatestoreSubscriber* subscriber() { return statestore_subscriber_.get(); }
 
+  const TNetworkAddress& configured_backend_address() const {
+    return configured_backend_address_;
+  }
+
   const IpAddr& ip_address() const { return ip_address_; }
 
   const TNetworkAddress& krpc_address() const { return krpc_address_; }
@@ -230,14 +229,14 @@ class ExecEnv {
   static ExecEnv* exec_env_;
   bool is_fe_tests_ = false;
 
-  /// Address of the thrift based ImpalaInternalService. In backend tests we allow
-  /// wildcard port 0, so this may not be the actual backend address.
+  /// The network address that the backend KRPC service is listening on:
+  /// hostname + krpc_port.
   TNetworkAddress configured_backend_address_;
 
   /// Resolved IP address of the host name.
   IpAddr ip_address_;
 
-  /// Address of the KRPC-based ImpalaInternalService
+  /// IP address of the KRPC backend service: ip_address + krpc_port.
   TNetworkAddress krpc_address_;
 
   /// fs.defaultFs value set in core-site.xml
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 6c566a6..5b865f3 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -35,7 +35,6 @@
 #include "exec/scan-node.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "kudu/rpc/rpc_context.h"
-#include "runtime/backend-client.h"
 #include "runtime/client-cache.h"
 #include "runtime/exec-env.h"
 #include "runtime/fragment-state.h"
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index ac8a73e..35f94ba 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -140,7 +140,6 @@ class FragmentInstanceState {
   const TUniqueId& instance_id() const { return instance_ctx_.fragment_instance_id; }
   FInstanceExecStatePB current_state() const { return current_state_.Load(); }
   bool final_report_sent() const { return final_report_sent_; }
-  const TNetworkAddress& coord_address() const { return query_ctx().coord_address; }
   bool IsDone() const { return current_state_.Load() == FInstanceExecStatePB::FINISHED; }
   ObjectPool* obj_pool();
   int64_t scan_ranges_complete() const { return scan_ranges_complete_; }
diff --git a/be/src/runtime/initial-reservations.cc b/be/src/runtime/initial-reservations.cc
index ed2d1f2..262415a 100644
--- a/be/src/runtime/initial-reservations.cc
+++ b/be/src/runtime/initial-reservations.cc
@@ -33,7 +33,7 @@
 
 using std::numeric_limits;
 
-DECLARE_int32(be_port);
+DECLARE_int32(krpc_port);
 DECLARE_string(hostname);
 
 namespace impala {
@@ -59,7 +59,7 @@ Status InitialReservations::Init(
           query_min_reservation, &reservation_status)) {
     return Status(TErrorCode::MINIMUM_RESERVATION_UNAVAILABLE,
         PrettyPrinter::Print(query_min_reservation, TUnit::BYTES), FLAGS_hostname,
-        FLAGS_be_port, PrintId(query_id), reservation_status.GetDetail());
+        FLAGS_krpc_port, PrintId(query_id), reservation_status.GetDetail());
   }
   VLOG(2) << "Successfully claimed initial reservations ("
           << PrettyPrinter::Print(query_min_reservation, TUnit::BYTES) << ") for"
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index d12f47b..8c5e978 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -73,7 +73,8 @@ Status QueryExecMgr::StartQuery(const ExecQueryFInstancesRequestPB* request,
     const TQueryCtx& query_ctx, const TExecPlanFragmentInfo& fragment_info) {
   TUniqueId query_id = query_ctx.query_id;
   VLOG(2) << "StartQueryFInstances() query_id=" << PrintId(query_id)
-          << " coord=" << TNetworkAddressToString(query_ctx.coord_address);
+          << " coord=" << query_ctx.coord_hostname << ":"
+          << query_ctx.coord_ip_address.port;
   bool dummy;
   QueryState* qs =
       GetOrCreateQueryState(query_ctx, request->per_backend_mem_limit(), &dummy);
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index a6ded7e..c9768d3 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -29,13 +29,13 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 #include "rpc/rpc-mgr.h"
-#include "runtime/backend-client.h"
+#include "rpc/thrift-util.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/bufferpool/reservation-util.h"
 #include "runtime/exec-env.h"
-#include "runtime/fragment-state.h"
 #include "runtime/fragment-instance-state.h"
+#include "runtime/fragment-state.h"
 #include "runtime/initial-reservations.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-exec-mgr.h"
@@ -268,8 +268,8 @@ Status QueryState::Init(const ExecQueryFInstancesRequestPB* exec_rpc_params,
   RETURN_IF_ERROR(InitBufferPoolState());
 
   // Initialize the RPC proxy once and report any error.
-  RETURN_IF_ERROR(ControlService::GetProxy(query_ctx().coord_krpc_address,
-      query_ctx().coord_address.hostname, &proxy_));
+  RETURN_IF_ERROR(ControlService::GetProxy(
+      query_ctx().coord_ip_address, query_ctx().coord_hostname, &proxy_));
 
   // don't copy query_ctx, it's large and we already did that in the c'tor
   exec_rpc_params_.set_coord_state_idx(exec_rpc_params->coord_state_idx());
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index 8ed1e38..994715f 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -27,7 +27,6 @@
 #include "kudu/rpc/rpc_context.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_sidecar.h"
-#include "runtime/backend-client.h"
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/client-cache.h"
 #include "runtime/exec-env.h"
@@ -271,18 +270,17 @@ void RuntimeFilterBank::UpdateFilterFromLocal(
       DCHECK_EQ(type, TRuntimeFilterType::MIN_MAX);
       min_max_filter->ToProtobuf(params.mutable_min_max_filter());
     }
-    const TNetworkAddress& krpc_address = query_state_->query_ctx().coord_krpc_address;
-    const TNetworkAddress& host_address = query_state_->query_ctx().coord_address;
+    const TNetworkAddress& krpc_address = query_state_->query_ctx().coord_ip_address;
+    const std::string& hostname = query_state_->query_ctx().coord_hostname;
 
     // Use 'proxy' to send the filter to the coordinator.
     unique_ptr<DataStreamServiceProxy> proxy;
-    Status get_proxy_status =
-        DataStreamService::GetProxy(krpc_address, host_address.hostname, &proxy);
+    Status get_proxy_status = DataStreamService::GetProxy(krpc_address, hostname, &proxy);
     if (!get_proxy_status.ok()) {
       // Failing to send a filter is not a query-wide error - the remote fragment will
       // continue regardless.
-      LOG(INFO) << Substitute("Failed to get proxy to coordinator $0: $1",
-          host_address.hostname, get_proxy_status.msg().msg());
+      LOG(INFO) << Substitute("Failed to get proxy to coordinator $0: $1", hostname,
+          get_proxy_status.msg().msg());
       return;
     }
     // Increment 'num_inflight_rpcs_' to make sure that the filter will not be deallocated
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 9b75002..a315144 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -146,8 +146,8 @@ Status TestEnv::CreateQueryState(
   query_ctx.query_id.hi = 0;
   query_ctx.query_id.lo = query_id;
   query_ctx.request_pool = "test-pool";
-  query_ctx.coord_address = exec_env_->configured_backend_address_;
-  query_ctx.coord_krpc_address = exec_env_->krpc_address_;
+  query_ctx.coord_hostname = exec_env_->configured_backend_address_.hostname;
+  query_ctx.coord_ip_address = exec_env_->krpc_address_;
   query_ctx.coord_backend_id.hi = 0;
   query_ctx.coord_backend_id.lo = 0;
   TQueryOptions* query_options_to_use = &query_ctx.client_request.query_options;
diff --git a/be/src/scheduling/scheduler-test-util.h b/be/src/scheduling/scheduler-test-util.h
index 9595b72..2b6cab0 100644
--- a/be/src/scheduling/scheduler-test-util.h
+++ b/be/src/scheduling/scheduler-test-util.h
@@ -25,7 +25,6 @@
 #include <boost/scoped_ptr.hpp>
 
 #include "common/status.h"
-#include "gen-cpp/ImpalaInternalService.h" // for TQueryOptions
 #include "scheduling/cluster-membership-mgr.h"
 #include "scheduling/scheduler.h"
 #include "util/metrics.h"
diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt
index 8f030ad..7b7dfef 100644
--- a/be/src/service/CMakeLists.txt
+++ b/be/src/service/CMakeLists.txt
@@ -38,7 +38,6 @@ add_library(Service
   impala-beeswax-server.cc
   impala-hs2-server.cc
   impala-http-handler.cc
-  impala-internal-service.cc
   impalad-main.cc
   impala-server.cc
   query-options.cc
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 931f865..aa8fa72 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -32,7 +32,6 @@
 #include "exec/kudu-util.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "rpc/rpc-mgr.inline.h"
-#include "runtime/backend-client.h"
 #include "runtime/coordinator.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
@@ -158,7 +157,7 @@ ClientRequestState::ClientRequestState(const TQueryCtx& query_ctx, Frontend* fro
   summary_profile_->AddInfoStringRedacted(
       "Sql Statement", query_ctx_.client_request.stmt);
   summary_profile_->AddInfoString("Coordinator",
-      TNetworkAddressToString(ExecEnv::GetInstance()->GetThriftBackendAddress()));
+      TNetworkAddressToString(ExecEnv::GetInstance()->configured_backend_address()));
 
   summary_profile_->AddChild(frontend_profile_);
 
diff --git a/be/src/service/control-service.cc b/be/src/service/control-service.cc
index 34c3800..3db1055 100644
--- a/be/src/service/control-service.cc
+++ b/be/src/service/control-service.cc
@@ -141,7 +141,8 @@ void ControlService::ExecQueryFInstances(const ExecQueryFInstancesRequestPB* req
   ScopedThreadContext scoped_tdi(GetThreadDebugInfo(), query_ctx.query_id);
   VLOG_QUERY << "ExecQueryFInstances():"
              << " query_id=" << PrintId(query_ctx.query_id)
-             << " coord=" << TNetworkAddressToString(query_ctx.coord_address)
+             << " coord=" << query_ctx.coord_hostname << ":"
+             << query_ctx.coord_ip_address.port
              << " #instances=" << fragment_info.fragment_instance_ctxs.size();
   Status resp_status = ExecEnv::GetInstance()->query_exec_mgr()->StartQuery(
       request, query_ctx, fragment_info);
diff --git a/be/src/service/impala-internal-service.cc b/be/src/service/impala-internal-service.cc
deleted file mode 100644
index 1db4bba..0000000
--- a/be/src/service/impala-internal-service.cc
+++ /dev/null
@@ -1,46 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "service/impala-internal-service.h"
-
-#include <boost/lexical_cast.hpp>
-
-#include "common/status.h"
-#include "gutil/strings/substitute.h"
-#include "service/impala-server.h"
-#include "runtime/query-state.h"
-#include "runtime/fragment-instance-state.h"
-#include "runtime/exec-env.h"
-#include "util/debug-util.h"
-
-#include "common/names.h"
-
-using namespace impala;
-
-DECLARE_string(debug_actions);
-
-ImpalaInternalService::ImpalaInternalService() {
-  impala_server_ = ExecEnv::GetInstance()->impala_server();
-  DCHECK(impala_server_ != nullptr);
-}
-
-template <typename T> void SetUnknownIdError(
-    const string& id_type, const TUniqueId& id, T* status_container) {
-  Status status(ErrorMsg(TErrorCode::INTERNAL_ERROR,
-      Substitute("Unknown $0 id: $1", id_type, PrintId(id))));
-  status.SetTStatus(status_container);
-}
diff --git a/be/src/service/impala-internal-service.h b/be/src/service/impala-internal-service.h
deleted file mode 100644
index 425678b..0000000
--- a/be/src/service/impala-internal-service.h
+++ /dev/null
@@ -1,40 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_SERVICE_IMPALA_INTERNAL_SERVICE_H
-#define IMPALA_SERVICE_IMPALA_INTERNAL_SERVICE_H
-
-#include "gen-cpp/ImpalaInternalService.h"
-#include "gen-cpp/ImpalaInternalService_types.h"
-
-namespace impala {
-
-class ImpalaServer;
-
-/// Proxies Thrift RPC requests onto their implementing objects for the
-/// ImpalaInternalService service.
-class ImpalaInternalService : public ImpalaInternalServiceIf {
- public:
-  ImpalaInternalService();
-
- private:
-  ImpalaServer* impala_server_;
-};
-
-}
-
-#endif
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 3c75851..ad0c3fc 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -57,6 +57,7 @@
 #include "kudu/rpc/rpc_context.h"
 #include "kudu/util/random_util.h"
 #include "rpc/authentication.h"
+#include "rpc/rpc-mgr.h"
 #include "rpc/rpc-trace.h"
 #include "rpc/thrift-thread.h"
 #include "rpc/thrift-util.h"
@@ -64,16 +65,15 @@
 #include "runtime/coordinator.h"
 #include "runtime/exec-env.h"
 #include "runtime/lib-cache.h"
+#include "runtime/query-driver.h"
 #include "runtime/timestamp-value.h"
 #include "runtime/timestamp-value.inline.h"
 #include "runtime/tmp-file-mgr.h"
-#include "runtime/query-driver.h"
 #include "scheduling/admission-controller.h"
 #include "service/cancellation-work.h"
 #include "service/client-request-state.h"
 #include "service/frontend.h"
 #include "service/impala-http-handler.h"
-#include "service/impala-internal-service.h"
 #include "util/auth-util.h"
 #include "util/bit-util.h"
 #include "util/coding-util.h"
@@ -566,15 +566,6 @@ bool ImpalaServer::IsExecutor() { return is_executor_; }
 
 bool ImpalaServer::IsHealthy() { return services_started_.load(); }
 
-int ImpalaServer::GetThriftBackendPort() {
-  DCHECK(thrift_be_server_ != nullptr);
-  return thrift_be_server_->port();
-}
-
-TNetworkAddress ImpalaServer::GetThriftBackendAddress() {
-  return MakeNetworkAddress(FLAGS_hostname, GetThriftBackendPort());
-}
-
 int ImpalaServer::GetBeeswaxPort() {
   DCHECK(beeswax_server_ != nullptr);
   return beeswax_server_->port();
@@ -1145,12 +1136,12 @@ Status ImpalaServer::ExecuteInternal(const TQueryCtx& query_ctx,
 }
 
 void ImpalaServer::PrepareQueryContext(TQueryCtx* query_ctx) {
-  PrepareQueryContext(GetThriftBackendAddress(),
-      ExecEnv::GetInstance()->krpc_address(), query_ctx);
+  PrepareQueryContext(exec_env_->configured_backend_address().hostname,
+      exec_env_->krpc_address(), query_ctx);
 }
 
-void ImpalaServer::PrepareQueryContext(const TNetworkAddress& backend_addr,
-    const TNetworkAddress& krpc_addr, TQueryCtx* query_ctx) {
+void ImpalaServer::PrepareQueryContext(
+    const std::string& hostname, const TNetworkAddress& krpc_addr, TQueryCtx* query_ctx) {
   query_ctx->__set_pid(getpid());
   int64_t now_us = UnixMicros();
   const Timezone& utc_tz = TimezoneDatabase::GetUtcTimezone();
@@ -1177,8 +1168,8 @@ void ImpalaServer::PrepareQueryContext(const TNetworkAddress& backend_addr,
     query_ctx->__set_now_string(query_ctx->client_request.query_options.now_string);
   }
   query_ctx->__set_start_unix_millis(now_us / MICROS_PER_MILLI);
-  query_ctx->__set_coord_address(backend_addr);
-  query_ctx->__set_coord_krpc_address(krpc_addr);
+  query_ctx->__set_coord_hostname(hostname);
+  query_ctx->__set_coord_ip_address(krpc_addr);
   TUniqueId backend_id;
   UniqueIdPBToTUniqueId(ExecEnv::GetInstance()->backend_id(), &backend_id);
   query_ctx->__set_coord_backend_id(backend_id);
@@ -2131,7 +2122,8 @@ void ImpalaServer::BuildLocalBackendDescriptorInternal(BackendDescriptorPB* be_d
   bool is_quiescing = shutting_down_.Load() != 0;
 
   *be_desc->mutable_backend_id() = exec_env_->backend_id();
-  *be_desc->mutable_address() = FromTNetworkAddress(exec_env_->GetThriftBackendAddress());
+  *be_desc->mutable_address() =
+      FromTNetworkAddress(exec_env_->configured_backend_address());
   be_desc->set_ip_address(exec_env_->ip_address());
   be_desc->set_is_coordinator(FLAGS_is_coordinator);
   be_desc->set_is_executor(FLAGS_is_executor);
@@ -2661,8 +2653,8 @@ void ImpalaServer::ExpireQuery(ClientRequestState* crs, const Status& status) {
   crs->set_expired();
 }
 
-Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t hs2_port,
-    int32_t hs2_http_port) {
+Status ImpalaServer::Start(
+    int32_t beeswax_port, int32_t hs2_port, int32_t hs2_http_port) {
   exec_env_->SetImpalaServer(this);
 
   // We must register the HTTP handlers after registering the ImpalaServer with the
@@ -2695,32 +2687,9 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t
         SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version));
   }
 
-  // Start the internal service.
-  if (thrift_be_port > 0 || (TestInfo::is_test() && thrift_be_port == 0)) {
-    boost::shared_ptr<ImpalaInternalService> thrift_if(new ImpalaInternalService());
-    boost::shared_ptr<TProcessor> be_processor(
-        new ImpalaInternalServiceProcessor(thrift_if));
-    boost::shared_ptr<TProcessorEventHandler> event_handler(
-        new RpcEventHandler("backend", exec_env_->metrics()));
-    be_processor->setEventHandler(event_handler);
-
-    ThriftServerBuilder be_builder("backend", be_processor, thrift_be_port);
-
-    if (IsInternalTlsConfigured()) {
-      LOG(INFO) << "Enabling SSL for backend";
-      be_builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
-          .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
-          .ssl_version(ssl_version)
-          .cipher_list(FLAGS_ssl_cipher_list);
-    }
-    ThriftServer* server;
-    RETURN_IF_ERROR(be_builder.metrics(exec_env_->metrics()).Build(&server));
-    thrift_be_server_.reset(server);
-  }
-
   if (!FLAGS_is_coordinator) {
     LOG(INFO) << "Initialized executor Impala server on "
-              << TNetworkAddressToString(GetThriftBackendAddress());
+              << TNetworkAddressToString(exec_env_->configured_backend_address());
   } else {
     // Initialize the client servers.
     boost::shared_ptr<ImpalaServer> handler = shared_from_this();
@@ -2813,14 +2782,10 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t
     }
   }
   LOG(INFO) << "Initialized coordinator/executor Impala server on "
-      << TNetworkAddressToString(GetThriftBackendAddress());
+            << TNetworkAddressToString(exec_env_->configured_backend_address());
 
   // Start the RPC services.
   RETURN_IF_ERROR(exec_env_->StartKrpcService());
-  if (thrift_be_server_.get()) {
-    RETURN_IF_ERROR(thrift_be_server_->Start());
-    LOG(INFO) << "Impala InternalService listening on " << thrift_be_server_->port();
-  }
   if (hs2_server_.get()) {
     RETURN_IF_ERROR(hs2_server_->Start());
     LOG(INFO) << "Impala HiveServer2 Service listening on " << hs2_server_->port();
@@ -2845,8 +2810,7 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t
 void ImpalaServer::Join() {
   // The server shuts down by exiting the process, so just block here until the process
   // exits.
-  thrift_be_server_->Join();
-  thrift_be_server_.reset();
+  exec_env_->rpc_mgr()->Join();
 
   if (FLAGS_is_coordinator) {
     beeswax_server_->Join();
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 52e0a85..03b0612 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -31,7 +31,6 @@
 #include "common/status.h"
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/ImpalaHiveServer2Service.h"
-#include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/ImpalaService.h"
 #include "gen-cpp/control_service.pb.h"
 #include "kudu/util/random.h"
@@ -209,9 +208,8 @@ class ImpalaServer : public ImpalaServiceIf,
   /// the port the server run on. A port value of 0 means to choose an arbitrary
   /// ephemeral port in tests and to not start the service in a daemon. A port < 0
   /// always means to not start the service. The port values can be obtained after
-  /// Start() by calling GetThriftBackendPort(), GetBeeswaxPort() or GetHS2Port().
-  Status Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t hs2_port,
-      int32_t hs2_http_port);
+  /// Start() by calling GetBeeswaxPort() or GetHS2Port().
+  Status Start(int32_t beeswax_port, int32_t hs2_port, int32_t hs2_http_port);
 
   /// Blocks until the server shuts down.
   void Join();
@@ -361,7 +359,7 @@ class ImpalaServer : public ImpalaServiceIf,
   void PrepareQueryContext(TQueryCtx* query_ctx);
 
   /// Static helper for PrepareQueryContext() that is used from expr-benchmark.
-  static void PrepareQueryContext(const TNetworkAddress& backend_addr,
+  static void PrepareQueryContext(const std::string& hostname,
       const TNetworkAddress& krpc_addr, TQueryCtx* query_ctx);
 
   /// SessionHandlerIf methods
@@ -442,14 +440,6 @@ class ImpalaServer : public ImpalaServiceIf,
   /// Returns whether this backend is healthy, i.e. able to accept queries.
   bool IsHealthy();
 
-  /// Returns the port that the thrift backend server is listening on. Valid to call after
-  /// the server has started successfully.
-  int GetThriftBackendPort();
-
-  /// Returns the network address that the thrift backend server is listening on. Valid
-  /// to call after the server has started successfully.
-  TNetworkAddress GetThriftBackendAddress();
-
   /// Returns the port that the Beeswax server is listening on. Valid to call after
   /// the server has started successfully.
   int GetBeeswaxPort();
@@ -1579,7 +1569,6 @@ class ImpalaServer : public ImpalaServiceIf,
   boost::scoped_ptr<ThriftServer> beeswax_server_;
   boost::scoped_ptr<ThriftServer> hs2_server_;
   boost::scoped_ptr<ThriftServer> hs2_http_server_;
-  boost::scoped_ptr<ThriftServer> thrift_be_server_;
 
   /// Flag that records if backend and/or client services have been started. The flag is
   /// set after all services required for the server have been started.
diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc
index 22db7fe..9704067 100644
--- a/be/src/service/impalad-main.cc
+++ b/be/src/service/impalad-main.cc
@@ -31,7 +31,6 @@
 #include "exec/hbase-table-writer.h"
 #include "exprs/hive-udf-call.h"
 #include "exprs/timezone_db.h"
-#include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/ImpalaService.h"
 #include "rpc/rpc-trace.h"
 #include "rpc/thrift-server.h"
@@ -54,7 +53,6 @@ using namespace impala;
 DECLARE_int32(beeswax_port);
 DECLARE_int32(hs2_port);
 DECLARE_int32(hs2_http_port);
-DECLARE_int32(be_port);
 DECLARE_bool(is_coordinator);
 
 int ImpaladMain(int argc, char** argv) {
@@ -85,8 +83,8 @@ int ImpaladMain(int argc, char** argv) {
   InitRpcEventTracing(exec_env.webserver(), exec_env.rpc_mgr());
 
   boost::shared_ptr<ImpalaServer> impala_server(new ImpalaServer(&exec_env));
-  Status status = impala_server->Start(
-      FLAGS_be_port, FLAGS_beeswax_port, FLAGS_hs2_port, FLAGS_hs2_http_port);
+  Status status =
+      impala_server->Start(FLAGS_beeswax_port, FLAGS_hs2_port, FLAGS_hs2_http_port);
   if (!status.ok()) {
     LOG(ERROR) << "Impalad services did not start correctly, exiting.  Error: "
         << status.GetDetail();
diff --git a/be/src/service/session-expiry-test.cc b/be/src/service/session-expiry-test.cc
index 7dd68ad..7877a86 100644
--- a/be/src/service/session-expiry-test.cc
+++ b/be/src/service/session-expiry-test.cc
@@ -37,7 +37,6 @@ using namespace impala;
 
 DECLARE_bool(abort_on_config_error);
 DECLARE_int32(idle_session_timeout);
-DECLARE_int32(be_port);
 DECLARE_int32(beeswax_port);
 
 // TODO: When sleep(..) queries can be cancelled, write a test that confirms long-running
diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc
index a9bdfeb..337932d 100644
--- a/be/src/testutil/in-process-servers.cc
+++ b/be/src/testutil/in-process-servers.cc
@@ -37,7 +37,6 @@
 
 DECLARE_string(ssl_server_certificate);
 DECLARE_string(ssl_private_key);
-DECLARE_int32(be_port);
 DECLARE_int32(krpc_port);
 
 using namespace apache::thrift;
@@ -45,31 +44,27 @@ using namespace impala;
 
 Status InProcessImpalaServer::StartWithEphemeralPorts(const string& statestore_host,
     int statestore_port, InProcessImpalaServer** server) {
-  // These flags are read directly in several places to find the address of the local
-  // backend interface.
-  FLAGS_be_port = 0;
-  // Thrift server ctor allows port to be set to 0. Not supported with KRPC.
-  // So KRPC port must be explicitly set here.
+  // This flag is read directly in several places to find the address of the backend
+  // interface, so we must set it here.
   FLAGS_krpc_port = FindUnusedEphemeralPort();
 
-  // Use wildcard addresses of 0 so that the Thrift servers will pick their own port.
-  *server = new InProcessImpalaServer(FLAGS_hostname, 0, FLAGS_krpc_port, 0, 0,
-      statestore_host, statestore_port);
+  *server = new InProcessImpalaServer(
+      FLAGS_hostname, FLAGS_krpc_port, 0, 0, statestore_host, statestore_port);
   // Start the daemon and check if it works, if not delete the current server object and
   // pick a new set of ports
   return (*server)->StartWithClientServers(0, 0, 0);
 }
 
-InProcessImpalaServer::InProcessImpalaServer(const string& hostname, int backend_port,
-    int krpc_port, int subscriber_port, int webserver_port, const string& statestore_host,
+InProcessImpalaServer::InProcessImpalaServer(const string& hostname, int krpc_port,
+    int subscriber_port, int webserver_port, const string& statestore_host,
     int statestore_port)
-  : backend_port_(backend_port),
+  : krpc_port_(krpc_port),
     beeswax_port_(0),
     hs2_port_(0),
     hs2_http_port_(0),
     impala_server_(NULL),
-    exec_env_(new ExecEnv(backend_port, krpc_port, subscriber_port, webserver_port,
-        statestore_host, statestore_port)) {}
+    exec_env_(new ExecEnv(
+        krpc_port, subscriber_port, webserver_port, statestore_host, statestore_port)) {}
 
 void InProcessImpalaServer::SetCatalogIsReady() {
   DCHECK(impala_server_ != NULL) << "Call Start*() first.";
@@ -85,15 +80,10 @@ Status InProcessImpalaServer::StartWithClientServers(
 
   impala_server_.reset(new ImpalaServer(exec_env_.get()));
   SetCatalogIsReady();
-  RETURN_IF_ERROR(
-      impala_server_->Start(backend_port_, beeswax_port, hs2_port, hs2_http_port_));
-
-  // This flag is read directly in several places to find the address of the local
-  // backend interface.
-  FLAGS_be_port = impala_server_->GetThriftBackendPort();
+  RETURN_IF_ERROR(impala_server_->Start(beeswax_port, hs2_port, hs2_http_port));
 
   // Wait for up to 1s for the backend server to start
-  RETURN_IF_ERROR(WaitForServer(FLAGS_hostname, FLAGS_be_port, 10, 100));
+  RETURN_IF_ERROR(WaitForServer(FLAGS_hostname, krpc_port_, 10, 100));
   return Status::OK();
 }
 
diff --git a/be/src/testutil/in-process-servers.h b/be/src/testutil/in-process-servers.h
index 3902520..71f480c 100644
--- a/be/src/testutil/in-process-servers.h
+++ b/be/src/testutil/in-process-servers.h
@@ -42,9 +42,8 @@ class InProcessImpalaServer {
  public:
   /// Initialises the server, but does not start any network-attached
   /// services or run any threads.
-  InProcessImpalaServer(const std::string& hostname, int backend_port, int krpc_port,
-                        int subscriber_port, int webserver_port,
-                        const std::string& statestore_host, int statestore_port);
+  InProcessImpalaServer(const std::string& hostname, int krpc_port, int subscriber_port,
+      int webserver_port, const std::string& statestore_host, int statestore_port);
 
   /// Starts an in-process Impala server with ephemeral ports that are independent of the
   /// ports used by a concurrently running normal Impala daemon. The hostname is set to
@@ -80,7 +79,7 @@ class InProcessImpalaServer {
   int GetHS2Port() const;
 
  private:
-  uint32_t backend_port_;
+  uint32_t krpc_port_;
 
   uint32_t beeswax_port_;
 
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index f31f895..d2d7eb2 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -64,7 +64,7 @@ using boost::tokenizer;
 using namespace beeswax;
 using namespace parquet;
 
-DECLARE_int32(be_port);
+DECLARE_int32(krpc_port);
 DECLARE_string(hostname);
 
 namespace impala {
@@ -314,7 +314,7 @@ string GetStackTrace() {
 }
 
 string GetBackendString() {
-  return Substitute("$0:$1", FLAGS_hostname, FLAGS_be_port);
+  return Substitute("$0:$1", FLAGS_hostname, FLAGS_krpc_port);
 }
 
 DebugActionTokens TokenizeDebugActions(const string& debug_actions) {
diff --git a/bin/generate_minidump_collection_testdata.py b/bin/generate_minidump_collection_testdata.py
index 021941b..2cee8b9 100755
--- a/bin/generate_minidump_collection_testdata.py
+++ b/bin/generate_minidump_collection_testdata.py
@@ -49,7 +49,6 @@ options, args = parser.parse_args()
 
 CONFIG_FILE = '''-beeswax_port=21000
 -fe_port=21000
--be_port=22000
 -hs2_port=21050
 -enable_webserver=true
 -mem_limit=108232130560
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index fe0e16d..27716ed 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -533,12 +533,12 @@ struct TQueryCtx {
   // Process ID of the impalad to which the user is connected.
   5: required i32 pid
 
-  // The initiating coordinator's address of its thrift based ImpalaInternalService.
+  // The coordinator's hostname.
   // TODO: determine whether we can get this somehow via the Thrift rpc mechanism.
-  6: optional Types.TNetworkAddress coord_address
+  6: optional string coord_hostname
 
   // The initiating coordinator's address of its KRPC based ImpalaInternalService.
-  7: optional Types.TNetworkAddress coord_krpc_address
+  7: optional Types.TNetworkAddress coord_ip_address
 
   // List of tables missing relevant table and/or column stats. Used for
   // populating query-profile fields consumed by CM as well as warning messages.
@@ -785,7 +785,3 @@ struct TParseDateStringResult {
   // parsed date string was not in a canonical form.
   3: optional string canonical_date_string
 }
-
-service ImpalaInternalService {
-
-}
diff --git a/tests/custom_cluster/test_blacklist.py b/tests/custom_cluster/test_blacklist.py
index 78ec93e..8d24161 100644
--- a/tests/custom_cluster/test_blacklist.py
+++ b/tests/custom_cluster/test_blacklist.py
@@ -64,7 +64,8 @@ class TestBlacklist(CustomClusterTestSuite):
     backends_json = self.cluster.impalads[0].service.get_debug_webpage_json("/backends")
     match = re.search("Blacklisted Executors: (.*)", result.runtime_profile)
     assert match.group(1) == "%s:%s" % \
-        (killed_impalad.hostname, killed_impalad.service.be_port), result.runtime_profile
+        (killed_impalad.hostname, killed_impalad.service.krpc_port), \
+        result.runtime_profile
     assert backends_json["num_blacklisted_backends"] == 1, backends_json
     assert backends_json["num_active_backends"] == 2, backends_json
     assert len(backends_json["backends"]) == 3, backends_json
@@ -111,7 +112,8 @@ class TestBlacklist(CustomClusterTestSuite):
     result = self.execute_query("select count(*) from tpch.lineitem")
     match = re.search("Blacklisted Executors: (.*)", result.runtime_profile)
     assert match.group(1) == "%s:%s" % \
-        (killed_impalad.hostname, killed_impalad.service.be_port), result.runtime_profile
+        (killed_impalad.hostname, killed_impalad.service.krpc_port), \
+        result.runtime_profile
 
     # Restart the impalad.
     killed_impalad.start()
@@ -165,5 +167,5 @@ class TestBlacklist(CustomClusterTestSuite):
     result = self.execute_query("select count(*) from tpch.lineitem")
     match = re.search("Blacklisted Executors: (.*)", result.runtime_profile)
     assert match is not None and match.group(1) == "%s:%s" % \
-      (killed_impalad.hostname, killed_impalad.service.be_port), \
+      (killed_impalad.hostname, killed_impalad.service.krpc_port), \
       result.runtime_profile
diff --git a/tests/custom_cluster/test_process_failures.py b/tests/custom_cluster/test_process_failures.py
index 4e106ef..0bf5b2a 100644
--- a/tests/custom_cluster/test_process_failures.py
+++ b/tests/custom_cluster/test_process_failures.py
@@ -155,7 +155,7 @@ class TestProcessFailures(CustomClusterTestSuite):
     # Assert that the query status on the query profile web page contains the expected
     # failed hostport.
     failed_hostport = "%s:%s" % (worker_impalad.service.hostname,
-                                 worker_impalad.service.be_port)
+                                 worker_impalad.service.krpc_port)
     query_profile_page = impalad.service.read_query_profile_page(query_id)
     assert failed_hostport in query_profile_page,\
         "Query status did not contain expected hostport %s\n\n%s" % (failed_hostport,
diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py
index 95ee8df..45caa25 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -944,7 +944,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     """Validate that the given profile indicates that the given impalad was blacklisted
     during query execution."""
     assert "Blacklisted Executors: {0}:{1}".format(blacklisted_impalad.hostname,
-        blacklisted_impalad.service.be_port) in profile, profile
+        blacklisted_impalad.service.krpc_port) in profile, profile
 
   def __assert_executors_not_blacklisted(self, impalad, profile):
     """Validate that the given profile indicates that the given impalad was not
diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py
index 22bbbf9..d08c293 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -277,12 +277,11 @@ class TestGracefulShutdown(CustomClusterTestSuite, HS2TestSuite):
     assert ("This may be because the port specified is wrong.") not in str(ex)
 
     # Test that pointing to the wrong thrift service (the HS2 port) fails gracefully-ish.
-    thrift_ports = [21051, 22001]  # HS2 port, old backend port.
-    for port in thrift_ports:
-      ex = self.execute_query_expect_failure(self.client,
-          ":shutdown('localhost:{0}')".format(port))
-      assert ("failed with error 'RemoteShutdown() RPC failed") in str(ex)
-      assert ("This may be because the port specified is wrong.") in str(ex)
+    thrift_port = 21051  # HS2 port.
+    ex = self.execute_query_expect_failure(self.client,
+        ":shutdown('localhost:{0}')".format(thrift_port))
+    assert ("failed with error 'RemoteShutdown() RPC failed") in str(ex)
+    assert ("This may be because the port specified is wrong.") in str(ex)
 
     # Test RPC error handling with debug action.
     ex = self.execute_query_expect_failure(self.client, ":shutdown('localhost:27001')",
diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py
index 7cc693c..4026e67 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -661,8 +661,8 @@ class TestWebPage(ImpalaTestSuite):
 
     # The 'address' column is the backend port of the impalad.
     assert len(backend_row['address']) > 0
-    be_ports = ('22000', '22001', '22002')
-    assert backend_row['address'].endswith(be_ports)
+    krpc_ports = ('27000', '27001', '27002')
+    assert backend_row['address'].endswith(krpc_ports)
 
     # The 'krpc_address' is the krpc address of the impalad.
     assert len(backend_row['krpc_address']) > 0


[impala] 03/03: IMPALA-10164: Supporting HadoopCatalog for Iceberg table

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 5b720a4d18cc2f2ade54ab223663521a3822343f
Author: skyyws <sk...@163.com>
AuthorDate: Fri Sep 11 13:40:38 2020 +0800

    IMPALA-10164: Supporting HadoopCatalog for Iceberg table
    
    This patch adds support for creating Iceberg tables through
    HadoopCatalog. Before this patch only the HadoopTables API was
    supported; now HadoopCatalog can be used as well. When creating a
    managed table, you can use SQL like this:
      CREATE TABLE default.iceberg_test (
        level string,
        event_time timestamp,
        message string
      )
      STORED AS ICEBERG
      TBLPROPERTIES ('iceberg.catalog'='hadoop.catalog',
        'iceberg.catalog_location'='hdfs://test-warehouse/iceberg_test');
    Two values ('hadoop.catalog', 'hadoop.tables') are now supported for
    'iceberg.catalog'. If you don't specify this property in your SQL, the
    default catalog type is 'hadoop.catalog'.
    For an external Iceberg table, you can use SQL like this:
      CREATE EXTERNAL TABLE default.iceberg_test_external
      STORED AS ICEBERG
      TBLPROPERTIES ('iceberg.catalog'='hadoop.catalog',
        'iceberg.catalog_location'='hdfs://test-warehouse/iceberg_test',
        'iceberg.table_identifier'='default.iceberg_test');
    The table location cannot be set for either managed or external Iceberg
    tables with 'hadoop.catalog', and 'SHOW CREATE TABLE' does not display
    the table location yet; use 'DESCRIBE FORMATTED/EXTENDED' to get the
    location.
    'iceberg.catalog_location' is required for a 'hadoop.catalog' table; it
    is the location that holds the Iceberg table metadata and data, and
    Impala uses it to load the table metadata from Iceberg.
    'iceberg.table_identifier' is used as the Iceberg TableIdentifier. If
    this property is not specified in SQL, Impala uses the database and
    table name to load the Iceberg table ('default.iceberg_test_external'
    in the SQL above). The property value is split on '.', so you can also
    set a value such as 'org.my_db.my_tbl', as shown in the sketch below.
    The property is valid for both managed and external tables.
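    As an illustration only (the table name, identifier and path below are
    hypothetical and not part of this change), a dotted identifier could be
    supplied like this:
      CREATE EXTERNAL TABLE my_db.iceberg_events_ext
      STORED AS ICEBERG
      TBLPROPERTIES ('iceberg.catalog'='hadoop.catalog',
        'iceberg.catalog_location'='hdfs://test-warehouse/hadoop_catalog_test',
        'iceberg.table_identifier'='org.my_db.my_tbl');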
    
    Testing:
    - Create table tests in functional_schema_template.sql
    - Iceberg table create test in test_iceberg.py
    - Iceberg table query test in test_scanners.py
    - Iceberg table show create table test in test_show_create_table.py
    
    Change-Id: Ic1893c50a633ca22d4bca6726c9937b026f5d5ef
    Reviewed-on: http://gerrit.cloudera.org:8080/16446
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 common/thrift/CatalogObjects.thrift                |   6 +
 .../apache/impala/analysis/CreateTableStmt.java    |  42 ++++--
 .../org/apache/impala/analysis/ToSqlUtils.java     |  11 +-
 .../org/apache/impala/catalog/FeIcebergTable.java  |  52 ++++++-
 .../org/apache/impala/catalog/IcebergTable.java    |  32 ++++-
 .../impala/catalog/local/LocalIcebergTable.java    |  28 +++-
 .../org/apache/impala/planner/IcebergScanNode.java |   4 +-
 .../apache/impala/service/CatalogOpExecutor.java   |  41 ++++--
 .../impala/service/IcebergCatalogOpExecutor.java   |  43 +++++-
 .../java/org/apache/impala/util/IcebergUtil.java   | 125 +++++++++++++++--
 ...02da0-b562-4310-9001-06f9b6b0f9ae-00000.parquet | Bin 0 -> 1162 bytes
 ...3aefa-65fc-4698-8f26-b155fc965cf6-00000.parquet | Bin 0 -> 1162 bytes
 ...4b016-05e1-43fc-b4a0-0e0df52a5035-00000.parquet | Bin 0 -> 1162 bytes
 ...92523-c3b9-401d-b429-363c245dbe9c-00000.parquet | Bin 0 -> 1161 bytes
 ...370cf-10a1-4e49-86dc-b094fe739aa6-00000.parquet | Bin 0 -> 1162 bytes
 ...f86fa-286f-4cd3-8337-98685c48176d-00000.parquet | Bin 0 -> 1161 bytes
 ...d2bbc-46a2-4040-a4a8-7488447de3b6-00000.parquet | Bin 0 -> 1162 bytes
 ...da250-ed1c-4868-bbf1-f2aad65fa80c-00000.parquet | Bin 0 -> 1162 bytes
 ...77823-ded1-4a12-9e03-4027cd43966a-00000.parquet | Bin 0 -> 1169 bytes
 ...8d7a4-245f-44d5-8a59-ed511854c8f8-00000.parquet | Bin 0 -> 1169 bytes
 ...d5490-91f7-47bd-a3b6-e86caa7fe47d-00000.parquet | Bin 0 -> 1169 bytes
 ...f5fcf-4346-421f-b2ef-1f9d55fb4c84-00000.parquet | Bin 0 -> 1169 bytes
 ...c64ed-7a99-4f43-ada7-225c92f6a993-00000.parquet | Bin 0 -> 1169 bytes
 ...2c862-3d63-42cb-8041-0a0b14b8ca13-00000.parquet | Bin 0 -> 1169 bytes
 ...88e68-c862-4248-b3e5-84228a3ec39d-00000.parquet | Bin 0 -> 1190 bytes
 ...31dc0-b7eb-424d-9edb-dd2cedc59784-00000.parquet | Bin 0 -> 1190 bytes
 ...1e5f3-cfa7-4190-bb30-0db1d53202fd-00000.parquet | Bin 0 -> 1190 bytes
 ...b52b1-dc5b-4417-81b7-8e9fd992280b-00000.parquet | Bin 0 -> 1190 bytes
 ...6ba9a-9387-4c38-bab8-a0598c400fde-00000.parquet | Bin 0 -> 1190 bytes
 ...283a3-b39f-4273-984b-cf7faf39dd9d-00000.parquet | Bin 0 -> 1190 bytes
 .../2c2fa00b-eb20-460a-835b-d69b32560e21-m0.avro   | Bin 0 -> 5599 bytes
 ...465-1-2c2fa00b-eb20-460a-835b-d69b32560e21.avro | Bin 0 -> 2588 bytes
 .../hadoop_catalog_test/metadata/v1.metadata.json  |  62 +++++++++
 .../hadoop_catalog_test/metadata/v2.metadata.json  |  81 +++++++++++
 .../hadoop_catalog_test/metadata/version-hint.text |   1 +
 .../functional/functional_schema_template.sql      |  18 ++-
 .../queries/QueryTest/iceberg-create.test          | 155 ++++++++++++++++++++-
 .../queries/QueryTest/iceberg-negative.test        |  45 +++++-
 .../queries/QueryTest/iceberg-query.test           | 123 ++++++++++++++++
 .../queries/QueryTest/show-create-table.test       |  54 ++++++-
 40 files changed, 858 insertions(+), 65 deletions(-)

diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index d0a1a2d..f020d96 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -93,6 +93,12 @@ enum TIcebergFileFormat {
   ORC = 1
 }
 
+// Iceberg table catalog type identified by table property 'iceberg.catalog'
+enum TIcebergCatalog {
+  HADOOP_TABLES = 0
+  HADOOP_CATALOG = 1
+}
+
 enum TColumnEncoding {
   AUTO = 0
   PLAIN = 1
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index 96ac124..518c4c1 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -36,11 +36,13 @@ import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TCreateTableParams;
 import org.apache.impala.thrift.THdfsFileFormat;
+import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.util.AvroSchemaConverter;
 import org.apache.impala.util.AvroSchemaParser;
 import org.apache.impala.util.AvroSchemaUtils;
+import org.apache.impala.util.IcebergUtil;
 import org.apache.impala.util.KuduUtil;
 import org.apache.impala.util.MetaStoreUtil;
 
@@ -572,16 +574,16 @@ public class CreateTableStmt extends StatementBase {
           IcebergTable.TBL_PROP_EXTERNAL_TABLE_PURGE));
     }
 
-    if ((!isExternal() || Boolean.parseBoolean(getTblProperties().get(
-        Table.TBL_PROP_EXTERNAL_TABLE_PURGE))) && getColumnDefs().isEmpty()) {
-      // External iceberg table can have empty column, but managed iceberg table
-      // requires at least one column.
-      throw new AnalysisException("Table requires at least 1 column for " +
-          "managed iceberg table.");
-    }
+    // Check for managed table
+    if (!isExternal() || Boolean.parseBoolean(getTblProperties().get(
+        Table.TBL_PROP_EXTERNAL_TABLE_PURGE))) {
+      if (getColumnDefs().isEmpty()) {
+        // External iceberg table can have empty column, but managed iceberg table
+        // requires at least one column.
+        throw new AnalysisException("Table requires at least 1 column for " +
+            "managed iceberg table.");
+      }
 
-    if ((!isExternal() || Boolean.parseBoolean(getTblProperties().get(
-        Table.TBL_PROP_EXTERNAL_TABLE_PURGE)))) {
       // Check partition columns for managed iceberg table
       checkPartitionColumns();
     }
@@ -598,6 +600,28 @@ public class CreateTableStmt extends StatementBase {
     if (fileformat == null || fileformat.isEmpty()) {
       putGeneratedKuduProperty(IcebergTable.ICEBERG_FILE_FORMAT, "parquet");
     }
+
+    String catalog = getTblProperties().get(IcebergTable.ICEBERG_CATALOG);
+    if (catalog == null || catalog.isEmpty()) {
+      putGeneratedKuduProperty(IcebergTable.ICEBERG_CATALOG, "hadoop.catalog");
+    }
+
+    // Some constraints for Iceberg table with 'hadoop.catalog'
+    if (catalog == null || catalog.isEmpty() ||
+        IcebergUtil.getIcebergCatalog(catalog) == TIcebergCatalog.HADOOP_CATALOG) {
+      // Table location cannot be set in SQL when using 'hadoop.catalog'
+      if (getLocation() != null) {
+        throw new AnalysisException(String.format("Location cannot be set for Iceberg " +
+            "table with 'hadoop.catalog'."));
+      }
+
+      String catalogLoc = getTblProperties().get(IcebergTable.ICEBERG_CATALOG_LOCATION);
+      if (catalogLoc == null || catalogLoc.isEmpty()) {
+        throw new AnalysisException(String.format("Table property '%s' is necessary " +
+            "for Iceberg table with 'hadoop.catalog'.",
+            IcebergTable.ICEBERG_CATALOG_LOCATION));
+      }
+    }
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index b4c5310..7687208 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -49,8 +49,10 @@ import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.RowFormat;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.common.Pair;
+import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.util.AcidUtils;
+import org.apache.impala.util.IcebergUtil;
 import org.apache.impala.util.KuduUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -514,7 +516,14 @@ public class ToSqlUtils {
             "WITH SERDEPROPERTIES " + propertyMapToSql(serdeParameters) + "\n");
       }
     }
-    if (location != null) {
+
+    // Iceberg tables with 'hadoop.catalog' do not display the table LOCATION in
+    // 'show create table'; users can use 'describe formatted/extended' to get it
+    TIcebergCatalog icebergCatalog =
+        IcebergUtil.getIcebergCatalog(tblProperties.get(IcebergTable.ICEBERG_CATALOG));
+    boolean isHadoopCatalog = fileFormat == HdfsFileFormat.ICEBERG &&
+        icebergCatalog == TIcebergCatalog.HADOOP_CATALOG;
+    if (location != null && !isHadoopCatalog) {
       sb.append("LOCATION '" + location.toString() + "'\n");
     }
     if (tblProperties != null && !tblProperties.isEmpty()) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index c216a72..5230eb5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -46,6 +46,7 @@ import org.apache.impala.compat.HdfsShim;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.THdfsTable;
 import org.apache.impala.thrift.THdfsPartition;
+import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.thrift.TIcebergFileFormat;
 import org.apache.impala.thrift.TIcebergTable;
 import org.apache.impala.thrift.TNetworkAddress;
@@ -71,12 +72,26 @@ public interface FeIcebergTable extends FeFsTable {
   FeFsTable getFeFsTable();
 
   /**
+   * Return iceberg catalog type from table properties
+   */
+  TIcebergCatalog getIcebergCatalog();
+
+  /**
+   * Return the Iceberg catalog location; we use this location to load metadata from
+   * Iceberg. When using 'hadoop.tables', this value equals the table location.
+   * When using 'hadoop.catalog', this value equals 'iceberg.catalog_location'.
+   */
+  String getIcebergCatalogLocation();
+
+  /**
    * Return iceberg file format from table properties
    */
   TIcebergFileFormat getIcebergFileFormat();
 
   /**
-   * Return the name of iceberg table name, usually a hdfs location path
+   * Return the table location of the Iceberg table.
+   * When using 'hadoop.tables', this value is a normal table location.
+   * When using 'hadoop.catalog', this value is 'iceberg.catalog_location' + identifier.
    */
   String getIcebergTableLocation();
 
@@ -215,8 +230,7 @@ public interface FeIcebergTable extends FeFsTable {
       resultSchema.addToColumns(new TColumn("Field Partition Transform",
           Type.STRING.toThrift()));
 
-      TableMetadata metadata = IcebergUtil.
-          getIcebergTableMetadata(table.getIcebergTableLocation());
+      TableMetadata metadata = IcebergUtil.getIcebergTableMetadata(table);
       if (!metadata.specs().isEmpty()) {
         // Just show the latest PartitionSpec from iceberg table metadata
         PartitionSpec latestSpec = metadata.specs().get(metadata.specs().size() - 1);
@@ -234,6 +248,29 @@ public interface FeIcebergTable extends FeFsTable {
     }
 
     /**
+     * Get Iceberg table catalog location by table properties
+     */
+    public static String getIcebergCatalogLocation(FeIcebergTable table) {
+      if (table.getIcebergCatalog() == TIcebergCatalog.HADOOP_CATALOG) {
+        return getIcebergCatalogLocation(table.getMetaStoreTable());
+      } else {
+        return table.getIcebergTableLocation();
+      }
+    }
+
+    /**
+     * When using 'hadoop.catalog', we need to use this method to get qualified catalog
+     * location, for example: transform '/test-warehouse/hadoop_catalog_test' to
+     * 'hdfs://localhost:20500/test-warehouse/hadoop_catalog_test'
+     */
+    public static String getIcebergCatalogLocation(
+        org.apache.hadoop.hive.metastore.api.Table msTable) {
+      String location =
+          msTable.getParameters().get(IcebergTable.ICEBERG_CATALOG_LOCATION);
+      return FileSystemUtil.createFullyQualifiedPath(new Path(location)).toString();
+    }
+
+    /**
      * Get iceberg table file format from hms table properties
      */
     public static TIcebergFileFormat getIcebergFileFormat(
@@ -295,15 +332,16 @@ public interface FeIcebergTable extends FeFsTable {
      * Get all FileDescriptor from iceberg table without any predicates.
      */
     public static Map<String, HdfsPartition.FileDescriptor> loadAllPartition(
-        String location, FeIcebergTable table) throws IOException {
+        FeIcebergTable table) throws IOException {
       // Empty predicates
-      List<DataFile> dataFileList = IcebergUtil.getIcebergDataFiles(
-          location, new ArrayList<>());
+      List<DataFile> dataFileList = IcebergUtil.getIcebergDataFiles(table,
+          new ArrayList<>());
 
       Map<String, HdfsPartition.FileDescriptor> fileDescMap = new HashMap<>();
       for (DataFile file : dataFileList) {
         HdfsPartition.FileDescriptor fileDesc = getFileDescriptor(
-            new Path(file.path().toString()), new Path(location), table.getHostIndex());
+            new Path(file.path().toString()),
+            new Path(table.getIcebergTableLocation()), table.getHostIndex());
         fileDescMap.put(IcebergUtil.getDataFileMD5(file), fileDesc);
       }
       return fileDescMap;
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index 9ffc12f..afe1ba2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -36,6 +36,7 @@ import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.THdfsFileDesc;
 import org.apache.impala.thrift.THdfsTable;
+import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.thrift.TIcebergFileFormat;
 import org.apache.impala.thrift.TIcebergPartitionField;
 import org.apache.impala.thrift.TIcebergPartitionSpec;
@@ -67,6 +68,21 @@ public class IcebergTable extends Table implements FeIcebergTable {
   // Iceberg file format key in tblproperties
   public static final String ICEBERG_FILE_FORMAT = "iceberg_file_format";
 
+  // Iceberg catalog type key in tblproperties
+  public static final String ICEBERG_CATALOG = "iceberg.catalog";
+
+  // Iceberg table catalog location key in tblproperties when using HadoopCatalog.
+  // This property is required for both managed and external Iceberg tables with
+  // 'hadoop.catalog'
+  public static final String ICEBERG_CATALOG_LOCATION = "iceberg.catalog_location";
+
+  // Iceberg table namespace key in tblproperties when using HadoopCatalog.
+  // We use database.table instead if this property is not set in SQL
+  public static final String ICEBERG_TABLE_IDENTIFIER = "iceberg.table_identifier";
+
+  // Iceberg catalog type, dependent on table properties
+  private TIcebergCatalog icebergCatalog_;
+
   // Iceberg file format dependend on table properties
   private TIcebergFileFormat icebergFileFormat_;
 
@@ -89,6 +105,7 @@ public class IcebergTable extends Table implements FeIcebergTable {
       Db db, String name, String owner) {
     super(msTable, db, name, owner);
     icebergTableLocation_ = msTable.getSd().getLocation();
+    icebergCatalog_ = IcebergUtil.getIcebergCatalog(msTable);
     icebergFileFormat_ = Utils.getIcebergFileFormat(msTable);
     hdfsTable_ = new HdfsTable(msTable, db, name, owner);
   }
@@ -134,6 +151,16 @@ public class IcebergTable extends Table implements FeIcebergTable {
   }
 
   @Override
+  public TIcebergCatalog getIcebergCatalog() {
+    return icebergCatalog_;
+  }
+
+  @Override
+  public String getIcebergCatalogLocation() {
+    return Utils.getIcebergCatalogLocation(this);
+  }
+
+  @Override
   public TIcebergFileFormat getIcebergFileFormat() {
     return icebergFileFormat_;
   }
@@ -192,8 +219,7 @@ public class IcebergTable extends Table implements FeIcebergTable {
         // Loading hdfs table after loaded schema from Iceberg,
         // in case we create external Iceberg table skipping column info in sql.
         hdfsTable_.load(false, msClient, msTable_, true, true, false, null, reason);
-        pathMD5ToFileDescMap_ = Utils.loadAllPartition(msTable_.getSd().getLocation(),
-            this);
+        pathMD5ToFileDescMap_ = Utils.loadAllPartition(this);
         loadAllColumnStats(msClient);
       } catch (Exception e) {
         throw new TableLoadingException("Error loading metadata for Iceberg table " +
@@ -225,7 +251,7 @@ public class IcebergTable extends Table implements FeIcebergTable {
    * Load schema and partitioning schemes directly from Iceberg.
    */
   public void loadSchemaFromIceberg() throws TableLoadingException {
-    TableMetadata metadata = IcebergUtil.getIcebergTableMetadata(icebergTableLocation_);
+    TableMetadata metadata = IcebergUtil.getIcebergTableMetadata(this);
     icebergSchema_ = metadata.schema();
     loadSchema();
     partitionSpecs_ = Utils.loadPartitionSpecByIceberg(metadata);
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
index 388e3a3..4df8d46 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
@@ -36,6 +36,7 @@ import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.THdfsTable;
+import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.thrift.TIcebergFileFormat;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableType;
@@ -60,8 +61,10 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
     Preconditions.checkNotNull(msTable);
     try {
       TableParams params = new TableParams(msTable);
+      String tableName = IcebergUtil.getIcebergTableIdentifier(msTable);
       TableMetadata metadata =
-          IcebergUtil.getIcebergTableMetadata(params.icebergTableLocation_);
+          IcebergUtil.getIcebergTableMetadata(params.icebergCatalog_, tableName,
+              params.icebergCatalogLocation_);
 
       List<IcebergPartitionSpec> partitionSpecs =
           Utils.loadPartitionSpecByIceberg(metadata);
@@ -84,13 +87,13 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
     partitionSpecs_ = partitionSpecs;
     localFsTable_ = LocalFsTable.load(db, msTable, ref);
     try {
-      pathMD5ToFileDescMap_ = Utils.loadAllPartition(tableParams_.icebergTableLocation_,
-          this);
+      pathMD5ToFileDescMap_ = Utils.loadAllPartition(this);
     } catch (IOException e) {
       throw new TableLoadingException(e.getMessage());
     }
 
     icebergFileFormat_ = Utils.getIcebergFileFormat(msTable);
+
   }
 
   @Override
@@ -104,6 +107,16 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
   }
 
   @Override
+  public TIcebergCatalog getIcebergCatalog() {
+    return tableParams_.icebergCatalog_;
+  }
+
+  @Override
+  public String getIcebergCatalogLocation() {
+    return tableParams_.icebergCatalogLocation_;
+  }
+
+  @Override
   public FeFsTable getFeFsTable() {
     return localFsTable_;
   }
@@ -160,6 +173,8 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
   @Immutable
   private static class TableParams {
     private final String icebergTableLocation_;
+    private final TIcebergCatalog icebergCatalog_;
+    private final String icebergCatalogLocation_;
 
     TableParams(Table msTable) {
       String fullTableName = msTable.getDbName() + "." + msTable.getTableName();
@@ -169,6 +184,13 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
         throw new LocalCatalogException("Cannot find iceberg table location for table "
             + fullTableName);
       }
+      icebergCatalog_ = IcebergUtil.getIcebergCatalog(msTable);
+
+      if (icebergCatalog_ == TIcebergCatalog.HADOOP_CATALOG) {
+        icebergCatalogLocation_ = Utils.getIcebergCatalogLocation(msTable);
+      } else {
+        icebergCatalogLocation_ = icebergTableLocation_;
+      }
     }
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
index 5da888f..0be0ebb 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
@@ -89,8 +89,8 @@ public class IcebergScanNode extends HdfsScanNode {
    */
   public List<FileDescriptor> getFileDescriptorByIcebergPredicates()
       throws ImpalaRuntimeException{
-    List<DataFile> dataFileList = IcebergUtil.getIcebergDataFiles(
-        icebergTable_.getIcebergTableLocation(), icebergPredicates_);
+    List<DataFile> dataFileList = IcebergUtil.getIcebergDataFiles(icebergTable_,
+        icebergPredicates_);
 
     List<FileDescriptor> fileDescList = new ArrayList<>();
     for (DataFile dataFile : dataFileList) {
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 88801a5..0df927e 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -173,6 +173,8 @@ import org.apache.impala.thrift.TGrantRevokePrivParams;
 import org.apache.impala.thrift.TGrantRevokeRoleParams;
 import org.apache.impala.thrift.THdfsCachingOp;
 import org.apache.impala.thrift.THdfsFileFormat;
+import org.apache.impala.thrift.TCopyTestCaseReq;
+import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.thrift.TPartitionDef;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TPartitionStats;
@@ -197,6 +199,7 @@ import org.apache.impala.util.AcidUtils.TblTransaction;
 import org.apache.impala.util.CompressionUtil;
 import org.apache.impala.util.FunctionUtils;
 import org.apache.impala.util.HdfsCachingUtil;
+import org.apache.impala.util.IcebergUtil;
 import org.apache.impala.util.KuduUtil;
 import org.apache.impala.util.MetaStoreUtil;
 import org.apache.impala.util.MetaStoreUtil.TableInsertEventInfo;
@@ -2599,8 +2602,7 @@ public class CatalogOpExecutor {
    * Creates a new Iceberg table.
    */
   private boolean createIcebergTable(org.apache.hadoop.hive.metastore.api.Table newTable,
-                                  TCreateTableParams params, TDdlExecResponse response)
-      throws ImpalaException {
+      TCreateTableParams params, TDdlExecResponse response) throws ImpalaException {
     Preconditions.checkState(IcebergTable.isIcebergTable(newTable));
 
     try {
@@ -2612,20 +2614,41 @@ public class CatalogOpExecutor {
               msClient.getHiveClient().tableExists(newTable.getDbName(),
                   newTable.getTableName());
           if (!tableInMetastore) {
+            TIcebergCatalog catalog = IcebergUtil.getIcebergCatalog(newTable);
             String location = newTable.getSd().getLocation();
             //Create table in iceberg if necessary
             if (IcebergTable.needsCreateInIceberg(newTable)) {
+              // Set location here if not specified in SQL
               if (location == null) {
-                //Set location here if not been specified in sql
-                location = MetastoreShim.getPathForNewTable(
-                    msClient.getHiveClient().getDatabase(newTable.getDbName()), newTable);
-                newTable.getSd().setLocation(location);
+                if (catalog == TIcebergCatalog.HADOOP_CATALOG) {
+                  // Using catalog location to create table
+                  // We cannot set location for 'hadoop.catalog' table in SQL
+                  location = IcebergUtil.getIcebergCatalogLocation(newTable);
+                } else {
+                  // Using normal location as 'hadoop.tables' table location and create
+                  // table
+                  location = MetastoreShim.getPathForNewTable(
+                      msClient.getHiveClient().getDatabase(newTable.getDbName()),
+                      newTable);
+                }
               }
-              IcebergCatalogOpExecutor.createTable(location, params);
+              String tableLoc = IcebergCatalogOpExecutor.createTable(catalog,
+                  IcebergUtil.getIcebergTableIdentifier(newTable), location, params);
+              newTable.getSd().setLocation(tableLoc);
             } else {
               if (location == null) {
-                addSummary(response, "Location is necessary for external iceberg table.");
-                return false;
+                if (catalog == TIcebergCatalog.HADOOP_CATALOG) {
+                  // When creating external Iceberg table with 'hadoop.catalog'
+                  // We use catalog location and table identifier as location
+                  String identifier = IcebergUtil.getIcebergTableIdentifier(newTable);
+                  newTable.getSd().setLocation(String.format("%s/%s/%s",
+                      IcebergUtil.getIcebergCatalogLocation(newTable),
+                      identifier.split("\\.")[0], identifier.split("\\.")[1]));
+                } else {
+                  addSummary(response,
+                      "Location is necessary for external iceberg table.");
+                  return false;
+                }
               }
             }
 
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 4e8f2a8..6cac985 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -20,7 +20,11 @@ package org.apache.impala.service;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.types.Types;
 import org.apache.impala.catalog.ArrayType;
@@ -32,6 +36,7 @@ import org.apache.impala.catalog.Type;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TCreateTableParams;
+import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.util.IcebergUtil;
 import org.apache.log4j.Logger;
 
@@ -47,15 +52,43 @@ public class IcebergCatalogOpExecutor {
   // Keep id increase for each thread
   private static ThreadLocal<Integer> iThreadLocal = new ThreadLocal<>();
 
-  public static void createTable(String metadataLoc, TCreateTableParams params)
-      throws ImpalaRuntimeException {
+  /**
+   * Create an Iceberg table through the Iceberg API.
+   * The return value is the table location reported by Iceberg.
+   */
+  public static String createTable(TIcebergCatalog catalog, String identifier,
+      String location, TCreateTableParams params) throws ImpalaRuntimeException {
     // Each table id increase from zero
     iThreadLocal.set(0);
-    HadoopTables tables = IcebergUtil.getHadoopTables();
     Schema schema = createIcebergSchema(params);
-    tables.create(schema, IcebergUtil.createIcebergPartition(schema, params),
-        metadataLoc);
+    PartitionSpec spec = IcebergUtil.createIcebergPartition(schema, params);
+    String tableLoc = null;
+    if (catalog == TIcebergCatalog.HADOOP_CATALOG) {
+      tableLoc = createTableByHadoopCatalog(location, schema, spec, identifier);
+    } else {
+      Preconditions.checkArgument(catalog == TIcebergCatalog.HADOOP_TABLES);
+      tableLoc = createTableByHadoopTables(location, schema, spec);
+    }
     LOG.info("Create iceberg table successful.");
+    return tableLoc;
+  }
+
+  // Create Iceberg table by HadoopTables
+  private static String createTableByHadoopTables(String metadataLoc, Schema schema,
+      PartitionSpec spec) {
+    HadoopTables tables = IcebergUtil.getHadoopTables();
+    BaseTable table = (BaseTable) tables.create(schema, spec, null, metadataLoc);
+    return table.location();
+  }
+
+  // Create Iceberg table by HadoopCatalog
+  private static String createTableByHadoopCatalog(String catalogLoc, Schema schema,
+      PartitionSpec spec, String identifier) {
+    // Each table id increase from zero
+    HadoopCatalog catalog = IcebergUtil.getHadoopCatalog(catalogLoc);
+    BaseTable table = (BaseTable) catalog.createTable(TableIdentifier.parse(identifier),
+        schema, spec, null);
+    return table.location();
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index 5e79b76..6b206c7 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -23,11 +23,12 @@ import java.util.List;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.expressions.UnboundPredicate;
-import org.apache.iceberg.hadoop.HadoopTableOperations;
+import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
@@ -35,7 +36,10 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.types.Types;
 import org.apache.impala.catalog.ArrayType;
+import org.apache.impala.catalog.Catalog;
+import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.MapType;
 import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.StructField;
@@ -46,6 +50,7 @@ import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TCreateTableParams;
 import org.apache.impala.thrift.THdfsFileFormat;
+import org.apache.impala.thrift.TIcebergCatalog;
 import org.apache.impala.thrift.TIcebergFileFormat;
 import org.apache.impala.thrift.TIcebergPartitionField;
 import org.apache.impala.thrift.TIcebergPartitionTransform;
@@ -60,20 +65,80 @@ public class IcebergUtil {
   }
 
   /**
-   * Get BaseTable by iceberg file system table location
+   * Get a HadoopCatalog using the Impala cluster configuration
    */
-  public static BaseTable getBaseTable(String tableLocation) {
-    HadoopTables tables = IcebergUtil.getHadoopTables();
-    return (BaseTable) tables.load(tableLocation);
+  public static HadoopCatalog getHadoopCatalog(String location) {
+    return new HadoopCatalog(FileSystemUtil.getConfiguration(), location);
   }
 
   /**
-   * Get TableMetadata by iceberg file system table location
+   * Get BaseTable from FeIcebergTable
    */
-  public static TableMetadata getIcebergTableMetadata(String tableLocation) {
-    HadoopTableOperations operations = (HadoopTableOperations)
-        getBaseTable(tableLocation).operations();
-    return operations.current();
+  public static BaseTable getBaseTable(FeIcebergTable table) {
+    return getBaseTable(table.getIcebergCatalog(), getIcebergTableIdentifier(table),
+        table.getIcebergCatalogLocation());
+  }
+
+  /**
+   * Get BaseTable from each parameters
+   */
+  public static BaseTable getBaseTable(TIcebergCatalog catalog, String tableName,
+      String location) {
+    if (catalog == TIcebergCatalog.HADOOP_CATALOG) {
+      return getBaseTableByHadoopCatalog(tableName, location);
+    } else {
+      // We use HadoopTables as default Iceberg catalog type
+      HadoopTables hadoopTables = IcebergUtil.getHadoopTables();
+      return (BaseTable) hadoopTables.load(location);
+    }
+  }
+
+  /**
+   * Use location, namespace(database) and name(table) to get BaseTable by HadoopCatalog
+   */
+  private static BaseTable getBaseTableByHadoopCatalog(String tableName,
+      String catalogLoc) {
+    HadoopCatalog hadoopCatalog = IcebergUtil.getHadoopCatalog(catalogLoc);
+    return (BaseTable) hadoopCatalog.loadTable(TableIdentifier.parse(tableName));
+  }
+
+  /**
+   * Get TableMetadata by FeIcebergTable
+   */
+  public static TableMetadata getIcebergTableMetadata(FeIcebergTable table) {
+    return getIcebergTableMetadata(table.getIcebergCatalog(),
+        getIcebergTableIdentifier(table), table.getIcebergCatalogLocation());
+  }
+
+  /**
+   * Get TableMetadata from the given catalog info.
+   * tableName is the full table name, usually database.table
+   */
+  public static TableMetadata getIcebergTableMetadata(TIcebergCatalog catalog,
+      String tableName, String location) {
+    BaseTable baseTable = getBaseTable(catalog, tableName, location);
+    return baseTable.operations().current();
+  }
+
+  /**
+   * Get Iceberg table identifier by table property
+   */
+  public static String getIcebergTableIdentifier(FeIcebergTable table) {
+    return getIcebergTableIdentifier(table.getMetaStoreTable());
+  }
+
+  public static String getIcebergTableIdentifier(
+      org.apache.hadoop.hive.metastore.api.Table msTable) {
+    String name = msTable.getParameters().get(IcebergTable.ICEBERG_TABLE_IDENTIFIER);
+    if (name == null || name.isEmpty()) {
+      return msTable.getDbName() + "." + msTable.getTableName();
+    }
+
+    // If the database is not specified in the property, use the default database
+    if (!name.contains(".")) {
+      return Catalog.DEFAULT_DB + "." + name;
+    }
+    return name;
   }
 
   /**
@@ -83,7 +148,7 @@ public class IcebergUtil {
   public static PartitionSpec createIcebergPartition(Schema schema,
       TCreateTableParams params) throws ImpalaRuntimeException {
     if (params.getPartition_spec() == null) {
-      return null;
+      return PartitionSpec.unpartitioned();
     }
     PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
     List<TIcebergPartitionField> partitionFields =
@@ -108,6 +173,38 @@ public class IcebergUtil {
   }
 
   /**
+   * Get the Iceberg catalog type from the HMS table properties.
+   * HadoopCatalog is used as the default.
+   */
+  public static TIcebergCatalog getIcebergCatalog(
+      org.apache.hadoop.hive.metastore.api.Table msTable) {
+    TIcebergCatalog catalog = getIcebergCatalog(
+        msTable.getParameters().get(IcebergTable.ICEBERG_CATALOG));
+    return catalog == null ? TIcebergCatalog.HADOOP_CATALOG : catalog;
+  }
+
+  /**
+   * Get TIcebergCatalog from a string, usually from table properties
+   */
+  public static TIcebergCatalog getIcebergCatalog(String catalog) {
+    if ("hadoop.tables".equalsIgnoreCase(catalog)) {
+      return TIcebergCatalog.HADOOP_TABLES;
+    } else if ("hadoop.catalog".equalsIgnoreCase(catalog)) {
+      return TIcebergCatalog.HADOOP_CATALOG;
+    }
+    return null;
+  }
+
+  /**
+   * Get the Iceberg catalog location from the 'iceberg.catalog_location' table property;
+   * only used with 'hadoop.catalog'
+   */
+  public static String getIcebergCatalogLocation(
+      org.apache.hadoop.hive.metastore.api.Table msTable) {
+    return msTable.getParameters().get(IcebergTable.ICEBERG_CATALOG_LOCATION);
+  }
+
+  /**
    * Get TIcebergFileFormat from a string, usually from table properties
    */
   public static TIcebergFileFormat getIcebergFileFormat(String format){
@@ -229,10 +326,10 @@ public class IcebergUtil {
   /**
    * Get iceberg data file by file system table location and iceberg predicates
    */
-  public static List<DataFile> getIcebergDataFiles(String location,
+  public static List<DataFile> getIcebergDataFiles(FeIcebergTable table,
       List<UnboundPredicate> predicates) {
-    BaseTable table = IcebergUtil.getBaseTable(location);
-    TableScan scan = table.newScan();
+    BaseTable baseTable = IcebergUtil.getBaseTable(table);
+    TableScan scan = baseTable.newScan();
     for (UnboundPredicate predicate : predicates) {
       scan = scan.filter(predicate);
     }
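
The IcebergUtil changes above boil down to two ways of resolving an Iceberg table. The sketch below (plain Java, outside Impala, using only the Iceberg Hadoop classes already referenced in this patch) contrasts them: HadoopTables loads a table straight from its file system location, while HadoopCatalog loads it from a catalog location plus a TableIdentifier. The warehouse paths and table names are illustrative placeholders, not values required by this patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.BaseTable;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.hadoop.HadoopCatalog;
    import org.apache.iceberg.hadoop.HadoopTables;

    public class CatalogPathsSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // 'hadoop.tables': the table is addressed by its own location.
        Table byLocation = new HadoopTables(conf)
            .load("/test-warehouse/iceberg_test/iceberg_partitioned");

        // 'hadoop.catalog': the table is addressed by catalog location + identifier,
        // mirroring getBaseTableByHadoopCatalog() in the hunk above.
        HadoopCatalog catalog = new HadoopCatalog(conf,
            "/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test");
        Table byIdentifier = catalog.loadTable(
            TableIdentifier.parse("functional_parquet.hadoop_catalog_test"));

        // Either way we end up at a BaseTable whose operations() expose the
        // current TableMetadata, which is what getIcebergTableMetadata() returns.
        System.out.println(((BaseTable) byIdentifier).operations().current().location());
        System.out.println(byLocation.location());
      }
    }
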
diff --git a/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00001-1-bc402da0-b562-4310-9001-06f9b6b0f9ae-00000.parquet b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00001-1-bc402da0-b562-4310-9001-06f9b6b0f9ae-00000.parquet
new file mode 100644
index 0000000..dc6e432
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00001-1-bc402da0-b562-4310-9001-06f9b6b0f9ae-00000.parquet differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00006-6-d253aefa-65fc-4698-8f26-b155fc965cf6-00000.parquet b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00006-6-d253aefa-65fc-4698-8f26-b155fc965cf6-00000.parquet
new file mode 100644
index 0000000..b7a7af6
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00006-6-d253aefa-65fc-4698-8f26-b155fc965cf6-00000.parquet differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00009-9-5d04b016-05e1-43fc-b4a0-0e0df52a5035-00000.parquet b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00009-9-5d04b016-05e1-43fc-b4a0-0e0df52a5035-00000.parquet
new file mode 100644
index 0000000..624ac44
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00009-9-5d04b016-05e1-43fc-b4a0-0e0df52a5035-00000.parquet differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00017-17-20b92523-c3b9-401d-b429-363c245dbe9c-00000.parquet b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00017-17-20b92523-c3b9-401d-b429-363c245dbe9c-00000.parquet
new file mode 100644
index 0000000..4ca438d
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00017-17-20b92523-c3b9-401d-b429-363c245dbe9c-00000.parquet differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00023-23-c86370cf-10a1-4e49-86dc-b094fe739aa6-00000.parquet b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00023-23-c86370cf-10a1-4e49-86dc-b094fe739aa6-00000.parquet
new file mode 100644
index 0000000..d0df694
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00023-23-c86370cf-10a1-4e49-86dc-b094fe739aa6-00000.parquet differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00027-27-f32f86fa-286f-4cd3-8337-98685c48176d-00000.parquet b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00027-27-f32f86fa-286f-4cd3-8337-98685c48176d-00000.parquet
new file mode 100644
index 0000000..0f42b9f
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00027-27-f32f86fa-286f-4cd3-8337-98685c48176d-00000.parquet differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00030-30-b18d2bbc-46a2-4040-a4a8-7488447de3b6-00000.parquet b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00030-30-b18d2bbc-46a2-4040-a4a8-7488447de3b6-00000.parquet
new file mode 100644
index 0000000..dfc2271
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00030-30-b18d2bbc-46a2-4040-a4a8-7488447de3b6-00000.parquet differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00031-31-c9bda250-ed1c-4868-bbf1-f2aad65fa80c-00000.parquet b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00031-31-c9bda250-ed1c-4868-bbf1-f2aad65fa80c-00000.parquet
new file mode 100644
index 0000000..9d64ad4
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00031-31-c9bda250-ed1c-4868-bbf1-f2aad65fa80c-00000.parquet differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00004-4-0ed77823-ded1-4a12-9e03-4027cd43966a-00000.parquet b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00004-4-0ed77823-ded1-4a12-9e03-4027cd43966a-00000.parquet
new file mode 100644
index 0000000..c645129
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00004-4-0ed77823-ded1-4a12-9e03-4027cd43966a-00000.parquet differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00014-14-f698d7a4-245f-44d5-8a59-ed511854c8f8-00000.parquet b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00014-14-f698d7a4-245f-44d5-8a59-ed511854c8f8-00000.parquet
new file mode 100644
index 0000000..38ed618
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00014-14-f698d7a4-245f-44d5-8a59-ed511854c8f8-00000.parquet differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00015-15-7c1d5490-91f7-47bd-a3b6-e86caa7fe47d-00000.parquet b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00015-15-7c1d5490-91f7-47bd-a3b6-e86caa7fe47d-00000.parquet
new file mode 100644
index 0000000..df0b0aa
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00015-15-7c1d5490-91f7-47bd-a3b6-e86caa7fe47d-00000.parquet differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00019-19-d2ef5fcf-4346-421f-b2ef-1f9d55fb4c84-00000.parquet b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00019-19-d2ef5fcf-4346-421f-b2ef-1f9d55fb4c84-00000.parquet
new file mode 100644
index 0000000..74fce5a
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00019-19-d2ef5fcf-4346-421f-b2ef-1f9d55fb4c84-00000.parquet differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00020-20-a70c64ed-7a99-4f43-ada7-225c92f6a993-00000.parquet b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00020-20-a70c64ed-7a99-4f43-ada7-225c92f6a993-00000.parquet
new file mode 100644
index 0000000..7f5d5fa
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00020-20-a70c64ed-7a99-4f43-ada7-225c92f6a993-00000.parquet differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00028-28-bb02c862-3d63-42cb-8041-0a0b14b8ca13-00000.parquet b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00028-28-bb02c862-3d63-42cb-8041-0a0b14b8ca13-00000.parquet
new file mode 100644
index 0000000..14abc4b
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00028-28-bb02c862-3d63-42cb-8041-0a0b14b8ca13-00000.parquet differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00003-3-d5288e68-c862-4248-b3e5-84228a3ec39d-00000.parquet b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00003-3-d5288e68-c862-4248-b3e5-84228a3ec39d-00000.parquet
new file mode 100644
index 0000000..6330ba9
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00003-3-d5288e68-c862-4248-b3e5-84228a3ec39d-00000.parquet differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00007-7-92031dc0-b7eb-424d-9edb-dd2cedc59784-00000.parquet b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00007-7-92031dc0-b7eb-424d-9edb-dd2cedc59784-00000.parquet
new file mode 100644
index 0000000..455a63b
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00007-7-92031dc0-b7eb-424d-9edb-dd2cedc59784-00000.parquet differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00011-11-9361e5f3-cfa7-4190-bb30-0db1d53202fd-00000.parquet b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00011-11-9361e5f3-cfa7-4190-bb30-0db1d53202fd-00000.parquet
new file mode 100644
index 0000000..fe53192
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00011-11-9361e5f3-cfa7-4190-bb30-0db1d53202fd-00000.parquet differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00012-12-e82b52b1-dc5b-4417-81b7-8e9fd992280b-00000.parquet b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00012-12-e82b52b1-dc5b-4417-81b7-8e9fd992280b-00000.parquet
new file mode 100644
index 0000000..6cf2286
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00012-12-e82b52b1-dc5b-4417-81b7-8e9fd992280b-00000.parquet differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00022-22-c646ba9a-9387-4c38-bab8-a0598c400fde-00000.parquet b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00022-22-c646ba9a-9387-4c38-bab8-a0598c400fde-00000.parquet
new file mode 100644
index 0000000..fb9987a
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00022-22-c646ba9a-9387-4c38-bab8-a0598c400fde-00000.parquet differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00025-25-7f8283a3-b39f-4273-984b-cf7faf39dd9d-00000.parquet b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00025-25-7f8283a3-b39f-4273-984b-cf7faf39dd9d-00000.parquet
new file mode 100644
index 0000000..0ad15bc
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00025-25-7f8283a3-b39f-4273-984b-cf7faf39dd9d-00000.parquet differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/metadata/2c2fa00b-eb20-460a-835b-d69b32560e21-m0.avro b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/metadata/2c2fa00b-eb20-460a-835b-d69b32560e21-m0.avro
new file mode 100644
index 0000000..cdd8d4a
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/metadata/2c2fa00b-eb20-460a-835b-d69b32560e21-m0.avro differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/metadata/snap-7412008513947276465-1-2c2fa00b-eb20-460a-835b-d69b32560e21.avro b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/metadata/snap-7412008513947276465-1-2c2fa00b-eb20-460a-835b-d69b32560e21.avro
new file mode 100644
index 0000000..5f6724c
Binary files /dev/null and b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/metadata/snap-7412008513947276465-1-2c2fa00b-eb20-460a-835b-d69b32560e21.avro differ
diff --git a/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/metadata/v1.metadata.json b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/metadata/v1.metadata.json
new file mode 100644
index 0000000..ecd4a1b
--- /dev/null
+++ b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/metadata/v1.metadata.json
@@ -0,0 +1,62 @@
+{
+  "format-version" : 1,
+  "table-uuid" : "93723c31-c8f6-4af9-9a4e-fab24d6a141e",
+  "location" : "/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test",
+  "last-updated-ms" : 1600170874988,
+  "last-column-id" : 4,
+  "schema" : {
+    "type" : "struct",
+    "fields" : [ {
+      "id" : 1,
+      "name" : "id",
+      "required" : false,
+      "type" : "int"
+    }, {
+      "id" : 2,
+      "name" : "user",
+      "required" : true,
+      "type" : "string"
+    }, {
+      "id" : 3,
+      "name" : "action",
+      "required" : true,
+      "type" : "string"
+    }, {
+      "id" : 4,
+      "name" : "event_time",
+      "required" : true,
+      "type" : "timestamptz"
+    } ]
+  },
+  "partition-spec" : [ {
+    "name" : "event_time_hour",
+    "transform" : "hour",
+    "source-id" : 4,
+    "field-id" : 1000
+  }, {
+    "name" : "action",
+    "transform" : "identity",
+    "source-id" : 3,
+    "field-id" : 1001
+  } ],
+  "default-spec-id" : 0,
+  "partition-specs" : [ {
+    "spec-id" : 0,
+    "fields" : [ {
+      "name" : "event_time_hour",
+      "transform" : "hour",
+      "source-id" : 4,
+      "field-id" : 1000
+    }, {
+      "name" : "action",
+      "transform" : "identity",
+      "source-id" : 3,
+      "field-id" : 1001
+    } ]
+  } ],
+  "properties" : { },
+  "current-snapshot-id" : -1,
+  "snapshots" : [ ],
+  "snapshot-log" : [ ],
+  "metadata-log" : [ ]
+}
\ No newline at end of file
diff --git a/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/metadata/v2.metadata.json b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/metadata/v2.metadata.json
new file mode 100644
index 0000000..26b3969
--- /dev/null
+++ b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/metadata/v2.metadata.json
@@ -0,0 +1,81 @@
+{
+  "format-version" : 1,
+  "table-uuid" : "93723c31-c8f6-4af9-9a4e-fab24d6a141e",
+  "location" : "/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test",
+  "last-updated-ms" : 1600170879287,
+  "last-column-id" : 4,
+  "schema" : {
+    "type" : "struct",
+    "fields" : [ {
+      "id" : 1,
+      "name" : "id",
+      "required" : false,
+      "type" : "int"
+    }, {
+      "id" : 2,
+      "name" : "user",
+      "required" : true,
+      "type" : "string"
+    }, {
+      "id" : 3,
+      "name" : "action",
+      "required" : true,
+      "type" : "string"
+    }, {
+      "id" : 4,
+      "name" : "event_time",
+      "required" : true,
+      "type" : "timestamptz"
+    } ]
+  },
+  "partition-spec" : [ {
+    "name" : "event_time_hour",
+    "transform" : "hour",
+    "source-id" : 4,
+    "field-id" : 1000
+  }, {
+    "name" : "action",
+    "transform" : "identity",
+    "source-id" : 3,
+    "field-id" : 1001
+  } ],
+  "default-spec-id" : 0,
+  "partition-specs" : [ {
+    "spec-id" : 0,
+    "fields" : [ {
+      "name" : "event_time_hour",
+      "transform" : "hour",
+      "source-id" : 4,
+      "field-id" : 1000
+    }, {
+      "name" : "action",
+      "transform" : "identity",
+      "source-id" : 3,
+      "field-id" : 1001
+    } ]
+  } ],
+  "properties" : { },
+  "current-snapshot-id" : 7412008513947276465,
+  "snapshots" : [ {
+    "snapshot-id" : 7412008513947276465,
+    "timestamp-ms" : 1600170879287,
+    "summary" : {
+      "operation" : "append",
+      "spark.app.id" : "local-1600170868953",
+      "added-data-files" : "20",
+      "added-records" : "20",
+      "changed-partition-count" : "3",
+      "total-records" : "20",
+      "total-data-files" : "20"
+    },
+    "manifest-list" : "/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/metadata/snap-7412008513947276465-1-2c2fa00b-eb20-460a-835b-d69b32560e21.avro"
+  } ],
+  "snapshot-log" : [ {
+    "timestamp-ms" : 1600170879287,
+    "snapshot-id" : 7412008513947276465
+  } ],
+  "metadata-log" : [ {
+    "timestamp-ms" : 1600170874988,
+    "metadata-file" : "/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/metadata/v1.metadata.json"
+  } ]
+}
\ No newline at end of file
diff --git a/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/metadata/version-hint.text b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/metadata/version-hint.text
new file mode 100644
index 0000000..d8263ee
--- /dev/null
+++ b/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/metadata/version-hint.text
@@ -0,0 +1 @@
+2
\ No newline at end of file
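
A note on the metadata files in this test table: for Hadoop-based Iceberg tables, metadata/version-hint.text stores only the latest metadata version number, so the "2" above tells readers (and HadoopTableOperations) that v2.metadata.json is the current metadata file. A minimal sketch of that lookup, assuming a local copy of the testdata directory:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class VersionHintSketch {
      public static void main(String[] args) throws IOException {
        // Hypothetical local copy of the table's metadata directory from testdata/.
        Path metadataDir = Paths.get("testdata/data/iceberg_test/hadoop_catalog/"
            + "hadoop_catalog_test/functional_parquet/hadoop_catalog_test/metadata");
        // version-hint.text holds just the latest metadata version number ("2" here).
        String hint = new String(
            Files.readAllBytes(metadataDir.resolve("version-hint.text")), StandardCharsets.UTF_8);
        int version = Integer.parseInt(hint.trim());
        // That number selects the current metadata file, v2.metadata.json for this table.
        Path current = metadataDir.resolve("v" + version + ".metadata.json");
        System.out.println("Current metadata file: " + current);
      }
    }
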
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 0432e6d..97b6d33 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -2941,7 +2941,7 @@ iceberg_partitioned
 CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name}
 STORED AS ICEBERG
 LOCATION '/test-warehouse/iceberg_test/iceberg_partitioned'
-TBLPROPERTIES('iceberg_file_format'='parquet');
+TBLPROPERTIES('iceberg_file_format'='parquet', 'iceberg.catalog'='hadoop.tables');
 ---- DEPENDENT_LOAD
 `hadoop fs -mkdir -p /test-warehouse/iceberg_test && \
 hadoop fs -put -f ${IMPALA_HOME}/testdata/data/iceberg_test/iceberg_partitioned /test-warehouse/iceberg_test/
@@ -2954,8 +2954,22 @@ iceberg_non_partitioned
 CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name}
 STORED AS ICEBERG
 LOCATION '/test-warehouse/iceberg_test/iceberg_non_partitioned'
-TBLPROPERTIES('iceberg_file_format'='parquet');
+TBLPROPERTIES('iceberg_file_format'='parquet', 'iceberg.catalog'='hadoop.tables');
 ---- DEPENDENT_LOAD
 `hadoop fs -mkdir -p /test-warehouse/iceberg_test && \
 hadoop fs -put -f ${IMPALA_HOME}/testdata/data/iceberg_test/iceberg_non_partitioned /test-warehouse/iceberg_test/
 ====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+hadoop_catalog_test_external
+---- CREATE
+CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name}
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg_file_format'='parquet', 'iceberg.catalog'='hadoop.catalog',
+'iceberg.catalog_location'='/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test',
+'iceberg.table_identifier'='functional_parquet.hadoop_catalog_test');
+---- DEPENDENT_LOAD
+`hadoop fs -mkdir -p /test-warehouse/iceberg_test/hadoop_catalog && \
+hadoop fs -put -f ${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/hadoop_catalog_test /test-warehouse/iceberg_test/hadoop_catalog/
+====
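
To make the hadoop_catalog_test_external definition above concrete, here is a small standalone sketch (plain Java, with a Map standing in for the HMS table parameters and made-up constants standing in for IcebergTable.ICEBERG_TABLE_IDENTIFIER and Catalog.DEFAULT_DB) of the identifier-resolution rule from the IcebergUtil hunk earlier in this patch: 'iceberg.table_identifier' wins when present, a bare table name gets the default database prepended, and otherwise the HMS database.table name is used.

    import java.util.HashMap;
    import java.util.Map;

    public class TableIdentifierSketch {
      // Stand-ins for IcebergTable.ICEBERG_TABLE_IDENTIFIER and Catalog.DEFAULT_DB.
      static final String ICEBERG_TABLE_IDENTIFIER = "iceberg.table_identifier";
      static final String DEFAULT_DB = "default";

      // Mirrors the resolution order in IcebergUtil.getIcebergTableIdentifier().
      static String resolveIdentifier(Map<String, String> props, String hmsDb, String hmsTbl) {
        String name = props.get(ICEBERG_TABLE_IDENTIFIER);
        if (name == null || name.isEmpty()) return hmsDb + "." + hmsTbl;
        if (!name.contains(".")) return DEFAULT_DB + "." + name;  // bare table name
        return name;
      }

      public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(ICEBERG_TABLE_IDENTIFIER, "functional_parquet.hadoop_catalog_test");
        // With the property set (as in the dataset above) the identifier comes from it.
        System.out.println(resolveIdentifier(props, "functional", "hadoop_catalog_test_external"));
        // Without it, the HMS database.table name is used instead.
        System.out.println(resolveIdentifier(new HashMap<>(), "functional", "hadoop_catalog_test_external"));
      }
    }
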
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-create.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-create.test
index f751335..10a6f6a 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-create.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-create.test
@@ -16,7 +16,8 @@ PARTITION BY SPEC
   event_time HOUR,
   register_time DAY
 )
-STORED AS ICEBERG;
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
 ---- RESULTS
 'Table has been created.'
 ====
@@ -52,7 +53,8 @@ DROP TABLE iceberg_test1;
 CREATE TABLE iceberg_test2(
   level STRING
 )
-STORED AS ICEBERG;
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
 DESCRIBE iceberg_test2;
 ---- RESULTS
 'level','string',''
@@ -63,7 +65,8 @@ STRING,STRING,STRING
 CREATE EXTERNAL TABLE iceberg_test_external(
   level STRING
 )
-STORED AS ICEBERG;
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
 ---- RESULTS
 'Location is necessary for external iceberg table.'
 ====
@@ -76,12 +79,14 @@ PARTITION BY SPEC
   level IDENTITY
 )
 STORED AS ICEBERG
-LOCATION '/$DATABASE.iceberg_test_with_location';
+LOCATION '/$DATABASE.iceberg_test_with_location'
+TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
 CREATE EXTERNAL TABLE iceberg_test_external(
   level STRING
 )
 STORED AS ICEBERG
-LOCATION '/$DATABASE.iceberg_test_with_location';
+LOCATION '/$DATABASE.iceberg_test_with_location'
+TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
 ---- RESULTS
 'Table has been created.'
 ====
@@ -102,7 +107,8 @@ BIGINT,BIGINT,BIGINT,STRING,STRING
 ---- QUERY
 CREATE EXTERNAL TABLE iceberg_test_external_empty_column
 STORED AS ICEBERG
-LOCATION '/$DATABASE.iceberg_test_with_location';
+LOCATION '/$DATABASE.iceberg_test_with_location'
+TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
 ---- RESULTS
 'Table has been created.'
 ====
@@ -127,3 +133,140 @@ DROP TABLE iceberg_test_external_empty_column;
 ---- RESULTS
 'Table has been dropped.'
 ====
+---- QUERY
+CREATE TABLE iceberg_test5(
+  level STRING,
+  event_time TIMESTAMP,
+  register_time DATE,
+  message STRING,
+  price DECIMAL(8,1),
+  map_test MAP <STRING, array <STRING>>,
+  struct_test STRUCT <f1: BIGINT, f2: BIGINT>
+)
+PARTITION BY SPEC
+(
+  level IDENTITY,
+  event_time IDENTITY,
+  event_time HOUR,
+  register_time DAY
+)
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg.catalog'='hadoop.catalog',
+'iceberg.catalog_location'='/$DATABASE/hadoop_catalog_test');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+DESCRIBE iceberg_test5;
+---- RESULTS
+'level','string',''
+'event_time','timestamp',''
+'register_time','date',''
+'message','string',''
+'price','decimal(8,1)',''
+'map_test','map<string,array<string>>',''
+'struct_test','struct<\n  f1:bigint,\n  f2:bigint\n>',''
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+SHOW PARTITIONS iceberg_test5;
+---- RESULTS
+0,1,1000,'level','IDENTITY'
+0,2,1001,'event_time','IDENTITY'
+0,2,1002,'event_time_hour','HOUR'
+0,3,1003,'register_time_day','DAY'
+---- TYPES
+BIGINT,BIGINT,BIGINT,STRING,STRING
+====
+---- QUERY
+DROP TABLE iceberg_test5;
+---- RESULTS
+'Table has been dropped.'
+====
+---- QUERY
+CREATE TABLE iceberg_test6(
+  level STRING
+)
+PARTITION BY SPEC
+(
+  level IDENTITY
+)
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg.catalog'='hadoop.catalog',
+'iceberg.catalog_location'='/$DATABASE/hadoop_catalog_test');
+CREATE EXTERNAL TABLE iceberg_test6_external(
+  level STRING
+)
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg.catalog'='hadoop.catalog',
+'iceberg.catalog_location'='/$DATABASE/hadoop_catalog_test', 'iceberg.table_identifier'='$DATABASE.iceberg_test6');
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+DESCRIBE iceberg_test6_external;
+---- RESULTS
+'level','string',''
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+SHOW PARTITIONS iceberg_test6_external;
+---- RESULTS
+0,1,1000,'level','IDENTITY'
+---- TYPES
+BIGINT,BIGINT,BIGINT,STRING,STRING
+====
+---- QUERY
+DROP TABLE iceberg_test6;
+DROP TABLE iceberg_test6_external;
+---- RESULTS
+'Table has been dropped.'
+====
+---- QUERY
+CREATE TABLE iceberg_test7(
+  level STRING
+)
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg.catalog'='hadoop.catalog',
+'iceberg.catalog_location'='/test-warehouse/$DATABASE/hadoop_catalog_test/iceberg_test');
+CREATE TABLE iceberg_test8(
+  level STRING
+)
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg.catalog'='hadoop.catalog',
+'iceberg.catalog_location'='/test-warehouse/$DATABASE/hadoop_catalog_test/iceberg_test');
+DROP TABLE iceberg_test7;
+SELECT * FROM iceberg_test8;
+---- TYPES
+string
+---- RESULTS
+====
+---- QUERY
+DESCRIBE FORMATTED iceberg_test8;
+---- RESULTS: VERIFY_IS_SUBSET
+'Location:           ','$NAMENODE/test-warehouse/$DATABASE/hadoop_catalog_test/iceberg_test/$DATABASE/iceberg_test8','NULL'
+'','iceberg.catalog_location','/test-warehouse/$DATABASE/hadoop_catalog_test/iceberg_test'
+'','iceberg_file_format ','parquet             '
+'','iceberg.catalog     ','hadoop.catalog      '
+---- TYPES
+string, string, string
+====
+---- QUERY
+CREATE TABLE iceberg_test9(
+  level STRING
+)
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg.catalog'='hadoop.catalog',
+'iceberg.catalog_location'='/test-warehouse/$DATABASE/hadoop_catalog_test/iceberg_test',
+'iceberg.table_identifier'='org.db.tbl');
+DESCRIBE FORMATTED iceberg_test9;
+---- RESULTS: VERIFY_IS_SUBSET
+'Location:           ','$NAMENODE/test-warehouse/$DATABASE/hadoop_catalog_test/iceberg_test/org/db/tbl','NULL'
+'','iceberg.catalog_location','/test-warehouse/$DATABASE/hadoop_catalog_test/iceberg_test'
+'','iceberg_file_format ','parquet             '
+'','iceberg.catalog     ','hadoop.catalog      '
+---- TYPES
+string, string, string
+====
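
The DESCRIBE FORMATTED expectations for iceberg_test8 and iceberg_test9 above rely on HadoopCatalog's directory layout: a table lives under <iceberg.catalog_location> with one path segment per dot-separated part of its identifier, so 'org.db.tbl' ends up at .../org/db/tbl. A minimal sketch of that path derivation, under the assumption that the tests capture the layout exactly (the helper below is illustrative, not code from this patch):

    public class HadoopCatalogPathSketch {
      // HadoopCatalog places a table under the catalog location, one directory
      // per namespace level of its identifier.
      static String tablePath(String catalogLocation, String identifier) {
        return catalogLocation + "/" + identifier.replace('.', '/');
      }

      public static void main(String[] args) {
        String loc = "/test-warehouse/my_db/hadoop_catalog_test/iceberg_test";  // hypothetical
        // No explicit identifier: database.table is used, e.g. my_db.iceberg_test8.
        System.out.println(tablePath(loc, "my_db.iceberg_test8"));
        // 'iceberg.table_identifier'='org.db.tbl' maps to .../org/db/tbl.
        System.out.println(tablePath(loc, "org.db.tbl"));
      }
    }
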
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
index 759880e..d68fc3f 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
@@ -22,7 +22,8 @@ AnalysisException: Cannot find source column: event_time
 CREATE TABLE iceberg_test3(
   level STRING
 )
-STORED AS ICEBERG;
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
 INSERT INTO iceberg_test3 values('1');
 ---- CATCH
 AnalysisException: Impala does not support INSERTing into iceberg table: $DATABASE.iceberg_test3
@@ -43,3 +44,45 @@ SHOW PARTITIONS functional_parquet.iceberg_non_partitioned
 ---- CATCH
 AnalysisException: Table is not partitioned: functional_parquet.iceberg_non_partitioned
 ====
+---- QUERY
+CREATE TABLE iceberg_test4(
+  level STRING
+)
+STORED AS ICEBERG
+LOCATION '/test-warehouse/$DATABASE/hadoop_catalog_test/iceberg_test'
+TBLPROPERTIES('iceberg.catalog'='hadoop.catalog');
+---- CATCH
+AnalysisException: Location cannot be set for Iceberg table with 'hadoop.catalog'.
+====
+---- QUERY
+CREATE TABLE iceberg_test5(
+  level STRING
+)
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg.catalog'='hadoop.catalog');
+---- CATCH
+AnalysisException: Table property 'iceberg.catalog_location' is necessary for Iceberg table with 'hadoop.catalog'.
+====
+---- QUERY
+CREATE EXTERNAL TABLE iceberg_test6
+STORED AS ICEBERG
+LOCATION '/test-warehouse/$DATABASE/hadoop_catalog_test/iceberg_test'
+TBLPROPERTIES('iceberg.catalog_location'='/test-warehouse/fake_table', 'iceberg.table_identifier'='fake_db.fake_table');
+---- CATCH
+AnalysisException: Location cannot be set for Iceberg table with 'hadoop.catalog'.
+====
+---- QUERY
+CREATE EXTERNAL TABLE iceberg_test7
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg.table_identifier'='fake_db.fake_table');
+---- CATCH
+AnalysisException: Table property 'iceberg.catalog_location' is necessary for Iceberg table with 'hadoop.catalog'.
+====
+---- QUERY
+CREATE EXTERNAL TABLE iceberg_test8
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg.catalog_location'='/test-warehouse/fake_table', 'iceberg.table_identifier'='fake_db.fake_table');
+SHOW CREATE TABLE iceberg_test8;
+---- CATCH
+row_regex:.*CAUSED BY: NoSuchTableException: Table does not exist: fake_db.fake_table*
+====
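
The negative tests above pin down two analysis rules for 'hadoop.catalog' tables: an explicit LOCATION is rejected, and 'iceberg.catalog_location' must be set. The CreateTableStmt changes that enforce them are not shown in this section, so the following is only a hypothetical sketch of those checks, with a local AnalysisException class and helper standing in for Impala's analyzer plumbing:

    import java.util.Map;

    public class IcebergCreateChecksSketch {
      // Stand-in for Impala's AnalysisException.
      static class AnalysisException extends Exception {
        AnalysisException(String msg) { super(msg); }
      }

      // Hypothetical checks mirroring the error messages asserted in iceberg-negative.test.
      static void checkHadoopCatalogProps(String location, Map<String, String> tblProps)
          throws AnalysisException {
        if (location != null) {
          throw new AnalysisException(
              "Location cannot be set for Iceberg table with 'hadoop.catalog'.");
        }
        String catalogLoc = tblProps.get("iceberg.catalog_location");
        if (catalogLoc == null || catalogLoc.isEmpty()) {
          throw new AnalysisException("Table property 'iceberg.catalog_location' is necessary "
              + "for Iceberg table with 'hadoop.catalog'.");
        }
      }
    }
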
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-query.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-query.test
index dd63bd6..4c4e654 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-query.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-query.test
@@ -187,3 +187,126 @@ Path,Size,Partition
 ---- TYPES
 STRING,STRING,STRING
 ====
+---- QUERY
+SELECT count(*) from hadoop_catalog_test_external
+---- TYPES
+bigint
+---- RESULTS
+20
+====
+---- QUERY
+SELECT count(*) from hadoop_catalog_test_external
+where id > 10
+---- TYPES
+bigint
+---- RESULTS
+10
+====
+---- QUERY
+SELECT count(*) from hadoop_catalog_test_external
+where action <> 'click'
+---- TYPES
+bigint
+---- RESULTS
+14
+====
+---- QUERY
+SELECT count(distinct id),count(distinct action) from hadoop_catalog_test_external
+---- TYPES
+bigint,bigint
+---- RESULTS
+20,3
+====
+---- QUERY
+SET TIMEZONE=UTC;
+SELECT count(*) from hadoop_catalog_test_external
+where event_time > to_timestamp('2020-01-01 09:00:00','yyyy-MM-dd HH:mm:ss')
+---- TYPES
+bigint
+---- RESULTS
+6
+====
+---- QUERY
+SET TIMEZONE=UTC;
+SELECT * from hadoop_catalog_test_external
+where event_time > to_timestamp('2020-01-01 09:00:00','yyyy-MM-dd HH:mm:ss')
+ORDER BY id
+---- TYPES
+int,string,string,timestamp
+---- RESULTS
+2,'Lisa','download',2020-01-01 10:00:00
+5,'Lisa','download',2020-01-01 10:00:00
+7,'Lisa','download',2020-01-01 10:00:00
+8,'Lisa','download',2020-01-01 10:00:00
+14,'Lisa','download',2020-01-01 10:00:00
+16,'Lisa','download',2020-01-01 10:00:00
+====
+---- QUERY
+SHOW PARTITIONS hadoop_catalog_test_external
+---- TYPES
+bigint,bigint,bigint,string,string
+---- RESULTS
+0,4,1000,'event_time_hour','HOUR'
+0,3,1001,'action','IDENTITY'
+====
+---- QUERY
+SHOW FILES in hadoop_catalog_test_external
+---- LABELS
+Path,Size,Partition
+---- RESULTS
+'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00001-1-bc402da0-b562-4310-9001-06f9b6b0f9ae-00000.parquet',regex:.*,''
+'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00006-6-d253aefa-65fc-4698-8f26-b155fc965cf6-00000.parquet',regex:.*,''
+'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00009-9-5d04b016-05e1-43fc-b4a0-0e0df52a5035-00000.parquet',regex:.*,''
+'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00017-17-20b92523-c3b9-401d-b429-363c245dbe9c-00000.parquet',regex:.*,''
+'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00023-23-c86370cf-10a1-4e49-86dc-b094fe739aa6-00000.parquet',regex:.*,''
+'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00027-27-f32f86fa-286f-4cd3-8337-98685c48176d-00000.parquet',regex:.*,''
+'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00030-30-b18d2bbc-46a2-4040-a4a8-7488447de3b6-00000.parquet',regex:.*,''
+'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-08/action=view/00031-31-c9bda250-ed1c-4868-bbf1-f2aad65fa80c-00000.parquet',regex:.*,''
+'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00004-4-0ed77823-ded1-4a12-9e03-4027cd43966a-00000.parquet',regex:.*,''
+'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00014-14-f698d7a4-245f-44d5-8a59-ed511854c8f8-00000.parquet',regex:.*,''
+'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00015-15-7c1d5490-91f7-47bd-a3b6-e86caa7fe47d-00000.parquet',regex:.*,''
+'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00019-19-d2ef5fcf-4346-421f-b2ef-1f9d55fb4c84-00000.parquet',regex:.*,''
+'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00020-20-a70c64ed-7a99-4f43-ada7-225c92f6a993-00000.parquet',regex:.*,''
+'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-09/action=click/00028-28-bb02c862-3d63-42cb-8041-0a0b14b8ca13-00000.parquet',regex:.*,''
+'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00003-3-d5288e68-c862-4248-b3e5-84228a3ec39d-00000.parquet',regex:.*,''
+'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00007-7-92031dc0-b7eb-424d-9edb-dd2cedc59784-00000.parquet',regex:.*,''
+'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00011-11-9361e5f3-cfa7-4190-bb30-0db1d53202fd-00000.parquet',regex:.*,''
+'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00012-12-e82b52b1-dc5b-4417-81b7-8e9fd992280b-00000.parquet',regex:.*,''
+'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00022-22-c646ba9a-9387-4c38-bab8-a0598c400fde-00000.parquet',regex:.*,''
+'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/data/event_time_hour=2020-01-01-10/action=download/00025-25-7f8283a3-b39f-4273-984b-cf7faf39dd9d-00000.parquet',regex:.*,''
+'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/metadata/2c2fa00b-eb20-460a-835b-d69b32560e21-m0.avro',regex:.*,''
+'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/metadata/snap-7412008513947276465-1-2c2fa00b-eb20-460a-835b-d69b32560e21.avro',regex:.*,''
+'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/metadata/v1.metadata.json',regex:.*,''
+'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/metadata/v2.metadata.json',regex:.*,''
+'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test/metadata/version-hint.text',regex:.*,''
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+describe formatted iceberg_partitioned;
+---- RESULTS: VERIFY_IS_SUBSET
+'Location:           ','$NAMENODE/test-warehouse/iceberg_test/iceberg_partitioned','NULL'
+'','iceberg_file_format ','parquet             '
+'','iceberg.catalog     ','hadoop.tables       '
+---- TYPES
+string, string, string
+====
+---- QUERY
+describe formatted iceberg_non_partitioned;
+---- RESULTS: VERIFY_IS_SUBSET
+'Location:           ','$NAMENODE/test-warehouse/iceberg_test/iceberg_non_partitioned','NULL'
+'','iceberg_file_format ','parquet             '
+'','iceberg.catalog     ','hadoop.tables       '
+---- TYPES
+string, string, string
+====
+---- QUERY
+describe formatted hadoop_catalog_test_external;
+---- RESULTS: VERIFY_IS_SUBSET
+'Location:           ','$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test/functional_parquet/hadoop_catalog_test','NULL'
+'','iceberg.catalog_location','/test-warehouse/iceberg_test/hadoop_catalog/hadoop_catalog_test'
+'','iceberg.table_identifier','functional_parquet.hadoop_catalog_test'
+'','iceberg_file_format ','parquet             '
+'','iceberg.catalog     ','hadoop.catalog      '
+---- TYPES
+string, string, string
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
index 5ad9bd2..37bff48 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/show-create-table.test
@@ -561,19 +561,67 @@ CREATE TABLE iceberg_test1 (
   level STRING
 )
 STORED AS ICEBERG
-TBLPROPERTIES('iceberg_file_format'='parquet')
+TBLPROPERTIES('iceberg_file_format'='parquet', 'iceberg.catalog'='hadoop.tables')
 ---- RESULTS-HIVE
 CREATE TABLE show_create_table_test_db.iceberg_test1 (
   level STRING
 )
 STORED AS ICEBERG
 LOCATION '$$location_uri$$'
-TBLPROPERTIES('iceberg_file_format'='parquet')
+TBLPROPERTIES('iceberg_file_format'='parquet', 'iceberg.catalog'='hadoop.tables')
 ---- RESULTS-HIVE-3
 CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_test1 (
   level STRING
 )
 STORED AS ICEBERG
 LOCATION '$$location_uri$$'
-TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg_file_format'='parquet')
+TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg_file_format'='parquet',
+'iceberg.catalog'='hadoop.tables')
+====
+---- CREATE_TABLE
+CREATE TABLE iceberg_test2 (
+  level STRING
+)
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg_file_format'='parquet', 'iceberg.catalog'='hadoop.catalog',
+'iceberg.catalog_location'='/test-warehouse/hadoop_catalog_test')
+---- RESULTS-HIVE
+CREATE TABLE show_create_table_test_db.iceberg_test2 (
+  level STRING
+)
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg_file_format'='parquet', 'iceberg.catalog'='hadoop.catalog',
+'iceberg.catalog_location'='/test-warehouse/hadoop_catalog_test')
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_test2 (
+  level STRING
+)
+STORED AS ICEBERG
+TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg_file_format'='parquet',
+'iceberg.catalog'='hadoop.catalog', 'iceberg.catalog_location'='/test-warehouse/hadoop_catalog_test')
+====
+---- CREATE_TABLE
+CREATE TABLE iceberg_test3 (
+  level STRING
+)
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg_file_format'='parquet', 'iceberg.catalog'='hadoop.catalog',
+'iceberg.catalog_location'='/test-warehouse/hadoop_catalog_test',
+'iceberg.table_identifier'='org.my_db.my_table')
+---- RESULTS-HIVE
+CREATE TABLE show_create_table_test_db.iceberg_test3 (
+  level STRING
+)
+STORED AS ICEBERG
+TBLPROPERTIES('iceberg_file_format'='parquet', 'iceberg.catalog'='hadoop.catalog',
+'iceberg.catalog_location'='/test-warehouse/hadoop_catalog_test',
+'iceberg.table_identifier'='org.my_db.my_table')
+---- RESULTS-HIVE-3
+CREATE EXTERNAL TABLE show_create_table_test_db.iceberg_test3 (
+  level STRING
+)
+STORED AS ICEBERG
+TBLPROPERTIES ('external.table.purge'='TRUE', 'iceberg_file_format'='parquet',
+'iceberg.catalog'='hadoop.catalog', 'iceberg.catalog_location'='/test-warehouse/hadoop_catalog_test',
+'iceberg.table_identifier'='org.my_db.my_table')
 ====