You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/06/04 21:56:33 UTC

[impala] branch master updated (0b564ec -> e78e6f0)

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 0b564ec  Disable custom cluster/service FE tests on S3
     new 6bb404d  IMPALA-8504 (part 2): Support CREATE TABLE statement with Kudu/HMS integration
     new e78e6f0  IMPALA-8578: part 1: reduce dependencies on *metrics.h

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/catalog/catalog-server.cc                   |  2 +
 be/src/catalog/catalog-server.h                    |  2 +-
 be/src/exec/exec-node.h                            |  1 +
 be/src/exec/hash-table.h                           |  1 +
 be/src/exec/hdfs-orc-scanner.h                     |  2 +
 be/src/exec/hdfs-scan-node-base.cc                 |  3 +
 be/src/exec/hdfs-table-sink.cc                     |  1 +
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |  2 +
 be/src/exec/parquet/hdfs-parquet-table-writer.cc   |  1 +
 be/src/exec/parquet/parquet-column-readers.cc      |  1 +
 be/src/exec/plan-root-sink.cc                      |  1 +
 be/src/exec/scan-node.cc                           |  2 +
 be/src/experiments/data-provider-test.cc           |  1 +
 be/src/experiments/tuple-splitter-test.cc          |  1 +
 be/src/rpc/TAcceptQueueServer.cpp                  |  1 +
 be/src/rpc/TAcceptQueueServer.h                    |  4 +-
 be/src/rpc/thrift-server.h                         |  3 +-
 be/src/rpc/thrift-util.cc                          |  3 +-
 be/src/runtime/buffered-tuple-stream.cc            |  1 +
 be/src/runtime/bufferpool/buffer-allocator-test.cc |  1 +
 be/src/runtime/bufferpool/reservation-tracker.cc   |  1 +
 be/src/runtime/client-cache.cc                     |  1 +
 be/src/runtime/client-cache.h                      |  3 +-
 be/src/runtime/coordinator-backend-state.cc        |  1 +
 be/src/runtime/initial-reservations.cc             |  1 +
 be/src/runtime/io/data-cache.cc                    |  1 +
 be/src/runtime/io/disk-io-mgr.cc                   |  2 +
 be/src/runtime/io/handle-cache.h                   |  1 -
 be/src/runtime/io/handle-cache.inline.h            |  2 +
 be/src/runtime/io/hdfs-file-reader.cc              |  1 +
 be/src/runtime/io/local-file-reader.cc             |  1 +
 be/src/runtime/krpc-data-stream-mgr.cc             |  1 +
 be/src/runtime/krpc-data-stream-mgr.h              |  2 +-
 be/src/runtime/mem-pool.cc                         |  1 -
 be/src/runtime/mem-tracker.cc                      | 13 +++
 be/src/runtime/mem-tracker.h                       | 17 ++--
 be/src/runtime/query-exec-mgr.cc                   |  5 +-
 be/src/runtime/query-state.cc                      |  1 +
 be/src/runtime/sorter.cc                           |  1 +
 be/src/runtime/tmp-file-mgr-test.cc                |  1 +
 be/src/runtime/tmp-file-mgr.cc                     |  1 +
 be/src/runtime/tmp-file-mgr.h                      |  2 +-
 be/src/scheduling/request-pool-service.cc          |  1 +
 be/src/scheduling/request-pool-service.h           |  2 +-
 be/src/scheduling/scheduler.h                      |  2 +-
 be/src/service/client-request-state.cc             |  1 +
 be/src/service/impala-beeswax-server.cc            |  1 -
 be/src/service/impala-hs2-server.cc                |  1 +
 be/src/service/impala-server.cc                    |  1 +
 be/src/service/impalad-main.cc                     |  1 -
 be/src/service/session-expiry-test.cc              |  1 +
 be/src/statestore/statestore-subscriber.cc         |  3 +
 be/src/statestore/statestore-subscriber.h          |  3 +-
 be/src/statestore/statestore.cc                    |  1 +
 be/src/statestore/statestore.h                     |  2 +-
 be/src/util/collection-metrics.h                   | 14 +---
 be/src/util/impalad-metrics.cc                     |  2 +
 be/src/util/impalad-metrics.h                      |  5 +-
 be/src/util/metrics-fwd.h                          | 69 +++++++++++++++
 be/src/util/metrics.h                              | 17 +---
 .../java/org/apache/impala/catalog/KuduTable.java  | 98 ++++++++++++++++++++--
 .../main/java/org/apache/impala/catalog/Table.java |  8 +-
 .../apache/impala/service/CatalogOpExecutor.java   | 44 ++++++++--
 .../impala/service/KuduCatalogOpExecutor.java      | 24 +++++-
 .../java/org/apache/impala/util/MetaStoreUtil.java | 14 ++++
 tests/common/custom_cluster_test_suite.py          | 21 ++++-
 tests/common/kudu_test_suite.py                    |  7 +-
 tests/common/skip.py                               |  3 +
 tests/custom_cluster/test_kudu.py                  | 82 ++++++++++++++++++
 tests/query_test/test_kudu.py                      | 25 ++++++
 70 files changed, 466 insertions(+), 81 deletions(-)
 create mode 100644 be/src/util/metrics-fwd.h


[impala] 01/02: IMPALA-8504 (part 2): Support CREATE TABLE statement with Kudu/HMS integration

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6bb404dc35999c250cc9112c19b74ffcf17fe14b
Author: Hao Hao <ha...@cloudera.com>
AuthorDate: Sun May 19 23:04:38 2019 -0700

    IMPALA-8504 (part 2): Support CREATE TABLE statement with Kudu/HMS integration
    
    This commit supports the actual handling of CREATE TABLE DDL for managed
    Kudu tables when integration with Hive Metastore is enabled. When
    Kudu/HMS integration is enabled, for CREATE TABLE statement, Impala can
    rely on Kudu to create the table in the HMS.
    
    Change-Id: Icffe412395f47f5e07d97bad457020770cfa7502
    Reviewed-on: http://gerrit.cloudera.org:8080/13375
    Reviewed-by: Thomas Marshall <tm...@cloudera.com>
    Reviewed-by: Grant Henke <gr...@apache.org>
    Tested-by: Thomas Marshall <tm...@cloudera.com>
---
 .../java/org/apache/impala/catalog/KuduTable.java  | 98 ++++++++++++++++++++--
 .../main/java/org/apache/impala/catalog/Table.java |  8 +-
 .../apache/impala/service/CatalogOpExecutor.java   | 44 ++++++++--
 .../impala/service/KuduCatalogOpExecutor.java      | 24 +++++-
 .../java/org/apache/impala/util/MetaStoreUtil.java | 14 ++++
 tests/common/custom_cluster_test_suite.py          | 21 ++++-
 tests/common/kudu_test_suite.py                    |  7 +-
 tests/common/skip.py                               |  3 +
 tests/custom_cluster/test_kudu.py                  | 82 ++++++++++++++++++
 tests/query_test/test_kudu.py                      | 25 ++++++
 10 files changed, 306 insertions(+), 20 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index dfa960e..e059013 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -17,7 +17,10 @@
 
 package org.apache.impala.catalog;
 
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -114,6 +117,13 @@ public class KuduTable extends Table implements FeKuduTable {
     super(msTable, db, name, owner);
     kuduTableName_ = msTable.getParameters().get(KuduTable.KEY_TABLE_NAME);
     kuduMasters_ = msTable.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
+    if (kuduTableName_ == null || kuduTableName_.isEmpty()) {
+      // When 'kudu.table_name' property is empty, it implies Kudu/HMS
+      // integration is enabled.
+      // TODO: remove this hack once Kudu supports 'kudu.table_name'
+      // property with the new storage handler.
+      populateDefaultTableName(msTable, /* isHMSIntegrationEnabled */true);
+    }
   }
 
   @Override
@@ -156,6 +166,17 @@ public class KuduTable extends Table implements FeKuduTable {
   }
 
   /**
+   * Sets kuduTableName_ to the default Kudu table name and records it in 'msTbl'.
+   */
+  private void populateDefaultTableName(
+      org.apache.hadoop.hive.metastore.api.Table msTbl,
+      boolean isHMSIntegrationEnabled) {
+    kuduTableName_ = KuduUtil.getDefaultKuduTableName(
+        msTbl.getDbName(), msTbl.getTableName(), isHMSIntegrationEnabled);
+    msTbl.getParameters().put(KuduTable.KEY_TABLE_NAME, kuduTableName_);
+  }
+
+  /**
    * Get the Hive Metastore configuration from Kudu masters.
    */
   private static HiveMetastoreConfig getHiveMetastoreConfig(String kuduMasters)
