Posted to commits@impala.apache.org by jo...@apache.org on 2019/02/06 05:18:19 UTC

[impala] branch master updated (f9ced75 -> 24eab71)

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from f9ced75  IMPALA-7999: clean up start-*d.sh scripts
     new b571409  IMPALA-7694: Add host resource usage metrics to profile
     new efd2478  IMPALA-8104: IF NOT EXISTS to ALTER TABLE ADD COLUMN(S)
     new 72b8452  IMPALA-8091 addendum: use absolute path for ntp-wait
     new 52af0bf  Enable full stacktrace logging with surefire
     new 2f5d001  test-with-docker: decrease image size by "de-duping" HDFS.
     new 14996f4  IMPALA-8103: In Analyzed Query use /* and */ to delimit hints.
     new ae96a9f  IMPALA-8151: Use sizeof() in HiveUdfCall to specify non-primitive type's size
     new 24eab71  IMPALA-7928: Consistent remote read scheduling

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/experiments/CMakeLists.txt                  |   2 +
 be/src/experiments/hash-ring-util.cc               | 101 +++++++++
 be/src/exprs/hive-udf-call.cc                      |   4 +-
 be/src/kudu/util/logging.h                         |   2 +-
 be/src/runtime/coordinator-backend-state.cc        |  12 +-
 be/src/runtime/coordinator-backend-state.h         |  14 +-
 be/src/runtime/coordinator.cc                      |  10 +-
 be/src/runtime/coordinator.h                       |   5 +
 be/src/runtime/exec-env.cc                         |  11 +
 be/src/runtime/exec-env.h                          |   8 +
 be/src/runtime/fragment-instance-state.cc          |   4 +-
 be/src/runtime/krpc-data-stream-recvr.cc           |   2 +-
 be/src/runtime/query-state.cc                      |  38 +++-
 be/src/runtime/query-state.h                       |   3 +
 be/src/runtime/runtime-state.cc                    |   7 +-
 be/src/scheduling/CMakeLists.txt                   |   2 +
 be/src/scheduling/backend-config.cc                |  17 +-
 be/src/scheduling/backend-config.h                 |  13 +-
 be/src/scheduling/hash-ring-test.cc                | 230 +++++++++++++++++++++
 be/src/scheduling/hash-ring.cc                     | 125 +++++++++++
 be/src/scheduling/hash-ring.h                      | 115 +++++++++++
 be/src/scheduling/scheduler-test-util.cc           |   4 +
 be/src/scheduling/scheduler-test-util.h            |   9 +-
 be/src/scheduling/scheduler-test.cc                | 111 +++++++++-
 be/src/scheduling/scheduler.cc                     |  65 +++++-
 be/src/scheduling/scheduler.h                      |  12 +-
 be/src/service/impala-server.cc                    |   9 +
 be/src/service/impala-server.h                     |   5 +
 be/src/service/query-options.cc                    |  32 ++-
 be/src/service/query-options.h                     |   5 +-
 be/src/util/CMakeLists.txt                         |   2 +
 be/src/util/periodic-counter-updater.cc            |  16 +-
 be/src/util/periodic-counter-updater.h             |  26 ++-
 be/src/util/pretty-printer.h                       |  10 +-
 be/src/util/runtime-profile-counters.h             | 133 +++++++++---
 be/src/util/runtime-profile-test.cc                | 175 ++++++++++++++++
 be/src/util/runtime-profile.cc                     | 199 ++++++++++++++----
 be/src/util/runtime-profile.h                      |  80 +++++--
 be/src/util/streaming-sampler.h                    |  60 +-----
 be/src/util/system-state-info-test.cc              |  91 ++++++++
 be/src/util/system-state-info.cc                   | 110 ++++++++++
 be/src/util/system-state-info.h                    |  94 +++++++++
 bin/parse-thrift-profile.py                        |  28 +--
 common/thrift/ImpalaInternalService.thrift         |  11 +
 common/thrift/ImpalaService.thrift                 |  18 +-
 common/thrift/Metrics.thrift                       |   3 +
 common/thrift/RuntimeProfile.thrift                |   7 +
 docker/entrypoint.sh                               |  18 ++
 docs/topics/impala_alter_table.xml                 |  44 ++--
 fe/pom.xml                                         |   1 +
 .../org/apache/impala/analysis/BaseTableRef.java   |   2 +-
 .../org/apache/impala/analysis/InsertStmt.java     |   4 +-
 .../org/apache/impala/analysis/SelectStmt.java     |   3 +-
 .../java/org/apache/impala/analysis/TableRef.java  |   3 +-
 .../org/apache/impala/analysis/ToSqlUtils.java     |  14 +-
 .../java/org/apache/impala/common/PrintUtils.java  |  15 +-
 .../java/org/apache/impala/analysis/ToSqlTest.java |  33 ++-
 .../org/apache/impala/util/PrintUtilsTest.java     |  11 -
 .../python/impala_py_lib/profiles.py               |  45 +---
 testdata/cluster/admin                             |   6 +-
 .../queries/PlannerTest/max-row-size.test          |  21 +-
 .../queries/PlannerTest/resource-requirements.test |  93 ++++-----
 .../PlannerTest/spillable-buffer-sizing.test       |  82 +++-----
 tests/beeswax/impala_beeswax.py                    |   9 +-
 tests/query_test/test_observability.py             | 130 +++++++-----
 65 files changed, 2097 insertions(+), 477 deletions(-)
 create mode 100644 be/src/experiments/hash-ring-util.cc
 create mode 100644 be/src/scheduling/hash-ring-test.cc
 create mode 100644 be/src/scheduling/hash-ring.cc
 create mode 100644 be/src/scheduling/hash-ring.h
 create mode 100644 be/src/util/system-state-info-test.cc
 create mode 100644 be/src/util/system-state-info.cc
 create mode 100644 be/src/util/system-state-info.h
 copy bin/parse-thrift-profile.py => lib/python/impala_py_lib/profiles.py (58%)
 mode change 100755 => 100644


[impala] 02/08: IMPALA-8104: IF NOT EXISTS to ALTER TABLE ADD COLUMN(S)

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit efd2478ed3c5f54ec5a1c486e7f4c657ca6addac
Author: Alex Rodoni <ar...@cloudera.com>
AuthorDate: Mon Feb 4 16:51:43 2019 -0800

    IMPALA-8104: IF NOT EXISTS to ALTER TABLE ADD COLUMN(S)
    
    Change-Id: Ieec5f99d868eee05dbf2cecff41dd57561360333
    Reviewed-on: http://gerrit.cloudera.org:8080/12361
    Reviewed-by: Fredy Wijaya <fw...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 docs/topics/impala_alter_table.xml | 44 ++++++++++++++++----------------------
 1 file changed, 19 insertions(+), 25 deletions(-)

diff --git a/docs/topics/impala_alter_table.xml b/docs/topics/impala_alter_table.xml
index 412aff0..8529a9c 100644
--- a/docs/topics/impala_alter_table.xml
+++ b/docs/topics/impala_alter_table.xml
@@ -46,11 +46,8 @@ under the License.
 
   <conbody>
 
-    <p>
-      <indexterm audience="hidden">ALTER TABLE statement</indexterm>
-      The <codeph>ALTER TABLE</codeph> statement changes the structure or properties of an
-      existing Impala table.
-    </p>
+    <p> The <codeph>ALTER TABLE</codeph> statement changes the structure or
+      properties of an existing Impala table. </p>
 
     <p>
       In Impala, this is primarily a logical operation that updates the table metadata in the
@@ -66,12 +63,13 @@ under the License.
 
 <codeblock>ALTER TABLE [<varname>old_db_name</varname>.]<varname>old_table_name</varname> RENAME TO [<varname>new_db_name</varname>.]<varname>new_table_name</varname>
 
-ALTER TABLE <varname>name</varname> ADD COLUMNS (<varname>col_spec</varname>[, <varname>col_spec</varname> ...])
+ALTER TABLE <varname>name</varname> ADD [IF NOT EXISTS] COLUMNS (<varname>col_spec</varname>[, <varname>col_spec</varname> ...])
+ALTER TABLE <varname>name</varname> REPLACE COLUMNS (<varname>col_spec</varname>[, <varname>col_spec</varname> ...])
+
+ALTER TABLE <varname>name</varname> ADD COLUMN [IF NOT EXISTS] <varname>col_spec</varname>
 ALTER TABLE <varname>name</varname> DROP [COLUMN] <varname>column_name</varname>
 ALTER TABLE <varname>name</varname> CHANGE <varname>column_name</varname> <varname>col_spec</varname>
 
-ALTER TABLE <varname>name</varname> REPLACE COLUMNS (<varname>col_spec</varname>[, <varname>col_spec</varname> ...])
-
 <ph rev="3.1 IMPALA-6988">ALTER TABLE <varname>name</varname> SET OWNER USER <varname>user_name</varname>
 ALTER TABLE <varname>name</varname> SET OWNER ROLE <varname>role_name</varname>
 </ph>
@@ -546,23 +544,19 @@ ALTER TABLE <varname>table_name</varname> SET SERDEPROPERTIES ('<varname>key1</v
     <p>
       <b>To reorganize columns for a table:</b>
     </p>
-
-<codeblock>ALTER TABLE <varname>table_name</varname> ADD COLUMNS (<varname>column_defs</varname>);
-ALTER TABLE <varname>table_name</varname> REPLACE COLUMNS (<varname>column_defs</varname>);
-ALTER TABLE <varname>table_name</varname> CHANGE <varname>column_name</varname> <varname>new_name</varname> <varname>new_type</varname>;
-ALTER TABLE <varname>table_name</varname> DROP <varname>column_name</varname>;</codeblock>
-
-    <p>
-      The <varname>column_spec</varname> is the same as in the <codeph>CREATE TABLE</codeph>
-      statement: the column name, then its data type, then an optional comment. You can add
-      multiple columns at a time. The parentheses are required whether you add a single column
-      or multiple columns. When you replace columns, all the original column definitions are
-      discarded. You might use this technique if you receive a new set of data files with
-      different data types or columns in a different order. (The data files are retained, so if
-      the new columns are incompatible with the old ones, use <codeph>INSERT OVERWRITE</codeph>
-      or <codeph>LOAD DATA OVERWRITE</codeph> to replace all the data before issuing any further
-      queries.)
-    </p>
+    <p>You can add multiple columns at a time using the <codeph>ALTER
+        TABLE</codeph> statement. If you specify the <codeph>IF NOT
+        EXISTS</codeph> clause, Impala silently ignores the <codeph>ADD</codeph>
+      request and does not return an error if a column with the same name exists
+      in the table.</p>
+    <p>When you replace columns, all the original column definitions are
+      discarded. </p>
+    <p>You might use these statements if you receive a new set of data files
+      with different data types or columns in a different order. The data files
+      are retained, so if the new columns are incompatible with the old ones,
+      use <codeph>INSERT OVERWRITE</codeph> or <codeph>LOAD DATA
+        OVERWRITE</codeph> to replace all the data before issuing any further
+      queries.</p>
 
     <p rev="">
       For example, here is how you might add columns to an existing table. The first


[impala] 04/08: Enable full stacktrace logging with surefire

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 52af0bff63af91506f08affa5fcd943af428be45
Author: Bharath Vissapragada <bh...@cloudera.com>
AuthorDate: Tue Feb 5 11:12:22 2019 -0800

    Enable full stacktrace logging with surefire
    
    By default, the surefire plugin trims the stack trace while running
    JUnit tests. Example: IMPALA-8150
    
    This commit disables that trimming (trimStackTrace=false). Tested manually.
    
    Change-Id: I17fb7c72ff39dec1b0cbc7af9df3f61a9abc52b7
    Reviewed-on: http://gerrit.cloudera.org:8080/12370
    Reviewed-by: Philip Zeyliger <ph...@cloudera.com>
    Reviewed-by: Fredy Wijaya <fw...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 fe/pom.xml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/fe/pom.xml b/fe/pom.xml
index 3fc9fe9..7fe5c50 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -603,6 +603,7 @@ under the License.
         <artifactId>maven-surefire-plugin</artifactId>
         <version>2.20</version>
         <configuration>
+          <trimStackTrace>false</trimStackTrace>
           <reportsDirectory>${surefire.reports.dir}</reportsDirectory>
           <redirectTestOutputToFile>true</redirectTestOutputToFile>
           <argLine>-Djava.library.path=${java.library.path}:${backend.library.path} ${surefireJacocoArg}</argLine>


[impala] 06/08: IMPALA-8103: In Analyzed Query use /* and */ to delimit hints.

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 14996f495129eaee35ee030be87b289b9dae517e
Author: Andrew Sherman <as...@cloudera.com>
AuthorDate: Mon Feb 4 14:25:33 2019 -0800

    IMPALA-8103: In Analyzed Query use /* and */ to delimit hints.
    
    IMPALA-5821 added the analyzed query text to the output of EXPLAIN when
    explain_level is 2 or greater. Any query hints were displayed in the form
    
    SELECT
    -- +straight_join
    * FROM table_name [...]
    
    which meant that care had to be taken to embed and preserve newlines in
    the EXPLAIN output. This change makes the output take the form
    
    SELECT /* +straight_join */ * FROM table_name [...]
    
    The /* +straight_join */ form was chosen over /*+straight_join */ as
    it seems more readable, and is more commonly used in existing tests.
    
    To do this, the following changes were made:
    ToSqlUtils.getPlanHintsSql() was extended to take a ToSqlOptions
    parameter. If this parameter indicates that the analyzed query is being
    shown, then the hints are surrounded with '/*' and '*/'.
    PrintUtils.wrapString() was simplified as it no longer has to preserve
    newlines in an analyzed query.
    
    TESTING
     All end-to-end tests were run.
     The output in a few .test files was updated.
     A couple of new cases were added to ToSqlTest.java
    
    Change-Id: I7215a4c17508e3408680a1d2bb6c3af355c78c8d
    Reviewed-on: http://gerrit.cloudera.org:8080/12360
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/analysis/BaseTableRef.java   |  2 +-
 .../org/apache/impala/analysis/InsertStmt.java     |  4 +-
 .../org/apache/impala/analysis/SelectStmt.java     |  3 +-
 .../java/org/apache/impala/analysis/TableRef.java  |  3 +-
 .../org/apache/impala/analysis/ToSqlUtils.java     | 14 +++-
 .../java/org/apache/impala/common/PrintUtils.java  | 15 +---
 .../java/org/apache/impala/analysis/ToSqlTest.java | 33 ++++++--
 .../org/apache/impala/util/PrintUtilsTest.java     | 11 ---
 .../queries/PlannerTest/max-row-size.test          | 21 ++---
 .../queries/PlannerTest/resource-requirements.test | 93 +++++++++-------------
 .../PlannerTest/spillable-buffer-sizing.test       | 82 +++++++------------
 11 files changed, 121 insertions(+), 160 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java b/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java
index 7644329..7b10fdc 100644
--- a/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/BaseTableRef.java
@@ -79,7 +79,7 @@ public class BaseTableRef extends TableRef {
     if (alias != null) aliasSql = " " + ToSqlUtils.getIdentSql(alias);
     String tableSampleSql = "";
     if (sampleParams_ != null) tableSampleSql = " " + sampleParams_.toSql(options);
-    String tableHintsSql = ToSqlUtils.getPlanHintsSql(tableHints_);
+    String tableHintsSql = ToSqlUtils.getPlanHintsSql(options, tableHints_);
     return getTable().getTableName().toSql() + aliasSql + tableSampleSql + tableHintsSql;
   }
 
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index 7c34d52..72c29b3 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -928,7 +928,7 @@ public class InsertStmt extends StatementBase {
 
     strBuilder.append(getOpName());
     if (!planHints_.isEmpty() && hintLoc_ == HintLocation.Start) {
-      strBuilder.append(" " + ToSqlUtils.getPlanHintsSql(getPlanHints()));
+      strBuilder.append(" " + ToSqlUtils.getPlanHintsSql(options, getPlanHints()));
     }
     if (overwrite_) {
       strBuilder.append(" OVERWRITE");
@@ -955,7 +955,7 @@ public class InsertStmt extends StatementBase {
       strBuilder.append(" PARTITION (" + Joiner.on(", ").join(values) + ")");
     }
     if (!planHints_.isEmpty() && hintLoc_ == HintLocation.End) {
-      strBuilder.append(" " + ToSqlUtils.getPlanHintsSql(getPlanHints()));
+      strBuilder.append(" " + ToSqlUtils.getPlanHintsSql(options, getPlanHints()));
     }
     if (!needsGeneratedQueryStatement_) {
       strBuilder.append(" " + queryStmt_.toSql(options));
diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
index d568429..ff45a56 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
@@ -1051,7 +1051,8 @@ public class SelectStmt extends QueryStmt {
       strBuilder.append("DISTINCT ");
     }
     if (selectList_.hasPlanHints()) {
-      strBuilder.append(ToSqlUtils.getPlanHintsSql(selectList_.getPlanHints()) + " ");
+      strBuilder.append(ToSqlUtils.getPlanHintsSql(options, selectList_.getPlanHints()))
+          .append(" ");
     }
     for (int i = 0; i < selectList_.getItems().size(); ++i) {
       strBuilder.append(selectList_.getItems().get(i).toSql(options));
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableRef.java b/fe/src/main/java/org/apache/impala/analysis/TableRef.java
index 5a6e866..f0acc41 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableRef.java
@@ -600,7 +600,8 @@ public class TableRef extends StmtNode {
     }
 
     StringBuilder output = new StringBuilder(" " + joinOp_.toString() + " ");
-    if(!joinHints_.isEmpty()) output.append(ToSqlUtils.getPlanHintsSql(joinHints_) + " ");
+    if (!joinHints_.isEmpty())
+      output.append(ToSqlUtils.getPlanHintsSql(options, joinHints_)).append(" ");
     output.append(tableRefToSql(options));
     if (usingColNames_ != null) {
       output.append(" USING (").append(Joiner.on(", ").join(usingColNames_)).append(")");
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index 366fb8f..3909eb8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -533,13 +533,19 @@ public class ToSqlUtils {
    * commented plan hint style such that hinted views created by Impala are readable by
    * Hive (parsed as a comment by Hive).
    */
-  public static String getPlanHintsSql(List<PlanHint> hints) {
+  public static String getPlanHintsSql(ToSqlOptions options, List<PlanHint> hints) {
     Preconditions.checkNotNull(hints);
     if (hints.isEmpty()) return "";
     StringBuilder sb = new StringBuilder();
-    sb.append("\n-- +");
-    sb.append(Joiner.on(",").join(hints));
-    sb.append("\n");
+    if (options.showRewritten()) {
+      sb.append("/* +");
+      sb.append(Joiner.on(",").join(hints));
+      sb.append(" */");
+    } else {
+      sb.append("\n-- +");
+      sb.append(Joiner.on(",").join(hints));
+      sb.append("\n");
+    }
     return sb.toString();
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/common/PrintUtils.java b/fe/src/main/java/org/apache/impala/common/PrintUtils.java
index 70a667c..1f347be 100644
--- a/fe/src/main/java/org/apache/impala/common/PrintUtils.java
+++ b/fe/src/main/java/org/apache/impala/common/PrintUtils.java
@@ -175,16 +175,9 @@ public class PrintUtils {
    * Any newlines in the input are maintained.
    */
   public static String wrapString(String s, int wrapLength) {
-    StringBuilder ret = new StringBuilder(s.length());
-    String[] split = s.split("\n");
-    for (int i = 0; i < split.length; i++) {
-      String line = split[i];
-      String wrappedLine = WordUtils.wrap(line, wrapLength, null, true);
-      // we keep any existing newlines in text - these should be commented hints
-      wrappedLine = wrappedLine.replaceAll(" +$", "");
-      ret.append(wrappedLine);
-      if (i < split.length - 1) ret.append("\n");
-    }
-    return ret.toString();
+    String wrapped = WordUtils.wrap(s, wrapLength, null, true);
+    // Remove any trailing blanks from a line.
+    wrapped = wrapped.replaceAll(" +$", "");
+    return wrapped;
   }
 }
diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
index a106820..6bc048f 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
@@ -82,6 +82,11 @@ public class ToSqlTest extends FrontendTestBase {
     testToSql(query, System.getProperty("user.name"), expected);
   }
 
+  private void testToSql(String query, String expected, ToSqlOptions options) {
+    String defaultDb = System.getProperty("user.name");
+    testToSql(createAnalysisCtx(defaultDb), query, defaultDb, expected, false, options);
+  }
+
   private void testToSql(AnalysisContext ctx, String query, String expected) {
     testToSql(ctx, query, System.getProperty("user.name"), expected);
   }
@@ -92,23 +97,24 @@ public class ToSqlTest extends FrontendTestBase {
 
   private void testToSql(AnalysisContext ctx, String query, String defaultDb,
       String expected) {
-    testToSql(ctx, query, defaultDb, expected, false);
+    testToSql(ctx, query, defaultDb, expected, false, ToSqlOptions.DEFAULT);
   }
 
   private void testToSql(String query, String defaultDb, String expected,
       boolean ignoreWhitespace) {
-    testToSql(createAnalysisCtx(defaultDb), query, defaultDb, expected, ignoreWhitespace);
+    testToSql(createAnalysisCtx(defaultDb), query, defaultDb, expected, ignoreWhitespace,
+        ToSqlOptions.DEFAULT);
   }
 
   private void testToSql(AnalysisContext ctx, String query, String defaultDb,
-      String expected, boolean ignoreWhitespace) {
+      String expected, boolean ignoreWhitespace, ToSqlOptions options) {
     String actual = null;
     try {
       ParseNode node = AnalyzesOk(query, ctx);
-      if (node instanceof QueryStmt) {
+      if (node instanceof QueryStmt && !options.showRewritten()) {
         actual = ((QueryStmt)node).getOrigSqlString();
       } else {
-        actual = node.toSql();
+        actual = node.toSql(options);
       }
       if (ignoreWhitespace) {
         // Transform whitespace to single space.
@@ -686,6 +692,23 @@ public class ToSqlTest extends FrontendTestBase {
           String.format("select distinct %sstraight_join%s * from functional.alltypes",
           prefix, suffix),
           "SELECT DISTINCT \n-- +straight_join\n * FROM functional.alltypes");
+
+      // Tests for analyzed/rewritten sql.
+      // First test the test by passing ToSqlOptions.DEFAULT which should result in the
+      // hints appearing with '--' style comments.
+      testToSql(
+          String.format("select distinct %sstraight_join%s * from functional.alltypes",
+              prefix, suffix),
+          "SELECT DISTINCT \n-- +straight_join\n * FROM functional.alltypes",
+          ToSqlOptions.DEFAULT);
+      // Test that analyzed queries use the '/*' style comments.
+      testToSql(
+          String.format("select distinct %sstraight_join%s * from functional.alltypes "
+                  + "where bool_col = false and id <= 5 and id >= 2",
+              prefix, suffix),
+          "SELECT DISTINCT /* +straight_join */ * FROM functional.alltypes "
+              + "WHERE bool_col = FALSE AND id <= 5 AND id >= 2",
+          ToSqlOptions.REWRITTEN);
     }
   }
 
diff --git a/fe/src/test/java/org/apache/impala/util/PrintUtilsTest.java b/fe/src/test/java/org/apache/impala/util/PrintUtilsTest.java
index 300022f..cd475a4 100644
--- a/fe/src/test/java/org/apache/impala/util/PrintUtilsTest.java
+++ b/fe/src/test/java/org/apache/impala/util/PrintUtilsTest.java
@@ -97,17 +97,6 @@ public class PrintUtilsTest {
             + " AS DOUBLE) < CAST(10 AS DOUBLE)",
         "Analyzed query: SELECT * FROM functional_kudu.alltypestiny\n"
             + "WHERE CAST(bigint_col AS DOUBLE) < CAST(10 AS DOUBLE)");
-    // Simple query with a hint retains newlines surrounding hint.
-    assertWrap("SELECT\n"
-            + "-- +straight_join\n"
-            + " * FROM tpch_parquet.orders INNER JOIN\n"
-            + "-- +shuffle\n"
-            + " tpch_parquet.customer ON o_custkey = c_custkey",
-        "SELECT\n"
-            + "-- +straight_join\n"
-            + "* FROM tpch_parquet.orders INNER JOIN\n"
-            + "-- +shuffle\n"
-            + "tpch_parquet.customer ON o_custkey = c_custkey");
     // test that a long string of blanks prints OK, some may be lost for clarity
     assertWrap("insert into foo values ('                                      "
             + "                                                                          "
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test b/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test
index 34c4938..793c231 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/max-row-size.test
@@ -6,10 +6,8 @@ from tpch_parquet.customer
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=33.97MB Threads=5
 Per-Host Resource Estimates: Memory=68MB
-Analyzed query: SELECT
--- +straight_join
-* FROM tpch_parquet.customer INNER JOIN tpch_parquet.nation ON c_nationkey =
-n_nationkey
+Analyzed query: SELECT /* +straight_join */ * FROM tpch_parquet.customer INNER
+JOIN tpch_parquet.nation ON c_nationkey = n_nationkey
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=10.33MB mem-reservation=0B thread-reservation=1
@@ -67,10 +65,8 @@ from tpch_parquet.lineitem
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=110.00MB Threads=5
 Per-Host Resource Estimates: Memory=410MB
-Analyzed query: SELECT
--- +straight_join
-* FROM tpch_parquet.lineitem LEFT OUTER JOIN tpch_parquet.orders ON l_orderkey =
-o_orderkey
+Analyzed query: SELECT /* +straight_join */ * FROM tpch_parquet.lineitem LEFT
+OUTER JOIN tpch_parquet.orders ON l_orderkey = o_orderkey
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=11.20MB mem-reservation=0B thread-reservation=1
@@ -184,11 +180,10 @@ having count(*) = 1
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=110.00MB Threads=7
 Per-Host Resource Estimates: Memory=272MB
-Analyzed query: SELECT
--- +straight_join
-l_orderkey, o_orderstatus, count(*) FROM tpch_parquet.lineitem INNER JOIN
-tpch_parquet.orders ON o_orderkey = l_orderkey GROUP BY l_orderkey,
-o_orderstatus HAVING count(*) = CAST(1 AS BIGINT)
+Analyzed query: SELECT /* +straight_join */ l_orderkey, o_orderstatus, count(*)
+FROM tpch_parquet.lineitem INNER JOIN tpch_parquet.orders ON o_orderkey =
+l_orderkey GROUP BY l_orderkey, o_orderstatus HAVING count(*) = CAST(1 AS
+BIGINT)
 
 F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=10.10MB mem-reservation=0B thread-reservation=1
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index fdde0a1..a90363f 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -2422,8 +2422,7 @@ from tpch.lineitem inner join /* +shuffle */ tpch.orders on l_orderkey = o_order
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=51.00MB Threads=3
 Per-Host Resource Estimates: Memory=446MB
-Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN
--- +shuffle
+Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN /* +shuffle */
 tpch.orders ON l_orderkey = o_orderkey
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -2462,8 +2461,7 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=52.00MB Threads=6
 Per-Host Resource Estimates: Memory=300MB
-Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN
--- +shuffle
+Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN /* +shuffle */
 tpch.orders ON l_orderkey = o_orderkey
 
 F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -2523,8 +2521,7 @@ Per-Host Resources: mem-estimate=89.00MB mem-reservation=9.00MB thread-reservati
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=104.00MB Threads=7
 Per-Host Resource Estimates: Memory=481MB
-Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN
--- +shuffle
+Analyzed query: SELECT * FROM tpch.lineitem INNER JOIN /* +shuffle */
 tpch.orders ON l_orderkey = o_orderkey
 
 F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -4956,15 +4953,12 @@ join (
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=99.00MB Threads=5
 Per-Host Resource Estimates: Memory=180MB
-Analyzed query: SELECT
--- +straight_join
-* FROM tpch_parquet.orders t1 INNER JOIN (SELECT
--- +straight_join
-t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT
--- +straight_join
-t3.o_orderkey k3, t4.o_orderkey k4 FROM tpch_parquet.orders t3 INNER JOIN
-tpch_parquet.orders t4 ON t3.o_orderkey = t4.o_orderkey) v2 ON v2.k3 =
-t2.o_orderkey) v1 ON v1.k3 = t1.o_orderkey
+Analyzed query: SELECT /* +straight_join */ * FROM tpch_parquet.orders t1 INNER
+JOIN (SELECT /* +straight_join */ t2.o_orderkey k2, k3, k4 FROM
+tpch_parquet.orders t2 INNER JOIN (SELECT /* +straight_join */ t3.o_orderkey k3,
+t4.o_orderkey k4 FROM tpch_parquet.orders t3 INNER JOIN tpch_parquet.orders t4
+ON t3.o_orderkey = t4.o_orderkey) v2 ON v2.k3 = t2.o_orderkey) v1 ON v1.k3 =
+t1.o_orderkey
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=180.00MB mem-reservation=99.00MB thread-reservation=5 runtime-filters-memory=3.00MB
@@ -5040,15 +5034,12 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=100.50MB Threads=10
 Per-Host Resource Estimates: Memory=260MB
-Analyzed query: SELECT
--- +straight_join
-* FROM tpch_parquet.orders t1 INNER JOIN (SELECT
--- +straight_join
-t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT
--- +straight_join
-t3.o_orderkey k3, t4.o_orderkey k4 FROM tpch_parquet.orders t3 INNER JOIN
-tpch_parquet.orders t4 ON t3.o_orderkey = t4.o_orderkey) v2 ON v2.k3 =
-t2.o_orderkey) v1 ON v1.k3 = t1.o_orderkey
+Analyzed query: SELECT /* +straight_join */ * FROM tpch_parquet.orders t1 INNER
+JOIN (SELECT /* +straight_join */ t2.o_orderkey k2, k3, k4 FROM
+tpch_parquet.orders t2 INNER JOIN (SELECT /* +straight_join */ t3.o_orderkey k3,
+t4.o_orderkey k4 FROM tpch_parquet.orders t3 INNER JOIN tpch_parquet.orders t4
+ON t3.o_orderkey = t4.o_orderkey) v2 ON v2.k3 = t2.o_orderkey) v1 ON v1.k3 =
+t1.o_orderkey
 
 F05:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=10.41MB mem-reservation=0B thread-reservation=1
@@ -5159,15 +5150,12 @@ Per-Host Resources: mem-estimate=88.84MB mem-reservation=59.00MB thread-reservat
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=176.50MB Threads=11
 Per-Host Resource Estimates: Memory=454MB
-Analyzed query: SELECT
--- +straight_join
-* FROM tpch_parquet.orders t1 INNER JOIN (SELECT
--- +straight_join
-t2.o_orderkey k2, k3, k4 FROM tpch_parquet.orders t2 INNER JOIN (SELECT
--- +straight_join
-t3.o_orderkey k3, t4.o_orderkey k4 FROM tpch_parquet.orders t3 INNER JOIN
-tpch_parquet.orders t4 ON t3.o_orderkey = t4.o_orderkey) v2 ON v2.k3 =
-t2.o_orderkey) v1 ON v1.k3 = t1.o_orderkey
+Analyzed query: SELECT /* +straight_join */ * FROM tpch_parquet.orders t1 INNER
+JOIN (SELECT /* +straight_join */ t2.o_orderkey k2, k3, k4 FROM
+tpch_parquet.orders t2 INNER JOIN (SELECT /* +straight_join */ t3.o_orderkey k3,
+t4.o_orderkey k4 FROM tpch_parquet.orders t3 INNER JOIN tpch_parquet.orders t4
+ON t3.o_orderkey = t4.o_orderkey) v2 ON v2.k3 = t2.o_orderkey) v1 ON v1.k3 =
+t1.o_orderkey
 
 F05:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=10.82MB mem-reservation=0B thread-reservation=1
@@ -5318,14 +5306,11 @@ join (
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=176.00KB Threads=5
 Per-Host Resource Estimates: Memory=138MB
-Analyzed query: SELECT
--- +straight_join
-* FROM tpch_parquet.nation t1 INNER JOIN (SELECT
--- +straight_join
-t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT
--- +straight_join
-t3.n_nationkey k3, t4.s_suppkey k4 FROM tpch_parquet.nation t3 INNER JOIN
-tpch_parquet.supplier t4) v2) v1
+Analyzed query: SELECT /* +straight_join */ * FROM tpch_parquet.nation t1 INNER
+JOIN (SELECT /* +straight_join */ t2.n_nationkey k2, k3, k4 FROM
+tpch_parquet.nation t2 INNER JOIN (SELECT /* +straight_join */ t3.n_nationkey
+k3, t4.s_suppkey k4 FROM tpch_parquet.nation t3 INNER JOIN tpch_parquet.supplier
+t4) v2) v1
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=137.99MB mem-reservation=176.00KB thread-reservation=5
@@ -5389,14 +5374,11 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=176.00KB Threads=9
 Per-Host Resource Estimates: Memory=161MB
-Analyzed query: SELECT
--- +straight_join
-* FROM tpch_parquet.nation t1 INNER JOIN (SELECT
--- +straight_join
-t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT
--- +straight_join
-t3.n_nationkey k3, t4.s_suppkey k4 FROM tpch_parquet.nation t3 INNER JOIN
-tpch_parquet.supplier t4) v2) v1
+Analyzed query: SELECT /* +straight_join */ * FROM tpch_parquet.nation t1 INNER
+JOIN (SELECT /* +straight_join */ t2.n_nationkey k2, k3, k4 FROM
+tpch_parquet.nation t2 INNER JOIN (SELECT /* +straight_join */ t3.n_nationkey
+k3, t4.s_suppkey k4 FROM tpch_parquet.nation t3 INNER JOIN tpch_parquet.supplier
+t4) v2) v1
 
 F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=10.13MB mem-reservation=0B thread-reservation=1
@@ -5488,14 +5470,11 @@ Per-Host Resources: mem-estimate=97.55MB mem-reservation=32.00KB thread-reservat
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=352.00KB Threads=9
 Per-Host Resource Estimates: Memory=311MB
-Analyzed query: SELECT
--- +straight_join
-* FROM tpch_parquet.nation t1 INNER JOIN (SELECT
--- +straight_join
-t2.n_nationkey k2, k3, k4 FROM tpch_parquet.nation t2 INNER JOIN (SELECT
--- +straight_join
-t3.n_nationkey k3, t4.s_suppkey k4 FROM tpch_parquet.nation t3 INNER JOIN
-tpch_parquet.supplier t4) v2) v1
+Analyzed query: SELECT /* +straight_join */ * FROM tpch_parquet.nation t1 INNER
+JOIN (SELECT /* +straight_join */ t2.n_nationkey k2, k3, k4 FROM
+tpch_parquet.nation t2 INNER JOIN (SELECT /* +straight_join */ t3.n_nationkey
+k3, t4.s_suppkey k4 FROM tpch_parquet.nation t3 INNER JOIN tpch_parquet.supplier
+t4) v2) v1
 
 F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=10.27MB mem-reservation=0B thread-reservation=1
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test b/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test
index 8d35928..68e5110 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/spillable-buffer-sizing.test
@@ -5,10 +5,8 @@ from tpch_parquet.customer
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=18.97MB Threads=5
 Per-Host Resource Estimates: Memory=53MB
-Analyzed query: SELECT 
--- +straight_join
-* FROM tpch_parquet.customer INNER JOIN tpch_parquet.nation ON c_nationkey =
-n_nationkey
+Analyzed query: SELECT /* +straight_join */ * FROM tpch_parquet.customer INNER
+JOIN tpch_parquet.nation ON c_nationkey = n_nationkey
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=10.33MB mem-reservation=0B thread-reservation=1
@@ -60,10 +58,8 @@ Per-Host Resources: mem-estimate=26.95MB mem-reservation=18.94MB thread-reservat
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=37.94MB Threads=5
 Per-Host Resource Estimates: Memory=97MB
-Analyzed query: SELECT 
--- +straight_join
-* FROM tpch_parquet.customer INNER JOIN tpch_parquet.nation ON c_nationkey =
-n_nationkey
+Analyzed query: SELECT /* +straight_join */ * FROM tpch_parquet.customer INNER
+JOIN tpch_parquet.nation ON c_nationkey = n_nationkey
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=10.65MB mem-reservation=0B thread-reservation=1
@@ -128,10 +124,8 @@ from tpch_parquet.lineitem
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=98.00MB Threads=5
 Per-Host Resource Estimates: Memory=410MB
-Analyzed query: SELECT 
--- +straight_join
-* FROM tpch_parquet.lineitem LEFT OUTER JOIN tpch_parquet.orders ON l_orderkey =
-o_orderkey
+Analyzed query: SELECT /* +straight_join */ * FROM tpch_parquet.lineitem LEFT
+OUTER JOIN tpch_parquet.orders ON l_orderkey = o_orderkey
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=11.20MB mem-reservation=0B thread-reservation=1
@@ -181,10 +175,8 @@ Per-Host Resources: mem-estimate=359.29MB mem-reservation=74.00MB thread-reserva
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=196.00MB Threads=5
 Per-Host Resource Estimates: Memory=790MB
-Analyzed query: SELECT 
--- +straight_join
-* FROM tpch_parquet.lineitem LEFT OUTER JOIN tpch_parquet.orders ON l_orderkey =
-o_orderkey
+Analyzed query: SELECT /* +straight_join */ * FROM tpch_parquet.lineitem LEFT
+OUTER JOIN tpch_parquet.orders ON l_orderkey = o_orderkey
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=12.40MB mem-reservation=0B thread-reservation=1
@@ -247,11 +239,8 @@ from tpch_parquet.orders
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=59.00MB Threads=6
 Per-Host Resource Estimates: Memory=114MB
-Analyzed query: SELECT 
--- +straight_join
-* FROM tpch_parquet.orders INNER JOIN 
--- +shuffle
-tpch_parquet.customer ON o_custkey = c_custkey
+Analyzed query: SELECT /* +straight_join */ * FROM tpch_parquet.orders INNER
+JOIN /* +shuffle */ tpch_parquet.customer ON o_custkey = c_custkey
 
 F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=10.77MB mem-reservation=0B thread-reservation=1
@@ -310,11 +299,8 @@ Per-Host Resources: mem-estimate=41.00MB mem-reservation=25.00MB thread-reservat
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=101.00MB Threads=7
 Per-Host Resource Estimates: Memory=182MB
-Analyzed query: SELECT 
--- +straight_join
-* FROM tpch_parquet.orders INNER JOIN 
--- +shuffle
-tpch_parquet.customer ON o_custkey = c_custkey
+Analyzed query: SELECT /* +straight_join */ * FROM tpch_parquet.orders INNER
+JOIN /* +shuffle */ tpch_parquet.customer ON o_custkey = c_custkey
 
 F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=11.55MB mem-reservation=0B thread-reservation=1
@@ -386,11 +372,8 @@ from tpch_parquet.orders
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=75.00MB Threads=5
 Per-Host Resource Estimates: Memory=120MB
-Analyzed query: SELECT 
--- +straight_join
-* FROM tpch_parquet.orders INNER JOIN 
--- +broadcast
-tpch_parquet.customer ON o_custkey = c_custkey
+Analyzed query: SELECT /* +straight_join */ * FROM tpch_parquet.orders INNER
+JOIN /* +broadcast */ tpch_parquet.customer ON o_custkey = c_custkey
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=10.77MB mem-reservation=0B thread-reservation=1
@@ -442,11 +425,8 @@ Per-Host Resources: mem-estimate=85.45MB mem-reservation=59.00MB thread-reservat
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=150.00MB Threads=5
 Per-Host Resource Estimates: Memory=210MB
-Analyzed query: SELECT 
--- +straight_join
-* FROM tpch_parquet.orders INNER JOIN 
--- +broadcast
-tpch_parquet.customer ON o_custkey = c_custkey
+Analyzed query: SELECT /* +straight_join */ * FROM tpch_parquet.orders INNER
+JOIN /* +broadcast */ tpch_parquet.customer ON o_custkey = c_custkey
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=11.55MB mem-reservation=0B thread-reservation=1
@@ -513,10 +493,8 @@ Max Per-Host Resource Reservation: Memory=34.17MB Threads=5
 Per-Host Resource Estimates: Memory=2.03GB
 WARNING: The following tables are missing relevant table and/or column statistics.
 functional_parquet.alltypes, functional_parquet.alltypestiny
-Analyzed query: SELECT 
--- +straight_join
-* FROM functional_parquet.alltypes LEFT OUTER JOIN
-functional_parquet.alltypestiny ON alltypes.id = alltypestiny.id
+Analyzed query: SELECT /* +straight_join */ * FROM functional_parquet.alltypes
+LEFT OUTER JOIN functional_parquet.alltypestiny ON alltypes.id = alltypestiny.id
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=503.95KB mem-reservation=0B thread-reservation=1
@@ -570,10 +548,8 @@ Max Per-Host Resource Reservation: Memory=68.34MB Threads=5
 Per-Host Resource Estimates: Memory=4.06GB
 WARNING: The following tables are missing relevant table and/or column statistics.
 functional_parquet.alltypestiny
-Analyzed query: SELECT 
--- +straight_join
-* FROM functional_parquet.alltypes LEFT OUTER JOIN
-functional_parquet.alltypestiny ON alltypes.id = alltypestiny.id
+Analyzed query: SELECT /* +straight_join */ * FROM functional_parquet.alltypes
+LEFT OUTER JOIN functional_parquet.alltypestiny ON alltypes.id = alltypestiny.id
 
 F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=1007.95KB mem-reservation=0B thread-reservation=1
@@ -741,11 +717,10 @@ having count(*) = 1
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=82.00MB Threads=7
 Per-Host Resource Estimates: Memory=244MB
-Analyzed query: SELECT 
--- +straight_join
-l_orderkey, o_orderstatus, count(*) FROM tpch_parquet.lineitem INNER JOIN
-tpch_parquet.orders ON o_orderkey = l_orderkey GROUP BY l_orderkey,
-o_orderstatus HAVING count(*) = CAST(1 AS BIGINT)
+Analyzed query: SELECT /* +straight_join */ l_orderkey, o_orderstatus, count(*)
+FROM tpch_parquet.lineitem INNER JOIN tpch_parquet.orders ON o_orderkey =
+l_orderkey GROUP BY l_orderkey, o_orderstatus HAVING count(*) = CAST(1 AS
+BIGINT)
 
 F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=10.10MB mem-reservation=0B thread-reservation=1
@@ -826,11 +801,10 @@ Per-Host Resources: mem-estimate=81.00MB mem-reservation=5.00MB thread-reservati
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=130.00MB Threads=9
 Per-Host Resource Estimates: Memory=400MB
-Analyzed query: SELECT 
--- +straight_join
-l_orderkey, o_orderstatus, count(*) FROM tpch_parquet.lineitem INNER JOIN
-tpch_parquet.orders ON o_orderkey = l_orderkey GROUP BY l_orderkey,
-o_orderstatus HAVING count(*) = CAST(1 AS BIGINT)
+Analyzed query: SELECT /* +straight_join */ l_orderkey, o_orderstatus, count(*)
+FROM tpch_parquet.lineitem INNER JOIN tpch_parquet.orders ON o_orderkey =
+l_orderkey GROUP BY l_orderkey, o_orderstatus HAVING count(*) = CAST(1 AS
+BIGINT)
 
 F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=10.19MB mem-reservation=0B thread-reservation=1


[impala] 08/08: IMPALA-7928: Consistent remote read scheduling

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 24eab713a0d35f629509f59711f8a563e1346acf
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Wed Dec 5 13:59:27 2018 -0800

    IMPALA-7928: Consistent remote read scheduling
    
    Currently, remote reads for a particular file are not scheduled to
    a consistent set of nodes. This reduces the efficiency of the HDFS
    file handle cache.
    
    This modifies the scheduling of remote reads to limit the number
    of executors considered when picking an executor for a remote scan
    range. The remote executor candidates are generated by hashing the
    filename+offset multiple times and finding the closest nodes in a
    hash ring. This is a consistent hash that is designed to limit the
    number of files remapped when cluster nodes come and go. The number
    of remote executor candidates is controlled by a query option
    'num_remote_executor_candidates', which defaults to 3. It is
    capped at 16.
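
    A minimal, illustrative C++ sketch of this candidate selection (the ring
    type, hash choice, and function names below are assumptions for the
    example, not Impala's actual HashRing API):

    #include <algorithm>
    #include <cstdint>
    #include <functional>
    #include <map>
    #include <string>
    #include <vector>

    // Consistent-hash sketch: map from hash value to executor IP. Each
    // executor is inserted several times ("replicas") so the hash space is
    // split more evenly across executors.
    using Ring = std::map<uint32_t, std::string>;

    void AddExecutor(Ring* ring, const std::string& ip, int replicas) {
      for (int i = 0; i < replicas; ++i) {
        uint32_t h = static_cast<uint32_t>(
            std::hash<std::string>{}(ip + "#" + std::to_string(i)));
        (*ring)[h] = ip;
      }
    }

    // Hash filename+offset several times; for each hash take the first ring
    // entry at or after it (wrapping around) and collect distinct executors.
    std::vector<std::string> RemoteCandidates(const Ring& ring,
        const std::string& filename, int64_t offset, int num_candidates) {
      std::vector<std::string> candidates;
      if (ring.empty()) return candidates;
      // Bound the retries in case the ring has fewer executors than requested.
      for (int i = 0; candidates.size() < size_t(num_candidates)
           && i < num_candidates * 4; ++i) {
        uint32_t h = static_cast<uint32_t>(std::hash<std::string>{}(
            filename + ":" + std::to_string(offset) + ":" + std::to_string(i)));
        auto it = ring.lower_bound(h);
        if (it == ring.end()) it = ring.begin();  // wrap around the ring
        if (std::find(candidates.begin(), candidates.end(), it->second)
            == candidates.end()) {
          candidates.push_back(it->second);
        }
      }
      return candidates;
    }

    Inserting each executor at multiple ring positions keeps the per-executor
    share of the hash space roughly even, and removing an executor only remaps
    the ranges it owned, which is why candidate sets stay mostly stable as
    cluster nodes come and go.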
    
    Once the remote executor candidates are chosen, a specific executor
    is picked with the same algorithm used for local replicas: the node
    with the minimum number of assigned bytes is chosen, and
    'schedule_random_replica' determines how to break ties.
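
    A similarly hedged sketch of that selection step (assumed names; the
    real scheduler tracks assigned bytes per backend internally):

    #include <cstdint>
    #include <limits>
    #include <map>
    #include <random>
    #include <string>
    #include <vector>

    // Pick the candidate with the fewest assigned bytes; break ties either
    // randomly (schedule_random_replica) or by taking the first candidate.
    std::string PickExecutor(const std::vector<std::string>& candidates,
        const std::map<std::string, int64_t>& assigned_bytes,
        bool schedule_random_replica, std::mt19937* rng) {
      std::vector<std::string> best;
      int64_t min_bytes = std::numeric_limits<int64_t>::max();
      for (const std::string& ip : candidates) {
        auto it = assigned_bytes.find(ip);
        int64_t bytes = (it == assigned_bytes.end()) ? 0 : it->second;
        if (bytes < min_bytes) {
          min_bytes = bytes;
          best.assign(1, ip);
        } else if (bytes == min_bytes) {
          best.push_back(ip);
        }
      }
      if (best.empty()) return "";
      if (schedule_random_replica && best.size() > 1) {
        std::uniform_int_distribution<size_t> dist(0, best.size() - 1);
        return best[dist(*rng)];
      }
      return best.front();  // deterministic: first candidate with minimum bytes
    }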
    
    This leaves the normal algorithms in place for local files, Kudu,
    and HBase. If 'num_remote_executor_candidates' is set to 0, the
    existing remote scheduling algorithm is used. The existing
    algorithm schedules remote scan ranges on all available executors.
    
    Testing:
     - There is a new hash-ring-test and related tests in scheduler-test.
     - There is a utility (hash-ring-util) in experiments for hand tuning
       the hash ring.
    
    Change-Id: Icbf74088a8bd8c285ab7285ea3a01acd1bb53a45
    Reviewed-on: http://gerrit.cloudera.org:8080/12037
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/experiments/CMakeLists.txt          |   2 +
 be/src/experiments/hash-ring-util.cc       | 101 +++++++++++++
 be/src/scheduling/CMakeLists.txt           |   2 +
 be/src/scheduling/backend-config.cc        |  17 ++-
 be/src/scheduling/backend-config.h         |  13 +-
 be/src/scheduling/hash-ring-test.cc        | 230 +++++++++++++++++++++++++++++
 be/src/scheduling/hash-ring.cc             | 125 ++++++++++++++++
 be/src/scheduling/hash-ring.h              | 115 +++++++++++++++
 be/src/scheduling/scheduler-test-util.cc   |   4 +
 be/src/scheduling/scheduler-test-util.h    |   9 +-
 be/src/scheduling/scheduler-test.cc        | 111 +++++++++++++-
 be/src/scheduling/scheduler.cc             |  65 +++++++-
 be/src/scheduling/scheduler.h              |  12 +-
 be/src/service/query-options.cc            |  14 ++
 be/src/service/query-options.h             |   4 +-
 common/thrift/ImpalaInternalService.thrift |   4 +
 common/thrift/ImpalaService.thrift         |  11 ++
 17 files changed, 823 insertions(+), 16 deletions(-)

diff --git a/be/src/experiments/CMakeLists.txt b/be/src/experiments/CMakeLists.txt
index c4b5593..a50798c 100644
--- a/be/src/experiments/CMakeLists.txt
+++ b/be/src/experiments/CMakeLists.txt
@@ -30,6 +30,7 @@ add_executable(data-provider-test data-provider-test.cc)
 add_executable(tuple-splitter-test tuple-splitter-test.cc)
 add_executable(hash-partition-test hash-partition-test.cc)
 add_executable(compression-test compression-test.cc)
+add_executable(hash-ring-util hash-ring-util.cc)
 
 # Add Experiments lib explicitly here.  It is not used by any other part
 # of impala so don't include it in link_libs
@@ -37,5 +38,6 @@ target_link_libraries(data-provider-test Experiments ${IMPALA_LINK_LIBS})
 target_link_libraries(tuple-splitter-test Experiments ${IMPALA_LINK_LIBS})
 target_link_libraries(hash-partition-test ${IMPALA_LINK_LIBS})
 target_link_libraries(compression-test ${IMPALA_LINK_LIBS})
+target_link_libraries(hash-ring-util ${IMPALA_LINK_LIBS})
 
 ADD_BE_LSAN_TEST(string-search-sse-test)
diff --git a/be/src/experiments/hash-ring-util.cc b/be/src/experiments/hash-ring-util.cc
new file mode 100644
index 0000000..1910755
--- /dev/null
+++ b/be/src/experiments/hash-ring-util.cc
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cmath>
+#include <stdlib.h>
+#include <stdio.h>
+#include <iostream>
+
+#include "common/init.h"
+#include "scheduling/hash-ring.h"
+#include "scheduling/scheduler-test-util.h"
+#include "testutil/gtest-util.h"
+#include "util/network-util.h"
+#include "util/test-info.h"
+#include "util/time.h"
+
+#include "common/names.h"
+
+DEFINE_int32(num_hosts, 10, "Number of hosts for simulation");
+DEFINE_int32(num_replicas, 10, "Replication factor for hashring");
+
+namespace impala {
+
+// Utility to test the HashRing's performance. It is important to understand how well the
+// HashRing distributes nodes throughout the hash space. Specifically, it is useful to
+// know how much of the hash space is mapped to each node and how much variation there is
+// between the nodes. This builds Cluster with appropriate number of hosts, then it adds
+// them into the HashRing. It extracts information about the distribution and prints it.
+
+class HashRingDistributionCheck {
+public:
+  // Given a number of hosts and number of replicas, generate the hashring and calculate
+  // various measurements of the distribution of the nodes across the hash space.
+  static void TestDistribution(int num_hosts, int num_replicas) {
+    int64_t start_nanos = MonotonicNanos();
+    cout << "Running TestDistribution with num_hosts=" << std::to_string(num_hosts)
+         << " num_replicas=" << std::to_string(num_replicas) << endl;
+    test::Cluster cluster;
+    cluster.AddHosts(num_hosts, true, false);
+    HashRing hashring(num_replicas);
+    for (int host_idx = 0; host_idx < num_hosts; host_idx++) {
+      IpAddr node = test::Cluster::HostIdxToIpAddr(host_idx);
+      hashring.AddNode(node);
+    }
+    int64_t end_nanos = MonotonicNanos();
+    map<IpAddr, uint64_t> distribution;
+    hashring.GetDistributionMap(&distribution);
+
+    uint64_t total_uint32_range = static_cast<uint64_t>(UINT_MAX) + 1;
+    // By definition, the ranges add up to UINT_MAX+1, so the average is
+    // UINT_MAX+1/num_hosts
+    double average = ((double) total_uint32_range / (double) num_hosts);
+    uint64_t min = total_uint32_range;
+    uint64_t max = 0;
+    uint64_t total = 0;
+    double sum_of_diff_squared = 0.0;
+    for (auto it = distribution.begin(); it != distribution.end(); it++) {
+      uint64_t range = it->second;
+      DCHECK_LE(range, total_uint32_range);
+      if (range < min) min = range;
+      if (range > max) max = range;
+      total += range;
+      double diff_squared = pow(((double)range - average), 2);
+      sum_of_diff_squared += diff_squared;
+      double range_pct = (100.00 * (double) range) / (double) UINT_MAX;
+      cout << "Host: " << it->first << " Range: " << std::to_string(range_pct) << endl;
+    }
+    cout << "Min Range: " << min << endl;
+    cout << "Max Range: " << max << endl;
+    cout << "Max/Min Ratio: " << std::to_string((double) max / (double) min) << endl;
+    // The distribution should sum to UINT_MAX+1
+    DCHECK_EQ(total, total_uint32_range);
+    double stddev = sqrt(sum_of_diff_squared / ((double) num_hosts - 1));
+    cout << "StdDev: " << std::to_string(stddev) << endl;
+    cout << "Time nanos: " << std::to_string(end_nanos - start_nanos) << endl;
+  }
+};
+
+}
+
+int main(int argc, char ** argv) {
+  google::ParseCommandLineFlags(&argc, &argv, true);
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::HashRingDistributionCheck::TestDistribution(FLAGS_num_hosts,
+      FLAGS_num_replicas);
+  return 0;
+}
diff --git a/be/src/scheduling/CMakeLists.txt b/be/src/scheduling/CMakeLists.txt
index 47fd438..998a05c 100644
--- a/be/src/scheduling/CMakeLists.txt
+++ b/be/src/scheduling/CMakeLists.txt
@@ -26,6 +26,7 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/scheduling")
 add_library(Scheduling STATIC
   admission-controller.cc
   backend-config.cc
+  hash-ring.cc
   query-schedule.cc
   request-pool-service.cc
   scheduler-test-util.cc
@@ -35,3 +36,4 @@ add_dependencies(Scheduling gen-deps)
 
 ADD_BE_LSAN_TEST(scheduler-test)
 ADD_BE_LSAN_TEST(backend-config-test)
+ADD_BE_LSAN_TEST(hash-ring-test)
diff --git a/be/src/scheduling/backend-config.cc b/be/src/scheduling/backend-config.cc
index 715ec34..f4d68bd 100644
--- a/be/src/scheduling/backend-config.cc
+++ b/be/src/scheduling/backend-config.cc
@@ -19,7 +19,18 @@
 
 namespace impala{
 
-BackendConfig::BackendConfig(const std::vector<TNetworkAddress>& backends) {
+// Hand-testing shows that 25 replicas produces a reasonable balance between nodes
+// across the hash ring. See HashRingTest::MaxMinRatio() for some empirical results
+// at similar replication levels. There is nothing special about 25 (i.e. 24 or 26
+// would be similar). Increasing this results in a more even distribution.
+// TODO: This can be tuned further with real world tests
+static const uint32_t NUM_HASH_RING_REPLICAS = 25;
+
+BackendConfig::BackendConfig()
+  : backend_ip_hash_ring_(NUM_HASH_RING_REPLICAS) {}
+
+BackendConfig::BackendConfig(const std::vector<TNetworkAddress>& backends)
+  : backend_ip_hash_ring_(NUM_HASH_RING_REPLICAS) {
   // Construct backend_map and backend_ip_map.
   for (const TNetworkAddress& backend: backends) {
     IpAddr ip;
@@ -54,6 +65,9 @@ void BackendConfig::GetAllBackends(BackendList* backends) const {
 void BackendConfig::AddBackend(const TBackendDescriptor& be_desc) {
   DCHECK(!be_desc.ip_address.empty());
   BackendList& be_descs = backend_map_[be_desc.ip_address];
+  if (be_descs.empty()) {
+    backend_ip_hash_ring_.AddNode(be_desc.ip_address);
+  }
   if (find(be_descs.begin(), be_descs.end(), be_desc) == be_descs.end()) {
     be_descs.push_back(be_desc);
   }
@@ -68,6 +82,7 @@ void BackendConfig::RemoveBackend(const TBackendDescriptor& be_desc) {
     if (be_descs->empty()) {
       backend_map_.erase(be_descs_it);
       backend_ip_map_.erase(be_desc.address.hostname);
+      backend_ip_hash_ring_.RemoveNode(be_desc.ip_address);
     }
   }
 }
diff --git a/be/src/scheduling/backend-config.h b/be/src/scheduling/backend-config.h
index c7cbbcd..32b7c2f 100644
--- a/be/src/scheduling/backend-config.h
+++ b/be/src/scheduling/backend-config.h
@@ -24,15 +24,16 @@
 
 #include "gen-cpp/StatestoreService_types.h"
 #include "gen-cpp/Types_types.h"
+#include "scheduling/hash-ring.h"
 #include "util/network-util.h"
 
 namespace impala {
 
-/// Configuration class to store a list of backends per IP address and a mapping from
-/// hostnames to IP addresses.
+/// Configuration class to store a list of backends per IP address, a mapping from
+/// hostnames to IP addresses, and a hash ring containing all backends.
 class BackendConfig {
  public:
-  BackendConfig() {}
+  BackendConfig();
 
   /// Construct from list of backends.
   BackendConfig(const std::vector<TNetworkAddress>& backends);
@@ -62,6 +63,8 @@ class BackendConfig {
   /// be retained beyond the lifetime of this BackendConfig.
   const TBackendDescriptor* LookUpBackendDesc(const TNetworkAddress& host) const;
 
+  const HashRing* GetHashRing() const { return &backend_ip_hash_ring_; }
+
   int NumBackends() const { return backend_map_.size(); }
 
  private:
@@ -74,6 +77,10 @@ class BackendConfig {
   /// backend_map_ changes.
   typedef boost::unordered_map<Hostname, IpAddr> BackendIpAddressMap;
   BackendIpAddressMap backend_ip_map_;
+
+  /// All backends are kept in a hash ring to allow a consistent mapping from filenames
+  /// to backends.
+  HashRing backend_ip_hash_ring_;
 };
 
 }  // end ns impala
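
Note that the ring stores IP addresses rather than TBackendDescriptors: several
backends can share one IP address (for example in a minicluster), and AddBackend()
above only inserts the address into the ring when the first backend on that address
appears, so each host occupies the ring exactly once regardless of how many backends
run on it.
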
diff --git a/be/src/scheduling/hash-ring-test.cc b/be/src/scheduling/hash-ring-test.cc
new file mode 100644
index 0000000..4ea9ba9
--- /dev/null
+++ b/be/src/scheduling/hash-ring-test.cc
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <map>
+#include <math.h>
+#include <stdio.h>
+#include <iostream>
+
+#include "scheduling/hash-ring.h"
+#include "testutil/gtest-util.h"
+#include "thirdparty/pcg-cpp-0.98/include/pcg_random.hpp"
+#include "util/network-util.h"
+#include "gen-cpp/Types_types.h"
+#include "common/names.h"
+
+namespace impala {
+
+class HashRingTest : public ::testing::Test {
+ protected:
+  void GetDistributionMap(const HashRing& hash_ring,
+      std::map<IpAddr, uint64_t>* distribution_map) {
+    return hash_ring.GetDistributionMap(distribution_map);
+  }
+
+  // Verify the specified hash ring has the appropriate number of nodes and replicas.
+  // This assumes no collisions. If there are collisions, the total replicas will be
+  // smaller than expected.
+  void VerifyCounts(const HashRing& hash_ring, uint32_t num_nodes) {
+    EXPECT_EQ(hash_ring.GetNumNodes(), num_nodes);
+    EXPECT_EQ(hash_ring.GetTotalReplicas(), num_nodes * hash_ring.GetNumReplicas());
+  }
+
+  // Verify that the allocations that GetDistributionMap() returns for each node
+  // sum to the size of the hash space (UINT_MAX + 1).
+  void VerifyTotalAllocation(const vector<IpAddr>& addresses,
+      const uint32_t replication) {
+    HashRing hash_ring(replication);
+    for (const IpAddr& addr : addresses) hash_ring.AddNode(addr);
+    std::map<IpAddr, uint64_t> dist_map;
+    hash_ring.GetDistributionMap(&dist_map);
+    uint64_t total_allocation = 0;
+    uint64_t total_uint32_range = static_cast<uint64_t>(UINT_MAX) + 1;
+    for (auto dist_map_it : dist_map) {
+      total_allocation += dist_map_it.second;
+    }
+    EXPECT_EQ(total_allocation, total_uint32_range);
+  }
+
+  // Verify that the ratio of the maximum allocation to the minimum allocation is
+  // below the 'expected_ratio_limit'.
+  void VerifyMaxMinRatio(const vector<IpAddr>& addresses,
+      const uint32_t replication, double expected_ratio_limit) {
+    HashRing hash_ring(replication);
+    for (const IpAddr& addr : addresses) hash_ring.AddNode(addr);
+    std::map<IpAddr, uint64_t> dist_map;
+    hash_ring.GetDistributionMap(&dist_map);
+    uint64_t min = static_cast<uint64_t>(UINT_MAX) + 1;
+    uint64_t max = 0;
+    for (auto it = dist_map.begin(); it != dist_map.end(); it++) {
+      uint64_t range = it->second;
+      if (range < min) min = range;
+      if (range > max) max = range;
+    }
+    double max_min_ratio = (double) max / (double) min;
+    LOG(INFO) << "VerifyMaxMinRatio: num_nodes: " << addresses.size()
+              << " replication: " << replication << " max/min ratio: " << max_min_ratio
+              << " expected limit: " << expected_ratio_limit;
+    EXPECT_LT(max_min_ratio, expected_ratio_limit);
+  }
+
+  void GetBasicNetworkAddresses(vector<IpAddr>& addresses) {
+    // Collection of three addresses
+    addresses.push_back("hostname1");
+    addresses.push_back("hostname2");
+    addresses.push_back("hostname3");
+  }
+
+  void GetMultipleNetworkAddresses(string basename, uint32_t num_nodes,
+      vector<IpAddr>& addresses) {
+    for (uint32_t i = 0; i < num_nodes; i++) {
+      addresses.push_back(basename + std::to_string(i));
+    }
+  }
+};
+
+TEST_F(HashRingTest, BasicAddRemove) {
+  vector<IpAddr> basic_addresses;
+  GetBasicNetworkAddresses(basic_addresses);
+
+  const uint32_t replication = 10;
+  HashRing h(replication);
+  VerifyCounts(h, 0);
+
+  int total_addresses = basic_addresses.size();
+  for (int i = 0; i < total_addresses; i++) {
+    h.AddNode(basic_addresses[i]);
+    VerifyCounts(h, i + 1);
+  }
+
+  // Remove the elements in a different order
+  std::random_shuffle(basic_addresses.begin(), basic_addresses.end());
+  for (int i = 0; i < total_addresses; i++) {
+    h.RemoveNode(basic_addresses[i]);
+    VerifyCounts(h, total_addresses - (i + 1));
+  }
+}
+
+TEST_F(HashRingTest, GetNode) {
+  vector<IpAddr> basic_addresses;
+  GetBasicNetworkAddresses(basic_addresses);
+
+  const uint32_t replication = 10;
+  HashRing h(replication);
+  // Test null return when empty
+  EXPECT_EQ(h.GetNode(0), nullptr);
+
+  for (const IpAddr& addr : basic_addresses) h.AddNode(addr);
+
+  // Test wraparound
+  // (Note that this assumes that nothing mapped to exactly UINT_MAX.)
+  const IpAddr* first_node = h.GetNode(0);
+  const IpAddr* wrapped_node = h.GetNode(UINT_MAX);
+  EXPECT_EQ(first_node, wrapped_node);
+
+  // Look up 100 random hash values and verify that the returned addresses are all
+  // non-null and exist in the input addresses. This is not deterministic.
+  pcg32 prng(time(nullptr));
+  for (int i = 0; i < 100; i++) {
+    const IpAddr* getnode_output = h.GetNode(prng());
+    DCHECK(getnode_output != nullptr);
+    auto addr_it =
+        std::find(basic_addresses.begin(), basic_addresses.end(), *getnode_output);
+    DCHECK(addr_it != basic_addresses.end());
+  }
+}
+
+TEST_F(HashRingTest, CopyConstructor) {
+  vector<IpAddr> basic_addresses;
+  GetBasicNetworkAddresses(basic_addresses);
+
+  // Make a basic HashRing with a few elements
+  const uint32_t replication = 10;
+  unique_ptr<HashRing> old_h(new HashRing(replication));
+  for (const IpAddr& addr : basic_addresses) old_h->AddNode(addr);
+  std::map<IpAddr, uint64_t> old_dist_map;
+  GetDistributionMap(*old_h, &old_dist_map);
+
+  // Use copy constructor
+  HashRing new_h(*old_h);
+
+  // Destroy old HashRing
+  old_h.reset();
+
+  // Verify that the new HashRing is the same
+  VerifyCounts(new_h, basic_addresses.size());
+  std::map<IpAddr, uint64_t> new_dist_map;
+  // Getting the distribution visits every element, so it is an effective check for
+  // use-after-free, etc.
+  GetDistributionMap(new_h, &new_dist_map);
+
+  // The distribution should be identical
+  EXPECT_EQ(new_dist_map, old_dist_map);
+}
+
+TEST_F(HashRingTest, TotalAllocation) {
+  const uint32_t replication = 10;
+
+  // Test total allocation for basic addresses
+  vector<IpAddr> basic_addresses;
+  GetBasicNetworkAddresses(basic_addresses);
+  VerifyTotalAllocation(basic_addresses, replication);
+
+  // Test total allocation for 100 addresses
+  vector<IpAddr> multiple_addresses;
+  GetMultipleNetworkAddresses("total_alloc_host", 100, multiple_addresses);
+  VerifyTotalAllocation(multiple_addresses, replication);
+}
+
+TEST_F(HashRingTest, Collisions) {
+  // Collisions should be rare, even with a relatively large number of nodes.
+  // If there are collisions, the counts will not match up, because a node will not have
+  // the specified replication. This verifies that that there are no collisions on this
+  // case. Clearly, there are cases where there will be collisions and this could fail
+  // innocuously if the algorithm changes, but this is a good canary in the coalmine to
+  // catch changes that cause abnormal collisions.
+  const uint32_t replication = 25;
+  vector<IpAddr> multiple_addresses;
+  GetMultipleNetworkAddresses("collision_host", 1000, multiple_addresses);
+  HashRing h(replication);
+  for (const IpAddr& addr : multiple_addresses) h.AddNode(addr);
+  VerifyCounts(h, 1000);
+}
+
+TEST_F(HashRingTest, MaxMinRatio) {
+  // The ratio of the largest node allocation to the smallest should be bounded.
+  // This is purely a functional check; it makes no assumptions about the underlying
+  // statistics. If this ratio regresses, it could impact scheduling.
+  const uint32_t replication = 25;
+  vector<uint32_t> node_counts = {10, 25, 100};
+  vector<double> max_expected_ratios = {3.0, 3.0, 4.5};
+  DCHECK_EQ(node_counts.size(), max_expected_ratios.size());
+  // Anecdotally, the base hostname has a noticeable (but bounded) impact on the ratio.
+  // Test with multiple base hostnames.
+  vector<string> base_hostnames = {"minmaxhost", "qwert", "asdfjkl", "2a8b67", "91htgh"};
+  for (int i = 0; i < node_counts.size(); i++) {
+    for (const string& base_hostname : base_hostnames) {
+      vector<IpAddr> addresses;
+      GetMultipleNetworkAddresses(base_hostname, node_counts[i], addresses);
+      VerifyMaxMinRatio(addresses, replication, max_expected_ratios[i]);
+    }
+  }
+}
+
+}
+
+IMPALA_TEST_MAIN();
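
As a sanity check on the Collisions test above: 1000 nodes at 25 replicas each is
25,000 hash positions drawn from a space of 2^32 values, so by the usual birthday
estimate (and assuming the PRNG output behaves like uniform random draws) the expected
number of collisions is about 25,000^2 / (2 * 2^32), i.e. roughly 0.07. A
collision-free run is therefore the overwhelmingly likely outcome, which is what the
test relies on.
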
diff --git a/be/src/scheduling/hash-ring.cc b/be/src/scheduling/hash-ring.cc
new file mode 100644
index 0000000..37bbed5
--- /dev/null
+++ b/be/src/scheduling/hash-ring.cc
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <map>
+#include <random>
+
+#include "scheduling/hash-ring.h"
+#include "thirdparty/pcg-cpp-0.98/include/pcg_random.hpp"
+#include "util/container-util.h"
+#include "util/hash-util.h"
+#include "util/network-util.h"
+#include "gen-cpp/Types_types.h"
+#include "common/names.h"
+
+namespace impala {
+
+HashRing::HashRing(const HashRing& hash_ring)
+  : nodes_(hash_ring.nodes_), num_replicas_(hash_ring.num_replicas_) {
+  for (const auto& old_pair : hash_ring.hash_to_node_) {
+    uint32_t hash_value = old_pair.first;
+    NodeIterator old_node_it = old_pair.second;
+    // Look up the equivalent node iterator in the new nodes_ set.
+    NodeIterator new_node_it = nodes_.find(*old_node_it);
+    DCHECK(new_node_it != nodes_.end());
+    hash_to_node_.emplace(hash_value, new_node_it);
+  }
+}
+
+void HashRing::AddNode(const IpAddr& node) {
+  // This node should not already be in the set.
+  std::pair<NodeIterator, bool> node_pair = nodes_.insert(node);
+  // 'second' tells whether a new element was inserted. It must be true.
+  DCHECK(node_pair.second) << "Failed to add: " << node;
+  NodeIterator node_it = node_pair.first;
+  // Generate multiple hashes of the IpAddr by using the hash as a seed to a PRNG.
+  uint32_t hash = HashUtil::Hash(node.data(), node.length(), 0);
+  pcg32 prng(hash);
+  for (uint32_t i = 0; i < num_replicas_; i++) {
+    uint32_t hash_val = prng();
+    // Check for hash collision
+    auto hashmap_it = hash_to_node_.find(hash_val);
+    // Write this new value if:
+    // 1. There is no hash collision.
+    // -OR-
+    // 2. There is a hash collision, but the underlying IpAddr is less than
+    //    the current value.
+    // This guarantees consistency regardless of the order of adds and removes.
+    if (hashmap_it == hash_to_node_.end() || *node_it < *hashmap_it->second) {
+      hash_to_node_[hash_val] = node_it;
+    }
+  }
+}
+
+void HashRing::RemoveNode(const IpAddr& node) {
+  // This node must be in the set. Keep the iterator to erase it later.
+  NodeIterator node_it = nodes_.find(node);
+  DCHECK(node_it != nodes_.end());
+
+  // Walk the map and remove any entries that have a NodeIterator pointing to this node.
+  size_t num_removed = 0;
+  auto hash_to_node_it = hash_to_node_.begin();
+  while (hash_to_node_it != hash_to_node_.end()) {
+    if (hash_to_node_it->second == node_it) {
+      hash_to_node_it = hash_to_node_.erase(hash_to_node_it);
+      num_removed++;
+    } else {
+      hash_to_node_it++;
+    }
+  }
+  DCHECK_GT(num_removed, 0);
+  nodes_.erase(node_it);
+}
+
+const IpAddr* HashRing::GetNode(uint32_t hash_value) const {
+  if (hash_to_node_.empty()) return nullptr;
+  // Find the element that immediately follows this hash value
+  auto next_elem = hash_to_node_.lower_bound(hash_value);
+  if (next_elem == hash_to_node_.end()) {
+    // This is larger than the largest elem. Return the smallest elem
+    next_elem = hash_to_node_.begin();
+  }
+  NodeIterator node_it = next_elem->second;
+  return &(*node_it);
+}
+
+void HashRing::GetDistributionMap(
+    map<IpAddr, uint64_t>* distribution_map) const {
+  // Start at zero and add up the ranges for each distinct node by walking the map.
+  // There are UINT_MAX + 1 total values. GetNode() uses map::lower_bound, which is
+  // inclusive of the value (i.e. GetNode(5) would match a hash value of 5).
+  // This means all of ranges are off by one compared to typical indexes. This is
+  // irrelevent for the internal ranges, but the (end - start) calculation is wrong for
+  // the first range. It needs to be one longer. This is incorporated into the final
+  // section.
+  uint32_t last_index = 0;
+  for (const auto& hash_to_node_pair : hash_to_node_) {
+    const IpAddr& addr = *(hash_to_node_pair.second);
+    uint32_t end_index = hash_to_node_pair.first;
+    uint32_t range = end_index - last_index;
+    (*distribution_map)[addr] += range;
+    last_index = end_index;
+  }
+  // Any room from last_index to UINT_MAX goes to the min element
+  uint64_t range = UINT_MAX - last_index;
+  // Incorporate the missing +1 from the first range
+  range++;
+  const IpAddr& addr = *(hash_to_node_.begin()->second);
+  (*distribution_map)[addr] += range;
+}
+
+}
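
For readers skimming the diff, here is a minimal standalone sketch of the two
mechanisms above: placing each node at several pseudo-random positions seeded from a
hash of its name, and resolving a lookup with std::map::lower_bound plus wraparound.
std::hash and std::mt19937 stand in for HashUtil::Hash and pcg32, and the collision
handling and iterator bookkeeping of the real class are omitted.

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <random>
    #include <string>

    class MiniRing {
     public:
      explicit MiniRing(uint32_t num_replicas) : num_replicas_(num_replicas) {}

      void AddNode(const std::string& node) {
        // Seed a PRNG with a hash of the node name so that the replica positions
        // are deterministic for a given node.
        std::mt19937 prng(static_cast<uint32_t>(std::hash<std::string>{}(node)));
        for (uint32_t i = 0; i < num_replicas_; ++i) ring_[prng()] = node;
      }

      const std::string* GetNode(uint32_t hash_value) const {
        if (ring_.empty()) return nullptr;
        // First position at or after 'hash_value'; wrap around to the smallest
        // position if 'hash_value' is past the largest one.
        auto it = ring_.lower_bound(hash_value);
        if (it == ring_.end()) it = ring_.begin();
        return &it->second;
      }

     private:
      uint32_t num_replicas_;
      std::map<uint32_t, std::string> ring_;
    };

    int main() {
      MiniRing ring(25);
      for (const char* n : {"10.0.0.1", "10.0.0.2", "10.0.0.3"}) ring.AddNode(n);
      uint32_t h = static_cast<uint32_t>(std::hash<std::string>{}("/warehouse/t1/f0.parq"));
      // Deterministic for this file name as long as the set of nodes is unchanged.
      std::cout << "owner: " << *ring.GetNode(h) << std::endl;
      return 0;
    }

The real class additionally stores each IpAddr once in a std::set and keeps iterators
to it in the map, and breaks hash collisions by keeping the smaller address, as shown
in AddNode() above.
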
diff --git a/be/src/scheduling/hash-ring.h b/be/src/scheduling/hash-ring.h
new file mode 100644
index 0000000..73a13c8
--- /dev/null
+++ b/be/src/scheduling/hash-ring.h
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef SCHEDULING_HASH_RING_H
+#define SCHEDULING_HASH_RING_H
+
+#include <map>
+#include <set>
+#include <vector>
+
+#include "common/logging.h"
+
+namespace impala {
+
+typedef std::string IpAddr;
+
+/// Simple hash ring implementation specialized for IpAddr.
+///
+/// A hash ring is a consistent hash. It allows distributing items consistently across
+/// a set of nodes. An important property is that if nodes come or go, the number of
+/// items that are remapped to different nodes should be small.
+///
+/// A hash ring works by hashing nodes to place them throughout the hash space (in this
+/// case unsigned 4 byte integers). To balance the allocation of the hash space, a node
+/// can be hashed multiple times to give it multiple locations throughout the hash space.
+/// To look up a node for a given item, we hash the item and find the closest node after
+/// that hash value in the hash space. This implementation is agnostic about the type of
+/// item and expects the caller to do its own hashing for the item.
+///
+/// When nodes are added or removed, only the hash space in the immediate vicinity of the
+/// node is remapped. This allows for bounded disruption that should be proportional to
+/// 1 / # of nodes.
+///
+/// TODO: This can be modified to use any type for the node (rather than IpAddr)
+/// by taking in a function pointer that generates a hash value for the templated type.
+class HashRing {
+ public:
+  /// Create a new empty hash ring using the specified replication. Each node is placed
+  /// into the hash space 'num_replicas' times. A higher value for 'num_replicas'
+  /// results in a more even distribution of the hash space between nodes, but it requires
+  /// more memory/time.
+  HashRing(uint32_t num_replicas)
+    : num_replicas_(num_replicas) {
+    DCHECK_GT(num_replicas_, 0);
+  }
+
+  /// Copy constructor
+  /// This is needed because HashRing is stored in a structure that is copy on write.
+  /// The default copy constructor would copy the NodeIterators in hash_to_node_,
+  /// leaving them pointing into the old nodes_ set. This copies nodes_ and iterates
+  /// over the old hash_to_node_, inserting an equivalent pair into the new map.
+  HashRing(const HashRing& hash_ring);
+
+  /// Move constructor is not tested or implemented, so delete for now
+  HashRing(HashRing&& hash_ring) = delete;
+
+  /// Add a node to the hash ring. This hashes the node 'num_replicas_' times and tries to
+  /// insert the node into the map at the hash location. In the event of a hash collision,
+  /// the map will be set to the minimum of the current value and the new value.
+  /// Nodes must be unique.
+  void AddNode(const IpAddr& node);
+
+  /// Remove the specified node from the hash ring, erasing all hash ring entries that
+  /// reference this node. Nodes must be unique.
+  void RemoveNode(const IpAddr& node);
+
+  /// Get the first node equal or larger than the specified 'hash_value'. If 'hash_value'
+  /// is larger than the largest hash value, it gets the element with the smallest hash
+  /// value. If the hash ring is empty, this returns nullptr.
+  const IpAddr* GetNode(uint32_t hash_value) const;
+ private:
+  friend class HashRingTest;
+  friend class HashRingDistributionCheck;
+
+  /// Populate a map from IpAddr to the sum of ranges in the hash space that map to this
+  /// IpAddr. The total hash space has 2^32 numbers, so the elements in this map sum to
+  /// 2^32. The range is uint64_t as 2^32 is just outside the maximum value for uint32_t.
+  /// This is useful for examining how the hash space is balanced between nodes.
+  void GetDistributionMap(std::map<IpAddr, uint64_t>* distribution_map) const;
+  uint32_t GetNumReplicas() const { return num_replicas_; }
+  size_t GetNumNodes() const { return nodes_.size(); }
+  size_t GetTotalReplicas() const { return hash_to_node_.size(); }
+
+  /// To avoid keeping num_replicas_ copies of the IpAddr's, store them in
+  /// a set and store iterators pointing to these elements in the hash_to_node_.
+  std::set<IpAddr> nodes_;
+  typedef std::set<IpAddr>::iterator NodeIterator;
+
+  /// Map from a hash value to the associated node iterator, which can be used to lookup
+  /// the base value. In the event of a collision, the value is set by taking the minimum
+  /// of the two underlying IpAddr's. Collisions should be rare, so this will only rarely
+  /// impact the actual allocations.
+  std::map<uint32_t, NodeIterator> hash_to_node_;
+
+  /// The number of times each node is added to the hash space.
+  uint32_t num_replicas_;
+};
+
+}
+
+#endif
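
In terms of the interface above, a call site inside the Impala tree looks roughly like
the following; the real consumer is the scheduler.cc change later in this patch, and
the literal addresses and file name here are placeholders:

    #include <string>

    #include "scheduling/hash-ring.h"
    #include "util/hash-util.h"

    using namespace impala;

    void HashRingUsageSketch() {
      HashRing ring(25);  // 25 replicas per node, as in backend-config.cc
      ring.AddNode("10.20.30.1");
      ring.AddNode("10.20.30.2");
      std::string file_name = "/warehouse/t1/f0.parq";
      uint32_t h = HashUtil::Hash(file_name.data(), file_name.length(), 0);
      const IpAddr* owner = ring.GetNode(h);  // nullptr only if the ring is empty
      (void)owner;
      ring.RemoveNode("10.20.30.2");  // only hash values near its replicas get remapped
    }
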
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 30c3ad1..b1534da 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -221,6 +221,10 @@ void Plan::SetReplicaPreference(TReplicaPreference::type p) {
   query_options_.replica_preference = p;
 }
 
+void Plan::SetNumRemoteExecutorCandidates(int32_t num) {
+  query_options_.num_remote_executor_candidates = num;
+}
+
 const vector<TNetworkAddress>& Plan::referenced_datanodes() const {
   return referenced_datanodes_;
 }
diff --git a/be/src/scheduling/scheduler-test-util.h b/be/src/scheduling/scheduler-test-util.h
index 5357d96..13dfda5 100644
--- a/be/src/scheduling/scheduler-test-util.h
+++ b/be/src/scheduling/scheduler-test-util.h
@@ -116,6 +116,10 @@ class Cluster {
   /// Convert a host index to a hostname.
   static Hostname HostIdxToHostname(int host_idx);
 
+  /// Convert a host index to an IP address. The host index must be smaller than 2^24 and
+  /// will specify the lower 24 bits of the IPv4 address (the lower 3 octets).
+  static IpAddr HostIdxToIpAddr(int host_idx);
+
   /// Return the backend address (ip, port) for the host with index 'host_idx'.
   void GetBackendAddress(int host_idx, TNetworkAddress* addr) const;
 
@@ -160,10 +164,6 @@ class Cluster {
 
   /// Map from IP addresses to host indexes.
   std::unordered_map<IpAddr, int> ip_to_idx_;
-
-  /// Convert a host index to an IP address. The host index must be smaller than 2^24 and
-  /// will specify the lower 24 bits of the IPv4 address (the lower 3 octets).
-  static IpAddr HostIdxToIpAddr(int host_idx);
 };
 
 struct Block {
@@ -267,6 +267,7 @@ class Plan {
   void SetReplicaPreference(TReplicaPreference::type p);
 
   void SetRandomReplica(bool b) { query_options_.schedule_random_replica = b; }
+  void SetNumRemoteExecutorCandidates(int32_t num);
   const Cluster& cluster() const { return schema_.cluster(); }
 
   const std::vector<TNetworkAddress>& referenced_datanodes() const;
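
HostIdxToIpAddr() is only moved out of the private section here; its contract is that
the index (smaller than 2^24) fills the lower three octets of the IPv4 address. A
hypothetical sketch of such a mapping, with the leading octet and the formatting
chosen purely for illustration:

    #include <cassert>
    #include <string>

    std::string HostIdxToIpAddrSketch(int host_idx) {
      assert(host_idx >= 0 && host_idx < (1 << 24));
      int octet2 = (host_idx >> 16) & 0xff;  // highest of the lower three octets
      int octet3 = (host_idx >> 8) & 0xff;
      int octet4 = host_idx & 0xff;
      return "10." + std::to_string(octet2) + "." + std::to_string(octet3) + "."
          + std::to_string(octet4);
    }
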
diff --git a/be/src/scheduling/scheduler-test.cc b/be/src/scheduling/scheduler-test.cc
index 855897b..0b271ad 100644
--- a/be/src/scheduling/scheduler-test.cc
+++ b/be/src/scheduling/scheduler-test.cc
@@ -118,7 +118,7 @@ TEST_F(SchedulerTest, ScanTableTwice) {
 /// TODO: This test can be removed once we have the non-random backend round-robin by
 /// rank.
 /// Schedule randomly over 3 backends and ensure that each backend is at least used once.
-TEST_F(SchedulerTest, RandomReads) {
+TEST_F(SchedulerTest, LocalReadRandomReplica) {
   Cluster cluster;
   cluster.AddHosts(3, true, true);
 
@@ -186,8 +186,9 @@ TEST_F(SchedulerTest, TestMediumSizedCluster) {
   EXPECT_EQ(16, result.NumDiskAssignments());
 }
 
-/// Verify that remote placement and scheduling work as expected.
-TEST_F(SchedulerTest, RemoteOnlyPlacement) {
+/// Verify that remote placement and scheduling work as expected when
+/// num_remote_executor_candidates=0 (i.e. that placement is random and covers all nodes).
+TEST_F(SchedulerTest, NoRemoteExecutorCandidates) {
   Cluster cluster;
   for (int i = 0; i < 100; ++i) cluster.AddHost(i < 30, true);
 
@@ -196,6 +197,7 @@ TEST_F(SchedulerTest, RemoteOnlyPlacement) {
 
   Plan plan(schema);
   plan.AddTableScan("T1");
+  plan.SetNumRemoteExecutorCandidates(0);
 
   Result result(plan);
   SchedulerWrapper scheduler(plan);
@@ -206,6 +208,105 @@ TEST_F(SchedulerTest, RemoteOnlyPlacement) {
   EXPECT_EQ(Block::DEFAULT_BLOCK_SIZE, result.MaxNumAssignedBytesPerHost());
 }
 
+/// Tests scheduling with num_remote_executor_candidates > 0. Specifically, it verifies
+/// that repeated scheduling of a block happens on the appropriate number of distinct
+/// nodes for varying values of num_remote_executor_candidates. This includes cases
+/// where the num_remote_executor_candidates exceeds the number of Impala executors.
+TEST_F(SchedulerTest, RemoteExecutorCandidates) {
+  Cluster cluster;
+  int num_data_nodes = 3;
+  int num_impala_nodes = 5;
+  // Set of datanodes
+  cluster.AddHosts(num_data_nodes, false, true);
+  // Set of Impala hosts
+  cluster.AddHosts(num_impala_nodes, true, false);
+
+  Schema schema(cluster);
+  schema.AddSingleBlockTable("T1", {0, 1, 2});
+
+  // Test a range of number of remote executor candidates, including cases where the
+  // number of remote executor candidates exceeds the number of Impala nodes.
+  for (int num_candidates = 1; num_candidates <= num_impala_nodes + 1; ++num_candidates) {
+    Plan plan(schema);
+    plan.AddTableScan("T1");
+    plan.SetRandomReplica(true);
+    plan.SetNumRemoteExecutorCandidates(num_candidates);
+
+    Result result(plan);
+    SchedulerWrapper scheduler(plan);
+    for (int i = 0; i < 100; ++i) ASSERT_OK(scheduler.Compute(&result));
+
+    ASSERT_EQ(100, result.NumAssignments());
+    EXPECT_EQ(100, result.NumTotalAssignments());
+    EXPECT_EQ(100 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes());
+    if (num_candidates < num_impala_nodes) {
+      EXPECT_EQ(num_candidates, result.NumDistinctBackends());
+    } else {
+      EXPECT_EQ(num_impala_nodes, result.NumDistinctBackends());
+    }
+    EXPECT_GE(result.MinNumAssignedBytesPerHost(), Block::DEFAULT_BLOCK_SIZE);
+    // If there is only one remote executor candidate, then all scan ranges will be
+    // assigned to one backend.
+    if (num_candidates == 1) {
+      EXPECT_EQ(result.MinNumAssignedBytesPerHost(), 100 * Block::DEFAULT_BLOCK_SIZE);
+    }
+  }
+}
+
+/// Verify basic consistency of remote executor candidates. Specifically, it schedules
+/// a set of blocks, then removes an executor that did not have any blocks assigned to
+/// it, and verifies that rerunning the scheduling results in the same assignments.
+TEST_F(SchedulerTest, RemoteExecutorCandidateConsistency) {
+  Cluster cluster;
+  int num_data_nodes = 3;
+  int num_impala_nodes = 50;
+
+  // Set of Impala hosts
+  cluster.AddHosts(num_impala_nodes, true, false);
+  // Set of datanodes
+  cluster.AddHosts(num_data_nodes, false, true);
+
+  // Replica placement is unimportant for this test. All blocks will be on
+  // all datanodes, but Impala is running remotely.
+  Schema schema(cluster);
+  schema.AddMultiBlockTable("T1", 25, ReplicaPlacement::RANDOM, 3);
+
+  Plan plan(schema);
+  plan.AddTableScan("T1");
+  plan.SetRandomReplica(false);
+  plan.SetNumRemoteExecutorCandidates(3);
+
+  Result result_base(plan);
+  SchedulerWrapper scheduler(plan);
+  ASSERT_OK(scheduler.Compute(&result_base));
+  EXPECT_EQ(25, result_base.NumTotalAssignments());
+  EXPECT_EQ(25 * Block::DEFAULT_BLOCK_SIZE, result_base.NumTotalAssignedBytes());
+
+  // There are 25 blocks and 50 Impala hosts. There will be Impala hosts without
+  // any assigned bytes. Removing one of them should not change the outcome.
+  Result result_empty_removed(plan);
+  // Find an Impala host that was not assigned any bytes and remove it
+  bool removed_one = false;
+  for (int i = 0; i < num_impala_nodes; ++i) {
+    if (result_base.NumTotalAssignedBytes(i) == 0) {
+      scheduler.RemoveBackend(cluster.hosts()[i]);
+      removed_one = true;
+      break;
+    }
+  }
+  ASSERT_TRUE(removed_one);
+  // Rerun the scheduling with the node removed.
+  ASSERT_OK(scheduler.Compute(&result_empty_removed));
+  EXPECT_EQ(25, result_empty_removed.NumTotalAssignments());
+  EXPECT_EQ(25 * Block::DEFAULT_BLOCK_SIZE, result_empty_removed.NumTotalAssignedBytes());
+
+  // Verify that the outcome is identical.
+  for (int i = 0; i < num_impala_nodes; ++i) {
+    EXPECT_EQ(result_base.NumRemoteAssignedBytes(i),
+              result_empty_removed.NumRemoteAssignedBytes(i));
+  }
+}
+
 /// Add a table with 1000 scan ranges over 10 hosts and ensure that the right number of
 /// assignments is computed.
 TEST_F(SchedulerTest, ManyScanRanges) {
@@ -296,6 +397,8 @@ TEST_F(SchedulerTest, EmptyStatestoreMessage) {
 
   Plan plan(schema);
   plan.AddTableScan("T1");
+  // Test only applies when num_remote_executor_candidates=0.
+  plan.SetNumRemoteExecutorCandidates(0);
 
   Result result(plan);
   SchedulerWrapper scheduler(plan);
@@ -329,6 +432,8 @@ TEST_F(SchedulerTest, TestSendUpdates) {
 
   Plan plan(schema);
   plan.AddTableScan("T1");
+  // Test only applies when num_remote_executor_candidates=0.
+  plan.SetNumRemoteExecutorCandidates(0);
 
   Result result(plan);
   SchedulerWrapper scheduler(plan);
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index de671a9..3d6b49f 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -32,9 +32,12 @@
 #include "gen-cpp/ImpalaInternalService_constants.h"
 #include "gen-cpp/Types_types.h"
 #include "runtime/exec-env.h"
+#include "scheduling/hash-ring.h"
 #include "statestore/statestore-subscriber.h"
+#include "thirdparty/pcg-cpp-0.98/include/pcg_random.hpp"
 #include "util/container-util.h"
 #include "util/flat_buffer.h"
+#include "util/hash-util.h"
 #include "util/metrics.h"
 #include "util/network-util.h"
 #include "util/runtime-profile-counters.h"
@@ -674,7 +677,7 @@ Status Scheduler::ComputeScanRangeAssignment(const BackendConfig& executor_confi
       // Remote reads will always break ties by executor rank.
       bool decide_local_assignment_by_rank = random_replica || cached_replica;
       const IpAddr* executor_ip = nullptr;
-      executor_ip = assignment_ctx.SelectLocalExecutor(
+      executor_ip = assignment_ctx.SelectExecutorFromCandidates(
           executor_candidates, decide_local_assignment_by_rank);
       TBackendDescriptor executor;
       assignment_ctx.SelectExecutorOnHost(*executor_ip, &executor);
@@ -684,9 +687,31 @@ Status Scheduler::ComputeScanRangeAssignment(const BackendConfig& executor_confi
   } // End of for loop over scan ranges.
 
   // Assign remote scans to executors.
+  int num_remote_executor_candidates = query_options.num_remote_executor_candidates;
   for (const TScanRangeLocationList* scan_range_locations : remote_scan_range_locations) {
     DCHECK(!exec_at_coord);
-    const IpAddr* executor_ip = assignment_ctx.SelectRemoteExecutor();
+    const IpAddr* executor_ip;
+    vector<IpAddr> remote_executor_candidates;
+    // Restrict this scan range to a limited set of remote executor candidates when:
+    // 1. It is enabled by setting 'num_remote_executor_candidates' > 0
+    // AND
+    // 2. This is an HDFS file split
+    // AND
+    // 3. The number of remote executor candidates is less than the number of backends.
+    // Otherwise, fall back to the normal method of selecting executors for remote
+    // ranges, which allows for execution on any backend.
+    if (scan_range_locations->scan_range.__isset.hdfs_file_split &&
+        num_remote_executor_candidates > 0 &&
+        num_remote_executor_candidates < executor_config.NumBackends()) {
+      assignment_ctx.GetRemoteExecutorCandidates(
+          &scan_range_locations->scan_range.hdfs_file_split,
+          num_remote_executor_candidates, &remote_executor_candidates);
+      // Like the local case, schedule_random_replica determines how to break ties.
+      executor_ip = assignment_ctx.SelectExecutorFromCandidates(
+          remote_executor_candidates, random_replica);
+    } else {
+      executor_ip = assignment_ctx.SelectRemoteExecutor();
+    }
     TBackendDescriptor executor;
     assignment_ctx.SelectExecutorOnHost(*executor_ip, &executor);
     assignment_ctx.RecordScanRangeAssignment(
@@ -842,7 +867,7 @@ Scheduler::AssignmentCtx::AssignmentCtx(const BackendConfig& executor_config,
   for (const IpAddr& ip : random_executor_order_) random_executor_rank_[ip] = i++;
 }
 
-const IpAddr* Scheduler::AssignmentCtx::SelectLocalExecutor(
+const IpAddr* Scheduler::AssignmentCtx::SelectExecutorFromCandidates(
     const std::vector<IpAddr>& data_locations, bool break_ties_by_rank) {
   DCHECK(!data_locations.empty());
   // List of candidate indexes into 'data_locations'.
@@ -874,6 +899,40 @@ const IpAddr* Scheduler::AssignmentCtx::SelectLocalExecutor(
   return &data_locations[*min_rank_idx];
 }
 
+void Scheduler::AssignmentCtx::GetRemoteExecutorCandidates(
+    const THdfsFileSplit* hdfs_file_split, int num_candidates,
+    vector<IpAddr>* remote_executor_candidates) {
+  // This should be given an empty vector
+  DCHECK_EQ(remote_executor_candidates->size(), 0);
+  // This function should not be used when 'num_candidates' exceeds the number
+  // of executors.
+  DCHECK_LT(num_candidates, executors_config_.NumBackends());
+  // Two different hashes of the filename can result in the same executor.
+  // The function should return distinct executors, so it may need to do more hashes
+  // than 'num_candidates'.
+  set<IpAddr> distinct_backends;
+  // Generate multiple hashes of the file split by using the hash as a seed to a PRNG.
+  // Note: This hashes both the filename and the offset to allow very large files
+  // to be spread across more executors.
+  uint32_t hash = HashUtil::Hash(hdfs_file_split->file_name.data(),
+      hdfs_file_split->file_name.length(), 0);
+  hash = HashUtil::Hash(&hdfs_file_split->offset, sizeof(hdfs_file_split->offset), hash);
+  pcg32 prng(hash);
+  // Bound the total number of iterations in case the probes keep hitting duplicates.
+  int max_iterations = num_candidates * 3;
+  for (int i = 0; i < max_iterations; ++i) {
+    // Look up nearest IpAddr
+    const IpAddr* executor_addr = executors_config_.GetHashRing()->GetNode(prng());
+    DCHECK(executor_addr != nullptr);
+    distinct_backends.insert(*executor_addr);
+    // Short-circuit once we have the requested number of distinct candidates
+    if (distinct_backends.size() == num_candidates) break;
+  }
+  for (const IpAddr& addr : distinct_backends) {
+    remote_executor_candidates->push_back(addr);
+  }
+}
+
 const IpAddr* Scheduler::AssignmentCtx::SelectRemoteExecutor() {
   const IpAddr* candidate_ip;
   if (HasUnusedExecutors()) {
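
Two properties of GetRemoteExecutorCandidates() above are worth spelling out. First,
the candidate set depends only on the file name, the offset and the current hash ring,
so rescheduling the same split against an unchanged executor set yields the same
candidates; that is exactly what the RemoteExecutorCandidateConsistency test checks.
Second, the loop is capped at 3 * num_candidates probes. As a rough estimate, with 3
candidates out of 50 reasonably balanced executors each extra probe lands on an
already-chosen executor only a few percent of the time, so the cap is almost never the
limiting factor; if it ever is, the function simply returns a shorter (but still
non-empty) candidate list.
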
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 48f223d..241a8cb 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -183,9 +183,19 @@ class Scheduler {
     /// 'break_ties_by_rank' is true, then the executor rank is used to break ties.
     /// Otherwise the first executor according to their order in 'data_locations' is
     /// selected.
-    const IpAddr* SelectLocalExecutor(
+    const IpAddr* SelectExecutorFromCandidates(
         const std::vector<IpAddr>& data_locations, bool break_ties_by_rank);
 
+    /// Populate 'remote_executor_candidates' with 'num_candidates' distinct
+    /// executors. The algorithm for picking remote executor candidates is to hash
+    /// the file name / offset from 'hdfs_file_split' multiple times and look up the
+    /// closest executors stored in the BackendConfig's HashRing. Given the same file
+    /// name / offset and same set of executors, this function is deterministic. The hash
+    /// ring also limits the disruption when executors are added or removed. Note that
+    /// 'num_candidates' cannot be 0 and must be less than the total number of executors.
+    void GetRemoteExecutorCandidates(const THdfsFileSplit* hdfs_file_split,
+        int num_candidates, vector<IpAddr>* remote_executor_candidates);
+
     /// Select an executor for a remote read. If there are unused executor hosts, then
     /// those will be preferred. Otherwise the one with the lowest number of assigned
     /// bytes is picked. If executors have been assigned equal amounts of work, then the
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 6f5bfce..177acac 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -736,6 +736,20 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_resource_trace_ratio(val);
         break;
       }
+      case TImpalaQueryOptions::NUM_REMOTE_EXECUTOR_CANDIDATES: {
+        StringParser::ParseResult result;
+        const int64_t num_remote_executor_candidates =
+            StringParser::StringToInt<int64_t>(value.c_str(), value.length(), &result);
+        if (result != StringParser::PARSE_SUCCESS ||
+            num_remote_executor_candidates < 0 || num_remote_executor_candidates > 16) {
+          return Status(
+              Substitute("$0 is not valid for num_remote_executor_candidates. "
+                         "Valid values are in [0, 16].", value));
+        }
+        query_options->__set_num_remote_executor_candidates(
+            num_remote_executor_candidates);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index b3276ff..53aedd6 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -41,7 +41,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::RESOURCE_TRACE_RATIO + 1);\
+      TImpalaQueryOptions::NUM_REMOTE_EXECUTOR_CANDIDATES + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -146,6 +146,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(topn_bytes_limit, TOPN_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(client_identifier, CLIENT_IDENTIFIER, TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(resource_trace_ratio, RESOURCE_TRACE_RATIO, TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(num_remote_executor_candidates, NUM_REMOTE_EXECUTOR_CANDIDATES,\
+      TQueryOptionLevel::ADVANCED)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index ee8f369..a790105 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -314,6 +314,10 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   75: optional double resource_trace_ratio = 0;
+
+  // See comment in ImpalaService.thrift.
+  // The default value is set to 3 to match the default HDFS replication factor.
+  76: optional i32 num_remote_executor_candidates = 3;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index bb3b0af..0eafd6e 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -356,6 +356,17 @@ enum TImpalaQueryOptions {
   // executors of a query. Must be between 0 and 1 inclusive, 0 means no query will be
   // traced, 1 means all queries will be traced.
   RESOURCE_TRACE_RATIO,
+
+  // The maximum number of executor candidates to consider when scheduling remote
+  // HDFS ranges. When non-zero, the scheduler generates a consistent set of
+  // executor candidates based on the filename and offset. This algorithm is designed
+  // to avoid changing file-to-node mappings when nodes come and go. It then picks from
+  // among the candidates by the same method used for local scan ranges. Limiting the
+  // number of nodes that can read a single file provides a type of simulated locality.
+  // This increases the efficiency of file-related caches (e.g. the HDFS file handle
+  // cache). If set to 0, the number of executor candidates is unlimited, and remote
+  // ranges will be scheduled across all executors.
+  NUM_REMOTE_EXECUTOR_CANDIDATES,
 }
 
 // The summary of a DML statement.
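
To make the "simulated locality" point above concrete: on a 100-node cluster with the
default of 3 candidates (the default is set in ImpalaInternalService.thrift above),
any given remote file is only ever scanned by 3 of the 100 executors, so per-file
state on those executors, such as the HDFS file handle cache, sees repeated hits
instead of being diluted across the whole cluster.
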


[impala] 05/08: test-with-docker: decrease image size by "de-duping" HDFS.


commit 2f5d0016eaa3f9e0e4e3b02032d092cd7b887198
Author: Philip Zeyliger <ph...@cloudera.com>
AuthorDate: Tue Oct 23 09:53:54 2018 -0700

    test-with-docker: decrease image size by "de-duping" HDFS.
    
    This change shaves about 20GB of the (uncompressed) Docker
    image for test-with-docker, taking it from ~60GB to ~40GB.
    Compressed, the image ends up being about 14GB.
    
    To do this, we cheat: HDFS represents every block three times, so we
    have three copies of every block. Before committing the image, we simply
    hard-link the blocks together, which happens to work. It's an
    implementation detail of HDFS that these blocks aren't, say, appended
    to, but I think the trade-off in time and disk space saved is worth it.
    Because the image is smaller, it takes less time to "docker commit" it.
    
    Change-Id: I4a13910ba5e873c31893dbb810a8410547adb2f1
    Reviewed-on: http://gerrit.cloudera.org:8080/11782
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 docker/entrypoint.sh | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh
index c4a1243..3f56252 100755
--- a/docker/entrypoint.sh
+++ b/docker/entrypoint.sh
@@ -252,6 +252,24 @@ function build_impdev() {
   # Shut down things cleanly.
   testdata/bin/kill-all.sh
 
+  # "Compress" HDFS data by de-duplicating blocks. As a result of
+  # having three datanodes, our data load is 3x larger than it needs
+  # to be. To alleviate this (to the tune of ~20GB savings), we
+  # use hardlinks to link together the identical blocks. This is absolutely
+  # taking advantage of an implementation detail of HDFS.
+  echo "Hardlinking duplicate HDFS block data."
+  set +x
+  for x in $(find testdata/cluster/*/node-1/data/dfs/dn/current/ -name 'blk_*[0-9]'); do
+    for n in 2 3; do
+      xn=${x/node-1/node-$n}
+      if [ -f $xn ]; then
+        rm $xn
+        ln $x $xn
+      fi
+    done
+  done
+  set -x
+
   # Shutting down PostgreSQL nicely speeds up its start time for new containers.
   _pg_ctl stop
 


[impala] 01/08: IMPALA-7694: Add host resource usage metrics to profile


commit b5714097e096c6e4b0573a7b326789807a1e4e5f
Author: Lars Volker <lv...@cloudera.com>
AuthorDate: Tue Nov 13 20:50:41 2018 -0800

    IMPALA-7694: Add host resource usage metrics to profile
    
    This change adds a mechanism to collect host resource usage metrics to
    profiles. Metric collection can be controlled through a new query option
    'RESOURCE_TRACE_RATIO'. It specifies the probability with which metrics
    collection will be enabled. Collection always happens per query for all
    executors that run one or more fragment instances of the query.
    
    This mechanism adds a new time series counter class that collects all
    measured values and does not re-sample them. It will re-sample values
    when printing them into a string profile, preserving up to 64 values,
    but Thrift profiles will contain the full list of values.
    
    We add a new section "Per Node Profiles" to the profile to store and
    show these values:
    
    Per Node Profiles:
      lv-desktop:22000:
        CpuIoWaitPercentage (500.000ms): 0, 0
        CpuSysPercentage (500.000ms): 1, 1
        CpuUserPercentage (500.000ms): 4, 0
          - ScratchBytesRead: 0
          - ScratchBytesWritten: 0
          - ScratchFileUsedBytes: 0
          - ScratchReads: 0 (0)
          - ScratchWrites: 0 (0)
          - TotalEncryptionTime: 0.000ns
          - TotalReadBlockTime: 0.000ns
    
    This change also uses the aforementioned mechanism to collect CPU usage
    metrics (user, system, and IO wait time).
    
    A future change can then add a tool to decode a Thrift profile and plot
    the contained usage metrics, e.g. using matplotlib (IMPALA-8123). Such a
    tool is not included in this change because it will require some
    reworking of the python dependencies.
    
    This change also includes a few minor improvements to make the resulting
    code more readable:
    - Extend the PeriodicCounterUpdater to call functions to update global
      metrics before updating the counters.
    - Expose the scratch profile within the per node resource usage section.
    - Improve documentation of the profile counter classes.
    - Remove synchronization from StreamingSampler.
    - Remove a few pieces of dead code that otherwise would have required
      updates.
    - Factor some code for profile decoding into the Impala python library
    
    Testing: This change contains a unit test for the system level metrics
    collection and e2e tests for the profile changes.
    
    Change-Id: I3aedc20c553ab8d7ed50f72a1a936eba151487d9
    Reviewed-on: http://gerrit.cloudera.org:8080/12069
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/kudu/util/logging.h                         |   2 +-
 be/src/runtime/coordinator-backend-state.cc        |  12 +-
 be/src/runtime/coordinator-backend-state.h         |  14 +-
 be/src/runtime/coordinator.cc                      |  10 +-
 be/src/runtime/coordinator.h                       |   5 +
 be/src/runtime/exec-env.cc                         |  11 ++
 be/src/runtime/exec-env.h                          |   8 +
 be/src/runtime/fragment-instance-state.cc          |   4 +-
 be/src/runtime/krpc-data-stream-recvr.cc           |   2 +-
 be/src/runtime/query-state.cc                      |  38 +++-
 be/src/runtime/query-state.h                       |   3 +
 be/src/runtime/runtime-state.cc                    |   7 +-
 be/src/service/impala-server.cc                    |   9 +
 be/src/service/impala-server.h                     |   5 +
 be/src/service/query-options.cc                    |  18 +-
 be/src/service/query-options.h                     |   3 +-
 be/src/util/CMakeLists.txt                         |   2 +
 be/src/util/periodic-counter-updater.cc            |  16 +-
 be/src/util/periodic-counter-updater.h             |  26 ++-
 be/src/util/pretty-printer.h                       |  10 +-
 be/src/util/runtime-profile-counters.h             | 133 +++++++++++---
 be/src/util/runtime-profile-test.cc                | 175 ++++++++++++++++++
 be/src/util/runtime-profile.cc                     | 199 +++++++++++++++++----
 be/src/util/runtime-profile.h                      |  80 +++++++--
 be/src/util/streaming-sampler.h                    |  60 +------
 be/src/util/system-state-info-test.cc              |  91 ++++++++++
 be/src/util/system-state-info.cc                   | 110 ++++++++++++
 be/src/util/system-state-info.h                    |  94 ++++++++++
 bin/parse-thrift-profile.py                        |  28 +--
 common/thrift/ImpalaInternalService.thrift         |   7 +
 common/thrift/ImpalaService.thrift                 |   7 +-
 common/thrift/Metrics.thrift                       |   3 +
 common/thrift/RuntimeProfile.thrift                |   7 +
 .../python/impala_py_lib/profiles.py               |  45 +----
 tests/beeswax/impala_beeswax.py                    |   9 +-
 tests/query_test/test_observability.py             | 130 +++++++++-----
 36 files changed, 1109 insertions(+), 274 deletions(-)
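
The commit message notes that string profiles preserve at most 64 values per time
series while Thrift profiles keep the full list. As an illustration of that stated
behaviour (not the patch's code), down-sampling for display can be as simple as
keeping every k-th sample:

    #include <cstdint>
    #include <vector>

    // Keep every 'stride'-th sample so that at most 'max_points' values remain for the
    // human-readable profile; the full series is what the Thrift profile would carry.
    std::vector<int64_t> DownSampleForDisplay(
        const std::vector<int64_t>& values, size_t max_points = 64) {
      if (values.size() <= max_points) return values;
      size_t stride = (values.size() + max_points - 1) / max_points;
      std::vector<int64_t> out;
      for (size_t i = 0; i < values.size(); i += stride) out.push_back(values[i]);
      return out;
    }

RESOURCE_TRACE_RATIO itself is a probability: a value of 0.25, for instance, enables
the collection for roughly a quarter of queries.
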

diff --git a/be/src/kudu/util/logging.h b/be/src/kudu/util/logging.h
index 428dadc..5aebaec 100644
--- a/be/src/kudu/util/logging.h
+++ b/be/src/kudu/util/logging.h
@@ -161,7 +161,7 @@ class ScopedDisableRedaction {
       &google::LogMessage::SendToLog).stream()
 
 #define KLOG_EVERY_N_SECS(severity, n_secs) \
-  static logging::LogThrottler LOG_THROTTLER;  \
+  static ::kudu::logging::LogThrottler LOG_THROTTLER;  \
   KLOG_EVERY_N_SECS_THROTTLER(severity, n_secs, LOG_THROTTLER, "no-tag")
 
 
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 5d79450..ba4f355 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -58,13 +58,16 @@ Coordinator::BackendState::BackendState(
 }
 
 void Coordinator::BackendState::Init(
-    const BackendExecParams& exec_params,
-    const vector<FragmentStats*>& fragment_stats, ObjectPool* obj_pool) {
+    const BackendExecParams& exec_params, const vector<FragmentStats*>& fragment_stats,
+    RuntimeProfile* host_profile_parent, ObjectPool* obj_pool) {
   backend_exec_params_ = &exec_params;
   host_ = backend_exec_params_->instance_params[0]->host;
   krpc_host_ = backend_exec_params_->instance_params[0]->krpc_host;
   num_remaining_instances_ = backend_exec_params_->instance_params.size();
 
+  host_profile_ = RuntimeProfile::Create(obj_pool, TNetworkAddressToString(host_));
+  host_profile_parent->AddChild(host_profile_);
+
   // populate instance_stats_map_ and install instance
   // profiles as child profiles in fragment_stats' profile
   int prev_fragment_idx = -1;
@@ -384,6 +387,11 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
   return IsDoneLocked(lock);
 }
 
+void Coordinator::BackendState::UpdateHostProfile(
+    const TRuntimeProfileTree& thrift_profile) {
+  host_profile_->Update(thrift_profile);
+}
+
 void Coordinator::BackendState::UpdateExecStats(
     const vector<FragmentStats*>& fragment_stats) {
   lock_guard<mutex> l(lock_);
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 15790f9..122da42 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -61,10 +61,12 @@ class Coordinator::BackendState {
 
   /// Creates InstanceStats for all instance in backend_exec_params in obj_pool
   /// and installs the instance profiles as children of the corresponding FragmentStats'
-  /// root profile.
+  /// root profile. Also creates a child profile below 'host_profile_parent' that contains
+  /// counters for the backend.
   /// Separated from c'tor to simplify future handling of out-of-mem errors.
   void Init(const BackendExecParams& backend_exec_params,
-      const std::vector<FragmentStats*>& fragment_stats, ObjectPool* obj_pool);
+      const std::vector<FragmentStats*>& fragment_stats,
+      RuntimeProfile* host_profile_parent, ObjectPool* obj_pool);
 
   /// Starts query execution at this backend by issuing an ExecQueryFInstances rpc and
   /// notifies on rpc_complete_barrier when the rpc completes. Success/failure is
@@ -87,6 +89,9 @@ class Coordinator::BackendState {
       const TRuntimeProfileForest& thrift_profiles, ExecSummary* exec_summary,
       ProgressUpdater* scan_range_progress, DmlExecState* dml_exec_state);
 
+  /// Merges the incoming 'thrift_profile' into this backend state's host profile.
+  void UpdateHostProfile(const TRuntimeProfileTree& thrift_profile);
+
   /// Update completion_times, rates, and avg_profile for all fragment_stats.
   void UpdateExecStats(const std::vector<FragmentStats*>& fragment_stats);
 
@@ -244,6 +249,11 @@ class Coordinator::BackendState {
   /// indices of fragments executing on this backend, populated in Init()
   std::unordered_set<int> fragments_;
 
+  /// Contains counters for the backend host that are not specific to a particular
+  /// fragment instance, e.g. global CPU utilization and scratch space usage.
+  /// Owned by the coordinator object pool provided in the c'tor, created in Init().
+  RuntimeProfile* host_profile_ = nullptr;
+
   /// Thrift address of execution backend.
   TNetworkAddress host_;
   /// Krpc address of execution backend.
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 21da855..1579ada 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -99,6 +99,9 @@ Status Coordinator::Exec() {
   finalization_timer_ = ADD_TIMER(query_profile_, "FinalizationTimer");
   filter_updates_received_ = ADD_COUNTER(query_profile_, "FiltersReceived", TUnit::UNIT);
 
+  host_profiles_ = RuntimeProfile::Create(obj_pool(), "Per Node Profiles");
+  query_profile_->AddChild(host_profiles_);
+
   SCOPED_TIMER(query_profile_->total_time_counter());
 
   // initialize progress updater
@@ -207,7 +210,7 @@ void Coordinator::InitBackendStates() {
   for (const auto& entry: schedule_.per_backend_exec_params()) {
     BackendState* backend_state = obj_pool()->Add(
         new BackendState(*this, backend_idx, filter_mode_));
-    backend_state->Init(entry.second, fragment_stats_, obj_pool());
+    backend_state->Init(entry.second, fragment_stats_, host_profiles_, obj_pool());
     backend_states_[backend_idx++] = backend_state;
   }
 }
@@ -699,6 +702,10 @@ Status Coordinator::UpdateBackendExecStatus(const ReportExecStatusRequestPB& req
   }
   BackendState* backend_state = backend_states_[coord_state_idx];
 
+  if (thrift_profiles.__isset.host_profile) {
+    backend_state->UpdateHostProfile(thrift_profiles.host_profile);
+  }
+
   if (backend_state->ApplyExecStatusReport(request, thrift_profiles, &exec_summary_,
           &progress_, &dml_exec_state_)) {
     // This backend execution has completed.
@@ -816,6 +823,7 @@ void Coordinator::ComputeQuerySummary() {
   COUNTER_SET(ADD_COUNTER(query_profile_, "TotalCpuTime", TUnit::TIME_NS),
       total_utilization.cpu_user_ns + total_utilization.cpu_sys_ns);
 
+  // TODO(IMPALA-8126): Move to host profiles
   query_profile_->AddInfoString("Per Node Peak Memory Usage", mem_info.str());
   query_profile_->AddInfoString("Per Node Bytes Read", bytes_read_info.str());
   query_profile_->AddInfoString("Per Node User Time", cpu_user_info.str());
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index a559c14..b040120 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -263,6 +263,11 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// Aggregate counters for the entire query. Lives in 'obj_pool_'. Set in Exec().
   RuntimeProfile* query_profile_ = nullptr;
 
+  /// Aggregate counters for backend host resource usage and other per-host information.
+  /// Will contain a child profile for each backend host that participates in the query
+  /// execution. Lives in 'obj_pool_'. Set in Exec().
+  RuntimeProfile* host_profiles_ = nullptr;
+
   /// Total time spent in finalization (typically 0 except for INSERT into hdfs
   /// tables). Set in Exec().
   RuntimeProfile::Counter* finalization_timer_ = nullptr;
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 0b0a889..db214f3 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -62,8 +62,10 @@
 #include "util/network-util.h"
 #include "util/openssl-util.h"
 #include "util/parse-util.h"
+#include "util/periodic-counter-updater.h"
 #include "util/pretty-printer.h"
 #include "util/test-info.h"
+#include "util/system-state-info.h"
 #include "util/thread-pool.h"
 #include "util/webserver.h"
 
@@ -269,6 +271,8 @@ Status ExecEnv::Init() {
   }
   InitBufferPool(FLAGS_min_buffer_size, buffer_pool_limit, clean_pages_limit);
 
+  InitSystemStateInfo();
+
   RETURN_IF_ERROR(metrics_->Init(enable_webserver_ ? webserver_.get() : nullptr));
   impalad_client_cache_->InitMetrics(metrics_.get(), "impala-server.backends");
   catalogd_client_cache_->InitMetrics(metrics_.get(), "catalog.server");
@@ -500,6 +504,13 @@ void ExecEnv::InitMemTracker(int64_t bytes_limit) {
   }
 }
 
+void ExecEnv::InitSystemStateInfo() {
+  system_state_info_.reset(new SystemStateInfo());
+  PeriodicCounterUpdater::RegisterUpdateFunction([s = system_state_info_.get()]() {
+    s->CaptureSystemStateSnapshot();
+  });
+}
+
 TNetworkAddress ExecEnv::GetThriftBackendAddress() const {
   DCHECK(impala_server_ != nullptr);
   return impala_server_->GetThriftBackendAddress();
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 8a5a1f4..fc6f41d 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -61,6 +61,7 @@ class ReservationTracker;
 class RpcMgr;
 class Scheduler;
 class StatestoreSubscriber;
+class SystemStateInfo;
 class ThreadResourceMgr;
 class TmpFileMgr;
 class Webserver;
@@ -138,6 +139,7 @@ class ExecEnv {
   PoolMemTrackerRegistry* pool_mem_trackers() { return pool_mem_trackers_.get(); }
   ReservationTracker* buffer_reservation() { return buffer_reservation_.get(); }
   BufferPool* buffer_pool() { return buffer_pool_.get(); }
+  SystemStateInfo* system_state_info() { return system_state_info_.get(); }
 
   void set_enable_webserver(bool enable) { enable_webserver_ = enable; }
 
@@ -207,6 +209,9 @@ class ExecEnv {
   boost::scoped_ptr<ReservationTracker> buffer_reservation_;
   boost::scoped_ptr<BufferPool> buffer_pool_;
 
+  /// Tracks system resource usage which we then include in profiles.
+  boost::scoped_ptr<SystemStateInfo> system_state_info_;
+
   /// Not owned by this class
   ImpalaServer* impala_server_ = nullptr;
   MetricGroup* rpc_metrics_ = nullptr;
@@ -260,6 +265,9 @@ class ExecEnv {
   /// Initialise 'mem_tracker_' with a limit of 'bytes_limit'. Must be called after
   /// InitBufferPool() and RegisterMemoryMetrics().
   void InitMemTracker(int64_t bytes_limit);
+
+  /// Initialize 'system_state_info_' to track system resource usage.
+  void InitSystemStateInfo();
 };
 
 } // namespace impala
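
For context, SystemStateInfo is refreshed once per counter-update period (see InitSystemStateInfo() above) and then read by per-query counters. A minimal standalone sketch of the read path, assuming only the members visible in this patch (CaptureSystemStateSnapshot() and the user/system/iowait ratios, reported in basis points):

    #include "common/logging.h"
    #include "util/system-state-info.h"

    using namespace impala;

    // Sketch: take two snapshots and log the CPU usage ratios derived from the
    // delta between them. 1 basis point == 1/100th of a percent.
    void LogCpuUsageOnce() {
      SystemStateInfo info;
      info.CaptureSystemStateSnapshot();
      // ... allow some wall-clock time to pass between the two snapshots ...
      info.CaptureSystemStateSnapshot();
      const auto& r = info.GetCpuUsageRatios();
      LOG(INFO) << "user=" << r.user << " sys=" << r.system
                << " iowait=" << r.iowait;
    }
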
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index c8ef699..c5799ee 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -162,11 +162,11 @@ Status FragmentInstanceState::Prepare() {
   avg_thread_tokens_ = profile()->AddSamplingCounter("AverageThreadTokens",
       bind<int64_t>(mem_fn(&ThreadResourcePool::num_threads),
           runtime_state_->resource_pool()));
-  mem_usage_sampled_counter_ = profile()->AddTimeSeriesCounter("MemoryUsage",
+  mem_usage_sampled_counter_ = profile()->AddSamplingTimeSeriesCounter("MemoryUsage",
       TUnit::BYTES,
       bind<int64_t>(mem_fn(&MemTracker::consumption),
           runtime_state_->instance_mem_tracker()));
-  thread_usage_sampled_counter_ = profile()->AddTimeSeriesCounter("ThreadUsage",
+  thread_usage_sampled_counter_ = profile()->AddSamplingTimeSeriesCounter("ThreadUsage",
       TUnit::UNIT,
       bind<int64_t>(mem_fn(&ThreadResourcePool::num_threads),
           runtime_state_->resource_pool()));
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index 3bad645..04babfb 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -672,7 +672,7 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
   total_deferred_rpcs_counter_ =
       ADD_COUNTER(enqueue_profile_, "TotalRPCsDeferred", TUnit::UNIT);
   deferred_rpcs_time_series_counter_ =
-      enqueue_profile_->AddTimeSeriesCounter("DeferredQueueSize", TUnit::UNIT,
+      enqueue_profile_->AddSamplingTimeSeriesCounter("DeferredQueueSize", TUnit::UNIT,
       bind<int64_t>(mem_fn(&KrpcDataStreamRecvr::num_deferred_rpcs), this));
   total_has_deferred_rpcs_timer_ =
       ADD_TIMER(enqueue_profile_, "TotalHasDeferredRPCsTime");
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 5779a0c..c0b5cc6 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -42,6 +42,7 @@
 #include "service/control-service.h"
 #include "util/debug-util.h"
 #include "util/impalad-metrics.h"
+#include "util/system-state-info.h"
 #include "util/thread.h"
 
 #include "gen-cpp/control_service.pb.h"
@@ -82,7 +83,8 @@ QueryState::QueryState(
     backend_resource_refcnt_(0),
     refcnt_(0),
     is_cancelled_(0),
-    query_spilled_(0) {
+    query_spilled_(0),
+    host_profile_(RuntimeProfile::Create(obj_pool(), "<track resource usage>")) {
   if (query_ctx_.request_pool.empty()) {
     // fix up pool name for tests
     DCHECK(!request_pool.empty());
@@ -130,6 +132,8 @@ QueryState::~QueryState() {
     // therefore be safely destroyed.
     query_mem_tracker_->CloseAndUnregisterFromParent();
   }
+  // Stop the periodic counters we started in Init() to track system resource usage.
+  host_profile_->StopPeriodicCounters();
 }
 
 Status QueryState::Init(const TExecQueryFInstancesParams& exec_rpc_params) {
@@ -138,10 +142,28 @@ Status QueryState::Init(const TExecQueryFInstancesParams& exec_rpc_params) {
   // returns a resource refcount to its caller.
   AcquireBackendResourceRefcount();
 
+  ExecEnv* exec_env = ExecEnv::GetInstance();
+
+  // Initialize resource tracking counters.
+  if (query_ctx().trace_resource_usage) {
+    SystemStateInfo* system_state_info = exec_env->system_state_info();
+    host_profile_->AddChunkedTimeSeriesCounter(
+        "HostCpuUserPercentage", TUnit::BASIS_POINTS, [system_state_info] () {
+        return system_state_info->GetCpuUsageRatios().user;
+        });
+    host_profile_->AddChunkedTimeSeriesCounter(
+        "HostCpuSysPercentage", TUnit::BASIS_POINTS, [system_state_info] () {
+        return system_state_info->GetCpuUsageRatios().system;
+        });
+    host_profile_->AddChunkedTimeSeriesCounter(
+        "HostCpuIoWaitPercentage", TUnit::BASIS_POINTS, [system_state_info] () {
+        return system_state_info->GetCpuUsageRatios().iowait;
+        });
+  }
+
   // Starting a new query creates threads and consumes a non-trivial amount of memory.
   // If we are already starved for memory, fail as early as possible to avoid consuming
   // more resources.
-  ExecEnv* exec_env = ExecEnv::GetInstance();
   MemTracker* process_mem_tracker = exec_env->process_mem_tracker();
   if (process_mem_tracker->LimitExceeded(MemLimit::HARD)) {
     string msg = Substitute(
@@ -202,14 +224,10 @@ Status QueryState::InitBufferPoolState() {
   buffer_reservation_->InitChildTracker(
       NULL, exec_env->buffer_reservation(), query_mem_tracker_, max_reservation);
 
-  // TODO: once there's a mechanism for reporting non-fragment-local profiles,
-  // should make sure to report this profile so it's not going into a black hole.
-  RuntimeProfile* dummy_profile = RuntimeProfile::Create(&obj_pool_, "dummy");
-  // Only create file group if spilling is enabled.
   if (query_options().scratch_limit != 0 && !query_ctx_.disable_spilling) {
     file_group_ = obj_pool_.Add(
         new TmpFileMgr::FileGroup(exec_env->tmp_file_mgr(), exec_env->disk_io_mgr(),
-            dummy_profile, query_id(), query_options().scratch_limit));
+            host_profile_, query_id(), query_options().scratch_limit));
   }
   return Status::OK();
 }
@@ -275,6 +293,12 @@ void QueryState::ConstructReport(bool instances_started,
     }
   }
 
+  // Add profile to report
+  host_profile_->ToThrift(&profiles_forest->host_profile);
+  profiles_forest->__isset.host_profile = true;
+  // Free resources in chunked counters in the profile
+  host_profile_->ClearChunkedTimeSeriesCounters();
+
   if (instances_started) {
     for (const auto& entry : fis_map_) {
       FragmentInstanceState* fis = entry.second;
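
Taken together, the host profile follows a simple lifecycle: register chunked counters when the query starts, serialize and clear them on every status report, and stop them when the QueryState is torn down. A condensed sketch of that flow using only the calls visible in this change (illustrative; locking and error handling omitted):

    #include "common/object-pool.h"
    #include "gen-cpp/RuntimeProfile_types.h"
    #include "util/runtime-profile.h"

    using namespace impala;

    void HostProfileLifecycleSketch() {
      ObjectPool pool;
      RuntimeProfile* host_profile = RuntimeProfile::Create(&pool, "HostProfile");

      // Init(): the updater thread samples this once per update period.
      int64_t cpu_user_bps = 0;  // stands in for GetCpuUsageRatios().user
      host_profile->AddChunkedTimeSeriesCounter(
          "HostCpuUserPercentage", TUnit::BASIS_POINTS,
          [&cpu_user_bps]() { return cpu_user_bps; });

      // ConstructReport(): ship the accumulated samples, then drop them locally.
      TRuntimeProfileTree tree;
      host_profile->ToThrift(&tree);
      host_profile->ClearChunkedTimeSeriesCounters();

      // ~QueryState(): stop sampling before the profile goes away.
      host_profile->StopPeriodicCounters();
    }
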
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 6453d3e..74afb8a 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -384,6 +384,9 @@ class QueryState {
   /// StartFInstances().
   int64_t fragment_events_start_time_ = 0;
 
+  /// Tracks host resource usage of this backend. Owned by 'obj_pool_', created in c'tor.
+  RuntimeProfile* const host_profile_;
+
   /// Create QueryState w/ a refcnt of 0 and a memory limit of 'mem_limit' bytes applied
   /// to the query mem tracker. The query is associated with the resource pool set in
   /// 'query_ctx.request_pool' or from 'request_pool', if the former is not set (needed
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index bcaa028..9ecaafb 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -70,11 +70,10 @@ RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& frag
     fragment_ctx_(&fragment_ctx),
     instance_ctx_(&instance_ctx),
     now_(new TimestampValue(TimestampValue::Parse(query_state->query_ctx().now_string))),
-    utc_timestamp_(new TimestampValue(TimestampValue::Parse(
-        query_state->query_ctx().utc_timestamp_string))),
+    utc_timestamp_(new TimestampValue(
+        TimestampValue::Parse(query_state->query_ctx().utc_timestamp_string))),
     local_time_zone_(&TimezoneDatabase::GetUtcTimezone()),
-    profile_(RuntimeProfile::Create(
-          obj_pool(), "Fragment " + PrintId(instance_ctx.fragment_instance_id))),
+    profile_(RuntimeProfile::Create(obj_pool(), "<fragment instance>")),
     instance_buffer_reservation_(new ReservationTracker) {
   Init();
 }
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index fa0c002..5b18cdd 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -48,6 +48,7 @@
 #include "exec/external-data-source-executor.h"
 #include "exprs/timezone_db.h"
 #include "gen-cpp/CatalogService_constants.h"
+#include "kudu/util/random_util.h"
 #include "rpc/authentication.h"
 #include "rpc/rpc-trace.h"
 #include "rpc/thrift-thread.h"
@@ -107,6 +108,7 @@ using boost::get_system_time;
 using boost::system_time;
 using boost::uuids::random_generator;
 using boost::uuids::uuid;
+using kudu::GetRandomSeed32;
 using namespace apache::hive::service::cli::thrift;
 using namespace apache::thrift;
 using namespace apache::thrift::transport;
@@ -262,6 +264,8 @@ const char* ImpalaServer::SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED = "HYC00";
 // Interval between checks for query expiration.
 const int64_t EXPIRATION_CHECK_INTERVAL_MS = 1000L;
 
+ThreadSafeRandom ImpalaServer::rng_(GetRandomSeed32());
+
 ImpalaServer::ImpalaServer(ExecEnv* exec_env)
     : exec_env_(exec_env),
       thrift_serializer_(false),
@@ -1018,6 +1022,11 @@ void ImpalaServer::PrepareQueryContext(const TNetworkAddress& backend_addr,
   // thread-safe).
   query_ctx->query_id = UuidToQueryId(random_generator()());
   GetThreadDebugInfo()->SetQueryId(query_ctx->query_id);
+
+  const double trace_ratio = query_ctx->client_request.query_options.resource_trace_ratio;
+  if (trace_ratio > 0 && rng_.NextDoubleFraction() < trace_ratio) {
+    query_ctx->__set_trace_resource_usage(true);
+  }
 }
 
 Status ImpalaServer::RegisterQuery(shared_ptr<SessionState> session_state,
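
The effect of the gating above: with RESOURCE_TRACE_RATIO set to r, each query is independently selected for host resource tracing with probability r, and the resulting trace_resource_usage flag in the query context is what QueryState::Init() later checks on every backend. A one-line restatement of the decision (illustrative only):

    #include "kudu/util/random.h"

    // E.g. RESOURCE_TRACE_RATIO=0.05 traces roughly 5% of queries.
    bool ShouldTraceResourceUsage(kudu::ThreadSafeRandom* rng, double trace_ratio) {
      return trace_ratio > 0 && rng->NextDoubleFraction() < trace_ratio;
    }
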
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 981df74..dd7d221 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -32,6 +32,7 @@
 #include "gen-cpp/ImpalaHiveServer2Service.h"
 #include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/Frontend_types.h"
+#include "kudu/util/random.h"
 #include "rpc/thrift-server.h"
 #include "common/status.h"
 #include "service/query-options.h"
@@ -47,6 +48,7 @@
 #include "statestore/statestore-subscriber.h"
 
 namespace impala {
+using kudu::ThreadSafeRandom;
 
 class ExecEnv;
 class DataSink;
@@ -914,6 +916,9 @@ class ImpalaServer : public ImpalaServiceIf,
   /// Background thread that does the shutdown.
   [[noreturn]] void ShutdownThread();
 
+  /// Random number generator for use in this class, thread safe.
+  static ThreadSafeRandom rng_;
+
   /// Guards query_log_ and query_log_index_
   boost::mutex query_log_lock_;
 
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index c3cf1e3..6f5bfce 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -75,13 +75,13 @@ void impala::OverlayQueryOptions(const TQueryOptions& src, const QueryOptionsMas
 // Choose different print function based on the type.
 // TODO: In thrift 0.11.0 operator << is implemented for enums and this indirection can be
 // removed.
-template<typename T, typename std::enable_if_t<std::is_enum<T>::value>* = nullptr>
+template <typename T, typename std::enable_if_t<std::is_enum<T>::value>* = nullptr>
 string PrintQueryOptionValue(const T& option) {
   return PrintThriftEnum(option);
 }
 
-template<typename T, typename std::enable_if_t<std::is_integral<T>::value>* = nullptr>
-string PrintQueryOptionValue(const T& option)  {
+template <typename T, typename std::enable_if_t<std::is_arithmetic<T>::value>* = nullptr>
+string PrintQueryOptionValue(const T& option) {
   return std::to_string(option);
 }
 
@@ -724,6 +724,18 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_client_identifier(value);
         break;
       }
+      case TImpalaQueryOptions::RESOURCE_TRACE_RATIO: {
+        StringParser::ParseResult result;
+        const double val =
+            StringParser::StringToFloat<double>(value.c_str(), value.length(), &result);
+        if (result != StringParser::PARSE_SUCCESS || val < 0 || val > 1) {
+          return Status(Substitute("Invalid resource trace ratio: '$0'. "
+                                   "Only values from 0 to 1 are allowed.",
+              value));
+        }
+        query_options->__set_resource_trace_ratio(val);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index f001ba4..b3276ff 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -41,7 +41,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::CLIENT_IDENTIFIER + 1);\
+      TImpalaQueryOptions::RESOURCE_TRACE_RATIO + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -145,6 +145,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(cpu_limit_s, CPU_LIMIT_S, TQueryOptionLevel::DEVELOPMENT)\
   QUERY_OPT_FN(topn_bytes_limit, TOPN_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(client_identifier, CLIENT_IDENTIFIER, TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(resource_trace_ratio, RESOURCE_TRACE_RATIO, TQueryOptionLevel::ADVANCED)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index e80560e..63bb0ba 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -79,6 +79,7 @@ add_library(Util
   string-parser.cc
   string-util.cc
   symbols-util.cc
+  system-state-info.cc
   static-asserts.cc
   summary-util.cc
   table-printer.cc
@@ -144,6 +145,7 @@ ADD_BE_LSAN_TEST(runtime-profile-test)
 ADD_BE_LSAN_TEST(string-parser-test)
 ADD_BE_LSAN_TEST(string-util-test)
 ADD_BE_LSAN_TEST(symbols-util-test)
+ADD_BE_LSAN_TEST(system-state-info-test)
 ADD_BE_LSAN_TEST(sys-info-test)
 ADD_BE_LSAN_TEST(thread-pool-test)
 ADD_BE_LSAN_TEST(time-test)
diff --git a/be/src/util/periodic-counter-updater.cc b/be/src/util/periodic-counter-updater.cc
index 098e683..8bef6aa 100644
--- a/be/src/util/periodic-counter-updater.cc
+++ b/be/src/util/periodic-counter-updater.cc
@@ -26,12 +26,12 @@ namespace posix_time = boost::posix_time;
 using boost::get_system_time;
 using boost::system_time;
 
-namespace impala {
-
 // Period to update rate counters and sampling counters in ms.
 DEFINE_int32(periodic_counter_update_period_ms, 500, "Period to update rate counters and"
     " sampling counters in ms");
 
+namespace impala {
+
 PeriodicCounterUpdater* PeriodicCounterUpdater::instance_ = nullptr;
 
 void PeriodicCounterUpdater::Init() {
@@ -42,9 +42,14 @@ void PeriodicCounterUpdater::Init() {
       new thread(&PeriodicCounterUpdater::UpdateLoop, instance_));
 }
 
+void PeriodicCounterUpdater::RegisterUpdateFunction(UpdateFn update_fn) {
+  lock_guard<SpinLock> l(instance_->update_fns_lock_);
+  instance_->update_fns_.push_back(update_fn);
+}
+
 void PeriodicCounterUpdater::RegisterPeriodicCounter(
     RuntimeProfile::Counter* src_counter,
-    RuntimeProfile::DerivedCounterFunction sample_fn,
+    RuntimeProfile::SampleFunction sample_fn,
     RuntimeProfile::Counter* dst_counter, PeriodicCounterType type) {
   DCHECK(src_counter == NULL || sample_fn == NULL);
 
@@ -133,6 +138,11 @@ void PeriodicCounterUpdater::UpdateLoop() {
     int elapsed_ms = elapsed.total_milliseconds();
 
     {
+      lock_guard<SpinLock> l(update_fns_lock_);
+      for (UpdateFn& f : update_fns_) f();
+    }
+
+    {
       lock_guard<SpinLock> ratelock(instance_->rate_lock_);
       for (RateCounterMap::iterator it = rate_counters_.begin();
            it != rate_counters_.end(); ++it) {
diff --git a/be/src/util/periodic-counter-updater.h b/be/src/util/periodic-counter-updater.h
index 762f372..3c63d6d 100644
--- a/be/src/util/periodic-counter-updater.h
+++ b/be/src/util/periodic-counter-updater.h
@@ -34,7 +34,9 @@ namespace impala {
 /// metric (e.g. memory used) at regular intervals. The samples can be summarized in
 /// a few ways (e.g. averaged, stored as histogram, kept as a time series data, etc).
 /// This class has one thread that will wake up at a regular period and update all
-/// the registered counters.
+/// the registered counters. Optionally, users can register functions to be called before
+/// counters get updated, for example to update global metrics that the counters then
+/// pull from.
 /// Typically, the counter updates should be stopped as early as possible to prevent
 /// future stale samples from polluting the useful values.
 class PeriodicCounterUpdater {
@@ -44,17 +46,23 @@ class PeriodicCounterUpdater {
     SAMPLING_COUNTER,
   };
 
-  // Sets up data structures and starts the counter update thread. Should only be called
-  // once during process startup and must be called before other methods.
+  /// Sets up data structures and starts the counter update thread. Should only be called
+  /// once during process startup and must be called before other methods.
   static void Init();
 
+  typedef std::function<void()> UpdateFn;
+  /// Registers an update function that will be called before individual counters are
+  /// updated. This can be used to update some global metric once before reading it
+  /// through individual counters.
+  static void RegisterUpdateFunction(UpdateFn update_fn);
+
   /// Registers a periodic counter to be updated by the update thread.
   /// Either sample_fn or dst_counter must be non-NULL.  When the periodic counter
   /// is updated, it either gets the value from the dst_counter or calls the sample
   /// function to get the value.
   /// dst_counter/sample fn is assumed to be compatible types with src_counter.
   static void RegisterPeriodicCounter(RuntimeProfile::Counter* src_counter,
-      RuntimeProfile::DerivedCounterFunction sample_fn,
+      RuntimeProfile::SampleFunction sample_fn,
       RuntimeProfile::Counter* dst_counter, PeriodicCounterType type);
 
   /// Adds a bucketing counter to be updated at regular intervals.
@@ -82,13 +90,13 @@ class PeriodicCounterUpdater {
  private:
   struct RateCounterInfo {
     RuntimeProfile::Counter* src_counter;
-    RuntimeProfile::DerivedCounterFunction sample_fn;
+    RuntimeProfile::SampleFunction sample_fn;
     int64_t elapsed_ms;
   };
 
   struct SamplingCounterInfo {
     RuntimeProfile::Counter* src_counter; // the counter to be sampled
-    RuntimeProfile::DerivedCounterFunction sample_fn;
+    RuntimeProfile::SampleFunction sample_fn;
     int64_t total_sampled_value; // sum of all sampled values;
     int64_t num_sampled; // number of samples taken
   };
@@ -106,6 +114,12 @@ class PeriodicCounterUpdater {
   /// Thread performing asynchronous updates.
   boost::scoped_ptr<boost::thread> update_thread_;
 
+  /// List of functions that will be called before individual counters are sampled.
+  std::vector<UpdateFn> update_fns_;
+
+  /// Spinlock that protects the list of update functions (and their execution).
+  SpinLock update_fns_lock_;
+
   /// Spinlock that protects the map of rate counters
   SpinLock rate_lock_;
 
diff --git a/be/src/util/pretty-printer.h b/be/src/util/pretty-printer.h
index 6226eba..c44bc26 100644
--- a/be/src/util/pretty-printer.h
+++ b/be/src/util/pretty-printer.h
@@ -20,8 +20,9 @@
 
 #include <boost/algorithm/string.hpp>
 #include <cmath>
-#include <sstream>
 #include <iomanip>
+#include <limits>
+#include <sstream>
 
 #include "gen-cpp/RuntimeProfile_types.h"
 #include "util/cpu-info.h"
@@ -135,6 +136,13 @@ class PrettyPrinter {
         break;
       }
 
+      // Printed as integer values
+      case TUnit::BASIS_POINTS: {
+        DCHECK_LE(value, 10000);
+        ss << (value / 100);
+        break;
+      }
+
       default:
         DCHECK(false) << "Unsupported TUnit: " << value;
         break;
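
The BASIS_POINTS case prints whole percentage points via integer division (1 basis point = 1/100th of a percent, so 10000 = 100%). A small illustration, assuming the static PrettyPrinter::Print(value, unit) entry point used elsewhere in this patch:

    #include "common/logging.h"
    #include "util/pretty-printer.h"

    using namespace impala;

    void BasisPointExamples() {
      // 2575 basis points == 25.75%; the division by 100 truncates to 25.
      LOG(INFO) << PrettyPrinter::Print(2575, TUnit::BASIS_POINTS);
      // 10000 basis points == 100%; printed as 100.
      LOG(INFO) << PrettyPrinter::Print(10000, TUnit::BASIS_POINTS);
    }
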
diff --git a/be/src/util/runtime-profile-counters.h b/be/src/util/runtime-profile-counters.h
index 1d3d748..a9b3875 100644
--- a/be/src/util/runtime-profile-counters.h
+++ b/be/src/util/runtime-profile-counters.h
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-
 #ifndef IMPALA_UTIL_RUNTIME_PROFILE_COUNTERS_H
 #define IMPALA_UTIL_RUNTIME_PROFILE_COUNTERS_H
 
@@ -33,6 +32,10 @@
 
 namespace impala {
 
+/// This file contains the declarations of various counters that can be used in runtime
+/// profiles. See the class-level comment for RuntimeProfile (runtime-profile.h) for an
+/// overview of what there is. When making changes, please also update that comment.
+
 /// Define macros for updating counters.  The macros make it very easy to disable
 /// all counters at compile time.  Set this to 0 to remove counters.  This is useful
 /// to do to make sure the counters aren't affecting the system.
@@ -45,7 +48,7 @@ namespace impala {
 #if ENABLE_COUNTERS
   #define ADD_COUNTER(profile, name, unit) (profile)->AddCounter(name, unit)
   #define ADD_TIME_SERIES_COUNTER(profile, name, src_counter) \
-      (profile)->AddTimeSeriesCounter(name, src_counter)
+      (profile)->AddSamplingTimeSeriesCounter(name, src_counter)
   #define ADD_TIMER(profile, name) (profile)->AddCounter(name, TUnit::TIME_NS)
   #define ADD_SUMMARY_STATS_TIMER(profile, name) \
       (profile)->AddSummaryStatsCounter(name, TUnit::TIME_NS)
@@ -142,7 +145,7 @@ class RuntimeProfile::HighWaterMarkCounter : public RuntimeProfile::Counter {
 /// Do not call Set() and Add().
 class RuntimeProfile::DerivedCounter : public RuntimeProfile::Counter {
  public:
-  DerivedCounter(TUnit::type unit, const DerivedCounterFunction& counter_fn)
+  DerivedCounter(TUnit::type unit, const SampleFunction& counter_fn)
     : Counter(unit),
       counter_fn_(counter_fn) {}
 
@@ -151,7 +154,7 @@ class RuntimeProfile::DerivedCounter : public RuntimeProfile::Counter {
   }
 
  private:
-  DerivedCounterFunction counter_fn_;
+  SampleFunction counter_fn_;
 };
 
 /// An AveragedCounter maintains a set of counters and its value is the
@@ -384,41 +387,125 @@ class RuntimeProfile::EventSequence {
   int64_t offset_ = 0;
 };
 
-typedef StreamingSampler<int64_t, 64> StreamingCounterSampler;
+/// Abstract base for counters to capture a time series of values. Users can add samples
+/// to counters in periodic intervals, and the RuntimeProfile class will retrieve them by
+/// accessing the private interface. Methods are thread-safe where explicitly stated.
 class RuntimeProfile::TimeSeriesCounter {
  public:
-  std::string DebugString() const;
+  // Adds a sample. Thread-safe.
+  void AddSample(int ms_elapsed);
 
-  void AddSample(int ms_elapsed) {
-    int64_t sample = sample_fn_();
-    samples_.AddSample(sample, ms_elapsed);
+  // Returns a pointer to the sample data together with the number of samples and the
+  // sampling period. This method is not thread-safe and must only be used in tests.
+  const int64_t* GetSamplesTest(int* num_samples, int* period) {
+    return GetSamplesLockedForSend(num_samples, period);
   }
 
+  virtual ~TimeSeriesCounter() {}
+
  private:
   friend class RuntimeProfile;
 
-  TimeSeriesCounter(const std::string& name, TUnit::type unit,
-      DerivedCounterFunction fn)
-    : name_(name), unit_(unit), sample_fn_(fn) {
-  }
+  void ToThrift(TTimeSeriesCounter* counter);
 
-  /// Construct a time series object from existing sample data. This counter
-  /// is then read-only (i.e. there is no sample function).
-  TimeSeriesCounter(const std::string& name, TUnit::type unit, int period,
-      const std::vector<int64_t>& values)
-    : name_(name), unit_(unit), sample_fn_(), samples_(period, values) {
-  }
+  /// Adds a sample to the counter. Caller must hold lock_.
+  virtual void AddSampleLocked(int64_t value, int ms_elapsed) = 0;
 
-  void ToThrift(TTimeSeriesCounter* counter);
+  /// Returns a pointer to memory containing all samples of the counter. The caller must
+  /// hold lock_. The returned pointer is only valid while the caller holds lock_.
+  virtual const int64_t* GetSamplesLocked(int* num_samples, int* period) const = 0;
+
+  /// Returns a pointer to memory containing all samples of the counter and marks the
+  /// samples as retrieved, so that a subsequent call to Clear() can remove them. The
+  /// caller must hold lock_. The returned pointer is only valid while the caller holds
+  /// lock_.
+  virtual const int64_t* GetSamplesLockedForSend(int* num_samples, int* period);
+
+  /// Sets all internal samples. Thread-safe. Not implemented by all child classes. The
+  /// caller must make sure that this is only called on supported classes.
+  virtual void SetSamples(
+      int period, const std::vector<int64_t>& samples, int64_t start_idx);
+
+  /// Implemented by some child classes to clear internal sample buffers. No-op on other
+  /// child classes.
+  virtual void Clear() {}
+
+ protected:
+  TimeSeriesCounter(const std::string& name, TUnit::type unit,
+      SampleFunction fn = SampleFunction())
+    : name_(name), unit_(unit), sample_fn_(fn) {}
+
+  TUnit::type unit() const { return unit_; }
 
   std::string name_;
   TUnit::type unit_;
-  DerivedCounterFunction sample_fn_;
+  SampleFunction sample_fn_;
+  /// The number of samples that have been retrieved and cleared from this counter.
+  int64_t previous_sample_count_ = 0;
+  mutable SpinLock lock_;
+};
+
+typedef StreamingSampler<int64_t, 64> StreamingCounterSampler;
+class RuntimeProfile::SamplingTimeSeriesCounter
+    : public RuntimeProfile::TimeSeriesCounter {
+ private:
+  friend class RuntimeProfile;
+
+  SamplingTimeSeriesCounter(
+      const std::string& name, TUnit::type unit, SampleFunction fn)
+    : TimeSeriesCounter(name, unit, fn) {}
+
+  virtual void AddSampleLocked(int64_t sample, int ms_elapsed) override;
+  virtual const int64_t* GetSamplesLocked(int* num_samples, int* period) const override;
+
   StreamingCounterSampler samples_;
 };
 
-/// Counter whose value comes from an internal ConcurrentStopWatch to track multiple threads
-/// concurrent running time.
+/// Time series counter that supports piece-wise transmission of its samples.
+///
+/// This time series counter will capture samples into an internal, growing buffer.
+/// The buffer can be reset to clear out values that have already been transmitted
+/// elsewhere.
+class RuntimeProfile::ChunkedTimeSeriesCounter
+    : public RuntimeProfile::TimeSeriesCounter {
+ public:
+  /// Clears the internal sample buffer and updates the number of samples that the counter
+  /// has seen in total so far.
+  virtual void Clear() override;
+
+ private:
+  friend class RuntimeProfile;
+
+  /// Constructs a time series counter that uses 'fn' to generate new samples. Its size
+  /// is bounded by the expected number of samples per status update times a constant
+  /// factor.
+  ChunkedTimeSeriesCounter(
+      const std::string& name, TUnit::type unit, SampleFunction fn);
+
+  /// Constructs a time series object from existing sample data. This counter is then
+  /// read-only (i.e. there is no sample function). This counter has no maximum size.
+  ChunkedTimeSeriesCounter(const std::string& name, TUnit::type unit, int period,
+      const std::vector<int64_t>& values)
+    : TimeSeriesCounter(name, unit), period_(period), values_(values), max_size_(0) {}
+
+  virtual void AddSampleLocked(int64_t value, int ms_elapsed) override;
+  virtual const int64_t* GetSamplesLocked(int* num_samples, int* period) const override;
+  virtual const int64_t* GetSamplesLockedForSend(int* num_samples, int* period) override;
+
+  virtual void SetSamples(
+      int period, const std::vector<int64_t>& samples, int64_t start_idx) override;
+
+  int period_ = 0;
+  std::vector<int64_t> values_;
+  // The number of values returned through the last call to GetSamplesLockedForSend().
+  int64_t last_get_count_ = 0;
+  // The maximum number of samples that can be stored in this counter. We drop samples at
+  // the front before appending new ones if we would exceed this count.
+  int64_t max_size_;
+};
+
+/// Counter whose value comes from an internal ConcurrentStopWatch to track concurrent
+/// running time for multiple threads.
 class RuntimeProfile::ConcurrentTimerCounter : public Counter {
  public:
   ConcurrentTimerCounter(TUnit::type unit) : Counter(unit) {}
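
To make the split between the two TimeSeriesCounter implementations concrete, a short usage sketch (illustrative only; it mirrors the tests added in runtime-profile-test.cc below):

    #include "common/object-pool.h"
    #include "util/runtime-profile.h"

    using namespace impala;

    void TimeSeriesCounterKinds(ObjectPool* pool) {
      RuntimeProfile* profile = RuntimeProfile::Create(pool, "Example");
      int64_t v = 0;
      auto fn = [&v]() { return v++; };

      // SamplingTimeSeriesCounter: fixed 64-slot buffer; when it fills up, the
      // samples are collapsed and the period doubles, so it can run indefinitely.
      profile->AddSamplingTimeSeriesCounter("SampledValue", TUnit::UNIT, fn);

      // ChunkedTimeSeriesCounter: keeps every sample until the owner ships them
      // (e.g. in a status report) and then clears the transmitted chunk.
      profile->AddChunkedTimeSeriesCounter("ChunkedValue", TUnit::UNIT, fn);
      profile->ClearChunkedTimeSeriesCounters();  // after sending a report

      // Both kinds must be stopped before the profile is destroyed.
      profile->StopPeriodicCounters();
    }
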
diff --git a/be/src/util/runtime-profile-test.cc b/be/src/util/runtime-profile-test.cc
index 73553d2..87acb69 100644
--- a/be/src/util/runtime-profile-test.cc
+++ b/be/src/util/runtime-profile-test.cc
@@ -29,6 +29,9 @@
 
 #include "common/names.h"
 
+DECLARE_int32(status_report_interval_ms);
+DECLARE_int32(periodic_counter_update_period_ms);
+
 namespace impala {
 
 TEST(CountersTest, Basic) {
@@ -966,6 +969,178 @@ TEST(TimerCounterTest, CountersTestRandom) {
   ValidateLapTime(&tester, MonotonicStopWatch::Now() - lap_time_start);
 }
 
+
+TEST(TimeSeriesCounterTest, TestAddClearRace) {
+  ObjectPool pool;
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
+  int i = 0;
+  // Return and increment i
+  auto f = [&i]() { return i++; };
+  RuntimeProfile::TimeSeriesCounter* counter =
+      profile->AddChunkedTimeSeriesCounter("Counter", TUnit::UNIT, f);
+  // Sleep 1 second for some values to accumulate.
+  sleep(1);
+  int num_samples, period;
+  counter->GetSamplesTest(&num_samples, &period);
+  EXPECT_GT(num_samples, 0);
+
+  // Wait for more values to show up
+  sleep(1);
+
+  // Stop the counters. The rest of the test assumes that no new values will be added.
+  profile->StopPeriodicCounters();
+
+  // Clear the counter
+  profile->ClearChunkedTimeSeriesCounters();
+
+  // Check that clearing multiple times doesn't affect values that have not been
+  // retrieved.
+  profile->ClearChunkedTimeSeriesCounters();
+
+  // Make sure that it still has values in it.
+  counter->GetSamplesTest(&num_samples, &period);
+  EXPECT_GT(num_samples, 0);
+
+  // Clear it again
+  profile->ClearChunkedTimeSeriesCounters();
+
+  // Make sure the values are gone.
+  counter->GetSamplesTest(&num_samples, &period);
+  EXPECT_EQ(num_samples, 0);
+}
+
+/// Stops the periodic counter updater in 'profile' and then clears the samples in
+/// 'counter'.
+void StopAndClearCounter(RuntimeProfile* profile,
+    RuntimeProfile::TimeSeriesCounter* counter) {
+  // There's a race between adding the counter and calling StopPeriodicCounters so we
+  // sleep here to make sure we exercise the code that handles the race.
+  sleep(1);
+  profile->StopPeriodicCounters();
+
+  // Reset the counter state by reading and clearing its samples.
+  int num_samples = 0;
+  int result_period_unused = 0;
+  counter->GetSamplesTest(&num_samples, &result_period_unused);
+  ASSERT_GT(num_samples, 0);
+  profile->ClearChunkedTimeSeriesCounters();
+  // Ensure clean state.
+  counter->GetSamplesTest(&num_samples, &result_period_unused);
+  ASSERT_EQ(num_samples, 0);
+}
+
+/// Tests that ChunkedTimeSeriesCounters are bounded by a maximum size.
+TEST(TimeSeriesCounterTest, TestMaximumSize) {
+  ObjectPool pool;
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
+
+  const int test_period = FLAGS_periodic_counter_update_period_ms;
+
+  // Add a counter with a sample function that counts up, starting from 0.
+  int value = 0;
+  auto sample_fn = [&value]() { return value++; };
+  RuntimeProfile::TimeSeriesCounter* counter =
+      profile->AddChunkedTimeSeriesCounter("TestCounter", TUnit::UNIT, sample_fn);
+
+  // Stop counter updates from interfering with the rest of the test.
+  StopAndClearCounter(profile, counter);
+
+  // Reset value after previous values have been retrieved.
+  value = 0;
+
+  int64_t max_size = 10 * FLAGS_status_report_interval_ms / test_period;
+  for (int i = 0; i < 10 + max_size; ++i) counter->AddSample(test_period);
+
+  int num_samples = 0;
+  int result_period = 0;
+  // Retrieve and validate samples.
+  const int64_t* samples = counter->GetSamplesTest(&num_samples, &result_period);
+  ASSERT_EQ(num_samples, max_size);
+  // No resampling happens with ChunkedTimeSeriesCounter.
+  ASSERT_EQ(result_period, test_period);
+
+  // First 10 samples have been truncated
+  ASSERT_EQ(samples[0], 10);
+}
+
+/// Test parameter class that helps to test time series resampling during profile pretty
+/// printing with a varying number of test samples.
+struct TimeSeriesTestParam {
+  TimeSeriesTestParam(int num_samples, vector<const char*> expected)
+    : num_samples(num_samples), expected(expected) {}
+  int num_samples;
+  vector<const char*> expected;
+
+  // Used by gtest to print values of this struct
+  friend std::ostream& operator<<(std::ostream& os, const TimeSeriesTestParam& p) {
+    return os << "num_samples: " << p.num_samples << endl;
+  }
+};
+
+class TimeSeriesCounterResampleTest : public testing::TestWithParam<TimeSeriesTestParam> {
+};
+
+/// Tests that pretty-printing a ChunkedTimeSeriesCounter limits the number of printed
+/// samples to 64 or fewer.
+TEST_P(TimeSeriesCounterResampleTest, TestPrettyPrint) {
+  ObjectPool pool;
+  RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile");
+
+  const TimeSeriesTestParam& param = GetParam();
+  const int test_period = FLAGS_periodic_counter_update_period_ms;
+
+  // Add a counter with a sample function that counts up, starting from 0.
+  int value = 0;
+  auto sample_fn = [&value]() { return value++; };
+  // We increase the value of this flag to allow the counter to store enough samples.
+  FLAGS_status_report_interval_ms = 50000;
+  RuntimeProfile::TimeSeriesCounter* counter =
+      profile->AddChunkedTimeSeriesCounter("TestCounter", TUnit::UNIT, sample_fn);
+
+  // Stop counter updates from interfering with the rest of the test.
+  StopAndClearCounter(profile, counter);
+
+  // Reset value after previous values have been retrieved.
+  value = 0;
+  for (int i = 0; i < param.num_samples; ++i) counter->AddSample(test_period);
+
+  int num_samples = 0;
+  int result_period = 0;
+  // Retrieve and validate samples.
+  const int64_t* samples = counter->GetSamplesTest(&num_samples, &result_period);
+  ASSERT_EQ(num_samples, param.num_samples);
+  // No resampling happens with ChunkedTimeSeriesCounter.
+  ASSERT_EQ(result_period, test_period);
+
+  for (int i = 0; i < param.num_samples; ++i) ASSERT_EQ(samples[i], i);
+
+  stringstream pretty;
+  profile->PrettyPrint(&pretty);
+  const string pretty_str = pretty.str();
+
+  for (const char* e : param.expected) EXPECT_STR_CONTAINS(pretty_str, e);
 }
 
+INSTANTIATE_TEST_CASE_P(VariousNumbers, TimeSeriesCounterResampleTest,
+    ::testing::Values(
+    TimeSeriesTestParam(64, {"TestCounter (500.000ms): 0, 1, 2, 3", "61, 62, 63"}),
+
+    TimeSeriesTestParam(65, {"TestCounter (1s000ms): 0, 2, 4, 6,",
+    "60, 62, 64 (Showing 33 of 65 values from Thrift Profile)"}),
+
+    TimeSeriesTestParam(80, {"TestCounter (1s000ms): 0, 2, 4, 6,",
+    "74, 76, 78 (Showing 40 of 80 values from Thrift Profile)"}),
+
+    TimeSeriesTestParam(127, {"TestCounter (1s000ms): 0, 2, 4, 6,",
+    "122, 124, 126 (Showing 64 of 127 values from Thrift Profile)"}),
+
+    TimeSeriesTestParam(128, {"TestCounter (1s000ms): 0, 2, 4, 6,",
+    "122, 124, 126 (Showing 64 of 128 values from Thrift Profile)"}),
+
+    TimeSeriesTestParam(129, {"TestCounter (1s500ms): 0, 3, 6, 9,",
+    "120, 123, 126 (Showing 43 of 129 values from Thrift Profile)"})
+    ));
+
+} // namespace impala
+
 IMPALA_TEST_MAIN();
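
The expected strings in the parameterized cases above follow directly from the resampling arithmetic in RuntimeProfile::PrettyPrint() (see the runtime-profile.cc hunk below). As a quick check for the 129-sample case:

    #include "common/logging.h"

    void Explain129SampleCase() {
      int num = 129;                  // samples in the chunked counter
      int period = 500;               // FLAGS_periodic_counter_update_period_ms
      int step = 1 + (num - 1) / 64;  // = 3, clamping printed values at 64
      period *= step;                 // = 1500 -> rendered as "1s500ms"
      int shown = (num + 1) / step;   // = 43 -> "Showing 43 of 129 values ..."
      LOG(INFO) << "step=" << step << " period=" << period << " shown=" << shown;
    }
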
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index 5696486..ec7417b 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -28,6 +28,7 @@
 
 #include "common/object-pool.h"
 #include "gutil/strings/strip.h"
+#include "kudu/util/logging.h"
 #include "rpc/thrift-util.h"
 #include "runtime/mem-tracker.h"
 #include "util/coding-util.h"
@@ -42,6 +43,9 @@
 
 #include "common/names.h"
 
+DECLARE_int32(status_report_interval_ms);
+DECLARE_int32(periodic_counter_update_period_ms);
+
 namespace impala {
 
 // Thread counters name
@@ -138,8 +142,10 @@ RuntimeProfile* RuntimeProfile::CreateFromThrift(ObjectPool* pool,
 
   if (node.__isset.time_series_counters) {
     for (const TTimeSeriesCounter& val: node.time_series_counters) {
-      profile->time_series_counter_map_[val.name] =
-          pool->Add(new TimeSeriesCounter(val.name, val.unit, val.period_ms, val.values));
+      // Capture all incoming time series counters with the same type since re-sampling
+      // will have happened on the sender side.
+      profile->time_series_counter_map_[val.name] = pool->Add(
+          new ChunkedTimeSeriesCounter(val.name, val.unit, val.period_ms, val.values));
     }
   }
 
@@ -310,10 +316,13 @@ void RuntimeProfile::Update(const vector<TRuntimeProfileNode>& nodes, int* idx)
       const TTimeSeriesCounter& c = node.time_series_counters[i];
       TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(c.name);
       if (it == time_series_counter_map_.end()) {
-        time_series_counter_map_[c.name] =
-            pool_->Add(new TimeSeriesCounter(c.name, c.unit, c.period_ms, c.values));
+        // Capture all incoming time series counters with the same type since re-sampling
+        // will have happened on the sender side.
+        time_series_counter_map_[c.name] = pool_->Add(
+            new ChunkedTimeSeriesCounter(c.name, c.unit, c.period_ms, c.values));
       } else {
-        it->second->samples_.SetSamples(c.period_ms, c.values);
+        int64_t start_idx = c.__isset.start_index ? c.start_index : 0;
+        it->second->SetSamples(c.period_ms, c.values, start_idx);
       }
     }
   }
@@ -618,7 +627,7 @@ ADD_COUNTER_IMPL(AddConcurrentTimerCounter, ConcurrentTimerCounter);
 
 RuntimeProfile::DerivedCounter* RuntimeProfile::AddDerivedCounter(
     const string& name, TUnit::type unit,
-    const DerivedCounterFunction& counter_fn, const string& parent_counter_name) {
+    const SampleFunction& counter_fn, const string& parent_counter_name) {
   DCHECK_EQ(is_averaged_profile_, false);
   lock_guard<SpinLock> l(counter_map_lock_);
   if (counter_map_.find(name) != counter_map_.end()) return NULL;
@@ -645,7 +654,7 @@ RuntimeProfile::ThreadCounters* RuntimeProfile::AddThreadCounters(
   return counter;
 }
 
-void RuntimeProfile::AddLocalTimeCounter(const DerivedCounterFunction& counter_fn) {
+void RuntimeProfile::AddLocalTimeCounter(const SampleFunction& counter_fn) {
   DerivedCounter* local_time_counter = pool_->Add(
       new DerivedCounter(TUnit::TIME_NS, counter_fn));
   lock_guard<SpinLock> l(counter_map_lock_);
@@ -768,22 +777,29 @@ void RuntimeProfile::PrettyPrint(ostream* s, const string& prefix) const {
   {
     // Print all time series counters as following:
     //    - <Name> (<period>): <val1>, <val2>, <etc>
-    SpinLock* lock;
-    int num, period;
     lock_guard<SpinLock> l(counter_map_lock_);
     for (const TimeSeriesCounterMap::value_type& v: time_series_counter_map_) {
-      const int64_t* samples = v.second->samples_.GetSamples(&num, &period, &lock);
+      const TimeSeriesCounter* counter = v.second;
+      lock_guard<SpinLock> l(counter->lock_);
+      int num, period;
+      const int64_t* samples = counter->GetSamplesLocked(&num, &period);
       if (num > 0) {
-        stream << prefix << "   - " << v.first << "("
-               << PrettyPrinter::Print(period * 1000000L, TUnit::TIME_NS)
-               << "): ";
-        for (int i = 0; i < num; ++i) {
-          stream << PrettyPrinter::Print(samples[i], v.second->unit_);
-          if (i != num - 1) stream << ", ";
+        // Clamp number of printed values at 64, the maximum number of values in the
+        // SamplingTimeSeriesCounter.
+        int step = 1 + (num - 1) / 64;
+        period *= step;
+        stream << prefix << "   - " << v.first << " ("
+               << PrettyPrinter::Print(period * 1000000L, TUnit::TIME_NS) << "): ";
+        for (int i = 0; i < num; i += step) {
+          stream << PrettyPrinter::Print(samples[i], counter->unit());
+          if (i + step < num) stream << ", ";
+        }
+        if (step > 1) {
+          stream << " (Showing " << ((num + 1) / step) << " of " << num << " values from "
+            "Thrift Profile)";
         }
         stream << endl;
       }
-      lock->unlock();
     }
   }
 
@@ -1068,7 +1084,7 @@ RuntimeProfile::Counter* RuntimeProfile::AddRateCounter(
 }
 
 RuntimeProfile::Counter* RuntimeProfile::AddRateCounter(
-    const string& name, DerivedCounterFunction fn, TUnit::type dst_unit) {
+    const string& name, SampleFunction fn, TUnit::type dst_unit) {
   lock_guard<SpinLock> l(counter_map_lock_);
   bool created;
   Counter* dst_counter = AddCounterLocked(name, dst_unit, "", &created);
@@ -1095,7 +1111,7 @@ RuntimeProfile::Counter* RuntimeProfile::AddSamplingCounter(
 }
 
 RuntimeProfile::Counter* RuntimeProfile::AddSamplingCounter(
-    const string& name, DerivedCounterFunction sample_fn) {
+    const string& name, SampleFunction sample_fn) {
   lock_guard<SpinLock> l(counter_map_lock_);
   bool created;
   Counter* dst_counter = AddCounterLocked(name, TUnit::DOUBLE_VALUE, "", &created);
@@ -1173,44 +1189,153 @@ RuntimeProfile::SummaryStatsCounter* RuntimeProfile::AddSummaryStatsCounter(
   return counter;
 }
 
-RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddTimeSeriesCounter(
-    const string& name, TUnit::type unit, DerivedCounterFunction fn) {
+RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddSamplingTimeSeriesCounter(
+    const string& name, TUnit::type unit, SampleFunction fn) {
   DCHECK(fn != nullptr);
   lock_guard<SpinLock> l(counter_map_lock_);
   TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(name);
   if (it != time_series_counter_map_.end()) return it->second;
-  TimeSeriesCounter* counter = pool_->Add(new TimeSeriesCounter(name, unit, fn));
+  TimeSeriesCounter* counter = pool_->Add(new SamplingTimeSeriesCounter(name, unit, fn));
   time_series_counter_map_[name] = counter;
   PeriodicCounterUpdater::RegisterTimeSeriesCounter(counter);
   has_active_periodic_counters_ = true;
   return counter;
 }
 
-RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddTimeSeriesCounter(
+RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddSamplingTimeSeriesCounter(
     const string& name, Counter* src_counter) {
   DCHECK(src_counter != NULL);
-  return AddTimeSeriesCounter(name, src_counter->unit(),
+  return AddSamplingTimeSeriesCounter(name, src_counter->unit(),
       bind(&Counter::value, src_counter));
 }
 
-void RuntimeProfile::TimeSeriesCounter::ToThrift(TTimeSeriesCounter* counter) {
-  counter->name = name_;
-  counter->unit = unit_;
+void RuntimeProfile::TimeSeriesCounter::AddSample(int ms_elapsed) {
+  lock_guard<SpinLock> l(lock_);
+  int64_t sample = sample_fn_();
+  AddSampleLocked(sample, ms_elapsed);
+}
+
+const int64_t* RuntimeProfile::TimeSeriesCounter::GetSamplesLockedForSend(
+    int* num_samples, int* period) {
+  return GetSamplesLocked(num_samples, period);
+}
+
+void RuntimeProfile::TimeSeriesCounter::SetSamples(
+      int period, const std::vector<int64_t>& samples, int64_t start_idx) {
+  DCHECK(false);
+}
+
+void RuntimeProfile::SamplingTimeSeriesCounter::AddSampleLocked(
+    int64_t sample, int ms_elapsed){
+  samples_.AddSample(sample, ms_elapsed);
+}
+
+const int64_t* RuntimeProfile::SamplingTimeSeriesCounter::GetSamplesLocked(
+    int* num_samples, int* period) const {
+  return samples_.GetSamples(num_samples, period);
+}
+
+RuntimeProfile::ChunkedTimeSeriesCounter::ChunkedTimeSeriesCounter(
+    const string& name, TUnit::type unit, SampleFunction fn)
+  : TimeSeriesCounter(name, unit, fn)
+  , period_(FLAGS_periodic_counter_update_period_ms)
+  , max_size_(10 * FLAGS_status_report_interval_ms / period_) {}
+
+void RuntimeProfile::ChunkedTimeSeriesCounter::Clear() {
+  lock_guard<SpinLock> l(lock_);
+  previous_sample_count_ += last_get_count_;
+  values_.erase(values_.begin(), values_.begin() + last_get_count_);
+  last_get_count_ = 0;
+}
+
+void RuntimeProfile::ChunkedTimeSeriesCounter::AddSampleLocked(
+    int64_t sample, int ms_elapsed) {
+  // We prefer a vector over a std::deque, even though erasing from the front is
+  // inefficient, because this should only happen very infrequently and we rely on
+  // contiguous storage in GetSamplesLocked*().
+  if (max_size_ > 0 && values_.size() == max_size_) {
+    KLOG_EVERY_N_SECS(WARNING, 60) << "ChunkedTimeSeriesCounter reached maximum size";
+    values_.erase(values_.begin(), values_.begin() + 1);
+  }
+  DCHECK_LT(values_.size(), max_size_);
+  values_.push_back(sample);
+}
+
+const int64_t* RuntimeProfile::ChunkedTimeSeriesCounter::GetSamplesLocked(
+    int* num_samples, int* period) const {
+  DCHECK(num_samples != nullptr);
+  DCHECK(period != nullptr);
+  *num_samples = values_.size();
+  *period = period_;
+  return values_.data();
+}
+
+const int64_t* RuntimeProfile::ChunkedTimeSeriesCounter::GetSamplesLockedForSend(
+    int* num_samples, int* period) {
+  last_get_count_ = values_.size();
+  return GetSamplesLocked(num_samples, period);
+}
+
+void RuntimeProfile::ChunkedTimeSeriesCounter::SetSamples(
+    int period, const std::vector<int64_t>& samples, int64_t start_idx) {
+  lock_guard<SpinLock> l(lock_);
+  if (start_idx == 0) {
+    // This could be coming from a SamplingTimeSeriesCounter or another
+    // ChunkedTimeSeriesCounter.
+    period_ = period;
+    values_ = samples;
+    return;
+  }
+  // Only ChunkedTimeSeriesCounter will set start_idx > 0.
+  DCHECK_GT(start_idx, 0);
+  DCHECK_EQ(period_, period);
+  if (values_.size() < start_idx) {
+    // Fill up with 0.
+    values_.resize(start_idx);
+  }
+  DCHECK_GE(values_.size(), start_idx);
+  // Skip values we already received.
+  auto start_it = samples.begin() + values_.size() - start_idx;
+  values_.insert(values_.end(), start_it, samples.end());
+}
 
+RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddChunkedTimeSeriesCounter(
+    const string& name, TUnit::type unit, SampleFunction fn) {
+  DCHECK(fn != nullptr);
+  lock_guard<SpinLock> l(counter_map_lock_);
+  TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(name);
+  if (it != time_series_counter_map_.end()) return it->second;
+  TimeSeriesCounter* counter = pool_->Add(new ChunkedTimeSeriesCounter(name, unit, fn));
+  time_series_counter_map_[name] = counter;
+  PeriodicCounterUpdater::RegisterTimeSeriesCounter(counter);
+  has_active_periodic_counters_ = true;
+  return counter;
+}
+
+void RuntimeProfile::ClearChunkedTimeSeriesCounters() {
+  {
+    lock_guard<SpinLock> l(counter_map_lock_);
+    for (auto& it : time_series_counter_map_) it.second->Clear();
+  }
+  {
+    lock_guard<SpinLock> l(children_lock_);
+    for (int i = 0; i < children_.size(); ++i) {
+      children_[i].first->ClearChunkedTimeSeriesCounters();
+    }
+  }
+}
+
+void RuntimeProfile::TimeSeriesCounter::ToThrift(TTimeSeriesCounter* counter) {
+  lock_guard<SpinLock> l(lock_);
   int num, period;
-  SpinLock* lock;
-  const int64_t* samples = samples_.GetSamples(&num, &period, &lock);
+  const int64_t* samples = GetSamplesLockedForSend(&num, &period);
   counter->values.resize(num);
   Ubsan::MemCpy(counter->values.data(), samples, num * sizeof(int64_t));
-  lock->unlock();
-  counter->period_ms = period;
-}
 
-string RuntimeProfile::TimeSeriesCounter::DebugString() const {
-  stringstream ss;
-  ss << "Counter=" << name_ << endl
-     << samples_.DebugString();
-  return ss.str();
+  counter->name = name_;
+  counter->unit = unit_;
+  counter->period_ms = period;
+  counter->__set_start_index(previous_sample_count_);
 }
 
 void RuntimeProfile::EventSequence::ToThrift(TEventSequence* seq) {
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index 8bba192..9203785 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -40,6 +40,43 @@ class ObjectPool;
 /// single thread per process that will convert an amount (i.e. bytes) counter to a
 /// corresponding rate based counter.  This thread wakes up at fixed intervals and updates
 /// all of the rate counters.
+///
+/// Runtime profile counters can be of several types. See their definition in
+/// runtime-profile-counters.h for more details.
+///
+/// - Counter: Tracks a single value or bitmap. Also serves as the base class for several
+///   |   other counters.
+///   |
+///   - AveragedCounter: Maintains a set of child counters. Its current value is the
+///   |     average of the current values of its children.
+///   |
+///   - ConcurrentTimerCounter: Wraps a ConcurrentStopWatch to track concurrent running
+///   |     time for multiple threads.
+///   |
+///   - DerivedCounter: Computes its current value by calling a function passed during
+///   |     construction.
+///   |
+///   - HighWaterMarkCounter: Keeps track of the highest value seen so far.
+///   |
+///   - SummaryStatsCounter: Keeps track of minimum, maximum, and average value of all
+///         values seen so far.
+///
+/// - EventSequence: Captures a sequence of events, each added by calling MarkEvent().
+///       Events have a text label and a time, relative to when the sequence was started.
+///
+/// - ThreadCounters: Tracks thread runtime information, such as total time, user time,
+///       sys time.
+///
+/// - TimeSeriesCounter (abstract): Keeps track of a value over time. Has two
+///   |   implementations.
+///   |
+///   - SamplingTimeSeriesCounter: Maintains a fixed array of 64 values and resamples if
+///   |     more values than that are added.
+///   |
+///   - ChunkedTimeSeriesCounter: Maintains an unbounded vector of values. Supports
+///         clearing its values after they have been retrieved, and will track the number
+///         of previously retrieved values.
+///
 /// All methods are thread-safe unless otherwise mentioned.
 class RuntimeProfile { // NOLINT: This struct is not packed, but there are not so many
                        // of them that it makes a performance difference
@@ -93,13 +130,15 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   class AveragedCounter;
   class ConcurrentTimerCounter;
   class DerivedCounter;
-  class EventSequence;
   class HighWaterMarkCounter;
   class SummaryStatsCounter;
+  class EventSequence;
   class ThreadCounters;
   class TimeSeriesCounter;
+  class SamplingTimeSeriesCounter;
+  class ChunkedTimeSeriesCounter;
 
-  typedef boost::function<int64_t ()> DerivedCounterFunction;
+  typedef boost::function<int64_t ()> SampleFunction;
 
   /// Create a runtime profile object with 'name'. The profile, counters and any other
   /// structures owned by the profile are allocated from 'pool'.
@@ -182,7 +221,7 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   /// parent_counter_name.
   /// Returns NULL if the counter already exists.
   DerivedCounter* AddDerivedCounter(const std::string& name, TUnit::type unit,
-      const DerivedCounterFunction& counter_fn,
+      const SampleFunction& counter_fn,
       const std::string& parent_counter_name = "");
 
   /// Add a set of thread counters prefixed with 'prefix'. Returns a ThreadCounters object
@@ -191,7 +230,7 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
 
   // Add a derived counter to capture the local time. This function can be called at most
   // once.
-  void AddLocalTimeCounter(const DerivedCounterFunction& counter_fn);
+  void AddLocalTimeCounter(const SampleFunction& counter_fn);
 
   /// Gets the counter object with 'name'.  Returns NULL if there is no counter with
   /// that name.
@@ -332,7 +371,7 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
 
   /// Same as 'AddRateCounter' above except values are taken by calling fn.
   /// The resulting counter will be of 'unit'.
-  Counter* AddRateCounter(const std::string& name, DerivedCounterFunction fn,
+  Counter* AddRateCounter(const std::string& name, SampleFunction fn,
       TUnit::type unit);
 
   /// Add a sampling counter to the current profile based on src_counter with name.
@@ -345,7 +384,7 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   Counter* AddSamplingCounter(const std::string& name, Counter* src_counter);
 
   /// Same as 'AddSamplingCounter' above except the samples are taken by calling fn.
-  Counter* AddSamplingCounter(const std::string& name, DerivedCounterFunction fn);
+  Counter* AddSamplingCounter(const std::string& name, SampleFunction fn);
 
   /// Create a set of counters, one per bucket, to store the sampled value of src_counter.
   /// The 'src_counter' is sampled periodically to obtain the index of the bucket to
@@ -363,17 +402,30 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   /// PeriodicCounterUpdater::StopBucketingCounters() if 'buckets' stops changing.
   std::vector<Counter*>* AddBucketingCounters(Counter* src_counter, int num_buckets);
 
-  /// Create a time series counter. This begins sampling immediately. This counter
-  /// contains a number of samples that are collected periodically by calling sample_fn().
-  /// StopPeriodicCounters() must be called to stop the periodic updating before this
-  /// profile is destroyed. The periodic updating can be stopped earlier by calling
-  /// PeriodicCounterUpdater::StopTimeSeriesCounter() if the input stops changing.
+  /// Creates a sampling time series counter. This begins sampling immediately. This
+  /// counter contains a number of samples that are collected periodically by calling
+  /// sample_fn(). StopPeriodicCounters() must be called to stop the periodic updating
+  /// before this profile is destroyed. The periodic updating can be stopped earlier by
+  /// calling PeriodicCounterUpdater::StopTimeSeriesCounter() if the input stops changing.
   /// Note: these counters don't get merged (to make average profiles)
-  TimeSeriesCounter* AddTimeSeriesCounter(const std::string& name,
-      TUnit::type unit, DerivedCounterFunction sample_fn);
+  TimeSeriesCounter* AddSamplingTimeSeriesCounter(const std::string& name,
+      TUnit::type unit, SampleFunction sample_fn);
 
   /// Same as above except the samples are collected from 'src_counter'.
-  TimeSeriesCounter* AddTimeSeriesCounter(const std::string& name, Counter* src_counter);
+  TimeSeriesCounter* AddSamplingTimeSeriesCounter(
+      const std::string& name, Counter* src_counter);
+
+  /// Adds a chunked time series counter to the profile. This begins sampling immediately.
+  /// This counter will collect new samples periodically by calling 'sample_fn()'. Samples
+  /// are not re-sampled into larger intervals; instead, owners of this profile can call
+  /// ClearChunkedTimeSeriesCounters() to reset the sample buffers of all chunked time
+  /// series counters, e.g. after their current values have been transmitted to a remote
+  /// node for profile aggregation.
+  TimeSeriesCounter* AddChunkedTimeSeriesCounter(
+      const std::string& name, TUnit::type unit, SampleFunction sample_fn);
+
+  /// Clear all chunked time series counters in this profile and all children.
+  void ClearChunkedTimeSeriesCounters();
 
   /// Recursively compute the fraction of the 'total_time' spent in this profile and
   /// its children.
diff --git a/be/src/util/streaming-sampler.h b/be/src/util/streaming-sampler.h
index 027f330..d517d37 100644
--- a/be/src/util/streaming-sampler.h
+++ b/be/src/util/streaming-sampler.h
@@ -20,9 +20,7 @@
 
 #include <string.h>
 #include <iostream>
-#include <boost/thread/lock_guard.hpp>
 
-#include "util/spinlock.h"
 #include "util/ubsan.h"
 
 namespace impala {
@@ -33,9 +31,13 @@ namespace impala {
 /// are collapsed and the collection period is doubled.
 /// The input period and the streaming sampler period do not need to match, the
 /// streaming sampler will average values.
-/// T is the type of the sample and must be a native numerical type (e.g. int or float).
+/// T is the type of the sample and must be a native numerical type which fulfills
+/// std::is_arithmetic (e.g. int or float).
+///
+/// This class is not thread-safe.
 template<typename T, int MAX_SAMPLES>
 class StreamingSampler {
+  static_assert(std::is_arithmetic<T>::value, "Numerical type required");
  public:
   StreamingSampler(int initial_period = 500)
     : samples_collected_(0) ,
@@ -45,17 +47,6 @@ class StreamingSampler {
       current_sample_total_time_(0) {
   }
 
-  /// Initialize the sampler with values.
-  StreamingSampler(int period, const std::vector<T>& initial_samples)
-    : samples_collected_(initial_samples.size()),
-      period_(period),
-      current_sample_sum_(0),
-      current_sample_count_(0),
-      current_sample_total_time_(0) {
-    DCHECK_LE(samples_collected_, MAX_SAMPLES);
-    Ubsan::MemCpy(samples_, initial_samples.data(), sizeof(T) * samples_collected_);
-  }
-
   /// Add a sample to the sampler. 'ms' is the time elapsed since the last time this
   /// was called.
   /// The input value is accumulated into current_*. If the total time elapsed
@@ -65,7 +56,6 @@ class StreamingSampler {
   /// TODO: we can make this more complex by taking a weighted average of samples
   /// accumulated in a period.
   void AddSample(T sample, int ms) {
-    boost::lock_guard<SpinLock> l(lock_);
     ++current_sample_count_;
     current_sample_sum_ += sample;
     current_sample_total_time_ += ms;
@@ -87,49 +77,15 @@ class StreamingSampler {
     }
   }
 
-  /// Get the samples collected.  Returns the number of samples and
-  /// the period they were collected at.
-  /// If lock is non-null, the lock will be taken before returning. The caller
-  /// must unlock it.
-  const T* GetSamples(int* num_samples, int* period, SpinLock** lock = NULL) const {
-    if (lock != NULL) {
-      lock_.lock();
-      *lock = &lock_;
-    }
+  /// Get the samples collected.  Returns the number of samples and the period they were
+  /// collected at.
+  const T* GetSamples(int* num_samples, int* period) const {
     *num_samples = samples_collected_;
     *period = period_;
     return samples_;
   }
 
-  /// Set the underlying data to period/samples
-  void SetSamples(int period, const std::vector<T>& samples) {
-    DCHECK_LE(samples.size(), MAX_SAMPLES);
-
-    boost::lock_guard<SpinLock> l(lock_);
-    period_ = period;
-    samples_collected_ = samples.size();
-    Ubsan::MemCpy(samples_, samples.data(), sizeof(T) * samples_collected_);
-    current_sample_sum_ = 0;
-    current_sample_count_ = 0;
-    current_sample_total_time_ = 0;
-  }
-
-  std::string DebugString(const std::string& prefix="") const {
-    boost::lock_guard<SpinLock> l(lock_);
-    std::stringstream ss;
-    ss << prefix << "Period = " << period_ << std::endl
-       << prefix << "Num = " << samples_collected_ << std::endl
-       << prefix << "Samples = {";
-    for (int i = 0; i < samples_collected_; ++i) {
-      ss << samples_[i] << ", ";
-    }
-    ss << prefix << "}" << std::endl;
-    return ss.str();
-  }
-
  private:
-  mutable SpinLock lock_;
-
   /// Aggregated samples collected. Note: this is not all the input samples from
   /// AddSample(), as logically, those samples get resampled and aggregated.
   T samples_[MAX_SAMPLES];
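
With the lock and the SetSamples()/DebugString() helpers removed, the remaining
StreamingSampler surface is small. A hedged usage sketch (not from the patch); note that
callers must now provide any synchronization themselves:

    // Minimal sketch of the post-change StreamingSampler interface.
    #include <cstdint>
    #include <cstdio>

    #include "util/streaming-sampler.h"

    void DemoStreamingSampler() {
      // Up to 64 aggregated samples, starting at one sample per 500 ms.
      impala::StreamingSampler<int64_t, 64> sampler(500);
      for (int64_t i = 0; i < 1000; ++i) {
        sampler.AddSample(/*sample=*/i, /*ms=*/100);
      }
      int num_samples = 0;
      int period = 0;
      const int64_t* samples = sampler.GetSamples(&num_samples, &period);
      // Once the buffer fills, adjacent samples are collapsed and the period doubles,
      // so 'period' ends up larger than the initial 500 ms here.
      std::printf("%d samples at a %d ms period, first=%ld\n", num_samples, period,
          static_cast<long>(samples[0]));
    }
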
diff --git a/be/src/util/system-state-info-test.cc b/be/src/util/system-state-info-test.cc
new file mode 100644
index 0000000..561b2d6
--- /dev/null
+++ b/be/src/util/system-state-info-test.cc
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "common/atomic.h"
+#include "testutil/gtest-util.h"
+#include "util/system-state-info.h"
+#include "util/time.h"
+
+#include <thread>
+
+namespace impala {
+
+class SystemStateInfoTest : public testing::Test {
+ protected:
+  SystemStateInfo info;
+};
+
+TEST_F(SystemStateInfoTest, FirstCallReturnsZero) {
+  const SystemStateInfo::CpuUsageRatios& r = info.GetCpuUsageRatios();
+  EXPECT_EQ(0, r.user + r.system + r.iowait);
+}
+
+// Smoke test to make sure that we read non-zero values from /proc/stat.
+TEST_F(SystemStateInfoTest, ReadProcStat) {
+  info.ReadCurrentProcStat();
+  const SystemStateInfo::CpuValues& state = info.cpu_values_[info.cur_val_idx_];
+  EXPECT_GT(state[SystemStateInfo::CPU_USER], 0);
+  EXPECT_GT(state[SystemStateInfo::CPU_SYSTEM], 0);
+  EXPECT_GT(state[SystemStateInfo::CPU_IDLE], 0);
+  EXPECT_GT(state[SystemStateInfo::CPU_IOWAIT], 0);
+}
+
+// Tests parsing a line similar to the first line of /proc/stat.
+TEST_F(SystemStateInfoTest, ParseProcStat) {
+  // Fields are: user nice system idle iowait irq softirq steal guest guest_nice
+  info.ReadProcStatString("cpu  20 30 10 70 100 0 0 0 0 0");
+  const SystemStateInfo::CpuValues& state = info.cpu_values_[info.cur_val_idx_];
+  EXPECT_EQ(state[SystemStateInfo::CPU_USER], 20);
+  EXPECT_EQ(state[SystemStateInfo::CPU_SYSTEM], 10);
+  EXPECT_EQ(state[SystemStateInfo::CPU_IDLE], 70);
+  EXPECT_EQ(state[SystemStateInfo::CPU_IOWAIT], 100);
+
+  // Test that values larger than INT_MAX parse without error.
+  info.ReadProcStatString("cpu  3000000000 30 10 70 100 0 0 0 0 0");
+  const SystemStateInfo::CpuValues& changed_state = info.cpu_values_[info.cur_val_idx_];
+  EXPECT_EQ(changed_state[SystemStateInfo::CPU_USER], 3000000000);
+}
+
+// Smoke test for the public interface.
+TEST_F(SystemStateInfoTest, GetCpuUsageRatios) {
+  AtomicBool running(true);
+  // Generate some load to observe counters > 0.
+  std::thread t([&running]() { while (running.Load()); });
+  for (int i = 0; i < 3; ++i) {
+    SleepForMs(200);
+    info.CaptureSystemStateSnapshot();
+    const SystemStateInfo::CpuUsageRatios& r = info.GetCpuUsageRatios();
+    EXPECT_GT(r.user + r.system + r.iowait, 0);
+  }
+  running.Store(false);
+  t.join();
+}
+
+// Tests the computation logic.
+TEST_F(SystemStateInfoTest, ComputeCpuRatios) {
+  info.ReadProcStatString("cpu  20 30 10 70 100 0 0 0 0 0");
+  info.ReadProcStatString("cpu  30 30 20 70 100 0 0 0 0 0");
+  info.ComputeCpuRatios();
+  const SystemStateInfo::CpuUsageRatios& r = info.GetCpuUsageRatios();
+  EXPECT_EQ(r.user, 5000);
+  EXPECT_EQ(r.system, 5000);
+  EXPECT_EQ(r.iowait, 0);
+}
+
+} // namespace impala
+
+IMPALA_TEST_MAIN();
diff --git a/be/src/util/system-state-info.cc b/be/src/util/system-state-info.cc
new file mode 100644
index 0000000..ea3803b
--- /dev/null
+++ b/be/src/util/system-state-info.cc
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "gutil/strings/split.h"
+#include "util/error-util.h"
+#include "util/string-parser.h"
+#include "util/system-state-info.h"
+
+#include <algorithm>
+#include <fstream>
+#include <iostream>
+
+#include "common/names.h"
+
+using std::accumulate;
+
+namespace impala {
+
+// Partially initializing cpu_ratios_ will default-initialize the remaining members.
+SystemStateInfo::SystemStateInfo() {
+  memset(&cpu_ratios_, 0, sizeof(cpu_ratios_));
+  ReadCurrentProcStat();
+}
+
+void SystemStateInfo::CaptureSystemStateSnapshot() {
+  // Capture and compute CPU usage
+  ReadCurrentProcStat();
+  ComputeCpuRatios();
+}
+
+int64_t SystemStateInfo::ParseInt64(const string& val) const {
+  StringParser::ParseResult result;
+  int64_t parsed = StringParser::StringToInt<int64_t>(val.c_str(), val.size(), &result);
+  if (result == StringParser::PARSE_SUCCESS) return parsed;
+  return -1;
+}
+
+void SystemStateInfo::ReadFirstLineFromFile(const char* path, string* out) const {
+  ifstream proc_file(path);
+  if (!proc_file.is_open()) {
+    LOG(WARNING) << "Could not open " << path << ": " << GetStrErrMsg() << endl;
+    return;
+  }
+  DCHECK(proc_file.is_open());
+  DCHECK(out != nullptr);
+  getline(proc_file, *out);
+}
+
+void SystemStateInfo::ReadCurrentProcStat() {
+  string line;
+  ReadFirstLineFromFile("/proc/stat", &line);
+  ReadProcStatString(line);
+}
+
+void SystemStateInfo::ReadProcStatString(const string& stat_string) {
+  stringstream ss(stat_string);
+
+  // Skip the first value ('cpu  ');
+  ss.ignore(5);
+
+  cur_val_idx_ = 1 - cur_val_idx_;
+  CpuValues& next_values = cpu_values_[cur_val_idx_];
+
+  for (int i = 0; i < NUM_CPU_VALUES; ++i) {
+    int64_t v = -1;
+    ss >> v;
+    DCHECK_GE(v, 0) << "Value " << i << ": " << v;
+    // Clamp invalid entries at 0
+    next_values[i] = max(v, 0L);
+  }
+}
+
+void SystemStateInfo::ComputeCpuRatios() {
+  const CpuValues& cur = cpu_values_[cur_val_idx_];
+  const CpuValues& old = cpu_values_[1 - cur_val_idx_];
+
+  // Sum up all counters
+  int64_t cur_sum = accumulate(cur.begin(), cur.end(), int64_t{0});
+  int64_t old_sum = accumulate(old.begin(), old.end(), int64_t{0});
+
+  int64_t total_tics = cur_sum - old_sum;
+  // If less than 1/USER_HZ time has passed for any of the counters, the ratios are
+  // zero (to avoid dividing by zero).
+  if (total_tics == 0) {
+    memset(&cpu_ratios_, 0, sizeof(cpu_ratios_));
+    return;
+  }
+  DCHECK_GT(total_tics, 0);
+  // Convert each ratio to basis points.
+  const int BASIS_MAX = 10000;
+  cpu_ratios_.user = ((cur[CPU_USER] - old[CPU_USER]) * BASIS_MAX) / total_tics;
+  cpu_ratios_.system = ((cur[CPU_SYSTEM] - old[CPU_SYSTEM]) * BASIS_MAX) / total_tics;
+  cpu_ratios_.iowait = ((cur[CPU_IOWAIT] - old[CPU_IOWAIT]) * BASIS_MAX) / total_tics;
+}
+
+} // namespace impala
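
To make the delta/basis-point arithmetic in ComputeCpuRatios() concrete, here is a
standalone sketch (a simplified re-derivation, not Impala code) that uses the same
inputs as the ComputeCpuRatios test above: 10 new user ticks and 10 new system ticks
out of 20 total new ticks yield 5000 basis points (50%) each.

    #include <array>
    #include <cstdint>
    #include <cstdio>
    #include <numeric>

    int main() {
      enum { USER, NICE, SYSTEM, IDLE, IOWAIT, NUM_VALUES };
      std::array<int64_t, NUM_VALUES> prev = {20, 30, 10, 70, 100};
      std::array<int64_t, NUM_VALUES> cur = {30, 30, 20, 70, 100};

      // Total ticks elapsed between the two snapshots.
      int64_t total_ticks = std::accumulate(cur.begin(), cur.end(), int64_t{0})
          - std::accumulate(prev.begin(), prev.end(), int64_t{0});  // 20

      const int64_t BASIS_MAX = 10000;  // 100% expressed in basis points
      int64_t user_bp = (cur[USER] - prev[USER]) * BASIS_MAX / total_ticks;        // 5000
      int64_t system_bp = (cur[SYSTEM] - prev[SYSTEM]) * BASIS_MAX / total_ticks;  // 5000
      int64_t iowait_bp = (cur[IOWAIT] - prev[IOWAIT]) * BASIS_MAX / total_ticks;  // 0
      std::printf("user=%ld system=%ld iowait=%ld\n", static_cast<long>(user_bp),
          static_cast<long>(system_bp), static_cast<long>(iowait_bp));
      return 0;
    }
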
diff --git a/be/src/util/system-state-info.h b/be/src/util/system-state-info.h
new file mode 100644
index 0000000..df2068f
--- /dev/null
+++ b/be/src/util/system-state-info.h
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <array>
+#include <cstdint>
+#include <string>
+
+#include <gtest/gtest_prod.h> // for FRIEND_TEST
+
+#include "common/names.h"
+namespace impala {
+
+/// Utility class to track and compute system resource consumption.
+///
+/// This class can be used to capture snapshots of various metrics of system resource
+/// consumption (e.g. CPU usage) and compute usage ratios and derived metrics between
+/// subsequent snapshots. Users of this class must call CaptureSystemStateSnapshot() and
+/// can then obtain various resource utilization metrics through getter methods (e.g.
+/// GetCpuUsageRatios()).
+class SystemStateInfo {
+ public:
+  SystemStateInfo();
+  /// Takes a snapshot of the current system resource usage and computes the usage
+  /// ratios for the time since the previous snapshot was taken.
+  void CaptureSystemStateSnapshot();
+
+  /// Ratios use basis points as their unit (1/100th of a percent, i.e. 0.0001).
+  struct CpuUsageRatios {
+    int32_t user;
+    int32_t system;
+    int32_t iowait;
+  };
+  /// Returns a struct containing the CPU usage ratios for the interval between the last
+  /// two calls to CaptureSystemStateSnapshot().
+  const CpuUsageRatios& GetCpuUsageRatios() { return cpu_ratios_; }
+
+ private:
+  int64_t ParseInt64(const std::string& val) const;
+  void ReadFirstLineFromFile(const char* path, std::string* out) const;
+
+  /// Rotates 'cur_val_idx_' and reads the current CPU usage values from /proc/stat into
+  /// the current set of values.
+  void ReadCurrentProcStat();
+
+  /// Rotates 'cur_val_idx_' and reads the CPU usage values from 'stat_string' into the
+  /// current set of values.
+  void ReadProcStatString(const string& stat_string);
+
+  /// Computes the CPU usage ratios for the interval between the last two calls to
+  /// CaptureSystemStateSnapshot() and stores the result in 'cpu_ratios_'.
+  void ComputeCpuRatios();
+
+  enum PROC_STAT_CPU_VALUES {
+    CPU_USER = 0,
+    CPU_NICE,
+    CPU_SYSTEM,
+    CPU_IDLE,
+    CPU_IOWAIT,
+    NUM_CPU_VALUES
+  };
+
+  /// We store the CPU usage values in an array so that we can iterate over them, e.g.
+  /// when reading them from a file or summing them up.
+  typedef std::array<int64_t, NUM_CPU_VALUES> CpuValues;
+  /// Two buffers to keep the current and previous set of CPU usage values.
+  CpuValues cpu_values_[2];
+  int cur_val_idx_ = 0;
+
+  /// The computed CPU usage ratio between the current and previous snapshots in
+  /// cpu_values_. Updated in ComputeCpuRatios().
+  CpuUsageRatios cpu_ratios_;
+
+  FRIEND_TEST(SystemStateInfoTest, ComputeCpuRatios);
+  FRIEND_TEST(SystemStateInfoTest, ParseProcStat);
+  FRIEND_TEST(SystemStateInfoTest, ReadProcStat);
+};
+
+} // namespace impala
diff --git a/bin/parse-thrift-profile.py b/bin/parse-thrift-profile.py
index 5f8485f..55cf1f2 100755
--- a/bin/parse-thrift-profile.py
+++ b/bin/parse-thrift-profile.py
@@ -39,14 +39,8 @@
 # 2018-04-13T15:06:34.144000 e44af7f93edb8cd6:1b1f801600000000 TRuntimeProfileTree(nodes=[TRuntimeProf...
 
 
-from thrift.protocol import TCompactProtocol
-from thrift.TSerialization import deserialize
-from RuntimeProfile.ttypes import TRuntimeProfileTree
-
-import base64
-import datetime
+from impala_py_lib import profiles
 import sys
-import zlib
 
 if len(sys.argv) == 1 or sys.argv[1] == "-":
   input_data = sys.stdin
@@ -57,23 +51,5 @@ else:
   sys.exit(1)
 
 for line in input_data:
-  space_separated = line.split(" ")
-  if len(space_separated) == 3:
-    ts = int(space_separated[0])
-    print datetime.datetime.fromtimestamp(ts/1000.0).isoformat(), space_separated[1],
-    base64_encoded = space_separated[2]
-  elif len(space_separated) == 1:
-    base64_encoded = space_separated[0]
-  else:
-    raise Exception("Unexpected line: " + line)
-  possibly_compressed = base64.b64decode(base64_encoded)
-  # Handle both compressed and uncompressed Thrift profiles
-  try:
-    thrift = zlib.decompress(possibly_compressed)
-  except zlib.error:
-    thrift = possibly_compressed
-
-  tree = TRuntimeProfileTree()
-  deserialize(tree, thrift, protocol_factory=TCompactProtocol.TCompactProtocolFactory())
-  tree.validate()
+  tree = profiles.decode_profile_line(line)
   print tree
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index a190d83..ee8f369 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -311,6 +311,9 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   74: optional string client_identifier;
+
+  // See comment in ImpalaService.thrift
+  75: optional double resource_trace_ratio = 0;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
@@ -453,6 +456,10 @@ struct TQueryCtx {
   // stats and key column predicate selectivity. Generally only disabled
   // for testing.
   20: optional bool disable_hbase_row_est = false;
+
+  // Flag to enable tracing of resource usage for all fragment instances of a query.
+  // Set in ImpalaServer::PrepareQueryContext().
+  21: required bool trace_resource_usage = false
 }
 
 // Specification of one output destination of a plan fragment
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 2986cd9..bb3b0af 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -350,7 +350,12 @@ enum TImpalaQueryOptions {
   // An opaque string, not used by Impala itself, that can be used to identify
   // the client, like a User-Agent in HTTP. Drivers should set this to
   // their version number. May also be used by tests to help identify queries.
-  CLIENT_IDENTIFIER
+  CLIENT_IDENTIFIER,
+
+  // Probability of enabling tracing of resource usage on all fragment instance
+  // executors of a query. Must be between 0 and 1 inclusive; 0 means no query will be
+  // traced, 1 means all queries will be traced.
+  RESOURCE_TRACE_RATIO,
 }
 
 // The summary of a DML statement.
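
RESOURCE_TRACE_RATIO is a probability, and the TQueryCtx comment above notes that the
per-query decision is taken in ImpalaServer::PrepareQueryContext(), which is not shown
in this excerpt. A standalone sketch of that kind of probabilistic gate, assuming only
that a ratio in [0, 1] is turned into a boolean with a uniform random draw:

    #include <random>

    // Returns true if this query should have resource usage tracing enabled.
    bool ShouldTraceResourceUsage(double resource_trace_ratio, std::mt19937_64* rng) {
      if (resource_trace_ratio <= 0.0) return false;  // 0 disables tracing entirely
      if (resource_trace_ratio >= 1.0) return true;   // 1 traces every query
      std::uniform_real_distribution<double> dist(0.0, 1.0);
      return dist(*rng) < resource_trace_ratio;       // trace with the given probability
    }
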
diff --git a/common/thrift/Metrics.thrift b/common/thrift/Metrics.thrift
index 4f2c7f2..f97eda0 100644
--- a/common/thrift/Metrics.thrift
+++ b/common/thrift/Metrics.thrift
@@ -29,6 +29,9 @@ enum TUnit {
   BYTES_PER_SECOND,
   TIME_NS,
   DOUBLE_VALUE,
+  // 100th of a percent, used to express ratios etc., range from 0 to 10000, pretty
+  // printed as integer percentages from 0 to 100.
+  BASIS_POINTS,
   // No units at all, may not be a numerical quantity
   NONE,
   TIME_MS,
diff --git a/common/thrift/RuntimeProfile.thrift b/common/thrift/RuntimeProfile.thrift
index f131564..d47afa6 100644
--- a/common/thrift/RuntimeProfile.thrift
+++ b/common/thrift/RuntimeProfile.thrift
@@ -65,6 +65,12 @@ struct TTimeSeriesCounter {
 
   // The sampled values.
   4: required list<i64> values
+
+  // The index of the first value in this series (this is equal to the total number of
+  // values contained in previous updates for this counter). Values > 0 mean that this
+  // series contains an interval of a larger series. For values > 0, period_ms should be
+  // ignored, as chunked counters don't resample their values.
+  5: optional i64 start_index
 }
 
 // Thrift version of RuntimeProfile::SummaryStatsCounter.
@@ -133,4 +139,5 @@ struct TRuntimeProfileTree {
 // A list of TRuntimeProfileTree structures.
 struct TRuntimeProfileForest {
   1: required list<TRuntimeProfileTree> profile_trees
+  2: optional TRuntimeProfileTree host_profile
 }
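
The start_index field hints at how a receiver can stitch chunked updates back together.
The sketch below is a hypothetical reconstruction based only on the comment above; the
actual aggregation path on the coordinator is not part of this excerpt.

    #include <cstdint>
    #include <vector>

    // Appends one chunked update into a growing series. 'start_index' equals the number
    // of values delivered by earlier updates, so re-sent chunks are handled idempotently.
    void AppendChunk(std::vector<int64_t>* merged, int64_t start_index,
        const std::vector<int64_t>& values) {
      size_t needed = static_cast<size_t>(start_index) + values.size();
      if (merged->size() < needed) merged->resize(needed);
      for (size_t i = 0; i < values.size(); ++i) {
        (*merged)[static_cast<size_t>(start_index) + i] = values[i];
      }
    }
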
diff --git a/bin/parse-thrift-profile.py b/lib/python/impala_py_lib/profiles.py
old mode 100755
new mode 100644
similarity index 58%
copy from bin/parse-thrift-profile.py
copy to lib/python/impala_py_lib/profiles.py
index 5f8485f..a2f6aeb
--- a/bin/parse-thrift-profile.py
+++ b/lib/python/impala_py_lib/profiles.py
@@ -1,5 +1,4 @@
 #!/usr/bin/env impala-python
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -16,51 +15,22 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
-# Parses a base64-encoded profile provided via stdin. It accepts
-# three common formats:
-#
-# 1. Impala profile logs of the format
-#    "<ts> <queryid> <base64encoded, compressed thrift profile>"
-# 2. Just the base64-encoded compressed thrift profile
-# 3. Base-64 encoded uncompressed thrift profile.
-#
-# In all cases, the script expects one profile per line.
-#
-# For example:
-#
-# $ cat logs/cluster_test/custom_cluster_tests/profiles/impala_profile_log \
-#      | head -n 1 | awk '{ print $3 }' | parse-profile.py
-# TRuntimeProfileTree(nodes=[TRuntimeProfileNode(info_strings_display_order=....
-#
-# or
-#
-# $ bin/parse-thrift-profile.py logs/custom_cluster_tests/profiles/impala_profile_log_1.1-1523657191158
-# 2018-04-13T15:06:34.144000 e44af7f93edb8cd6:1b1f801600000000 TRuntimeProfileTree(nodes=[TRuntimeProf...
 
-
-from thrift.protocol import TCompactProtocol
-from thrift.TSerialization import deserialize
-from RuntimeProfile.ttypes import TRuntimeProfileTree
+# This file contains library functions to decode and access Impala query profiles.
 
 import base64
 import datetime
-import sys
 import zlib
+from thrift.protocol import TCompactProtocol
+from thrift.TSerialization import deserialize
+from RuntimeProfile.ttypes import TRuntimeProfileTree
 
-if len(sys.argv) == 1 or sys.argv[1] == "-":
-  input_data = sys.stdin
-elif len(sys.argv) == 2:
-  input_data = file(sys.argv[1])
-else:
-  print >> sys.stderr, "Usage: %s [file]" % (sys.argv[0],)
-  sys.exit(1)
 
-for line in input_data:
+def decode_profile_line(line):
   space_separated = line.split(" ")
   if len(space_separated) == 3:
     ts = int(space_separated[0])
-    print datetime.datetime.fromtimestamp(ts/1000.0).isoformat(), space_separated[1],
+    print datetime.datetime.fromtimestamp(ts / 1000.0).isoformat(), space_separated[1]
     base64_encoded = space_separated[2]
   elif len(space_separated) == 1:
     base64_encoded = space_separated[0]
@@ -76,4 +46,5 @@ for line in input_data:
   tree = TRuntimeProfileTree()
   deserialize(tree, thrift, protocol_factory=TCompactProtocol.TCompactProtocolFactory())
   tree.validate()
-  print tree
+
+  return tree
diff --git a/tests/beeswax/impala_beeswax.py b/tests/beeswax/impala_beeswax.py
index 4056011..08f0c72 100644
--- a/tests/beeswax/impala_beeswax.py
+++ b/tests/beeswax/impala_beeswax.py
@@ -63,6 +63,7 @@ class ImpalaBeeswaxException(Exception):
 class ImpalaBeeswaxResult(object):
   def __init__(self, **kwargs):
     self.query = kwargs.get('query', None)
+    self.query_id = kwargs['query_id']
     self.success = kwargs.get('success', False)
     # Insert returns an int, convert into list to have a uniform data type.
     # TODO: We should revisit this if we have more datatypes to deal with.
@@ -435,7 +436,8 @@ class ImpalaBeeswaxClient(object):
     if query_type == 'use':
       # TODO: "use <database>" does not currently throw an error. Need to update this
       # to handle the error case once that behavior has been changed.
-      return ImpalaBeeswaxResult(query=query_string, success=True, data=[])
+      return ImpalaBeeswaxResult(query=query_string, query_id=query_handle.id,
+                                 success=True, data=[])
 
     # Result fetching for insert is different from other queries.
     exec_result = None
@@ -459,7 +461,8 @@ class ImpalaBeeswaxClient(object):
         break
 
     # The query executed successfully and all the data was fetched.
-    exec_result = ImpalaBeeswaxResult(success=True, data=result_rows, schema=schema)
+    exec_result = ImpalaBeeswaxResult(query_id=handle.id, success=True, data=result_rows,
+                                      schema=schema)
     exec_result.summary = 'Returned %d rows' % (len(result_rows))
     return exec_result
 
@@ -469,7 +472,7 @@ class ImpalaBeeswaxClient(object):
     # The insert was successful
     num_rows = sum(map(int, result.rows_modified.values()))
     data = ["%s: %s" % row for row in result.rows_modified.iteritems()]
-    exec_result = ImpalaBeeswaxResult(success=True, data=data)
+    exec_result = ImpalaBeeswaxResult(query_id=handle.id, success=True, data=data)
     exec_result.summary = "Inserted %d rows" % (num_rows,)
     return exec_result
 
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index 9c85e2e..d25170f 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -56,7 +56,7 @@ class TestObservability(ImpalaTestSuite):
 
   def test_broadcast_num_rows(self):
     """Regression test for IMPALA-3002 - checks that the num_rows for a broadcast node
-    in the exec summaty is correctly set as the max over all instances, not the sum."""
+    in the exec summary is correctly set as the max over all instances, not the sum."""
     query = """select distinct a.int_col, a.string_col from functional.alltypes a
         inner join functional.alltypessmall b on (a.id = b.id)
         where a.year = 2009 and b.month = 2"""
@@ -424,58 +424,94 @@ class TestObservability(ImpalaTestSuite):
     assert counters["TotalBytesSent"] == (counters["TotalScanBytesSent"] +
                                           counters["TotalInnerBytesSent"])
 
+  def test_query_profile_contains_host_resource_usage(self):
+    """Tests that the profile contains a sub-profile with per node resource usage."""
+    result = self.execute_query("select count(*), sleep(1000) from functional.alltypes")
+    profile = result.runtime_profile
+    expected_str = "Per Node Profiles:"
+    assert any(expected_str in line for line in profile.splitlines())
+
+  def test_query_profile_host_cpu_usage_off(self):
+    """Tests that the query profile does not contain CPU metrics by default or when
+    disabled explicitly."""
+    query = "select count(*), sleep(1000) from functional.alltypes"
+    for query_opts in [None, {'resource_trace_ratio': 0.0}]:
+      profile = self.execute_query(query, query_opts).runtime_profile
+      # Assert that no CPU counters exist in the profile
+      for line in profile.splitlines():
+        assert not re.search("HostCpu.*Percentage", line)
+
+  def test_query_profile_contains_host_cpu_usage(self):
+    """Tests that the query profile contains various CPU metrics."""
+    query_opts = {'resource_trace_ratio': 1.0}
+    query = "select count(*), sleep(1000) from functional.alltypes"
+    profile = self.execute_query(query, query_opts).runtime_profile
+    # We check for 500ms because a query with 1s duration won't hit the 64 values limit.
+    expected_strs = ["HostCpuIoWaitPercentage (500.000ms):",
+                     "HostCpuSysPercentage (500.000ms):",
+                     "HostCpuUserPercentage (500.000ms):"]
+
+    # Assert that all expected counters exist in the profile.
+    for expected_str in expected_strs:
+      assert any(expected_str in line for line in profile.splitlines()), expected_str
+
+    # Check that there are some values for each counter.
+    for line in profile.splitlines():
+      if not any(key in line for key in expected_strs):
+        continue
+      values = line.split(':')[1].strip().split(',')
+      assert len(values) > 0
+
+  def _find_ts_counters_in_thrift_profile(self, profile, name):
+    """Finds all time series counters in 'profile' with a matching name."""
+    counters = []
+    for node in profile.nodes:
+      for counter in node.time_series_counters or []:
+        if counter.name == name:
+          counters.append(counter)
+    return counters
+
+  def _get_thrift_profile(self, query_id, timeout=MAX_THRIFT_PROFILE_WAIT_TIME_S):
+    """Downloads a thrift profile and asserts that a profile was retrieved within the
+       specified timeout. If you see unexpected timeouts, try running the calling test
+       serially."""
+    thrift_profile = self.impalad_test_service.get_thrift_profile(query_id,
+                                                                  timeout=timeout)
+    assert thrift_profile, "Debug thrift profile for query {0} not available in {1} \
+        seconds".format(query_id, timeout)
+    return thrift_profile
 
-class TestThriftProfile(ImpalaTestSuite):
-  @classmethod
-  def get_workload(self):
-    return 'functional-query'
+  @pytest.mark.execute_serially
+  def test_thrift_profile_contains_cpu_usage(self):
+    """Tests that the thrift profile contains a time series counter for CPU resource
+       usage."""
+    query_opts = {'resource_trace_ratio': 1.0}
+    result = self.execute_query("select sleep(2000)", query_opts)
+    thrift_profile = self._get_thrift_profile(result.query_id)
+
+    cpu_key = "HostCpuUserPercentage"
+    cpu_counters = self._find_ts_counters_in_thrift_profile(thrift_profile, cpu_key)
+    # The query will run on a single node, so we will only find the counter once.
+    assert len(cpu_counters) == 1
+    cpu_counter = cpu_counters[0]
+    assert len(cpu_counter.values) > 0
 
-  # IMPALA-6399: Run this test serially to avoid a delay over the wait time in fetching
-  # the profile.
-  # This test needs to call self.client.close() to force computation of query end time,
-  # so it has to be in its own suite (IMPALA-6498).
   @pytest.mark.execute_serially
   def test_query_profile_thrift_timestamps(self):
     """Test that the query profile start and end time date-time strings have
     nanosecond precision. Nanosecond precision is expected by management API clients
     that consume Impala debug webpages."""
     query = "select sleep(5)"
-    handle = self.client.execute_async(query)
-    query_id = handle.get_handle().id
-    results = self.client.fetch(query, handle)
-    self.client.close()
-
-    start = time()
-    end = start + MAX_THRIFT_PROFILE_WAIT_TIME_S
-    while time() <= end:
-      # Sleep before trying to fetch the profile. This helps to prevent a warning when the
-      # profile is not yet available immediately. It also makes it less likely to
-      # introduce an error below in future changes by forgetting to sleep.
-      sleep(1)
-      tree = self.impalad_test_service.get_thrift_profile(query_id)
-      if not tree:
-        continue
-
-      # tree.nodes[1] corresponds to ClientRequestState::summary_profile_
-      # See be/src/service/client-request-state.[h|cc].
-      start_time = tree.nodes[1].info_strings["Start Time"]
-      end_time = tree.nodes[1].info_strings["End Time"]
-      # Start and End Times are of the form "2017-12-07 22:26:52.167711000"
-      start_time_sub_sec_str = start_time.split('.')[-1]
-      end_time_sub_sec_str = end_time.split('.')[-1]
-      if len(end_time_sub_sec_str) == 0:
-        elapsed = time() - start
-        logging.info("end_time_sub_sec_str hasn't shown up yet, elapsed=%d", elapsed)
-        continue
-
-      assert len(end_time_sub_sec_str) == 9, end_time
-      assert len(start_time_sub_sec_str) == 9, start_time
-      return True
-
-    # If we're here, we didn't get the final thrift profile from the debug web page.
-    # This could happen due to heavy system load. The test is then inconclusive.
-    # Log a message and fail this run.
+    result = self.execute_query(query)
 
-    dbg_str = "Debug thrift profile for query {0} not available in {1} seconds".format(
-        query_id, MAX_THRIFT_PROFILE_WAIT_TIME_S)
-    assert False, dbg_str
+    tree = self._get_thrift_profile(result.query_id)
+    # tree.nodes[1] corresponds to ClientRequestState::summary_profile_
+    # See be/src/service/client-request-state.[h|cc].
+    start_time = tree.nodes[1].info_strings["Start Time"]
+    end_time = tree.nodes[1].info_strings["End Time"]
+    # Start and End Times are of the form "2017-12-07 22:26:52.167711000"
+    start_time_sub_sec_str = start_time.split('.')[-1]
+    end_time_sub_sec_str = end_time.split('.')[-1]
+
+    assert len(end_time_sub_sec_str) == 9, end_time
+    assert len(start_time_sub_sec_str) == 9, start_time


[impala] 07/08: IMPALA-8151: Use sizeof() in HiveUdfCall to specify non-primitive type's size

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ae96a9fb19e0a2e0a5529f2f36d3b5ee0d336f69
Author: poojanilangekar <po...@cloudera.com>
AuthorDate: Mon Feb 4 14:19:44 2019 -0800

    IMPALA-8151: Use sizeof() in HiveUdfCall to specify non-primitive type's size
    
    Previously, data type sizes were hardcoded in
    HiveUdfCall::Evaluate(). Since IMPALA-7367 removed the padding
    from STRING and VARCHAR types, it could read past the end of the
    actual value and cause a crash. This change replaces the hardcoded
    values with sizeof() calls to determine the size of non-primitive
    types (STRING, VARCHAR and TIMESTAMP) to avoid similar issues in
    the future.
    
    Testing:
    Ran test_udfs.py on an ASAN build.
    Added logs to manually verify the size of bytes copied.
    
    Change-Id: I919c330546fa86b474ab66245b20ceb1f5525b41
    Reviewed-on: http://gerrit.cloudera.org:8080/12355
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exprs/hive-udf-call.cc | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/be/src/exprs/hive-udf-call.cc b/be/src/exprs/hive-udf-call.cc
index 22683ca..3d24671 100644
--- a/be/src/exprs/hive-udf-call.cc
+++ b/be/src/exprs/hive-udf-call.cc
@@ -117,9 +117,11 @@ AnyVal* HiveUdfCall::Evaluate(ScalarExprEvaluator* eval, const TupleRow* row) co
           memcpy(input_ptr, v, 8);
           break;
         case TYPE_TIMESTAMP:
+          memcpy(input_ptr, v, sizeof(TimestampValue));
+          break;
         case TYPE_STRING:
         case TYPE_VARCHAR:
-          memcpy(input_ptr, v, 16);
+          memcpy(input_ptr, v, sizeof(StringValue));
           break;
         default:
           DCHECK(false) << "NYI";
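
The point in the commit message generalizes beyond this file: a literal byte count
silently goes stale when a type's layout changes, while sizeof() tracks it. A small
hypothetical illustration (the struct below is invented for the example and is not
Impala's StringValue):

    #include <cstring>

    struct PtrAndLen {  // hypothetical stand-in for a pointer-plus-length value
      char* ptr;
      int len;
    };

    void CopyValue(void* dst, const PtrAndLen* src) {
      // Copies exactly the bytes the type occupies today, regardless of padding changes.
      std::memcpy(dst, src, sizeof(PtrAndLen));
    }
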


[impala] 03/08: IMPALA-8091 addendum: use absolute path for ntp-wait

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 72b845279edabfc536a331d6a34eafd2160c2097
Author: Michael Brown <mi...@cloudera.com>
AuthorDate: Thu Jan 31 09:52:55 2019 -0800

    IMPALA-8091 addendum: use absolute path for ntp-wait
    
    While ntp-wait is in the default $PATH on Ubuntu, it's not on CentOS.
    Use the absolute path of the binary (same in both flavors).
    
    Testing:
    - Ran privately on CentOS both with and without ntp-wait installed; GVO
      will cover Ubuntu.
    
    Change-Id: I5d709d121a71e62b8ee4027db81b53108f389fdd
    Reviewed-on: http://gerrit.cloudera.org:8080/12369
    Reviewed-by: Philip Zeyliger <ph...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 testdata/cluster/admin | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/testdata/cluster/admin b/testdata/cluster/admin
index 22e7ab3..a407d2c 100755
--- a/testdata/cluster/admin
+++ b/testdata/cluster/admin
@@ -335,9 +335,11 @@ function start_cluster {
         NTP_CANARY=pool.ntp.org
         if ! ping -c 1 -w 5 "${NTP_CANARY}" >/dev/null 2>/dev/null; then
           echo "WARNING: cannot reach ${NTP_CANARY}; ntp sync recommended for Kudu"
-        elif ! ntp-wait --help >/dev/null 2>/dev/null; then
+        # ntp-wait is in /usr/sbin on both CentOS and Debian-based systems, but regular
+        # users don't always have /usr/sbin in $PATH. Use the absolute path.
+        elif ! /usr/sbin/ntp-wait --help >/dev/null 2>/dev/null; then
           echo "WARNING: ntp-wait not installed; ntp sync recommended for Kudu"
-        elif ! ntp-wait -v; then
+        elif ! /usr/sbin/ntp-wait -v; then
           echo "ntp-wait failed; cannot start kudu"
           return 1
         fi