Posted to commits@impala.apache.org by jo...@apache.org on 2019/11/21 22:45:00 UTC

[impala] branch master updated (c1244c2 -> 6dd2cfa)

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from c1244c2  IMPALA-9152: Clear row filter policies before testColumnMaskEnabled
     new 0c0671e  IMPALA-9104: Support retrieval of PK/FK information through impala-hs2-server.
     new 6dd2cfa  IMPALA-9165: Hack to remove MasterProcWALs directory in kill-hbase.sh

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/rpc/kerberos-test.cc                        |   4 +
 be/src/service/impala-hs2-server.cc                |  36 ++++
 be/src/service/impala-server.h                     |   6 +
 bin/rat_exclude_files.txt                          |   3 +
 common/thrift/Frontend.thrift                      |   4 +
 common/thrift/hive-1-api/TCLIService.thrift        |  51 +++++
 .../java/org/apache/impala/analysis/TableDef.java  |  11 +-
 .../java/org/apache/impala/catalog/FeTable.java    |  13 ++
 .../java/org/apache/impala/catalog/HdfsTable.java  |  10 +-
 .../main/java/org/apache/impala/catalog/Table.java |  21 ++
 .../apache/impala/catalog/local/LocalFsTable.java  |  10 -
 .../apache/impala/catalog/local/LocalTable.java    |  15 ++
 .../java/org/apache/impala/service/Frontend.java   |  90 ++++++++
 .../java/org/apache/impala/service/MetadataOp.java | 228 ++++++++++++++++++++-
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |  41 ++--
 .../java/org/apache/impala/analysis/ToSqlTest.java |  13 +-
 .../impala/authorization/AuthorizationTest.java    |  65 +++++-
 .../org/apache/impala/common/FrontendFixture.java  |   1 -
 .../java/org/apache/impala/service/JdbcTest.java   |  49 +++++
 testdata/bin/kill-hbase.sh                         |   5 +
 testdata/data/README                               |  12 ++
 testdata/data/child_table.txt                      |   6 +
 testdata/data/parent_table.txt                     |   6 +
 testdata/data/parent_table_2.txt                   |   6 +
 .../functional/functional_schema_template.sql      |  48 +++++
 tests/hs2/test_hs2.py                              | 134 ++++++++++++
 26 files changed, 840 insertions(+), 48 deletions(-)
 create mode 100644 testdata/data/child_table.txt
 create mode 100644 testdata/data/parent_table.txt
 create mode 100644 testdata/data/parent_table_2.txt


[impala] 02/02: IMPALA-9165: Hack to remove MasterProcWALs directory in kill-hbase.sh

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6dd2cfa9e0aa709e97e710e8327291021205210e
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Wed Nov 20 14:17:53 2019 -0800

    IMPALA-9165: Hack to remove MasterProcWALs directory in kill-hbase.sh
    
    Some jobs have been hanging in the testdata/bin/create-hbase.sh
    script. Logs during the hang show the HBase Master is stuck
    uninitialized:
    Master startup cannot progress, in holding-pattern until region onlined.
    ...
    ERROR master.HMaster: Master failed to complete initialization after 900000ms.
    
    Anecdotally, the HBase Master doesn't have this problem if we remove
    the /hbase/MasterProcWALs directory in kill-hbase.sh. This patch
    does exactly that. It is a hack, and we should update this code
    once we know what is going on.
    
    Testing:
     - test-with-docker.py fails without this patch and passes with it
     - Hand testing on my minicluster shows that this allows HBase to
       restart and be consistently usable
    
    Change-Id: Icef3d30e6b539a175e03f63fcdbfb2d4608c08fa
    Reviewed-on: http://gerrit.cloudera.org:8080/14757
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 testdata/bin/kill-hbase.sh | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/testdata/bin/kill-hbase.sh b/testdata/bin/kill-hbase.sh
index 902e869..e404aef 100755
--- a/testdata/bin/kill-hbase.sh
+++ b/testdata/bin/kill-hbase.sh
@@ -30,3 +30,8 @@ ${HBASE_HOME}/bin/stop-hbase.sh
 # Clear up data so that zookeeper/hbase won't do recovery when it starts.
 # TODO: is this still needed when using bin/stop-hbase.sh?
 rm -rf /tmp/hbase-*
+
+# HACK: Some jobs have seen the HBase master fail to initialize with messages like:
+# "Master startup cannot progress, in holding-pattern until region onlined."
+# Anecdotally, removing the MasterProcWALs directory avoids the issue.
+hdfs dfs -rm /hbase/MasterProcWALs/* || true
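
For anyone scripting this cleanup outside of kill-hbase.sh, here is a rough
Java equivalent of the "hdfs dfs -rm" line above via the Hadoop FileSystem
API, as a sketch assuming default configuration discovery; the shell
one-liner is what the patch actually uses:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ClearMasterProcWals {
  public static void main(String[] args) throws Exception {
    // Picks up fs.defaultFS from core-site.xml on the classpath.
    FileSystem fs = FileSystem.get(new Configuration());
    Path walDir = new Path("/hbase/MasterProcWALs");
    if (fs.exists(walDir)) {
      // Delete the WAL files but keep the directory itself, mirroring
      // "hdfs dfs -rm /hbase/MasterProcWALs/*".
      for (FileStatus st : fs.listStatus(walDir)) {
        fs.delete(st.getPath(), true);
      }
    }
  }
}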


[impala] 01/02: IMPALA-9104: Support retrieval of PK/FK information through impala-hs2-server.

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0c0671e04e9a71ebcba0e64d28b15f5c332f35ff
Author: Anurag Mantripragada <an...@cloudera.com>
AuthorDate: Wed Nov 6 12:00:07 2019 -0800

    IMPALA-9104: Support retrieval of PK/FK information through impala-hs2-server.
    
    The goal is to let JDBC clients get constraint information
    from Impala tables. We implement two new metadata operations in
    impala-hs2-server, GetPrimaryKeys and GetCrossReference, which are
    already implemented in Hive's HS2. The thrift
    definitions are copied from Hive's TCLIService.thrift. In FE, these
    two operations are implemented to get the information from tables
    in the catalog.
    
    Much like GetColumns(), tables need to be loaded before PK/FK
    information can be retrieved, so we wait for the PK/FK tables to load.
    In the implementation, PK/FK information is returned
    ONLY if the user has access to ALL the columns involved in the PK/FK
    relationship.
    
    Testing:
    - Added three test tables to our test datasets, since most of our FE tests
      relied on dummy tables or testdata that made PK/FK testing difficult.
      We can also build on this testdata in the future when we make
      optimizer improvements.
    - Added unit tests in AuthorizationTest and JdbcTest.
    - Added an e2e test in test_hs2.py.
    - This patch modifies AnalyzeDDLTest and ToSqlTest to rely on the newly
      added dataset instead of dummy tables for PK/FK tests.
    
    Caveats:
    - Ranger needs OWNER user information for authorization. Since this is HMS
      metadata that we do not aggressively load, this information is not
      available for IncompleteTables. Some foreign key tables (fact tables,
      for example) might have FK/PK relationships with several PK tables, some
      of which might not be loaded in the catalog. Currently we have no way to
      check column privileges for tables without owner user information, so we
      do not return keys involving such columns. Therefore, when Ranger is
      used, there may be missing PK/FK relationships for parent tables that
      are not loaded. This can be tracked in IMPALA-9172.
    - Retrieval of constraints is not yet supported in LocalCatalog mode. See
      IMPALA-9158.
    
    Change-Id: I8942dfbbd4a3be244eed1c61ac2ce17069960477
    Reviewed-on: http://gerrit.cloudera.org:8080/14720
    Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/rpc/kerberos-test.cc                        |   4 +
 be/src/service/impala-hs2-server.cc                |  36 ++++
 be/src/service/impala-server.h                     |   6 +
 bin/rat_exclude_files.txt                          |   3 +
 common/thrift/Frontend.thrift                      |   4 +
 common/thrift/hive-1-api/TCLIService.thrift        |  51 +++++
 .../java/org/apache/impala/analysis/TableDef.java  |  11 +-
 .../java/org/apache/impala/catalog/FeTable.java    |  13 ++
 .../java/org/apache/impala/catalog/HdfsTable.java  |  10 +-
 .../main/java/org/apache/impala/catalog/Table.java |  21 ++
 .../apache/impala/catalog/local/LocalFsTable.java  |  10 -
 .../apache/impala/catalog/local/LocalTable.java    |  15 ++
 .../java/org/apache/impala/service/Frontend.java   |  90 ++++++++
 .../java/org/apache/impala/service/MetadataOp.java | 228 ++++++++++++++++++++-
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |  41 ++--
 .../java/org/apache/impala/analysis/ToSqlTest.java |  13 +-
 .../impala/authorization/AuthorizationTest.java    |  65 +++++-
 .../org/apache/impala/common/FrontendFixture.java  |   1 -
 .../java/org/apache/impala/service/JdbcTest.java   |  49 +++++
 testdata/data/README                               |  12 ++
 testdata/data/child_table.txt                      |   6 +
 testdata/data/parent_table.txt                     |   6 +
 testdata/data/parent_table_2.txt                   |   6 +
 .../functional/functional_schema_template.sql      |  48 +++++
 tests/hs2/test_hs2.py                              | 134 ++++++++++++
 25 files changed, 835 insertions(+), 48 deletions(-)
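
The new operations surface to JDBC clients through the standard
java.sql.DatabaseMetaData calls, as the JdbcTest changes below exercise. A
minimal client-side sketch (the connection URL, port, and choice of the Hive
JDBC driver are illustrative assumptions, not part of this patch):

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;

public class PkFkMetadataExample {
  public static void main(String[] args) throws Exception {
    // Assumed Impala HS2 endpoint; adjust host/port for your cluster.
    try (Connection con = DriverManager.getConnection(
        "jdbc:hive2://localhost:21050/;auth=noSasl")) {
      DatabaseMetaData md = con.getMetaData();

      // Primary keys of functional.parent_table: one row per PK column, with
      // KEY_SEQ giving the column's position within the composite key.
      try (ResultSet rs = md.getPrimaryKeys(null, "functional", "parent_table")) {
        while (rs.next()) {
          System.out.println(rs.getString("COLUMN_NAME")
              + " seq=" + rs.getInt("KEY_SEQ"));
        }
      }

      // Foreign keys of functional.child_table that reference
      // functional.parent_table.
      try (ResultSet rs = md.getCrossReference(null, "functional", "parent_table",
          null, "functional", "child_table")) {
        while (rs.next()) {
          System.out.println(rs.getString("FKCOLUMN_NAME") + " -> "
              + rs.getString("PKTABLE_NAME") + "." + rs.getString("PKCOLUMN_NAME"));
        }
      }
    }
  }
}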

diff --git a/be/src/rpc/kerberos-test.cc b/be/src/rpc/kerberos-test.cc
index 45408e1..a394fc0 100644
--- a/be/src/rpc/kerberos-test.cc
+++ b/be/src/rpc/kerberos-test.cc
@@ -45,6 +45,10 @@ class TestHS2Service : public ImpalaHiveServer2ServiceIf {
   virtual void GetTableTypes(TGetTableTypesResp& _return, const TGetTableTypesReq& req) {}
   virtual void GetColumns(TGetColumnsResp& _return, const TGetColumnsReq& req) {}
   virtual void GetFunctions(TGetFunctionsResp& _return, const TGetFunctionsReq& req) {}
+  virtual void GetPrimaryKeys(
+      TGetPrimaryKeysResp& _return, const TGetPrimaryKeysReq& req) {}
+  virtual void GetCrossReference(
+      TGetCrossReferenceResp& _return, const TGetCrossReferenceReq& req) {}
   virtual void GetOperationStatus(
       TGetOperationStatusResp& _return, const TGetOperationStatusReq& req) {}
   virtual void CancelOperation(
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 31b5fa9..df4d9f0 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -650,6 +650,42 @@ void ImpalaServer::GetFunctions(TGetFunctionsResp& return_val,
   VLOG_QUERY << "GetFunctions(): return_val=" << ThriftDebugString(return_val);
 }
 
+void ImpalaServer::GetPrimaryKeys(TGetPrimaryKeysResp& return_val,
+    const TGetPrimaryKeysReq& request) {
+  VLOG_QUERY << "GetPrimaryKeys(): request=" << ThriftDebugString(request);
+  HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
+
+  TMetadataOpRequest req;
+  req.__set_opcode(TMetadataOpcode::GET_PRIMARY_KEYS);
+  req.__set_get_primary_keys_req(request);
+
+  TOperationHandle handle;
+  thrift::TStatus status;
+  ExecuteMetadataOp(request.sessionHandle.sessionId, &req, &handle, &status);
+  return_val.__set_operationHandle(handle);
+  return_val.__set_status(status);
+
+  VLOG_QUERY << "GetPrimaryKeys(): return_val=" << ThriftDebugString(return_val);
+}
+
+void ImpalaServer::GetCrossReference(TGetCrossReferenceResp& return_val,
+    const TGetCrossReferenceReq& request) {
+  VLOG_QUERY << "GetCrossReference(): request=" << ThriftDebugString(request);
+  HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
+
+  TMetadataOpRequest req;
+  req.__set_opcode(TMetadataOpcode::GET_CROSS_REFERENCE);
+  req.__set_get_cross_reference_req(request);
+
+  TOperationHandle handle;
+  thrift::TStatus status;
+  ExecuteMetadataOp(request.sessionHandle.sessionId, &req, &handle, &status);
+  return_val.__set_operationHandle(handle);
+  return_val.__set_status(status);
+
+  VLOG_QUERY << "GetCrossReference(): return_val=" << ThriftDebugString(return_val);
+}
+
 void ImpalaServer::GetOperationStatus(TGetOperationStatusResp& return_val,
     const TGetOperationStatusReq& request) {
   if (request.operationHandle.operationId.guid.size() == 0) {
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 9b85766..7ed0f34 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -298,6 +298,12 @@ class ImpalaServer : public ImpalaServiceIf,
   virtual void GetFunctions(
       apache::hive::service::cli::thrift::TGetFunctionsResp& return_val,
       const apache::hive::service::cli::thrift::TGetFunctionsReq& request);
+  virtual void GetPrimaryKeys(
+      apache::hive::service::cli::thrift::TGetPrimaryKeysResp& return_val,
+      const apache::hive::service::cli::thrift::TGetPrimaryKeysReq& request);
+  virtual void GetCrossReference(
+      apache::hive::service::cli::thrift::TGetCrossReferenceResp& return_val,
+      const apache::hive::service::cli::thrift::TGetCrossReferenceReq& request);
   virtual void GetOperationStatus(
       apache::hive::service::cli::thrift::TGetOperationStatusResp& return_val,
       const apache::hive::service::cli::thrift::TGetOperationStatusReq& request);
diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt
index e7859ab..913e40b 100644
--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt
@@ -118,6 +118,9 @@ testdata/cluster/node_templates/common/etc/hadoop/conf/*.xml.tmpl
 testdata/cluster/ranger/setup/*.json.template
 testdata/data/chars-formats.txt
 testdata/data/chars-tiny.txt
+testdata/data/parent_table.txt
+testdata/data/parent_table_2.txt
+testdata/data/child_table.txt
 testdata/data/date_tbl/*.txt
 testdata/data/date_tbl_error/*.txt
 testdata/data/decimal-tiny.txt
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index bfbc9bc..e6e1fd8 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -572,6 +572,8 @@ enum TMetadataOpcode {
   GET_TABLE_TYPES = 4
   GET_COLUMNS = 5
   GET_FUNCTIONS = 6
+  GET_PRIMARY_KEYS = 7
+  GET_CROSS_REFERENCE = 8
 }
 
 // Input parameter to JniFrontend.hiveServer2MetadataOperation
@@ -594,6 +596,8 @@ struct TMetadataOpRequest {
   // enabled, only the server objects this user has access to will be returned.
   // If not set, access checks will be skipped (used for internal Impala requests)
   10: optional ImpalaInternalService.TSessionState session
+  11: optional TCLIService.TGetPrimaryKeysReq get_primary_keys_req
+  12: optional TCLIService.TGetCrossReferenceReq get_cross_reference_req
 }
 
 // Tracks accesses to Catalog objects for use during auditing. This information, paired
diff --git a/common/thrift/hive-1-api/TCLIService.thrift b/common/thrift/hive-1-api/TCLIService.thrift
index f95e2f8..421e6b8 100644
--- a/common/thrift/hive-1-api/TCLIService.thrift
+++ b/common/thrift/hive-1-api/TCLIService.thrift
@@ -946,6 +946,53 @@ struct TGetFunctionsResp {
   2: optional TOperationHandle operationHandle
 }
 
+struct TGetPrimaryKeysReq {
+  // Session to run this request against
+  1: required TSessionHandle sessionHandle
+
+  // Name of the catalog.
+  2: optional TIdentifier catalogName
+
+  // Name of the schema.
+  3: optional TIdentifier schemaName
+
+  // Name of the table.
+  4: optional TIdentifier tableName
+}
+
+struct TGetPrimaryKeysResp {
+  1: required TStatus status
+  2: optional TOperationHandle operationHandle
+}
+
+struct TGetCrossReferenceReq {
+  // Session to run this request against
+  1: required TSessionHandle sessionHandle
+
+  // Name of the parent catalog.
+  2: optional TIdentifier parentCatalogName
+
+  // Name of the parent schema.
+  3: optional TIdentifier parentSchemaName
+
+  // Name of the parent table.
+  4: optional TIdentifier parentTableName
+
+  // Name of the foreign catalog.
+  5: optional TIdentifier foreignCatalogName
+
+  // Name of the foreign schema.
+  6: optional TIdentifier foreignSchemaName
+
+  // Name of the foreign table.
+  7: optional TIdentifier foreignTableName
+}
+
+struct TGetCrossReferenceResp {
+  1: required TStatus status
+  2: optional TOperationHandle operationHandle
+}
+
 
 // GetOperationStatus()
 //
@@ -1159,6 +1206,10 @@ service TCLIService {
 
   TGetFunctionsResp GetFunctions(1:TGetFunctionsReq req);
 
+  TGetPrimaryKeysResp GetPrimaryKeys(1:TGetPrimaryKeysReq req);
+
+  TGetCrossReferenceResp GetCrossReference(1:TGetCrossReferenceReq req);
+
   TGetOperationStatusResp GetOperationStatus(1:TGetOperationStatusReq req);
 
   TCancelOperationResp CancelOperation(1:TCancelOperationReq req);
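
At the wire level these behave like the other HS2 metadata calls: the client
builds a request, submits it, and fetches result rows through the returned
operation handle. A rough sketch against the Java classes generated from this
Thrift file (the open transport and 'client' are assumed to exist already;
TIdentifier is a string typedef, so the generated setters take plain strings):

// Sketch only, not part of this patch: 'client' is a TCLIService.Client on
// an open Thrift transport to an Impala HS2 endpoint.
TOpenSessionResp open = client.OpenSession(new TOpenSessionReq());
TSessionHandle sessionHandle = open.getSessionHandle();

TGetPrimaryKeysReq req = new TGetPrimaryKeysReq(sessionHandle);
req.setSchemaName("functional");
req.setTableName("parent_table");
TGetPrimaryKeysResp resp = client.GetPrimaryKeys(req);
// Rows are then retrieved with FetchResults using resp.getOperationHandle(),
// just as for GetColumns().
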
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
index 0bff247..14dfe05 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
@@ -542,9 +542,14 @@ class TableDef {
         // Hive has a bug that prevents foreign keys from being added when pk column is
         // not part of primary key. This can be confusing. Till this bug is fixed, we
         // will not allow foreign keys definition on such columns.
-        if (!((HdfsTable) parentTable).getPrimaryKeysSql().contains(pkCol)) {
-          throw new AnalysisException(String.format("Parent column %s is not part of "
-              + "primary key.", pkCol));
+        if (parentTable instanceof HdfsTable) {
+          // TODO (IMPALA-9158): Modify this check to call FeFsTable.getPrimaryKeysSql()
+          // instead of HdfsTable.getPrimaryKeysSql() when we implement PK/FK feature
+          // for LocalCatalog.
+          if (!((HdfsTable) parentTable).getPrimaryKeysSql().contains(pkCol)) {
+            throw new AnalysisException(String.format("Parent column %s is not part of "
+                + "primary key.", pkCol));
+          }
         }
       }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeTable.java b/fe/src/main/java/org/apache/impala/catalog/FeTable.java
index 8ac4186..8a69d53 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeTable.java
@@ -20,8 +20,11 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.impala.analysis.TableName;
+import org.apache.impala.common.NotImplementedException;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableStats;
@@ -91,6 +94,16 @@ public interface FeTable {
   List<String> getColumnNames();
 
   /**
+   * @return the list of primary keys for this table.
+   */
+  List<SQLPrimaryKey> getPrimaryKeys();
+
+  /**
+   * @return the list of foreign keys for this table.
+   */
+  List<SQLForeignKey> getForeignKeys();
+
+  /**
    * @return an unmodifiable list of all partition columns.
    */
   List<Column> getClusteringColumns();
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 2b8e0d8..96856a8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -1621,10 +1621,14 @@ public class HdfsTable extends Table implements FeFsTable {
   public ListMap<TNetworkAddress> getHostIndex() { return hostIndex_; }
 
   @Override
-  public List<SQLPrimaryKey> getPrimaryKeys() { return primaryKeys_; }
+  public List<SQLPrimaryKey> getPrimaryKeys() {
+    return ImmutableList.copyOf(primaryKeys_);
+  }
 
   @Override
-  public List<SQLForeignKey> getForeignKeys() { return foreignKeys_; }
+  public List<SQLForeignKey> getForeignKeys() {
+    return ImmutableList.copyOf(foreignKeys_);
+  }
 
   /**
    * Get primary keys column names, useful for toSqlUtils.
@@ -1645,7 +1649,7 @@ public class HdfsTable extends Table implements FeFsTable {
   public List<String> getForeignKeysSql() {
     List<String> foreignKeysSql = new ArrayList<>();
     // Iterate through foreign keys list. This list may contain multiple foreign keys
-    // and each foreign key may contain multiple columns. The outerloop collects
+    // and each foreign key may contain multiple columns. The outer loop collects
     // information common to a foreign key (pk table information). The inner
     // loop collects column information.
     List<SQLForeignKey> foreignKeys = getForeignKeys();
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index ea20241..a8587c6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.impala.analysis.TableName;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.common.ImpalaRuntimeException;
@@ -57,6 +59,7 @@ import org.apache.impala.util.HdfsCachingUtil;
 import org.apache.log4j.Logger;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 /**
@@ -109,6 +112,12 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
   // map from lowercase column name to Column object.
   private final Map<String, Column> colsByName_ = new HashMap<>();
 
+  // List of primary keys associated with the table.
+  protected final List<SQLPrimaryKey> primaryKeys_ = new ArrayList<>();
+
+  // List of foreign keys associated with the table.
+  protected final List<SQLForeignKey> foreignKeys_ = new ArrayList<>();
+
   // Type of this table (array of struct) that mirrors the columns. Useful for analysis.
   protected final ArrayType type_ = new ArrayType(new StructType());
 
@@ -600,6 +609,18 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
   public List<Column> getColumns() { return colsByPos_; }
 
   @Override // FeTable
+  public List<SQLPrimaryKey> getPrimaryKeys() {
+    // Prevent clients from modifying the primary keys list.
+    return ImmutableList.copyOf(primaryKeys_);
+  }
+
+  @Override // FeTable
+  public List<SQLForeignKey> getForeignKeys() {
+    // Prevent clients from modifying the foreign keys list.
+    return ImmutableList.copyOf(foreignKeys_);
+  }
+
+  @Override // FeTable
   public List<String> getColumnNames() { return Column.toColumnNames(colsByPos_); }
 
   /**
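
The ImmutableList.copyOf() pattern in getPrimaryKeys()/getForeignKeys() above
hands callers a snapshot they cannot mutate, so accidental writes fail fast
instead of silently altering catalog state. A small standalone illustration
of the Guava semantics being relied on (hypothetical column names, for
demonstration only):

import java.util.ArrayList;
import java.util.List;

import com.google.common.collect.ImmutableList;

public class DefensiveCopyDemo {
  public static void main(String[] args) {
    List<String> internal = new ArrayList<>();
    internal.add("pk_col1");

    // copyOf takes a snapshot: later changes to 'internal' are not visible
    // through 'view'.
    List<String> view = ImmutableList.copyOf(internal);
    internal.add("pk_col2");
    System.out.println(view.size());  // prints 1

    try {
      view.add("pk_col3");  // mutation attempts are rejected
    } catch (UnsupportedOperationException e) {
      System.out.println("immutable, as expected");
    }
  }
}

This is consistent with the FrontendFixture change further down, which drops
the hdfsTable.getPrimaryKeys().addAll(...) call: the returned list is no
longer the table's backing list, so appending to it would throw rather than
register the keys.
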
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index a59ab00..f49e8af 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -523,14 +523,4 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
   public ListMap<TNetworkAddress> getHostIndex() {
     return hostIndex_;
   }
-
-  @Override
-  public List<SQLPrimaryKey> getPrimaryKeys() {
-    return null;
-  }
-
-  @Override
-  public List<SQLForeignKey> getForeignKeys() {
-    return null;
-  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
index cecd46c..68f406e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
@@ -18,12 +18,15 @@
 package org.apache.impala.catalog.local;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import javax.annotation.Nullable;
 
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.impala.analysis.TableName;
 import org.apache.impala.catalog.ArrayType;
@@ -208,6 +211,18 @@ abstract class LocalTable implements FeTable {
   }
 
   @Override
+  public List<SQLPrimaryKey> getPrimaryKeys() {
+    // TODO: return primary keys after IMPALA-9158
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<SQLForeignKey> getForeignKeys() {
+    // TODO: return foreign keys after IMPALA-9158
+    return Collections.emptyList();
+  }
+
+  @Override
   public List<Column> getColumnsInHiveOrder() {
     List<Column> columns = Lists.newArrayList(getNonClusteringColumns());
     columns.addAll(getClusteringColumns());
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index f59c4cc..2bf1e98 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -39,6 +40,8 @@ import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.api.LockComponent;
 import org.apache.hadoop.hive.metastore.api.LockLevel;
 import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.impala.analysis.AlterDbStmt;
 import org.apache.impala.analysis.AnalysisContext;
 import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
@@ -827,6 +830,89 @@ public class Frontend {
   }
 
   /**
+   * Returns a list of primary keys for a given table only if the user has access to all
+   * the columns that form the primary key. This is because all SQLPrimaryKeys for a
+   * given table together form the primary key.
+   */
+  public List<SQLPrimaryKey> getPrimaryKeys(FeTable table, User user)
+      throws InternalException {
+    Preconditions.checkNotNull(table);
+    List<SQLPrimaryKey> pkList = table.getPrimaryKeys();
+    for (SQLPrimaryKey pk : pkList) {
+      if (authzFactory_.getAuthorizationConfig().isEnabled()) {
+        PrivilegeRequest privilegeRequest = new PrivilegeRequestBuilder(
+            authzFactory_.getAuthorizableFactory())
+            .any().onColumn(table.getTableName().getDb(), table.getTableName().getTbl(),
+            pk.getColumn_name(), table.getOwnerUser()).build();
+        // If any of the pk columns is not accessible to the user, we return an empty
+        // list.
+        if (!authzChecker_.get().hasAccess(user, privilegeRequest)) {
+          return new ArrayList<>();
+        }
+      }
+    }
+    return pkList;
+  }
+
+  /**
+   * Returns a list of foreign keys for a given table only if both the primary key
+   * column and the foreign key columns are accessible to user.
+   */
+  public List<SQLForeignKey> getForeignKeys(FeTable table, User user)
+      throws InternalException {
+    Preconditions.checkNotNull(table);
+    // Consider an example:
+    // A child table has the following foreign keys.
+    // 1) A composite foreign key (col1, col2) referencing parent_table_1 columns (a, b).
+    // 2) A foreign key (col3) referencing a different parent_table_2 column (c).
+    // In the above scenario, three "SQLForeignKey" structures are stored in HMS: two
+    // SQLForeignKey structures for 1) above, which share the same FkName and have
+    // key_seq 1 and 2 respectively, and one for 2) above. In other words, within a
+    // foreign key definition, we will have one "SQLForeignKey" structure for each
+    // column in the definition. They share fkName but have different key_seq numbers.
+    // For the purpose of authorization, we do not want to show only a part of a
+    // sequence of keys. So if any of the keys in a sequence lacks the required
+    // privileges, we omit the entire sequence. For instance, in a request for all the
+    // foreign keys on child_table above, if we discover that the user does not have
+    // privilege on col1 in the child_table, we omit the "SQLForeignKey" structures
+    // associated with both col1 and col2, but we return the "SQLForeignKey" for col3.
+    Set<String> omitList = new HashSet<>();
+    List<SQLForeignKey> fkList = new ArrayList<>();
+    for (SQLForeignKey fk : table.getForeignKeys()) {
+      String fkName = fk.getFk_name();
+      if (!omitList.contains(fkName)) {
+        if (authzFactory_.getAuthorizationConfig().isEnabled()) {
+          PrivilegeRequest fkPrivilegeRequest = new PrivilegeRequestBuilder(
+              authzFactory_.getAuthorizableFactory())
+              .any()
+              .onColumn(table.getTableName().getDb(), table.getTableName().getTbl(),
+              fk.getFkcolumn_name(), table.getOwnerUser()).build();
+
+          // Build privilege request for PK table.
+          FeTable pkTable =
+              getCatalog().getTableNoThrow(fk.getPktable_db(), fk.getPktable_name());
+          PrivilegeRequest pkPrivilegeRequest = new PrivilegeRequestBuilder(
+              authzFactory_.getAuthorizableFactory())
+              .any().onColumn(pkTable.getTableName().getDb(),
+              pkTable.getTableName().getTbl(), fk.getPkcolumn_name(),
+              pkTable.getOwnerUser()).build();
+          if (!authzChecker_.get().hasAccess(user, fkPrivilegeRequest) ||
+              !authzChecker_.get().hasAccess(user, pkPrivilegeRequest)) {
+            omitList.add(fkName);
+          }
+        }
+      }
+    }
+    for (SQLForeignKey fk : table.getForeignKeys()) {
+      if (!omitList.contains(fk.getFk_name())) {
+        fkList.add(fk);
+      }
+    }
+    return fkList;
+  }
+
+  /**
    * Returns all databases in catalog cache that match the pattern of 'matcher' and are
    * accessible to 'user'.
    */
@@ -1572,6 +1658,10 @@ public class Frontend {
       case GET_CATALOGS: return MetadataOp.getCatalogs();
       case GET_TABLE_TYPES: return MetadataOp.getTableTypes();
       case GET_FUNCTIONS: return MetastoreShim.execGetFunctions(this, request, user);
+      case GET_PRIMARY_KEYS: return MetadataOp.getPrimaryKeys(this, request,
+          user);
+      case GET_CROSS_REFERENCE: return MetadataOp.getCrossReference(this,
+          request, user);
       default:
         throw new NotImplementedException(request.opcode + " has not been implemented.");
     }
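
The all-or-nothing rule documented in getForeignKeys() above (drop every
SQLForeignKey in a sequence if any column of that sequence fails the access
check) reduces to a two-pass filter keyed on fkName. A simplified standalone
sketch, with a plain predicate standing in for the PrivilegeRequest checks
the real code performs:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

final class FkSequenceFilterSketch {
  // Stand-in for one SQLForeignKey entry: one column of one FK definition.
  record FkEntry(String fkName, String fkColumn) {}

  // Pass 1 marks every fkName that has an inaccessible column; pass 2 keeps
  // only the entries whose entire sequence survived.
  static List<FkEntry> filter(List<FkEntry> fks, Predicate<String> accessible) {
    Set<String> omit = new HashSet<>();
    for (FkEntry fk : fks) {
      if (!accessible.test(fk.fkColumn())) omit.add(fk.fkName());
    }
    List<FkEntry> result = new ArrayList<>();
    for (FkEntry fk : fks) {
      if (!omit.contains(fk.fkName())) result.add(fk);
    }
    return result;
  }

  public static void main(String[] args) {
    List<FkEntry> fks = List.of(
        new FkEntry("fk1", "col1"), new FkEntry("fk1", "col2"),  // composite FK
        new FkEntry("fk2", "col3"));
    // The user cannot read col1, so both fk1 entries drop out; fk2 survives.
    System.out.println(filter(fks, col -> !col.equals("col1")));
  }
}
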
diff --git a/fe/src/main/java/org/apache/impala/service/MetadataOp.java b/fe/src/main/java/org/apache/impala/service/MetadataOp.java
index b82d1b5..4123072 100644
--- a/fe/src/main/java/org/apache/impala/service/MetadataOp.java
+++ b/fe/src/main/java/org/apache/impala/service/MetadataOp.java
@@ -18,11 +18,15 @@
 package org.apache.impala.service;
 
 import java.sql.DatabaseMetaData;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+import org.apache.hive.service.rpc.thrift.TGetCrossReferenceReq;
+import org.apache.hive.service.rpc.thrift.TGetPrimaryKeysReq;
 import org.apache.impala.analysis.StmtMetadataLoader;
 import org.apache.impala.analysis.TableName;
 import org.apache.impala.authorization.User;
@@ -44,6 +48,7 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TColumnValue;
+import org.apache.impala.thrift.TMetadataOpRequest;
 import org.apache.impala.thrift.TResultRow;
 import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.thrift.TResultSetMetadata;
@@ -77,6 +82,9 @@ public class MetadataOp {
   private static final TResultSetMetadata GET_TYPEINFO_MD = new TResultSetMetadata();
   private static final TResultSetMetadata GET_TABLE_TYPES_MD = new TResultSetMetadata();
   private static final TResultSetMetadata GET_FUNCTIONS_MD = new TResultSetMetadata();
+  private static final TResultSetMetadata GET_PRIMARY_KEYS_MD = new TResultSetMetadata();
+  private static final TResultSetMetadata GET_CROSS_REFERENCE_MD =
+      new TResultSetMetadata();
 
   // GetTypeInfo contains all primitive types supported by Impala.
   private static final List<TResultRow> GET_TYPEINFO_RESULTS = Lists.newArrayList();
@@ -212,11 +220,54 @@ public class MetadataOp {
         new TColumn("FUNCTION_TYPE", Type.INT.toThrift()));
     GET_FUNCTIONS_MD.addToColumns(
         new TColumn("SPECIFIC_NAME", Type.STRING.toThrift()));
+
+    GET_PRIMARY_KEYS_MD.addToColumns(
+        new TColumn("TABLE_CAT", Type.STRING.toThrift()));
+    GET_PRIMARY_KEYS_MD.addToColumns(
+        new TColumn("TABLE_SCHEM", Type.STRING.toThrift()));
+    GET_PRIMARY_KEYS_MD.addToColumns(
+        new TColumn("TABLE_NAME", Type.STRING.toThrift()));
+    GET_PRIMARY_KEYS_MD.addToColumns(
+        new TColumn("COLUMN_NAME", Type.STRING.toThrift()));
+    GET_PRIMARY_KEYS_MD.addToColumns(
+        new TColumn("KEQ_SEQ", Type.INT.toThrift()));
+    GET_PRIMARY_KEYS_MD.addToColumns(
+        new TColumn("PK_NAME", Type.STRING.toThrift()));
+
+    GET_CROSS_REFERENCE_MD.addToColumns(
+        new TColumn("PKTABLE_CAT", Type.STRING.toThrift()));
+    GET_CROSS_REFERENCE_MD.addToColumns(
+        new TColumn("PKTABLE_SCHEM", Type.STRING.toThrift()));
+    GET_CROSS_REFERENCE_MD.addToColumns(
+        new TColumn("PKTABLE_NAME", Type.STRING.toThrift()));
+    GET_CROSS_REFERENCE_MD.addToColumns(
+        new TColumn("PKCOLUMN_NAME", Type.STRING.toThrift()));
+    GET_CROSS_REFERENCE_MD.addToColumns(
+        new TColumn("FKTABLE_CAT", Type.STRING.toThrift()));
+    GET_CROSS_REFERENCE_MD.addToColumns(
+        new TColumn("FKTABLE_SCHEM", Type.STRING.toThrift()));
+    GET_CROSS_REFERENCE_MD.addToColumns(
+        new TColumn("FKTABLE_NAME", Type.STRING.toThrift()));
+    GET_CROSS_REFERENCE_MD.addToColumns(
+        new TColumn("FKCOLUMN_NAME", Type.STRING.toThrift()));
+    GET_CROSS_REFERENCE_MD.addToColumns(
+        new TColumn("KEQ_SEQ", Type.INT.toThrift()));
+    GET_CROSS_REFERENCE_MD.addToColumns(
+        new TColumn("UPDATE_RULE", Type.INT.toThrift()));
+    GET_CROSS_REFERENCE_MD.addToColumns(
+        new TColumn("DELETE_RULE", Type.INT.toThrift()));
+    GET_CROSS_REFERENCE_MD.addToColumns(
+        new TColumn("FK_NAME", Type.STRING.toThrift()));
+    GET_CROSS_REFERENCE_MD.addToColumns(
+        new TColumn("PK_NAME", Type.STRING.toThrift()));
+    GET_CROSS_REFERENCE_MD.addToColumns(
+        new TColumn("DEFERRABILITY", Type.INT.toThrift()));
   }
 
   /**
    * Contains lists of databases, lists of table belonging to the dbs, list of columns
-   * belonging to the tables, and list of user functions.
+   * belonging to the tables, primary keys and foreign keys belonging to the tables,
+   * and list of user functions.
    */
   private static class DbsMetadata {
      // the list of database
@@ -238,6 +289,12 @@ public class MetadataOp {
     // functions[i] are the functions within dbs[i]
     public List<List<Function>> functions = Lists.newArrayList();
 
+    // primaryKeys[i][j] are primary keys of tableNames[j] in dbs[i]
+    public List<List<List<SQLPrimaryKey>>> primaryKeys = Lists.newArrayList();
+
+    // foreignKeys[i][j] are foreign keys of tableNames[j] in dbs[i]
+    public List<List<List<SQLForeignKey>>> foreignKeys = Lists.newArrayList();
+
     // Set of tables that are missing (not yet loaded).
     public Set<TableName> missingTbls = new HashSet<TableName>();
   }
@@ -302,6 +359,8 @@ public class MetadataOp {
         List<List<Column>> tablesColumnsList = Lists.newArrayList();
         List<String> tableComments = Lists.newArrayList();
         List<String> tableTypes = Lists.newArrayList();
+        List<List<SQLPrimaryKey>> primaryKeysList = Lists.newArrayList();
+        List<List<SQLForeignKey>> foreignKeysList = Lists.newArrayList();
         for (String tabName: fe.getTableNames(db.getName(), tablePatternMatcher, user)) {
           FeTable table;
           if (columnPatternMatcher == PatternMatcher.MATCHER_MATCH_NONE) {
@@ -317,6 +376,8 @@ public class MetadataOp {
 
           String comment = null;
           List<Column> columns = Lists.newArrayList();
+          List<SQLPrimaryKey> primaryKeys = Lists.newArrayList();
+          List<SQLForeignKey> foreignKeys = Lists.newArrayList();
           // If the table is not yet loaded, the columns will be unknown. Add it
           // to the set of missing tables.
           String tableType = TABLE_TYPE_TABLE;
@@ -331,9 +392,17 @@ public class MetadataOp {
                   .getOrDefault(tableTypeStr, TABLE_TYPE_TABLE);
             }
             columns.addAll(fe.getColumns(table, columnPatternMatcher, user));
+            if (columnPatternMatcher != PatternMatcher.MATCHER_MATCH_NONE) {
+              // It is unnecessary to populate pk/fk information if the request does not
+              // want to match any columns.
+              primaryKeys.addAll(fe.getPrimaryKeys(table, user));
+              foreignKeys.addAll(fe.getForeignKeys(table, user));
+            }
           }
           tableList.add(tabName);
           tablesColumnsList.add(columns);
+          primaryKeysList.add(primaryKeys);
+          foreignKeysList.add(foreignKeys);
           tableComments.add(Strings.nullToEmpty(comment));
           tableTypes.add(tableType);
         }
@@ -341,6 +410,8 @@ public class MetadataOp {
         result.tableNames.add(tableList);
         result.comments.add(tableComments);
         result.columns.add(tablesColumnsList);
+        result.primaryKeys.add(primaryKeysList);
+        result.foreignKeys.add(foreignKeysList);
         result.tableTypes.add(tableTypes);
       }
     }
@@ -540,6 +611,159 @@ public class MetadataOp {
   }
 
   /**
+   * Executes the GetPrimaryKeys HiveServer2 operation and returns TResultSet.
+   * This queries the Impala catalog to get the primary keys of a given table.
+   * Similar to getColumns, matching primary key columns requires loading the
+   * table metadata, so if any missing tables are found, an RPC to the CatalogServer
+   * will be executed to request loading these tables. The matching process will be
+   * restarted once the required tables have been loaded in the local Impalad Catalog or
+   * the wait timeout has been reached.
+   */
+  public static TResultSet getPrimaryKeys(Frontend fe, TMetadataOpRequest request,
+      User user) throws ImpalaException {
+    TGetPrimaryKeysReq req = request.getGet_primary_keys_req();
+    String catalogName = req.getCatalogName();
+    String schemaName = req.getSchemaName();
+    String tableName = req.getTableName();
+    TResultSet result = createEmptyResultSet(GET_PRIMARY_KEYS_MD);
+    // Get the list of schemas, tables that satisfy the search conditions.
+    PatternMatcher schemaMatcher = PatternMatcher.createJdbcPatternMatcher(schemaName);
+    PatternMatcher tableMatcher = PatternMatcher.createJdbcPatternMatcher(tableName);
+    DbsMetadata dbsMetadata = getDbsMetadata(fe, catalogName, schemaMatcher,
+        tableMatcher, PatternMatcher.MATCHER_MATCH_ALL,
+        PatternMatcher.MATCHER_MATCH_NONE, user);
+
+    if (!dbsMetadata.missingTbls.isEmpty()) {
+      // Need to load tables for column metadata.
+      StmtMetadataLoader mdLoader = new StmtMetadataLoader(fe, Catalog.DEFAULT_DB, null);
+      mdLoader.loadTables(dbsMetadata.missingTbls);
+      dbsMetadata = getDbsMetadata(fe, catalogName, schemaMatcher,
+          tableMatcher, PatternMatcher.MATCHER_MATCH_ALL,
+          PatternMatcher.MATCHER_MATCH_NONE, user);
+    }
+
+    for (int i = 0; i < dbsMetadata.dbs.size(); ++i) {
+      for (int j = 0; j < dbsMetadata.tableNames.get(i).size(); ++j) {
+        for (SQLPrimaryKey pk : dbsMetadata.primaryKeys.get(i).get(j)) {
+          // A fresh TResultRow per key; reusing one row object across
+          // iterations would leave every entry pointing at the last colVals.
+          TResultRow row = new TResultRow();
+          row.colVals = Lists.newArrayList();
+          row.colVals.add(EMPTY_COL_VAL);
+          row.colVals.add(createTColumnValue(pk.getTable_db()));
+          row.colVals.add(createTColumnValue(pk.getTable_name()));
+          row.colVals.add(createTColumnValue(pk.getColumn_name()));
+          row.colVals.add(createTColumnValue(pk.getKey_seq()));
+          row.colVals.add(createTColumnValue(pk.getPk_name()));
+          result.rows.add(row);
+        }
+      }
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Returning {} primary keys for table {}.", result.rows.size(), tableName);
+    }
+    return result;
+  }
+
+  /**
+   * Executes the GetCrossReference HiveServer2 operation and returns TResultSet.
+   * This queries the Impala catalog to get the foreign keys of a given table.
+   * Similar to getColumns, matching foreign key columns requires loading the
+   * table metadata, so if any missing tables are found, an RPC to the CatalogServer
+   * will be executed to request loading these tables. The matching process will be
+   * restarted once the required tables have been loaded in the local Impalad Catalog or
+   * the wait timeout has been reached. If parent schema and parent table are specified
+   * in the request, we return only the foreign keys related to the parent table. If
+   * not, we return all foreign keys associated with the foreign table.
+   */
+  public static TResultSet getCrossReference(Frontend fe, TMetadataOpRequest request,
+      User user) throws ImpalaException {
+    TGetCrossReferenceReq req = request.getGet_cross_reference_req();
+    String foreignCatalogName = req.getForeignCatalogName();
+    String foreignSchemaName = req.getForeignSchemaName();
+    String foreignTableName = req.getForeignTableName();
+    String parentSchemaName = req.getParentSchemaName();
+    String parentTableName = req.getParentTableName();
+    TResultSet result = createEmptyResultSet(GET_CROSS_REFERENCE_MD);
+    // Get the list of schemas, tables that satisfy the search conditions.
+    PatternMatcher schemaMatcher =
+        PatternMatcher.createJdbcPatternMatcher(foreignSchemaName);
+    PatternMatcher tableMatcher =
+        PatternMatcher.createJdbcPatternMatcher(foreignTableName);
+    DbsMetadata dbsMetadata = getDbsMetadata(fe, foreignCatalogName, schemaMatcher,
+        tableMatcher, PatternMatcher.MATCHER_MATCH_ALL,
+        PatternMatcher.MATCHER_MATCH_NONE, user);
+
+    if (!dbsMetadata.missingTbls.isEmpty()) {
+      // Need to load tables for column metadata.
+      StmtMetadataLoader mdLoader = new StmtMetadataLoader(fe, Catalog.DEFAULT_DB, null);
+      mdLoader.loadTables(dbsMetadata.missingTbls);
+      dbsMetadata = getDbsMetadata(fe, foreignCatalogName, schemaMatcher,
+          tableMatcher, PatternMatcher.MATCHER_MATCH_ALL,
+          PatternMatcher.MATCHER_MATCH_NONE, user);
+    }
+
+    for (int i = 0; i < dbsMetadata.dbs.size(); ++i) {
+      for (int j = 0; j < dbsMetadata.tableNames.get(i).size(); ++j) {
+        // HMS API allows querying FK information for specific pk/fk columns. In
+        // Impala, we store all foreign keys associated with a table together in the
+        // Table object. For this metadata request, we filter out the foreign keys
+        // that do not match the given parent and foreign schema information.
+        List<SQLForeignKey> filteredForeignKeys =
+            filterForeignKeys(dbsMetadata.foreignKeys.get(i).get(j), parentSchemaName,
+                parentTableName);
+
+        for (SQLForeignKey fk : filteredForeignKeys) {
+          // A fresh TResultRow per key, for the same reason as getPrimaryKeys().
+          TResultRow row = new TResultRow();
+          row.colVals = Lists.newArrayList();
+          row.colVals.add(EMPTY_COL_VAL); // PKTABLE_CAT
+          row.colVals.add(createTColumnValue(fk.getPktable_db()));
+          row.colVals.add(createTColumnValue(fk.getPktable_name()));
+          row.colVals.add(createTColumnValue(fk.getPkcolumn_name()));
+          row.colVals.add(EMPTY_COL_VAL); // FKTABLE_CAT
+          row.colVals.add(createTColumnValue(fk.getFktable_db()));
+          row.colVals.add(createTColumnValue(fk.getFktable_name()));
+          row.colVals.add(createTColumnValue(fk.getFkcolumn_name()));
+          row.colVals.add(createTColumnValue(fk.getKey_seq()));
+          row.colVals.add(createTColumnValue(fk.getUpdate_rule()));
+          row.colVals.add(createTColumnValue(fk.getDelete_rule()));
+          row.colVals.add(createTColumnValue(fk.getFk_name()));
+          row.colVals.add(createTColumnValue(fk.getPk_name()));
+          // DEFERRABILITY is currently not supported.
+          row.colVals.add(EMPTY_COL_VAL); // DEFERRABILITY
+          result.rows.add(row);
+        }
+      }
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Returning {} foreign keys for table {}.", result.rows.size(),
+          foreignTableName);
+    }
+    return result;
+  }
+
+  /**
+   * Helper to filter foreign keys based on given parent table. If both parent schema
+   * name and parent table name are specified, we return only the foreign keys that are
+   * associated with the parent table. Otherwise, we return all the foreign keys for
+   * this table.
+   */
+  private static List<SQLForeignKey> filterForeignKeys(List<SQLForeignKey> dbForeignKeys,
+      String pkSchemaName, String pkTableName) {
+    // If pkSchema or pkTable is not specified (for example, when we want to retrieve
+    // all the foreign keys from the foreign key side), we return all the foreign keys.
+    if (pkSchemaName == null || pkTableName == null) {
+      return new ArrayList<>(dbForeignKeys);
+    }
+    List<SQLForeignKey> foreignKeys = new ArrayList<>();
+    for (SQLForeignKey fk : dbForeignKeys) {
+      if (fk.getPktable_db().equals(pkSchemaName) &&
+          fk.getPktable_name().equals(pkTableName)) {
+        foreignKeys.add(fk);
+      }
+    }
+    return foreignKeys;
+  }
+
+  /**
    * Executes the GetTypeInfo HiveServer2 operation and returns Impala supported types.
    */
   public static TResultSet getTypeInfo() {
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 7901dc5..dff57a2 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -2364,34 +2364,33 @@ public class AnalyzeDDLTest extends FrontendTestBase {
 
     // Foreign key test needs a valid primary key table to pass.
     addTestDb("test_pk_fk", "Test DB for PK/FK tests");
-    addTestTable("create table test_pk_fk.pk (id int, year string, primary key (id, "
-        + "year) disable novalidate rely)");
-    addTestTable("create table test_pk_fk.non_pk_table(id int)");
     AnalysisContext ctx = createAnalysisCtx("test_pk_fk");
-    AnalysisError("create table foo(id int, year int, foreign key (id) references "
-        + "pk(id) enable novalidate rely)", ctx,"ENABLE feature is "
-        + "not supported yet.");
-    AnalysisError("create table foo(id int, year int, foreign key (id) references "
-        + "pk(id) disable validate rely)", ctx,"VALIDATE feature is "
-        + "not supported yet.");
-    AnalyzesOk("create table fk(id int, year int, primary key (id, year) disable "
-        + "novalidate rely, foreign key(id) REFERENCES pk(id) "
+    AnalysisError("create table foo(seq int, id int, year int, a int, foreign key "
+        + "(id, year) references functional.parent_table(id, year) enable novalidate "
+        + "rely)", ctx,"ENABLE feature is not supported yet.");
+    AnalysisError("create table foo(seq int, id int, year int, a int, foreign key "
+        + "(id, year) references functional.parent_table(id, year) disable validate "
+        + "rely)", ctx,"VALIDATE feature is not supported yet.");
+    AnalyzesOk("create table foo(seq int, id int, year int, a int, "
+        + "foreign key(id, year) references functional.parent_table(id, year) "
         + "DISABLE NOVALIDATE RELY)", ctx);
-    AnalyzesOk("create table foo(id int, year int, foreign key (id) references "
-        + "pk(id) disable novalidate rely)", ctx);
-    AnalyzesOk("create table foo(id int, year int, foreign key (id) references "
-        + "pk(id))", ctx);
-    AnalysisError("create table fk(id int, year string, foreign key(year) references "
-        + "pk2(year))", ctx, "Parent table not found: test_pk_fk.pk2");
+    AnalyzesOk("create table foo(seq int, id int, year int, a int, "
+        + "foreign key(id, year) references functional.parent_table(id, year)"
+        + " disable novalidate rely)", ctx);
+    AnalyzesOk("create table foo(seq int, id int, year int, a int, "
+            + "foreign key(id, year) references functional.parent_table(id, year))", ctx);
+    AnalysisError("create table fk(id int, year string, foreign key(year) "
+        + "references pk2(year))", ctx, "Parent table not found: test_pk_fk.pk2");
     AnalyzesOk("create table fk(id int, year string, foreign key(id, year) references"
-        + " pk(id, year))", ctx);
+        + " functional.parent_table(id, year))", ctx);
     AnalysisError("create table fk(id int, year string, foreign key(id, year) "
         + "references pk(year))", ctx, "The number of foreign key columns should be same"
         + " as the number of parent key columns.");
-    AnalysisError("create table fk(id int, foreign key(id) references pk(foo))", ctx,
-        "Parent column not found: foo");
     AnalysisError("create table fk(id int, foreign key(id) references "
-        + "non_pk_table(id))", ctx, "Parent column id is not part of primary key.");
+            + "functional.parent_table(foo))", ctx, "Parent column not found: foo");
+    AnalysisError("create table fk(id int, foreign key(id) references "
+        + "functional.alltypes(int_col))", ctx, "Parent column int_col is not part of "
+        + "primary key.");
 
     {
       // Check that long_properties fail at the analysis layer
diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
index 94e3413..165ac75 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
@@ -375,15 +375,14 @@ public class ToSqlTest extends FrontendTestBase {
 
     // Foreign Key test requires a valid primary key table.
     addTestDb("test_pk_fk", "Test DB for PK/FK tests");
-    addTestTable("create table test_pk_fk.pk (id int, year string, primary key (id, "
-        + "year))");
     AnalysisContext ctx = createAnalysisCtx("test_pk_fk");
 
-    testToSql(ctx, "create table fk(id int, year string, FOREIGN KEY (id) "
-        + "REFERENCES pk(id), FOREIGN KEY (year) REFERENCES pk"
-        + "(year))", "CREATE TABLE test_pk_fk.fk ( id INT, year STRING, "
-        + "FOREIGN KEY(id) REFERENCES test_pk_fk.pk(id), FOREIGN KEY(year) REFERENCES "
-        + "test_pk_fk.pk(year) ) STORED AS TEXTFILE", true);
+    testToSql(ctx, "create table fk(seq int, id int, year string, a int, "
+        + "FOREIGN KEY(id, year) REFERENCES functional.parent_table(id, year), FOREIGN "
+        + "KEY (a) REFERENCES functional.parent_table_2(a))", "CREATE TABLE "
+        + "test_pk_fk.fk ( seq INT, id INT, year STRING, a INT, FOREIGN KEY(id, year) "
+        + "REFERENCES functional.parent_table(id, year), FOREIGN KEY(a) REFERENCES "
+        + "functional.parent_table_2(a) ) STORED AS TEXTFILE", true);
   }
 
   @Test
diff --git a/fe/src/test/java/org/apache/impala/authorization/AuthorizationTest.java b/fe/src/test/java/org/apache/impala/authorization/AuthorizationTest.java
index b03adb4..b046691 100644
--- a/fe/src/test/java/org/apache/impala/authorization/AuthorizationTest.java
+++ b/fe/src/test/java/org/apache/impala/authorization/AuthorizationTest.java
@@ -30,6 +30,8 @@ import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hive.service.rpc.thrift.TGetColumnsReq;
+import org.apache.hive.service.rpc.thrift.TGetCrossReferenceReq;
+import org.apache.hive.service.rpc.thrift.TGetPrimaryKeysReq;
 import org.apache.hive.service.rpc.thrift.TGetSchemasReq;
 import org.apache.hive.service.rpc.thrift.TGetTablesReq;
 import org.apache.impala.analysis.AnalysisContext;
@@ -74,7 +76,8 @@ public class AuthorizationTest extends FrontendTestBase {
   // column-level SELECT or INSERT permission. I.e. that should be returned by
   // 'SHOW TABLES'.
   private static final List<String> FUNCTIONAL_VISIBLE_TABLES = Lists.newArrayList(
-      "alltypes", "alltypesagg", "alltypeserror", "alltypessmall", "alltypestiny");
+      "alltypes", "alltypesagg", "alltypeserror", "alltypessmall", "alltypestiny",
+      "child_table", "parent_table", "parent_table_2");
 
   private static final SentryAuthorizationConfig AUTHZ_CONFIG =
       SentryAuthorizationConfig.createHadoopGroupAuthConfig("server1",
@@ -168,6 +171,25 @@ public class AuthorizationTest extends FrontendTestBase {
     privilege.setDb_name("tpch");
     addRolePrivilege(catalog, privilege, role);
 
+    // Select on parent_table_2
+    privilege = new TPrivilege(TPrivilegeLevel.SELECT, TPrivilegeScope.TABLE, false);
+    privilege.setDb_name("functional");
+    privilege.setTable_name("parent_table_2");
+    addRolePrivilege(catalog, privilege, role);
+
+    // Add privilege on "year" for parent table.
+    privilege = new TPrivilege(TPrivilegeLevel.SELECT, TPrivilegeScope.COLUMN, false);
+    privilege.setDb_name("functional");
+    privilege.setTable_name("parent_table");
+    privilege.setColumn_name("year");
+    addRolePrivilege(catalog, privilege, role);
+
+    // Add SELECT privilege on child_table
+    privilege = new TPrivilege(TPrivilegeLevel.SELECT, TPrivilegeScope.TABLE, false);
+    privilege.setDb_name("functional");
+    privilege.setTable_name("child_table");
+    addRolePrivilege(catalog, privilege, role);
+
     for (String group: groups) {
       catalog.addRoleGrantGroup(role_, group);
     }
@@ -322,6 +344,47 @@ public class AuthorizationTest extends FrontendTestBase {
   }
 
   @Test
+  public void TestHs2GetPrimaryKeys() throws ImpalaException {
+    // Only one of the pk columns has privileges. This should not return anything.
+    TMetadataOpRequest req = new TMetadataOpRequest();
+    req.setSession(createSessionState("default", USER));
+    req.opcode = TMetadataOpcode.GET_PRIMARY_KEYS;
+    req.get_primary_keys_req = new TGetPrimaryKeysReq();
+    req.get_primary_keys_req.setSchemaName("functional");
+    req.get_primary_keys_req.setTableName("parent_table");
+
+    TResultSet resp = AUTHZ_FE.execHiveServer2MetadataOp(req);
+    assertEquals(0, resp.rows.size());
+
+    // PK column has privileges, should return it.
+    req = new TMetadataOpRequest();
+    req.setSession(createSessionState("default", USER));
+    req.opcode = TMetadataOpcode.GET_PRIMARY_KEYS;
+    req.get_primary_keys_req = new TGetPrimaryKeysReq();
+    req.get_primary_keys_req.setSchemaName("functional");
+    req.get_primary_keys_req.setTableName("parent_table_2");
+
+    resp = AUTHZ_FE.execHiveServer2MetadataOp(req);
+    assertEquals(1, resp.rows.size());
+  }
+
+  @Test
+  public void TestHs2GetCrossReference() throws ImpalaException {
+    // On parent_table, only one of the pk columns has privileges for USER, hence
+    // the entire SQLForeignKey sequence referencing it will be filtered out.
+    TMetadataOpRequest req = new TMetadataOpRequest();
+    req.setSession(createSessionState("default", USER));
+    req.opcode = TMetadataOpcode.GET_CROSS_REFERENCE;
+    req.get_cross_reference_req = new TGetCrossReferenceReq();
+    req.get_cross_reference_req.setForeignSchemaName("functional");
+    req.get_cross_reference_req.setForeignTableName("child_table");
+
+    TResultSet resp = AUTHZ_FE.execHiveServer2MetadataOp(req);
+    // Returns just 1 SQLForeignKey instead of 3.
+    assertEquals(1, resp.rows.size());
+  }
+
+  @Test
   public void TestHs2GetSchema() throws ImpalaException {
     TMetadataOpRequest req = new TMetadataOpRequest();
     req.setSession(createSessionState("default", USER));
diff --git a/fe/src/test/java/org/apache/impala/common/FrontendFixture.java b/fe/src/test/java/org/apache/impala/common/FrontendFixture.java
index 1b5f441..606ea39 100644
--- a/fe/src/test/java/org/apache/impala/common/FrontendFixture.java
+++ b/fe/src/test/java/org/apache/impala/common/FrontendFixture.java
@@ -203,7 +203,6 @@ public class FrontendFixture {
       try {
         HdfsTable hdfsTable = (HdfsTable) dummyTable;
         hdfsTable.initializePartitionMetadata(msTbl);
-        hdfsTable.getPrimaryKeys().addAll(createTableStmt.getPrimaryKeys());
       } catch (CatalogException e) {
         e.printStackTrace();
         fail("Failed to add test table:\n" + createTableSql);
diff --git a/fe/src/test/java/org/apache/impala/service/JdbcTest.java b/fe/src/test/java/org/apache/impala/service/JdbcTest.java
index e11feca..66b06ea 100644
--- a/fe/src/test/java/org/apache/impala/service/JdbcTest.java
+++ b/fe/src/test/java/org/apache/impala/service/JdbcTest.java
@@ -423,6 +423,55 @@ public class JdbcTest extends JdbcTestBase {
   }
 
   @Test
+  public void testMetadataGetPrimaryKeys() throws Exception {
+    List<String> pkList = new ArrayList<>(Arrays.asList("id", "year"));
+    ResultSet rs = con_.getMetaData().getPrimaryKeys(null, "functional", "parent_table");
+    ResultSetMetaData md = rs.getMetaData();
+    assertEquals("Incorrect number of columns seen", 6, md.getColumnCount());
+    // TODO (IMPALA-9158): Remove this check.
+    if (!TestUtils.isCatalogV2Enabled("localhost", 25020)) {
+      int pkCount = 0;
+      while (rs.next()) {
+        pkCount++;
+        assertEquals("", rs.getString("TABLE_CAT"));
+        assertEquals("functional", rs.getString("TABLE_SCHEM"));
+        assertEquals("parent_table", rs.getString("TABLE_NAME"));
+        assertTrue(pkList.contains(rs.getString("COLUMN_NAME")));
+        assertTrue(rs.getString("PK_NAME").length() > 0);
+      }
+      assertEquals(2, pkCount);
+    }
+  }
+
+  @Test
+  public void testMetadataGetCrossReference() throws Exception {
+    ResultSet rs = con_.getMetaData().getCrossReference(null, "functional",
+        "parent_table", null,
+        "functional", "child_table");
+    ResultSetMetaData md = rs.getMetaData();
+    assertEquals("Incorrect number of columns seen for primary key.",
+        14, md.getColumnCount());
+    // TODO (IMPALA-9158): Remove this check.
+    if (!TestUtils.isCatalogV2Enabled("localhost", 25020)) {
+      List<String> colList = new ArrayList<>(Arrays.asList("id", "year"));
+      int fkCount = 0;
+      while (rs.next()) {
+        fkCount++;
+        assertEquals("", rs.getString("PKTABLE_CAT"));
+        assertEquals("functional", rs.getString("PKTABLE_SCHEM"));
+        assertEquals("parent_table", rs.getString("PKTABLE_NAME"));
+        assertTrue(colList.contains(rs.getString("PKCOLUMN_NAME")));
+        assertTrue(rs.getString("FK_NAME").length() > 0);
+        assertEquals("", rs.getString("FKTABLE_CAT"));
+        assertEquals("functional", rs.getString("FKTABLE_SCHEM"));
+        assertEquals("child_table", rs.getString("FKTABLE_NAME"));
+        assertTrue(colList.contains(rs.getString("FKCOLUMN_NAME")));
+      }
+      assertEquals(2, fkCount);
+    }
+  }
+
+  @Test
   public void testDecimalGetColumnTypes() throws SQLException {
     // Table has 5 decimal columns
     ResultSet rs = con_.createStatement().executeQuery(
diff --git a/testdata/data/README b/testdata/data/README
index a4f55b0..bbffeb4 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -439,3 +439,15 @@ I used Hive 2.1.1 with a modified Parquet-MR, see description at decimals_1_10.p
 I used the following commands to create the file:
 hive  --hiveconf parquet.page.row.count.limit=90 --hiveconf parquet.page.size=90 --hiveconf parquet.enable.dictionary=false  --hiveconf parquet.page.size.row.check.min=7
 create table alltypes_tiny_pages_plain stored as parquet as select * from functional_parquet.alltypes
+
+parent_table:
+Created manually. Contains two columns: an INT column 'id' and a STRING column
+'year', which together form the table's primary key. Used to test primary key
+and foreign key relationships along with parent_table_2 and child_table.
+
+parent_table_2:
+Created manually. Contains a single INT column 'a', which is also the table's
+primary key. Used to test primary key and foreign key relationships along with
+parent_table and child_table.
+
+child_table:
+Created manually. Contains four columns. The 'seq' column is the table's
+primary key, ('id', 'year') form a foreign key referring to
+parent_table('id', 'year'), and 'a' is a foreign key referring to
+parent_table_2's primary key column 'a'.
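+
+All three tables declare their constraints with DISABLE NOVALIDATE RELY, i.e.
+the keys are informational only: they are neither enforced nor validated
+against the loaded data, but the planner is allowed to rely on them.
+
+For illustration only (no test depends on it), a query exercising both foreign
+keys could look like:
+  SELECT c.seq, p.year, p2.a
+  FROM child_table c
+  JOIN parent_table p ON (c.id = p.id AND c.year = p.year)
+  JOIN parent_table_2 p2 ON (c.a = p2.a);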
\ No newline at end of file
diff --git a/testdata/data/child_table.txt b/testdata/data/child_table.txt
new file mode 100644
index 0000000..374e3a2
--- /dev/null
+++ b/testdata/data/child_table.txt
@@ -0,0 +1,6 @@
+101,1,1951,1001
+102,2,1970,2001
+103,3,1965,9001
+104,4,1958,8001
+105,5,1939,5001
+106,6,2008,7001
\ No newline at end of file
diff --git a/testdata/data/parent_table.txt b/testdata/data/parent_table.txt
new file mode 100644
index 0000000..80671da
--- /dev/null
+++ b/testdata/data/parent_table.txt
@@ -0,0 +1,6 @@
+1,1951
+2,1970
+3,1965
+4,1958
+5,1939
+6,2008
\ No newline at end of file
diff --git a/testdata/data/parent_table_2.txt b/testdata/data/parent_table_2.txt
new file mode 100644
index 0000000..48757a2
--- /dev/null
+++ b/testdata/data/parent_table_2.txt
@@ -0,0 +1,6 @@
+1001
+9001
+4001
+3001
+5001
+7001
\ No newline at end of file
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index c98270e..959e143 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -2022,6 +2022,54 @@ FROM {db_name}.{table_name};
 ---- DATASET
 functional
 ---- BASE_TABLE_NAME
+parent_table
+---- CREATE
+CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (
+id INT, year STRING, primary key(id, year) DISABLE NOVALIDATE RELY)
+row format delimited fields terminated by ','
+LOCATION '/test-warehouse/{table_name}';
+---- ROW_FORMAT
+delimited fields terminated by ','
+---- LOAD
+`hadoop fs -mkdir -p /test-warehouse/parent_table && hadoop fs -put -f \
+${IMPALA_HOME}/testdata/data/parent_table.txt /test-warehouse/parent_table/
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+parent_table_2
+---- CREATE
+CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (
+a INT, primary key(a) DISABLE NOVALIDATE RELY)
+row format delimited fields terminated by ','
+LOCATION '/test-warehouse/{table_name}';
+---- ROW_FORMAT
+delimited fields terminated by ','
+---- LOAD
+`hadoop fs -mkdir -p /test-warehouse/parent_table_2 && hadoop fs -put -f \
+${IMPALA_HOME}/testdata/data/parent_table_2.txt /test-warehouse/parent_table_2/
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+child_table
+---- CREATE
+CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (
+seq INT, id INT, year STRING, a INT, primary key(seq) DISABLE NOVALIDATE RELY,
+foreign key(id, year) references {db_name}{db_suffix}.parent_table(id, year)
+DISABLE NOVALIDATE RELY,
+foreign key(a) references {db_name}{db_suffix}.parent_table_2(a) DISABLE
+NOVALIDATE RELY)
+row format delimited fields terminated by ','
+LOCATION '/test-warehouse/{table_name}';
+---- ROW_FORMAT
+delimited fields terminated by ','
+---- LOAD
+`hadoop fs -mkdir -p /test-warehouse/child_table && hadoop fs -put -f \
+${IMPALA_HOME}/testdata/data/child_table.txt /test-warehouse/child_table/
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
 chars_tiny
 ---- COLUMNS
 cs CHAR(5)
diff --git a/tests/hs2/test_hs2.py b/tests/hs2/test_hs2.py
index 14d1b19..1928f93 100644
--- a/tests/hs2/test_hs2.py
+++ b/tests/hs2/test_hs2.py
@@ -489,6 +489,140 @@ class TestHS2(HS2TestSuite):
     finally:
       self.execute_query("drop table {0}".format(table))
 
+  @needs_session_cluster_properties()
+  def test_get_primary_keys(self, cluster_properties, unique_database):
+    table = "pk"
+    self.execute_query("use {0}".format(unique_database))
+    self.execute_query("drop table if exists {0}".format(table))
+    self.execute_query("""
+        create table {0} (id int, year string, primary key(id, year))""".format(
+        table))
+    pks = ["id", "year"]
+    try:
+      req = TCLIService.TGetPrimaryKeysReq()
+      req.sessionHandle = self.session_handle
+      req.schemaName = unique_database
+      req.tableName = table
+
+      get_primary_keys_resp = self.hs2_client.GetPrimaryKeys(req)
+      TestHS2.check_response(get_primary_keys_resp)
+
+      fetch_results_resp = self._fetch_results(
+          get_primary_keys_resp.operationHandle, 100)
+
+      if not cluster_properties.is_catalog_v2_cluster():
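+        # One row is returned per PK column. Columns follow the standard JDBC
+        # getPrimaryKeys layout: TABLE_CAT, TABLE_SCHEM, TABLE_NAME,
+        # COLUMN_NAME, KEY_SEQ, PK_NAME.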
+        for i in range(len(pks)):
+          results = fetch_results_resp.results
+          table_cat = results.columns[0].stringVal.values[i]
+          table_schema = results.columns[1].stringVal.values[i]
+          table_name = results.columns[2].stringVal.values[i]
+          pk_col_name = results.columns[3].stringVal.values[i]
+          pk_name = results.columns[5].stringVal.values[i]
+          assert table_cat == ''
+          assert table_schema == unique_database
+          assert table_name == table
+          assert pk_col_name in pks
+          assert len(pk_name) > 0
+
+    finally:
+      self.execute_query("drop table {0}".format(table))
+
+  @needs_session_cluster_properties()
+  def test_get_cross_reference(self, cluster_properties, unique_database):
+    parent_table_1 = "pk_1"
+    parent_table_2 = "pk_2"
+    fk_table = "fk"
+    self.execute_query("use {0}".format(unique_database))
+    self.execute_query("drop table if exists {0}".format(parent_table_1))
+    self.execute_query("""
+        create table {0} (id int, year string, primary key(id, year))""".format(
+        parent_table_1))
+    self.execute_query("drop table if exists {0}".format(parent_table_2))
+    self.execute_query("""
+        create table {0} (a int, b string, primary key(a))""".format(
+        parent_table_2))
+    self.execute_query("drop table if exists {0}".format(fk_table))
+    self.execute_query("""
+        create table {0} (seq int, id int, year string, a int, primary key(seq),
+        foreign key(id, year) references {1}(id, year),
+        foreign key(a) references {2}(a))
+        """.format(fk_table, parent_table_1, parent_table_2))
+    pk_column_names = ["id", "year", "a"]
+    try:
+      req = TCLIService.TGetCrossReferenceReq()
+      req.sessionHandle = self.session_handle
+      req.parentSchemaName = unique_database
+      req.foreignSchemaName = unique_database
+      req.parentTableName = parent_table_1
+      req.foreignTableName = fk_table
+
+      get_foreign_keys_resp = self.hs2_client.GetCrossReference(req)
+      TestHS2.check_response(get_foreign_keys_resp)
+
+      fetch_results_resp = self._fetch_results(
+          get_foreign_keys_resp.operationHandle, 100)
+
+      results = fetch_results_resp.results
+
+      if not cluster_properties.is_catalog_v2_cluster():
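+        # Columns follow the standard JDBC getCrossReference layout: the first
+        # eight are PKTABLE_CAT, PKTABLE_SCHEM, PKTABLE_NAME, PKCOLUMN_NAME,
+        # FKTABLE_CAT, FKTABLE_SCHEM, FKTABLE_NAME, FKCOLUMN_NAME.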
+        for i in range(2):
+          parent_table_cat = results.columns[0].stringVal.values[i]
+          parent_table_schema = results.columns[1].stringVal.values[i]
+          parent_table_name = results.columns[2].stringVal.values[i]
+          parent_col_name = results.columns[3].stringVal.values[i]
+          foreign_table_cat = results.columns[4].stringVal.values[i]
+          foreign_table_schema = results.columns[5].stringVal.values[i]
+          foreign_table_name = results.columns[6].stringVal.values[i]
+          foreign_col_name = results.columns[7].stringVal.values[i]
+
+          assert parent_table_cat == ''
+          assert parent_table_schema == unique_database
+          assert parent_table_name == parent_table_1
+          assert parent_col_name in pk_column_names
+          assert foreign_table_cat == ''
+          assert foreign_table_schema == unique_database
+          assert foreign_table_name == fk_table
+          assert foreign_col_name in pk_column_names
+
+        # Get all foreign keys from the FK side by not setting parentSchemaName
+        # and parentTableName in the request.
+        req = TCLIService.TGetCrossReferenceReq()
+        req.sessionHandle = self.session_handle
+        req.foreignSchemaName = unique_database
+        req.foreignTableName = fk_table
+
+        get_foreign_keys_resp = self.hs2_client.GetCrossReference(req)
+        TestHS2.check_response(get_foreign_keys_resp)
+
+        fetch_results_resp = self._fetch_results(
+            get_foreign_keys_resp.operationHandle, 100)
+
+        results = fetch_results_resp.results
+
+        pk_table_names = [parent_table_1, parent_table_2]
+        for i in range(len(pk_column_names)):
+          parent_table_cat = results.columns[0].stringVal.values[i]
+          parent_table_schema = results.columns[1].stringVal.values[i]
+          parent_table_name = results.columns[2].stringVal.values[i]
+          parent_col_name = results.columns[3].stringVal.values[i]
+          foreign_table_cat = results.columns[4].stringVal.values[i]
+          foreign_table_schema = results.columns[5].stringVal.values[i]
+          foreign_table_name = results.columns[6].stringVal.values[i]
+          foreign_col_name = results.columns[7].stringVal.values[i]
+          assert parent_table_cat == ''
+          assert parent_table_schema == unique_database
+          assert parent_table_name in pk_table_names
+          assert parent_col_name in pk_column_names
+          assert foreign_table_cat == ''
+          assert foreign_table_schema == unique_database
+          assert foreign_table_name == fk_table
+          assert foreign_col_name in pk_column_names
+
+    finally:
+      self.execute_query("drop table {0}".format(parent_table_1))
+      self.execute_query("drop table {0}".format(parent_table_2))
+      self.execute_query("drop table {0}".format(fk_table))
+
   @needs_session(conf_overlay={"idle_session_timeout": "5"})
   def test_get_operation_status_session_timeout(self):
     """Regression test for IMPALA-4488: GetOperationStatus() would not keep a session