@@ -184,6 +205,61 @@ public class KuduTable extends Table implements FeKuduTable {
   }
 
   /**
+   * Checks whether the Kudu master's HMS integration is enabled (false when no
+   * HMS config is returned for 'kuduMasters') and, if so, throws unless Kudu and
+   * Impala use the same HMS hosts. Returns true without checking if 'hmsUris' is empty.
+   */
+  public static boolean isHMSIntegrationEnabledAndValidate(String kuduMasters,
+      String hmsUris) throws ImpalaRuntimeException {
+    Preconditions.checkNotNull(hmsUris);
+    // Skip validation if Impala's HMS URIs value is empty for some reason.
+    // TODO: Is this a valid case?
+    if (hmsUris.isEmpty()) {
+      return true;
+    }
+    HiveMetastoreConfig hmsConfig = getHiveMetastoreConfig(kuduMasters);
+    if (hmsConfig == null) {
+      return false;
+    }
+    // Validate Kudu is configured to use the same HMS as Impala. We consider
+    // this to be the case as long as Kudu and Impala are configured to talk
+    // to the HMS with the same host address(es).
+    final String kuduHmsUris = hmsConfig.getHiveMetastoreUris();
+    Set<String> hmsHosts;
+    Set<String> kuduHmsHosts;
+    try {
+      hmsHosts = parseHosts(hmsUris);
+      kuduHmsHosts = parseHosts(kuduHmsUris);
+    } catch (URISyntaxException e) {
+      throw new ImpalaRuntimeException(
+          String.format("Error parsing URI: %s", e.getMessage()));
+    }
+    if (hmsHosts != null && kuduHmsHosts != null && hmsHosts.equals(kuduHmsHosts)) {
+      return true;
+    }
+    throw new ImpalaRuntimeException(
+       String.format("Kudu is integrated with a different Hive Metastore " +
+           "than that used by Impala, Kudu is configured to use the HMS: " +
+           "%s, while Impala is configured to use the HMS: %s",
+           kuduHmsUris, hmsUris));
+  }
+
+  /**
+   * Parse the given comma-separated URIs and return the set of hosts in them.
+   * Whitespace around each URI is ignored. Throws if a URI is malformed.
+   */
+  private static Set<String> parseHosts(String uris) throws URISyntaxException {
+    String[] urisString = uris.split(",");
+    Set<String> parsedHosts = new HashSet<>();
+    for (String s : urisString) {
+      // String.trim() returns a new string, so its result must be used.
+      URI tmpUri = new URI(s.trim());
+      parsedHosts.add(tmpUri.getHost());
+    }
+    return parsedHosts;
+  }
+
+  /**
    * Load schema and partitioning schemes directly from Kudu.
    */
   public void loadSchemaFromKudu() throws ImpalaRuntimeException {
@@ -225,8 +301,11 @@ public class KuduTable extends Table implements FeKuduTable {
       msTable_ = msTbl.deepCopy();
       kuduTableName_ = msTable_.getParameters().get(KuduTable.KEY_TABLE_NAME);
       if (kuduTableName_ == null || kuduTableName_.isEmpty()) {
-        throw new TableLoadingException("No " + KuduTable.KEY_TABLE_NAME +
-            " property found for Kudu table " + kuduTableName_);
+        // When 'kudu.table_name' property is empty, it implies Kudu/HMS
+        // integration is enabled.
+        // TODO: remove this hack once Kudu supports 'kudu.table_name'
+        // property with the new storage handler.
+        populateDefaultTableName(msTable_, /* isHMSIntegrationEnabled */true);
       }
       kuduMasters_ = msTable_.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
       if (kuduMasters_ == null || kuduMasters_.isEmpty()) {
@@ -274,16 +353,23 @@ public class KuduTable extends Table implements FeKuduTable {
     Preconditions.checkNotNull(kuduTable);
     clearColumns();
     primaryKeyColumnNames_.clear();
+    boolean isHMSIntegrationEnabled = KuduTable.isHMSIntegrationEnabled(kuduMasters_);
     List<FieldSchema> cols = msTable_.getSd().getCols();
-    cols.clear();
+    if (!isHMSIntegrationEnabled) {
+      cols.clear();
+    }
+
     int pos = 0;
     kuduSchema_ = kuduTable.getSchema();
     for (ColumnSchema colSchema: kuduSchema_.getColumns()) {
       KuduColumn kuduCol = KuduColumn.fromColumnSchema(colSchema, pos);
       Preconditions.checkNotNull(kuduCol);
-      // Add the HMS column
-      cols.add(new FieldSchema(kuduCol.getName(), kuduCol.getType().toSql().toLowerCase(),
-          null));
+      // Only update the HMS column definition when Kudu/HMS integration
+      // is disabled.
+      if (!isHMSIntegrationEnabled) {
+        cols.add(new FieldSchema(kuduCol.getName(),
+            kuduCol.getType().toSql().toLowerCase(), null));
+      }
       if (kuduCol.isKey()) primaryKeyColumnNames_.add(kuduCol.getName());
       addColumn(kuduCol);
       ++pos;
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 2effb03..ab44c55 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -145,6 +145,9 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
   public static final String TBL_PROP_LAST_COMPUTE_STATS_TIME =
       "impala.lastComputeStatsTime";
 
+  // Table property key for storing table type externality.
+  private static final String TBL_PROP_EXTERNAL_TABLE = "EXTERNAL";
+
   protected Table(org.apache.hadoop.hive.metastore.api.Table msTable, Db db,
       String name, String owner) {
     msTable_ = msTable;
@@ -609,7 +612,10 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
 
   public static boolean isExternalTable(
       org.apache.hadoop.hive.metastore.api.Table msTbl) {
-    return msTbl.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString());
+    // HIVE-19253: table property can also indicate an external table.
+    // See org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.isExternalTable().
+    return msTbl.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString()) ||
+           ("TRUE").equalsIgnoreCase(msTbl.getParameters().get(TBL_PROP_EXTERNAL_TABLE));
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 06d55bc..e9cb876 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1888,9 +1888,19 @@ public class CatalogOpExecutor {
   }
 
   /**
-   * Creates a new Kudu table. The Kudu table is first created in the Kudu storage engine
-   * (only applicable to managed tables), then in HMS and finally in the catalog cache.
-   * Failure to add the table in HMS results in the table being dropped from Kudu.
+   * Creates a new Kudu table.
+   *
+   * For managed tables:
+   *  1. If Kudu's integration with the Hive Metastore is not enabled, the Kudu
+   *     table is first created in Kudu, then in the HMS.
+   *  2. Otherwise, when the table is created in Kudu, we rely on Kudu to have
+   *     created the table in the HMS.
+   * For external tables:
+   *  1. We only create the table in the HMS (regardless of Kudu's integration
+   *     with the Hive Metastore).
+   *
+   * After the above is complete, we create the table in the catalog cache.
+   *
    * 'response' is populated with the results of this operation. Returns true if a new
    * table was created as part of this call, false otherwise.
    */
@@ -1898,16 +1908,36 @@ public class CatalogOpExecutor {
       TCreateTableParams params, TDdlExecResponse response) throws ImpalaException {
     Preconditions.checkState(KuduTable.isKuduTable(newTable));
     if (Table.isExternalTable(newTable)) {
-      KuduCatalogOpExecutor.populateColumnsFromKudu(newTable);
+      KuduCatalogOpExecutor.populateExternalTableColsFromKudu(newTable);
     } else {
       KuduCatalogOpExecutor.createManagedTable(newTable, params);
     }
+    boolean createsHMSTable;
+    if (Table.isExternalTable(newTable)) {
+      createsHMSTable = true;
+    } else {
+      String masterHosts = newTable.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
+      String hmsUris;
+      // Check if Kudu's integration with the Hive Metastore is enabled for
+      // managed tables, and validate the configuration.
+      try {
+        hmsUris = MetaStoreUtil.getHiveMetastoreUrisKeyValue(
+            catalog_.getMetaStoreClient().getHiveClient());
+      } catch (Exception e) {
+        throw new RuntimeException(String.format("Failed to get the Hive Metastore " +
+            "configuration for table '%s' ", newTable.getTableName()), e);
+      }
+      createsHMSTable =
+         !KuduTable.isHMSIntegrationEnabledAndValidate(masterHosts, hmsUris);
+    }
     try {
-      // Add the table to the HMS and the catalog cache. Aquire metastoreDdlLock_ to
+      // Add the table to the HMS and the catalog cache. Acquire metastoreDdlLock_ to
       // ensure the atomicity of these operations.
       synchronized (metastoreDdlLock_) {
-        try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-          msClient.getHiveClient().createTable(newTable);
+        if (createsHMSTable) {
+          try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+            msClient.getHiveClient().createTable(newTable);
+          }
         }
         // Add the table to the catalog cache
         Table newTbl = catalog_.addTable(newTable.getDbName(), newTable.getTableName());
diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
index f4477b4..aa2749c 100644
--- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -74,6 +74,8 @@ public class KuduCatalogOpExecutor {
   static void createManagedTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
       TCreateTableParams params) throws ImpalaRuntimeException {
     Preconditions.checkState(!Table.isExternalTable(msTbl));
+    Preconditions.checkState(
+        msTbl.getParameters().get(KuduTable.KEY_TABLE_ID) == null);
     String kuduTableName = msTbl.getParameters().get(KuduTable.KEY_TABLE_NAME);
     String masterHosts = msTbl.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
     if (LOG.isTraceEnabled()) {
@@ -91,7 +93,15 @@ public class KuduCatalogOpExecutor {
       }
       Schema schema = createTableSchema(params);
       CreateTableOptions tableOpts = buildTableOptions(msTbl, params, schema);
-      kudu.createTable(kuduTableName, schema, tableOpts);
+      org.apache.kudu.client.KuduTable table =
+          kudu.createTable(kuduTableName, schema, tableOpts);
+      // Populate table ID from Kudu table if Kudu's integration with the Hive
+      // Metastore is enabled.
+      if (KuduTable.isHMSIntegrationEnabled(masterHosts)) {
+        String tableId = table.getTableId();
+        Preconditions.checkNotNull(tableId);
+        msTbl.getParameters().put(KuduTable.KEY_TABLE_ID, tableId);
+      }
     } catch (Exception e) {
       throw new ImpalaRuntimeException(String.format("Error creating Kudu table '%s'",
           kuduTableName), e);
@@ -227,6 +237,9 @@ public class KuduCatalogOpExecutor {
       }
       tableOpts.setNumReplicas(parsedReplicas);
     }
+
+    // Set the table's owner.
+    tableOpts.setOwner(msTbl.getOwner());
     return tableOpts;
   }
 
@@ -263,12 +276,17 @@ public class KuduCatalogOpExecutor {
 
   /**
    * Reads the column definitions from a Kudu table and populates 'msTbl' with
-   * an equivalent schema. Throws an exception if any errors are encountered.
+   * an equivalent schema for external tables. Throws an exception if any errors
+   * are encountered.
    */
-  public static void populateColumnsFromKudu(
+  public static void populateExternalTableColsFromKudu(
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws ImpalaRuntimeException {
     org.apache.hadoop.hive.metastore.api.Table msTblCopy = msTbl.deepCopy();
     List<FieldSchema> cols = msTblCopy.getSd().getCols();
+    // External table should not have table ID.
+    Preconditions.checkState(Table.isExternalTable(msTbl));
+    Preconditions.checkState(
+        msTblCopy.getParameters().get(KuduTable.KEY_TABLE_ID) == null);
     String kuduTableName = msTblCopy.getParameters().get(KuduTable.KEY_TABLE_NAME);
     Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName));
     String masterHosts = msTblCopy.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
diff --git a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
index 2b5551d..789892d 100644
--- a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
@@ -83,6 +83,11 @@ public class MetaStoreUtil {
   public static final String DEFAULT_NULL_PARTITION_KEY_VALUE =
       "__HIVE_DEFAULT_PARTITION__";
 
+  // The configuration key represents thrift URI for the remote Hive Metastore.
+  public static final String HIVE_METASTORE_URIS_KEY = "hive.metastore.uris";
+  // The default value for the above configuration key.
+  public static final String DEFAULT_HIVE_METASTORE_URIS = "";
+
   static {
     // Get the value from the Hive configuration, if present.
     HiveConf hiveConf = new HiveConf(HdfsTable.class);
@@ -112,6 +117,15 @@ public class MetaStoreUtil {
   }
 
   /**
+   * Return the configured thrift URIs of the remote Hive Metastore (default: empty).
+   */
+  public static String getHiveMetastoreUrisKeyValue(IMetaStoreClient client)
+      throws ConfigValSecurityException, TException {
+    return client.getConfigValue(
+        HIVE_METASTORE_URIS_KEY, DEFAULT_HIVE_METASTORE_URIS);
+  }
+
+  /**
    * Return the value set for the given config in the metastore.
    */
   public static String getMetastoreConfigValue(
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index 70af069..d651b84 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -39,6 +39,7 @@ NUM_COORDINATORS = DEFAULT_CLUSTER_SIZE
 IMPALAD_ARGS = 'impalad_args'
 STATESTORED_ARGS = 'state_store_args'
 CATALOGD_ARGS = 'catalogd_args'
+KUDU_ARGS = 'kudu_args'
 # Additional args passed to the start-impala-cluster script.
 START_ARGS = 'start_args'
 SENTRY_CONFIG = 'sentry_config'
@@ -101,7 +102,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
   def with_args(impalad_args=None, statestored_args=None, catalogd_args=None,
       start_args=None, sentry_config=None, default_query_options=None,
       impala_log_dir=None, sentry_log_dir=None, cluster_size=None,
-      num_exclusive_coordinators=None):
+      num_exclusive_coordinators=None, kudu_args=None):
     """Records arguments to be passed to a cluster by adding them to the decorated
     method's func_dict"""
     def decorate(func):
@@ -120,6 +121,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
         func.func_dict[SENTRY_CONFIG] = sentry_config
       if sentry_log_dir is not None:
         func.func_dict[SENTRY_LOG_DIR] = sentry_log_dir
+      if kudu_args is not None:
+        func.func_dict[KUDU_ARGS] = kudu_args
       if default_query_options is not None:
         func.func_dict[DEFAULT_QUERY_OPTIONS] = default_query_options
       if impala_log_dir is not None:
@@ -139,6 +142,9 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     if START_ARGS in method.func_dict:
       cluster_args.extend(method.func_dict[START_ARGS])
 
+    if KUDU_ARGS in method.func_dict:
+      self._restart_kudu_service(method.func_dict[KUDU_ARGS])
+
     if SENTRY_CONFIG in method.func_dict:
       self._start_sentry_service(method.func_dict[SENTRY_CONFIG],
           method.func_dict.get(SENTRY_LOG_DIR))
@@ -178,6 +184,19 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     sleep(2)
 
   @classmethod
+  def _restart_kudu_service(cls, kudu_args=None):
+    kudu_env = dict(os.environ)
+    if kudu_args is not None:
+      kudu_env["IMPALA_KUDU_STARTUP_FLAGS"] = kudu_args
+    call = subprocess.Popen(
+        ['/bin/bash', '-c', os.path.join(IMPALA_HOME,
+                                         'testdata/cluster/admin restart kudu')],
+        env=kudu_env)
+    call.wait()
+    if call.returncode != 0:
+      raise RuntimeError("Unable to restart Kudu")
+
+  @classmethod
   def _start_sentry_service(cls, sentry_service_config, sentry_log_dir=None):
     sentry_env = dict(os.environ)
     if sentry_log_dir is not None:
diff --git a/tests/common/kudu_test_suite.py b/tests/common/kudu_test_suite.py
index 5826c74..d434c42 100644
--- a/tests/common/kudu_test_suite.py
+++ b/tests/common/kudu_test_suite.py
@@ -117,9 +117,12 @@ class KuduTestSuite(ImpalaTestSuite):
   @classmethod
   def to_kudu_table_name(cls, db_name, tbl_name):
     """Return the name of the underlying Kudu table, from the Impala database and table
-    name. This must be kept in sync with KuduUtil.getDefaultCreateKuduTableName() in the
+    name. This must be kept in sync with KuduUtil.getDefaultKuduTableName() in the
     FE."""
-    return "impala::%s.%s" % (db_name, tbl_name)
+    if get_kudu_master_flag("--hive_metastore_uris") != "":
+      return "%s.%s" % (db_name, tbl_name)
+    else:
+      return "impala::%s.%s" % (db_name, tbl_name)
 
   @classmethod
   def get_kudu_table_base_name(cls, name):
diff --git a/tests/common/skip.py b/tests/common/skip.py
index 31d4cd6..81c004d 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -107,6 +107,9 @@ class SkipIfKudu:
   no_hybrid_clock = pytest.mark.skipif(
       get_kudu_master_flag("--use_hybrid_clock") == "false",
       reason="Test relies on --use_hybrid_clock=true in Kudu.")
+  hms_integration_enabled = pytest.mark.skipif(
+      get_kudu_master_flag("--hive_metastore_uris") != "",
+      reason="Test assumes Kudu/HMS integration is not enabled.")
 
 class SkipIf:
   skip_hbase = pytest.mark.skipif(pytest.config.option.skip_hbase,
diff --git a/tests/custom_cluster/test_kudu.py b/tests/custom_cluster/test_kudu.py
index ce86f3c..012fce3 100644
--- a/tests/custom_cluster/test_kudu.py
+++ b/tests/custom_cluster/test_kudu.py
@@ -16,6 +16,7 @@
 # under the License.
 
 import logging
+import os
 import pytest
 from kudu.schema import INT32
 
@@ -44,6 +45,7 @@ class TestKuduOperations(CustomClusterTestSuite, KuduTestSuite):
   @CustomClusterTestSuite.with_args(impalad_args=\
       "--use_local_tz_for_unix_timestamp_conversions=true")
   @SkipIfKudu.no_hybrid_clock
+  @SkipIfKudu.hms_integration_enabled
   def test_local_tz_conversion_ops(self, vector, unique_database):
     """IMPALA-5539: Test Kudu timestamp reads/writes are correct with the
        use_local_tz_for_unix_timestamp_conversions flag."""
@@ -53,6 +55,7 @@ class TestKuduOperations(CustomClusterTestSuite, KuduTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(impalad_args="-kudu_master_hosts=")
+  @SkipIfKudu.hms_integration_enabled
   def test_kudu_master_hosts(self, cursor, kudu_client):
     """Check behavior when -kudu_master_hosts is not provided to catalogd."""
     with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table:
@@ -74,6 +77,7 @@ class TestKuduOperations(CustomClusterTestSuite, KuduTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(impalad_args="-kudu_error_buffer_size=1024")
+  @SkipIfKudu.hms_integration_enabled
   def test_error_buffer_size(self, cursor, unique_database):
     """Check that queries fail if the size of the Kudu client errors they generate is
     greater than kudu_error_buffer_size."""
@@ -104,6 +108,84 @@ class TestKuduClientTimeout(CustomClusterTestSuite, KuduTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(impalad_args="-kudu_operation_timeout_ms=1")
+  @SkipIfKudu.hms_integration_enabled
   def test_impalad_timeout(self, vector):
     """Check impalad behavior when -kudu_operation_timeout_ms is too low."""
     self.run_test_case('QueryTest/kudu-timeouts-impalad', vector)
+
+
+class TestKuduHMSIntegration(CustomClusterTestSuite, KuduTestSuite):
+  """Tests the different DDL operations when using a kudu table with Kudu's integration
+     with the Hive Metastore."""
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @pytest.mark.execute_serially
+  @SkipIfKudu.no_hybrid_clock
+  def test_create_managed_kudu_tables(self, vector, unique_database):
+    """Tests the Create table operation when using a kudu table with Kudu's integration
+       with the Hive Metastore for managed tables."""
+    vector.get_value('exec_option')['kudu_read_mode'] = "READ_AT_SNAPSHOT"
+    self.run_test_case('QueryTest/kudu_create', vector, use_db=unique_database)
+
+  @pytest.mark.execute_serially
+  def test_implicit_external_table_props(self, cursor, kudu_client):
+    """Check that table properties added internally for external table during
+       table creation are as expected.
+    """
+    db_name = cursor.conn.db_name
+    with self.temp_kudu_table(kudu_client, [INT32], db_name=db_name) as kudu_table:
+      impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
+      external_table_name = "%s_external" % impala_table_name
+      props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
+      cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (
+          external_table_name, props))
+      with self.drop_impala_table_after_context(cursor, external_table_name):
+        cursor.execute("DESCRIBE FORMATTED %s" % external_table_name)
+        table_desc = [[col.strip() if col else col for col in row] for row in cursor]
+        # Pytest shows truncated output on failure, so print the details just in case.
+        LOG.info(table_desc)
+        assert not any("kudu.table_id" in s for s in table_desc)
+        assert any("Owner:" in s for s in table_desc)
+        assert ["", "EXTERNAL", "TRUE"] in table_desc
+        assert ["", "kudu.master_addresses", KUDU_MASTER_HOSTS] in table_desc
+        assert ["", "kudu.table_name", kudu_table.name] in table_desc
+        assert ["", "storage_handler", "org.apache.kudu.hive.KuduStorageHandler"] \
+            in table_desc
+
+  @pytest.mark.execute_serially
+  def test_implicit_managed_table_props(self, cursor, kudu_client, unique_database):
+    """Check that table properties added internally for managed table during table
+       creation are as expected.
+    """
+    cursor.execute("""CREATE TABLE %s.foo (a INT PRIMARY KEY, s STRING)
+        PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % unique_database)
+    assert kudu_client.table_exists(
+      KuduTestSuite.to_kudu_table_name(unique_database, "foo"))
+    cursor.execute("DESCRIBE FORMATTED %s.foo" % unique_database)
+    table_desc = [[col.strip() if col else col for col in row] for row in cursor]
+    # Pytest shows truncated output on failure, so print the details just in case.
+    LOG.info(table_desc)
+    assert not any("EXTERNAL" in s for s in table_desc)
+    assert any("Owner:" in s for s in table_desc)
+    assert any("kudu.table_id" in s for s in table_desc)
+    assert any("kudu.master_addresses" in s for s in table_desc)
+    assert ["Table Type:", "MANAGED_TABLE", None] in table_desc
+    assert ["", "kudu.table_name", "%s.foo" % unique_database] in table_desc
+    assert ["", "storage_handler", "org.apache.kudu.hive.KuduStorageHandler"] \
+        in table_desc
+
+  @classmethod
+  def setup_class(cls):
+    # Restart Kudu cluster with HMS integration enabled
+    KUDU_ARGS = "-hive_metastore_uris=thrift://%s" % os.environ['INTERNAL_LISTEN_HOST']
+    cls._restart_kudu_service(KUDU_ARGS)
+    super(TestKuduHMSIntegration, cls).setup_class()
+
+  @classmethod
+  def teardown_class(cls):
+    # Restart Kudu cluster with HMS integration disabled
+    cls._restart_kudu_service("-hive_metastore_uris=")
+    super(TestKuduHMSIntegration, cls).teardown_class()
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index f00a6b5..549da6a 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -48,6 +48,7 @@ KUDU_MASTER_HOSTS = pytest.config.option.kudu_master_hosts
 
 LOG = logging.getLogger(__name__)
 
+# TODO(IMPALA-8614): parameterize some tests to run with HMS integration enabled.
 class TestKuduOperations(KuduTestSuite):
   """
   This suite tests the different modification operations when using a kudu table.
@@ -61,6 +62,7 @@ class TestKuduOperations(KuduTestSuite):
     add_exec_option_dimension(cls, "kudu_read_mode", "READ_AT_SNAPSHOT")
 
   @SkipIfKudu.no_hybrid_clock
+  @SkipIfKudu.hms_integration_enabled
   def test_out_of_range_timestamps(self, vector, cursor, kudu_client, unique_database):
     """Test timestamp values that are outside of Impala's supported date range."""
     cursor.execute("""CREATE TABLE %s.times (a INT PRIMARY KEY, ts TIMESTAMP)
@@ -121,6 +123,7 @@ class TestKuduOperations(KuduTestSuite):
                       reason="Test references hardcoded hostnames: IMPALA-4873")
   @pytest.mark.execute_serially
   @SkipIfKudu.no_hybrid_clock
+  @SkipIfKudu.hms_integration_enabled
   def test_kudu_alter_table(self, vector, unique_database):
     self.run_test_case('QueryTest/kudu_alter', vector, use_db=unique_database)
 
@@ -332,6 +335,7 @@ class TestKuduOperations(KuduTestSuite):
     assert cursor.fetchall() == [(0, 0)]
 
   @SkipIfKudu.no_hybrid_clock
+  @SkipIfKudu.hms_integration_enabled
   def test_kudu_col_removed(self, cursor, kudu_client, unique_database):
     """Test removing a Kudu column outside of Impala."""
     cursor.execute("set kudu_read_mode=READ_AT_SNAPSHOT")
@@ -506,6 +510,7 @@ class TestKuduOperations(KuduTestSuite):
 
 class TestCreateExternalTable(KuduTestSuite):
 
+  @SkipIfKudu.hms_integration_enabled
   def test_external_timestamp_default_value(self, cursor, kudu_client, unique_database):
     """Checks that a Kudu table created outside Impala with a default value on a
        UNIXTIME_MICROS column can be loaded by Impala, and validates the DESCRIBE
@@ -538,6 +543,7 @@ class TestCreateExternalTable(KuduTestSuite):
       if kudu_client.table_exists(name):
         kudu_client.delete_table(name)
 
+  @SkipIfKudu.hms_integration_enabled
   def test_implicit_table_props(self, cursor, kudu_client):
     """Check that table properties added internally during table creation are as
        expected.
@@ -559,6 +565,7 @@ class TestCreateExternalTable(KuduTestSuite):
         assert ["", "storage_handler", "org.apache.kudu.hive.KuduStorageHandler"] \
             in table_desc
 
+  @SkipIfKudu.hms_integration_enabled
   def test_col_types(self, cursor, kudu_client):
     """Check that a table can be created using all available column types."""
     # TODO: Add DECIMAL when the Kudu python client supports decimal
@@ -577,6 +584,7 @@ class TestCreateExternalTable(KuduTestSuite):
           assert col_type.upper() == \
               self.kudu_col_type_to_impala_col_type(kudu_col.type.type)
 
+  @SkipIfKudu.hms_integration_enabled
   def test_unsupported_binary_col(self, cursor, kudu_client):
     """Check that external tables with BINARY columns fail gracefully.
     """
@@ -592,6 +600,7 @@ class TestCreateExternalTable(KuduTestSuite):
       except Exception as e:
         assert "Kudu type 'binary' is not supported in Impala" in str(e)
 
+  @SkipIfKudu.hms_integration_enabled
   def test_drop_external_table(self, cursor, kudu_client):
     """Check that dropping an external table only affects the catalog and does not delete
        the table in Kudu.
@@ -611,6 +620,7 @@ class TestCreateExternalTable(KuduTestSuite):
         assert "Could not resolve table reference" in str(e)
       assert kudu_client.table_exists(kudu_table.name)
 
+  @SkipIfKudu.hms_integration_enabled
   def test_explicit_name(self, cursor, kudu_client):
     """Check that a Kudu table can be specified using a table property."""
     with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table:
@@ -623,6 +633,7 @@ class TestCreateExternalTable(KuduTestSuite):
         cursor.execute("SELECT * FROM %s" % table_name)
         assert len(cursor.fetchall()) == 0
 
+  @SkipIfKudu.hms_integration_enabled
   def test_explicit_name_preference(self, cursor, kudu_client):
     """Check that the table name from a table property is used when a table of the
        implied name also exists.
@@ -641,6 +652,7 @@ class TestCreateExternalTable(KuduTestSuite):
               [("a", "bigint", "", "true", "false", "", "AUTO_ENCODING",
                 "DEFAULT_COMPRESSION", "0")]
 
+  @SkipIfKudu.hms_integration_enabled
   def test_explicit_name_doesnt_exist(self, cursor, kudu_client):
     kudu_table_name = self.random_table_name()
     try:
@@ -653,6 +665,7 @@ class TestCreateExternalTable(KuduTestSuite):
     except Exception as e:
       assert "Table does not exist in Kudu: '%s'" % kudu_table_name in str(e)
 
+  @SkipIfKudu.hms_integration_enabled
   def test_explicit_name_doesnt_exist_but_implicit_does(self, cursor, kudu_client):
     """Check that when an explicit table name is given but that table doesn't exist,
        there is no fall-through to an existing implicit table.
@@ -670,6 +683,7 @@ class TestCreateExternalTable(KuduTestSuite):
         assert "Table does not exist in Kudu: '%s'" % table_name in str(e)
 
   @SkipIfKudu.no_hybrid_clock
+  @SkipIfKudu.hms_integration_enabled
   def test_table_without_partitioning(self, cursor, kudu_client, unique_database):
     """Test a Kudu table created without partitioning (i.e. equivalent to a single
        unbounded partition). It is not possible to create such a table in Impala, but
@@ -705,6 +719,7 @@ class TestCreateExternalTable(KuduTestSuite):
         kudu_client.delete_table(name)
 
   @SkipIfKudu.no_hybrid_clock
+  @SkipIfKudu.hms_integration_enabled
   def test_column_name_case(self, cursor, kudu_client, unique_database):
     """IMPALA-5286: Tests that an external Kudu table that was created with a column name
        containing upper case letters is handled correctly."""
@@ -760,6 +775,7 @@ class TestCreateExternalTable(KuduTestSuite):
       if kudu_client.table_exists(table_name):
         kudu_client.delete_table(table_name)
 
+  @SkipIfKudu.hms_integration_enabled
   def test_conflicting_column_name(self, cursor, kudu_client, unique_database):
     """IMPALA-5283: Tests that loading an external Kudu table that was created with column
        names that differ only in case results in an error."""
@@ -804,6 +820,7 @@ class TestShowCreateTable(KuduTestSuite):
     assert cursor.fetchall()[0][0] == \
         textwrap.dedent(show_create_sql.format(**format_args)).strip()
 
+  @SkipIfKudu.hms_integration_enabled
   def test_primary_key_and_distribution(self, cursor):
     # TODO: Add case with BLOCK_SIZE
     self.assert_show_create_equals(cursor,
@@ -904,6 +921,7 @@ class TestShowCreateTable(KuduTestSuite):
             db=cursor.conn.db_name, p=self.column_properties,
             kudu_addr=KUDU_MASTER_HOSTS))
 
+  @SkipIfKudu.hms_integration_enabled
   def test_timestamp_default_value(self, cursor):
     create_sql_fmt = """
         CREATE TABLE {table} (c INT, d TIMESTAMP,
@@ -934,6 +952,7 @@ class TestShowCreateTable(KuduTestSuite):
       create_sql_fmt % ("2009-01-01 00:00:00.000000999"),
       show_create_sql_fmt % ("1230768000000001"))
 
+  @SkipIfKudu.hms_integration_enabled
   def test_external_kudu_table_name_with_show_create(self, cursor, kudu_client,
       unique_database):
     """Check that the generated kudu.table_name tblproperty is present with
@@ -967,6 +986,7 @@ class TestShowCreateTable(KuduTestSuite):
       if kudu_client.table_exists(kudu_table_name):
         kudu_client.delete_table(kudu_table_name)
 
+  @SkipIfKudu.hms_integration_enabled
   def test_managed_kudu_table_name_with_show_create(self, cursor):
     """Check that the generated kudu.table_name tblproperty is not present with
        show create table with managed Kudu tables.
@@ -988,6 +1008,7 @@ class TestShowCreateTable(KuduTestSuite):
 
 class TestDropDb(KuduTestSuite):
 
+  @SkipIfKudu.hms_integration_enabled
   def test_drop_non_empty_db(self, unique_cursor, kudu_client):
     """Check that an attempt to drop a database will fail if Kudu tables are present
        and that the tables remain.
@@ -1007,6 +1028,7 @@ class TestDropDb(KuduTestSuite):
       unique_cursor.execute("SELECT COUNT(*) FROM %s.%s" % (db_name, impala_table_name))
       assert unique_cursor.fetchall() == [(0, )]
 
+  @SkipIfKudu.hms_integration_enabled
   def test_drop_db_cascade(self, unique_cursor, kudu_client):
     """Check that an attempt to drop a database will succeed even if Kudu tables are
        present and that the managed tables are removed.
@@ -1040,6 +1062,7 @@ class TestDropDb(KuduTestSuite):
       assert not kudu_client.table_exists(managed_table_name)
 
 class TestImpalaKuduIntegration(KuduTestSuite):
+  @SkipIfKudu.hms_integration_enabled
   def test_replace_kudu_table(self, cursor, kudu_client):
     """Check that an external Kudu table is accessible if the underlying Kudu table is
         modified using the Kudu client.
@@ -1077,6 +1100,7 @@ class TestImpalaKuduIntegration(KuduTestSuite):
               "DEFAULT_COMPRESSION", "0")]
 
   @SkipIfCatalogV2.impala_8459()
+  @SkipIfKudu.hms_integration_enabled
   def test_delete_external_kudu_table(self, cursor, kudu_client):
     """Check that Impala can recover from the case where the underlying Kudu table of
         an external table is dropped using the Kudu client.
@@ -1104,6 +1128,7 @@ class TestImpalaKuduIntegration(KuduTestSuite):
       assert (impala_table_name,) not in cursor.fetchall()
 
   @SkipIfCatalogV2.impala_8459()
+  @SkipIfKudu.hms_integration_enabled
   def test_delete_managed_kudu_table(self, cursor, kudu_client, unique_database):
     """Check that dropping a managed Kudu table works even if the underlying Kudu table
         has been dropped externally."""


[impala] 02/02: IMPALA-8578: part 1: reduce dependencies on *metrics.h

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e78e6f0c260b342c67e055656d239438bca13288
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Fri May 31 23:11:44 2019 -0700

    IMPALA-8578: part 1: reduce dependencies on *metrics.h
    
    Before this patch there were 100s of compilation units that pulled in
    metrics.h and the significant amount of code in that header. It was
    painfully slow to recompile after changes to that file. The patch
    reduces that significantly and mostly eliminates transitive inclusions
    via other headers.
    
    * Add metrics-fwd.h with forward declarations needed to have pointers
      to the various classes.
    * Update headers to use metrics-fwd.h and move includes of *metrics.h
      to the .cc files.
    * Add includes, etc. to fix compilation errors where files depended
      on transitively-included headers from *metrics.h.
    
    This shaved about 30s off the build time on Jenkins - about a 4%
    speedup.
    
    I didn't end up removing anything from the headers - that is a bit
    more work since most of the classes are templatized and need to
    be explicitly instantiated in .cc files if functions are not
    all defined in the headers.
    
    Testing:
    Ran a core build
    
    Change-Id: Ie2942366cab5421f2db7c27e7da712ea6f775fdb
    Reviewed-on: http://gerrit.cloudera.org:8080/13491
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Tim Armstrong <ta...@cloudera.com>
---
 be/src/catalog/catalog-server.cc                   |  2 +
 be/src/catalog/catalog-server.h                    |  2 +-
 be/src/exec/exec-node.h                            |  1 +
 be/src/exec/hash-table.h                           |  1 +
 be/src/exec/hdfs-orc-scanner.h                     |  2 +
 be/src/exec/hdfs-scan-node-base.cc                 |  3 +
 be/src/exec/hdfs-table-sink.cc                     |  1 +
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |  2 +
 be/src/exec/parquet/hdfs-parquet-table-writer.cc   |  1 +
 be/src/exec/parquet/parquet-column-readers.cc      |  1 +
 be/src/exec/plan-root-sink.cc                      |  1 +
 be/src/exec/scan-node.cc                           |  2 +
 be/src/experiments/data-provider-test.cc           |  1 +
 be/src/experiments/tuple-splitter-test.cc          |  1 +
 be/src/rpc/TAcceptQueueServer.cpp                  |  1 +
 be/src/rpc/TAcceptQueueServer.h                    |  4 +-
 be/src/rpc/thrift-server.h                         |  3 +-
 be/src/rpc/thrift-util.cc                          |  3 +-
 be/src/runtime/buffered-tuple-stream.cc            |  1 +
 be/src/runtime/bufferpool/buffer-allocator-test.cc |  1 +
 be/src/runtime/bufferpool/reservation-tracker.cc   |  1 +
 be/src/runtime/client-cache.cc                     |  1 +
 be/src/runtime/client-cache.h                      |  3 +-
 be/src/runtime/coordinator-backend-state.cc        |  1 +
 be/src/runtime/initial-reservations.cc             |  1 +
 be/src/runtime/io/data-cache.cc                    |  1 +
 be/src/runtime/io/disk-io-mgr.cc                   |  2 +
 be/src/runtime/io/handle-cache.h                   |  1 -
 be/src/runtime/io/handle-cache.inline.h            |  2 +
 be/src/runtime/io/hdfs-file-reader.cc              |  1 +
 be/src/runtime/io/local-file-reader.cc             |  1 +
 be/src/runtime/krpc-data-stream-mgr.cc             |  1 +
 be/src/runtime/krpc-data-stream-mgr.h              |  2 +-
 be/src/runtime/mem-pool.cc                         |  1 -
 be/src/runtime/mem-tracker.cc                      | 13 ++++
 be/src/runtime/mem-tracker.h                       | 17 ++----
 be/src/runtime/query-exec-mgr.cc                   |  5 +-
 be/src/runtime/query-state.cc                      |  1 +
 be/src/runtime/sorter.cc                           |  1 +
 be/src/runtime/tmp-file-mgr-test.cc                |  1 +
 be/src/runtime/tmp-file-mgr.cc                     |  1 +
 be/src/runtime/tmp-file-mgr.h                      |  2 +-
 be/src/scheduling/request-pool-service.cc          |  1 +
 be/src/scheduling/request-pool-service.h           |  2 +-
 be/src/scheduling/scheduler.h                      |  2 +-
 be/src/service/client-request-state.cc             |  1 +
 be/src/service/impala-beeswax-server.cc            |  1 -
 be/src/service/impala-hs2-server.cc                |  1 +
 be/src/service/impala-server.cc                    |  1 +
 be/src/service/impalad-main.cc                     |  1 -
 be/src/service/session-expiry-test.cc              |  1 +
 be/src/statestore/statestore-subscriber.cc         |  3 +
 be/src/statestore/statestore-subscriber.h          |  3 +-
 be/src/statestore/statestore.cc                    |  1 +
 be/src/statestore/statestore.h                     |  2 +-
 be/src/util/collection-metrics.h                   | 14 +----
 be/src/util/impalad-metrics.cc                     |  2 +
 be/src/util/impalad-metrics.h                      |  5 +-
 be/src/util/metrics-fwd.h                          | 69 ++++++++++++++++++++++
 be/src/util/metrics.h                              | 17 +-----
 60 files changed, 160 insertions(+), 61 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index dee1871..1fea888 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -29,6 +29,8 @@
 #include "util/debug-util.h"
 #include "util/event-metrics.h"
 #include "util/logging-support.h"
+#include "util/collection-metrics.h"
+#include "util/metrics.h"
 #include "util/webserver.h"
 
 #include "common/names.h"
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index bbd46f8..6e86368 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -30,7 +30,7 @@
 #include "catalog/catalog.h"
 #include "statestore/statestore-subscriber.h"
 #include "util/condition-variable.h"
-#include "util/metrics.h"
+#include "util/metrics-fwd.h"
 #include "rapidjson/rapidjson.h"
 
 namespace impala {
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index 8ec9f3d..339e77c 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -26,6 +26,7 @@
 #include "common/status.h"
 #include "exprs/scalar-expr-evaluator.h"
 #include "gen-cpp/PlanNodes_types.h"
+#include "gutil/threading/thread_collision_warner.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/descriptors.h" // for RowDescriptor
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index e98bdfd..b4a6905 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -21,6 +21,7 @@
 #include <memory>
 #include <vector>
 #include <boost/cstdint.hpp>
+#include <boost/scoped_array.hpp>
 #include <boost/scoped_ptr.hpp>
 
 #include "codegen/impala-ir.h"
diff --git a/be/src/exec/hdfs-orc-scanner.h b/be/src/exec/hdfs-orc-scanner.h
index 965e2dc..78cd0a9 100644
--- a/be/src/exec/hdfs-orc-scanner.h
+++ b/be/src/exec/hdfs-orc-scanner.h
@@ -19,6 +19,8 @@
 #ifndef IMPALA_EXEC_HDFS_ORC_SCANNER_H
 #define IMPALA_EXEC_HDFS_ORC_SCANNER_H
 
+#include <stack>
+
 #include <orc/OrcFile.hh>
 
 #include "runtime/exec-env.h"
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 20cf908..04866bb 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -45,7 +45,10 @@
 #include "runtime/runtime-state.h"
 #include "util/disk-info.h"
 #include "util/hdfs-util.h"
+#include "util/impalad-metrics.h"
+#include "util/metrics.h"
 #include "util/periodic-counter-updater.h"
+#include "util/pretty-printer.h"
 #include "util/scope-exit-trigger.h"
 
 #include "common/names.h"
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 23993e8..dab9992 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -32,6 +32,7 @@
 #include "util/coding-util.h"
 #include "util/hdfs-util.h"
 #include "util/impalad-metrics.h"
+#include "util/metrics.h"
 
 #include <limits>
 #include <vector>
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 27e5ca4..613d1a5 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 #include <queue>
+#include <stack>
 
 #include <gflags/gflags.h>
 #include <gutil/strings/substitute.h>
@@ -36,6 +37,7 @@
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
 #include "util/dict-encoding.h"
+#include "util/pretty-printer.h"
 #include "util/scope-exit-trigger.h"
 
 #include "common/names.h"
diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.cc b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
index 0c01395..b3c1967 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
@@ -40,6 +40,7 @@
 #include "util/debug-util.h"
 #include "util/dict-encoding.h"
 #include "util/hdfs-util.h"
+#include "util/pretty-printer.h"
 #include "util/rle-encoding.h"
 #include "util/string-util.h"
 
diff --git a/be/src/exec/parquet/parquet-column-readers.cc b/be/src/exec/parquet/parquet-column-readers.cc
index 16cdc2c..e9a35b8 100644
--- a/be/src/exec/parquet/parquet-column-readers.cc
+++ b/be/src/exec/parquet/parquet-column-readers.cc
@@ -42,6 +42,7 @@
 #include "util/codec.h"
 #include "util/debug-util.h"
 #include "util/dict-encoding.h"
+#include "util/pretty-printer.h"
 #include "util/rle-encoding.h"
 
 #include "common/names.h"
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index 89e442b..5f2a8e4 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -22,6 +22,7 @@
 #include "runtime/row-batch.h"
 #include "runtime/tuple-row.h"
 #include "service/query-result-set.h"
+#include "util/pretty-printer.h"
 
 #include <memory>
 #include <boost/thread/mutex.hpp>
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index 039836c..ab05432 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -17,6 +17,7 @@
 
 #include "exec/scan-node.h"
 
+#include <boost/algorithm/string/join.hpp>
 #include <boost/bind.hpp>
 
 #include "exprs/scalar-expr.h"
@@ -28,6 +29,7 @@
 #include "runtime/runtime-state.h"
 #include "runtime/scanner-mem-limiter.h"
 #include "util/disk-info.h"
+#include "util/pretty-printer.h"
 #include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
diff --git a/be/src/experiments/data-provider-test.cc b/be/src/experiments/data-provider-test.cc
index 3c5a92f..2b64a7a 100644
--- a/be/src/experiments/data-provider-test.cc
+++ b/be/src/experiments/data-provider-test.cc
@@ -20,6 +20,7 @@
 #include <stdio.h>
 #include <iostream>
 
+#include "common/object-pool.h"
 #include "experiments/data-provider.h"
 #include "util/cpu-info.h"
 #include "runtime/mem-pool.h"
diff --git a/be/src/experiments/tuple-splitter-test.cc b/be/src/experiments/tuple-splitter-test.cc
index 7d68f8e..9eb6547 100644
--- a/be/src/experiments/tuple-splitter-test.cc
+++ b/be/src/experiments/tuple-splitter-test.cc
@@ -22,6 +22,7 @@
 #include <vector>
 
 #include "common/compiler-util.h"
+#include "common/object-pool.h"
 #include "experiments/data-provider.h"
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
diff --git a/be/src/rpc/TAcceptQueueServer.cpp b/be/src/rpc/TAcceptQueueServer.cpp
index 8d7babe..2a77662 100644
--- a/be/src/rpc/TAcceptQueueServer.cpp
+++ b/be/src/rpc/TAcceptQueueServer.cpp
@@ -24,6 +24,7 @@
 
 #include <thrift/concurrency/PlatformThreadFactory.h>
 
+#include "util/metrics.h"
 #include "util/thread-pool.h"
 
 DEFINE_int32(accepted_cnxn_queue_depth, 10000,
diff --git a/be/src/rpc/TAcceptQueueServer.h b/be/src/rpc/TAcceptQueueServer.h
index f5f281f..8f16add 100644
--- a/be/src/rpc/TAcceptQueueServer.h
+++ b/be/src/rpc/TAcceptQueueServer.h
@@ -29,7 +29,7 @@
 
 #include <boost/shared_ptr.hpp>
 
-#include "util/metrics.h"
+#include "util/metrics-fwd.h"
 
 namespace apache {
 namespace thrift {
@@ -75,7 +75,7 @@ class TAcceptQueueServer : public TServer {
 
   // New - Adds a metric for the size of the queue of connections waiting to be setup to
   // the provided MetricGroup, prefixing its key with key_prefix.
-  void InitMetrics(impala::MetricGroup* metrics, const string& key_prefix);
+  void InitMetrics(impala::MetricGroup* metrics, const std::string& key_prefix);
 
  protected:
   void init();
diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h
index 764fbf2..b716593 100644
--- a/be/src/rpc/thrift-server.h
+++ b/be/src/rpc/thrift-server.h
@@ -25,10 +25,11 @@
 #include <boost/uuid/uuid_generators.hpp>
 #include <thrift/TProcessor.h>
 #include <thrift/server/TServer.h>
+#include <thrift/transport/TBufferTransports.h>
 #include <thrift/transport/TSSLSocket.h>
 
 #include "common/status.h"
-#include "util/metrics.h"
+#include "util/metrics-fwd.h"
 #include "util/thread.h"
 
 namespace impala {
diff --git a/be/src/rpc/thrift-util.cc b/be/src/rpc/thrift-util.cc
index c0d414b..2c3ebc4 100644
--- a/be/src/rpc/thrift-util.cc
+++ b/be/src/rpc/thrift-util.cc
@@ -23,8 +23,9 @@
 #include "util/hash-util.h"
 #include "util/time.h"
 #include "rpc/thrift-server.h"
-#include "gen-cpp/Types_types.h"
 #include "gen-cpp/Data_types.h"
+#include "gen-cpp/Frontend_types.h"
+#include "gen-cpp/Types_types.h"
 
 // TCompactProtocol requires some #defines to work right.  They also define UNLIKELY
 // so we need to undef this.
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index e0cf854..2b831b4 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -31,6 +31,7 @@
 #include "runtime/tuple-row.h"
 #include "util/bit-util.h"
 #include "util/debug-util.h"
+#include "util/pretty-printer.h"
 #include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
diff --git a/be/src/runtime/bufferpool/buffer-allocator-test.cc b/be/src/runtime/bufferpool/buffer-allocator-test.cc
index 4c7fe1e..23bcde2 100644
--- a/be/src/runtime/bufferpool/buffer-allocator-test.cc
+++ b/be/src/runtime/bufferpool/buffer-allocator-test.cc
@@ -27,6 +27,7 @@
 #include "testutil/cpu-util.h"
 #include "testutil/gtest-util.h"
 #include "util/cpu-info.h"
+#include "util/metrics.h"
 
 #include "common/names.h"
 
diff --git a/be/src/runtime/bufferpool/reservation-tracker.cc b/be/src/runtime/bufferpool/reservation-tracker.cc
index a920e1c..205c6f6 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker.cc
@@ -26,6 +26,7 @@
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 #include "util/dummy-runtime-profile.h"
+#include "util/pretty-printer.h"
 #include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
diff --git a/be/src/runtime/client-cache.cc b/be/src/runtime/client-cache.cc
index d066d6b..26d0de3 100644
--- a/be/src/runtime/client-cache.cc
+++ b/be/src/runtime/client-cache.cc
@@ -26,6 +26,7 @@
 
 #include "common/logging.h"
 #include "util/container-util.h"
+#include "util/metrics.h"
 #include "util/network-util.h"
 #include "rpc/thrift-util.h"
 #include "gen-cpp/ImpalaInternalService.h"
diff --git a/be/src/runtime/client-cache.h b/be/src/runtime/client-cache.h
index 21b323d..0b164f1 100644
--- a/be/src/runtime/client-cache.h
+++ b/be/src/runtime/client-cache.h
@@ -31,7 +31,8 @@
 #include "rpc/thrift-util.h"
 #include "runtime/client-cache-types.h"
 #include "util/debug-util.h"
-#include "util/metrics.h"
+#include "util/metrics-fwd.h"
+#include "util/network-util.h"
 
 #include "common/status.h"
 
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 625ee5a..4935a7a 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -37,6 +37,7 @@
 #include "util/counting-barrier.h"
 #include "util/error-util-internal.h"
 #include "util/network-util.h"
+#include "util/pretty-printer.h"
 #include "util/scope-exit-trigger.h"
 #include "util/uid-util.h"
 
diff --git a/be/src/runtime/initial-reservations.cc b/be/src/runtime/initial-reservations.cc
index 48a911d..c5d225f 100644
--- a/be/src/runtime/initial-reservations.cc
+++ b/be/src/runtime/initial-reservations.cc
@@ -27,6 +27,7 @@
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 #include "util/debug-util.h"
+#include "util/pretty-printer.h"
 
 #include "common/names.h"
 
diff --git a/be/src/runtime/io/data-cache.cc b/be/src/runtime/io/data-cache.cc
index 11f13b6..b81d30a 100644
--- a/be/src/runtime/io/data-cache.cc
+++ b/be/src/runtime/io/data-cache.cc
@@ -35,6 +35,7 @@
 #include "util/filesystem-util.h"
 #include "util/hash-util.h"
 #include "util/impalad-metrics.h"
+#include "util/metrics.h"
 #include "util/parse-util.h"
 #include "util/pretty-printer.h"
 #include "util/scope-exit-trigger.h"
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 6c256b0..ed6b595 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -32,6 +32,8 @@
 #include "util/disk-info.h"
 #include "util/filesystem-util.h"
 #include "util/hdfs-util.h"
+#include "util/collection-metrics.h"
+#include "util/metrics.h"
 #include "util/time.h"
 
 #ifndef NDEBUG
diff --git a/be/src/runtime/io/handle-cache.h b/be/src/runtime/io/handle-cache.h
index 5bd43e0..cc873d7 100644
--- a/be/src/runtime/io/handle-cache.h
+++ b/be/src/runtime/io/handle-cache.h
@@ -28,7 +28,6 @@
 #include "common/hdfs.h"
 #include "common/status.h"
 #include "util/aligned-new.h"
-#include "util/impalad-metrics.h"
 #include "util/spinlock.h"
 #include "util/thread.h"
 
diff --git a/be/src/runtime/io/handle-cache.inline.h b/be/src/runtime/io/handle-cache.inline.h
index 8db7730..b5f184b 100644
--- a/be/src/runtime/io/handle-cache.inline.h
+++ b/be/src/runtime/io/handle-cache.inline.h
@@ -20,6 +20,8 @@
 #include "runtime/io/handle-cache.h"
 #include "runtime/io/hdfs-monitored-ops.h"
 #include "util/hash-util.h"
+#include "util/impalad-metrics.h"
+#include "util/metrics.h"
 #include "util/time.h"
 
 #ifndef IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_INLINE_H
diff --git a/be/src/runtime/io/hdfs-file-reader.cc b/be/src/runtime/io/hdfs-file-reader.cc
index d83887c..93f1c09 100644
--- a/be/src/runtime/io/hdfs-file-reader.cc
+++ b/be/src/runtime/io/hdfs-file-reader.cc
@@ -25,6 +25,7 @@
 #include "runtime/io/request-ranges.h"
 #include "util/hdfs-util.h"
 #include "util/impalad-metrics.h"
+#include "util/metrics.h"
 #include "util/scope-exit-trigger.h"
 
 #include "common/names.h"
diff --git a/be/src/runtime/io/local-file-reader.cc b/be/src/runtime/io/local-file-reader.cc
index 9be45a9..acdd2ea 100644
--- a/be/src/runtime/io/local-file-reader.cc
+++ b/be/src/runtime/io/local-file-reader.cc
@@ -21,6 +21,7 @@
 #include "runtime/io/local-file-reader.h"
 #include "runtime/io/request-ranges.h"
 #include "util/impalad-metrics.h"
+#include "util/metrics.h"
 
 #include "common/names.h"
 
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index 0ac19b5..07047c4 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -35,6 +35,7 @@
 #include "service/data-stream-service.h"
 #include "util/container-util.h"
 #include "util/debug-util.h"
+#include "util/metrics.h"
 #include "util/periodic-counter-updater.h"
 #include "util/runtime-profile-counters.h"
 #include "util/uid-util.h"
diff --git a/be/src/runtime/krpc-data-stream-mgr.h b/be/src/runtime/krpc-data-stream-mgr.h
index cb798e6..eb371c4 100644
--- a/be/src/runtime/krpc-data-stream-mgr.h
+++ b/be/src/runtime/krpc-data-stream-mgr.h
@@ -30,7 +30,7 @@
 #include "common/object-pool.h"
 #include "runtime/descriptors.h"  // for PlanNodeId
 #include "runtime/row-batch.h"
-#include "util/metrics.h"
+#include "util/metrics-fwd.h"
 #include "util/promise.h"
 #include "util/runtime-profile.h"
 #include "util/thread-pool.h"
diff --git a/be/src/runtime/mem-pool.cc b/be/src/runtime/mem-pool.cc
index da8e865..78fe46c 100644
--- a/be/src/runtime/mem-pool.cc
+++ b/be/src/runtime/mem-pool.cc
@@ -18,7 +18,6 @@
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
 #include "util/bit-util.h"
-#include "util/impalad-metrics.h"
 
 #include <algorithm>
 #include <stdio.h>
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index a2e2295..f9ba8fe 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -27,6 +27,7 @@
 #include "runtime/runtime-state.h"
 #include "util/debug-util.h"
 #include "util/mem-info.h"
+#include "util/metrics.h"
 #include "util/pretty-printer.h"
 #include "util/test-info.h"
 #include "util/uid-util.h"
@@ -166,6 +167,11 @@ int64_t MemTracker::SpareCapacity(MemLimit mode) const {
   return result;
 }
 
+void MemTracker::RefreshConsumptionFromMetric() {
+  DCHECK(consumption_metric_ != nullptr);
+  consumption_->Set(consumption_metric_->GetValue());
+}
+
 int64_t MemTracker::GetPoolMemReserved() {
   // Pool trackers should have a pool_name_ and no limit.
   DCHECK(!pool_name_.empty());
@@ -453,6 +459,13 @@ void MemTracker::AddGcFunction(GcFunction f) {
   gc_functions_.push_back(f);
 }
 
+bool MemTracker::LimitExceededSlow(MemLimit mode) {
+  if (mode == MemLimit::HARD && bytes_over_limit_metric_ != nullptr) {
+    bytes_over_limit_metric_->SetValue(consumption() - limit_);
+  }
+  return GcMemory(GetLimit(mode));
+}
+
 bool MemTracker::GcMemory(int64_t max_consumption) {
   if (max_consumption < 0) return true;
   lock_guard<mutex> l(gc_lock_);
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 8a66228..a0c1df7 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -32,7 +32,7 @@
 #include "runtime/mem-tracker-types.h"
 #include "util/debug-util.h"
 #include "util/internal-queue.h"
-#include "util/metrics.h"
+#include "util/metrics-fwd.h"
 #include "util/runtime-profile-counters.h"
 #include "util/spinlock.h"
 
@@ -262,12 +262,7 @@ class MemTracker {
   /// limit is exceeded after calling the GC functions. Returns false if there is no limit
   /// or consumption is under the limit.
   bool LimitExceeded(MemLimit mode) {
-    if (UNLIKELY(CheckLimitExceeded(mode))) {
-      if (mode == MemLimit::HARD && bytes_over_limit_metric_ != nullptr) {
-        bytes_over_limit_metric_->SetValue(consumption() - limit_);
-      }
-      return GcMemory(GetLimit(mode));
-    }
+    if (UNLIKELY(CheckLimitExceeded(mode))) return LimitExceededSlow(mode);
     return false;
   }
 
@@ -278,10 +273,7 @@ class MemTracker {
 
   /// Refresh the memory consumption value from the consumption metric. Only valid to
   /// call if this tracker has a consumption metric.
-  void RefreshConsumptionFromMetric() {
-    DCHECK(consumption_metric_ != nullptr);
-    consumption_->Set(consumption_metric_->GetValue());
-  }
+  void RefreshConsumptionFromMetric();
 
   int64_t limit() const { return limit_; }
   bool has_limit() const { return limit_ >= 0; }
@@ -377,6 +369,9 @@ class MemTracker {
     return limit >= 0 && limit < consumption();
   }
 
+  /// Slow path for LimitExceeded(), called once CheckLimitExceeded() returned true.
+  bool LimitExceededSlow(MemLimit mode);
+
   /// If consumption is higher than max_consumption, attempts to free memory by calling
   /// any added GC functions.  Returns true if max_consumption is still exceeded. Takes
   /// gc_lock. Updates metrics if initialized.
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 7dab8a2..25c5143 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -28,10 +28,11 @@
 #include "runtime/fragment-instance-state.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
+#include "util/debug-util.h"
+#include "util/impalad-metrics.h"
+#include "util/metrics.h"
 #include "util/uid-util.h"
 #include "util/thread.h"
-#include "util/impalad-metrics.h"
-#include "util/debug-util.h"
 
 #include "common/names.h"
 
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index bdbe983..7df1c4c 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -42,6 +42,7 @@
 #include "service/control-service.h"
 #include "util/debug-util.h"
 #include "util/impalad-metrics.h"
+#include "util/metrics.h"
 #include "util/system-state-info.h"
 #include "util/thread.h"
 
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index e3d828c..ee210dd 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -27,6 +27,7 @@
 #include "runtime/query-state.h"
 #include "runtime/runtime-state.h"
 #include "runtime/sorted-run-merger.h"
+#include "util/pretty-printer.h"
 #include "util/ubsan.h"
 
 #include "common/names.h"
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index f838c47..c017ef7 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -34,6 +34,7 @@
 #include "util/condition-variable.h"
 #include "util/cpu-info.h"
 #include "util/filesystem-util.h"
+#include "util/collection-metrics.h"
 #include "util/metrics.h"
 
 #include "gen-cpp/Types_types.h"  // for TUniqueId
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 372e301..8c61cf7 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -31,6 +31,7 @@
 #include "runtime/runtime-state.h"
 #include "runtime/tmp-file-mgr-internal.h"
 #include "util/bit-util.h"
+#include "util/collection-metrics.h"
 #include "util/debug-util.h"
 #include "util/disk-info.h"
 #include "util/filesystem-util.h"
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index a22a792..9a38e4f 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -28,9 +28,9 @@
 #include "common/object-pool.h"
 #include "common/status.h"
 #include "gen-cpp/Types_types.h" // for TUniqueId
-#include "util/collection-metrics.h"
 #include "util/condition-variable.h"
 #include "util/mem-range.h"
+#include "util/metrics-fwd.h"
 #include "util/openssl-util.h"
 #include "util/runtime-profile.h"
 #include "util/spinlock.h"
diff --git a/be/src/scheduling/request-pool-service.cc b/be/src/scheduling/request-pool-service.cc
index 6edb6fd..cd78793 100644
--- a/be/src/scheduling/request-pool-service.cc
+++ b/be/src/scheduling/request-pool-service.cc
@@ -28,6 +28,7 @@
 #include "rpc/jni-thrift-util.h"
 #include "service/query-options.h"
 #include "util/auth-util.h"
+#include "util/collection-metrics.h"
 #include "util/mem-info.h"
 #include "util/parse-util.h"
 #include "util/time.h"
diff --git a/be/src/scheduling/request-pool-service.h b/be/src/scheduling/request-pool-service.h
index 935bae8..02642c4 100644
--- a/be/src/scheduling/request-pool-service.h
+++ b/be/src/scheduling/request-pool-service.h
@@ -22,7 +22,7 @@
 
 #include "gen-cpp/ImpalaInternalService.h"
 #include "common/status.h"
-#include "util/collection-metrics.h"
+#include "util/metrics-fwd.h"
 
 namespace impala {
 
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index ebabeb9..502029a 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -37,7 +37,7 @@
 #include "scheduling/query-schedule.h"
 #include "scheduling/request-pool-service.h"
 #include "statestore/statestore-subscriber.h"
-#include "util/metrics.h"
+#include "util/metrics-fwd.h"
 #include "util/network-util.h"
 #include "util/runtime-profile.h"
 
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index d3d3a04..f4dd448 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -38,6 +38,7 @@
 #include "service/query-result-set.h"
 #include "util/debug-util.h"
 #include "util/impalad-metrics.h"
+#include "util/metrics.h"
 #include "util/runtime-profile-counters.h"
 #include "util/time.h"
 
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index 48bb26c..51eef3a0 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -29,7 +29,6 @@
 #include "service/query-options.h"
 #include "service/query-result-set.h"
 #include "util/auth-util.h"
-#include "util/impalad-metrics.h"
 #include "util/webserver.h"
 #include "util/runtime-profile.h"
 #include "util/runtime-profile-counters.h"
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 0ad3750..34a0de4 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -48,6 +48,7 @@
 #include "util/auth-util.h"
 #include "util/debug-util.h"
 #include "util/impalad-metrics.h"
+#include "util/metrics.h"
 #include "util/runtime-profile-counters.h"
 #include "util/string-parser.h"
 
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index ac014e2..8200445 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -73,6 +73,7 @@
 #include "util/histogram-metric.h"
 #include "util/impalad-metrics.h"
 #include "util/lineage-util.h"
+#include "util/metrics.h"
 #include "util/network-util.h"
 #include "util/openssl-util.h"
 #include "util/parse-util.h"
diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc
index 9d44773..e76af56 100644
--- a/be/src/service/impalad-main.cc
+++ b/be/src/service/impalad-main.cc
@@ -44,7 +44,6 @@
 #include "service/fe-support.h"
 #include "gen-cpp/ImpalaService.h"
 #include "gen-cpp/ImpalaInternalService.h"
-#include "util/impalad-metrics.h"
 #include "util/thread.h"
 
 #include "common/names.h"
diff --git a/be/src/service/session-expiry-test.cc b/be/src/service/session-expiry-test.cc
index 7e5b86a..6251dd4 100644
--- a/be/src/service/session-expiry-test.cc
+++ b/be/src/service/session-expiry-test.cc
@@ -26,6 +26,7 @@
 #include "testutil/in-process-servers.h"
 #include "util/asan.h"
 #include "util/impalad-metrics.h"
+#include "util/metrics.h"
 #include "util/time.h"
 
 #include "common/names.h"
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index 7dd6874..9ed64e2 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -34,8 +34,11 @@
 #include "rpc/thrift-util.h"
 #include "statestore/statestore-service-client-wrapper.h"
 #include "util/container-util.h"
+#include "util/collection-metrics.h"
 #include "util/debug-util.h"
+#include "util/metrics.h"
 #include "util/openssl-util.h"
+#include "util/collection-metrics.h"
 #include "util/time.h"
 
 #include "common/names.h"
diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h
index 233b7a7..cc566a9 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -32,8 +32,9 @@
 #include "rpc/thrift-util.h"
 #include "statestore/statestore.h"
 #include "statestore/statestore-service-client-wrapper.h"
-#include "util/metrics.h"
 #include "util/stopwatch.h"
+#include "gen-cpp/StatestoreService.h"
+#include "gen-cpp/StatestoreSubscriber.h"
 
 namespace impala {
 
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 6c93116..d7eb152 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -36,6 +36,7 @@
 #include "util/container-util.h"
 #include "util/debug-util.h"
 #include "util/logging-support.h"
+#include "util/metrics.h"
 #include "util/openssl-util.h"
 #include "util/test-info.h"
 #include "util/time.h"
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 378ae30..8bb6d1b 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -42,7 +42,7 @@
 #include "statestore/statestore-subscriber-client-wrapper.h"
 #include "util/aligned-new.h"
 #include "util/collection-metrics.h"
-#include "util/metrics.h"
+#include "util/metrics-fwd.h"
 #include "util/thread-pool.h"
 #include "util/webserver.h"
 
diff --git a/be/src/util/collection-metrics.h b/be/src/util/collection-metrics.h
index 4cf3330..a299554 100644
--- a/be/src/util/collection-metrics.h
+++ b/be/src/util/collection-metrics.h
@@ -119,18 +119,6 @@ class SetMetric : public Metric {
   std::set<T> value_;
 };
 
-/// Enum to define which statistic types are available in the StatsMetric
-struct StatsType {
-  enum type {
-    MIN = 1,
-    MAX = 2,
-    MEAN = 4,
-    STDDEV = 8,
-    COUNT = 16,
-    ALL = 31
-  };
-};
-
 /// Metric which accumulates min, max and mean of all values, plus a count of samples
 /// seen. The output can be controlled by passing a bitmask as a template parameter to
 /// indicate which values should be printed or returned as JSON.
@@ -140,7 +128,7 @@ struct StatsType {
 ///
 /// After construction, all statistics are ill-defined, but count will be 0. The first call
 /// to Update() will initialise all stats.
-template <typename T, int StatsSelection=StatsType::ALL>
+template <typename T, int StatsSelection>
 class StatsMetric : public Metric {
  public:
   static StatsMetric* CreateAndRegister(MetricGroup* metrics, const std::string& key,
diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc
index cdebe4c..4ced62d 100644
--- a/be/src/util/impalad-metrics.cc
+++ b/be/src/util/impalad-metrics.cc
@@ -18,7 +18,9 @@
 #include "util/impalad-metrics.h"
 
 #include "util/debug-util.h"
+#include "util/collection-metrics.h"
 #include "util/histogram-metric.h"
+#include "util/metrics.h"
 
 #include "common/names.h"
 
diff --git a/be/src/util/impalad-metrics.h b/be/src/util/impalad-metrics.h
index b39e4cc..6eeadac 100644
--- a/be/src/util/impalad-metrics.h
+++ b/be/src/util/impalad-metrics.h
@@ -19,13 +19,10 @@
 #ifndef IMPALA_UTIL_IMPALAD_METRICS_H
 #define IMPALA_UTIL_IMPALAD_METRICS_H
 
-#include "util/metrics.h"
-#include "util/collection-metrics.h"
+#include "util/metrics-fwd.h"
 
 namespace impala {
 
-class HistogramMetric;
-
 /// Contains the keys (strings) for impala metrics.
 class ImpaladMetricKeys {
  public:
diff --git a/be/src/util/metrics-fwd.h b/be/src/util/metrics-fwd.h
new file mode 100644
index 0000000..14786b9
--- /dev/null
+++ b/be/src/util/metrics-fwd.h
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Forward declarations of metric-related classes. A lot of metric-related
+// code is templatised and thus needs to be in headers, but in most headers
+// that depend on metrics, only forward declarations of those classes are
+// needed. This header should be included in those cases to reduce
+// compilation time and dependencies.
+
+#pragma once
+
+#include "gen-cpp/MetricDefs_types.h"
+#include "gen-cpp/MetricDefs_constants.h"
+
+namespace impala {
+
+class AtomicHighWaterMarkGauge;
+class HistogramMetric;
+class Metric;
+class MetricGroup;
+class NegatedGauge;
+class SumGauge;
+
+/// Enum of bit flags defining which statistic types are available in the StatsMetric.
+struct StatsType {
+  enum type {
+    MIN = 1,
+    MAX = 2,
+    MEAN = 4,
+    STDDEV = 8,
+    COUNT = 16,
+    ALL = 31
+  };
+};
+
+template<typename T, TMetricKind::type metric_kind_t>
+class ScalarMetric;
+template<typename T, TMetricKind::type metric_kind_t>
+class LockedMetric;
+template<TMetricKind::type metric_kind_t>
+class AtomicMetric;
+template <typename T>
+class SetMetric;
+template <typename T, int StatsSelection=StatsType::ALL>
+class StatsMetric;
+
+typedef class LockedMetric<bool, TMetricKind::PROPERTY> BooleanProperty;
+typedef class LockedMetric<std::string,TMetricKind::PROPERTY> StringProperty;
+typedef class LockedMetric<double, TMetricKind::GAUGE> DoubleGauge;
+
+/// We write 'Int' as a placeholder for all integer types.
+typedef class AtomicMetric<TMetricKind::GAUGE> IntGauge;
+typedef class AtomicMetric<TMetricKind::COUNTER> IntCounter;
+
+}
diff --git a/be/src/util/metrics.h b/be/src/util/metrics.h
index b319f35..7c7831d 100644
--- a/be/src/util/metrics.h
+++ b/be/src/util/metrics.h
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef IMPALA_UTIL_METRICS_H
-#define IMPALA_UTIL_METRICS_H
+#pragma once
 
 #include <map>
 #include <sstream>
@@ -34,13 +33,11 @@
 #include "common/status.h"
 #include "util/debug-util.h"
 #include "util/json-util.h"
+#include "util/metrics-fwd.h"
 #include "util/pretty-printer.h"
 #include "util/spinlock.h"
 #include "util/webserver.h"
 
-#include "gen-cpp/MetricDefs_types.h"
-#include "gen-cpp/MetricDefs_constants.h"
-
 namespace impala {
 
 /// Singleton that provides metric definitions. Metrics are defined in metrics.json
@@ -267,10 +264,6 @@ class LockedMetric : public ScalarMetric<T, metric_kind_t> {
   T value_;
 };
 
-typedef class LockedMetric<bool, TMetricKind::PROPERTY> BooleanProperty;
-typedef class LockedMetric<std::string,TMetricKind::PROPERTY> StringProperty;
-typedef class LockedMetric<double, TMetricKind::GAUGE> DoubleGauge;
-
 /// An implementation of 'gauge' or 'counter' metric kind. The metric can be incremented
 /// atomically via the Increment() interface.
 template<TMetricKind::type metric_kind_t>
@@ -304,10 +297,6 @@ class AtomicMetric : public ScalarMetric<int64_t, metric_kind_t> {
   AtomicInt64 value_;
 };
 
-/// We write 'Int' as a placeholder for all integer types.
-typedef class AtomicMetric<TMetricKind::GAUGE> IntGauge;
-typedef class AtomicMetric<TMetricKind::COUNTER> IntCounter;
-
 /// An AtomicMetric that keeps track of the highest value seen and the current value.
 ///
 /// Implementation notes:
@@ -559,5 +548,3 @@ class MetricGroup {
 TMetricDef MakeTMetricDef(const std::string& key, TMetricKind::type kind,
     TUnit::type unit);
 }
-
-#endif // IMPALA_UTIL_METRICS_H